diff --git a/apps/demo-nextjs-app-router/app/queue/page.tsx b/apps/demo-nextjs-app-router/app/queue/page.tsx
index b8edc23..e663a3b 100644
--- a/apps/demo-nextjs-app-router/app/queue/page.tsx
+++ b/apps/demo-nextjs-app-router/app/queue/page.tsx
@@ -52,6 +52,8 @@ export default function Home() {
const result: any = await fal.subscribe(endpointId, {
input: JSON.parse(input),
logs: true,
+ mode: 'streaming',
+ // pollInterval: 1000,
onQueueUpdate(update) {
console.log('queue update');
console.log(update);
diff --git a/libs/client/package.json b/libs/client/package.json
index 01ac312..77e0dbd 100644
--- a/libs/client/package.json
+++ b/libs/client/package.json
@@ -1,7 +1,7 @@
{
"name": "@fal-ai/serverless-client",
"description": "The fal serverless JS/TS client",
- "version": "0.12.0",
+ "version": "0.13.0-alpha.0",
"license": "MIT",
"repository": {
"type": "git",
diff --git a/libs/client/src/function.ts b/libs/client/src/function.ts
index c311307..019ad44 100644
--- a/libs/client/src/function.ts
+++ b/libs/client/src/function.ts
@@ -125,6 +125,8 @@ export async function run(
return send(id, options);
}
+const DEFAULT_POLL_INTERVAL = 500;
+
/**
* Subscribes to updates for a specific request in the queue.
*
@@ -140,22 +142,59 @@ export async function subscribe(
if (options.onEnqueue) {
options.onEnqueue(requestId);
}
- const status = await queue.streamStatus(id, {
- requestId,
- logs: options.logs,
- });
- const logs: RequestLog[] = [];
- status.on('message', (data: QueueStatus) => {
- if (options.onQueueUpdate) {
- // accumulate logs to match previous polling behavior
- if ('logs' in data && Array.isArray(data.logs) && data.logs.length > 0) {
- logs.push(...data.logs);
+ if (options.mode === 'streaming') {
+ const status = await queue.streamStatus(id, {
+ requestId,
+ logs: options.logs,
+ });
+ const logs: RequestLog[] = [];
+ status.on('message', (data: QueueStatus) => {
+ if (options.onQueueUpdate) {
+ // accumulate logs to match previous polling behavior
+ if (
+ 'logs' in data &&
+ Array.isArray(data.logs) &&
+ data.logs.length > 0
+ ) {
+ logs.push(...data.logs);
+ }
+ options.onQueueUpdate('logs' in data ? { ...data, logs } : data);
}
- options.onQueueUpdate('logs' in data ? { ...data, logs } : data);
- }
+ });
+ await status.done();
+ return queue.result