// Integration tests for the WorkerRuntime: task claiming, run lifecycle,
// executor stubs, code hooks, and Docker-backed execution (Docker tests
// skip themselves when no daemon is available).
import assert from "node:assert/strict";
import { spawnSync } from "node:child_process";
import { mkdtemp, mkdir, rm, writeFile } from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import test from "node:test";

import { MongoClient } from "mongodb";
import { MongoMemoryServer } from "mongodb-memory-server";

import type { ExecutionContext, TaskRecord } from "../src/contracts/execution-context.ts";
import { MongoWorkerStore } from "../src/runtime/mongo-worker-store.ts";
import { WorkerRuntime } from "../src/runtime/worker-runtime.ts";
function hasDockerRuntime() {
|
|
const result = spawnSync("docker", ["info"], {
|
|
stdio: "ignore",
|
|
});
|
|
return result.status === 0;
|
|
}
|
|
|
|
function ensureDockerImage(image: string) {
|
|
const result = spawnSync("docker", ["image", "inspect", image], {
|
|
stdio: "ignore",
|
|
});
|
|
if (result.status === 0) {
|
|
return;
|
|
}
|
|
|
|
const pull = spawnSync("docker", ["pull", image], {
|
|
stdio: "ignore",
|
|
});
|
|
if (pull.status !== 0) {
|
|
throw new Error(`failed to pull docker image: ${image}`);
|
|
}
|
|
}
|
|
|
|
async function createRuntimeFixture(
|
|
database: string,
|
|
options: {
|
|
executors?: ConstructorParameters<typeof WorkerRuntime>[0]["executors"];
|
|
} = {},
|
|
) {
|
|
const mongod = await MongoMemoryServer.create({
|
|
instance: {
|
|
ip: "127.0.0.1",
|
|
},
|
|
});
|
|
const client = new MongoClient(mongod.getUri());
|
|
await client.connect();
|
|
const db = client.db(database);
|
|
const store = new MongoWorkerStore(db);
|
|
const runtime = new WorkerRuntime({ store, executors: options.executors });
|
|
|
|
return {
|
|
db,
|
|
store,
|
|
runtime,
|
|
close: async () => {
|
|
await client.close();
|
|
await mongod.stop();
|
|
},
|
|
};
|
|
}
|
|
|
|
test("worker claims a queued task, creates an artifact, and queues the dependent task", async (t) => {
|
|
const fixture = await createRuntimeFixture("emboflow-worker-progress");
|
|
t.after(async () => {
|
|
await fixture.close();
|
|
});
|
|
|
|
await fixture.db.collection("workflow_definition_versions").insertOne({
|
|
_id: "workflow-1-v1",
|
|
workflowDefinitionId: "workflow-1",
|
|
workspaceId: "workspace-1",
|
|
projectId: "project-1",
|
|
versionNumber: 1,
|
|
visualGraph: {},
|
|
logicGraph: {
|
|
nodes: [
|
|
{ id: "source-asset", type: "source" },
|
|
{ id: "rename-folder", type: "transform" },
|
|
],
|
|
edges: [{ from: "source-asset", to: "rename-folder" }],
|
|
},
|
|
runtimeGraph: {
|
|
nodeBindings: {
|
|
"source-asset": "source-asset",
|
|
"rename-folder": "rename-folder",
|
|
},
|
|
},
|
|
pluginRefs: ["builtin:delivery-nodes"],
|
|
createdBy: "local-user",
|
|
createdAt: new Date().toISOString(),
|
|
updatedAt: new Date().toISOString(),
|
|
});
|
|
|
|
await fixture.db.collection("workflow_runs").insertOne({
|
|
_id: "run-1",
|
|
workflowDefinitionId: "workflow-1",
|
|
workflowVersionId: "workflow-1-v1",
|
|
status: "queued",
|
|
triggeredBy: "local-user",
|
|
createdAt: new Date().toISOString(),
|
|
updatedAt: new Date().toISOString(),
|
|
});
|
|
|
|
await fixture.db.collection("run_tasks").insertMany([
|
|
{
|
|
_id: "task-source",
|
|
workflowRunId: "run-1",
|
|
workflowVersionId: "workflow-1-v1",
|
|
nodeId: "source-asset",
|
|
nodeType: "source",
|
|
executorType: "python",
|
|
status: "queued",
|
|
attempt: 1,
|
|
assetIds: ["asset-1"],
|
|
upstreamNodeIds: [],
|
|
outputArtifactIds: [],
|
|
createdAt: new Date().toISOString(),
|
|
updatedAt: new Date().toISOString(),
|
|
},
|
|
{
|
|
_id: "task-rename",
|
|
workflowRunId: "run-1",
|
|
workflowVersionId: "workflow-1-v1",
|
|
nodeId: "rename-folder",
|
|
nodeType: "transform",
|
|
executorType: "python",
|
|
status: "pending",
|
|
attempt: 1,
|
|
assetIds: ["asset-1"],
|
|
upstreamNodeIds: ["source-asset"],
|
|
outputArtifactIds: [],
|
|
createdAt: new Date().toISOString(),
|
|
updatedAt: new Date().toISOString(),
|
|
},
|
|
]);
|
|
|
|
const task = await fixture.runtime.runNextTask();
|
|
const tasks = await fixture.store.listRunTasks("run-1");
|
|
const run = await fixture.store.getRun("run-1");
|
|
const artifacts = await fixture.db.collection("artifacts").find({ producerId: "task-source" }).toArray();
|
|
|
|
assert.equal(task?.id, "task-source");
|
|
assert.equal(tasks[0]?.status, "success");
|
|
assert.deepEqual(tasks[0]?.assetIds, ["asset-1"]);
|
|
assert.equal(tasks[1]?.status, "queued");
|
|
assert.deepEqual(tasks[1]?.assetIds, ["asset-1"]);
|
|
assert.equal(artifacts.length, 1);
|
|
assert.equal(run?.status, "queued");
|
|
});
|
|
|
|
test("worker marks the run successful after the final queued task completes", async (t) => {
|
|
const fixture = await createRuntimeFixture("emboflow-worker-success");
|
|
t.after(async () => {
|
|
await fixture.close();
|
|
});
|
|
|
|
await fixture.db.collection("workflow_definition_versions").insertOne({
|
|
_id: "workflow-2-v1",
|
|
workflowDefinitionId: "workflow-2",
|
|
workspaceId: "workspace-1",
|
|
projectId: "project-1",
|
|
versionNumber: 1,
|
|
visualGraph: {},
|
|
logicGraph: {
|
|
nodes: [{ id: "export-delivery-package", type: "export" }],
|
|
edges: [],
|
|
},
|
|
runtimeGraph: {
|
|
nodeBindings: {
|
|
"export-delivery-package": "export-delivery-package",
|
|
},
|
|
},
|
|
pluginRefs: ["builtin:delivery-nodes"],
|
|
createdBy: "local-user",
|
|
createdAt: new Date().toISOString(),
|
|
updatedAt: new Date().toISOString(),
|
|
});
|
|
|
|
await fixture.db.collection("workflow_runs").insertOne({
|
|
_id: "run-2",
|
|
workflowDefinitionId: "workflow-2",
|
|
workflowVersionId: "workflow-2-v1",
|
|
status: "queued",
|
|
triggeredBy: "local-user",
|
|
createdAt: new Date().toISOString(),
|
|
updatedAt: new Date().toISOString(),
|
|
});
|
|
|
|
await fixture.db.collection("run_tasks").insertOne({
|
|
_id: "task-export",
|
|
workflowRunId: "run-2",
|
|
workflowVersionId: "workflow-2-v1",
|
|
nodeId: "export-delivery-package",
|
|
nodeType: "export",
|
|
executorType: "python",
|
|
status: "queued",
|
|
attempt: 1,
|
|
assetIds: ["asset-final"],
|
|
upstreamNodeIds: [],
|
|
outputArtifactIds: [],
|
|
createdAt: new Date().toISOString(),
|
|
updatedAt: new Date().toISOString(),
|
|
});
|
|
|
|
await fixture.runtime.runNextTask();
|
|
|
|
const run = await fixture.store.getRun("run-2");
|
|
const task = await fixture.db.collection("run_tasks").findOne({ _id: "task-export" });
|
|
|
|
assert.equal(task?.status, "success");
|
|
assert.equal(run?.status, "success");
|
|
assert.ok(typeof task?.startedAt === "string");
|
|
assert.ok(typeof task?.finishedAt === "string");
|
|
assert.ok(typeof task?.durationMs === "number");
|
|
assert.deepEqual(task?.stdoutLines, ["python executor processed export-delivery-package"]);
|
|
assert.deepEqual(task?.stderrLines, []);
|
|
assert.equal(task?.summary?.outcome, "success");
|
|
assert.equal(task?.summary?.executorType, "python");
|
|
assert.equal(task?.summary?.stdoutLineCount, 1);
|
|
assert.equal(task?.summary?.stderrLineCount, 0);
|
|
assert.match(task?.logLines?.[0] ?? "", /claimed/i);
|
|
assert.match(task?.logLines?.[1] ?? "", /stdout:/i);
|
|
assert.match(task?.logLines?.at(-1) ?? "", /completed/i);
|
|
assert.deepEqual(task?.lastResultPreview, {
|
|
taskId: "task-export",
|
|
executor: "python",
|
|
});
|
|
assert.ok(typeof run?.startedAt === "string");
|
|
assert.ok(typeof run?.finishedAt === "string");
|
|
assert.ok(typeof run?.durationMs === "number");
|
|
assert.equal(run?.summary?.totalTaskCount, 1);
|
|
assert.equal(run?.summary?.artifactCount, 1);
|
|
assert.equal(run?.summary?.stdoutLineCount, 1);
|
|
assert.equal(run?.summary?.stderrLineCount, 0);
|
|
assert.equal(run?.summary?.taskCounts?.success, 1);
|
|
});
|
|
|
|
test("worker passes bound asset ids into the execution context and task artifacts", async (t) => {
|
|
let capturedTask: TaskRecord | null = null;
|
|
let capturedContext: ExecutionContext | null = null;
|
|
const fixture = await createRuntimeFixture("emboflow-worker-asset-context", {
|
|
executors: {
|
|
python: {
|
|
async execute(task: TaskRecord, context: ExecutionContext) {
|
|
capturedTask = task;
|
|
capturedContext = context;
|
|
return {
|
|
result: {
|
|
taskId: task.id,
|
|
assetIds: context.assetIds,
|
|
},
|
|
stdoutLines: ["custom executor started"],
|
|
stderrLines: [],
|
|
};
|
|
},
|
|
},
|
|
},
|
|
});
|
|
t.after(async () => {
|
|
await fixture.close();
|
|
});
|
|
|
|
await fixture.db.collection("workflow_runs").insertOne({
|
|
_id: "run-asset",
|
|
workflowDefinitionId: "workflow-asset",
|
|
workflowVersionId: "workflow-asset-v1",
|
|
status: "queued",
|
|
triggeredBy: "local-user",
|
|
assetIds: ["asset-42"],
|
|
createdAt: new Date().toISOString(),
|
|
updatedAt: new Date().toISOString(),
|
|
});
|
|
|
|
await fixture.db.collection("run_tasks").insertOne({
|
|
_id: "task-asset",
|
|
workflowRunId: "run-asset",
|
|
workflowVersionId: "workflow-asset-v1",
|
|
nodeId: "source-asset",
|
|
nodeType: "source",
|
|
executorType: "python",
|
|
status: "queued",
|
|
attempt: 1,
|
|
assetIds: ["asset-42"],
|
|
upstreamNodeIds: [],
|
|
outputArtifactIds: [],
|
|
createdAt: new Date().toISOString(),
|
|
updatedAt: new Date().toISOString(),
|
|
});
|
|
|
|
await fixture.runtime.runNextTask();
|
|
|
|
const storedTask = await fixture.store.getRunTask("task-asset");
|
|
const artifact = await fixture.db.collection("artifacts").findOne({ producerId: "task-asset" });
|
|
|
|
assert.deepEqual(capturedTask?.assetIds, ["asset-42"]);
|
|
assert.deepEqual(capturedContext?.assetIds, ["asset-42"]);
|
|
assert.deepEqual(storedTask?.assetIds, ["asset-42"]);
|
|
assert.deepEqual(
|
|
(artifact?.payload as { assetIds?: string[] } | undefined)?.assetIds,
|
|
["asset-42"],
|
|
);
|
|
assert.equal(storedTask?.summary?.outcome, "success");
|
|
assert.equal(storedTask?.summary?.assetCount, 1);
|
|
assert.deepEqual(storedTask?.summary?.artifactIds, [artifact?._id]);
|
|
assert.deepEqual(storedTask?.stdoutLines, ["custom executor started"]);
|
|
assert.deepEqual(storedTask?.stderrLines, []);
|
|
});
|
|
|
|
test("worker persists failure summaries and task log lines when execution throws", async (t) => {
|
|
const fixture = await createRuntimeFixture("emboflow-worker-failure-summary", {
|
|
executors: {
|
|
python: {
|
|
async execute() {
|
|
throw Object.assign(new Error("intentional executor failure"), {
|
|
stdoutLines: ["failure path entered"],
|
|
stderrLines: ["stack trace line"],
|
|
});
|
|
},
|
|
},
|
|
},
|
|
});
|
|
t.after(async () => {
|
|
await fixture.close();
|
|
});
|
|
|
|
await fixture.db.collection("workflow_runs").insertOne({
|
|
_id: "run-failure",
|
|
workflowDefinitionId: "workflow-failure",
|
|
workflowVersionId: "workflow-failure-v1",
|
|
status: "queued",
|
|
triggeredBy: "local-user",
|
|
assetIds: ["asset-9"],
|
|
createdAt: new Date().toISOString(),
|
|
updatedAt: new Date().toISOString(),
|
|
});
|
|
|
|
await fixture.db.collection("run_tasks").insertOne({
|
|
_id: "task-failure",
|
|
workflowRunId: "run-failure",
|
|
workflowVersionId: "workflow-failure-v1",
|
|
nodeId: "validate-structure",
|
|
nodeType: "inspect",
|
|
executorType: "python",
|
|
status: "queued",
|
|
attempt: 1,
|
|
assetIds: ["asset-9"],
|
|
upstreamNodeIds: [],
|
|
outputArtifactIds: [],
|
|
createdAt: new Date().toISOString(),
|
|
updatedAt: new Date().toISOString(),
|
|
});
|
|
|
|
await assert.rejects(() => fixture.runtime.runNextTask(), /intentional executor failure/);
|
|
|
|
const task = await fixture.db.collection("run_tasks").findOne({ _id: "task-failure" });
|
|
const run = await fixture.store.getRun("run-failure");
|
|
|
|
assert.equal(task?.status, "failed");
|
|
assert.equal(run?.status, "failed");
|
|
assert.equal(task?.errorMessage, "intentional executor failure");
|
|
assert.equal(task?.summary?.outcome, "failed");
|
|
assert.equal(task?.summary?.executorType, "python");
|
|
assert.equal(task?.summary?.stdoutLineCount, 1);
|
|
assert.equal(task?.summary?.stderrLineCount, 1);
|
|
assert.equal(task?.summary?.artifactIds?.length ?? 0, 0);
|
|
assert.ok(typeof task?.startedAt === "string");
|
|
assert.ok(typeof task?.finishedAt === "string");
|
|
assert.ok(typeof task?.durationMs === "number");
|
|
assert.deepEqual(task?.stdoutLines, ["failure path entered"]);
|
|
assert.deepEqual(task?.stderrLines, ["stack trace line"]);
|
|
assert.match(task?.logLines?.[0] ?? "", /claimed/i);
|
|
assert.match(task?.logLines?.[1] ?? "", /stdout:/i);
|
|
assert.match(task?.logLines?.[2] ?? "", /stderr:/i);
|
|
assert.match(task?.logLines?.at(-1) ?? "", /intentional executor failure/i);
|
|
assert.equal(run?.summary?.totalTaskCount, 1);
|
|
assert.equal(run?.summary?.taskCounts?.failed, 1);
|
|
assert.equal(run?.summary?.stdoutLineCount, 1);
|
|
assert.equal(run?.summary?.stderrLineCount, 1);
|
|
assert.deepEqual(run?.summary?.failedTaskIds, ["task-failure"]);
|
|
});
|
|
|
|
test("worker skips queued tasks that belong to a cancelled run", async (t) => {
|
|
const fixture = await createRuntimeFixture("emboflow-worker-cancelled-run-skip");
|
|
t.after(async () => {
|
|
await fixture.close();
|
|
});
|
|
|
|
await fixture.db.collection("workflow_runs").insertMany([
|
|
{
|
|
_id: "run-cancelled",
|
|
workflowDefinitionId: "workflow-cancelled",
|
|
workflowVersionId: "workflow-cancelled-v1",
|
|
status: "cancelled",
|
|
triggeredBy: "local-user",
|
|
assetIds: ["asset-cancelled"],
|
|
createdAt: new Date().toISOString(),
|
|
updatedAt: new Date().toISOString(),
|
|
},
|
|
{
|
|
_id: "run-active",
|
|
workflowDefinitionId: "workflow-active",
|
|
workflowVersionId: "workflow-active-v1",
|
|
status: "queued",
|
|
triggeredBy: "local-user",
|
|
assetIds: ["asset-active"],
|
|
createdAt: new Date().toISOString(),
|
|
updatedAt: new Date().toISOString(),
|
|
},
|
|
]);
|
|
|
|
await fixture.db.collection("run_tasks").insertMany([
|
|
{
|
|
_id: "task-cancelled",
|
|
workflowRunId: "run-cancelled",
|
|
workflowVersionId: "workflow-cancelled-v1",
|
|
nodeId: "source-asset",
|
|
nodeType: "source",
|
|
executorType: "python",
|
|
status: "queued",
|
|
attempt: 1,
|
|
assetIds: ["asset-cancelled"],
|
|
upstreamNodeIds: [],
|
|
outputArtifactIds: [],
|
|
createdAt: new Date().toISOString(),
|
|
updatedAt: new Date().toISOString(),
|
|
},
|
|
{
|
|
_id: "task-active",
|
|
workflowRunId: "run-active",
|
|
workflowVersionId: "workflow-active-v1",
|
|
nodeId: "source-asset",
|
|
nodeType: "source",
|
|
executorType: "python",
|
|
status: "queued",
|
|
attempt: 1,
|
|
assetIds: ["asset-active"],
|
|
upstreamNodeIds: [],
|
|
outputArtifactIds: [],
|
|
createdAt: new Date().toISOString(),
|
|
updatedAt: new Date().toISOString(),
|
|
},
|
|
]);
|
|
|
|
const claimedTask = await fixture.runtime.runNextTask();
|
|
const cancelledTask = await fixture.store.getRunTask("task-cancelled");
|
|
const activeTask = await fixture.store.getRunTask("task-active");
|
|
|
|
assert.equal(claimedTask?.id, "task-active");
|
|
assert.equal(cancelledTask?.status, "cancelled");
|
|
assert.match(cancelledTask?.logLines?.at(-1) ?? "", /cancelled/i);
|
|
assert.equal(activeTask?.status, "success");
|
|
});
|
|
|
|
test("worker executes a python code hook snapshot from the queued task", async (t) => {
|
|
const fixture = await createRuntimeFixture("emboflow-worker-python-hook");
|
|
t.after(async () => {
|
|
await fixture.close();
|
|
});
|
|
|
|
await fixture.db.collection("workflow_runs").insertOne({
|
|
_id: "run-python-hook",
|
|
workflowDefinitionId: "workflow-python-hook",
|
|
workflowVersionId: "workflow-python-hook-v1",
|
|
status: "queued",
|
|
triggeredBy: "local-user",
|
|
assetIds: ["asset-hook"],
|
|
createdAt: new Date().toISOString(),
|
|
updatedAt: new Date().toISOString(),
|
|
});
|
|
|
|
await fixture.db.collection("run_tasks").insertOne({
|
|
_id: "task-python-hook",
|
|
workflowRunId: "run-python-hook",
|
|
workflowVersionId: "workflow-python-hook-v1",
|
|
nodeId: "validate-structure",
|
|
nodeType: "inspect",
|
|
executorType: "python",
|
|
status: "queued",
|
|
attempt: 1,
|
|
assetIds: ["asset-hook"],
|
|
upstreamNodeIds: [],
|
|
outputArtifactIds: [],
|
|
codeHookSpec: {
|
|
language: "python",
|
|
entrypoint: "process",
|
|
source: [
|
|
"def process(task, context):",
|
|
" print(f\"hook running for {task['nodeId']}\")",
|
|
" return {'nodeId': task['nodeId'], 'assetIds': context['assetIds'], 'hooked': True}",
|
|
].join("\n"),
|
|
},
|
|
createdAt: new Date().toISOString(),
|
|
updatedAt: new Date().toISOString(),
|
|
});
|
|
|
|
await fixture.runtime.runNextTask();
|
|
|
|
const task = await fixture.store.getRunTask("task-python-hook");
|
|
const artifact = await fixture.db.collection("artifacts").findOne({ producerId: "task-python-hook" });
|
|
|
|
assert.equal(task?.status, "success");
|
|
assert.deepEqual(task?.stdoutLines, ["hook running for validate-structure"]);
|
|
assert.deepEqual(task?.stderrLines, []);
|
|
assert.equal(task?.summary?.executorType, "python");
|
|
assert.deepEqual((artifact?.payload as { result?: { hooked?: boolean } } | undefined)?.result, {
|
|
nodeId: "validate-structure",
|
|
assetIds: ["asset-hook"],
|
|
hooked: true,
|
|
});
|
|
});
|
|
|
|
test("worker executes a queued docker task inside a real container", {
|
|
skip: !hasDockerRuntime(),
|
|
}, async (t) => {
|
|
ensureDockerImage("alpine:3.20");
|
|
const fixture = await createRuntimeFixture("emboflow-worker-real-docker");
|
|
t.after(async () => {
|
|
await fixture.close();
|
|
});
|
|
|
|
await fixture.db.collection("workflow_runs").insertOne({
|
|
_id: "run-real-docker",
|
|
workflowDefinitionId: "workflow-real-docker",
|
|
workflowVersionId: "workflow-real-docker-v1",
|
|
status: "queued",
|
|
triggeredBy: "local-user",
|
|
assetIds: ["asset-docker"],
|
|
createdAt: new Date().toISOString(),
|
|
updatedAt: new Date().toISOString(),
|
|
});
|
|
|
|
await fixture.db.collection("run_tasks").insertOne({
|
|
_id: "task-real-docker",
|
|
workflowRunId: "run-real-docker",
|
|
workflowVersionId: "workflow-real-docker-v1",
|
|
nodeId: "source-asset",
|
|
nodeDefinitionId: "source-asset",
|
|
nodeType: "source",
|
|
executorType: "docker",
|
|
executorConfig: {
|
|
image: "alpine:3.20",
|
|
command: [
|
|
"sh",
|
|
"-lc",
|
|
[
|
|
"echo docker-container-started",
|
|
"cat > \"$EMBOFLOW_OUTPUT_PATH\" <<'JSON'",
|
|
"{\"mode\":\"docker\",\"nodeId\":\"source-asset\",\"hooked\":true}",
|
|
"JSON",
|
|
].join("\n"),
|
|
],
|
|
},
|
|
status: "queued",
|
|
attempt: 1,
|
|
assetIds: ["asset-docker"],
|
|
upstreamNodeIds: [],
|
|
outputArtifactIds: [],
|
|
createdAt: new Date().toISOString(),
|
|
updatedAt: new Date().toISOString(),
|
|
});
|
|
|
|
await fixture.runtime.runNextTask();
|
|
|
|
const task = await fixture.store.getRunTask("task-real-docker");
|
|
const artifact = await fixture.db.collection("artifacts").findOne({ producerId: "task-real-docker" });
|
|
|
|
assert.equal(task?.status, "success");
|
|
assert.deepEqual(task?.stdoutLines, ["docker-container-started"]);
|
|
assert.deepEqual(task?.stderrLines, []);
|
|
assert.equal(task?.summary?.executorType, "docker");
|
|
assert.deepEqual(task?.lastResultPreview, {
|
|
mode: "docker",
|
|
nodeId: "source-asset",
|
|
hooked: true,
|
|
});
|
|
assert.deepEqual((artifact?.payload as { result?: Record<string, unknown> } | undefined)?.result, {
|
|
mode: "docker",
|
|
nodeId: "source-asset",
|
|
hooked: true,
|
|
});
|
|
});
|
|
|
|
test("worker executes built-in docker source nodes when codeHookSpec is null", {
|
|
skip: !hasDockerRuntime(),
|
|
}, async (t) => {
|
|
ensureDockerImage("python:3.11-alpine");
|
|
const fixture = await createRuntimeFixture("emboflow-worker-built-in-docker-source");
|
|
t.after(async () => {
|
|
await fixture.close();
|
|
});
|
|
|
|
await fixture.db.collection("assets").insertOne({
|
|
_id: "asset-built-in-docker-source",
|
|
workspaceId: "workspace-1",
|
|
projectId: "project-1",
|
|
type: "folder",
|
|
sourceType: "registered_path",
|
|
displayName: "Built-in Docker Source Asset",
|
|
sourcePath: "/tmp/built-in-docker-source",
|
|
status: "probed",
|
|
storageRef: {},
|
|
topLevelPaths: ["meta.json"],
|
|
detectedFormats: ["delivery_package"],
|
|
fileCount: 1,
|
|
summary: {},
|
|
createdBy: "local-user",
|
|
createdAt: new Date().toISOString(),
|
|
updatedAt: new Date().toISOString(),
|
|
});
|
|
|
|
await fixture.db.collection("workflow_runs").insertOne({
|
|
_id: "run-built-in-docker-source",
|
|
workflowDefinitionId: "workflow-built-in-docker-source",
|
|
workflowVersionId: "workflow-built-in-docker-source-v1",
|
|
status: "queued",
|
|
triggeredBy: "local-user",
|
|
assetIds: ["asset-built-in-docker-source"],
|
|
createdAt: new Date().toISOString(),
|
|
updatedAt: new Date().toISOString(),
|
|
});
|
|
|
|
await fixture.db.collection("run_tasks").insertOne({
|
|
_id: "task-built-in-docker-source",
|
|
workflowRunId: "run-built-in-docker-source",
|
|
workflowVersionId: "workflow-built-in-docker-source-v1",
|
|
nodeId: "source-asset",
|
|
nodeDefinitionId: "source-asset",
|
|
nodeType: "source",
|
|
executorType: "docker",
|
|
executorConfig: {
|
|
image: "python:3.11-alpine",
|
|
networkMode: "none",
|
|
},
|
|
codeHookSpec: null,
|
|
status: "queued",
|
|
attempt: 1,
|
|
assetIds: ["asset-built-in-docker-source"],
|
|
upstreamNodeIds: [],
|
|
outputArtifactIds: [],
|
|
createdAt: new Date().toISOString(),
|
|
updatedAt: new Date().toISOString(),
|
|
});
|
|
|
|
await fixture.runtime.runNextTask();
|
|
|
|
const task = await fixture.store.getRunTask("task-built-in-docker-source");
|
|
const artifact = await fixture.db.collection("artifacts").findOne({ producerId: "task-built-in-docker-source" });
|
|
|
|
assert.equal(task?.status, "success");
|
|
assert.deepEqual(task?.stderrLines, []);
|
|
assert.deepEqual(task?.stdoutLines, ["loaded 1 bound asset"]);
|
|
assert.deepEqual((artifact?.payload as { result?: { assetCount?: number } } | undefined)?.result?.assetCount, 1);
|
|
});
|
|
|
|
test("worker loads bound asset metadata into the execution context for built-in source nodes", async (t) => {
|
|
let capturedContext: ExecutionContext | null = null;
|
|
const fixture = await createRuntimeFixture("emboflow-worker-source-context", {
|
|
executors: {
|
|
python: {
|
|
async execute(_task: TaskRecord, context: ExecutionContext) {
|
|
capturedContext = context;
|
|
return {
|
|
result: { assetCount: context.assets?.length ?? 0 },
|
|
stdoutLines: ["custom source executor"],
|
|
stderrLines: [],
|
|
};
|
|
},
|
|
},
|
|
},
|
|
});
|
|
t.after(async () => {
|
|
await fixture.close();
|
|
});
|
|
|
|
await fixture.db.collection("assets").insertOne({
|
|
_id: "asset-context-1",
|
|
workspaceId: "workspace-1",
|
|
projectId: "project-1",
|
|
type: "folder",
|
|
sourceType: "registered_path",
|
|
displayName: "Sample Asset",
|
|
sourcePath: "/tmp/sample-asset",
|
|
status: "probed",
|
|
storageRef: {},
|
|
topLevelPaths: ["meta.json"],
|
|
detectedFormats: ["delivery_package"],
|
|
fileCount: 4,
|
|
summary: { kind: "delivery_package" },
|
|
createdBy: "local-user",
|
|
createdAt: new Date().toISOString(),
|
|
updatedAt: new Date().toISOString(),
|
|
});
|
|
|
|
await fixture.db.collection("workflow_runs").insertOne({
|
|
_id: "run-source-context",
|
|
workflowDefinitionId: "workflow-source-context",
|
|
workflowVersionId: "workflow-source-context-v1",
|
|
status: "queued",
|
|
triggeredBy: "local-user",
|
|
assetIds: ["asset-context-1"],
|
|
createdAt: new Date().toISOString(),
|
|
updatedAt: new Date().toISOString(),
|
|
});
|
|
|
|
await fixture.db.collection("run_tasks").insertOne({
|
|
_id: "task-source-context",
|
|
workflowRunId: "run-source-context",
|
|
workflowVersionId: "workflow-source-context-v1",
|
|
nodeId: "source-asset",
|
|
nodeDefinitionId: "source-asset",
|
|
nodeType: "source",
|
|
executorType: "python",
|
|
status: "queued",
|
|
attempt: 1,
|
|
assetIds: ["asset-context-1"],
|
|
upstreamNodeIds: [],
|
|
outputArtifactIds: [],
|
|
createdAt: new Date().toISOString(),
|
|
updatedAt: new Date().toISOString(),
|
|
});
|
|
|
|
await fixture.runtime.runNextTask();
|
|
|
|
assert.equal(capturedContext?.assets?.[0]?.id, "asset-context-1");
|
|
assert.equal(capturedContext?.assets?.[0]?.displayName, "Sample Asset");
|
|
assert.equal(capturedContext?.assets?.[0]?.sourcePath, "/tmp/sample-asset");
|
|
});
|
|
|
|
test("worker validates delivery structure against the bound asset path for validate-structure", async (t) => {
|
|
const sourceDir = await mkdtemp(path.join(os.tmpdir(), "emboflow-worker-validate-"));
|
|
await mkdir(path.join(sourceDir, "DJI_001"));
|
|
await writeFile(path.join(sourceDir, "meta.json"), "{}");
|
|
await writeFile(path.join(sourceDir, "intrinsics.json"), "{}");
|
|
await writeFile(path.join(sourceDir, "video_meta.json"), "{}");
|
|
await writeFile(path.join(sourceDir, "DJI_001", "DJI_001.mp4"), "");
|
|
|
|
const fixture = await createRuntimeFixture("emboflow-worker-validate-structure");
|
|
t.after(async () => {
|
|
await fixture.close();
|
|
});
|
|
|
|
await fixture.db.collection("assets").insertOne({
|
|
_id: "asset-validate-1",
|
|
workspaceId: "workspace-1",
|
|
projectId: "project-1",
|
|
type: "folder",
|
|
sourceType: "registered_path",
|
|
displayName: "Validation Asset",
|
|
sourcePath: sourceDir,
|
|
status: "probed",
|
|
storageRef: {},
|
|
topLevelPaths: ["DJI_001", "meta.json", "intrinsics.json", "video_meta.json"],
|
|
detectedFormats: ["delivery_package"],
|
|
fileCount: 4,
|
|
summary: { kind: "delivery_package" },
|
|
createdBy: "local-user",
|
|
createdAt: new Date().toISOString(),
|
|
updatedAt: new Date().toISOString(),
|
|
});
|
|
|
|
await fixture.db.collection("workflow_runs").insertOne({
|
|
_id: "run-validate-structure",
|
|
workflowDefinitionId: "workflow-validate-structure",
|
|
workflowVersionId: "workflow-validate-structure-v1",
|
|
status: "queued",
|
|
triggeredBy: "local-user",
|
|
assetIds: ["asset-validate-1"],
|
|
createdAt: new Date().toISOString(),
|
|
updatedAt: new Date().toISOString(),
|
|
});
|
|
|
|
await fixture.db.collection("run_tasks").insertOne({
|
|
_id: "task-validate-structure",
|
|
workflowRunId: "run-validate-structure",
|
|
workflowVersionId: "workflow-validate-structure-v1",
|
|
nodeId: "validate-structure",
|
|
nodeDefinitionId: "validate-structure",
|
|
nodeType: "inspect",
|
|
executorType: "python",
|
|
status: "queued",
|
|
attempt: 1,
|
|
assetIds: ["asset-validate-1"],
|
|
upstreamNodeIds: [],
|
|
outputArtifactIds: [],
|
|
createdAt: new Date().toISOString(),
|
|
updatedAt: new Date().toISOString(),
|
|
});
|
|
|
|
await fixture.runtime.runNextTask();
|
|
|
|
const task = await fixture.store.getRunTask("task-validate-structure");
|
|
const artifact = await fixture.db.collection("artifacts").findOne({ producerId: "task-validate-structure" });
|
|
|
|
assert.equal(task?.status, "success");
|
|
assert.match(task?.stdoutLines?.[0] ?? "", /validated 1 asset/i);
|
|
assert.deepEqual(task?.lastResultPreview, {
|
|
assetCount: 1,
|
|
valid: true,
|
|
requiredFiles: ["meta.json", "intrinsics.json", "video_meta.json"],
|
|
videoFileCount: 1,
|
|
});
|
|
assert.deepEqual((artifact?.payload as { result?: Record<string, unknown> } | undefined)?.result, {
|
|
assetCount: 1,
|
|
valid: true,
|
|
requiredFiles: ["meta.json", "intrinsics.json", "video_meta.json"],
|
|
videoFileCount: 1,
|
|
});
|
|
});
|
|
|
|
test("worker applies intersect-assets and narrows the downstream effective asset set", async (t) => {
|
|
const sourceDirA = await mkdtemp(path.join(os.tmpdir(), "emboflow-worker-intersect-a-"));
|
|
const sourceDirB = await mkdtemp(path.join(os.tmpdir(), "emboflow-worker-intersect-b-"));
|
|
await mkdir(path.join(sourceDirA, "DJI_A"));
|
|
await mkdir(path.join(sourceDirB, "DJI_B"));
|
|
for (const root of [sourceDirA, sourceDirB]) {
|
|
await writeFile(path.join(root, "meta.json"), "{}");
|
|
await writeFile(path.join(root, "intrinsics.json"), "{}");
|
|
await writeFile(path.join(root, "video_meta.json"), "{}");
|
|
}
|
|
await writeFile(path.join(sourceDirA, "DJI_A", "A.mp4"), "");
|
|
await writeFile(path.join(sourceDirB, "DJI_B", "B.mp4"), "");
|
|
|
|
const fixture = await createRuntimeFixture("emboflow-worker-intersect-assets");
|
|
t.after(async () => {
|
|
await fixture.close();
|
|
});
|
|
|
|
await fixture.db.collection("assets").insertMany([
|
|
{
|
|
_id: "asset-intersect-a",
|
|
workspaceId: "workspace-1",
|
|
projectId: "project-1",
|
|
type: "folder",
|
|
sourceType: "registered_path",
|
|
displayName: "Intersect Asset A",
|
|
sourcePath: sourceDirA,
|
|
status: "probed",
|
|
storageRef: {},
|
|
topLevelPaths: ["DJI_A", "meta.json", "intrinsics.json", "video_meta.json"],
|
|
detectedFormats: ["delivery_package"],
|
|
fileCount: 4,
|
|
summary: { kind: "delivery_package" },
|
|
createdBy: "local-user",
|
|
createdAt: new Date().toISOString(),
|
|
updatedAt: new Date().toISOString(),
|
|
},
|
|
{
|
|
_id: "asset-intersect-b",
|
|
workspaceId: "workspace-1",
|
|
projectId: "project-1",
|
|
type: "folder",
|
|
sourceType: "registered_path",
|
|
displayName: "Intersect Asset B",
|
|
sourcePath: sourceDirB,
|
|
status: "probed",
|
|
storageRef: {},
|
|
topLevelPaths: ["DJI_B", "meta.json", "intrinsics.json", "video_meta.json"],
|
|
detectedFormats: ["delivery_package"],
|
|
fileCount: 4,
|
|
summary: { kind: "delivery_package" },
|
|
createdBy: "local-user",
|
|
createdAt: new Date().toISOString(),
|
|
updatedAt: new Date().toISOString(),
|
|
},
|
|
]);
|
|
|
|
await fixture.db.collection("workflow_runs").insertOne({
|
|
_id: "run-intersect-assets",
|
|
workflowDefinitionId: "workflow-intersect-assets",
|
|
workflowVersionId: "workflow-intersect-assets-v1",
|
|
status: "queued",
|
|
triggeredBy: "local-user",
|
|
assetIds: ["asset-intersect-a", "asset-intersect-b"],
|
|
createdAt: new Date().toISOString(),
|
|
updatedAt: new Date().toISOString(),
|
|
});
|
|
|
|
await fixture.db.collection("run_tasks").insertMany([
|
|
{
|
|
_id: "task-upstream-a",
|
|
workflowRunId: "run-intersect-assets",
|
|
workflowVersionId: "workflow-intersect-assets-v1",
|
|
nodeId: "source-assets-a",
|
|
nodeDefinitionId: "source-asset",
|
|
nodeType: "source",
|
|
executorType: "python",
|
|
status: "success",
|
|
attempt: 1,
|
|
assetIds: ["asset-intersect-a", "asset-intersect-b"],
|
|
upstreamNodeIds: [],
|
|
outputArtifactIds: [],
|
|
lastResultPreview: { assetIds: ["asset-intersect-a", "asset-intersect-b"] },
|
|
createdAt: new Date().toISOString(),
|
|
updatedAt: new Date().toISOString(),
|
|
},
|
|
{
|
|
_id: "task-upstream-b",
|
|
workflowRunId: "run-intersect-assets",
|
|
workflowVersionId: "workflow-intersect-assets-v1",
|
|
nodeId: "source-assets-b",
|
|
nodeDefinitionId: "source-asset",
|
|
nodeType: "source",
|
|
executorType: "python",
|
|
status: "success",
|
|
attempt: 1,
|
|
assetIds: ["asset-intersect-b"],
|
|
upstreamNodeIds: [],
|
|
outputArtifactIds: [],
|
|
lastResultPreview: { assetIds: ["asset-intersect-b"] },
|
|
createdAt: new Date().toISOString(),
|
|
updatedAt: new Date().toISOString(),
|
|
},
|
|
{
|
|
_id: "task-intersect-assets",
|
|
workflowRunId: "run-intersect-assets",
|
|
workflowVersionId: "workflow-intersect-assets-v1",
|
|
nodeId: "intersect-assets-1",
|
|
nodeDefinitionId: "intersect-assets",
|
|
nodeType: "utility",
|
|
executorType: "python",
|
|
status: "queued",
|
|
attempt: 1,
|
|
assetIds: ["asset-intersect-a", "asset-intersect-b"],
|
|
upstreamNodeIds: ["source-assets-a", "source-assets-b"],
|
|
outputArtifactIds: [],
|
|
createdAt: new Date().toISOString(),
|
|
updatedAt: new Date().toISOString(),
|
|
},
|
|
{
|
|
_id: "task-downstream-validate",
|
|
workflowRunId: "run-intersect-assets",
|
|
workflowVersionId: "workflow-intersect-assets-v1",
|
|
nodeId: "validate-structure",
|
|
nodeDefinitionId: "validate-structure",
|
|
nodeType: "inspect",
|
|
executorType: "python",
|
|
status: "pending",
|
|
attempt: 1,
|
|
assetIds: ["asset-intersect-a", "asset-intersect-b"],
|
|
upstreamNodeIds: ["intersect-assets-1"],
|
|
outputArtifactIds: [],
|
|
createdAt: new Date().toISOString(),
|
|
updatedAt: new Date().toISOString(),
|
|
},
|
|
]);
|
|
|
|
await fixture.runtime.runNextTask();
|
|
const intersectTask = await fixture.store.getRunTask("task-intersect-assets");
|
|
const queuedValidate = await fixture.store.getRunTask("task-downstream-validate");
|
|
|
|
assert.equal(intersectTask?.status, "success");
|
|
assert.deepEqual(intersectTask?.lastResultPreview?.assetIds, ["asset-intersect-b"]);
|
|
assert.match(intersectTask?.stdoutLines?.[0] ?? "", /intersection resolved 1 asset/i);
|
|
assert.equal(queuedValidate?.status, "queued");
|
|
|
|
await fixture.runtime.runNextTask();
|
|
const validateTask = await fixture.store.getRunTask("task-downstream-validate");
|
|
|
|
assert.equal(validateTask?.status, "success");
|
|
assert.equal(validateTask?.summary?.assetCount, 1);
|
|
assert.deepEqual(validateTask?.lastResultPreview, {
|
|
assetCount: 1,
|
|
valid: true,
|
|
requiredFiles: ["meta.json", "intrinsics.json", "video_meta.json"],
|
|
videoFileCount: 1,
|
|
});
|
|
});
|
|
|
|
test("worker executes built-in union-assets inside docker when docker is available", async (t) => {
|
|
if (!hasDockerRuntime()) {
|
|
t.diagnostic("docker runtime unavailable; skipping built-in docker union-assets test");
|
|
return;
|
|
}
|
|
ensureDockerImage("python:3.11-alpine");
|
|
|
|
const fixture = await createRuntimeFixture("emboflow-worker-docker-union-assets");
|
|
t.after(async () => {
|
|
await fixture.close();
|
|
});
|
|
|
|
await fixture.db.collection("workflow_runs").insertOne({
|
|
_id: "run-docker-union-assets",
|
|
workflowDefinitionId: "workflow-docker-union-assets",
|
|
workflowVersionId: "workflow-docker-union-assets-v1",
|
|
status: "queued",
|
|
triggeredBy: "local-user",
|
|
assetIds: ["asset-union-a", "asset-union-b"],
|
|
createdAt: new Date().toISOString(),
|
|
updatedAt: new Date().toISOString(),
|
|
});
|
|
|
|
await fixture.db.collection("run_tasks").insertMany([
|
|
{
|
|
_id: "task-union-upstream-a",
|
|
workflowRunId: "run-docker-union-assets",
|
|
workflowVersionId: "workflow-docker-union-assets-v1",
|
|
nodeId: "source-assets-a",
|
|
nodeDefinitionId: "source-asset",
|
|
nodeType: "source",
|
|
executorType: "python",
|
|
status: "success",
|
|
attempt: 1,
|
|
assetIds: ["asset-union-a"],
|
|
upstreamNodeIds: [],
|
|
outputArtifactIds: [],
|
|
lastResultPreview: { assetIds: ["asset-union-a"] },
|
|
createdAt: new Date().toISOString(),
|
|
updatedAt: new Date().toISOString(),
|
|
},
|
|
{
|
|
_id: "task-union-upstream-b",
|
|
workflowRunId: "run-docker-union-assets",
|
|
workflowVersionId: "workflow-docker-union-assets-v1",
|
|
nodeId: "source-assets-b",
|
|
nodeDefinitionId: "source-asset",
|
|
nodeType: "source",
|
|
executorType: "python",
|
|
status: "success",
|
|
attempt: 1,
|
|
assetIds: ["asset-union-b"],
|
|
upstreamNodeIds: [],
|
|
outputArtifactIds: [],
|
|
lastResultPreview: { assetIds: ["asset-union-b"] },
|
|
createdAt: new Date().toISOString(),
|
|
updatedAt: new Date().toISOString(),
|
|
},
|
|
{
|
|
_id: "task-union-docker",
|
|
workflowRunId: "run-docker-union-assets",
|
|
workflowVersionId: "workflow-docker-union-assets-v1",
|
|
nodeId: "union-assets-1",
|
|
nodeDefinitionId: "union-assets",
|
|
nodeType: "utility",
|
|
executorType: "docker",
|
|
executorConfig: {
|
|
image: "python:3.11-alpine",
|
|
},
|
|
status: "queued",
|
|
attempt: 1,
|
|
assetIds: ["asset-union-a", "asset-union-b"],
|
|
upstreamNodeIds: ["source-assets-a", "source-assets-b"],
|
|
outputArtifactIds: [],
|
|
createdAt: new Date().toISOString(),
|
|
updatedAt: new Date().toISOString(),
|
|
},
|
|
]);
|
|
|
|
await fixture.runtime.runNextTask();
|
|
const task = await fixture.store.getRunTask("task-union-docker");
|
|
|
|
assert.equal(task?.status, "success");
|
|
assert.equal(task?.summary?.executorType, "docker");
|
|
assert.match(task?.stdoutLines?.[0] ?? "", /union resolved 2 assets/i);
|
|
assert.deepEqual(task?.lastResultPreview?.assetIds, ["asset-union-a", "asset-union-b"]);
|
|
});
|
|
|
|
test("worker builds and executes a custom dockerfile node when docker is available", async (t) => {
|
|
if (!hasDockerRuntime()) {
|
|
t.diagnostic("docker runtime unavailable; skipping custom dockerfile node test");
|
|
return;
|
|
}
|
|
ensureDockerImage("python:3.11-alpine");
|
|
|
|
const fixture = await createRuntimeFixture("emboflow-worker-custom-dockerfile-node");
|
|
t.after(async () => {
|
|
await fixture.close();
|
|
});
|
|
|
|
await fixture.db.collection("workflow_runs").insertOne({
|
|
_id: "run-custom-dockerfile-node",
|
|
workflowDefinitionId: "workflow-custom-dockerfile-node",
|
|
workflowVersionId: "workflow-custom-dockerfile-node-v1",
|
|
status: "queued",
|
|
triggeredBy: "local-user",
|
|
assetIds: ["asset-custom-1", "asset-custom-2"],
|
|
createdAt: new Date().toISOString(),
|
|
updatedAt: new Date().toISOString(),
|
|
});
|
|
|
|
await fixture.db.collection("run_tasks").insertOne({
|
|
_id: "task-custom-dockerfile-node",
|
|
workflowRunId: "run-custom-dockerfile-node",
|
|
workflowVersionId: "workflow-custom-dockerfile-node-v1",
|
|
nodeId: "custom-node-merge-assets-1",
|
|
nodeDefinitionId: "custom-merge-assets",
|
|
nodeType: "utility",
|
|
executorType: "docker",
|
|
executorConfig: {
|
|
imageTag: "emboflow-test/custom-merge-assets:latest",
|
|
dockerfileContent: [
|
|
"FROM python:3.11-alpine",
|
|
"CMD [\"python3\", \"-c\", \"import json,os,pathlib; payload=json.loads(pathlib.Path(os.environ['EMBOFLOW_INPUT_PATH']).read_text()); asset_ids=payload['context'].get('assetIds', []); pathlib.Path(os.environ['EMBOFLOW_OUTPUT_PATH']).write_text(json.dumps({'result': {'assetIds': asset_ids, 'assetCount': len(asset_ids), 'kind': 'custom-dockerfile'}})); print(f'custom dockerfile handled {len(asset_ids)} assets')\"]",
|
|
].join("\n"),
|
|
contract: {
|
|
inputMode: "single_asset_set",
|
|
outputMode: "asset_set",
|
|
artifactType: "json",
|
|
},
|
|
},
|
|
status: "queued",
|
|
attempt: 1,
|
|
assetIds: ["asset-custom-1", "asset-custom-2"],
|
|
upstreamNodeIds: [],
|
|
outputArtifactIds: [],
|
|
createdAt: new Date().toISOString(),
|
|
updatedAt: new Date().toISOString(),
|
|
});
|
|
|
|
await fixture.runtime.runNextTask();
|
|
const task = await fixture.store.getRunTask("task-custom-dockerfile-node");
|
|
|
|
assert.equal(task?.status, "success");
|
|
assert.equal(task?.summary?.executorType, "docker");
|
|
assert.match(task?.stdoutLines?.[0] ?? "", /custom dockerfile handled 2 assets/i);
|
|
assert.deepEqual(task?.lastResultPreview, {
|
|
assetIds: ["asset-custom-1", "asset-custom-2"],
|
|
assetCount: 2,
|
|
kind: "custom-dockerfile",
|
|
});
|
|
});
|