fal-js/apps/demo-nextjs-app-router/services/tuziApiQueueService.ts

192 lines
4.7 KiB
TypeScript

import PQueue from "p-queue";
import { v4 as uuidv4 } from "uuid"; // Make sure to install this package
// Configure the queue with a concurrency limit
const scriptQueue = new PQueue({ concurrency: 2 });
// Cleanup interval (1 hours in ms)
const CLEANUP_INTERVAL = 1 * 60 * 60 * 1000;
const RESULT_EXPIRY = 3 * 60 * 60 * 1000;
// API authentication settings
const API_AUTH_SETTINGS: [RegExp, string][] = [
[/(\.|^)tu-zi\.com$/, `Bearer ${process.env.TUZI_API_KEY}`],
];
export type ApiParams = {
apiTarget: string;
postBody: string;
headers?: Map<string, string>;
};
export type ApiResult = {
taskId: string;
success: boolean;
response: string;
error?: string;
};
export type ApiQueueState = {
status: "pending" | "running" | "completed" | "failed";
result?: ApiResult;
timestamp: number;
};
// Store execution results by token
const apiExecResults = new Map<string, ApiQueueState>();
/**
* Clean up expired execution results
*/
function cleanupExpiredResults() {
const now = Date.now();
for (const [token, data] of apiExecResults.entries()) {
if (now - data.timestamp > RESULT_EXPIRY) {
apiExecResults.delete(token);
}
}
}
// Set up periodic cleanup
setInterval(cleanupExpiredResults, CLEANUP_INTERVAL);
export function queueApiRequest(params: ApiParams): string {
const token = uuidv4();
// Initialize result as pending with current timestamp
apiExecResults.set(token, {
status: "pending",
timestamp: Date.now(),
});
// Add the API request to the queue
scriptQueue.add(async () => {
try {
// Update status to running
apiExecResults.set(token, {
status: "running",
timestamp: Date.now(),
});
// Prepare headers with authentication
const headers = new Headers();
// Add authentication based on URL pattern
var allowdHost = false;
const targetURL = new URL(params.apiTarget);
// Iterate over the API_AUTH_SETTINGS array
for (const [pattern, authValue] of API_AUTH_SETTINGS) {
if (pattern.test(targetURL.hostname)) {
headers.set("Authorization", authValue);
allowdHost = true;
break; // Stop checking once a match is found
}
}
// If no matching pattern found, throw an error
if (!allowdHost) {
throw new Error(
`No matching authentication pattern for ${targetURL.hostname}`,
);
}
if (params.headers) {
for (const [key, value] of params.headers.entries()) {
headers.set(key, value);
}
}
// Set content type if not already set
if (!headers.has("Content-Type")) {
headers.set("Content-Type", "application/json");
}
// Execute the fetch request
const response = await fetch(params.apiTarget, {
method: "POST",
headers: headers,
body: params.postBody,
});
// Check if response is OK
if (!response.ok) {
throw new Error(`API request failed with status ${response.status}`);
}
// Get the response text
const responseText = await response.text();
// Store successful result
apiExecResults.set(token, {
status: "completed",
result: {
taskId: token,
success: true,
response: responseText,
},
timestamp: Date.now(),
});
} catch (error: any) {
// Handle API execution errors
apiExecResults.set(token, {
status: "failed",
result: {
taskId: token,
success: false,
response: "",
error: error.message || "Failed to execute API request",
},
timestamp: Date.now(),
});
}
});
return token;
}
/**
* Query the status and result of a script execution
* @param token The token returned by executeScript
* @returns An object containing execution status and result (if available)
*/
export function getApiCallState(token: string): ApiQueueState {
const execution = apiExecResults.get(token);
if (!execution) {
return {
status: "failed",
result: {
taskId: token,
success: false,
response: "",
error: "Invalid execution token or result expired",
},
timestamp: Date.now(),
};
}
return {
status: execution.status,
result: execution.result,
timestamp: Date.now(),
};
}
// Get current queue size and pending tasks
export function getQueueStats() {
return {
size: scriptQueue.size,
pending: scriptQueue.pending,
isPaused: scriptQueue.isPaused,
activeExecutions: apiExecResults.size,
};
}
// Pause the queue (no further tasks will be executed until resumed)
export function pauseQueue() {
scriptQueue.pause();
}
// Resume the queue
export function resumeQueue() {
scriptQueue.start();
}