From f6ca6246f9448a63cb9510b9b07afce716765a38 Mon Sep 17 00:00:00 2001 From: eust-w Date: Fri, 27 Mar 2026 03:08:21 +0800 Subject: [PATCH] :sparkles: feat: add configurable node runtime snapshots --- README.md | 2 + .../common/mongo/schemas/run-task.schema.ts | 5 + .../mongo/schemas/workflow-run.schema.ts | 1 + apps/api/src/runtime/mongo-store.ts | 138 +++++++++++++- .../api/test/runtime-http.integration.spec.ts | 174 ++++++++++++++++++ .../runs/components/run-graph-view.tsx | 3 + .../runs/components/task-log-panel.tsx | 3 + .../components/node-config-panel.tsx | 2 + .../workflows/workflow-editor-page.test.tsx | 9 + apps/web/src/runtime/app.tsx | 135 ++++++++++++++ .../src/runtime/workflow-editor-state.test.ts | 32 ++++ apps/web/src/runtime/workflow-editor-state.ts | 65 +++++++ .../worker/src/contracts/execution-context.ts | 29 +++ apps/worker/src/executors/docker-executor.ts | 13 +- apps/worker/src/executors/http-executor.ts | 32 +++- apps/worker/src/executors/python-executor.ts | 106 ++++++++++- apps/worker/src/runner/task-runner.ts | 1 + apps/worker/src/runtime/mongo-worker-store.ts | 24 ++- apps/worker/src/runtime/worker-runtime.ts | 2 + apps/worker/test/mongo-worker-runtime.spec.ts | 58 ++++++ .../03-workflows/workflow-execution-model.md | 8 + ...nformation-architecture-and-key-screens.md | 2 +- design/05-data/mongodb-data-model.md | 7 + ...26-03-26-emboflow-v1-foundation-and-mvp.md | 1 + 24 files changed, 836 insertions(+), 16 deletions(-) diff --git a/README.md b/README.md index bac8b3a..d778654 100644 --- a/README.md +++ b/README.md @@ -66,7 +66,9 @@ The local validation path currently used for embodied data testing is: You can register that directory from the Assets page or via `POST /api/assets/register`. The workflow editor currently requires selecting at least one registered asset before a run can be created. +The editor now also persists per-node runtime config in workflow versions, including executor overrides, optional artifact title overrides, and Python code-hook source for inspect and transform style nodes. The Runs workspace now shows project-scoped run history, run-level aggregated summaries, cancel/retry controls, and run detail views with persisted task summaries, stdout/stderr sections, result previews, and artifact links into Explore. +Selected run tasks now expose the frozen node definition id, executor config snapshot, and code-hook metadata that were captured when the run was created. ## Repository Structure diff --git a/apps/api/src/common/mongo/schemas/run-task.schema.ts b/apps/api/src/common/mongo/schemas/run-task.schema.ts index f13823f..b1719c9 100644 --- a/apps/api/src/common/mongo/schemas/run-task.schema.ts +++ b/apps/api/src/common/mongo/schemas/run-task.schema.ts @@ -5,7 +5,12 @@ export const runTaskSchemaDefinition = { workflowVersionId: { type: "string", required: true }, nodeId: { type: "string", required: true }, nodeType: { type: "string", required: true }, + nodeDefinitionId: { type: "string", required: false, default: null }, executorType: { type: "string", required: true }, + executorConfig: { type: "object", required: false, default: null }, + codeHookSpec: { type: "object", required: false, default: null }, + artifactType: { type: "string", required: false, default: null }, + artifactTitle: { type: "string", required: false, default: null }, status: { type: "string", required: true }, attempt: { type: "number", required: true, default: 1 }, assetIds: { type: "array", required: true, default: [] }, diff --git a/apps/api/src/common/mongo/schemas/workflow-run.schema.ts b/apps/api/src/common/mongo/schemas/workflow-run.schema.ts index 4666398..42985e7 100644 --- a/apps/api/src/common/mongo/schemas/workflow-run.schema.ts +++ b/apps/api/src/common/mongo/schemas/workflow-run.schema.ts @@ -6,6 +6,7 @@ export const workflowRunSchemaDefinition = { status: { type: "string", required: true }, triggeredBy: { type: "string", required: true }, assetIds: { type: "array", required: true, default: [] }, + runtimeSnapshot: { type: "object", required: false, default: null }, summary: { type: "object", required: false, default: null }, startedAt: { type: "date", required: false, default: null }, finishedAt: { type: "date", required: false, default: null }, diff --git a/apps/api/src/runtime/mongo-store.ts b/apps/api/src/runtime/mongo-store.ts index 6006d7d..89503f3 100644 --- a/apps/api/src/runtime/mongo-store.ts +++ b/apps/api/src/runtime/mongo-store.ts @@ -6,8 +6,11 @@ import type { AssetType } from "../../../../packages/contracts/src/domain.ts"; import { DELIVERY_NODE_DEFINITIONS } from "../modules/plugins/builtin/delivery-nodes.ts"; import { probeLocalSourcePath } from "./local-source-probe.ts"; import type { + CodeHookSpec, + NodeRuntimeConfig, ExecutorType, RunExecutionSummary, + RunRuntimeSnapshot, TaskExecutionSummary, TaskStatusCounts, } from "../../../worker/src/contracts/execution-context.ts"; @@ -102,6 +105,7 @@ type WorkflowRunDocument = Timestamped & { status: "queued" | "running" | "success" | "failed" | "cancelled"; triggeredBy: string; assetIds: string[]; + runtimeSnapshot?: RunRuntimeSnapshot; startedAt?: string; finishedAt?: string; durationMs?: number; @@ -114,7 +118,12 @@ type RunTaskDocument = Timestamped & { workflowVersionId: string; nodeId: string; nodeType: string; + nodeDefinitionId?: string; executorType: ExecutorType; + executorConfig?: Record; + codeHookSpec?: CodeHookSpec; + artifactType?: "json" | "directory" | "video"; + artifactTitle?: string; status: "queued" | "pending" | "running" | "success" | "failed" | "cancelled"; attempt: number; assetIds: string[]; @@ -140,6 +149,12 @@ type ArtifactDocument = Timestamped & { payload: Record; }; +type WorkflowRuntimeGraph = Record & { + selectedPreset?: string; + nodeBindings?: Record; + nodeConfigs?: Record; +}; + function nowIso(): string { return new Date().toISOString(); } @@ -186,6 +201,108 @@ function buildRunExecutionSummary(tasks: RunTaskDocument[]): RunExecutionSummary }; } +function inferDefinitionId(nodeId: string) { + return nodeId.replace(/-\d+$/, ""); +} + +function isRecord(value: unknown): value is Record { + return Boolean(value) && typeof value === "object" && !Array.isArray(value); +} + +function sanitizeCodeHookSpec(value: unknown): CodeHookSpec | undefined { + if (!isRecord(value)) { + return undefined; + } + if (value.language !== "python" || typeof value.source !== "string" || value.source.trim().length === 0) { + return undefined; + } + + const entrypoint = typeof value.entrypoint === "string" && value.entrypoint.trim().length > 0 + ? value.entrypoint + : undefined; + + return { + language: "python", + entrypoint, + source: value.source, + }; +} + +function sanitizeArtifactType(value: unknown): "json" | "directory" | "video" | undefined { + return value === "json" || value === "directory" || value === "video" ? value : undefined; +} + +function sanitizeNodeRuntimeConfig(value: unknown, fallbackDefinitionId: string): NodeRuntimeConfig | undefined { + if (!isRecord(value)) { + return undefined; + } + + const executorType = value.executorType === "python" || value.executorType === "docker" || value.executorType === "http" + ? value.executorType + : undefined; + const definitionId = typeof value.definitionId === "string" && value.definitionId.trim().length > 0 + ? value.definitionId + : fallbackDefinitionId; + const executorConfig = isRecord(value.executorConfig) ? { ...value.executorConfig } : undefined; + const codeHookSpec = sanitizeCodeHookSpec(value.codeHookSpec); + const artifactType = sanitizeArtifactType(value.artifactType); + const artifactTitle = typeof value.artifactTitle === "string" && value.artifactTitle.trim().length > 0 + ? value.artifactTitle + : undefined; + + if (!executorType && !executorConfig && !codeHookSpec && !artifactType && !artifactTitle) { + return definitionId !== fallbackDefinitionId ? { definitionId } : undefined; + } + + const config: NodeRuntimeConfig = {}; + if (definitionId !== fallbackDefinitionId) { + config.definitionId = definitionId; + } + if (executorType) { + config.executorType = executorType; + } + if (executorConfig) { + config.executorConfig = executorConfig; + } + if (codeHookSpec) { + config.codeHookSpec = codeHookSpec; + } + if (artifactType) { + config.artifactType = artifactType; + } + if (artifactTitle) { + config.artifactTitle = artifactTitle; + } + + return config; +} + +function buildRuntimeSnapshot( + runtimeGraph: Record, + logicGraph: WorkflowDefinitionVersionDocument["logicGraph"], + pluginRefs: string[], +): RunRuntimeSnapshot { + const graph = runtimeGraph as WorkflowRuntimeGraph; + const nodeBindings: Record = {}; + const nodeConfigs: Record = {}; + + for (const node of logicGraph.nodes) { + const definitionId = graph.nodeBindings?.[node.id] ?? inferDefinitionId(node.id); + nodeBindings[node.id] = definitionId; + const config = sanitizeNodeRuntimeConfig(graph.nodeConfigs?.[node.id], definitionId); + if (config) { + nodeConfigs[node.id] = config; + } + } + + return { + selectedPreset: typeof graph.selectedPreset === "string" ? graph.selectedPreset : undefined, + nodeBindings, + nodeConfigs, + pluginRefs: [...pluginRefs], + }; +} + function collectRetryNodeIds(tasks: RunTaskDocument[], rootNodeId: string) { const pending = [rootNodeId]; const collected = new Set([rootNodeId]); @@ -527,6 +644,12 @@ export class MongoAppStore { throw new Error("bound assets must belong to the workflow project"); } + const runtimeSnapshot = buildRuntimeSnapshot( + version.runtimeGraph, + version.logicGraph, + version.pluginRefs, + ); + const run: WorkflowRunDocument = { _id: `run-${randomUUID()}`, workflowDefinitionId: input.workflowDefinitionId, @@ -536,18 +659,26 @@ export class MongoAppStore { status: "queued", triggeredBy: input.triggeredBy, assetIds, + runtimeSnapshot, createdAt: nowIso(), updatedAt: nowIso(), }; const targetNodes = new Set(version.logicGraph.edges.map((edge) => edge.to)); - const tasks = version.logicGraph.nodes.map((node) => ({ + const tasks = version.logicGraph.nodes.map((node) => { + const config = runtimeSnapshot.nodeConfigs?.[node.id]; + return { _id: `task-${randomUUID()}`, workflowRunId: run._id, workflowVersionId: version._id, nodeId: node.id, nodeType: node.type, - executorType: "python", + nodeDefinitionId: runtimeSnapshot.nodeBindings?.[node.id] ?? inferDefinitionId(node.id), + executorType: config?.executorType ?? "python", + executorConfig: config?.executorConfig, + codeHookSpec: config?.codeHookSpec, + artifactType: config?.artifactType, + artifactTitle: config?.artifactTitle, status: targetNodes.has(node.id) ? "pending" : "queued", attempt: 1, assetIds, @@ -558,7 +689,8 @@ export class MongoAppStore { logLines: [], createdAt: nowIso(), updatedAt: nowIso(), - })); + }; + }); run.summary = buildRunExecutionSummary(tasks); await this.db.collection("workflow_runs").insertOne(run); diff --git a/apps/api/test/runtime-http.integration.spec.ts b/apps/api/test/runtime-http.integration.spec.ts index 67f238e..1e6180b 100644 --- a/apps/api/test/runtime-http.integration.spec.ts +++ b/apps/api/test/runtime-http.integration.spec.ts @@ -234,6 +234,180 @@ test("mongo-backed runtime persists probed assets and workflow runs through the assert.equal(tasks[1]?.status, "pending"); }); +test("mongo-backed runtime snapshots per-node executor config into runs and tasks", async (t) => { + const sourceDir = await mkdtemp(path.join(os.tmpdir(), "emboflow-runtime-snapshot-")); + await mkdir(path.join(sourceDir, "DJI_001")); + await writeFile(path.join(sourceDir, "meta.json"), "{}"); + await writeFile(path.join(sourceDir, "intrinsics.json"), "{}"); + await writeFile(path.join(sourceDir, "video_meta.json"), "{}"); + await writeFile(path.join(sourceDir, "DJI_001", "DJI_001.mp4"), ""); + + const mongod = await MongoMemoryServer.create({ + instance: { + ip: "127.0.0.1", + port: 27124, + }, + }); + t.after(async () => { + await mongod.stop(); + }); + + const server = await startRuntimeServer({ + host: "127.0.0.1", + port: 0, + mongoUri: mongod.getUri(), + database: "emboflow-runtime-snapshots", + corsOrigin: "http://127.0.0.1:3000", + }); + t.after(async () => { + await server.close(); + }); + + const bootstrap = await readJson<{ + workspace: { _id: string }; + project: { _id: string }; + }>( + await fetch(`${server.baseUrl}/api/dev/bootstrap`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ userId: "snapshot-user", projectName: "Snapshot Project" }), + }), + ); + + const asset = await readJson<{ _id: string }>( + await fetch(`${server.baseUrl}/api/assets/register`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ + workspaceId: bootstrap.workspace._id, + projectId: bootstrap.project._id, + sourcePath: sourceDir, + }), + }), + ); + await readJson(await fetch(`${server.baseUrl}/api/assets/${asset._id}/probe`, { method: "POST" })); + + const workflow = await readJson<{ _id: string }>( + await fetch(`${server.baseUrl}/api/workflows`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ + workspaceId: bootstrap.workspace._id, + projectId: bootstrap.project._id, + name: "Snapshot Flow", + }), + }), + ); + + const version = await readJson<{ _id: string }>( + await fetch(`${server.baseUrl}/api/workflows/${workflow._id}/versions`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ + visualGraph: { viewport: { x: 0, y: 0, zoom: 1 } }, + logicGraph: { + nodes: [ + { id: "source-asset", type: "source" }, + { id: "validate-structure", type: "inspect" }, + { id: "export-delivery-package", type: "export" }, + ], + edges: [ + { from: "source-asset", to: "validate-structure" }, + { from: "validate-structure", to: "export-delivery-package" }, + ], + }, + runtimeGraph: { + selectedPreset: "delivery-normalization", + nodeConfigs: { + "source-asset": { + executorType: "docker", + executorConfig: { + image: "python:3.11", + command: ["python", "-V"], + }, + }, + "validate-structure": { + executorType: "python", + codeHookSpec: { + language: "python", + entrypoint: "process", + source: [ + "def process(task, context):", + " return {'nodeId': task['nodeId'], 'hooked': True}", + ].join("\n"), + }, + }, + "export-delivery-package": { + executorType: "http", + executorConfig: { + url: "http://127.0.0.1:3010/mock-executor", + method: "POST", + }, + }, + }, + }, + pluginRefs: ["builtin:delivery-nodes"], + }), + }), + ); + + const createdRun = await readJson<{ _id: string }>( + await fetch(`${server.baseUrl}/api/runs`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ + workflowDefinitionId: workflow._id, + workflowVersionId: version._id, + assetIds: [asset._id], + }), + }), + ); + + const run = await readJson<{ + _id: string; + runtimeSnapshot?: { + selectedPreset?: string; + nodeConfigs?: Record< + string, + { + executorType?: string; + executorConfig?: { image?: string; url?: string }; + codeHookSpec?: { source?: string }; + } + >; + }; + }>(await fetch(`${server.baseUrl}/api/runs/${createdRun._id}`)); + const tasks = await readJson< + Array<{ + nodeId: string; + executorType: string; + executorConfig?: { image?: string; url?: string }; + codeHookSpec?: { source?: string }; + }> + >(await fetch(`${server.baseUrl}/api/runs/${createdRun._id}/tasks`)); + + const sourceTask = tasks.find((task) => task.nodeId === "source-asset"); + const validateTask = tasks.find((task) => task.nodeId === "validate-structure"); + const exportTask = tasks.find((task) => task.nodeId === "export-delivery-package"); + + assert.equal(run.runtimeSnapshot?.selectedPreset, "delivery-normalization"); + assert.equal(run.runtimeSnapshot?.nodeConfigs?.["source-asset"]?.executorType, "docker"); + assert.equal( + run.runtimeSnapshot?.nodeConfigs?.["source-asset"]?.executorConfig?.image, + "python:3.11", + ); + assert.match( + run.runtimeSnapshot?.nodeConfigs?.["validate-structure"]?.codeHookSpec?.source ?? "", + /def process/, + ); + assert.equal(sourceTask?.executorType, "docker"); + assert.equal(sourceTask?.executorConfig?.image, "python:3.11"); + assert.equal(validateTask?.executorType, "python"); + assert.match(validateTask?.codeHookSpec?.source ?? "", /hooked/); + assert.equal(exportTask?.executorType, "http"); + assert.equal(exportTask?.executorConfig?.url, "http://127.0.0.1:3010/mock-executor"); +}); + test("mongo-backed runtime rejects workflow runs without bound assets", async (t) => { const mongod = await MongoMemoryServer.create({ instance: { diff --git a/apps/web/src/features/runs/components/run-graph-view.tsx b/apps/web/src/features/runs/components/run-graph-view.tsx index c62bdb5..67704ea 100644 --- a/apps/web/src/features/runs/components/run-graph-view.tsx +++ b/apps/web/src/features/runs/components/run-graph-view.tsx @@ -2,6 +2,7 @@ export type RunTaskView = { id: string; nodeId: string; nodeName: string; + nodeDefinitionId?: string; status: string; assetIds?: string[]; artifactIds?: string[]; @@ -10,6 +11,8 @@ export type RunTaskView = { errorMessage?: string; stdoutLines?: string[]; stderrLines?: string[]; + codeHookLabel?: string; + executorConfigLabel?: string; canRetry?: boolean; logLines: string[]; }; diff --git a/apps/web/src/features/runs/components/task-log-panel.tsx b/apps/web/src/features/runs/components/task-log-panel.tsx index d5ca857..ad91ca8 100644 --- a/apps/web/src/features/runs/components/task-log-panel.tsx +++ b/apps/web/src/features/runs/components/task-log-panel.tsx @@ -14,10 +14,13 @@ export function renderTaskLogPanel(