feat(client): request abort signal support (#112)

* feat(client): request abort signal support

* fix: lint errors
This commit is contained in:
Daniel Rochetti 2024-11-25 18:07:21 -08:00 committed by GitHub
parent 8b2f66b63f
commit 1b5b50f8e3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 56 additions and 17 deletions

View File

@ -1,7 +1,7 @@
{ {
"name": "@fal-ai/client", "name": "@fal-ai/client",
"description": "The fal.ai client for JavaScript and TypeScript", "description": "The fal.ai client for JavaScript and TypeScript",
"version": "1.2.0-alpha.5", "version": "1.2.0-alpha.6",
"license": "MIT", "license": "MIT",
"repository": { "repository": {
"type": "git", "type": "git",

View File

@ -106,6 +106,9 @@ export function createFalClient(userConfig: Config = {}): FalClient {
...config, ...config,
responseHandler: resultResponseHandler, responseHandler: resultResponseHandler,
}, },
options: {
signal: options.abortSignal,
},
}); });
}, },
subscribe: async (endpointId, options) => { subscribe: async (endpointId, options) => {

View File

@ -123,6 +123,11 @@ type BaseQueueOptions = {
* The unique identifier for the enqueued request. * The unique identifier for the enqueued request.
*/ */
requestId: string; requestId: string;
/**
* The signal to abort the request.
*/
abortSignal?: AbortSignal;
}; };
export type QueueStatusOptions = BaseQueueOptions & { export type QueueStatusOptions = BaseQueueOptions & {
@ -246,11 +251,14 @@ export const createQueueClient = ({
}, },
input: input as Input, input: input as Input,
config, config,
options: {
signal: options.abortSignal,
},
}); });
}, },
async status( async status(
endpointId: string, endpointId: string,
{ requestId, logs = false }: QueueStatusOptions, { requestId, logs = false, abortSignal }: QueueStatusOptions,
): Promise<QueueStatus> { ): Promise<QueueStatus> {
const appId = parseEndpointId(endpointId); const appId = parseEndpointId(endpointId);
const prefix = appId.namespace ? `${appId.namespace}/` : ""; const prefix = appId.namespace ? `${appId.namespace}/` : "";
@ -262,6 +270,9 @@ export const createQueueClient = ({
path: `/requests/${requestId}/status`, path: `/requests/${requestId}/status`,
}), }),
config, config,
options: {
signal: abortSignal,
},
}); });
}, },
@ -379,6 +390,7 @@ export const createQueueClient = ({
const requestStatus = await ref.status(endpointId, { const requestStatus = await ref.status(endpointId, {
requestId, requestId,
logs: options.logs ?? false, logs: options.logs ?? false,
abortSignal: options.abortSignal,
}); });
if (options.onQueueUpdate) { if (options.onQueueUpdate) {
options.onQueueUpdate(requestStatus); options.onQueueUpdate(requestStatus);
@ -400,7 +412,7 @@ export const createQueueClient = ({
async result<Output>( async result<Output>(
endpointId: string, endpointId: string,
{ requestId }: BaseQueueOptions, { requestId, abortSignal }: BaseQueueOptions,
): Promise<Result<Output>> { ): Promise<Result<Output>> {
const appId = parseEndpointId(endpointId); const appId = parseEndpointId(endpointId);
const prefix = appId.namespace ? `${appId.namespace}/` : ""; const prefix = appId.namespace ? `${appId.namespace}/` : "";
@ -414,12 +426,15 @@ export const createQueueClient = ({
...config, ...config,
responseHandler: resultResponseHandler, responseHandler: resultResponseHandler,
}, },
options: {
signal: abortSignal,
},
}); });
}, },
async cancel( async cancel(
endpointId: string, endpointId: string,
{ requestId }: BaseQueueOptions, { requestId, abortSignal }: BaseQueueOptions,
): Promise<void> { ): Promise<void> {
const appId = parseEndpointId(endpointId); const appId = parseEndpointId(endpointId);
const prefix = appId.namespace ? `${appId.namespace}/` : ""; const prefix = appId.namespace ? `${appId.namespace}/` : "";
@ -430,6 +445,9 @@ export const createQueueClient = ({
path: `/requests/${requestId}/cancel`, path: `/requests/${requestId}/cancel`,
}), }),
config, config,
options: {
signal: abortSignal,
},
}); });
}, },
}; };

View File

@ -105,7 +105,7 @@ async function partUploadRetries(
uploadUrl: string, uploadUrl: string,
chunk: Blob, chunk: Blob,
config: RequiredConfig, config: RequiredConfig,
tries: number = 3, tries = 3,
): Promise<MultipartObject> { ): Promise<MultipartObject> {
if (tries === 0) { if (tries === 0) {
throw new Error("Part upload failed, retries exhausted"); throw new Error("Part upload failed, retries exhausted");
@ -142,21 +142,17 @@ async function multipartUpload(
const responses: MultipartObject[] = []; const responses: MultipartObject[] = [];
try { for (let i = 0; i < chunks; i++) {
for (let i = 0; i < chunks; i++) { const start = i * chunkSize;
const start = i * chunkSize; const end = Math.min(start + chunkSize, file.size);
const end = Math.min(start + chunkSize, file.size);
const chunk = file.slice(start, end); const chunk = file.slice(start, end);
const partNumber = i + 1; const partNumber = i + 1;
// {uploadUrl}/{part_number}?uploadUrlParams=... // {uploadUrl}/{part_number}?uploadUrlParams=...
const partUploadUrl = `${parsedUrl.origin}${parsedUrl.pathname}/${partNumber}${parsedUrl.search}`; const partUploadUrl = `${parsedUrl.origin}${parsedUrl.pathname}/${partNumber}${parsedUrl.search}`;
responses.push(await partUploadRetries(partUploadUrl, chunk, config)); responses.push(await partUploadRetries(partUploadUrl, chunk, config));
}
} catch (error) {
throw error;
} }
// Complete the upload // Complete the upload

View File

@ -63,6 +63,11 @@ export type StreamOptions<Input> = {
* support streaming. * support streaming.
*/ */
readonly connectionMode?: StreamingConnectionMode; readonly connectionMode?: StreamingConnectionMode;
/**
* The signal to abort the request.
*/
readonly signal?: AbortSignal;
}; };
const EVENT_STREAM_TIMEOUT = 15 * 1000; const EVENT_STREAM_TIMEOUT = 15 * 1000;
@ -129,6 +134,14 @@ export class FalStream<Input, Output> {
reject(error); reject(error);
}); });
}); });
// if a abort signal was passed, sync it with the internal one
if (options.signal) {
options.signal.addEventListener("abort", () => {
this.abortController.abort();
});
}
// start the streaming request
this.start().catch(this.handleError); this.start().catch(this.handleError);
} }
@ -345,6 +358,10 @@ export class FalStream<Input, Output> {
/** /**
* Gets the `AbortSignal` instance that can be used to listen for abort events. * Gets the `AbortSignal` instance that can be used to listen for abort events.
*
* **Note:** this signal is internal to the `FalStream` instance. If you pass your
* own abort signal, the `FalStream` will listen to it and abort it appropriately.
*
* @returns the `AbortSignal` instance. * @returns the `AbortSignal` instance.
* @see https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal * @see https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal
*/ */

View File

@ -22,6 +22,11 @@ export type RunOptions<Input> = {
* The HTTP method, defaults to `post`; * The HTTP method, defaults to `post`;
*/ */
readonly method?: "get" | "post" | "put" | "delete" | string; readonly method?: "get" | "post" | "put" | "delete" | string;
/**
* The abort signal to cancel the request.
*/
readonly abortSignal?: AbortSignal;
}; };
export type UrlOptions = { export type UrlOptions = {