feat: queue cancel support (#75)

* feat(client): queue request cancel support

* feat(proxy): add mapping to put requests
This commit is contained in:
Daniel Rochetti 2024-07-22 12:59:45 -07:00 committed by GitHub
parent 2839e796db
commit 903af74da1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 60 additions and 5 deletions

View File

@ -1,7 +1,7 @@
{ {
"name": "@fal-ai/serverless-client", "name": "@fal-ai/serverless-client",
"description": "The fal serverless JS/TS client", "description": "The fal serverless JS/TS client",
"version": "0.14.0-alpha.1", "version": "0.14.0-alpha.2",
"license": "MIT", "license": "MIT",
"repository": { "repository": {
"type": "git", "type": "git",

View File

@ -123,6 +123,8 @@ export async function run<Input, Output>(
return send(id, options); return send(id, options);
} }
type TimeoutId = ReturnType<typeof setTimeout>;
const DEFAULT_POLL_INTERVAL = 500; const DEFAULT_POLL_INTERVAL = 500;
/** /**
@ -140,6 +142,16 @@ export async function subscribe<Input, Output>(
if (options.onEnqueue) { if (options.onEnqueue) {
options.onEnqueue(requestId); options.onEnqueue(requestId);
} }
const timeout = options.timeout;
let timeoutId: TimeoutId = undefined;
if (timeout) {
timeoutId = setTimeout(() => {
queue.cancel(id, { requestId }).catch(console.warn);
throw new Error(
`Client timed out waiting for the request to complete after ${timeout}ms`
);
}, timeout);
}
if (options.mode === 'streaming') { if (options.mode === 'streaming') {
const status = await queue.streamStatus(id, { const status = await queue.streamStatus(id, {
requestId, requestId,
@ -160,6 +172,9 @@ export async function subscribe<Input, Output>(
} }
}); });
await status.done(); await status.done();
if (timeoutId) {
clearTimeout(timeoutId);
}
return queue.result<Output>(id, { requestId }); return queue.result<Output>(id, { requestId });
} }
// default to polling until status streaming is stable and faster // default to polling until status streaming is stable and faster
@ -228,6 +243,15 @@ type QueueSubscribeOptions = {
*/ */
logs?: boolean; logs?: boolean;
/**
* The timeout (in milliseconds) for the request. If the request is not
* completed within this time, the subscription will be cancelled.
*
* Note: currently, the timeout is not enforced and the default is `undefined`.
* This behavior might change in the future.
*/
timeout?: number;
/** /**
* The URL to send a webhook notification to when the request is completed. * The URL to send a webhook notification to when the request is completed.
* @see WebHookResponse * @see WebHookResponse
@ -283,7 +307,7 @@ interface Queue {
/** /**
* Submits a request to the queue. * Submits a request to the queue.
* *
* @param endpointId - The ID or URL of the function web endpoint. * @param endpointId - The ID of the function web endpoint.
* @param options - Options to configure how the request is run. * @param options - Options to configure how the request is run.
* @returns A promise that resolves to the result of enqueuing the request. * @returns A promise that resolves to the result of enqueuing the request.
*/ */
@ -295,7 +319,7 @@ interface Queue {
/** /**
* Retrieves the status of a specific request in the queue. * Retrieves the status of a specific request in the queue.
* *
* @param endpointId - The ID or URL of the function web endpoint. * @param endpointId - The ID of the function web endpoint.
* @param options - Options to configure how the request is run. * @param options - Options to configure how the request is run.
* @returns A promise that resolves to the status of the request. * @returns A promise that resolves to the status of the request.
*/ */
@ -304,7 +328,7 @@ interface Queue {
/** /**
* Retrieves the result of a specific request from the queue. * Retrieves the result of a specific request from the queue.
* *
* @param endpointId - The ID or URL of the function web endpoint. * @param endpointId - The ID of the function web endpoint.
* @param options - Options to configure how the request is run. * @param options - Options to configure how the request is run.
* @returns A promise that resolves to the result of the request. * @returns A promise that resolves to the result of the request.
*/ */
@ -321,10 +345,27 @@ interface Queue {
options: RunOptions<Input> & QueueSubscribeOptions options: RunOptions<Input> & QueueSubscribeOptions
): Promise<Output>; ): Promise<Output>;
/**
* Subscribes to updates for a specific request in the queue.
*
* @param endpointId - The ID of the function web endpoint.
* @param options - Options to configure how the request is run and how updates are received.
*/
streamStatus( streamStatus(
endpointId: string, endpointId: string,
options: QueueStatusOptions options: QueueStatusOptions
): Promise<FalStream<unknown, QueueStatus>>; ): Promise<FalStream<unknown, QueueStatus>>;
/**
* Cancels a request in the queue.
*
* @param endpointId - The ID of the function web endpoint.
* @param options - Options to configure how the request
* is run and how updates are received.
* @returns A promise that resolves once the request is cancelled.
* @throws {Error} If the request cannot be cancelled.
*/
cancel(endpointId: string, options: BaseQueueOptions): Promise<void>;
} }
/** /**
@ -395,5 +436,17 @@ export const queue: Queue = {
path: `/requests/${requestId}`, path: `/requests/${requestId}`,
}); });
}, },
async cancel(
endpointId: string,
{ requestId }: BaseQueueOptions
): Promise<void> {
const appId = parseAppId(endpointId);
const prefix = appId.namespace ? `${appId.namespace}/` : '';
await send(`${prefix}${appId.owner}/${appId.alias}`, {
subdomain: 'queue',
method: 'put',
path: `/requests/${requestId}/cancel`,
});
},
subscribe, subscribe,
}; };

View File

@ -1,6 +1,6 @@
{ {
"name": "@fal-ai/serverless-proxy", "name": "@fal-ai/serverless-proxy",
"version": "0.7.4", "version": "0.7.5",
"license": "MIT", "license": "MIT",
"repository": { "repository": {
"type": "git", "type": "git",

View File

@ -55,4 +55,5 @@ export const route = {
handler: routeHandler, handler: routeHandler,
GET: routeHandler, GET: routeHandler,
POST: routeHandler, POST: routeHandler,
PUT: routeHandler,
}; };

View File

@ -44,5 +44,6 @@ export const createRequestHandler = ({
requestHandler: handler, requestHandler: handler,
GET: handler, GET: handler,
POST: handler, POST: handler,
PUT: handler,
}; };
}; };