/*
 * EmboFlow/apps/worker/test/mongo-worker-runtime.spec.ts
 *
 * 488 lines
 * 16 KiB
 * TypeScript
 */

import test from "node:test";
import assert from "node:assert/strict";
import { MongoMemoryServer } from "mongodb-memory-server";
import { MongoClient } from "mongodb";
import { MongoWorkerStore } from "../src/runtime/mongo-worker-store.ts";
import { WorkerRuntime } from "../src/runtime/worker-runtime.ts";
import type { ExecutionContext, TaskRecord } from "../src/contracts/execution-context.ts";
/**
 * Starts an in-memory MongoDB server, connects a client to it, and builds a
 * `MongoWorkerStore` plus `WorkerRuntime` on top of the requested database.
 *
 * @param database - Name of the database handed to the store.
 * @param options - Optional overrides; `executors` is forwarded unchanged to the `WorkerRuntime` constructor.
 * @returns The database handle, the store, the runtime, and a `close` callback that
 *   closes the client and stops the in-memory server.
 * @throws Re-throws any error raised while connecting or constructing the runtime,
 *   after tearing down whatever was already started.
 */
async function createRuntimeFixture(
  database: string,
  options: {
    executors?: ConstructorParameters<typeof WorkerRuntime>[0]["executors"];
  } = {},
) {
  const mongod = await MongoMemoryServer.create({
    instance: {
      ip: "127.0.0.1",
    },
  });
  const client = new MongoClient(mongod.getUri());

  const close = async () => {
    try {
      await client.close();
    } finally {
      // Stop the server even when closing the client rejects, so no server process is left behind.
      await mongod.stop();
    }
  };

  try {
    await client.connect();
    const db = client.db(database);
    const store = new MongoWorkerStore(db);
    const runtime = new WorkerRuntime({ store, executors: options.executors });
    return {
      db,
      store,
      runtime,
      close,
    };
  } catch (error) {
    // The caller never receives `close` on this path, so release resources here.
    // Teardown errors are deliberately dropped so the original setup error is the one reported.
    await close().catch(() => undefined);
    throw error;
  }
}
// Scenario: a two-node workflow (source-asset -> rename-folder). After one `runNextTask()` call
// the source task should be "success" with one artifact, the dependent task should have moved
// from "pending" to "queued", and the run itself should still be "queued".
test("worker claims a queued task, creates an artifact, and queues the dependent task", async (t) => {
  const fixture = await createRuntimeFixture("emboflow-worker-progress");
  t.after(async () => {
    await fixture.close();
  });
  await fixture.db.collection("workflow_definition_versions").insertOne({
    _id: "workflow-1-v1",
    workflowDefinitionId: "workflow-1",
    workspaceId: "workspace-1",
    projectId: "project-1",
    versionNumber: 1,
    visualGraph: {},
    logicGraph: {
      nodes: [
        { id: "source-asset", type: "source" },
        { id: "rename-folder", type: "transform" },
      ],
      edges: [{ from: "source-asset", to: "rename-folder" }],
    },
    runtimeGraph: {
      nodeBindings: {
        "source-asset": "source-asset",
        "rename-folder": "rename-folder",
      },
    },
    pluginRefs: ["builtin:delivery-nodes"],
    createdBy: "local-user",
    createdAt: new Date().toISOString(),
    updatedAt: new Date().toISOString(),
  });
  await fixture.db.collection("workflow_runs").insertOne({
    _id: "run-1",
    workflowDefinitionId: "workflow-1",
    workflowVersionId: "workflow-1-v1",
    status: "queued",
    triggeredBy: "local-user",
    createdAt: new Date().toISOString(),
    updatedAt: new Date().toISOString(),
  });
  await fixture.db.collection("run_tasks").insertMany([
    {
      _id: "task-source",
      workflowRunId: "run-1",
      workflowVersionId: "workflow-1-v1",
      nodeId: "source-asset",
      nodeType: "source",
      executorType: "python",
      status: "queued",
      attempt: 1,
      assetIds: ["asset-1"],
      upstreamNodeIds: [],
      outputArtifactIds: [],
      createdAt: new Date().toISOString(),
      updatedAt: new Date().toISOString(),
    },
    {
      _id: "task-rename",
      workflowRunId: "run-1",
      workflowVersionId: "workflow-1-v1",
      nodeId: "rename-folder",
      nodeType: "transform",
      executorType: "python",
      status: "pending",
      attempt: 1,
      assetIds: ["asset-1"],
      upstreamNodeIds: ["source-asset"],
      outputArtifactIds: [],
      createdAt: new Date().toISOString(),
      updatedAt: new Date().toISOString(),
    },
  ]);
  const task = await fixture.runtime.runNextTask();
  const tasks = await fixture.store.listRunTasks("run-1");
  // Fetch each task by id rather than by position in `tasks`, so the assertions
  // do not depend on the order in which the store returns the run's tasks.
  const sourceTask = await fixture.store.getRunTask("task-source");
  const renameTask = await fixture.store.getRunTask("task-rename");
  const run = await fixture.store.getRun("run-1");
  const artifacts = await fixture.db.collection("artifacts").find({ producerId: "task-source" }).toArray();
  assert.equal(task?.id, "task-source");
  assert.equal(tasks.length, 2);
  assert.equal(sourceTask?.status, "success");
  assert.deepEqual(sourceTask?.assetIds, ["asset-1"]);
  assert.equal(renameTask?.status, "queued");
  assert.deepEqual(renameTask?.assetIds, ["asset-1"]);
  assert.equal(artifacts.length, 1);
  assert.equal(run?.status, "queued");
});
// Scenario: a single-node workflow whose only task is already queued. After it runs, both the
// task document and the run are checked for success status, timing fields, captured output,
// log lines, and summary counters.
test("worker marks the run successful after the final queued task completes", async (t) => {
  const fixture = await createRuntimeFixture("emboflow-worker-success");
  t.after(() => fixture.close());

  const { db, store } = fixture;
  const runTasks = db.collection("run_tasks");
  // Fresh ISO timestamp per call, matching one `new Date()` per field.
  const stamp = () => new Date().toISOString();

  await db.collection("workflow_definition_versions").insertOne({
    _id: "workflow-2-v1",
    workflowDefinitionId: "workflow-2",
    workspaceId: "workspace-1",
    projectId: "project-1",
    versionNumber: 1,
    visualGraph: {},
    logicGraph: {
      nodes: [{ id: "export-delivery-package", type: "export" }],
      edges: [],
    },
    runtimeGraph: {
      nodeBindings: {
        "export-delivery-package": "export-delivery-package",
      },
    },
    pluginRefs: ["builtin:delivery-nodes"],
    createdBy: "local-user",
    createdAt: stamp(),
    updatedAt: stamp(),
  });
  await db.collection("workflow_runs").insertOne({
    _id: "run-2",
    workflowDefinitionId: "workflow-2",
    workflowVersionId: "workflow-2-v1",
    status: "queued",
    triggeredBy: "local-user",
    createdAt: stamp(),
    updatedAt: stamp(),
  });
  await runTasks.insertOne({
    _id: "task-export",
    workflowRunId: "run-2",
    workflowVersionId: "workflow-2-v1",
    nodeId: "export-delivery-package",
    nodeType: "export",
    executorType: "python",
    status: "queued",
    attempt: 1,
    assetIds: ["asset-final"],
    upstreamNodeIds: [],
    outputArtifactIds: [],
    createdAt: stamp(),
    updatedAt: stamp(),
  });

  await fixture.runtime.runNextTask();

  const finishedRun = await store.getRun("run-2");
  const exportTask = await runTasks.findOne({ _id: "task-export" });

  // Task-level expectations.
  assert.equal(exportTask?.status, "success");
  assert.equal(finishedRun?.status, "success");
  assert.equal(typeof exportTask?.startedAt, "string");
  assert.equal(typeof exportTask?.finishedAt, "string");
  assert.equal(typeof exportTask?.durationMs, "number");
  assert.deepEqual(exportTask?.stdoutLines, ["python executor processed export-delivery-package"]);
  assert.deepEqual(exportTask?.stderrLines, []);

  const taskSummary = exportTask?.summary;
  assert.equal(taskSummary?.outcome, "success");
  assert.equal(taskSummary?.executorType, "python");
  assert.equal(taskSummary?.stdoutLineCount, 1);
  assert.equal(taskSummary?.stderrLineCount, 0);

  const taskLog = exportTask?.logLines;
  assert.match(taskLog?.[0] ?? "", /claimed/i);
  assert.match(taskLog?.[1] ?? "", /stdout:/i);
  assert.match(taskLog?.at(-1) ?? "", /completed/i);
  assert.deepEqual(exportTask?.lastResultPreview, {
    taskId: "task-export",
    executor: "python",
  });

  // Run-level expectations.
  assert.equal(typeof finishedRun?.startedAt, "string");
  assert.equal(typeof finishedRun?.finishedAt, "string");
  assert.equal(typeof finishedRun?.durationMs, "number");
  assert.equal(finishedRun?.summary?.totalTaskCount, 1);
  assert.equal(finishedRun?.summary?.artifactCount, 1);
  assert.equal(finishedRun?.summary?.stdoutLineCount, 1);
  assert.equal(finishedRun?.summary?.stderrLineCount, 0);
  assert.equal(finishedRun?.summary?.taskCounts?.success, 1);
});
// Scenario: the python executor is replaced with a recording stub. The test checks that the
// task's asset ids reach the executor (both on the task and on the execution context), are
// kept on the stored task, and appear in the payload of the artifact the task produced.
test("worker passes bound asset ids into the execution context and task artifacts", async (t) => {
  // Captured values live on an object rather than in `let x: T | null = null` variables.
  // Such a variable is assigned only inside the executor closure, so control-flow analysis
  // narrows it to `null` and the later `x?.assetIds` reads fail strict type-checking.
  const captured: { task?: TaskRecord; context?: ExecutionContext } = {};
  const fixture = await createRuntimeFixture("emboflow-worker-asset-context", {
    executors: {
      python: {
        async execute(task: TaskRecord, context: ExecutionContext) {
          captured.task = task;
          captured.context = context;
          return {
            result: {
              taskId: task.id,
              assetIds: context.assetIds,
            },
            stdoutLines: ["custom executor started"],
            stderrLines: [],
          };
        },
      },
    },
  });
  t.after(async () => {
    await fixture.close();
  });
  await fixture.db.collection("workflow_runs").insertOne({
    _id: "run-asset",
    workflowDefinitionId: "workflow-asset",
    workflowVersionId: "workflow-asset-v1",
    status: "queued",
    triggeredBy: "local-user",
    assetIds: ["asset-42"],
    createdAt: new Date().toISOString(),
    updatedAt: new Date().toISOString(),
  });
  await fixture.db.collection("run_tasks").insertOne({
    _id: "task-asset",
    workflowRunId: "run-asset",
    workflowVersionId: "workflow-asset-v1",
    nodeId: "source-asset",
    nodeType: "source",
    executorType: "python",
    status: "queued",
    attempt: 1,
    assetIds: ["asset-42"],
    upstreamNodeIds: [],
    outputArtifactIds: [],
    createdAt: new Date().toISOString(),
    updatedAt: new Date().toISOString(),
  });
  await fixture.runtime.runNextTask();
  const storedTask = await fixture.store.getRunTask("task-asset");
  const artifact = await fixture.db.collection("artifacts").findOne({ producerId: "task-asset" });
  // If the executor was never invoked, these reads yield `undefined` and the assertions fail.
  assert.deepEqual(captured.task?.assetIds, ["asset-42"]);
  assert.deepEqual(captured.context?.assetIds, ["asset-42"]);
  assert.deepEqual(storedTask?.assetIds, ["asset-42"]);
  assert.deepEqual(
    (artifact?.payload as { assetIds?: string[] } | undefined)?.assetIds,
    ["asset-42"],
  );
  assert.equal(storedTask?.summary?.outcome, "success");
  assert.equal(storedTask?.summary?.assetCount, 1);
  assert.deepEqual(storedTask?.summary?.artifactIds, [artifact?._id]);
  assert.deepEqual(storedTask?.stdoutLines, ["custom executor started"]);
  assert.deepEqual(storedTask?.stderrLines, []);
});
// Scenario: the python executor always throws an error that carries `stdoutLines` and
// `stderrLines`. `runNextTask()` is expected to reject, and the task and run are then checked
// for failed status, the error message, captured output, log lines, and summary counters.
test("worker persists failure summaries and task log lines when execution throws", async (t) => {
  const alwaysFailingExecutor = {
    async execute() {
      const failure = new Error("intentional executor failure");
      throw Object.assign(failure, {
        stdoutLines: ["failure path entered"],
        stderrLines: ["stack trace line"],
      });
    },
  };
  const fixture = await createRuntimeFixture("emboflow-worker-failure-summary", {
    executors: { python: alwaysFailingExecutor },
  });
  t.after(() => fixture.close());

  const { db, store } = fixture;
  const runTasks = db.collection("run_tasks");
  // Fresh ISO timestamp per call, matching one `new Date()` per field.
  const stamp = () => new Date().toISOString();

  await db.collection("workflow_runs").insertOne({
    _id: "run-failure",
    workflowDefinitionId: "workflow-failure",
    workflowVersionId: "workflow-failure-v1",
    status: "queued",
    triggeredBy: "local-user",
    assetIds: ["asset-9"],
    createdAt: stamp(),
    updatedAt: stamp(),
  });
  await runTasks.insertOne({
    _id: "task-failure",
    workflowRunId: "run-failure",
    workflowVersionId: "workflow-failure-v1",
    nodeId: "validate-structure",
    nodeType: "inspect",
    executorType: "python",
    status: "queued",
    attempt: 1,
    assetIds: ["asset-9"],
    upstreamNodeIds: [],
    outputArtifactIds: [],
    createdAt: stamp(),
    updatedAt: stamp(),
  });

  await assert.rejects(() => fixture.runtime.runNextTask(), /intentional executor failure/);

  const failedTask = await runTasks.findOne({ _id: "task-failure" });
  const failedRun = await store.getRun("run-failure");

  // Status and error details.
  assert.equal(failedTask?.status, "failed");
  assert.equal(failedRun?.status, "failed");
  assert.equal(failedTask?.errorMessage, "intentional executor failure");

  // Task summary.
  const taskSummary = failedTask?.summary;
  assert.equal(taskSummary?.outcome, "failed");
  assert.equal(taskSummary?.executorType, "python");
  assert.equal(taskSummary?.stdoutLineCount, 1);
  assert.equal(taskSummary?.stderrLineCount, 1);
  assert.equal(taskSummary?.artifactIds?.length ?? 0, 0);

  // Timing and captured output.
  assert.equal(typeof failedTask?.startedAt, "string");
  assert.equal(typeof failedTask?.finishedAt, "string");
  assert.equal(typeof failedTask?.durationMs, "number");
  assert.deepEqual(failedTask?.stdoutLines, ["failure path entered"]);
  assert.deepEqual(failedTask?.stderrLines, ["stack trace line"]);

  // Log lines, in the order the assertions expect them.
  const taskLog = failedTask?.logLines;
  assert.match(taskLog?.[0] ?? "", /claimed/i);
  assert.match(taskLog?.[1] ?? "", /stdout:/i);
  assert.match(taskLog?.[2] ?? "", /stderr:/i);
  assert.match(taskLog?.at(-1) ?? "", /intentional executor failure/i);

  // Run summary.
  assert.equal(failedRun?.summary?.totalTaskCount, 1);
  assert.equal(failedRun?.summary?.taskCounts?.failed, 1);
  assert.equal(failedRun?.summary?.stdoutLineCount, 1);
  assert.equal(failedRun?.summary?.stderrLineCount, 1);
  assert.deepEqual(failedRun?.summary?.failedTaskIds, ["task-failure"]);
});
// Scenario: two runs each have one queued task, but one run is already "cancelled". A single
// `runNextTask()` call is expected to return the active run's task, leave the cancelled run's
// task in "cancelled" status with a matching final log line, and finish the active task.
test("worker skips queued tasks that belong to a cancelled run", async (t) => {
  const fixture = await createRuntimeFixture("emboflow-worker-cancelled-run-skip");
  t.after(() => fixture.close());

  const { db, store } = fixture;
  // Fresh ISO timestamp per call, matching one `new Date()` per field.
  const stamp = () => new Date().toISOString();

  await db.collection("workflow_runs").insertMany([
    {
      _id: "run-cancelled",
      workflowDefinitionId: "workflow-cancelled",
      workflowVersionId: "workflow-cancelled-v1",
      status: "cancelled",
      triggeredBy: "local-user",
      assetIds: ["asset-cancelled"],
      createdAt: stamp(),
      updatedAt: stamp(),
    },
    {
      _id: "run-active",
      workflowDefinitionId: "workflow-active",
      workflowVersionId: "workflow-active-v1",
      status: "queued",
      triggeredBy: "local-user",
      assetIds: ["asset-active"],
      createdAt: stamp(),
      updatedAt: stamp(),
    },
  ]);
  await db.collection("run_tasks").insertMany([
    {
      _id: "task-cancelled",
      workflowRunId: "run-cancelled",
      workflowVersionId: "workflow-cancelled-v1",
      nodeId: "source-asset",
      nodeType: "source",
      executorType: "python",
      status: "queued",
      attempt: 1,
      assetIds: ["asset-cancelled"],
      upstreamNodeIds: [],
      outputArtifactIds: [],
      createdAt: stamp(),
      updatedAt: stamp(),
    },
    {
      _id: "task-active",
      workflowRunId: "run-active",
      workflowVersionId: "workflow-active-v1",
      nodeId: "source-asset",
      nodeType: "source",
      executorType: "python",
      status: "queued",
      attempt: 1,
      assetIds: ["asset-active"],
      upstreamNodeIds: [],
      outputArtifactIds: [],
      createdAt: stamp(),
      updatedAt: stamp(),
    },
  ]);

  const executed = await fixture.runtime.runNextTask();
  const skippedTask = await store.getRunTask("task-cancelled");
  const completedTask = await store.getRunTask("task-active");

  assert.equal(executed?.id, "task-active");
  assert.equal(skippedTask?.status, "cancelled");
  const lastSkippedLogLine = skippedTask?.logLines?.at(-1) ?? "";
  assert.match(lastSkippedLogLine, /cancelled/i);
  assert.equal(completedTask?.status, "success");
});
// Scenario: the queued task carries a `codeHookSpec` with inline Python source. After the task
// runs, the hook's printed line is expected in the task's stdout and the hook's return value
// in the `result` of the produced artifact's payload.
test("worker executes a python code hook snapshot from the queued task", async (t) => {
  const fixture = await createRuntimeFixture("emboflow-worker-python-hook");
  t.after(() => fixture.close());

  const { db, store } = fixture;
  // Fresh ISO timestamp per call, matching one `new Date()` per field.
  const stamp = () => new Date().toISOString();
  // Python source for the hook, assembled line by line.
  const hookSource = [
    "def process(task, context):",
    " print(f\"hook running for {task['nodeId']}\")",
    " return {'nodeId': task['nodeId'], 'assetIds': context['assetIds'], 'hooked': True}",
  ].join("\n");

  await db.collection("workflow_runs").insertOne({
    _id: "run-python-hook",
    workflowDefinitionId: "workflow-python-hook",
    workflowVersionId: "workflow-python-hook-v1",
    status: "queued",
    triggeredBy: "local-user",
    assetIds: ["asset-hook"],
    createdAt: stamp(),
    updatedAt: stamp(),
  });
  await db.collection("run_tasks").insertOne({
    _id: "task-python-hook",
    workflowRunId: "run-python-hook",
    workflowVersionId: "workflow-python-hook-v1",
    nodeId: "validate-structure",
    nodeType: "inspect",
    executorType: "python",
    status: "queued",
    attempt: 1,
    assetIds: ["asset-hook"],
    upstreamNodeIds: [],
    outputArtifactIds: [],
    codeHookSpec: {
      language: "python",
      entrypoint: "process",
      source: hookSource,
    },
    createdAt: stamp(),
    updatedAt: stamp(),
  });

  await fixture.runtime.runNextTask();

  const hookedTask = await store.getRunTask("task-python-hook");
  const hookArtifact = await db.collection("artifacts").findOne({ producerId: "task-python-hook" });

  assert.equal(hookedTask?.status, "success");
  assert.deepEqual(hookedTask?.stdoutLines, ["hook running for validate-structure"]);
  assert.deepEqual(hookedTask?.stderrLines, []);
  assert.equal(hookedTask?.summary?.executorType, "python");

  const payload = hookArtifact?.payload as { result?: { hooked?: boolean } } | undefined;
  assert.deepEqual(payload?.result, {
    nodeId: "validate-structure",
    assetIds: ["asset-hook"],
    hooked: true,
  });
});