feat(client): enable both polling and streaming for queue status (#72)

This commit is contained in:
Daniel Rochetti 2024-07-11 08:41:46 -07:00 committed by GitHub
parent 4ea43b4cea
commit ab210d9da4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 76 additions and 21 deletions

View File

@ -52,6 +52,8 @@ export default function Home() {
const result: any = await fal.subscribe(endpointId, { const result: any = await fal.subscribe(endpointId, {
input: JSON.parse(input), input: JSON.parse(input),
logs: true, logs: true,
mode: 'streaming',
// pollInterval: 1000,
onQueueUpdate(update) { onQueueUpdate(update) {
console.log('queue update'); console.log('queue update');
console.log(update); console.log(update);

View File

@ -1,7 +1,7 @@
{ {
"name": "@fal-ai/serverless-client", "name": "@fal-ai/serverless-client",
"description": "The fal serverless JS/TS client", "description": "The fal serverless JS/TS client",
"version": "0.12.0", "version": "0.13.0-alpha.0",
"license": "MIT", "license": "MIT",
"repository": { "repository": {
"type": "git", "type": "git",

View File

@ -125,6 +125,8 @@ export async function run<Input, Output>(
return send(id, options); return send(id, options);
} }
const DEFAULT_POLL_INTERVAL = 500;
/** /**
* Subscribes to updates for a specific request in the queue. * Subscribes to updates for a specific request in the queue.
* *
@ -140,22 +142,59 @@ export async function subscribe<Input, Output>(
if (options.onEnqueue) { if (options.onEnqueue) {
options.onEnqueue(requestId); options.onEnqueue(requestId);
} }
const status = await queue.streamStatus(id, { if (options.mode === 'streaming') {
requestId, const status = await queue.streamStatus(id, {
logs: options.logs, requestId,
}); logs: options.logs,
const logs: RequestLog[] = []; });
status.on('message', (data: QueueStatus) => { const logs: RequestLog[] = [];
if (options.onQueueUpdate) { status.on('message', (data: QueueStatus) => {
// accumulate logs to match previous polling behavior if (options.onQueueUpdate) {
if ('logs' in data && Array.isArray(data.logs) && data.logs.length > 0) { // accumulate logs to match previous polling behavior
logs.push(...data.logs); if (
'logs' in data &&
Array.isArray(data.logs) &&
data.logs.length > 0
) {
logs.push(...data.logs);
}
options.onQueueUpdate('logs' in data ? { ...data, logs } : data);
} }
options.onQueueUpdate('logs' in data ? { ...data, logs } : data); });
} await status.done();
return queue.result<Output>(id, { requestId });
}
// default to polling until status streaming is stable and faster
return new Promise<Output>((resolve, reject) => {
let timeoutId: ReturnType<typeof setTimeout>;
const pollInterval = options.pollInterval ?? DEFAULT_POLL_INTERVAL;
const poll = async () => {
try {
const requestStatus = await queue.status(id, {
requestId,
logs: options.logs ?? false,
});
if (options.onQueueUpdate) {
options.onQueueUpdate(requestStatus);
}
if (requestStatus.status === 'COMPLETED') {
clearTimeout(timeoutId);
try {
const result = await queue.result<Output>(id, { requestId });
resolve(result);
} catch (error) {
reject(error);
}
return;
}
timeoutId = setTimeout(poll, pollInterval);
} catch (error) {
clearTimeout(timeoutId);
reject(error);
}
};
poll().catch(reject);
}); });
await status.done();
return queue.result<Output>(id, { requestId });
} }
/** /**
@ -163,13 +202,15 @@ export async function subscribe<Input, Output>(
*/ */
type QueueSubscribeOptions = { type QueueSubscribeOptions = {
/** /**
* The interval (in milliseconds) at which to poll for updates. * The mode to use for subscribing to updates. It defaults to `polling`.
* If not provided, a default value of `1000` will be used. * You can also use client-side streaming by setting it to `streaming`.
* *
* @deprecated starting from v0.12.0 the queue status is streamed * **Note:** Streaming is currently experimental and once stable, it will
* using the `queue.subscribeToStatus` method. * be the default mode.
*
* @see pollInterval
*/ */
pollInterval?: number; mode?: 'polling' | 'streaming';
/** /**
* Callback function that is called when a request is enqueued. * Callback function that is called when a request is enqueued.
@ -194,7 +235,19 @@ type QueueSubscribeOptions = {
* @see WebHookResponse * @see WebHookResponse
*/ */
webhookUrl?: string; webhookUrl?: string;
}; } & (
| {
mode?: 'polling';
/**
* The interval (in milliseconds) at which to poll for updates.
* If not provided, a default value of `500` will be used.
*/
pollInterval?: number;
}
| {
mode: 'streaming';
}
);
/** /**
* Options for submitting a request to the queue. * Options for submitting a request to the queue.