From a6ee6cfa38058077af414c1329905d8281de3d03 Mon Sep 17 00:00:00 2001 From: eust-w Date: Thu, 26 Mar 2026 21:13:05 +0800 Subject: [PATCH] :sparkles: feat: add mongo-backed worker execution loop --- Makefile | 8 +- README.md | 4 +- apps/api/package.json | 4 +- apps/api/src/runtime/mongo-store.ts | 9 + .../api/test/runtime-http.integration.spec.ts | 26 +- apps/worker/package.json | 7 + .../worker/src/contracts/execution-context.ts | 12 +- apps/worker/src/main.ts | 79 +++++- apps/worker/src/runtime/mongo-worker-store.ts | 224 ++++++++++++++++++ apps/worker/src/runtime/worker-runtime.ts | 65 +++++ apps/worker/test/mongo-worker-runtime.spec.ts | 178 ++++++++++++++ design/02-architecture/system-architecture.md | 1 + .../03-workflows/workflow-execution-model.md | 16 +- design/05-data/mongodb-data-model.md | 16 +- docs/development-workflow.md | 1 + pnpm-lock.yaml | 88 ++++++- tests/test_dev_commands.py | 2 + 17 files changed, 705 insertions(+), 35 deletions(-) create mode 100644 apps/worker/src/runtime/mongo-worker-store.ts create mode 100644 apps/worker/src/runtime/worker-runtime.ts create mode 100644 apps/worker/test/mongo-worker-runtime.spec.ts diff --git a/Makefile b/Makefile index 201058b..104bcc4 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ SHELL := /bin/bash -.PHONY: bootstrap test dev-api dev-web dev-worker serve-api serve-web infra-up infra-down guardrails +.PHONY: bootstrap test dev-api dev-web dev-worker serve-api serve-web serve-worker infra-up infra-down guardrails bootstrap: pnpm install @@ -34,6 +34,12 @@ serve-web: VITE_API_BASE_URL="$${VITE_API_BASE_URL:-http://127.0.0.1:3001}" \ pnpm --filter web start -- --host 127.0.0.1 --port 3000 +serve-worker: + MONGO_URI="$${MONGO_URI:-mongodb://127.0.0.1:27017}" \ + MONGO_DB="$${MONGO_DB:-emboflow}" \ + WORKER_POLL_INTERVAL_MS="$${WORKER_POLL_INTERVAL_MS:-1000}" \ + pnpm --filter worker start + infra-up: docker compose up -d mongo minio diff --git a/README.md b/README.md index 09f6aaa..b5d9b47 100644 --- a/README.md +++ b/README.md @@ -47,12 +47,14 @@ Start the API and web app in separate terminals: ```bash make serve-api make serve-web +make serve-worker ``` The default local stack uses: - API: `http://127.0.0.1:3001` - Web: `http://127.0.0.1:3000` +- Worker: Mongo polling loop with `WORKER_POLL_INTERVAL_MS=1000` ### Local Data Validation @@ -68,7 +70,7 @@ You can register that directory from the Assets page or via `POST /api/assets/re - `apps/api` contains the control-plane modules for workspaces, assets, workflows, runs, and artifacts. - `apps/web` contains the React shell, asset workspace, workflow editor surface, run detail view, and explore renderers. -- `apps/worker` contains the local scheduler, task runner, and executor contracts. +- `apps/worker` contains the Mongo-backed worker runtime, task runner, and executor contracts. - `design/` contains the architecture and product design documents that must stay aligned with implementation. - `docs/` contains workflow guidance and the executable implementation plan. diff --git a/apps/api/package.json b/apps/api/package.json index f01e494..8baa775 100644 --- a/apps/api/package.json +++ b/apps/api/package.json @@ -6,7 +6,7 @@ "scripts": { "dev": "tsx watch src/main.ts", "start": "tsx src/main.ts", - "test": "tsx --test" + "test": "node --import tsx --test" }, "dependencies": { "cors": "^2.8.6", @@ -14,6 +14,6 @@ "mongodb": "^7.1.1" }, "devDependencies": { - "mongodb-memory-server": "^11.0.1" + "mongodb-memory-server": "^10.3.0" } } diff --git a/apps/api/src/runtime/mongo-store.ts b/apps/api/src/runtime/mongo-store.ts index 3ae1cc4..20c22a6 100644 --- a/apps/api/src/runtime/mongo-store.ts +++ b/apps/api/src/runtime/mongo-store.ts @@ -5,6 +5,7 @@ import type { Db, Document, WithId } from "mongodb"; import type { AssetType } from "../../../../packages/contracts/src/domain.ts"; import { DELIVERY_NODE_DEFINITIONS } from "../modules/plugins/builtin/delivery-nodes.ts"; import { probeLocalSourcePath } from "./local-source-probe.ts"; +import type { ExecutorType } from "../../../worker/src/contracts/execution-context.ts"; type Timestamped = { createdAt: string; @@ -101,8 +102,11 @@ type RunTaskDocument = Timestamped & { workflowVersionId: string; nodeId: string; nodeType: string; + executorType: ExecutorType; status: "queued" | "pending"; attempt: number; + upstreamNodeIds: string[]; + outputArtifactIds: string[]; }; type ArtifactDocument = Timestamped & { @@ -457,8 +461,13 @@ export class MongoAppStore { workflowVersionId: version._id, nodeId: node.id, nodeType: node.type, + executorType: "python", status: targetNodes.has(node.id) ? "pending" : "queued", attempt: 1, + upstreamNodeIds: version.logicGraph.edges + .filter((edge) => edge.to === node.id) + .map((edge) => edge.from), + outputArtifactIds: [], createdAt: nowIso(), updatedAt: nowIso(), })); diff --git a/apps/api/test/runtime-http.integration.spec.ts b/apps/api/test/runtime-http.integration.spec.ts index 1a5ba30..6a7557e 100644 --- a/apps/api/test/runtime-http.integration.spec.ts +++ b/apps/api/test/runtime-http.integration.spec.ts @@ -44,7 +44,12 @@ async function startRuntimeServer(config: ApiRuntimeConfig) { } test("mongo-backed runtime reuses bootstrapped workspace and project across restarts", async (t) => { - const mongod = await MongoMemoryServer.create(); + const mongod = await MongoMemoryServer.create({ + instance: { + ip: "127.0.0.1", + port: 27117, + }, + }); t.after(async () => { await mongod.stop(); }); @@ -98,7 +103,12 @@ test("mongo-backed runtime persists probed assets and workflow runs through the await writeFile(path.join(sourceDir, "video_meta.json"), "{}"); await writeFile(path.join(sourceDir, "DJI_001", "DJI_001.mp4"), ""); - const mongod = await MongoMemoryServer.create(); + const mongod = await MongoMemoryServer.create({ + instance: { + ip: "127.0.0.1", + port: 27118, + }, + }); t.after(async () => { await mongod.stop(); }); @@ -191,7 +201,14 @@ test("mongo-backed runtime persists probed assets and workflow runs through the }), }), ); - const tasks = await readJson>( + const tasks = await readJson< + Array<{ + nodeId: string; + status: string; + executorType: string; + upstreamNodeIds: string[]; + }> + >( await fetch(`${server.baseUrl}/api/runs/${run._id}/tasks`), ); @@ -204,6 +221,9 @@ test("mongo-backed runtime persists probed assets and workflow runs through the assert.equal(run.status, "queued"); assert.equal(tasks.length, 3); assert.equal(tasks[0]?.nodeId, "source-asset"); + assert.equal(tasks[0]?.executorType, "python"); + assert.deepEqual(tasks[0]?.upstreamNodeIds, []); assert.equal(tasks[0]?.status, "queued"); + assert.deepEqual(tasks[1]?.upstreamNodeIds, ["source-asset"]); assert.equal(tasks[1]?.status, "pending"); }); diff --git a/apps/worker/package.json b/apps/worker/package.json index a824255..cc270d0 100644 --- a/apps/worker/package.json +++ b/apps/worker/package.json @@ -5,6 +5,13 @@ "type": "module", "scripts": { "dev": "tsx watch src/main.ts", + "start": "tsx src/main.ts", "test": "node --test --experimental-strip-types" + }, + "dependencies": { + "mongodb": "^7.1.1" + }, + "devDependencies": { + "mongodb-memory-server": "^10.3.0" } } diff --git a/apps/worker/src/contracts/execution-context.ts b/apps/worker/src/contracts/execution-context.ts index afb5fa9..b4ce539 100644 --- a/apps/worker/src/contracts/execution-context.ts +++ b/apps/worker/src/contracts/execution-context.ts @@ -1,13 +1,23 @@ export type ExecutorType = "python" | "docker" | "http"; +export type TaskStatus = "pending" | "queued" | "running" | "success" | "failed"; export type TaskRecord = { id: string; + workflowRunId?: string; + workflowVersionId?: string; nodeId: string; + nodeType?: string; executorType: ExecutorType; - status: "pending" | "running" | "success" | "failed"; + status: TaskStatus; + attempt?: number; + upstreamNodeIds?: string[]; + outputArtifactIds?: string[]; + errorMessage?: string; }; export type ExecutionContext = { taskId: string; + workflowRunId?: string; + workflowVersionId?: string; nodeId: string; }; diff --git a/apps/worker/src/main.ts b/apps/worker/src/main.ts index 8205aa3..2b74ab7 100644 --- a/apps/worker/src/main.ts +++ b/apps/worker/src/main.ts @@ -1,9 +1,84 @@ -export function bootstrapWorker() { +import process from "node:process"; + +import { MongoClient } from "mongodb"; + +import { MongoWorkerStore } from "./runtime/mongo-worker-store.ts"; +import { WorkerRuntime } from "./runtime/worker-runtime.ts"; + +export type WorkerRuntimeConfig = { + mongoUri: string; + database: string; + pollIntervalMs: number; +}; + +export function resolveWorkerRuntimeConfig( + env: NodeJS.ProcessEnv = process.env, +): WorkerRuntimeConfig { + return { + mongoUri: env.MONGO_URI ?? "mongodb://127.0.0.1:27017", + database: env.MONGO_DB ?? "emboflow", + pollIntervalMs: Number(env.WORKER_POLL_INTERVAL_MS ?? 1000), + }; +} + +export function bootstrapWorker(config = resolveWorkerRuntimeConfig()) { return { status: "ready" as const, + mode: "mongo-polling" as const, + pollIntervalMs: config.pollIntervalMs, + }; +} + +export async function startWorkerLoop(config = resolveWorkerRuntimeConfig()) { + const client = new MongoClient(config.mongoUri); + await client.connect(); + const runtime = new WorkerRuntime({ + store: new MongoWorkerStore(client.db(config.database)), + }); + + let running = false; + const tick = async () => { + if (running) { + return; + } + running = true; + try { + await runtime.runNextTask(); + } catch (error) { + const message = error instanceof Error ? error.message : "worker tick failed"; + process.stderr.write(`${message}\n`); + } finally { + running = false; + } + }; + + await tick(); + const timer = setInterval(() => { + void tick(); + }, config.pollIntervalMs); + + return { + close: async () => { + clearInterval(timer); + await client.close(); + }, }; } if (import.meta.url === `file://${process.argv[1]}`) { - process.stdout.write(JSON.stringify(bootstrapWorker(), null, 2)); + const config = resolveWorkerRuntimeConfig(); + const service = await startWorkerLoop(config); + process.stdout.write(`${JSON.stringify(bootstrapWorker(config), null, 2)}\n`); + + const shutdown = async () => { + await service.close(); + process.exit(0); + }; + + process.on("SIGINT", () => { + void shutdown(); + }); + process.on("SIGTERM", () => { + void shutdown(); + }); } diff --git a/apps/worker/src/runtime/mongo-worker-store.ts b/apps/worker/src/runtime/mongo-worker-store.ts new file mode 100644 index 0000000..5943ae2 --- /dev/null +++ b/apps/worker/src/runtime/mongo-worker-store.ts @@ -0,0 +1,224 @@ +import { randomUUID } from "node:crypto"; + +import type { Db } from "mongodb"; + +import type { ExecutorType, TaskRecord, TaskStatus } from "../contracts/execution-context.ts"; + +type WorkflowRunDocument = { + _id: string; + workflowDefinitionId: string; + workflowVersionId: string; + status: "queued" | "running" | "success" | "failed"; + triggeredBy: string; + createdAt: string; + updatedAt: string; +}; + +type WorkflowVersionDocument = { + _id: string; + workflowDefinitionId: string; + logicGraph: { + nodes: Array<{ id: string; type: string }>; + edges: Array<{ from: string; to: string }>; + }; + runtimeGraph: Record; +}; + +type RunTaskDocument = { + _id: string; + workflowRunId: string; + workflowVersionId: string; + nodeId: string; + nodeType: string; + executorType: ExecutorType; + status: TaskStatus; + attempt: number; + upstreamNodeIds: string[]; + outputArtifactIds: string[]; + errorMessage?: string; + createdAt: string; + updatedAt: string; +}; + +function nowIso() { + return new Date().toISOString(); +} + +function toTaskRecord(task: RunTaskDocument): TaskRecord { + return { + id: task._id, + workflowRunId: task.workflowRunId, + workflowVersionId: task.workflowVersionId, + nodeId: task.nodeId, + nodeType: task.nodeType, + executorType: task.executorType, + status: task.status, + attempt: task.attempt, + upstreamNodeIds: task.upstreamNodeIds, + outputArtifactIds: task.outputArtifactIds, + errorMessage: task.errorMessage, + }; +} + +export class MongoWorkerStore { + private readonly db: Db; + + constructor(db: Db) { + this.db = db; + } + + async claimNextQueuedTask(): Promise { + const task = await this.db.collection("run_tasks").findOneAndUpdate( + { status: "queued" }, + { + $set: { + status: "running", + updatedAt: nowIso(), + }, + }, + { + sort: { createdAt: 1 }, + returnDocument: "after", + }, + ); + + if (!task) { + return undefined; + } + + await this.db.collection("workflow_runs").updateOne( + { _id: task.workflowRunId }, + { $set: { status: "running", updatedAt: nowIso() } }, + ); + + return toTaskRecord(task); + } + + async getRun(runId: string) { + return this.db.collection("workflow_runs").findOne({ _id: runId }); + } + + async getRunTask(taskId: string) { + const task = await this.db.collection("run_tasks").findOne({ _id: taskId }); + return task ? toTaskRecord(task) : null; + } + + async listRunTasks(runId: string) { + const tasks = await this.db + .collection("run_tasks") + .find({ workflowRunId: runId }) + .sort({ createdAt: 1 }) + .toArray(); + return tasks.map(toTaskRecord); + } + + async getWorkflowVersion(workflowVersionId: string) { + return this.db + .collection("workflow_definition_versions") + .findOne({ _id: workflowVersionId }); + } + + async createTaskArtifact(task: TaskRecord, payload: Record) { + const artifact = { + _id: `artifact-${randomUUID()}`, + type: "json", + title: `Task Result: ${task.nodeId}`, + producerType: "run_task", + producerId: task.id, + payload, + createdAt: nowIso(), + updatedAt: nowIso(), + }; + await this.db.collection("artifacts").insertOne(artifact); + await this.db.collection("run_tasks").updateOne( + { _id: task.id }, + { + $push: { outputArtifactIds: artifact._id }, + $set: { updatedAt: nowIso() }, + }, + ); + return artifact; + } + + async markTaskSuccess(taskId: string) { + await this.db.collection("run_tasks").updateOne( + { _id: taskId }, + { + $set: { + status: "success", + updatedAt: nowIso(), + }, + }, + ); + } + + async markTaskFailed(taskId: string, errorMessage: string) { + await this.db.collection("run_tasks").updateOne( + { _id: taskId }, + { + $set: { + status: "failed", + errorMessage, + updatedAt: nowIso(), + }, + }, + ); + } + + async queueReadyDependents(runId: string) { + const tasks = await this.db + .collection("run_tasks") + .find({ workflowRunId: runId }) + .toArray(); + const successfulNodes = new Set( + tasks.filter((task) => task.status === "success").map((task) => task.nodeId), + ); + const readyTaskIds = tasks + .filter( + (task) => + task.status === "pending" && + task.upstreamNodeIds.every((nodeId) => successfulNodes.has(nodeId)), + ) + .map((task) => task._id); + + if (readyTaskIds.length === 0) { + return; + } + + await this.db.collection("run_tasks").updateMany( + { _id: { $in: readyTaskIds } }, + { + $set: { + status: "queued", + updatedAt: nowIso(), + }, + }, + ); + } + + async refreshRunStatus(runId: string) { + const tasks = await this.listRunTasks(runId); + if (tasks.length === 0) { + return; + } + + let status: WorkflowRunDocument["status"] = "queued"; + if (tasks.some((task) => task.status === "failed")) { + status = "failed"; + } else if (tasks.every((task) => task.status === "success")) { + status = "success"; + } else if (tasks.some((task) => task.status === "running" || task.status === "success")) { + status = "running"; + } + + await this.db.collection("workflow_runs").updateOne( + { _id: runId }, + { + $set: { + status, + updatedAt: nowIso(), + }, + }, + ); + } +} diff --git a/apps/worker/src/runtime/worker-runtime.ts b/apps/worker/src/runtime/worker-runtime.ts new file mode 100644 index 0000000..1a41b54 --- /dev/null +++ b/apps/worker/src/runtime/worker-runtime.ts @@ -0,0 +1,65 @@ +import { DockerExecutor } from "../executors/docker-executor.ts"; +import { HttpExecutor } from "../executors/http-executor.ts"; +import { PythonExecutor } from "../executors/python-executor.ts"; +import type { ExecutionContext, ExecutorType, TaskRecord } from "../contracts/execution-context.ts"; +import { MongoWorkerStore } from "./mongo-worker-store.ts"; + +type ExecutorMap = { + python: PythonExecutor; + docker: DockerExecutor; + http: HttpExecutor; +}; + +export class WorkerRuntime { + private readonly store: MongoWorkerStore; + private readonly executors: ExecutorMap; + + constructor(config: { + store: MongoWorkerStore; + executors?: Partial; + }) { + this.store = config.store; + this.executors = { + python: config.executors?.python ?? new PythonExecutor(), + docker: config.executors?.docker ?? new DockerExecutor(), + http: config.executors?.http ?? new HttpExecutor(), + }; + } + + async runNextTask(): Promise { + const task = await this.store.claimNextQueuedTask(); + if (!task) { + return undefined; + } + + const context: ExecutionContext = { + taskId: task.id, + workflowRunId: task.workflowRunId, + workflowVersionId: task.workflowVersionId, + nodeId: task.nodeId, + }; + + try { + const result = await this.executors[task.executorType as ExecutorType].execute(task, context); + await this.store.createTaskArtifact(task, { + nodeId: task.nodeId, + nodeType: task.nodeType, + executorType: task.executorType, + result, + }); + await this.store.markTaskSuccess(task.id); + if (task.workflowRunId) { + await this.store.queueReadyDependents(task.workflowRunId); + await this.store.refreshRunStatus(task.workflowRunId); + } + return this.store.getRunTask(task.id) ?? { ...task, status: "success" }; + } catch (error) { + const message = error instanceof Error ? error.message : "worker execution failed"; + await this.store.markTaskFailed(task.id, message); + if (task.workflowRunId) { + await this.store.refreshRunStatus(task.workflowRunId); + } + throw error; + } + } +} diff --git a/apps/worker/test/mongo-worker-runtime.spec.ts b/apps/worker/test/mongo-worker-runtime.spec.ts new file mode 100644 index 0000000..13d5011 --- /dev/null +++ b/apps/worker/test/mongo-worker-runtime.spec.ts @@ -0,0 +1,178 @@ +import test from "node:test"; +import assert from "node:assert/strict"; + +import { MongoMemoryServer } from "mongodb-memory-server"; +import { MongoClient } from "mongodb"; + +import { MongoWorkerStore } from "../src/runtime/mongo-worker-store.ts"; +import { WorkerRuntime } from "../src/runtime/worker-runtime.ts"; + +async function createRuntimeFixture(database: string) { + const mongod = await MongoMemoryServer.create({ + instance: { + ip: "127.0.0.1", + }, + }); + const client = new MongoClient(mongod.getUri()); + await client.connect(); + const db = client.db(database); + const store = new MongoWorkerStore(db); + const runtime = new WorkerRuntime({ store }); + + return { + db, + store, + runtime, + close: async () => { + await client.close(); + await mongod.stop(); + }, + }; +} + +test("worker claims a queued task, creates an artifact, and queues the dependent task", async (t) => { + const fixture = await createRuntimeFixture("emboflow-worker-progress"); + t.after(async () => { + await fixture.close(); + }); + + await fixture.db.collection("workflow_definition_versions").insertOne({ + _id: "workflow-1-v1", + workflowDefinitionId: "workflow-1", + workspaceId: "workspace-1", + projectId: "project-1", + versionNumber: 1, + visualGraph: {}, + logicGraph: { + nodes: [ + { id: "source-asset", type: "source" }, + { id: "rename-folder", type: "transform" }, + ], + edges: [{ from: "source-asset", to: "rename-folder" }], + }, + runtimeGraph: { + nodeBindings: { + "source-asset": "source-asset", + "rename-folder": "rename-folder", + }, + }, + pluginRefs: ["builtin:delivery-nodes"], + createdBy: "local-user", + createdAt: new Date().toISOString(), + updatedAt: new Date().toISOString(), + }); + + await fixture.db.collection("workflow_runs").insertOne({ + _id: "run-1", + workflowDefinitionId: "workflow-1", + workflowVersionId: "workflow-1-v1", + status: "queued", + triggeredBy: "local-user", + createdAt: new Date().toISOString(), + updatedAt: new Date().toISOString(), + }); + + await fixture.db.collection("run_tasks").insertMany([ + { + _id: "task-source", + workflowRunId: "run-1", + workflowVersionId: "workflow-1-v1", + nodeId: "source-asset", + nodeType: "source", + executorType: "python", + status: "queued", + attempt: 1, + upstreamNodeIds: [], + outputArtifactIds: [], + createdAt: new Date().toISOString(), + updatedAt: new Date().toISOString(), + }, + { + _id: "task-rename", + workflowRunId: "run-1", + workflowVersionId: "workflow-1-v1", + nodeId: "rename-folder", + nodeType: "transform", + executorType: "python", + status: "pending", + attempt: 1, + upstreamNodeIds: ["source-asset"], + outputArtifactIds: [], + createdAt: new Date().toISOString(), + updatedAt: new Date().toISOString(), + }, + ]); + + const task = await fixture.runtime.runNextTask(); + const tasks = await fixture.store.listRunTasks("run-1"); + const run = await fixture.store.getRun("run-1"); + const artifacts = await fixture.db.collection("artifacts").find({ producerId: "task-source" }).toArray(); + + assert.equal(task?.id, "task-source"); + assert.equal(tasks[0]?.status, "success"); + assert.equal(tasks[1]?.status, "queued"); + assert.equal(artifacts.length, 1); + assert.equal(run?.status, "running"); +}); + +test("worker marks the run successful after the final queued task completes", async (t) => { + const fixture = await createRuntimeFixture("emboflow-worker-success"); + t.after(async () => { + await fixture.close(); + }); + + await fixture.db.collection("workflow_definition_versions").insertOne({ + _id: "workflow-2-v1", + workflowDefinitionId: "workflow-2", + workspaceId: "workspace-1", + projectId: "project-1", + versionNumber: 1, + visualGraph: {}, + logicGraph: { + nodes: [{ id: "export-delivery-package", type: "export" }], + edges: [], + }, + runtimeGraph: { + nodeBindings: { + "export-delivery-package": "export-delivery-package", + }, + }, + pluginRefs: ["builtin:delivery-nodes"], + createdBy: "local-user", + createdAt: new Date().toISOString(), + updatedAt: new Date().toISOString(), + }); + + await fixture.db.collection("workflow_runs").insertOne({ + _id: "run-2", + workflowDefinitionId: "workflow-2", + workflowVersionId: "workflow-2-v1", + status: "queued", + triggeredBy: "local-user", + createdAt: new Date().toISOString(), + updatedAt: new Date().toISOString(), + }); + + await fixture.db.collection("run_tasks").insertOne({ + _id: "task-export", + workflowRunId: "run-2", + workflowVersionId: "workflow-2-v1", + nodeId: "export-delivery-package", + nodeType: "export", + executorType: "python", + status: "queued", + attempt: 1, + upstreamNodeIds: [], + outputArtifactIds: [], + createdAt: new Date().toISOString(), + updatedAt: new Date().toISOString(), + }); + + await fixture.runtime.runNextTask(); + + const run = await fixture.store.getRun("run-2"); + const task = await fixture.db.collection("run_tasks").findOne({ _id: "task-export" }); + + assert.equal(task?.status, "success"); + assert.equal(run?.status, "success"); +}); diff --git a/design/02-architecture/system-architecture.md b/design/02-architecture/system-architecture.md index e5ab5c0..65db6f9 100644 --- a/design/02-architecture/system-architecture.md +++ b/design/02-architecture/system-architecture.md @@ -206,5 +206,6 @@ The current repository runtime now includes: - a real HTTP API process backed by MongoDB - a React and Vite web application that reads those APIs - a local-path asset registration flow for development and dataset inspection +- a worker process that polls Mongo-backed `run_tasks`, creates task artifacts, and refreshes run status The repository still keeps some in-memory module tests for contract stability, but the executable local stack now runs through Mongo-backed runtime services and adds HTTP integration coverage against a real Mongo runtime. diff --git a/design/03-workflows/workflow-execution-model.md b/design/03-workflows/workflow-execution-model.md index 94cb46e..a0b3f91 100644 --- a/design/03-workflows/workflow-execution-model.md +++ b/design/03-workflows/workflow-execution-model.md @@ -220,12 +220,6 @@ Recommended cache key inputs: - upstream reference summary - config summary - code hook digest - -## Current V1 Runtime Notes - -- The React workflow editor now loads the latest persisted version from the Mongo-backed API instead of rendering only a fixed starter graph. -- Draft edits are local editor state until the user saves, at which point the draft is serialized into a new workflow version document. -- The API runtime now has direct HTTP integration coverage against a real Mongo runtime through `mongodb-memory-server`, in addition to the older in-memory contract tests. - plugin version - executor version @@ -276,8 +270,16 @@ The persisted local runtime now covers: - workspace and project bootstrap - asset registration and probe reporting - workflow definition and immutable version snapshots -- workflow runs and task creation +- workflow runs and task creation with worker-consumable dependency snapshots +- worker polling of queued tasks from Mongo-backed `run_tasks` +- run-task status transitions from `queued/pending` to `running/success/failed` +- downstream task promotion when upstream nodes succeed - artifact registration and producer lookup +- task-level artifact creation by the worker runtime + +The React workflow editor now loads the latest persisted version from the Mongo-backed API instead of rendering only a fixed starter graph. Draft edits are local editor state until the user saves, at which point the draft is serialized into a new workflow version document. + +The API and worker runtimes now both have direct integration coverage against a real Mongo runtime through `mongodb-memory-server`, in addition to the older in-memory contract tests. The first web authoring surface already follows the three-pane layout contract with: diff --git a/design/05-data/mongodb-data-model.md b/design/05-data/mongodb-data-model.md index adce11f..559b18d 100644 --- a/design/05-data/mongodb-data-model.md +++ b/design/05-data/mongodb-data-model.md @@ -311,22 +311,28 @@ Core fields: - `workflowVersionId` - `nodeId` - `nodeType` +- `executorType` - `status` - `attempt` -- `executor` -- `scheduler` -- `inputRefs` -- `outputRefs` +- `upstreamNodeIds` +- `outputArtifactIds` - `logRef` - `cacheKey` - `cacheHit` -- `errorSummary` +- `errorMessage` - `startedAt` - `finishedAt` - `createdAt` This collection should remain separate from `workflow_runs` because task volume grows quickly. +The current executable worker path expects `run_tasks` to be self-sufficient enough for dequeue and dependency promotion. That means V1 runtime tasks already persist: + +- executor choice +- upstream node dependencies +- produced artifact ids +- per-task status and error message + ### artifacts Purpose: diff --git a/docs/development-workflow.md b/docs/development-workflow.md index 512b897..21a6654 100644 --- a/docs/development-workflow.md +++ b/docs/development-workflow.md @@ -75,6 +75,7 @@ make dev-web make dev-worker make serve-api make serve-web +make serve-worker make infra-up make infra-down make guardrails diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 1571df9..16b2626 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -25,8 +25,8 @@ importers: version: 7.1.1 devDependencies: mongodb-memory-server: - specifier: ^11.0.1 - version: 11.0.1 + specifier: ^10.3.0 + version: 10.4.3 apps/web: dependencies: @@ -44,7 +44,15 @@ importers: specifier: ^8.0.3 version: 8.0.3(esbuild@0.27.4)(tsx@4.21.0) - apps/worker: {} + apps/worker: + dependencies: + mongodb: + specifier: ^7.1.1 + version: 7.1.1 + devDependencies: + mongodb-memory-server: + specifier: ^10.3.0 + version: 10.4.3 packages/contracts: {} @@ -325,6 +333,9 @@ packages: '@types/webidl-conversions@7.0.3': resolution: {integrity: sha512-CiJJvcRtIgzadHCYXw7dqEnMNRjhGZlYK05Mj9OyktqV8uVT8fD2BFOB7S1uwBE3Kj2Z+4UyPmFw/Ixgw/LAlA==} + '@types/whatwg-url@11.0.5': + resolution: {integrity: sha512-coYR071JRaHa+xoEvvYqvnIHaVqaYrLPbsufM9BF63HkwI5Lgmy2QR8Q5K/lYDYo5AK82wOvSOS0UsLTpTG7uQ==} + '@types/whatwg-url@13.0.0': resolution: {integrity: sha512-N8WXpbE6Wgri7KUSvrmQcqrMllKZ9uxkYWMt+mCSGwNc0Hsw9VQTW7ApqI4XNrx6/SaM2QQJCzMPDEXE058s+Q==} @@ -405,6 +416,10 @@ packages: resolution: {integrity: sha512-oP5VkATKlNwcgvxi0vM0p/D3n2C3EReYVX+DNYs5TjZFn/oQt2j+4sVJtSMr18pdRr8wjTcBl6LoV+FUwzPmNA==} engines: {node: '>=18'} + bson@6.10.4: + resolution: {integrity: sha512-WIsKqkSC0ABoBJuT1LEX+2HEvNmNKKgnTAyd0fL8qzK4SH2i9NXg+t08YtdZp/V9IZ33cxe3iV4yM0qg8lMQng==} + engines: {node: '>=16.20.1'} + bson@7.2.0: resolution: {integrity: sha512-YCEo7KjMlbNlyHhz7zAZNDpIpQbd+wOEHJYezv0nMYTn4x31eIUM2yomNNubclAt63dObUzKHWsBLJ9QcZNSnQ==} engines: {node: '>=20.19.0'} @@ -705,17 +720,47 @@ packages: resolution: {integrity: sha512-Lbgzdk0h4juoQ9fCKXW4by0UJqj+nOOrI9MJ1sSj4nI8aI2eo1qmvQEie4VD1glsS250n15LsWsYtCugiStS5A==} engines: {node: '>=18'} + mongodb-connection-string-url@3.0.2: + resolution: {integrity: sha512-rMO7CGo/9BFwyZABcKAWL8UJwH/Kc2x0g72uhDWzG48URRax5TCIcJ7Rc3RZqffZzO/Gwff/jyKwCU9TN8gehA==} + mongodb-connection-string-url@7.0.1: resolution: {integrity: sha512-h0AZ9A7IDVwwHyMxmdMXKy+9oNlF0zFoahHiX3vQ8e3KFcSP3VmsmfvtRSuLPxmyv2vjIDxqty8smTgie/SNRQ==} engines: {node: '>=20.19.0'} - mongodb-memory-server-core@11.0.1: - resolution: {integrity: sha512-IcIb2S9Xf7Lmz43Z1ZujMqNg7PU5Q7yn+4wOnu7l6pfeGPkEmlqzV1hIbroVx8s4vXhPB1oMGC1u8clW7aj3Xw==} - engines: {node: '>=20.19.0'} + mongodb-memory-server-core@10.4.3: + resolution: {integrity: sha512-IPjlw73IoSYopnqBibQKxmAXMbOEPf5uGAOsBcaUiNH/TOI7V19WO+K7n5KYtnQ9FqzLGLpvwCGuPOTBSg4s5Q==} + engines: {node: '>=16.20.1'} - mongodb-memory-server@11.0.1: - resolution: {integrity: sha512-nUlKovSJZBh7q5hPsewFRam9H66D08Ne18nyknkNalfXMPtK1Og3kOcuqQhcX88x/pghSZPIJHrLbxNFW3OWiw==} - engines: {node: '>=20.19.0'} + mongodb-memory-server@10.4.3: + resolution: {integrity: sha512-CDZvFisXvGIigsIw5gqH6r9NI/zxGa/uRdutgUL/isuJh+inj0YXb7Ykw6oFMFzqgTJWb7x0I5DpzrqCstBWpg==} + engines: {node: '>=16.20.1'} + + mongodb@6.21.0: + resolution: {integrity: sha512-URyb/VXMjJ4da46OeSXg+puO39XH9DeQpWCslifrRn9JWugy0D+DvvBvkm2WxmHe61O/H19JM66p1z7RHVkZ6A==} + engines: {node: '>=16.20.1'} + peerDependencies: + '@aws-sdk/credential-providers': ^3.188.0 + '@mongodb-js/zstd': ^1.1.0 || ^2.0.0 + gcp-metadata: ^5.2.0 + kerberos: ^2.0.1 + mongodb-client-encryption: '>=6.0.0 <7' + snappy: ^7.3.2 + socks: ^2.7.1 + peerDependenciesMeta: + '@aws-sdk/credential-providers': + optional: true + '@mongodb-js/zstd': + optional: true + gcp-metadata: + optional: true + kerberos: + optional: true + mongodb-client-encryption: + optional: true + snappy: + optional: true + socks: + optional: true mongodb@7.1.1: resolution: {integrity: sha512-067DXiMjcpYQl6bGjWQoTUEE9UoRViTtKFcoqX7z08I+iDZv/emH1g8XEFiO3qiDfXAheT5ozl1VffDTKhIW/w==} @@ -1179,6 +1224,10 @@ snapshots: '@types/webidl-conversions@7.0.3': {} + '@types/whatwg-url@11.0.5': + dependencies: + '@types/webidl-conversions': 7.0.3 + '@types/whatwg-url@13.0.0': dependencies: '@types/webidl-conversions': 7.0.3 @@ -1247,6 +1296,8 @@ snapshots: transitivePeerDependencies: - supports-color + bson@6.10.4: {} + bson@7.2.0: {} buffer-crc32@0.2.13: {} @@ -1545,12 +1596,17 @@ snapshots: dependencies: mime-db: 1.54.0 + mongodb-connection-string-url@3.0.2: + dependencies: + '@types/whatwg-url': 11.0.5 + whatwg-url: 14.2.0 + mongodb-connection-string-url@7.0.1: dependencies: '@types/whatwg-url': 13.0.0 whatwg-url: 14.2.0 - mongodb-memory-server-core@11.0.1: + mongodb-memory-server-core@10.4.3: dependencies: async-mutex: 0.5.0 camelcase: 6.3.0 @@ -1558,7 +1614,7 @@ snapshots: find-cache-dir: 3.3.2 follow-redirects: 1.15.11(debug@4.4.3) https-proxy-agent: 7.0.6 - mongodb: 7.1.1 + mongodb: 6.21.0 new-find-package-json: 2.0.0 semver: 7.7.4 tar-stream: 3.1.8 @@ -1577,9 +1633,9 @@ snapshots: - socks - supports-color - mongodb-memory-server@11.0.1: + mongodb-memory-server@10.4.3: dependencies: - mongodb-memory-server-core: 11.0.1 + mongodb-memory-server-core: 10.4.3 tslib: 2.8.1 transitivePeerDependencies: - '@aws-sdk/credential-providers' @@ -1594,6 +1650,12 @@ snapshots: - socks - supports-color + mongodb@6.21.0: + dependencies: + '@mongodb-js/saslprep': 1.4.6 + bson: 6.10.4 + mongodb-connection-string-url: 3.0.2 + mongodb@7.1.1: dependencies: '@mongodb-js/saslprep': 1.4.6 diff --git a/tests/test_dev_commands.py b/tests/test_dev_commands.py index ebcd9a1..2f8e4d0 100644 --- a/tests/test_dev_commands.py +++ b/tests/test_dev_commands.py @@ -17,6 +17,7 @@ class DevCommandDocsTest(unittest.TestCase): "dev-worker:", "serve-api:", "serve-web:", + "serve-worker:", "infra-up:", "infra-down:", "guardrails:", @@ -37,6 +38,7 @@ class DevCommandDocsTest(unittest.TestCase): "make infra-up", "make serve-api", "make serve-web", + "make serve-worker", ): with self.subTest(phrase=phrase): self.assertIn(phrase, readme)