311 lines
7.3 KiB
TypeScript
311 lines
7.3 KiB
TypeScript
import {
|
|
Injectable,
|
|
InternalServerErrorException,
|
|
NotFoundException,
|
|
RequestTimeoutException,
|
|
ServiceUnavailableException,
|
|
} from "@nestjs/common";
|
|
import type { Request } from "express";
|
|
import { FunctionsService } from "./functions.service.js";
|
|
import { randomUUID } from "node:crypto";
|
|
import { promises as fs } from "node:fs";
|
|
import path from "node:path";
|
|
import { spawn } from "node:child_process";
|
|
import { performance } from "node:perf_hooks";
|
|
import {
|
|
ExecutionQueue,
|
|
ExecutionQueueOverflowError,
|
|
} from "./execution.queue.js";
|
|
import {
|
|
IsNumber,
|
|
IsObject,
|
|
IsString,
|
|
Max,
|
|
Min,
|
|
validate,
|
|
} from "class-validator";
|
|
|
|
export interface ExecutionResponse {
|
|
project: string;
|
|
function: string;
|
|
stdout: string;
|
|
stderr: string;
|
|
exitCode: number | null;
|
|
durationMs: number;
|
|
timedOut: boolean;
|
|
responsePayload?: ExecutorHttpResponse | null;
|
|
}
|
|
|
|
export class ExecutorHttpResponse {
|
|
@IsNumber({})
|
|
@Min(100)
|
|
@Max(599)
|
|
statusCode!: number;
|
|
|
|
@IsObject()
|
|
headers!: Record<string, string>;
|
|
|
|
@IsString()
|
|
body!: string;
|
|
}
|
|
|
|
@Injectable()
|
|
export class FunctionExecutionService {
|
|
private baseRuntimeDir = "/tmp/sigma";
|
|
private executorBinary =
|
|
process.env.EXECUTOR_BIN_PATH ??
|
|
path.resolve(
|
|
process.cwd(),
|
|
"..",
|
|
"executor",
|
|
"target",
|
|
"release",
|
|
"executor"
|
|
);
|
|
|
|
private timeoutMs = 1000;
|
|
private maxConcurrency = 10;
|
|
private maxQueueSize = 32;
|
|
|
|
private queue = new ExecutionQueue(this.maxConcurrency, this.maxQueueSize);
|
|
|
|
constructor(private functionsService: FunctionsService) {}
|
|
|
|
async execute(
|
|
projectSlug: string,
|
|
request: Request
|
|
): Promise<ExecutionResponse> {
|
|
const project = await this.functionsService.findProjectBySlug(projectSlug);
|
|
if (!project) {
|
|
throw new NotFoundException(`Project ${projectSlug} not found`);
|
|
}
|
|
|
|
const functionPath = this.extractFunctionPath(
|
|
request.path ?? request.url,
|
|
projectSlug
|
|
);
|
|
|
|
const func = await this.functionsService.findRunnableFunction(
|
|
project._id,
|
|
functionPath,
|
|
request.method
|
|
);
|
|
|
|
if (!func) {
|
|
throw new NotFoundException(
|
|
`No function for ${request.method.toUpperCase()} ${functionPath}`
|
|
);
|
|
}
|
|
|
|
await this.functionsService.ensureCpuQuota(project);
|
|
|
|
const eventPayload = this.buildEventPayload(request, functionPath);
|
|
const executionResult = await this.runExecutor(
|
|
projectSlug,
|
|
func.code,
|
|
eventPayload
|
|
);
|
|
|
|
await this.functionsService.recordCpuUsage(
|
|
project,
|
|
executionResult.durationMs
|
|
);
|
|
|
|
await this.functionsService.recordInvocation(
|
|
func._id.toString(),
|
|
executionResult.exitCode === 0,
|
|
executionResult.stderr
|
|
.split("\n")
|
|
.filter((line) => line.trim().length > 0)
|
|
);
|
|
|
|
if (executionResult.timedOut) {
|
|
throw new RequestTimeoutException("Function execution timed out");
|
|
}
|
|
|
|
return {
|
|
project: project.slug,
|
|
function: func.name,
|
|
...executionResult,
|
|
};
|
|
}
|
|
|
|
private extractFunctionPath(
|
|
pathname: string | undefined,
|
|
slug: string
|
|
): string {
|
|
if (!pathname) {
|
|
return "/";
|
|
}
|
|
|
|
const needle = `/exec/${slug}`;
|
|
const index = pathname.indexOf(needle);
|
|
let relative =
|
|
index === -1 ? pathname : pathname.slice(index + needle.length);
|
|
if (!relative || relative.length === 0) {
|
|
return "/";
|
|
}
|
|
|
|
if (!relative.startsWith("/")) {
|
|
relative = `/${relative}`;
|
|
}
|
|
|
|
if (relative.length > 1 && relative.endsWith("/")) {
|
|
relative = relative.slice(0, -1);
|
|
}
|
|
|
|
return relative || "/";
|
|
}
|
|
|
|
private buildEventPayload(request: Request, functionPath: string) {
|
|
const body = this.serializeBody(request.body);
|
|
const pathValue = request.path ?? request.url ?? functionPath;
|
|
|
|
return {
|
|
method: request.method,
|
|
path: pathValue,
|
|
originalUrl: request.originalUrl,
|
|
functionPath,
|
|
headers: request.headers,
|
|
query: request.query,
|
|
params: request.params,
|
|
body,
|
|
};
|
|
}
|
|
|
|
private serializeBody(body: unknown): unknown {
|
|
if (Buffer.isBuffer(body)) {
|
|
return body.toString("utf8");
|
|
}
|
|
|
|
return body;
|
|
}
|
|
|
|
private async runExecutor(slug: string, code: string, eventPayload: unknown) {
|
|
try {
|
|
return await this.queue.run(() =>
|
|
this.executeWithinRuntime(slug, code, eventPayload)
|
|
);
|
|
} catch (error) {
|
|
if (error instanceof ExecutionQueueOverflowError) {
|
|
throw new ServiceUnavailableException(
|
|
"Execution queue is full, please retry later"
|
|
);
|
|
}
|
|
throw error;
|
|
}
|
|
}
|
|
|
|
private async executeWithinRuntime(
|
|
slug: string,
|
|
code: string,
|
|
eventPayload: unknown
|
|
) {
|
|
await fs.mkdir(this.baseRuntimeDir, { recursive: true });
|
|
const tempDir = path.join(this.baseRuntimeDir, slug, randomUUID());
|
|
await fs.mkdir(tempDir, { recursive: true });
|
|
|
|
const scriptPath = path.join(tempDir, "function.js");
|
|
const eventPath = path.join(tempDir, "event.json");
|
|
|
|
await fs.writeFile(scriptPath, `${code}\n`, "utf8");
|
|
await fs.writeFile(
|
|
eventPath,
|
|
JSON.stringify(eventPayload, (_, value) => value ?? null, 2),
|
|
"utf8"
|
|
);
|
|
|
|
const start = performance.now();
|
|
try {
|
|
const result = await this.spawnExecutor(slug, scriptPath, tempDir);
|
|
const durationMs = performance.now() - start;
|
|
|
|
return {
|
|
...result,
|
|
durationMs,
|
|
responsePayload: await this.extractResponsePayload(result.stdout),
|
|
};
|
|
} catch (e) {
|
|
await fs.rm(tempDir, { recursive: true, force: true });
|
|
|
|
return {
|
|
stdout: "",
|
|
stderr: (e as Error).message,
|
|
exitCode: null,
|
|
durationMs: performance.now() - start,
|
|
timedOut: false,
|
|
};
|
|
}
|
|
}
|
|
|
|
private spawnExecutor(slug: string, scriptPath: string, cwd: string) {
|
|
return new Promise<{
|
|
stdout: string;
|
|
stderr: string;
|
|
exitCode: number | null;
|
|
timedOut: boolean;
|
|
}>((resolve, reject) => {
|
|
const child = spawn(
|
|
this.executorBinary,
|
|
["--enable-fs", "--enable-sql", `scope_${slug}`, scriptPath],
|
|
{
|
|
cwd,
|
|
}
|
|
);
|
|
|
|
let stdout = "";
|
|
let stderr = "";
|
|
child.stdout?.on("data", (chunk) => {
|
|
stdout += chunk.toString();
|
|
});
|
|
child.stderr?.on("data", (chunk) => {
|
|
stderr += chunk.toString();
|
|
});
|
|
|
|
let timedOut = false;
|
|
const timer = setTimeout(() => {
|
|
timedOut = true;
|
|
child.kill("SIGKILL");
|
|
}, this.timeoutMs);
|
|
|
|
child.on("error", (err) => {
|
|
clearTimeout(timer);
|
|
reject(new InternalServerErrorException(err.message));
|
|
});
|
|
|
|
child.on("close", (code) => {
|
|
clearTimeout(timer);
|
|
resolve({
|
|
stdout,
|
|
stderr,
|
|
exitCode: code,
|
|
timedOut,
|
|
});
|
|
});
|
|
});
|
|
}
|
|
|
|
private async extractResponsePayload(
|
|
stdout: string
|
|
): Promise<ExecutorHttpResponse | null> {
|
|
try {
|
|
const parsed = JSON.parse(stdout.trim());
|
|
|
|
const response = new ExecutorHttpResponse();
|
|
response.statusCode = parsed.statusCode;
|
|
response.headers = parsed.headers;
|
|
response.body = parsed.body;
|
|
|
|
const errors = await validate(response);
|
|
|
|
if (errors.length === 0) {
|
|
return response;
|
|
}
|
|
|
|
return null;
|
|
} catch (error) {
|
|
return null;
|
|
}
|
|
}
|
|
}
|