import test from "node:test"; import assert from "node:assert/strict"; import { spawnSync } from "node:child_process"; import { mkdtemp, mkdir, writeFile } from "node:fs/promises"; import os from "node:os"; import path from "node:path"; import { MongoMemoryServer } from "mongodb-memory-server"; import { MongoClient } from "mongodb"; import { MongoWorkerStore } from "../src/runtime/mongo-worker-store.ts"; import { WorkerRuntime } from "../src/runtime/worker-runtime.ts"; import type { ExecutionContext, TaskRecord } from "../src/contracts/execution-context.ts"; function hasDockerRuntime() { const result = spawnSync("docker", ["info"], { stdio: "ignore", }); return result.status === 0; } function ensureDockerImage(image: string) { const result = spawnSync("docker", ["image", "inspect", image], { stdio: "ignore", }); if (result.status === 0) { return; } const pull = spawnSync("docker", ["pull", image], { stdio: "ignore", }); if (pull.status !== 0) { throw new Error(`failed to pull docker image: ${image}`); } } async function createRuntimeFixture( database: string, options: { executors?: ConstructorParameters[0]["executors"]; } = {}, ) { const mongod = await MongoMemoryServer.create({ instance: { ip: "127.0.0.1", }, }); const client = new MongoClient(mongod.getUri()); await client.connect(); const db = client.db(database); const store = new MongoWorkerStore(db); const runtime = new WorkerRuntime({ store, executors: options.executors }); return { db, store, runtime, close: async () => { await client.close(); await mongod.stop(); }, }; } test("worker claims a queued task, creates an artifact, and queues the dependent task", async (t) => { const fixture = await createRuntimeFixture("emboflow-worker-progress"); t.after(async () => { await fixture.close(); }); await fixture.db.collection("workflow_definition_versions").insertOne({ _id: "workflow-1-v1", workflowDefinitionId: "workflow-1", workspaceId: "workspace-1", projectId: "project-1", versionNumber: 1, visualGraph: {}, logicGraph: { nodes: [ { id: "source-asset", type: "source" }, { id: "rename-folder", type: "transform" }, ], edges: [{ from: "source-asset", to: "rename-folder" }], }, runtimeGraph: { nodeBindings: { "source-asset": "source-asset", "rename-folder": "rename-folder", }, }, pluginRefs: ["builtin:delivery-nodes"], createdBy: "local-user", createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), }); await fixture.db.collection("workflow_runs").insertOne({ _id: "run-1", workflowDefinitionId: "workflow-1", workflowVersionId: "workflow-1-v1", status: "queued", triggeredBy: "local-user", createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), }); await fixture.db.collection("run_tasks").insertMany([ { _id: "task-source", workflowRunId: "run-1", workflowVersionId: "workflow-1-v1", nodeId: "source-asset", nodeType: "source", executorType: "python", status: "queued", attempt: 1, assetIds: ["asset-1"], upstreamNodeIds: [], outputArtifactIds: [], createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), }, { _id: "task-rename", workflowRunId: "run-1", workflowVersionId: "workflow-1-v1", nodeId: "rename-folder", nodeType: "transform", executorType: "python", status: "pending", attempt: 1, assetIds: ["asset-1"], upstreamNodeIds: ["source-asset"], outputArtifactIds: [], createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), }, ]); const task = await fixture.runtime.runNextTask(); const tasks = await fixture.store.listRunTasks("run-1"); const run = await fixture.store.getRun("run-1"); const artifacts = await fixture.db.collection("artifacts").find({ producerId: "task-source" }).toArray(); assert.equal(task?.id, "task-source"); assert.equal(tasks[0]?.status, "success"); assert.deepEqual(tasks[0]?.assetIds, ["asset-1"]); assert.equal(tasks[1]?.status, "queued"); assert.deepEqual(tasks[1]?.assetIds, ["asset-1"]); assert.equal(artifacts.length, 1); assert.equal(run?.status, "queued"); }); test("worker marks the run successful after the final queued task completes", async (t) => { const fixture = await createRuntimeFixture("emboflow-worker-success"); t.after(async () => { await fixture.close(); }); await fixture.db.collection("workflow_definition_versions").insertOne({ _id: "workflow-2-v1", workflowDefinitionId: "workflow-2", workspaceId: "workspace-1", projectId: "project-1", versionNumber: 1, visualGraph: {}, logicGraph: { nodes: [{ id: "export-delivery-package", type: "export" }], edges: [], }, runtimeGraph: { nodeBindings: { "export-delivery-package": "export-delivery-package", }, }, pluginRefs: ["builtin:delivery-nodes"], createdBy: "local-user", createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), }); await fixture.db.collection("workflow_runs").insertOne({ _id: "run-2", workflowDefinitionId: "workflow-2", workflowVersionId: "workflow-2-v1", status: "queued", triggeredBy: "local-user", createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), }); await fixture.db.collection("run_tasks").insertOne({ _id: "task-export", workflowRunId: "run-2", workflowVersionId: "workflow-2-v1", nodeId: "export-delivery-package", nodeType: "export", executorType: "python", status: "queued", attempt: 1, assetIds: ["asset-final"], upstreamNodeIds: [], outputArtifactIds: [], createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), }); await fixture.runtime.runNextTask(); const run = await fixture.store.getRun("run-2"); const task = await fixture.db.collection("run_tasks").findOne({ _id: "task-export" }); assert.equal(task?.status, "success"); assert.equal(run?.status, "success"); assert.ok(typeof task?.startedAt === "string"); assert.ok(typeof task?.finishedAt === "string"); assert.ok(typeof task?.durationMs === "number"); assert.deepEqual(task?.stdoutLines, ["python executor processed export-delivery-package"]); assert.deepEqual(task?.stderrLines, []); assert.equal(task?.summary?.outcome, "success"); assert.equal(task?.summary?.executorType, "python"); assert.equal(task?.summary?.stdoutLineCount, 1); assert.equal(task?.summary?.stderrLineCount, 0); assert.match(task?.logLines?.[0] ?? "", /claimed/i); assert.match(task?.logLines?.[1] ?? "", /stdout:/i); assert.match(task?.logLines?.at(-1) ?? "", /completed/i); assert.deepEqual(task?.lastResultPreview, { taskId: "task-export", executor: "python", }); assert.ok(typeof run?.startedAt === "string"); assert.ok(typeof run?.finishedAt === "string"); assert.ok(typeof run?.durationMs === "number"); assert.equal(run?.summary?.totalTaskCount, 1); assert.equal(run?.summary?.artifactCount, 1); assert.equal(run?.summary?.stdoutLineCount, 1); assert.equal(run?.summary?.stderrLineCount, 0); assert.equal(run?.summary?.taskCounts?.success, 1); }); test("worker passes bound asset ids into the execution context and task artifacts", async (t) => { let capturedTask: TaskRecord | null = null; let capturedContext: ExecutionContext | null = null; const fixture = await createRuntimeFixture("emboflow-worker-asset-context", { executors: { python: { async execute(task: TaskRecord, context: ExecutionContext) { capturedTask = task; capturedContext = context; return { result: { taskId: task.id, assetIds: context.assetIds, }, stdoutLines: ["custom executor started"], stderrLines: [], }; }, }, }, }); t.after(async () => { await fixture.close(); }); await fixture.db.collection("workflow_runs").insertOne({ _id: "run-asset", workflowDefinitionId: "workflow-asset", workflowVersionId: "workflow-asset-v1", status: "queued", triggeredBy: "local-user", assetIds: ["asset-42"], createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), }); await fixture.db.collection("run_tasks").insertOne({ _id: "task-asset", workflowRunId: "run-asset", workflowVersionId: "workflow-asset-v1", nodeId: "source-asset", nodeType: "source", executorType: "python", status: "queued", attempt: 1, assetIds: ["asset-42"], upstreamNodeIds: [], outputArtifactIds: [], createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), }); await fixture.runtime.runNextTask(); const storedTask = await fixture.store.getRunTask("task-asset"); const artifact = await fixture.db.collection("artifacts").findOne({ producerId: "task-asset" }); assert.deepEqual(capturedTask?.assetIds, ["asset-42"]); assert.deepEqual(capturedContext?.assetIds, ["asset-42"]); assert.deepEqual(storedTask?.assetIds, ["asset-42"]); assert.deepEqual( (artifact?.payload as { assetIds?: string[] } | undefined)?.assetIds, ["asset-42"], ); assert.equal(storedTask?.summary?.outcome, "success"); assert.equal(storedTask?.summary?.assetCount, 1); assert.deepEqual(storedTask?.summary?.artifactIds, [artifact?._id]); assert.deepEqual(storedTask?.stdoutLines, ["custom executor started"]); assert.deepEqual(storedTask?.stderrLines, []); }); test("worker persists failure summaries and task log lines when execution throws", async (t) => { const fixture = await createRuntimeFixture("emboflow-worker-failure-summary", { executors: { python: { async execute() { throw Object.assign(new Error("intentional executor failure"), { stdoutLines: ["failure path entered"], stderrLines: ["stack trace line"], }); }, }, }, }); t.after(async () => { await fixture.close(); }); await fixture.db.collection("workflow_runs").insertOne({ _id: "run-failure", workflowDefinitionId: "workflow-failure", workflowVersionId: "workflow-failure-v1", status: "queued", triggeredBy: "local-user", assetIds: ["asset-9"], createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), }); await fixture.db.collection("run_tasks").insertOne({ _id: "task-failure", workflowRunId: "run-failure", workflowVersionId: "workflow-failure-v1", nodeId: "validate-structure", nodeType: "inspect", executorType: "python", status: "queued", attempt: 1, assetIds: ["asset-9"], upstreamNodeIds: [], outputArtifactIds: [], createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), }); await assert.rejects(() => fixture.runtime.runNextTask(), /intentional executor failure/); const task = await fixture.db.collection("run_tasks").findOne({ _id: "task-failure" }); const run = await fixture.store.getRun("run-failure"); assert.equal(task?.status, "failed"); assert.equal(run?.status, "failed"); assert.equal(task?.errorMessage, "intentional executor failure"); assert.equal(task?.summary?.outcome, "failed"); assert.equal(task?.summary?.executorType, "python"); assert.equal(task?.summary?.stdoutLineCount, 1); assert.equal(task?.summary?.stderrLineCount, 1); assert.equal(task?.summary?.artifactIds?.length ?? 0, 0); assert.ok(typeof task?.startedAt === "string"); assert.ok(typeof task?.finishedAt === "string"); assert.ok(typeof task?.durationMs === "number"); assert.deepEqual(task?.stdoutLines, ["failure path entered"]); assert.deepEqual(task?.stderrLines, ["stack trace line"]); assert.match(task?.logLines?.[0] ?? "", /claimed/i); assert.match(task?.logLines?.[1] ?? "", /stdout:/i); assert.match(task?.logLines?.[2] ?? "", /stderr:/i); assert.match(task?.logLines?.at(-1) ?? "", /intentional executor failure/i); assert.equal(run?.summary?.totalTaskCount, 1); assert.equal(run?.summary?.taskCounts?.failed, 1); assert.equal(run?.summary?.stdoutLineCount, 1); assert.equal(run?.summary?.stderrLineCount, 1); assert.deepEqual(run?.summary?.failedTaskIds, ["task-failure"]); }); test("worker skips queued tasks that belong to a cancelled run", async (t) => { const fixture = await createRuntimeFixture("emboflow-worker-cancelled-run-skip"); t.after(async () => { await fixture.close(); }); await fixture.db.collection("workflow_runs").insertMany([ { _id: "run-cancelled", workflowDefinitionId: "workflow-cancelled", workflowVersionId: "workflow-cancelled-v1", status: "cancelled", triggeredBy: "local-user", assetIds: ["asset-cancelled"], createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), }, { _id: "run-active", workflowDefinitionId: "workflow-active", workflowVersionId: "workflow-active-v1", status: "queued", triggeredBy: "local-user", assetIds: ["asset-active"], createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), }, ]); await fixture.db.collection("run_tasks").insertMany([ { _id: "task-cancelled", workflowRunId: "run-cancelled", workflowVersionId: "workflow-cancelled-v1", nodeId: "source-asset", nodeType: "source", executorType: "python", status: "queued", attempt: 1, assetIds: ["asset-cancelled"], upstreamNodeIds: [], outputArtifactIds: [], createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), }, { _id: "task-active", workflowRunId: "run-active", workflowVersionId: "workflow-active-v1", nodeId: "source-asset", nodeType: "source", executorType: "python", status: "queued", attempt: 1, assetIds: ["asset-active"], upstreamNodeIds: [], outputArtifactIds: [], createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), }, ]); const claimedTask = await fixture.runtime.runNextTask(); const cancelledTask = await fixture.store.getRunTask("task-cancelled"); const activeTask = await fixture.store.getRunTask("task-active"); assert.equal(claimedTask?.id, "task-active"); assert.equal(cancelledTask?.status, "cancelled"); assert.match(cancelledTask?.logLines?.at(-1) ?? "", /cancelled/i); assert.equal(activeTask?.status, "success"); }); test("worker executes a python code hook snapshot from the queued task", async (t) => { const fixture = await createRuntimeFixture("emboflow-worker-python-hook"); t.after(async () => { await fixture.close(); }); await fixture.db.collection("workflow_runs").insertOne({ _id: "run-python-hook", workflowDefinitionId: "workflow-python-hook", workflowVersionId: "workflow-python-hook-v1", status: "queued", triggeredBy: "local-user", assetIds: ["asset-hook"], createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), }); await fixture.db.collection("run_tasks").insertOne({ _id: "task-python-hook", workflowRunId: "run-python-hook", workflowVersionId: "workflow-python-hook-v1", nodeId: "validate-structure", nodeType: "inspect", executorType: "python", status: "queued", attempt: 1, assetIds: ["asset-hook"], upstreamNodeIds: [], outputArtifactIds: [], codeHookSpec: { language: "python", entrypoint: "process", source: [ "def process(task, context):", " print(f\"hook running for {task['nodeId']}\")", " return {'nodeId': task['nodeId'], 'assetIds': context['assetIds'], 'hooked': True}", ].join("\n"), }, createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), }); await fixture.runtime.runNextTask(); const task = await fixture.store.getRunTask("task-python-hook"); const artifact = await fixture.db.collection("artifacts").findOne({ producerId: "task-python-hook" }); assert.equal(task?.status, "success"); assert.deepEqual(task?.stdoutLines, ["hook running for validate-structure"]); assert.deepEqual(task?.stderrLines, []); assert.equal(task?.summary?.executorType, "python"); assert.deepEqual((artifact?.payload as { result?: { hooked?: boolean } } | undefined)?.result, { nodeId: "validate-structure", assetIds: ["asset-hook"], hooked: true, }); }); test("worker executes a queued docker task inside a real container", { skip: !hasDockerRuntime(), }, async (t) => { ensureDockerImage("alpine:3.20"); const fixture = await createRuntimeFixture("emboflow-worker-real-docker"); t.after(async () => { await fixture.close(); }); await fixture.db.collection("workflow_runs").insertOne({ _id: "run-real-docker", workflowDefinitionId: "workflow-real-docker", workflowVersionId: "workflow-real-docker-v1", status: "queued", triggeredBy: "local-user", assetIds: ["asset-docker"], createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), }); await fixture.db.collection("run_tasks").insertOne({ _id: "task-real-docker", workflowRunId: "run-real-docker", workflowVersionId: "workflow-real-docker-v1", nodeId: "source-asset", nodeDefinitionId: "source-asset", nodeType: "source", executorType: "docker", executorConfig: { image: "alpine:3.20", command: [ "sh", "-lc", [ "echo docker-container-started", "cat > \"$EMBOFLOW_OUTPUT_PATH\" <<'JSON'", "{\"mode\":\"docker\",\"nodeId\":\"source-asset\",\"hooked\":true}", "JSON", ].join("\n"), ], }, status: "queued", attempt: 1, assetIds: ["asset-docker"], upstreamNodeIds: [], outputArtifactIds: [], createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), }); await fixture.runtime.runNextTask(); const task = await fixture.store.getRunTask("task-real-docker"); const artifact = await fixture.db.collection("artifacts").findOne({ producerId: "task-real-docker" }); assert.equal(task?.status, "success"); assert.deepEqual(task?.stdoutLines, ["docker-container-started"]); assert.deepEqual(task?.stderrLines, []); assert.equal(task?.summary?.executorType, "docker"); assert.deepEqual(task?.lastResultPreview, { mode: "docker", nodeId: "source-asset", hooked: true, }); assert.deepEqual((artifact?.payload as { result?: Record } | undefined)?.result, { mode: "docker", nodeId: "source-asset", hooked: true, }); }); test("worker executes built-in docker source nodes when codeHookSpec is null", { skip: !hasDockerRuntime(), }, async (t) => { ensureDockerImage("python:3.11-alpine"); const fixture = await createRuntimeFixture("emboflow-worker-built-in-docker-source"); t.after(async () => { await fixture.close(); }); await fixture.db.collection("assets").insertOne({ _id: "asset-built-in-docker-source", workspaceId: "workspace-1", projectId: "project-1", type: "folder", sourceType: "registered_path", displayName: "Built-in Docker Source Asset", sourcePath: "/tmp/built-in-docker-source", status: "probed", storageRef: {}, topLevelPaths: ["meta.json"], detectedFormats: ["delivery_package"], fileCount: 1, summary: {}, createdBy: "local-user", createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), }); await fixture.db.collection("workflow_runs").insertOne({ _id: "run-built-in-docker-source", workflowDefinitionId: "workflow-built-in-docker-source", workflowVersionId: "workflow-built-in-docker-source-v1", status: "queued", triggeredBy: "local-user", assetIds: ["asset-built-in-docker-source"], createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), }); await fixture.db.collection("run_tasks").insertOne({ _id: "task-built-in-docker-source", workflowRunId: "run-built-in-docker-source", workflowVersionId: "workflow-built-in-docker-source-v1", nodeId: "source-asset", nodeDefinitionId: "source-asset", nodeType: "source", executorType: "docker", executorConfig: { image: "python:3.11-alpine", networkMode: "none", }, codeHookSpec: null, status: "queued", attempt: 1, assetIds: ["asset-built-in-docker-source"], upstreamNodeIds: [], outputArtifactIds: [], createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), }); await fixture.runtime.runNextTask(); const task = await fixture.store.getRunTask("task-built-in-docker-source"); const artifact = await fixture.db.collection("artifacts").findOne({ producerId: "task-built-in-docker-source" }); assert.equal(task?.status, "success"); assert.deepEqual(task?.stderrLines, []); assert.deepEqual(task?.stdoutLines, ["loaded 1 bound asset"]); assert.deepEqual((artifact?.payload as { result?: { assetCount?: number } } | undefined)?.result?.assetCount, 1); }); test("worker loads bound asset metadata into the execution context for built-in source nodes", async (t) => { let capturedContext: ExecutionContext | null = null; const fixture = await createRuntimeFixture("emboflow-worker-source-context", { executors: { python: { async execute(_task: TaskRecord, context: ExecutionContext) { capturedContext = context; return { result: { assetCount: context.assets?.length ?? 0 }, stdoutLines: ["custom source executor"], stderrLines: [], }; }, }, }, }); t.after(async () => { await fixture.close(); }); await fixture.db.collection("assets").insertOne({ _id: "asset-context-1", workspaceId: "workspace-1", projectId: "project-1", type: "folder", sourceType: "registered_path", displayName: "Sample Asset", sourcePath: "/tmp/sample-asset", status: "probed", storageRef: {}, topLevelPaths: ["meta.json"], detectedFormats: ["delivery_package"], fileCount: 4, summary: { kind: "delivery_package" }, createdBy: "local-user", createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), }); await fixture.db.collection("workflow_runs").insertOne({ _id: "run-source-context", workflowDefinitionId: "workflow-source-context", workflowVersionId: "workflow-source-context-v1", status: "queued", triggeredBy: "local-user", assetIds: ["asset-context-1"], createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), }); await fixture.db.collection("run_tasks").insertOne({ _id: "task-source-context", workflowRunId: "run-source-context", workflowVersionId: "workflow-source-context-v1", nodeId: "source-asset", nodeDefinitionId: "source-asset", nodeType: "source", executorType: "python", status: "queued", attempt: 1, assetIds: ["asset-context-1"], upstreamNodeIds: [], outputArtifactIds: [], createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), }); await fixture.runtime.runNextTask(); assert.equal(capturedContext?.assets?.[0]?.id, "asset-context-1"); assert.equal(capturedContext?.assets?.[0]?.displayName, "Sample Asset"); assert.equal(capturedContext?.assets?.[0]?.sourcePath, "/tmp/sample-asset"); }); test("worker validates delivery structure against the bound asset path for validate-structure", async (t) => { const sourceDir = await mkdtemp(path.join(os.tmpdir(), "emboflow-worker-validate-")); await mkdir(path.join(sourceDir, "DJI_001")); await writeFile(path.join(sourceDir, "meta.json"), "{}"); await writeFile(path.join(sourceDir, "intrinsics.json"), "{}"); await writeFile(path.join(sourceDir, "video_meta.json"), "{}"); await writeFile(path.join(sourceDir, "DJI_001", "DJI_001.mp4"), ""); const fixture = await createRuntimeFixture("emboflow-worker-validate-structure"); t.after(async () => { await fixture.close(); }); await fixture.db.collection("assets").insertOne({ _id: "asset-validate-1", workspaceId: "workspace-1", projectId: "project-1", type: "folder", sourceType: "registered_path", displayName: "Validation Asset", sourcePath: sourceDir, status: "probed", storageRef: {}, topLevelPaths: ["DJI_001", "meta.json", "intrinsics.json", "video_meta.json"], detectedFormats: ["delivery_package"], fileCount: 4, summary: { kind: "delivery_package" }, createdBy: "local-user", createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), }); await fixture.db.collection("workflow_runs").insertOne({ _id: "run-validate-structure", workflowDefinitionId: "workflow-validate-structure", workflowVersionId: "workflow-validate-structure-v1", status: "queued", triggeredBy: "local-user", assetIds: ["asset-validate-1"], createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), }); await fixture.db.collection("run_tasks").insertOne({ _id: "task-validate-structure", workflowRunId: "run-validate-structure", workflowVersionId: "workflow-validate-structure-v1", nodeId: "validate-structure", nodeDefinitionId: "validate-structure", nodeType: "inspect", executorType: "python", status: "queued", attempt: 1, assetIds: ["asset-validate-1"], upstreamNodeIds: [], outputArtifactIds: [], createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), }); await fixture.runtime.runNextTask(); const task = await fixture.store.getRunTask("task-validate-structure"); const artifact = await fixture.db.collection("artifacts").findOne({ producerId: "task-validate-structure" }); assert.equal(task?.status, "success"); assert.match(task?.stdoutLines?.[0] ?? "", /validated 1 asset/i); assert.deepEqual(task?.lastResultPreview, { assetCount: 1, valid: true, requiredFiles: ["meta.json", "intrinsics.json", "video_meta.json"], videoFileCount: 1, }); assert.deepEqual((artifact?.payload as { result?: Record } | undefined)?.result, { assetCount: 1, valid: true, requiredFiles: ["meta.json", "intrinsics.json", "video_meta.json"], videoFileCount: 1, }); }); test("worker applies intersect-assets and narrows the downstream effective asset set", async (t) => { const sourceDirA = await mkdtemp(path.join(os.tmpdir(), "emboflow-worker-intersect-a-")); const sourceDirB = await mkdtemp(path.join(os.tmpdir(), "emboflow-worker-intersect-b-")); await mkdir(path.join(sourceDirA, "DJI_A")); await mkdir(path.join(sourceDirB, "DJI_B")); for (const root of [sourceDirA, sourceDirB]) { await writeFile(path.join(root, "meta.json"), "{}"); await writeFile(path.join(root, "intrinsics.json"), "{}"); await writeFile(path.join(root, "video_meta.json"), "{}"); } await writeFile(path.join(sourceDirA, "DJI_A", "A.mp4"), ""); await writeFile(path.join(sourceDirB, "DJI_B", "B.mp4"), ""); const fixture = await createRuntimeFixture("emboflow-worker-intersect-assets"); t.after(async () => { await fixture.close(); }); await fixture.db.collection("assets").insertMany([ { _id: "asset-intersect-a", workspaceId: "workspace-1", projectId: "project-1", type: "folder", sourceType: "registered_path", displayName: "Intersect Asset A", sourcePath: sourceDirA, status: "probed", storageRef: {}, topLevelPaths: ["DJI_A", "meta.json", "intrinsics.json", "video_meta.json"], detectedFormats: ["delivery_package"], fileCount: 4, summary: { kind: "delivery_package" }, createdBy: "local-user", createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), }, { _id: "asset-intersect-b", workspaceId: "workspace-1", projectId: "project-1", type: "folder", sourceType: "registered_path", displayName: "Intersect Asset B", sourcePath: sourceDirB, status: "probed", storageRef: {}, topLevelPaths: ["DJI_B", "meta.json", "intrinsics.json", "video_meta.json"], detectedFormats: ["delivery_package"], fileCount: 4, summary: { kind: "delivery_package" }, createdBy: "local-user", createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), }, ]); await fixture.db.collection("workflow_runs").insertOne({ _id: "run-intersect-assets", workflowDefinitionId: "workflow-intersect-assets", workflowVersionId: "workflow-intersect-assets-v1", status: "queued", triggeredBy: "local-user", assetIds: ["asset-intersect-a", "asset-intersect-b"], createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), }); await fixture.db.collection("run_tasks").insertMany([ { _id: "task-upstream-a", workflowRunId: "run-intersect-assets", workflowVersionId: "workflow-intersect-assets-v1", nodeId: "source-assets-a", nodeDefinitionId: "source-asset", nodeType: "source", executorType: "python", status: "success", attempt: 1, assetIds: ["asset-intersect-a", "asset-intersect-b"], upstreamNodeIds: [], outputArtifactIds: [], lastResultPreview: { assetIds: ["asset-intersect-a", "asset-intersect-b"] }, createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), }, { _id: "task-upstream-b", workflowRunId: "run-intersect-assets", workflowVersionId: "workflow-intersect-assets-v1", nodeId: "source-assets-b", nodeDefinitionId: "source-asset", nodeType: "source", executorType: "python", status: "success", attempt: 1, assetIds: ["asset-intersect-b"], upstreamNodeIds: [], outputArtifactIds: [], lastResultPreview: { assetIds: ["asset-intersect-b"] }, createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), }, { _id: "task-intersect-assets", workflowRunId: "run-intersect-assets", workflowVersionId: "workflow-intersect-assets-v1", nodeId: "intersect-assets-1", nodeDefinitionId: "intersect-assets", nodeType: "utility", executorType: "python", status: "queued", attempt: 1, assetIds: ["asset-intersect-a", "asset-intersect-b"], upstreamNodeIds: ["source-assets-a", "source-assets-b"], outputArtifactIds: [], createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), }, { _id: "task-downstream-validate", workflowRunId: "run-intersect-assets", workflowVersionId: "workflow-intersect-assets-v1", nodeId: "validate-structure", nodeDefinitionId: "validate-structure", nodeType: "inspect", executorType: "python", status: "pending", attempt: 1, assetIds: ["asset-intersect-a", "asset-intersect-b"], upstreamNodeIds: ["intersect-assets-1"], outputArtifactIds: [], createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), }, ]); await fixture.runtime.runNextTask(); const intersectTask = await fixture.store.getRunTask("task-intersect-assets"); const queuedValidate = await fixture.store.getRunTask("task-downstream-validate"); assert.equal(intersectTask?.status, "success"); assert.deepEqual(intersectTask?.lastResultPreview?.assetIds, ["asset-intersect-b"]); assert.match(intersectTask?.stdoutLines?.[0] ?? "", /intersection resolved 1 asset/i); assert.equal(queuedValidate?.status, "queued"); await fixture.runtime.runNextTask(); const validateTask = await fixture.store.getRunTask("task-downstream-validate"); assert.equal(validateTask?.status, "success"); assert.equal(validateTask?.summary?.assetCount, 1); assert.deepEqual(validateTask?.lastResultPreview, { assetCount: 1, valid: true, requiredFiles: ["meta.json", "intrinsics.json", "video_meta.json"], videoFileCount: 1, }); }); test("worker executes built-in union-assets inside docker when docker is available", async (t) => { if (!hasDockerRuntime()) { t.diagnostic("docker runtime unavailable; skipping built-in docker union-assets test"); return; } ensureDockerImage("python:3.11-alpine"); const fixture = await createRuntimeFixture("emboflow-worker-docker-union-assets"); t.after(async () => { await fixture.close(); }); await fixture.db.collection("workflow_runs").insertOne({ _id: "run-docker-union-assets", workflowDefinitionId: "workflow-docker-union-assets", workflowVersionId: "workflow-docker-union-assets-v1", status: "queued", triggeredBy: "local-user", assetIds: ["asset-union-a", "asset-union-b"], createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), }); await fixture.db.collection("run_tasks").insertMany([ { _id: "task-union-upstream-a", workflowRunId: "run-docker-union-assets", workflowVersionId: "workflow-docker-union-assets-v1", nodeId: "source-assets-a", nodeDefinitionId: "source-asset", nodeType: "source", executorType: "python", status: "success", attempt: 1, assetIds: ["asset-union-a"], upstreamNodeIds: [], outputArtifactIds: [], lastResultPreview: { assetIds: ["asset-union-a"] }, createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), }, { _id: "task-union-upstream-b", workflowRunId: "run-docker-union-assets", workflowVersionId: "workflow-docker-union-assets-v1", nodeId: "source-assets-b", nodeDefinitionId: "source-asset", nodeType: "source", executorType: "python", status: "success", attempt: 1, assetIds: ["asset-union-b"], upstreamNodeIds: [], outputArtifactIds: [], lastResultPreview: { assetIds: ["asset-union-b"] }, createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), }, { _id: "task-union-docker", workflowRunId: "run-docker-union-assets", workflowVersionId: "workflow-docker-union-assets-v1", nodeId: "union-assets-1", nodeDefinitionId: "union-assets", nodeType: "utility", executorType: "docker", executorConfig: { image: "python:3.11-alpine", }, status: "queued", attempt: 1, assetIds: ["asset-union-a", "asset-union-b"], upstreamNodeIds: ["source-assets-a", "source-assets-b"], outputArtifactIds: [], createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), }, ]); await fixture.runtime.runNextTask(); const task = await fixture.store.getRunTask("task-union-docker"); assert.equal(task?.status, "success"); assert.equal(task?.summary?.executorType, "docker"); assert.match(task?.stdoutLines?.[0] ?? "", /union resolved 2 assets/i); assert.deepEqual(task?.lastResultPreview?.assetIds, ["asset-union-a", "asset-union-b"]); }); test("worker builds and executes a custom dockerfile node when docker is available", async (t) => { if (!hasDockerRuntime()) { t.diagnostic("docker runtime unavailable; skipping custom dockerfile node test"); return; } ensureDockerImage("python:3.11-alpine"); const fixture = await createRuntimeFixture("emboflow-worker-custom-dockerfile-node"); t.after(async () => { await fixture.close(); }); await fixture.db.collection("workflow_runs").insertOne({ _id: "run-custom-dockerfile-node", workflowDefinitionId: "workflow-custom-dockerfile-node", workflowVersionId: "workflow-custom-dockerfile-node-v1", status: "queued", triggeredBy: "local-user", assetIds: ["asset-custom-1", "asset-custom-2"], createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), }); await fixture.db.collection("run_tasks").insertOne({ _id: "task-custom-dockerfile-node", workflowRunId: "run-custom-dockerfile-node", workflowVersionId: "workflow-custom-dockerfile-node-v1", nodeId: "custom-node-merge-assets-1", nodeDefinitionId: "custom-merge-assets", nodeType: "utility", executorType: "docker", executorConfig: { imageTag: "emboflow-test/custom-merge-assets:latest", dockerfileContent: [ "FROM python:3.11-alpine", "CMD [\"python3\", \"-c\", \"import json,os,pathlib; payload=json.loads(pathlib.Path(os.environ['EMBOFLOW_INPUT_PATH']).read_text()); asset_ids=payload['context'].get('assetIds', []); pathlib.Path(os.environ['EMBOFLOW_OUTPUT_PATH']).write_text(json.dumps({'result': {'assetIds': asset_ids, 'assetCount': len(asset_ids), 'kind': 'custom-dockerfile'}})); print(f'custom dockerfile handled {len(asset_ids)} assets')\"]", ].join("\n"), contract: { inputMode: "single_asset_set", outputMode: "asset_set", artifactType: "json", }, }, status: "queued", attempt: 1, assetIds: ["asset-custom-1", "asset-custom-2"], upstreamNodeIds: [], outputArtifactIds: [], createdAt: new Date().toISOString(), updatedAt: new Date().toISOString(), }); await fixture.runtime.runNextTask(); const task = await fixture.store.getRunTask("task-custom-dockerfile-node"); assert.equal(task?.status, "success"); assert.equal(task?.summary?.executorType, "docker"); assert.match(task?.stdoutLines?.[0] ?? "", /custom dockerfile handled 2 assets/i); assert.deepEqual(task?.lastResultPreview, { assetIds: ["asset-custom-1", "asset-custom-2"], assetCount: 2, kind: "custom-dockerfile", }); });