import test from "node:test"; import assert from "node:assert/strict"; import { MongoMemoryServer } from "mongodb-memory-server"; import { MongoClient } from "mongodb"; import { MongoWorkerStore } from "../src/runtime/mongo-worker-store.ts"; import { WorkerRuntime } from "../src/runtime/worker-runtime.ts"; import type { ExecutionContext, TaskRecord } from "../src/contracts/execution-context.ts"; async function createRuntimeFixture( database: string, options: { executors?: ConstructorParameters[0]["executors"]; } = {}, ) { const mongod = await MongoMemoryServer.create({ instance: { ip: "127.0.0.1", }, }); const client = new MongoClient(mongod.getUri()); await client.connect(); const db = client.db(database); const store = new MongoWorkerStore(db); const runtime = new WorkerRuntime({ store, executors: options.executors }); return { db, store, runtime, close: async () => { await client.close(); await mongod.stop(); }, }; } test("worker claims a queued task, creates an artifact, and queues the dependent task", async (t) => { const fixture = await createRuntimeFixture("emboflow-worker-progress"); t.after(async () => { await fixture.close(); }); await fixture.db.collection("workflow_definition_versions").insertOne({ _id: "workflow-1-v1", workflowDefinitionId: "workflow-1", workspaceId: "workspace-1", projectId: "project-1", versionNumber: 1, visualGraph: {}, logicGraph: { nodes: [ { id: "source-asset", type: "source" }, { id: "rename-folder", type: "transform" }, ], edges: [{ from: "source-asset", to: "rename-folder" }], }, runtimeGraph: { nodeBindings: { "source-asset": "source-asset", "rename-folder": "rename-folder", }, }, pluginRefs: ["builtin:delivery-nodes"], createdBy: "local-user", createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), }); await fixture.db.collection("workflow_runs").insertOne({ _id: "run-1", workflowDefinitionId: "workflow-1", workflowVersionId: "workflow-1-v1", status: "queued", triggeredBy: "local-user", createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), }); await fixture.db.collection("run_tasks").insertMany([ { _id: "task-source", workflowRunId: "run-1", workflowVersionId: "workflow-1-v1", nodeId: "source-asset", nodeType: "source", executorType: "python", status: "queued", attempt: 1, assetIds: ["asset-1"], upstreamNodeIds: [], outputArtifactIds: [], createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), }, { _id: "task-rename", workflowRunId: "run-1", workflowVersionId: "workflow-1-v1", nodeId: "rename-folder", nodeType: "transform", executorType: "python", status: "pending", attempt: 1, assetIds: ["asset-1"], upstreamNodeIds: ["source-asset"], outputArtifactIds: [], createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), }, ]); const task = await fixture.runtime.runNextTask(); const tasks = await fixture.store.listRunTasks("run-1"); const run = await fixture.store.getRun("run-1"); const artifacts = await fixture.db.collection("artifacts").find({ producerId: "task-source" }).toArray(); assert.equal(task?.id, "task-source"); assert.equal(tasks[0]?.status, "success"); assert.deepEqual(tasks[0]?.assetIds, ["asset-1"]); assert.equal(tasks[1]?.status, "queued"); assert.deepEqual(tasks[1]?.assetIds, ["asset-1"]); assert.equal(artifacts.length, 1); assert.equal(run?.status, "running"); }); test("worker marks the run successful after the final queued task completes", async (t) => { const fixture = await createRuntimeFixture("emboflow-worker-success"); t.after(async () => { await fixture.close(); }); await fixture.db.collection("workflow_definition_versions").insertOne({ _id: "workflow-2-v1", workflowDefinitionId: "workflow-2", workspaceId: "workspace-1", projectId: "project-1", versionNumber: 1, visualGraph: {}, logicGraph: { nodes: [{ id: "export-delivery-package", type: "export" }], edges: [], }, runtimeGraph: { nodeBindings: { "export-delivery-package": "export-delivery-package", }, }, pluginRefs: ["builtin:delivery-nodes"], createdBy: "local-user", createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), }); await fixture.db.collection("workflow_runs").insertOne({ _id: "run-2", workflowDefinitionId: "workflow-2", workflowVersionId: "workflow-2-v1", status: "queued", triggeredBy: "local-user", createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), }); await fixture.db.collection("run_tasks").insertOne({ _id: "task-export", workflowRunId: "run-2", workflowVersionId: "workflow-2-v1", nodeId: "export-delivery-package", nodeType: "export", executorType: "python", status: "queued", attempt: 1, assetIds: ["asset-final"], upstreamNodeIds: [], outputArtifactIds: [], createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), }); await fixture.runtime.runNextTask(); const run = await fixture.store.getRun("run-2"); const task = await fixture.db.collection("run_tasks").findOne({ _id: "task-export" }); assert.equal(task?.status, "success"); assert.equal(run?.status, "success"); assert.ok(typeof task?.startedAt === "string"); assert.ok(typeof task?.finishedAt === "string"); assert.ok(typeof task?.durationMs === "number"); assert.deepEqual(task?.stdoutLines, ["python executor processed export-delivery-package"]); assert.deepEqual(task?.stderrLines, []); assert.equal(task?.summary?.outcome, "success"); assert.equal(task?.summary?.executorType, "python"); assert.equal(task?.summary?.stdoutLineCount, 1); assert.equal(task?.summary?.stderrLineCount, 0); assert.match(task?.logLines?.[0] ?? "", /claimed/i); assert.match(task?.logLines?.[1] ?? "", /stdout:/i); assert.match(task?.logLines?.at(-1) ?? "", /completed/i); assert.deepEqual(task?.lastResultPreview, { taskId: "task-export", executor: "python", }); assert.ok(typeof run?.startedAt === "string"); assert.ok(typeof run?.finishedAt === "string"); assert.ok(typeof run?.durationMs === "number"); assert.equal(run?.summary?.totalTaskCount, 1); assert.equal(run?.summary?.artifactCount, 1); assert.equal(run?.summary?.stdoutLineCount, 1); assert.equal(run?.summary?.stderrLineCount, 0); assert.equal(run?.summary?.taskCounts?.success, 1); }); test("worker passes bound asset ids into the execution context and task artifacts", async (t) => { let capturedTask: TaskRecord | null = null; let capturedContext: ExecutionContext | null = null; const fixture = await createRuntimeFixture("emboflow-worker-asset-context", { executors: { python: { async execute(task: TaskRecord, context: ExecutionContext) { capturedTask = task; capturedContext = context; return { result: { taskId: task.id, assetIds: context.assetIds, }, stdoutLines: ["custom executor started"], stderrLines: [], }; }, }, }, }); t.after(async () => { await fixture.close(); }); await fixture.db.collection("workflow_runs").insertOne({ _id: "run-asset", workflowDefinitionId: "workflow-asset", workflowVersionId: "workflow-asset-v1", status: "queued", triggeredBy: "local-user", assetIds: ["asset-42"], createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), }); await fixture.db.collection("run_tasks").insertOne({ _id: "task-asset", workflowRunId: "run-asset", workflowVersionId: "workflow-asset-v1", nodeId: "source-asset", nodeType: "source", executorType: "python", status: "queued", attempt: 1, assetIds: ["asset-42"], upstreamNodeIds: [], outputArtifactIds: [], createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), }); await fixture.runtime.runNextTask(); const storedTask = await fixture.store.getRunTask("task-asset"); const artifact = await fixture.db.collection("artifacts").findOne({ producerId: "task-asset" }); assert.deepEqual(capturedTask?.assetIds, ["asset-42"]); assert.deepEqual(capturedContext?.assetIds, ["asset-42"]); assert.deepEqual(storedTask?.assetIds, ["asset-42"]); assert.deepEqual( (artifact?.payload as { assetIds?: string[] } | undefined)?.assetIds, ["asset-42"], ); assert.equal(storedTask?.summary?.outcome, "success"); assert.equal(storedTask?.summary?.assetCount, 1); assert.deepEqual(storedTask?.summary?.artifactIds, [artifact?._id]); assert.deepEqual(storedTask?.stdoutLines, ["custom executor started"]); assert.deepEqual(storedTask?.stderrLines, []); }); test("worker persists failure summaries and task log lines when execution throws", async (t) => { const fixture = await createRuntimeFixture("emboflow-worker-failure-summary", { executors: { python: { async execute() { throw Object.assign(new Error("intentional executor failure"), { stdoutLines: ["failure path entered"], stderrLines: ["stack trace line"], }); }, }, }, }); t.after(async () => { await fixture.close(); }); await fixture.db.collection("workflow_runs").insertOne({ _id: "run-failure", workflowDefinitionId: "workflow-failure", workflowVersionId: "workflow-failure-v1", status: "queued", triggeredBy: "local-user", assetIds: ["asset-9"], createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), }); await fixture.db.collection("run_tasks").insertOne({ _id: "task-failure", workflowRunId: "run-failure", workflowVersionId: "workflow-failure-v1", nodeId: "validate-structure", nodeType: "inspect", executorType: "python", status: "queued", attempt: 1, assetIds: ["asset-9"], upstreamNodeIds: [], outputArtifactIds: [], createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), }); await assert.rejects(() => fixture.runtime.runNextTask(), /intentional executor failure/); const task = await fixture.db.collection("run_tasks").findOne({ _id: "task-failure" }); const run = await fixture.store.getRun("run-failure"); assert.equal(task?.status, "failed"); assert.equal(run?.status, "failed"); assert.equal(task?.errorMessage, "intentional executor failure"); assert.equal(task?.summary?.outcome, "failed"); assert.equal(task?.summary?.executorType, "python"); assert.equal(task?.summary?.stdoutLineCount, 1); assert.equal(task?.summary?.stderrLineCount, 1); assert.equal(task?.summary?.artifactIds?.length ?? 0, 0); assert.ok(typeof task?.startedAt === "string"); assert.ok(typeof task?.finishedAt === "string"); assert.ok(typeof task?.durationMs === "number"); assert.deepEqual(task?.stdoutLines, ["failure path entered"]); assert.deepEqual(task?.stderrLines, ["stack trace line"]); assert.match(task?.logLines?.[0] ?? "", /claimed/i); assert.match(task?.logLines?.[1] ?? "", /stdout:/i); assert.match(task?.logLines?.[2] ?? "", /stderr:/i); assert.match(task?.logLines?.at(-1) ?? "", /intentional executor failure/i); assert.equal(run?.summary?.totalTaskCount, 1); assert.equal(run?.summary?.taskCounts?.failed, 1); assert.equal(run?.summary?.stdoutLineCount, 1); assert.equal(run?.summary?.stderrLineCount, 1); assert.deepEqual(run?.summary?.failedTaskIds, ["task-failure"]); });