fix(client): queue path support (#20)

This commit is contained in:
Daniel Rochetti 2023-10-27 10:25:38 -07:00 committed by GitHub
parent 897fb49914
commit 4797a8d4c4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 49 additions and 14 deletions

View File

@ -1,7 +1,7 @@
{ {
"name": "@fal-ai/serverless-client", "name": "@fal-ai/serverless-client",
"description": "The fal serverless JS/TS client", "description": "The fal serverless JS/TS client",
"version": "0.3.2", "version": "0.4.0",
"license": "MIT", "license": "MIT",
"repository": { "repository": {
"type": "git", "type": "git",

View File

@ -9,7 +9,7 @@ import { isUUIDv4, isValidUrl } from './utils';
*/ */
type RunOptions<Input> = { type RunOptions<Input> = {
/** /**
* The path to the function, if any. Defaults to `/`. * The path to the function, if any. Defaults to ``.
*/ */
readonly path?: string; readonly path?: string;
@ -126,19 +126,24 @@ export async function subscribe<Input, Output>(
if (options.onEnqueue) { if (options.onEnqueue) {
options.onEnqueue(requestId); options.onEnqueue(requestId);
} }
const path = options.path ?? '';
return new Promise<Output>((resolve, reject) => { return new Promise<Output>((resolve, reject) => {
let timeoutId: ReturnType<typeof setTimeout>; let timeoutId: ReturnType<typeof setTimeout>;
const pollInterval = options.pollInterval ?? 1000; const pollInterval = options.pollInterval ?? 1000;
const poll = async () => { const poll = async () => {
try { try {
const requestStatus = await queue.status(id, requestId, options.logs ?? false); const requestStatus = await queue.status(id, {
requestId,
logs: options.logs ?? false,
path,
});
if (options.onQueueUpdate) { if (options.onQueueUpdate) {
options.onQueueUpdate(requestStatus); options.onQueueUpdate(requestStatus);
} }
if (requestStatus.status === 'COMPLETED') { if (requestStatus.status === 'COMPLETED') {
clearTimeout(timeoutId); clearTimeout(timeoutId);
try { try {
const result = await queue.result<Output>(id, requestId); const result = await queue.result<Output>(id, { requestId, path });
resolve(result); resolve(result);
} catch (error) { } catch (error) {
reject(error); reject(error);
@ -179,6 +184,26 @@ type QueueSubscribeOptions = {
/** /**
* If `true`, the response will include the logs for the request. * If `true`, the response will include the logs for the request.
* Defaults to `false`.
*/
logs?: boolean;
};
type BaseQueueOptions = {
/**
* The unique identifier for the enqueued request.
*/
requestId: string;
/**
* The path to the function, if any. Defaults to ``.
*/
path?: string;
};
type QueueStatusOptions = BaseQueueOptions & {
/**
* If `true`, the response will include the logs for the request.
* Defaults to `false`.
*/ */
logs?: boolean; logs?: boolean;
}; };
@ -201,20 +226,19 @@ interface Queue {
* Retrieves the status of a specific request in the queue. * Retrieves the status of a specific request in the queue.
* *
* @param id - The ID or URL of the function web endpoint. * @param id - The ID or URL of the function web endpoint.
* @param requestId - The unique identifier for the enqueued request. * @param options - Options to configure how the request is run.
* @param logs - If `true`, the response will include the logs for the request.
* @returns A promise that resolves to the status of the request. * @returns A promise that resolves to the status of the request.
*/ */
status(id: string, requestId: string, logs: boolean): Promise<QueueStatus>; status(id: string, options: QueueStatusOptions): Promise<QueueStatus>;
/** /**
* Retrieves the result of a specific request from the queue. * Retrieves the result of a specific request from the queue.
* *
* @param id - The ID or URL of the function web endpoint. * @param id - The ID or URL of the function web endpoint.
* @param requestId - The unique identifier for the enqueued request. * @param options - Options to configure how the request is run.
* @returns A promise that resolves to the result of the request. * @returns A promise that resolves to the result of the request.
*/ */
result<Output>(id: string, requestId: string): Promise<Output>; result<Output>(id: string, options: BaseQueueOptions): Promise<Output>;
/** /**
* @deprecated Use `fal.subscribe` instead. * @deprecated Use `fal.subscribe` instead.
@ -235,21 +259,32 @@ export const queue: Queue = {
id: string, id: string,
options: RunOptions<Input> options: RunOptions<Input>
): Promise<EnqueueResult> { ): Promise<EnqueueResult> {
return run(id, { ...options, method: 'post', path: '/fal/queue/submit/' }); const path = options.path ?? '';
return run(id, {
...options,
method: 'post',
path: '/fal/queue/submit' + path,
});
}, },
async status(id: string, requestId: string, logs = false): Promise<QueueStatus> { async status(
id: string,
{ requestId, logs = false, path = '' }: QueueStatusOptions
): Promise<QueueStatus> {
return run(id, { return run(id, {
method: 'get', method: 'get',
path: `/fal/queue/requests/${requestId}/status`, path: `/fal/queue/requests/${requestId}/status${path}`,
input: { input: {
logs: logs ? '1' : '0', logs: logs ? '1' : '0',
}, },
}); });
}, },
async result<Output>(id: string, requestId: string): Promise<Output> { async result<Output>(
id: string,
{ requestId, path = '' }: BaseQueueOptions
): Promise<Output> {
return run(id, { return run(id, {
method: 'get', method: 'get',
path: `/fal/queue/requests/${requestId}/response`, path: `/fal/queue/requests/${requestId}/response${path}`,
}); });
}, },
subscribe, subscribe,