import { getTemporaryAuthToken } from './auth';
import { dispatchRequest } from './request';
import { storageImpl } from './storage';
import { FalStream } from './streaming';
import { EnqueueResult, QueueStatus, RequestLog } from './types';
import { ensureAppIdFormat, isUUIDv4, isValidUrl, parseAppId } from './utils';

/**
 * The function input and other configuration when running
 * the function, such as the HTTP method to use.
 */
type RunOptions<Input> = {
  /**
   * The path to the function, if any. Defaults to an empty string.
   * @deprecated Pass the path as part of the app id itself, e.g. `fal-ai/sdxl/image-to-image`
   */
  readonly path?: string;

  /**
   * The function input. It will be submitted either as query params
   * or the body payload, depending on the `method`.
   */
  readonly input?: Input;

  /**
   * The HTTP method, defaults to `post`.
   */
  readonly method?: 'get' | 'post' | 'put' | 'delete' | string;

  /**
   * If `true`, the function will automatically upload any files
   * (i.e. instances of `Blob`).
   *
   * This is enabled by default. You can disable it by setting it to `false`.
   */
  readonly autoUpload?: boolean;
};

type ExtraOptions = {
  /**
   * The subdomain to prepend to `fal.run` when building the URL
   * (the queue methods in this module pass `queue`). When omitted or empty,
   * the bare `fal.run` host is used.
   */
  readonly subdomain?: string;

  /**
   * The query parameters to include in the URL.
   */
  readonly query?: Record<string, string>;
};

/**
 * Builds the final url to run the function based on its `id` or alias and
 * the options from `RunOptions`.
 *
 * Resolution order:
 * 1. if `id` is itself a valid URL, it is used as the base as-is;
 * 2. if `id` has the legacy `<owner>/<uuid-v4>` shape, the legacy gateway
 *    host is used;
 * 3. otherwise the id is normalized with `ensureAppIdFormat` and resolved
 *    against `fal.run` (optionally prefixed with `options.subdomain`).
 *
 * When the method is `get`, the input is serialized into the query string
 * together with `options.query`.
 *
 * @private
 * @param id the function id or alias
 * @param options the run options
 * @returns the final url to run the function
 */
export function buildUrl<Input>(
  id: string,
  options: RunOptions<Input> & ExtraOptions = {}
): string {
  const method = (options.method ?? 'post').toLowerCase();
  // Drop a single leading slash, then collapse every run of repeated slashes
  // (global flag: a path may contain more than one such run).
  const path = (options.path ?? '').replace(/^\//, '').replace(/\/{2,}/g, '/');
  const input = options.input;
  const params = {
    ...(options.query || {}),
    ...(method === 'get' ? input : {}),
  };

  const queryParams =
    Object.keys(params).length > 0
      ? `?${new URLSearchParams(params).toString()}`
      : '';
  const parts = id.split('/');

  // if a fal url is passed, just use it
  if (isValidUrl(id)) {
    const url = id.endsWith('/') ? id : `${id}/`;
    return `${url}${path}${queryParams}`;
  }

  // TODO remove this after some time, fal.run should be preferred
  if (parts.length === 2 && isUUIDv4(parts[1])) {
    const host = 'gateway.shark.fal.ai';
    return `https://${host}/trigger/${id}/${path}${queryParams}`;
  }

  const appId = ensureAppIdFormat(id);
  const subdomain = options.subdomain ? `${options.subdomain}.` : '';
  const url = `https://${subdomain}fal.run/${appId}/${path}`;
  // Strip the trailing slash left behind when `path` is empty.
  return `${url.replace(/\/$/, '')}${queryParams}`;
}

/**
 * Sends a request to the function identified by `id`.
 *
 * Unless `autoUpload` is explicitly `false`, a truthy input is first passed
 * through `storageImpl.transformInput`. The request is then dispatched with
 * the configured HTTP method (`post` by default) to the URL produced by
 * `buildUrl`.
 *
 * @param id the function id, alias or URL
 * @param options the run options plus URL-building extras
 * @returns the response produced by `dispatchRequest`
 */
export async function send<Input, Output>(
  id: string,
  options: RunOptions<Input> & ExtraOptions = {}
): Promise<Output> {
  const input =
    options.input && options.autoUpload !== false
      ? await storageImpl.transformInput(options.input)
      : options.input;
  return dispatchRequest(
    options.method ?? 'post',
    buildUrl(id, options),
    input as Input
  );
}

/**
 * Runs a fal serverless function identified by its `id`.
 *
 * @param id the registered function revision id or alias.
 * @param options the run options, including the function input.
 * @returns the remote function output
 */
export async function run<Input, Output>(
  id: string,
  options: RunOptions<Input> = {}
): Promise<Output> {
  return send(id, options);
}

/** Default delay, in milliseconds, between two status polls in `subscribe`. */
const DEFAULT_POLL_INTERVAL = 500;

/**
 * Derives the app id used by the queue status/result endpoints from an
 * endpoint id: `[namespace/]owner/alias`, as reported by `parseAppId`.
 */
function toQueueAppId(endpointId: string): string {
  const appId = parseAppId(endpointId);
  const prefix = appId.namespace ? `${appId.namespace}/` : '';
  return `${prefix}${appId.owner}/${appId.alias}`;
}

/**
 * Subscribes to updates for a specific request in the queue.
 *
 * The request is submitted to the queue first. Then, depending on
 * `options.mode`, its status is either streamed (`streaming`) or polled every
 * `pollInterval` milliseconds (default). Once the request is completed its
 * result is fetched and returned.
 *
 * @param id - The ID or URL of the function web endpoint.
 * @param options - Options to configure how the request is run and how updates are received.
 * @returns A promise that resolves to the result of the request once it's completed.
 */
export async function subscribe<Input, Output>(
  id: string,
  options: RunOptions<Input> & QueueSubscribeOptions = {}
): Promise<Output> {
  const { request_id: requestId } = await queue.submit(id, options);
  if (options.onEnqueue) {
    options.onEnqueue(requestId);
  }

  if (options.mode === 'streaming') {
    const status = await queue.streamStatus(id, {
      requestId,
      logs: options.logs,
    });
    const logs: RequestLog[] = [];
    status.on('message', (data: QueueStatus) => {
      if (options.onQueueUpdate) {
        // accumulate logs to match previous polling behavior
        if (
          'logs' in data &&
          Array.isArray(data.logs) &&
          data.logs.length > 0
        ) {
          logs.push(...data.logs);
        }
        options.onQueueUpdate('logs' in data ? { ...data, logs } : data);
      }
    });
    await status.done();
    return queue.result<Output>(id, { requestId });
  }

  // default to polling until status streaming is stable and faster
  return new Promise<Output>((resolve, reject) => {
    // Stays `undefined` until the first poll schedules a follow-up.
    let timeoutId: ReturnType<typeof setTimeout> | undefined;
    const pollInterval = options.pollInterval ?? DEFAULT_POLL_INTERVAL;

    const poll = async () => {
      try {
        const requestStatus = await queue.status(id, {
          requestId,
          logs: options.logs ?? false,
        });
        if (options.onQueueUpdate) {
          options.onQueueUpdate(requestStatus);
        }
        if (requestStatus.status === 'COMPLETED') {
          clearTimeout(timeoutId);
          try {
            const result = await queue.result<Output>(id, { requestId });
            resolve(result);
          } catch (error) {
            reject(error);
          }
          return;
        }
        // Not completed yet: schedule the next poll.
        timeoutId = setTimeout(poll, pollInterval);
      } catch (error) {
        clearTimeout(timeoutId);
        reject(error);
      }
    };

    poll().catch(reject);
  });
}

/**
 * Options for subscribing to the request queue.
 */
type QueueSubscribeOptions = {
  /**
   * The mode to use for subscribing to updates. It defaults to `polling`.
   * You can also use client-side streaming by setting it to `streaming`.
   *
   * **Note:** Streaming is currently experimental and once stable, it will
   * be the default mode.
   *
   * @see pollInterval
   */
  mode?: 'polling' | 'streaming';

  /**
   * Callback function that is called when a request is enqueued.
   * @param requestId - The unique identifier for the enqueued request.
   */
  onEnqueue?: (requestId: string) => void;

  /**
   * Callback function that is called when the status of the queue changes.
   * @param status - The current status of the queue.
   */
  onQueueUpdate?: (status: QueueStatus) => void;

  /**
   * If `true`, the response will include the logs for the request.
   * Defaults to `false`.
   */
  logs?: boolean;

  /**
   * The URL to send a webhook notification to when the request is completed.
   * @see WebHookResponse
   */
  webhookUrl?: string;
} & (
  | {
      mode?: 'polling';
      /**
       * The interval (in milliseconds) at which to poll for updates.
       * If not provided, a default value of `500` will be used.
       *
       * This value is ignored if `mode` is set to `streaming`.
       */
      pollInterval?: number;
    }
  | {
      mode: 'streaming';
    }
);

/**
 * Options for submitting a request to the queue.
 */
type SubmitOptions<Input> = RunOptions<Input> & {
  /**
   * The URL to send a webhook notification to when the request is completed.
   * @see WebHookResponse
   */
  webhookUrl?: string;
};

type BaseQueueOptions = {
  /**
   * The unique identifier for the enqueued request.
   */
  requestId: string;
};

type QueueStatusOptions = BaseQueueOptions & {
  /**
   * If `true`, the response will include the logs for the request.
   * Defaults to `false`.
   */
  logs?: boolean;
};

/**
 * Represents a request queue with methods for submitting requests,
 * checking their status, retrieving results, and subscribing to updates.
 */
interface Queue {
  /**
   * Submits a request to the queue.
   *
   * @param endpointId - The ID or URL of the function web endpoint.
   * @param options - Options to configure how the request is run.
   * @returns A promise that resolves to the result of enqueuing the request.
   */
  submit<Input>(
    endpointId: string,
    options: SubmitOptions<Input>
  ): Promise<EnqueueResult>;

  /**
   * Retrieves the status of a specific request in the queue.
   *
   * @param endpointId - The ID or URL of the function web endpoint.
   * @param options - Options identifying the request and whether to include logs.
   * @returns A promise that resolves to the status of the request.
   */
  status(endpointId: string, options: QueueStatusOptions): Promise<QueueStatus>;

  /**
   * Retrieves the result of a specific request from the queue.
   *
   * @param endpointId - The ID or URL of the function web endpoint.
   * @param options - Options identifying the request.
   * @returns A promise that resolves to the result of the request.
   */
  result<Output>(
    endpointId: string,
    options: BaseQueueOptions
  ): Promise<Output>;

  /**
   * @deprecated Use `fal.subscribe` instead.
   */
  subscribe<Input, Output>(
    endpointId: string,
    options: RunOptions<Input> & QueueSubscribeOptions
  ): Promise<Output>;

  /**
   * Opens a stream of status updates for a specific request in the queue.
   *
   * NOTE(review): the `FalStream` type arguments were reconstructed from how
   * the stream is consumed in `subscribe` (messages are `QueueStatus`);
   * confirm against the `FalStream` declaration in `./streaming`.
   *
   * @param endpointId - The ID or URL of the function web endpoint.
   * @param options - Options identifying the request and whether to include logs.
   * @returns A promise that resolves to the status stream.
   */
  streamStatus(
    endpointId: string,
    options: QueueStatusOptions
  ): Promise<FalStream<unknown, QueueStatus>>;
}

/**
 * The fal run queue module. It allows to submit a function to the queue and get its result
 * on a separate call. This is useful for long running functions that can be executed
 * asynchronously.
 */
export const queue: Queue = {
  async submit<Input>(
    endpointId: string,
    options: SubmitOptions<Input>
  ): Promise<EnqueueResult> {
    const { webhookUrl, path = '', ...runOptions } = options;
    return send<Input, EnqueueResult>(endpointId, {
      ...runOptions,
      subdomain: 'queue',
      method: 'post',
      path,
      // The webhook URL travels as the `fal_webhook` query parameter.
      query: webhookUrl ? { fal_webhook: webhookUrl } : undefined,
    });
  },

  async status(
    endpointId: string,
    { requestId, logs = false }: QueueStatusOptions
  ): Promise<QueueStatus> {
    return send<unknown, QueueStatus>(toQueueAppId(endpointId), {
      subdomain: 'queue',
      method: 'get',
      path: `/requests/${requestId}/status`,
      // With `get`, the input is serialized into the query string by `buildUrl`.
      input: {
        logs: logs ? '1' : '0',
      },
    });
  },

  async streamStatus(
    endpointId: string,
    { requestId, logs = false }: QueueStatusOptions
  ): Promise<FalStream<unknown, QueueStatus>> {
    const appId = toQueueAppId(endpointId);
    const token = await getTemporaryAuthToken(endpointId);
    const url = buildUrl(appId, {
      subdomain: 'queue',
      path: `/requests/${requestId}/status/stream`,
    });

    // The temporary token is passed to the stream endpoint as a query parameter.
    const queryParams = new URLSearchParams({
      fal_jwt_token: token,
      logs: logs ? '1' : '0',
    });

    return new FalStream<unknown, QueueStatus>(`${url}?${queryParams}`, {
      input: {},
      method: 'get',
    });
  },

  async result<Output>(
    endpointId: string,
    { requestId }: BaseQueueOptions
  ): Promise<Output> {
    return send<unknown, Output>(toQueueAppId(endpointId), {
      subdomain: 'queue',
      method: 'get',
      path: `/requests/${requestId}`,
    });
  },

  subscribe,
};