import {
  Injectable,
  InternalServerErrorException,
  NotFoundException,
  RequestTimeoutException,
  ServiceUnavailableException,
} from "@nestjs/common";
import type { Request } from "express";
import { FunctionsService } from "./functions.service.js";
import { randomUUID } from "node:crypto";
import { promises as fs } from "node:fs";
import path from "node:path";
import { spawn } from "node:child_process";
import { performance } from "node:perf_hooks";
import {
  ExecutionQueue,
  ExecutionQueueOverflowError,
} from "./execution.queue.js";
import {
  IsNumber,
  IsObject,
  IsString,
  Max,
  Min,
  validate,
} from "class-validator";

/**
 * Result of running one function invocation through the executor binary.
 */
export interface ExecutionResponse {
  /** Slug of the project the function belongs to. */
  project: string;
  /** Name of the function that was run. */
  function: string;
  /** Everything the executor process wrote to stdout. */
  stdout: string;
  /** Everything the executor process wrote to stderr (or the spawn error message). */
  stderr: string;
  /** Process exit code; `null` when the process was killed or could not be spawned. */
  exitCode: number | null;
  /** Wall-clock duration of the run in milliseconds. */
  durationMs: number;
  /** `true` when the process was killed because it exceeded the timeout. */
  timedOut: boolean;
  /** Parsed and validated HTTP response taken from stdout, or `null` if stdout did not contain one. */
  responsePayload?: ExecutorHttpResponse | null;
}

/**
 * HTTP response shape a function may print to stdout as JSON.
 * Instances are checked with class-validator before being returned.
 */
export class ExecutorHttpResponse {
  @IsNumber({})
  @Min(100)
  @Max(599)
  statusCode!: number;

  // NOTE(review): only "is an object" is validated at runtime; the string
  // value type is an assumption — confirm against the consumer of this payload.
  @IsObject()
  headers!: Record<string, string>;

  @IsString()
  body!: string;
}

/**
 * Resolves an incoming request to a stored function, runs its code through the
 * external executor binary inside a per-invocation scratch directory, and
 * records usage for the owning project.
 */
@Injectable()
export class FunctionExecutionService {
  /** Root directory under which per-invocation scratch directories are created. */
  private baseRuntimeDir = "/tmp/sigma";

  /** Path of the executor binary; overridable through `EXECUTOR_BIN_PATH`. */
  private executorBinary =
    process.env.EXECUTOR_BIN_PATH ??
    path.resolve(
      process.cwd(),
      "..",
      "executor",
      "target",
      "release",
      "executor"
    );

  /** Wall-clock limit in milliseconds after which the child process is sent SIGKILL. */
  private timeoutMs = 1000;
  private maxConcurrency = 10;
  private maxQueueSize = 32;
  private queue = new ExecutionQueue(this.maxConcurrency, this.maxQueueSize);

  constructor(private functionsService: FunctionsService) {}

  /**
   * Runs the function that matches the request's path and method within the
   * given project.
   *
   * @param projectSlug Slug identifying the project.
   * @param request Incoming HTTP request; its path, method, headers, query,
   *   params and body are forwarded to the function as the event payload.
   * @returns The execution result merged with the project slug and function name.
   * @throws NotFoundException when the project or a matching function does not exist.
   * @throws ServiceUnavailableException when the execution queue is full.
   * @throws RequestTimeoutException when the executor was killed by the timeout
   *   (usage and the invocation are still recorded before this is thrown).
   */
  async execute(
    projectSlug: string,
    request: Request
  ): Promise<ExecutionResponse> {
    const project = await this.functionsService.findProjectBySlug(projectSlug);
    if (!project) {
      throw new NotFoundException(`Project ${projectSlug} not found`);
    }

    const functionPath = this.extractFunctionPath(
      request.path ?? request.url,
      projectSlug
    );
    const func = await this.functionsService.findRunnableFunction(
      project._id,
      functionPath,
      request.method
    );
    if (!func) {
      throw new NotFoundException(
        `No function for ${request.method.toUpperCase()} ${functionPath}`
      );
    }

    await this.functionsService.ensureCpuQuota(project);

    const eventPayload = this.buildEventPayload(request, functionPath);
    const executionResult = await this.runExecutor(
      projectSlug,
      func.code,
      eventPayload
    );

    await this.functionsService.recordCpuUsage(
      project,
      executionResult.durationMs
    );
    await this.functionsService.recordInvocation(
      func._id.toString(),
      executionResult.exitCode === 0,
      executionResult.stderr
        .split("\n")
        .filter((line) => line.trim().length > 0)
    );

    // Thrown only after bookkeeping so timed-out runs are still accounted for.
    if (executionResult.timedOut) {
      throw new RequestTimeoutException("Function execution timed out");
    }

    return {
      project: project.slug,
      function: func.name,
      ...executionResult,
    };
  }

  /**
   * Derives the function-relative path from a request path by dropping
   * everything up to and including `/exec/<slug>`.
   *
   * The result always starts with `/` and has no trailing slash unless it is
   * exactly `/`. When the `/exec/<slug>` prefix is absent the whole pathname
   * is used.
   */
  private extractFunctionPath(
    pathname: string | undefined,
    slug: string
  ): string {
    if (!pathname) {
      return "/";
    }

    const needle = `/exec/${slug}`;
    const index = pathname.indexOf(needle);
    let relative =
      index === -1 ? pathname : pathname.slice(index + needle.length);

    if (!relative) {
      return "/";
    }
    if (!relative.startsWith("/")) {
      relative = `/${relative}`;
    }
    if (relative.length > 1 && relative.endsWith("/")) {
      relative = relative.slice(0, -1);
    }
    return relative || "/";
  }

  /**
   * Builds the event object that is serialized to `event.json` for the function.
   */
  private buildEventPayload(request: Request, functionPath: string) {
    const body = this.serializeBody(request.body);
    const pathValue = request.path ?? request.url ?? functionPath;
    return {
      method: request.method,
      path: pathValue,
      originalUrl: request.originalUrl,
      functionPath,
      headers: request.headers,
      query: request.query,
      params: request.params,
      body,
    };
  }

  /**
   * Converts a raw `Buffer` body to a UTF-8 string; any other body is passed
   * through unchanged.
   */
  private serializeBody(body: unknown): unknown {
    if (Buffer.isBuffer(body)) {
      return body.toString("utf8");
    }
    return body;
  }

  /**
   * Schedules the execution on the queue and maps a queue overflow to
   * `ServiceUnavailableException`. Any other error is rethrown unchanged.
   */
  private async runExecutor(slug: string, code: string, eventPayload: unknown) {
    try {
      return await this.queue.run(() =>
        this.executeWithinRuntime(slug, code, eventPayload)
      );
    } catch (error) {
      if (error instanceof ExecutionQueueOverflowError) {
        throw new ServiceUnavailableException(
          "Execution queue is full, please retry later"
        );
      }
      throw error;
    }
  }

  /**
   * Writes the function code and event payload into a fresh scratch directory,
   * runs the executor there, and removes the directory afterwards.
   *
   * A failure to spawn the executor is reported as a result with
   * `exitCode: null` and the error message in `stderr` rather than thrown.
   * Failures while preparing the scratch directory are thrown.
   */
  private async executeWithinRuntime(
    slug: string,
    code: string,
    eventPayload: unknown
  ) {
    await fs.mkdir(this.baseRuntimeDir, { recursive: true });
    const tempDir = path.join(this.baseRuntimeDir, slug, randomUUID());
    await fs.mkdir(tempDir, { recursive: true });

    try {
      const scriptPath = path.join(tempDir, "function.js");
      const eventPath = path.join(tempDir, "event.json");
      await fs.writeFile(scriptPath, `${code}\n`, "utf8");
      await fs.writeFile(
        eventPath,
        // Map undefined to null so those keys are kept in the JSON output.
        JSON.stringify(eventPayload, (_, value) => value ?? null, 2),
        "utf8"
      );

      const start = performance.now();
      try {
        const result = await this.spawnExecutor(slug, scriptPath, tempDir);
        const durationMs = performance.now() - start;
        return {
          ...result,
          durationMs,
          responsePayload: await this.extractResponsePayload(result.stdout),
        };
      } catch (e) {
        return {
          stdout: "",
          stderr: e instanceof Error ? e.message : String(e),
          exitCode: null,
          durationMs: performance.now() - start,
          timedOut: false,
        };
      }
    } finally {
      // Remove the scratch directory on every path (success, spawn failure,
      // write failure) so invocations do not accumulate on disk. Cleanup is
      // best-effort: a failed removal must not replace the execution result.
      await fs
        .rm(tempDir, { recursive: true, force: true })
        .catch(() => undefined);
    }
  }

  /**
   * Spawns the executor binary with `--enable-fs <scriptPath>` in `cwd` and
   * collects its output.
   *
   * The child is sent SIGKILL once `timeoutMs` elapses; in that case the
   * promise still resolves (on `close`) with `timedOut: true`.
   *
   * @param slug Project slug; currently unused by this method.
   * @param scriptPath Path of the script file handed to the executor.
   * @param cwd Working directory for the child process.
   * @returns Captured stdout/stderr, the exit code and the timeout flag.
   *   Rejects with `InternalServerErrorException` when the child emits `error`.
   */
  private spawnExecutor(slug: string, scriptPath: string, cwd: string) {
    return new Promise<{
      stdout: string;
      stderr: string;
      exitCode: number | null;
      timedOut: boolean;
    }>((resolve, reject) => {
      const child = spawn(this.executorBinary, ["--enable-fs", scriptPath], {
        cwd,
      });

      // Decode at the stream level so a multi-byte UTF-8 character that is
      // split across two chunks is not corrupted.
      child.stdout?.setEncoding("utf8");
      child.stderr?.setEncoding("utf8");

      let stdout = "";
      let stderr = "";
      child.stdout?.on("data", (chunk: string) => {
        stdout += chunk;
      });
      child.stderr?.on("data", (chunk: string) => {
        stderr += chunk;
      });

      let timedOut = false;
      const timer = setTimeout(() => {
        timedOut = true;
        child.kill("SIGKILL");
      }, this.timeoutMs);

      child.on("error", (err) => {
        clearTimeout(timer);
        reject(new InternalServerErrorException(err.message));
      });
      child.on("close", (code) => {
        clearTimeout(timer);
        resolve({
          stdout,
          stderr,
          exitCode: code,
          timedOut,
        });
      });
    });
  }

  /**
   * Tries to interpret the executor's stdout as a JSON-encoded HTTP response.
   *
   * @returns The validated response, or `null` when stdout is not valid JSON,
   *   is not an object, or fails validation.
   */
  private async extractResponsePayload(
    stdout: string
  ): Promise<ExecutorHttpResponse | null> {
    try {
      const parsed: unknown = JSON.parse(stdout.trim());
      if (typeof parsed !== "object" || parsed === null) {
        return null;
      }

      // Field types are untrusted here; class-validator checks them below.
      const { statusCode, headers, body } = parsed as Record<string, unknown>;
      const response = Object.assign(new ExecutorHttpResponse(), {
        statusCode,
        headers,
        body,
      });

      const errors = await validate(response);
      return errors.length === 0 ? response : null;
    } catch {
      return null;
    }
  }
}