179 lines
5.1 KiB
TypeScript
179 lines
5.1 KiB
TypeScript
import test from "node:test";
|
|
import assert from "node:assert/strict";
|
|
|
|
import { MongoMemoryServer } from "mongodb-memory-server";
|
|
import { MongoClient } from "mongodb";
|
|
|
|
import { MongoWorkerStore } from "../src/runtime/mongo-worker-store.ts";
|
|
import { WorkerRuntime } from "../src/runtime/worker-runtime.ts";
|
|
|
|
/**
 * Builds an isolated runtime fixture for one test.
 *
 * Starts an in-memory MongoDB server bound to 127.0.0.1, connects a client to
 * it, and wires a `MongoWorkerStore` and a `WorkerRuntime` on top of the
 * requested database.
 *
 * If any setup step after the server has started fails, the client and the
 * server are shut down before the error is rethrown, so a failed setup does
 * not leave a server running.
 *
 * @param database - Name of the database handed to `client.db(...)`.
 * @returns The database handle, the store, the runtime, and a `close`
 *   function that closes the client and then stops the server.
 */
async function createRuntimeFixture(database: string) {
  const mongod = await MongoMemoryServer.create({
    instance: {
      ip: "127.0.0.1",
    },
  });

  // Tracked outside the try block so the failure path can close it if it was
  // created before the error happened.
  let openedClient: MongoClient | undefined;

  try {
    const client = new MongoClient(mongod.getUri());
    openedClient = client;
    await client.connect();

    const db = client.db(database);
    const store = new MongoWorkerStore(db);
    const runtime = new WorkerRuntime({ store });

    return {
      db,
      store,
      runtime,
      close: async () => {
        // Stop the server even when closing the client rejects; the client
        // error (if any) still propagates to the caller afterwards.
        try {
          await client.close();
        } finally {
          await mongod.stop();
        }
      },
    };
  } catch (error) {
    // Best-effort teardown: cleanup failures are deliberately ignored here so
    // the original setup error is the one the caller sees.
    try {
      await openedClient?.close();
    } catch {
      // ignored in favour of the setup error rethrown below
    }
    try {
      await mongod.stop();
    } catch {
      // ignored in favour of the setup error rethrown below
    }
    throw error;
  }
}
|
|
|
|
// Scenario: a two-node workflow (source -> transform) with the source task
// queued and the transform task pending. One call to runNextTask() is expected
// to return the source task, leave it in "success", move the dependent task to
// "queued", record one artifact produced by the source task, and put the run
// into "running".
test("worker claims a queued task, creates an artifact, and queues the dependent task", async (t) => {
  const { db, store, runtime, close } = await createRuntimeFixture("emboflow-worker-progress");
  t.after(() => close());

  // Evaluated once per field, exactly where each timestamp is needed.
  const isoNow = () => new Date().toISOString();

  // Seed the workflow version: its logic graph links the two nodes.
  const definitionVersions = db.collection("workflow_definition_versions");
  await definitionVersions.insertOne({
    _id: "workflow-1-v1",
    workflowDefinitionId: "workflow-1",
    workspaceId: "workspace-1",
    projectId: "project-1",
    versionNumber: 1,
    visualGraph: {},
    logicGraph: {
      nodes: [
        { id: "source-asset", type: "source" },
        { id: "rename-folder", type: "transform" },
      ],
      edges: [{ from: "source-asset", to: "rename-folder" }],
    },
    runtimeGraph: {
      nodeBindings: {
        "source-asset": "source-asset",
        "rename-folder": "rename-folder",
      },
    },
    pluginRefs: ["builtin:delivery-nodes"],
    createdBy: "local-user",
    createdAt: isoNow(),
    updatedAt: isoNow(),
  });

  // Seed the run that owns both tasks.
  const workflowRuns = db.collection("workflow_runs");
  await workflowRuns.insertOne({
    _id: "run-1",
    workflowDefinitionId: "workflow-1",
    workflowVersionId: "workflow-1-v1",
    status: "queued",
    triggeredBy: "local-user",
    createdAt: isoNow(),
    updatedAt: isoNow(),
  });

  // Seed the tasks: the source is ready to run, the transform waits on it.
  const runTasks = db.collection("run_tasks");
  await runTasks.insertMany([
    {
      _id: "task-source",
      workflowRunId: "run-1",
      workflowVersionId: "workflow-1-v1",
      nodeId: "source-asset",
      nodeType: "source",
      executorType: "python",
      status: "queued",
      attempt: 1,
      upstreamNodeIds: [],
      outputArtifactIds: [],
      createdAt: isoNow(),
      updatedAt: isoNow(),
    },
    {
      _id: "task-rename",
      workflowRunId: "run-1",
      workflowVersionId: "workflow-1-v1",
      nodeId: "rename-folder",
      nodeType: "transform",
      executorType: "python",
      status: "pending",
      attempt: 1,
      upstreamNodeIds: ["source-asset"],
      outputArtifactIds: [],
      createdAt: isoNow(),
      updatedAt: isoNow(),
    },
  ]);

  // Act: let the worker process exactly one task, then read back the state.
  const claimedTask = await runtime.runNextTask();
  const tasksAfter = await store.listRunTasks("run-1");
  const runAfter = await store.getRun("run-1");
  const producedArtifacts = await db
    .collection("artifacts")
    .find({ producerId: "task-source" })
    .toArray();

  // Assert: claimed task, per-task statuses, artifact count, run status.
  assert.equal(claimedTask?.id, "task-source");
  assert.equal(tasksAfter[0]?.status, "success");
  assert.equal(tasksAfter[1]?.status, "queued");
  assert.equal(producedArtifacts.length, 1);
  assert.equal(runAfter?.status, "running");
});
|
|
|
|
// Scenario: a single-node workflow whose only task is already queued. After
// one call to runNextTask() both the task and its run are expected to be in
// "success".
test("worker marks the run successful after the final queued task completes", async (t) => {
  const { db, store, runtime, close } = await createRuntimeFixture("emboflow-worker-success");
  t.after(() => close());

  // Evaluated once per field, exactly where each timestamp is needed.
  const isoNow = () => new Date().toISOString();

  // Seed the workflow version: one export node, no edges.
  const definitionVersions = db.collection("workflow_definition_versions");
  await definitionVersions.insertOne({
    _id: "workflow-2-v1",
    workflowDefinitionId: "workflow-2",
    workspaceId: "workspace-1",
    projectId: "project-1",
    versionNumber: 1,
    visualGraph: {},
    logicGraph: {
      nodes: [{ id: "export-delivery-package", type: "export" }],
      edges: [],
    },
    runtimeGraph: {
      nodeBindings: {
        "export-delivery-package": "export-delivery-package",
      },
    },
    pluginRefs: ["builtin:delivery-nodes"],
    createdBy: "local-user",
    createdAt: isoNow(),
    updatedAt: isoNow(),
  });

  // Seed the run.
  const workflowRuns = db.collection("workflow_runs");
  await workflowRuns.insertOne({
    _id: "run-2",
    workflowDefinitionId: "workflow-2",
    workflowVersionId: "workflow-2-v1",
    status: "queued",
    triggeredBy: "local-user",
    createdAt: isoNow(),
    updatedAt: isoNow(),
  });

  // Seed the run's only task, already queued.
  const runTasks = db.collection("run_tasks");
  await runTasks.insertOne({
    _id: "task-export",
    workflowRunId: "run-2",
    workflowVersionId: "workflow-2-v1",
    nodeId: "export-delivery-package",
    nodeType: "export",
    executorType: "python",
    status: "queued",
    attempt: 1,
    upstreamNodeIds: [],
    outputArtifactIds: [],
    createdAt: isoNow(),
    updatedAt: isoNow(),
  });

  // Act: process the single queued task.
  await runtime.runNextTask();

  // Read the run through the store and the task straight from the collection.
  const runAfter = await store.getRun("run-2");
  const exportTask = await db.collection("run_tasks").findOne({ _id: "task-export" });

  assert.equal(exportTask?.status, "success");
  assert.equal(runAfter?.status, "success");
});
|