diff --git a/README.md b/README.md index d778654..0fe6b19 100644 --- a/README.md +++ b/README.md @@ -69,6 +69,7 @@ The workflow editor currently requires selecting at least one registered asset b The editor now also persists per-node runtime config in workflow versions, including executor overrides, optional artifact title overrides, and Python code-hook source for inspect and transform style nodes. The Runs workspace now shows project-scoped run history, run-level aggregated summaries, cancel/retry controls, and run detail views with persisted task summaries, stdout/stderr sections, result previews, and artifact links into Explore. Selected run tasks now expose the frozen node definition id, executor config snapshot, and code-hook metadata that were captured when the run was created. +When a node uses `executorType=docker` and provides `executorConfig.image`, the worker now runs a real local Docker container with mounted `input.json` / `output.json` exchange files. If no image is configured, the executor falls back to the lightweight simulated behavior used by older demo tasks. 
## Repository Structure diff --git a/apps/worker/src/executors/docker-executor.ts b/apps/worker/src/executors/docker-executor.ts index bb72325..3dca9bf 100644 --- a/apps/worker/src/executors/docker-executor.ts +++ b/apps/worker/src/executors/docker-executor.ts @@ -1,27 +1,158 @@ +import { spawn } from "node:child_process"; +import { mkdtemp, readFile, rm, stat, writeFile } from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; + import type { ExecutionContext, ExecutorExecutionResult, TaskRecord, } from "../contracts/execution-context.ts"; +function splitOutputLines(output: string) { + return output + .split(/\r?\n/u) + .map((line) => line.trimEnd()) + .filter((line) => line.length > 0); +} + +function buildFallbackResult(task: TaskRecord, image: string, command: string[]) { + return { + result: { + taskId: task.id, + executor: "docker" as const, + image, + command, + }, + stdoutLines: [`docker executor processed ${task.nodeId} with ${image}`], + stderrLines: [], + }; +} + +async function fileExists(filePath: string) { + try { + await stat(filePath); + return true; + } catch { + return false; + } +} + +function parseDockerResult(payload: unknown) { + if (payload && typeof payload === "object" && !Array.isArray(payload) && "result" in payload) { + return (payload as { result: unknown }).result; + } + return payload; +} + export class DockerExecutor { executionCount = 0; - async execute(task: TaskRecord, _context: ExecutionContext): Promise { + async execute(task: TaskRecord, context: ExecutionContext): Promise { this.executionCount += 1; - const image = typeof task.executorConfig?.image === "string" ? task.executorConfig.image : "docker://local-simulated"; + + const image = typeof task.executorConfig?.image === "string" ? task.executorConfig.image.trim() : ""; const command = Array.isArray(task.executorConfig?.command) ? 
task.executorConfig.command.filter((item): item is string => typeof item === "string") : []; - return { - result: { + + if (!image) { + return buildFallbackResult(task, "docker://local-simulated", command); + } + + const workdir = typeof task.executorConfig?.workdir === "string" && task.executorConfig.workdir.trim().length > 0 + ? task.executorConfig.workdir + : "/workspace"; + const networkMode = + typeof task.executorConfig?.networkMode === "string" && task.executorConfig.networkMode.trim().length > 0 + ? task.executorConfig.networkMode + : "none"; + const envVars = + task.executorConfig?.env && typeof task.executorConfig.env === "object" && !Array.isArray(task.executorConfig.env) + ? Object.entries(task.executorConfig.env).filter( + (entry): entry is [string, string] => typeof entry[0] === "string" && typeof entry[1] === "string", + ) + : []; + + const tempDir = await mkdtemp(path.join(os.tmpdir(), "emboflow-docker-executor-")); + const inputPath = path.join(tempDir, "input.json"); + const outputPath = path.join(tempDir, "output.json"); + await writeFile( + inputPath, + JSON.stringify({ + task, + context, + }), + ); + + const dockerArgs = [ + "run", + "--rm", + "--network", + networkMode, + "--workdir", + workdir, + "--volume", + `${tempDir}:${workdir}`, + "--env", + `EMBOFLOW_INPUT_PATH=${workdir}/input.json`, + "--env", + `EMBOFLOW_OUTPUT_PATH=${workdir}/output.json`, + "--env", + `EMBOFLOW_TASK_ID=${task.id}`, + "--env", + `EMBOFLOW_NODE_ID=${task.nodeId}`, + "--env", + `EMBOFLOW_WORKFLOW_RUN_ID=${context.workflowRunId ?? ""}`, + ...envVars.flatMap(([key, value]) => ["--env", `${key}=${value}`]), + image, + ...(command.length > 0 ? 
command : ["sh", "-lc", "cat \"$EMBOFLOW_INPUT_PATH\" > \"$EMBOFLOW_OUTPUT_PATH\""]), + ]; + + const child = spawn("docker", dockerArgs, { + stdio: ["ignore", "pipe", "pipe"], + }); + + let stdout = ""; + let stderr = ""; + child.stdout.on("data", (chunk) => { + stdout += String(chunk); + }); + child.stderr.on("data", (chunk) => { + stderr += String(chunk); + }); + + const exitCode = await new Promise((resolve, reject) => { + child.on("error", reject); + child.on("close", (code) => resolve(code ?? 1)); + }); + + try { + if (exitCode !== 0) { + throw Object.assign(new Error(`docker executor failed with exit code ${exitCode}`), { + stdoutLines: splitOutputLines(stdout), + stderrLines: splitOutputLines(stderr), + }); + } + + let result: unknown = { taskId: task.id, executor: "docker" as const, image, command, - }, - stdoutLines: [`docker executor processed ${task.nodeId} with ${image}`], - stderrLines: [], - }; + }; + if (await fileExists(outputPath)) { + const outputPayload = JSON.parse(await readFile(outputPath, "utf8")) as unknown; + result = parseDockerResult(outputPayload); + } + + return { + result, + stdoutLines: splitOutputLines(stdout), + stderrLines: splitOutputLines(stderr), + }; + } finally { + await rm(tempDir, { recursive: true, force: true }); + } } } diff --git a/apps/worker/test/mongo-worker-runtime.spec.ts b/apps/worker/test/mongo-worker-runtime.spec.ts index 50d352d..c0d014b 100644 --- a/apps/worker/test/mongo-worker-runtime.spec.ts +++ b/apps/worker/test/mongo-worker-runtime.spec.ts @@ -1,5 +1,6 @@ import test from "node:test"; import assert from "node:assert/strict"; +import { spawnSync } from "node:child_process"; import { MongoMemoryServer } from "mongodb-memory-server"; import { MongoClient } from "mongodb"; @@ -8,6 +9,29 @@ import { MongoWorkerStore } from "../src/runtime/mongo-worker-store.ts"; import { WorkerRuntime } from "../src/runtime/worker-runtime.ts"; import type { ExecutionContext, TaskRecord } from 
"../src/contracts/execution-context.ts"; +function hasDockerRuntime() { + const result = spawnSync("docker", ["info"], { + stdio: "ignore", + }); + return result.status === 0; +} + +function ensureDockerImage(image: string) { + const result = spawnSync("docker", ["image", "inspect", image], { + stdio: "ignore", + }); + if (result.status === 0) { + return; + } + + const pull = spawnSync("docker", ["pull", image], { + stdio: "ignore", + }); + if (pull.status !== 0) { + throw new Error(`failed to pull docker image: ${image}`); + } +} + async function createRuntimeFixture( database: string, options: { @@ -485,3 +509,74 @@ test("worker executes a python code hook snapshot from the queued task", async ( hooked: true, }); }); + +test("worker executes a queued docker task inside a real container", { + skip: !hasDockerRuntime(), +}, async (t) => { + ensureDockerImage("alpine:3.20"); + const fixture = await createRuntimeFixture("emboflow-worker-real-docker"); + t.after(async () => { + await fixture.close(); + }); + + await fixture.db.collection("workflow_runs").insertOne({ + _id: "run-real-docker", + workflowDefinitionId: "workflow-real-docker", + workflowVersionId: "workflow-real-docker-v1", + status: "queued", + triggeredBy: "local-user", + assetIds: ["asset-docker"], + createdAt: new Date().toISOString(), + updatedAt: new Date().toISOString(), + }); + + await fixture.db.collection("run_tasks").insertOne({ + _id: "task-real-docker", + workflowRunId: "run-real-docker", + workflowVersionId: "workflow-real-docker-v1", + nodeId: "source-asset", + nodeDefinitionId: "source-asset", + nodeType: "source", + executorType: "docker", + executorConfig: { + image: "alpine:3.20", + command: [ + "sh", + "-lc", + [ + "echo docker-container-started", + "cat > \"$EMBOFLOW_OUTPUT_PATH\" <<'JSON'", + "{\"mode\":\"docker\",\"nodeId\":\"source-asset\",\"hooked\":true}", + "JSON", + ].join("\n"), + ], + }, + status: "queued", + attempt: 1, + assetIds: ["asset-docker"], + upstreamNodeIds: [], + 
outputArtifactIds: [], + createdAt: new Date().toISOString(), + updatedAt: new Date().toISOString(), + }); + + await fixture.runtime.runNextTask(); + + const task = await fixture.store.getRunTask("task-real-docker"); + const artifact = await fixture.db.collection("artifacts").findOne({ producerId: "task-real-docker" }); + + assert.equal(task?.status, "success"); + assert.deepEqual(task?.stdoutLines, ["docker-container-started"]); + assert.deepEqual(task?.stderrLines, []); + assert.equal(task?.summary?.executorType, "docker"); + assert.deepEqual(task?.lastResultPreview, { + mode: "docker", + nodeId: "source-asset", + hooked: true, + }); + assert.deepEqual((artifact?.payload as { result?: Record } | undefined)?.result, { + mode: "docker", + nodeId: "source-asset", + hooked: true, + }); +}); diff --git a/design/03-workflows/workflow-execution-model.md b/design/03-workflows/workflow-execution-model.md index 4f75645..5789d29 100644 --- a/design/03-workflows/workflow-execution-model.md +++ b/design/03-workflows/workflow-execution-model.md @@ -128,6 +128,22 @@ This keeps serialization, logging, and runtime control predictable. The current V1 worker executes trusted-local Python hooks when a `run_task` carries a `codeHookSpec`. The hook is executed through a constrained Python harness with the task snapshot and execution context passed in as JSON. Hook stdout is captured into `stdoutLines`, hook failures populate `stderrLines`, and the returned object becomes the task artifact payload. 
+The current V1 Docker executor now has two modes: + +- compatibility mode when no image is configured on the node runtime config +- real container mode when `executorConfig.image` is set + +In real container mode the worker: + +- creates a temp working directory +- writes `input.json` containing the frozen task snapshot and execution context +- mounts that directory into the container at the configured working directory (`executorConfig.workdir`, default `/workspace`) +- sets `EMBOFLOW_INPUT_PATH` and `EMBOFLOW_OUTPUT_PATH` +- captures container stdout and stderr from the Docker CLI process +- parses `output.json` back into the task artifact payload when present + +The default Docker runtime policy is `--network none`, applied whenever `executorConfig.networkMode` is not set on the node. This keeps V1 safer for local processing nodes unless a node's runtime config or a later phase deliberately opens network access for containerized tasks. + ## Data Flow Contract Tasks should exchange managed references, not loose file paths. diff --git a/docs/plans/2026-03-26-emboflow-v1-foundation-and-mvp.md b/docs/plans/2026-03-26-emboflow-v1-foundation-and-mvp.md index 865ef8c..81469e5 100644 --- a/docs/plans/2026-03-26-emboflow-v1-foundation-and-mvp.md +++ b/docs/plans/2026-03-26-emboflow-v1-foundation-and-mvp.md @@ -24,6 +24,7 @@ - `2026-03-27`: The current follow-up observability pass adds persisted stdout/stderr fields on `run_tasks` plus aggregated run summaries, durations, and task counts on `workflow_runs`. - `2026-03-27`: The current run-control pass adds run cancellation, run retry from immutable snapshots, and task retry for failed/cancelled nodes with downstream reset semantics. - `2026-03-27`: The current runtime-config pass freezes per-node executor config into `workflow_runs` and `run_tasks`, exposes runtime editing controls in the React workflow editor, and executes trusted-local Python code hooks from the task snapshot. 
+- `2026-03-27`: The current Docker-runtime pass upgrades `executorType=docker` from a pure stub to a real local container execution path whenever `executorConfig.image` is provided, while retaining a compatibility fallback for older demo tasks without an image. ---