From 4ea43b4cead83ed4f8d9f13d0c8da5c9d3c44c65 Mon Sep 17 00:00:00 2001 From: Daniel Rochetti Date: Tue, 25 Jun 2024 15:00:01 -0700 Subject: [PATCH] feat(client): stream queue status (#71) * feat(client): stream queue status * chore: remove console log * fix: accumulative logs when streaming * fix(client): stream logs on queue update * chore(apps): remove pollInterval from sample apps --- .../app/comfy/image-to-image/page.tsx | 1 - .../app/comfy/image-to-video/page.tsx | 1 - .../app/comfy/text-to-image/page.tsx | 1 - apps/demo-nextjs-app-router/app/page.tsx | 1 - .../demo-nextjs-app-router/app/queue/page.tsx | 152 ++++++++++++++++++ .../app/whisper/page.tsx | 1 - apps/demo-nextjs-page-router/pages/index.tsx | 1 - libs/client/package.json | 2 +- libs/client/src/function.ts | 107 +++++++----- libs/client/src/streaming.ts | 21 ++- 10 files changed, 231 insertions(+), 57 deletions(-) create mode 100644 apps/demo-nextjs-app-router/app/queue/page.tsx diff --git a/apps/demo-nextjs-app-router/app/comfy/image-to-image/page.tsx b/apps/demo-nextjs-app-router/app/comfy/image-to-image/page.tsx index 1f349f3..6a4c4de 100644 --- a/apps/demo-nextjs-app-router/app/comfy/image-to-image/page.tsx +++ b/apps/demo-nextjs-app-router/app/comfy/image-to-image/page.tsx @@ -90,7 +90,6 @@ export default function ComfyImageToImagePage() { prompt: prompt, loadimage_1: imageFile, }, - pollInterval: 3000, // Default is 1000 (every 1s) logs: true, onQueueUpdate(update) { setElapsedTime(Date.now() - start); diff --git a/apps/demo-nextjs-app-router/app/comfy/image-to-video/page.tsx b/apps/demo-nextjs-app-router/app/comfy/image-to-video/page.tsx index 037fc3c..1baec15 100644 --- a/apps/demo-nextjs-app-router/app/comfy/image-to-video/page.tsx +++ b/apps/demo-nextjs-app-router/app/comfy/image-to-video/page.tsx @@ -85,7 +85,6 @@ export default function ComfyImageToVideoPage() { input: { loadimage_1: imageFile, }, - pollInterval: 3000, // Default is 1000 (every 1s) logs: true, onQueueUpdate(update) { setElapsedTime(Date.now() - start); diff --git a/apps/demo-nextjs-app-router/app/comfy/text-to-image/page.tsx b/apps/demo-nextjs-app-router/app/comfy/text-to-image/page.tsx index 45ccd30..b9d5d7f 100644 --- a/apps/demo-nextjs-app-router/app/comfy/text-to-image/page.tsx +++ b/apps/demo-nextjs-app-router/app/comfy/text-to-image/page.tsx @@ -86,7 +86,6 @@ export default function ComfyTextToImagePage() { input: { prompt: prompt, }, - pollInterval: 3000, // Default is 1000 (every 1s) logs: true, onQueueUpdate(update) { setElapsedTime(Date.now() - start); diff --git a/apps/demo-nextjs-app-router/app/page.tsx b/apps/demo-nextjs-app-router/app/page.tsx index bfc74eb..2fa1afd 100644 --- a/apps/demo-nextjs-app-router/app/page.tsx +++ b/apps/demo-nextjs-app-router/app/page.tsx @@ -85,7 +85,6 @@ export default function Home() { image_url: imageFile, image_size: 'square_hd', }, - pollInterval: 3000, // Default is 1000 (every 1s) logs: true, onQueueUpdate(update) { setElapsedTime(Date.now() - start); diff --git a/apps/demo-nextjs-app-router/app/queue/page.tsx b/apps/demo-nextjs-app-router/app/queue/page.tsx new file mode 100644 index 0000000..b8edc23 --- /dev/null +++ b/apps/demo-nextjs-app-router/app/queue/page.tsx @@ -0,0 +1,152 @@ +'use client'; + +import * as fal from '@fal-ai/serverless-client'; +import { useState } from 'react'; + +fal.config({ + proxyUrl: '/api/fal/proxy', +}); + +type ErrorProps = { + error: any; +}; + +function Error(props: ErrorProps) { + if (!props.error) { + return null; + } + return ( +
+ Error {props.error.message} +
+ ); +} + +export default function Home() { + // Input state + const [endpointId, setEndpointId] = useState(''); + const [input, setInput] = useState('{}'); + // Result state + const [loading, setLoading] = useState(false); + const [error, setError] = useState(null); + const [result, setResult] = useState(null); + const [logs, setLogs] = useState([]); + const [elapsedTime, setElapsedTime] = useState(0); + + const reset = () => { + setLoading(false); + setError(null); + setResult(null); + setLogs([]); + setElapsedTime(0); + }; + + const run = async () => { + reset(); + setLoading(true); + const start = Date.now(); + try { + const result: any = await fal.subscribe(endpointId, { + input: JSON.parse(input), + logs: true, + onQueueUpdate(update) { + console.log('queue update'); + console.log(update); + setElapsedTime(Date.now() - start); + if ( + update.status === 'IN_PROGRESS' || + update.status === 'COMPLETED' + ) { + if (update.logs && update.logs.length > logs.length) { + setLogs((update.logs || []).map((log) => log.message)); + } + } + }, + }); + setResult(result); + } catch (error: any) { + setError(error); + } finally { + setLoading(false); + setElapsedTime(Date.now() - start); + } + }; + return ( +
+
+

+ fal + queue +

+
+ + setEndpointId(e.target.value)} + /> +
+
+ + +
+ + + + + +
+
+

JSON Result

+

+ {`Elapsed Time (seconds): ${(elapsedTime / 1000).toFixed(2)}`} +

+
+              {result
+                ? JSON.stringify(result, null, 2)
+                : '// result pending...'}
+            
+
+ +
+

Logs

+
+              {logs.join('\n')}
+            
+
+
+
+
+ ); +} diff --git a/apps/demo-nextjs-app-router/app/whisper/page.tsx b/apps/demo-nextjs-app-router/app/whisper/page.tsx index db9d386..b1391c4 100644 --- a/apps/demo-nextjs-app-router/app/whisper/page.tsx +++ b/apps/demo-nextjs-app-router/app/whisper/page.tsx @@ -113,7 +113,6 @@ export default function WhisperDemo() { file_name: 'recording.wav', audio_url: audioFile, }, - pollInterval: 1000, logs: true, onQueueUpdate(update) { setElapsedTime(Date.now() - start); diff --git a/apps/demo-nextjs-page-router/pages/index.tsx b/apps/demo-nextjs-page-router/pages/index.tsx index 2a279d8..7d397d7 100644 --- a/apps/demo-nextjs-page-router/pages/index.tsx +++ b/apps/demo-nextjs-page-router/pages/index.tsx @@ -78,7 +78,6 @@ export function Index() { model_name: 'stabilityai/stable-diffusion-xl-base-1.0', image_size: 'square_hd', }, - pollInterval: 3000, // Default is 1000 (every 1s) logs: true, onQueueUpdate(update) { setElapsedTime(Date.now() - start); diff --git a/libs/client/package.json b/libs/client/package.json index ed25eb6..01ac312 100644 --- a/libs/client/package.json +++ b/libs/client/package.json @@ -1,7 +1,7 @@ { "name": "@fal-ai/serverless-client", "description": "The fal serverless JS/TS client", - "version": "0.11.0", + "version": "0.12.0", "license": "MIT", "repository": { "type": "git", diff --git a/libs/client/src/function.ts b/libs/client/src/function.ts index a464a4b..c311307 100644 --- a/libs/client/src/function.ts +++ b/libs/client/src/function.ts @@ -1,6 +1,8 @@ +import { getTemporaryAuthToken } from './auth'; import { dispatchRequest } from './request'; import { storageImpl } from './storage'; -import { EnqueueResult, QueueStatus } from './types'; +import { FalStream } from './streaming'; +import { EnqueueResult, QueueStatus, RequestLog } from './types'; import { ensureAppIdFormat, isUUIDv4, isValidUrl, parseAppId } from './utils'; /** @@ -138,36 +140,22 @@ export async function subscribe( if (options.onEnqueue) { options.onEnqueue(requestId); } - return new Promise((resolve, reject) => { - let timeoutId: ReturnType; - const pollInterval = options.pollInterval ?? 1000; - const poll = async () => { - try { - const requestStatus = await queue.status(id, { - requestId, - logs: options.logs ?? false, - }); - if (options.onQueueUpdate) { - options.onQueueUpdate(requestStatus); - } - if (requestStatus.status === 'COMPLETED') { - clearTimeout(timeoutId); - try { - const result = await queue.result(id, { requestId }); - resolve(result); - } catch (error) { - reject(error); - } - return; - } - timeoutId = setTimeout(poll, pollInterval); - } catch (error) { - clearTimeout(timeoutId); - reject(error); - } - }; - poll().catch(reject); + const status = await queue.streamStatus(id, { + requestId, + logs: options.logs, }); + const logs: RequestLog[] = []; + status.on('message', (data: QueueStatus) => { + if (options.onQueueUpdate) { + // accumulate logs to match previous polling behavior + if ('logs' in data && Array.isArray(data.logs) && data.logs.length > 0) { + logs.push(...data.logs); + } + options.onQueueUpdate('logs' in data ? { ...data, logs } : data); + } + }); + await status.done(); + return queue.result(id, { requestId }); } /** @@ -177,6 +165,9 @@ type QueueSubscribeOptions = { /** * The interval (in milliseconds) at which to poll for updates. * If not provided, a default value of `1000` will be used. + * + * @deprecated starting from v0.12.0 the queue status is streamed + * using the `queue.subscribeToStatus` method. */ pollInterval?: number; @@ -239,40 +230,48 @@ interface Queue { /** * Submits a request to the queue. * - * @param id - The ID or URL of the function web endpoint. + * @param endpointId - The ID or URL of the function web endpoint. * @param options - Options to configure how the request is run. * @returns A promise that resolves to the result of enqueuing the request. */ submit( - id: string, + endpointId: string, options: SubmitOptions ): Promise; /** * Retrieves the status of a specific request in the queue. * - * @param id - The ID or URL of the function web endpoint. + * @param endpointId - The ID or URL of the function web endpoint. * @param options - Options to configure how the request is run. * @returns A promise that resolves to the status of the request. */ - status(id: string, options: QueueStatusOptions): Promise; + status(endpointId: string, options: QueueStatusOptions): Promise; /** * Retrieves the result of a specific request from the queue. * - * @param id - The ID or URL of the function web endpoint. + * @param endpointId - The ID or URL of the function web endpoint. * @param options - Options to configure how the request is run. * @returns A promise that resolves to the result of the request. */ - result(id: string, options: BaseQueueOptions): Promise; + result( + endpointId: string, + options: BaseQueueOptions + ): Promise; /** * @deprecated Use `fal.subscribe` instead. */ subscribe( - id: string, + endpointId: string, options: RunOptions & QueueSubscribeOptions ): Promise; + + streamStatus( + endpointId: string, + options: QueueStatusOptions + ): Promise>; } /** @@ -282,11 +281,11 @@ interface Queue { */ export const queue: Queue = { async submit( - id: string, + endpointId: string, options: SubmitOptions ): Promise { const { webhookUrl, path = '', ...runOptions } = options; - return send(id, { + return send(endpointId, { ...runOptions, subdomain: 'queue', method: 'post', @@ -295,10 +294,10 @@ export const queue: Queue = { }); }, async status( - id: string, + endpointId: string, { requestId, logs = false }: QueueStatusOptions ): Promise { - const appId = parseAppId(id); + const appId = parseAppId(endpointId); const prefix = appId.namespace ? `${appId.namespace}/` : ''; return send(`${prefix}${appId.owner}/${appId.alias}`, { subdomain: 'queue', @@ -309,11 +308,33 @@ export const queue: Queue = { }, }); }, + async streamStatus( + endpointId: string, + { requestId, logs = false }: QueueStatusOptions + ): Promise> { + const appId = parseAppId(endpointId); + const prefix = appId.namespace ? `${appId.namespace}/` : ''; + const token = await getTemporaryAuthToken(endpointId); + const url = buildUrl(`${prefix}${appId.owner}/${appId.alias}`, { + subdomain: 'queue', + path: `/requests/${requestId}/status/stream`, + }); + + const queryParams = new URLSearchParams({ + fal_jwt_token: token, + logs: logs ? '1' : '0', + }); + + return new FalStream(`${url}?${queryParams}`, { + input: {}, + method: 'get', + }); + }, async result( - id: string, + endpointId: string, { requestId }: BaseQueueOptions ): Promise { - const appId = parseAppId(id); + const appId = parseAppId(endpointId); const prefix = appId.namespace ? `${appId.namespace}/` : ''; return send(`${prefix}${appId.owner}/${appId.alias}`, { subdomain: 'queue', diff --git a/libs/client/src/streaming.ts b/libs/client/src/streaming.ts index efe6cb0..6dd7aa9 100644 --- a/libs/client/src/streaming.ts +++ b/libs/client/src/streaming.ts @@ -12,18 +12,23 @@ type StreamOptions = { /** * The API input payload. */ - input: Input; + readonly input?: Input; /** * The maximum time interval in milliseconds between stream chunks. Defaults to 15s. */ - timeout?: number; + readonly timeout?: number; /** * Whether it should auto-upload File-like types to fal's storage * or not. */ - autoUpload?: boolean; + readonly autoUpload?: boolean; + + /** + * The HTTP method, defaults to `post`; + */ + readonly method?: 'get' | 'post' | 'put' | 'delete' | string; }; const EVENT_STREAM_TIMEOUT = 15 * 1000; @@ -35,7 +40,7 @@ type EventHandler = (event: any) => void; /** * The class representing a streaming response. With t */ -class FalStream { +export class FalStream { // properties url: string; options: StreamOptions; @@ -76,14 +81,16 @@ class FalStream { } private start = async () => { + const { url, options } = this; + const { input, method = 'post' } = options; try { - const response = await fetch(this.url, { - method: 'POST', + const response = await fetch(url, { + method: method.toUpperCase(), headers: { accept: 'text/event-stream', 'content-type': 'application/json', }, - body: JSON.stringify(this.options.input), + body: input && method !== 'get' ? JSON.stringify(input) : undefined, }); this.handleResponse(response); } catch (error) {