import { randomUUID } from "node:crypto"; import type { Db } from "mongodb"; import type { CodeHookSpec, ExecutionAsset, ExecutionDataset, ExecutorType, NodeRuntimeConfig, RunExecutionSummary, TaskExecutionSummary, TaskRecord, TaskStatusCounts, TaskStatus, WorkflowInputBinding, } from "../contracts/execution-context.ts"; type WorkflowRunDocument = { _id: string; workflowDefinitionId: string; workflowVersionId: string; status: "queued" | "running" | "success" | "failed" | "cancelled"; triggeredBy: string; inputBindings?: WorkflowInputBinding[]; assetIds: string[]; datasetIds?: string[]; runtimeSnapshot?: { selectedPreset?: string; nodeBindings?: Record; nodeConfigs?: Record; pluginRefs?: string[]; }; startedAt?: string; finishedAt?: string; durationMs?: number; summary?: RunExecutionSummary; createdAt: string; updatedAt: string; }; type WorkflowVersionDocument = { _id: string; workflowDefinitionId: string; logicGraph: { nodes: Array<{ id: string; type: string }>; edges: Array<{ from: string; to: string }>; }; runtimeGraph: Record; }; type RunTaskDocument = { _id: string; workflowRunId: string; workflowVersionId: string; nodeId: string; nodeType: string; nodeDefinitionId?: string; executorType: ExecutorType; executorConfig?: Record; codeHookSpec?: CodeHookSpec; artifactType?: "json" | "directory" | "video"; artifactTitle?: string; status: TaskStatus; attempt: number; inputBindings?: WorkflowInputBinding[]; assetIds: string[]; datasetIds?: string[]; upstreamNodeIds: string[]; outputArtifactIds: string[]; errorMessage?: string; startedAt?: string; finishedAt?: string; durationMs?: number; logLines?: string[]; stdoutLines?: string[]; stderrLines?: string[]; summary?: TaskExecutionSummary; lastResultPreview?: Record; createdAt: string; updatedAt: string; }; type AssetDocument = { _id: string; displayName?: string; sourcePath?: string; topLevelPaths?: string[]; detectedFormats?: string[]; summary?: Record; }; type DatasetDocument = { _id: string; name: string; storagePath?: string; sourceAssetIds?: string[]; latestVersionId?: string; latestVersionNumber?: number; summary?: Record; }; function nowIso() { return new Date().toISOString(); } function toTaskRecord(task: RunTaskDocument): TaskRecord { return { id: task._id, workflowRunId: task.workflowRunId, workflowVersionId: task.workflowVersionId, nodeId: task.nodeId, nodeType: task.nodeType, nodeDefinitionId: task.nodeDefinitionId, executorType: task.executorType, executorConfig: task.executorConfig, codeHookSpec: task.codeHookSpec, artifactType: task.artifactType, artifactTitle: task.artifactTitle, status: task.status, attempt: task.attempt, inputBindings: task.inputBindings ?? [], assetIds: task.assetIds, datasetIds: task.datasetIds ?? [], upstreamNodeIds: task.upstreamNodeIds, outputArtifactIds: task.outputArtifactIds, errorMessage: task.errorMessage, startedAt: task.startedAt, finishedAt: task.finishedAt, durationMs: task.durationMs, logLines: task.logLines ?? [], stdoutLines: task.stdoutLines ?? [], stderrLines: task.stderrLines ?? [], summary: task.summary, lastResultPreview: task.lastResultPreview, }; } function buildTaskStatusCounts(tasks: TaskRecord[]): TaskStatusCounts { const counts: TaskStatusCounts = { pending: 0, queued: 0, running: 0, success: 0, failed: 0, cancelled: 0, }; for (const task of tasks) { counts[task.status] += 1; } return counts; } function buildRunExecutionSummary(tasks: TaskRecord[]): RunExecutionSummary { const taskCounts = buildTaskStatusCounts(tasks); return { totalTaskCount: tasks.length, completedTaskCount: taskCounts.success + taskCounts.failed + taskCounts.cancelled, artifactCount: tasks.reduce((total, task) => total + (task.outputArtifactIds?.length ?? 0), 0), stdoutLineCount: tasks.reduce((total, task) => total + (task.stdoutLines?.length ?? 0), 0), stderrLineCount: tasks.reduce((total, task) => total + (task.stderrLines?.length ?? 0), 0), failedTaskIds: tasks.filter((task) => task.status === "failed").map((task) => task.id), taskCounts, }; } function minIso(values: Array) { const filtered = values.filter((value): value is string => Boolean(value)); if (filtered.length === 0) { return undefined; } return filtered.reduce((current, value) => (value < current ? value : current)); } function maxIso(values: Array) { const filtered = values.filter((value): value is string => Boolean(value)); if (filtered.length === 0) { return undefined; } return filtered.reduce((current, value) => (value > current ? value : current)); } export class MongoWorkerStore { private readonly db: Db; constructor(db: Db) { this.db = db; } async claimNextQueuedTask(): Promise { for (;;) { const candidate = await this.db .collection("run_tasks") .find({ status: "queued" }) .sort({ createdAt: 1 }) .limit(1) .next(); if (!candidate) { return undefined; } const run = await this.db .collection("workflow_runs") .findOne({ _id: candidate.workflowRunId }); if (run?.status === "cancelled") { const finishedAt = nowIso(); await this.db.collection("run_tasks").updateOne( { _id: candidate._id, status: "queued" }, { $set: { status: "cancelled", finishedAt, updatedAt: finishedAt, }, $push: { logLines: { $each: ["Task cancelled before execution"], }, }, }, ); await this.refreshRunStatus(candidate.workflowRunId); continue; } const startedAt = nowIso(); const task = await this.db.collection("run_tasks").findOneAndUpdate( { _id: candidate._id, status: "queued" }, { $set: { status: "running", startedAt, updatedAt: startedAt, }, $push: { logLines: { $each: ["Task claimed by worker"], }, }, }, { returnDocument: "after", }, ); if (!task) { continue; } await this.db.collection("workflow_runs").updateOne( { _id: task.workflowRunId }, { $set: { status: "running", updatedAt: nowIso() } }, ); return toTaskRecord(task); } } async getRun(runId: string) { return this.db.collection("workflow_runs").findOne({ _id: runId }); } async getRunTask(taskId: string) { const task = await this.db.collection("run_tasks").findOne({ _id: taskId }); return task ? toTaskRecord(task) : null; } async listRunTasks(runId: string) { const tasks = await this.db .collection("run_tasks") .find({ workflowRunId: runId }) .sort({ createdAt: 1 }) .toArray(); return tasks.map(toTaskRecord); } async getWorkflowVersion(workflowVersionId: string) { return this.db .collection("workflow_definition_versions") .findOne({ _id: workflowVersionId }); } async getAssetsByIds(assetIds: string[]): Promise { if (assetIds.length === 0) { return []; } const assets = await this.db .collection("assets") .find({ _id: { $in: assetIds } }) .toArray(); const assetMap = new Map( assets.map((asset) => [ asset._id, { id: asset._id, displayName: asset.displayName ?? asset._id, sourcePath: asset.sourcePath, topLevelPaths: asset.topLevelPaths ?? [], detectedFormats: asset.detectedFormats ?? [], summary: asset.summary ?? {}, } satisfies ExecutionAsset, ]), ); return assetIds .map((assetId) => assetMap.get(assetId)) .filter((asset): asset is ExecutionAsset => Boolean(asset)); } async getDatasetsByIds(datasetIds: string[]): Promise { if (datasetIds.length === 0) { return []; } const datasets = await this.db .collection("datasets") .find({ _id: { $in: datasetIds } }) .toArray(); const datasetMap = new Map( datasets.map((dataset) => [ dataset._id, { id: dataset._id, name: dataset.name, storagePath: dataset.storagePath, sourceAssetIds: dataset.sourceAssetIds ?? [], latestVersionId: dataset.latestVersionId, latestVersionNumber: dataset.latestVersionNumber, summary: dataset.summary ?? {}, } satisfies ExecutionDataset, ]), ); return datasetIds .map((datasetId) => datasetMap.get(datasetId)) .filter((dataset): dataset is ExecutionDataset => Boolean(dataset)); } async createTaskArtifact(task: TaskRecord, payload: Record) { const artifact = { _id: `artifact-${randomUUID()}`, type: task.artifactType ?? "json", title: task.artifactTitle ?? `Task Result: ${task.nodeId}`, producerType: "run_task", producerId: task.id, payload, createdAt: nowIso(), updatedAt: nowIso(), }; await this.db.collection("artifacts").insertOne(artifact); await this.db.collection("run_tasks").updateOne( { _id: task.id }, { $push: { outputArtifactIds: artifact._id }, $set: { updatedAt: nowIso() }, }, ); return artifact; } async markTaskSuccess( taskId: string, input: { assetIds: string[]; finishedAt: string; durationMs: number; summary: TaskExecutionSummary; stdoutLines: string[]; stderrLines: string[]; logLines: string[]; lastResultPreview?: Record; }, ) { await this.db.collection("run_tasks").updateOne( { _id: taskId }, { $set: { assetIds: input.assetIds, status: "success", finishedAt: input.finishedAt, durationMs: input.durationMs, stdoutLines: input.stdoutLines, stderrLines: input.stderrLines, summary: input.summary, lastResultPreview: input.lastResultPreview, updatedAt: input.finishedAt, }, $push: { logLines: { $each: input.logLines, }, }, }, ); } async markTaskFailed( taskId: string, errorMessage: string, input: { assetIds: string[]; finishedAt: string; durationMs: number; summary: TaskExecutionSummary; stdoutLines: string[]; stderrLines: string[]; logLines: string[]; }, ) { await this.db.collection("run_tasks").updateOne( { _id: taskId }, { $set: { assetIds: input.assetIds, status: "failed", errorMessage, finishedAt: input.finishedAt, durationMs: input.durationMs, stdoutLines: input.stdoutLines, stderrLines: input.stderrLines, summary: input.summary, updatedAt: input.finishedAt, }, $push: { logLines: { $each: input.logLines, }, }, }, ); } async queueReadyDependents(runId: string) { const run = await this.getRun(runId); if (run?.status === "cancelled") { return; } const tasks = await this.db .collection("run_tasks") .find({ workflowRunId: runId }) .toArray(); const successfulNodes = new Set( tasks.filter((task) => task.status === "success").map((task) => task.nodeId), ); const readyTaskIds = tasks .filter( (task) => task.status === "pending" && task.upstreamNodeIds.every((nodeId) => successfulNodes.has(nodeId)), ) .map((task) => task._id); if (readyTaskIds.length === 0) { return; } await this.db.collection("run_tasks").updateMany( { _id: { $in: readyTaskIds } }, { $set: { status: "queued", updatedAt: nowIso(), }, }, ); } async refreshRunStatus(runId: string) { const tasks = await this.listRunTasks(runId); if (tasks.length === 0) { return; } const currentRun = await this.getRun(runId); if (!currentRun) { return; } let status: WorkflowRunDocument["status"] = "queued"; if (currentRun.status === "cancelled") { status = "cancelled"; } else if (tasks.every((task) => task.status === "success")) { status = "success"; } else if (tasks.some((task) => task.status === "running")) { status = "running"; } else if (tasks.some((task) => task.status === "queued")) { status = "queued"; } else if (tasks.some((task) => task.status === "failed")) { status = "failed"; } const startedAt = minIso(tasks.map((task) => task.startedAt)); const finishedAt = status === "success" || status === "failed" || status === "cancelled" ? maxIso(tasks.map((task) => task.finishedAt)) : undefined; const summary = buildRunExecutionSummary(tasks); const updateSet: Partial = { status, summary, updatedAt: nowIso(), }; if (startedAt) { updateSet.startedAt = startedAt; } if (finishedAt) { updateSet.finishedAt = finishedAt; updateSet.durationMs = Math.max(Date.parse(finishedAt) - Date.parse(startedAt ?? finishedAt), 0); } await this.db.collection("workflow_runs").updateOne( { _id: runId }, { $set: updateSet }, ); } }