178 lines
4.4 KiB
TypeScript

import { execa } from "execa";
import PQueue from "p-queue";
import { v4 as uuidv4 } from "uuid"; // Make sure to install this package
/** Upper bound on how many queued script tasks may run at the same time. */
const MAX_CONCURRENT_SCRIPTS = 2;

/** Shared queue through which every script execution is scheduled. */
const scriptQueue = new PQueue({ concurrency: MAX_CONCURRENT_SCRIPTS });
/** Delay between two sweeps of the result store: 24 hours, in milliseconds. */
const CLEANUP_INTERVAL = 1000 * 60 * 60 * 24;

/** Age beyond which a stored result is removed by the sweep: 24 hours, in milliseconds. */
const RESULT_EXPIRY = 1000 * 60 * 60 * 24;
/** Input accepted by `executeScript`. */
export type ScriptParams = {
  /** Path of the script handed to `bash`; it is checked against the allow list. */
  scriptPath: string;

  /** Extra command-line arguments placed after the script path. */
  args?: string[];

  /** Optional callback that receives the script's stdout as a string. */
  onProgress?: (data: string) => void;
};
/** Outcome of one script run. */
export type ScriptResult = {
  /** `true` when the script ran to completion, `false` for any failure. */
  success: boolean;

  /** Captured standard output, or an empty string when there is none. */
  stdout: string;

  /** Captured standard error, or an empty string when there is none. */
  stderr: string;

  /** Failure description; only set on unsuccessful results. */
  error?: string;
};
/** Snapshot of where a submitted script currently stands. */
export type ScriptQueueState = {
  /** Lifecycle stage of the execution. */
  status:
    | "pending"
    | "running"
    | "completed"
    | "failed";

  /** Outcome of the run; absent while the execution is pending or running. */
  result?: ScriptResult;

  /** Epoch milliseconds (`Date.now()`) taken when this state object was created. */
  timestamp: number;
};
/** In-memory store mapping each execution token to its latest known state. */
const executionResults: Map<string, ScriptQueueState> = new Map();
/**
 * Drops every stored execution whose timestamp is more than `RESULT_EXPIRY`
 * milliseconds in the past.
 */
function cleanupExpiredResults() {
  // Anything stamped strictly before this instant has outlived its expiry.
  const cutoff = Date.now() - RESULT_EXPIRY;

  executionResults.forEach((state, token) => {
    if (state.timestamp < cutoff) {
      executionResults.delete(token);
    }
  });
}
// Sweep expired results periodically. The timer is unref'ed so that merely
// importing this module does not keep the Node.js process alive once all
// other work has finished.
setInterval(cleanupExpiredResults, CLEANUP_INTERVAL).unref();
/**
 * Scripts that may be executed. The list is hardcoded for security reasons;
 * any other path is rejected without being queued.
 */
const ALLOWED_SCRIPT_PATHS: ReadonlySet<string> = new Set([
  "/tmp/api_scripts/hello_test.sh",
  "/tmp/api_scripts/video_add_watermark.sh",
]);

/** Converts captured process output to text; missing or empty output becomes "". */
function outputToString(output: unknown): string {
  return output ? String(output) : "";
}

/** Reads a property from a thrown value without assuming it is an object. */
function readErrorField(thrown: unknown, key: string): unknown {
  return typeof thrown === "object" && thrown !== null
    ? (thrown as Record<string, unknown>)[key]
    : undefined;
}

/**
 * Schedules a bash script on the shared queue and returns a token immediately.
 *
 * The token can be passed to `getScriptExecution` to poll the state, which
 * moves from "pending" to "running" and then to "completed" or "failed".
 * A script path that is not on the allow list is recorded as "failed" right
 * away and never enters the queue.
 *
 * `onProgress`, when supplied, is invoked once with the full stdout after the
 * script has finished successfully and produced output. An exception thrown
 * by that callback is logged and does not change the recorded result.
 *
 * @param params Script path, optional arguments and optional stdout callback.
 * @returns The token identifying this execution.
 */
export function executeScript(params: ScriptParams): string {
  const token = uuidv4();

  // Snapshot the inputs now: the allow-list check below must apply to exactly
  // the values that are executed later, even if the caller mutates `params`.
  const scriptPath = params.scriptPath;
  const scriptArgs = [...(params.args ?? [])];

  if (!ALLOWED_SCRIPT_PATHS.has(scriptPath)) {
    executionResults.set(token, {
      status: "failed",
      result: {
        success: false,
        stdout: "",
        stderr: "",
        error: "Script path is not allowed",
      },
      timestamp: Date.now(),
    });
    return token;
  }

  // Initialize result as pending with current timestamp
  executionResults.set(token, {
    status: "pending",
    timestamp: Date.now(),
  });

  // The task records every outcome itself and never rejects, so the promise
  // returned by the queue is intentionally not awaited.
  void scriptQueue.add(async () => {
    try {
      executionResults.set(token, {
        status: "running",
        timestamp: Date.now(),
      });

      const { stdout, stderr } = await execa(
        "bash",
        [scriptPath, ...scriptArgs],
        {
          buffer: true, // Ensure we get the complete output
        },
      );
      const stdoutText = outputToString(stdout);

      if (params.onProgress && stdoutText) {
        try {
          params.onProgress(stdoutText);
        } catch (callbackError: unknown) {
          // A faulty callback must not turn a successful run into a failure.
          console.warn("executeScript: onProgress callback threw", callbackError);
        }
      }

      executionResults.set(token, {
        status: "completed",
        result: {
          success: true,
          stdout: stdoutText,
          stderr: outputToString(stderr),
        },
        timestamp: Date.now(),
      });
    } catch (error: unknown) {
      // Keep whatever output the failed run attached to the thrown error.
      const message = readErrorField(error, "message");
      executionResults.set(token, {
        status: "failed",
        result: {
          success: false,
          stdout: outputToString(readErrorField(error, "stdout")),
          stderr: outputToString(readErrorField(error, "stderr")),
          error:
            typeof message === "string" && message
              ? message
              : "Failed to execute script",
        },
        timestamp: Date.now(),
      });
    }
  });

  return token;
}
/**
 * Looks up the state of a script execution.
 *
 * The `timestamp` of the returned object is the time of this lookup, not the
 * time stored with the execution.
 *
 * @param token The token returned by executeScript
 * @returns The stored status and result, or a "failed" state when the token
 *   is not in the store (never issued, or already removed).
 */
export function getScriptExecution(token: string): ScriptQueueState {
  const stored = executionResults.get(token);

  if (stored) {
    const { status, result } = stored;
    return { status, result, timestamp: Date.now() };
  }

  const notFound: ScriptResult = {
    success: false,
    stdout: "",
    stderr: "",
    error: "Invalid execution token or result expired",
  };
  return { status: "failed", result: notFound, timestamp: Date.now() };
}
/**
 * Reports the queue's current counters.
 *
 * `size`, `pending` and `isPaused` are read straight from the p-queue
 * instance. `activeExecutions` is the number of records in the result store,
 * which includes every stored execution regardless of its status.
 */
export function getQueueStats() {
  const { size, pending, isPaused } = scriptQueue;
  const activeExecutions = executionResults.size;

  return { size, pending, isPaused, activeExecutions };
}
/**
 * Puts the script queue on hold: no further tasks are started until the
 * queue is resumed.
 */
export function pauseQueue(): void {
  scriptQueue.pause();
}
/** Restarts the script queue so that waiting tasks are executed again. */
export function resumeQueue(): void {
  scriptQueue.start();
}