🐳 feat: run docker executor tasks in containers

This commit is contained in:
eust-w 2026-03-27 11:09:02 +08:00
parent f6ca6246f9
commit cb462464e6
5 changed files with 252 additions and 8 deletions

View File

@ -69,6 +69,7 @@ The workflow editor currently requires selecting at least one registered asset b
The editor now also persists per-node runtime config in workflow versions, including executor overrides, optional artifact title overrides, and Python code-hook source for inspect and transform style nodes.
The Runs workspace now shows project-scoped run history, run-level aggregated summaries, cancel/retry controls, and run detail views with persisted task summaries, stdout/stderr sections, result previews, and artifact links into Explore.
Selected run tasks now expose the frozen node definition id, executor config snapshot, and code-hook metadata that were captured when the run was created.
When a node uses `executorType=docker` and provides `executorConfig.image`, the worker now runs a real local Docker container with mounted `input.json` / `output.json` exchange files. If no image is configured, the executor falls back to the lightweight simulated behavior used by older demo tasks.
## Repository Structure

View File

@ -1,18 +1,22 @@
import { spawn } from "node:child_process";
import { mkdtemp, readFile, rm, stat, writeFile } from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import type {
ExecutionContext,
ExecutorExecutionResult,
TaskRecord,
} from "../contracts/execution-context.ts";
export class DockerExecutor {
executionCount = 0;
/**
 * Breaks captured process output into individual log lines.
 *
 * Accepts both LF and CRLF line endings, strips trailing whitespace from each
 * line, and drops lines that are empty after stripping.
 *
 * @param output - Raw text collected from a process stream.
 * @returns The non-empty lines in their original order.
 */
function splitOutputLines(output: string) {
  const lines: string[] = [];
  for (const rawLine of output.split(/\r?\n/u)) {
    const line = rawLine.trimEnd();
    if (line.length > 0) {
      lines.push(line);
    }
  }
  return lines;
}
async execute(task: TaskRecord, _context: ExecutionContext): Promise<ExecutorExecutionResult> {
this.executionCount += 1;
const image = typeof task.executorConfig?.image === "string" ? task.executorConfig.image : "docker://local-simulated";
const command = Array.isArray(task.executorConfig?.command)
? task.executorConfig.command.filter((item): item is string => typeof item === "string")
: [];
function buildFallbackResult(task: TaskRecord, image: string, command: string[]) {
return {
result: {
taskId: task.id,
@ -24,4 +28,131 @@ export class DockerExecutor {
stderrLines: [],
};
}
/**
 * Reports whether `stat` succeeds for the given path.
 *
 * Any `stat` failure (not only "not found") is reported as `false`.
 *
 * @param filePath - Path to probe.
 * @returns A promise resolving to `true` when `stat` succeeds, `false` otherwise.
 */
async function fileExists(filePath: string) {
  return stat(filePath).then(
    () => true,
    () => false,
  );
}
/**
 * Extracts the task result from a parsed container output payload.
 *
 * A non-array object that has a `result` key is unwrapped to the value of that
 * key; every other payload (primitives, `null`, arrays, objects without
 * `result`) is returned unchanged.
 *
 * @param payload - Value parsed from the container's output file.
 * @returns The unwrapped `result` value, or the payload itself.
 */
function parseDockerResult(payload: unknown) {
  if (typeof payload !== "object" || payload === null || Array.isArray(payload)) {
    return payload;
  }
  return "result" in payload ? (payload as { result: unknown }).result : payload;
}
/**
 * Executor for tasks whose `executorType` is `docker`.
 *
 * When `executorConfig.image` is a non-blank string the task runs in a local
 * container started through the `docker` CLI; otherwise the executor returns
 * the simulated fallback result.
 */
export class DockerExecutor {
  /** Number of `execute` calls made on this instance, including fallback runs. */
  executionCount = 0;

  /**
   * Runs a single task.
   *
   * In container mode a temp directory holding `input.json` (the task and the
   * execution context) is bind-mounted at the container workdir. The container
   * may write `output.json` there; when present it is parsed and used as the
   * result, otherwise a small descriptor of the run is returned.
   *
   * @param task - Task snapshot; `executorConfig` may supply `image`,
   *   `command`, `workdir`, `networkMode` and `env`.
   * @param context - Execution context, serialized into `input.json`.
   * @returns The result plus the container's stdout/stderr split into lines.
   * @throws Error carrying `stdoutLines`/`stderrLines` when the container exits
   *   non-zero or writes an `output.json` that is not valid JSON; the spawn
   *   error when the `docker` process cannot be started.
   */
  async execute(task: TaskRecord, context: ExecutionContext): Promise<ExecutorExecutionResult> {
    this.executionCount += 1;

    const image = typeof task.executorConfig?.image === "string" ? task.executorConfig.image.trim() : "";
    const command = Array.isArray(task.executorConfig?.command)
      ? task.executorConfig.command.filter((item): item is string => typeof item === "string")
      : [];

    if (!image) {
      return buildFallbackResult(task, "docker://local-simulated", command);
    }

    // Trim the same way `image` is trimmed so that a value which passes the
    // non-blank check is not handed to docker with stray whitespace.
    const configuredWorkdir =
      typeof task.executorConfig?.workdir === "string" ? task.executorConfig.workdir.trim() : "";
    const workdir = configuredWorkdir.length > 0 ? configuredWorkdir : "/workspace";

    const configuredNetworkMode =
      typeof task.executorConfig?.networkMode === "string" ? task.executorConfig.networkMode.trim() : "";
    const networkMode = configuredNetworkMode.length > 0 ? configuredNetworkMode : "none";

    const envVars =
      task.executorConfig?.env && typeof task.executorConfig.env === "object" && !Array.isArray(task.executorConfig.env)
        ? Object.entries(task.executorConfig.env).filter(
            (entry): entry is [string, string] => typeof entry[0] === "string" && typeof entry[1] === "string",
          )
        : [];

    const tempDir = await mkdtemp(path.join(os.tmpdir(), "emboflow-docker-executor-"));

    // Everything after the temp dir exists runs inside try/finally so that the
    // directory is removed even if writing the input or spawning docker fails.
    try {
      const inputPath = path.join(tempDir, "input.json");
      const outputPath = path.join(tempDir, "output.json");

      await writeFile(
        inputPath,
        JSON.stringify({
          task,
          context,
        }),
      );

      const dockerArgs = [
        "run",
        "--rm",
        "--network",
        networkMode,
        "--workdir",
        workdir,
        "--volume",
        `${tempDir}:${workdir}`,
        "--env",
        `EMBOFLOW_INPUT_PATH=${workdir}/input.json`,
        "--env",
        `EMBOFLOW_OUTPUT_PATH=${workdir}/output.json`,
        "--env",
        `EMBOFLOW_TASK_ID=${task.id}`,
        "--env",
        `EMBOFLOW_NODE_ID=${task.nodeId}`,
        "--env",
        `EMBOFLOW_WORKFLOW_RUN_ID=${context.workflowRunId ?? ""}`,
        ...envVars.flatMap(([key, value]) => ["--env", `${key}=${value}`]),
        image,
        // With no command configured, echo the input back as the output.
        ...(command.length > 0 ? command : ["sh", "-lc", "cat \"$EMBOFLOW_INPUT_PATH\" > \"$EMBOFLOW_OUTPUT_PATH\""]),
      ];

      const child = spawn("docker", dockerArgs, {
        stdio: ["ignore", "pipe", "pipe"],
      });

      // Decode on the stream so a multi-byte UTF-8 character split across two
      // chunks is reassembled instead of being turned into replacement chars.
      child.stdout.setEncoding("utf8");
      child.stderr.setEncoding("utf8");

      let stdout = "";
      let stderr = "";
      child.stdout.on("data", (chunk: string) => {
        stdout += chunk;
      });
      child.stderr.on("data", (chunk: string) => {
        stderr += chunk;
      });

      const exitCode = await new Promise<number>((resolve, reject) => {
        child.on("error", reject);
        // A null code (process ended by a signal) is reported as failure.
        child.on("close", (code) => resolve(code ?? 1));
      });

      const stdoutLines = splitOutputLines(stdout);
      const stderrLines = splitOutputLines(stderr);

      if (exitCode !== 0) {
        throw Object.assign(new Error(`docker executor failed with exit code ${exitCode}`), {
          stdoutLines,
          stderrLines,
        });
      }

      let result: unknown = {
        taskId: task.id,
        executor: "docker",
        image,
        command,
      };

      if (await fileExists(outputPath)) {
        const rawOutput = await readFile(outputPath, "utf8");
        let outputPayload: unknown;
        try {
          outputPayload = JSON.parse(rawOutput);
        } catch (error: unknown) {
          // Attach the captured logs, as the exit-code failure above does, so
          // a malformed output file can be diagnosed from the error alone.
          const reason = error instanceof Error ? error.message : String(error);
          throw Object.assign(new Error(`docker executor wrote invalid JSON to output.json: ${reason}`), {
            cause: error,
            stdoutLines,
            stderrLines,
          });
        }
        result = parseDockerResult(outputPayload);
      }

      return {
        result,
        stdoutLines,
        stderrLines,
      };
    } finally {
      await rm(tempDir, { recursive: true, force: true });
    }
  }
}

View File

@ -1,5 +1,6 @@
import test from "node:test";
import assert from "node:assert/strict";
import { spawnSync } from "node:child_process";
import { MongoMemoryServer } from "mongodb-memory-server";
import { MongoClient } from "mongodb";
@ -8,6 +9,29 @@ import { MongoWorkerStore } from "../src/runtime/mongo-worker-store.ts";
import { WorkerRuntime } from "../src/runtime/worker-runtime.ts";
import type { ExecutionContext, TaskRecord } from "../src/contracts/execution-context.ts";
/**
 * Checks whether `docker info` runs and exits with status 0.
 *
 * @returns `true` when the command exits with status 0, `false` otherwise
 *   (including when the command cannot be started).
 */
function hasDockerRuntime() {
  const { status } = spawnSync("docker", ["info"], { stdio: "ignore" });
  return status === 0;
}
/**
 * Makes sure a Docker image is available locally, pulling it if
 * `docker image inspect` does not succeed.
 *
 * @param image - Image reference, e.g. `alpine:3.20`.
 * @throws Error when the image is not present and `docker pull` fails.
 */
function ensureDockerImage(image: string) {
  // Runs a docker subcommand with all output discarded; true on exit status 0.
  const dockerSucceeds = (args: string[]) => spawnSync("docker", args, { stdio: "ignore" }).status === 0;

  const alreadyPresent = dockerSucceeds(["image", "inspect", image]);
  if (alreadyPresent) {
    return;
  }

  const pulled = dockerSucceeds(["pull", image]);
  if (!pulled) {
    throw new Error(`failed to pull docker image: ${image}`);
  }
}
async function createRuntimeFixture(
database: string,
options: {
@ -485,3 +509,74 @@ test("worker executes a python code hook snapshot from the queued task", async (
hooked: true,
});
});
// End-to-end check of the real-container path: queues one docker task, lets the
// worker run it, then verifies the persisted task fields and the artifact.
// Skipped when `docker info` does not succeed on this machine.
test(
  "worker executes a queued docker task inside a real container",
  { skip: !hasDockerRuntime() },
  async (t) => {
    ensureDockerImage("alpine:3.20");

    const fixture = await createRuntimeFixture("emboflow-worker-real-docker");
    t.after(async () => {
      await fixture.close();
    });

    // Shell script run in the container: emit one stdout line, then write the
    // JSON result to the output path supplied by the executor.
    const containerScript = [
      "echo docker-container-started",
      "cat > \"$EMBOFLOW_OUTPUT_PATH\" <<'JSON'",
      "{\"mode\":\"docker\",\"nodeId\":\"source-asset\",\"hooked\":true}",
      "JSON",
    ].join("\n");

    const runDocument = {
      _id: "run-real-docker",
      workflowDefinitionId: "workflow-real-docker",
      workflowVersionId: "workflow-real-docker-v1",
      status: "queued",
      triggeredBy: "local-user",
      assetIds: ["asset-docker"],
      createdAt: new Date().toISOString(),
      updatedAt: new Date().toISOString(),
    };
    await fixture.db.collection("workflow_runs").insertOne(runDocument);

    const taskDocument = {
      _id: "task-real-docker",
      workflowRunId: "run-real-docker",
      workflowVersionId: "workflow-real-docker-v1",
      nodeId: "source-asset",
      nodeDefinitionId: "source-asset",
      nodeType: "source",
      executorType: "docker",
      executorConfig: {
        image: "alpine:3.20",
        command: ["sh", "-lc", containerScript],
      },
      status: "queued",
      attempt: 1,
      assetIds: ["asset-docker"],
      upstreamNodeIds: [],
      outputArtifactIds: [],
      createdAt: new Date().toISOString(),
      updatedAt: new Date().toISOString(),
    };
    await fixture.db.collection("run_tasks").insertOne(taskDocument);

    await fixture.runtime.runNextTask();

    const task = await fixture.store.getRunTask("task-real-docker");
    const artifact = await fixture.db.collection("artifacts").findOne({ producerId: "task-real-docker" });

    assert.equal(task?.status, "success");
    assert.deepEqual(task?.stdoutLines, ["docker-container-started"]);
    assert.deepEqual(task?.stderrLines, []);
    assert.equal(task?.summary?.executorType, "docker");
    assert.deepEqual(task?.lastResultPreview, {
      mode: "docker",
      nodeId: "source-asset",
      hooked: true,
    });

    const artifactPayload = artifact?.payload as { result?: Record<string, unknown> } | undefined;
    assert.deepEqual(artifactPayload?.result, {
      mode: "docker",
      nodeId: "source-asset",
      hooked: true,
    });
  },
);

View File

@ -128,6 +128,22 @@ This keeps serialization, logging, and runtime control predictable.
The current V1 worker executes trusted-local Python hooks when a `run_task` carries a `codeHookSpec`. The hook is executed through a constrained Python harness with the task snapshot and execution context passed in as JSON. Hook stdout is captured into `stdoutLines`, hook failures populate `stderrLines`, and the returned object becomes the task artifact payload.
The current V1 Docker executor now has two modes:
- compatibility mode when no image is configured on the node runtime config
- real container mode when `executorConfig.image` is set
In real container mode the worker:
- creates a temp working directory
- writes `input.json` containing the frozen task snapshot and execution context
- mounts that directory into the container at the configured `executorConfig.workdir` (default `/workspace`)
- sets `EMBOFLOW_INPUT_PATH` and `EMBOFLOW_OUTPUT_PATH`
- captures container stdout and stderr from the Docker CLI process
- parses `output.json`, when present, back into the task artifact payload (a top-level `result` key is unwrapped if the file provides one)
The default Docker runtime policy is `--network none`. This keeps V1 safer for local processing nodes unless a later phase deliberately opens network access for containerized tasks.
## Data Flow Contract
Tasks should exchange managed references, not loose file paths.

View File

@ -24,6 +24,7 @@
- `2026-03-27`: The current follow-up observability pass adds persisted stdout/stderr fields on `run_tasks` plus aggregated run summaries, durations, and task counts on `workflow_runs`.
- `2026-03-27`: The current run-control pass adds run cancellation, run retry from immutable snapshots, and task retry for failed/cancelled nodes with downstream reset semantics.
- `2026-03-27`: The current runtime-config pass freezes per-node executor config into `workflow_runs` and `run_tasks`, exposes runtime editing controls in the React workflow editor, and executes trusted-local Python code hooks from the task snapshot.
- `2026-03-27`: The current Docker-runtime pass upgrades `executorType=docker` from a pure stub to a real local container execution path whenever `executorConfig.image` is provided, while retaining a compatibility fallback for older demo tasks without an image.
---