import { execa } from "execa";
import PQueue from "p-queue";
import { v4 as uuidv4 } from "uuid"; // Make sure to install this package

// Configure the queue with a concurrency limit
const scriptQueue = new PQueue({ concurrency: 2 });

// How often the expired-result sweep runs (24 hours in ms)
const CLEANUP_INTERVAL = 24 * 60 * 60 * 1000;
// How long a finished result stays queryable (24 hours in ms)
const RESULT_EXPIRY = 24 * 60 * 60 * 1000;

// Hardcoded allow-list of script paths, for security reasons. Any other
// path is rejected before a process is spawned.
const ALLOWED_SCRIPT_PATHS: readonly string[] = [
  "/tmp/api_scripts/hello_test.sh",
  "/tmp/api_scripts/video_add_watermark.sh",
];

/** Input accepted by {@link executeScript}. */
export type ScriptParams = {
  /** Path of the script to run; must be one of the allow-listed paths. */
  scriptPath: string;
  /** Extra arguments passed to the script after its path. */
  args?: string[];
  /**
   * Called once with the script's complete stdout after the script has
   * exited successfully with non-empty output. It is not invoked
   * incrementally while the script is running.
   */
  onProgress?: (data: string) => void;
};

/** Captured outcome of one script run. */
export type ScriptResult = {
  success: boolean;
  stdout: string;
  stderr: string;
  /** Failure description; only set when `success` is false. */
  error?: string;
};

/** Lifecycle state recorded for one execution token. */
export type ScriptQueueState = {
  status: "pending" | "running" | "completed" | "failed";
  /** Present once the execution has completed or failed. */
  result?: ScriptResult;
  /** Epoch milliseconds (from `Date.now()`). */
  timestamp: number;
};

// Store execution results by token
const executionResults = new Map<string, ScriptQueueState>();

/**
 * Clean up expired execution results.
 *
 * Only finished ("completed" / "failed") entries are removed. Entries that
 * are still "pending" or "running" are kept regardless of age, so a token
 * never becomes invalid while its script is still queued or executing.
 */
function cleanupExpiredResults(): void {
  const now = Date.now();
  for (const [token, state] of executionResults) {
    const isFinished =
      state.status === "completed" || state.status === "failed";
    if (isFinished && now - state.timestamp > RESULT_EXPIRY) {
      executionResults.delete(token);
    }
  }
}

// Set up periodic cleanup
setInterval(cleanupExpiredResults, CLEANUP_INTERVAL);

/** Converts a captured output value to text; missing or empty output becomes "". */
function toText(value: unknown): string {
  return value ? String(value) : "";
}

/**
 * Builds the failed ScriptResult for a value caught while running a script.
 * Reads `stdout`, `stderr` and `message` when the thrown value is an object
 * carrying them, and falls back to empty output and a generic message
 * otherwise (including for non-object throws such as strings or null).
 */
function describeFailure(error: unknown): ScriptResult {
  const details = (
    typeof error === "object" && error !== null ? error : {}
  ) as Record<string, unknown>;

  return {
    success: false,
    stdout: toText(details.stdout),
    stderr: toText(details.stderr),
    error: details.message
      ? String(details.message)
      : "Failed to execute script",
  };
}

/**
 * Executes a bash script with queue management and returns a token immediately.
 *
 * The token's state starts as "pending". Once the queue picks the task up,
 * the script path is checked against the allow-list: a disallowed path marks
 * the execution "failed" without spawning anything, otherwise the state
 * becomes "running" and finally "completed" or "failed".
 *
 * @param params Script path, optional arguments and optional output callback.
 * @returns A token to pass to {@link getScriptExecution}.
 */
export function executeScript(params: ScriptParams): string {
  const token = uuidv4();

  // Initialize result as pending with current timestamp
  executionResults.set(token, {
    status: "pending",
    timestamp: Date.now(),
  });

  // Add the script execution to the queue. The task records every outcome in
  // executionResults itself, so the promise returned by add() is
  // intentionally not awaited.
  void scriptQueue.add(async () => {
    try {
      if (!ALLOWED_SCRIPT_PATHS.includes(params.scriptPath)) {
        executionResults.set(token, {
          status: "failed",
          result: {
            success: false,
            stdout: "",
            stderr: "",
            error: "Script path is not allowed",
          },
          timestamp: Date.now(),
        });
        return;
      }

      // Update status to running
      executionResults.set(token, {
        status: "running",
        timestamp: Date.now(),
      });

      // Execute the bash script
      const { stdout, stderr } = await execa(
        "bash",
        [params.scriptPath, ...(params.args ?? [])],
        {
          buffer: true, // Ensure we get the complete output
        },
      );

      // Hand the complete stdout to the caller's callback, if any. An
      // exception thrown by the callback is caught below and marks this
      // execution as failed.
      if (params.onProgress && stdout) {
        params.onProgress(stdout.toString());
      }

      // Store successful result
      executionResults.set(token, {
        status: "completed",
        result: {
          success: true,
          stdout: stdout ? stdout.toString() : "",
          stderr: stderr ? stderr.toString() : "",
        },
        timestamp: Date.now(),
      });
    } catch (error: unknown) {
      // Handle script execution errors
      executionResults.set(token, {
        status: "failed",
        result: describeFailure(error),
        timestamp: Date.now(),
      });
    }
  });

  return token;
}

/**
 * Query the status and result of a script execution.
 *
 * @param token The token returned by executeScript
 * @returns The execution's status and result (if available). An unknown or
 *   expired token yields a "failed" state with an explanatory error. In both
 *   cases `timestamp` is the time of this query, not of the last state change.
 */
export function getScriptExecution(token: string): ScriptQueueState {
  const execution = executionResults.get(token);

  if (!execution) {
    return {
      status: "failed",
      result: {
        success: false,
        stdout: "",
        stderr: "",
        error: "Invalid execution token or result expired",
      },
      timestamp: Date.now(),
    };
  }

  return {
    status: execution.status,
    result: execution.result,
    timestamp: Date.now(),
  };
}

/**
 * Get current queue statistics.
 *
 * @returns The queue's `size`, `pending` and `isPaused` values as reported by
 *   the underlying p-queue instance, plus `activeExecutions`: the number of
 *   tokens currently tracked, which includes finished results that have not
 *   yet been cleaned up.
 */
export function getQueueStats(): {
  size: number;
  pending: number;
  isPaused: boolean;
  activeExecutions: number;
} {
  return {
    size: scriptQueue.size,
    pending: scriptQueue.pending,
    isPaused: scriptQueue.isPaused,
    activeExecutions: executionResults.size,
  };
}

/** Pause the queue (no further tasks will be executed until resumed). */
export function pauseQueue(): void {
  scriptQueue.pause();
}

/** Resume the queue. */
export function resumeQueue(): void {
  scriptQueue.start();
}