diff --git a/README.md b/README.md index 0fe6b19..788053f 100644 --- a/README.md +++ b/README.md @@ -70,6 +70,7 @@ The editor now also persists per-node runtime config in workflow versions, inclu The Runs workspace now shows project-scoped run history, run-level aggregated summaries, cancel/retry controls, and run detail views with persisted task summaries, stdout/stderr sections, result previews, and artifact links into Explore. Selected run tasks now expose the frozen node definition id, executor config snapshot, and code-hook metadata that were captured when the run was created. When a node uses `executorType=docker` and provides `executorConfig.image`, the worker now runs a real local Docker container with mounted `input.json` / `output.json` exchange files. If no image is configured, the executor falls back to the lightweight simulated behavior used by older demo tasks. +When a node uses the built-in Python path without a custom hook, `source-asset` now emits bound asset metadata from Mongo-backed asset records and `validate-structure` now performs a real directory validation pass against local source paths. On the current sample path `/Users/longtaowu/workspace/emboldata/data`, that validation reports `valid=false`, `videoFileCount=407`, and missing delivery files because the sample root is a mixed dataset collection rather than a delivery package. ## Repository Structure diff --git a/apps/worker/src/contracts/execution-context.ts b/apps/worker/src/contracts/execution-context.ts index 87dc1bd..fe01a7f 100644 --- a/apps/worker/src/contracts/execution-context.ts +++ b/apps/worker/src/contracts/execution-context.ts @@ -86,11 +86,21 @@ export type TaskRecord = { lastResultPreview?: Record; }; +export type ExecutionAsset = { + id: string; + displayName: string; + sourcePath?: string; + topLevelPaths?: string[]; + detectedFormats?: string[]; + summary?: Record; +}; + export type ExecutionContext = { taskId: string; workflowRunId?: string; workflowVersionId?: string; nodeId: string; assetIds?: string[]; + assets?: ExecutionAsset[]; nodeDefinitionId?: string; }; diff --git a/apps/worker/src/executors/python-executor.ts b/apps/worker/src/executors/python-executor.ts index b648ce4..b439a93 100644 --- a/apps/worker/src/executors/python-executor.ts +++ b/apps/worker/src/executors/python-executor.ts @@ -1,9 +1,10 @@ import { spawn } from "node:child_process"; -import { mkdtemp, readFile, rm, writeFile } from "node:fs/promises"; +import { mkdtemp, readFile, readdir, rm, stat, writeFile } from "node:fs/promises"; import os from "node:os"; import path from "node:path"; import type { + ExecutionAsset, ExecutionContext, ExecutorExecutionResult, TaskRecord, @@ -46,67 +47,163 @@ function createPythonHarness() { ].join("\n"); } +const REQUIRED_DELIVERY_FILES = ["meta.json", "intrinsics.json", "video_meta.json"] as const; + +function getEffectiveNodeDefinitionId(task: TaskRecord) { + return task.nodeDefinitionId ?? task.nodeId; +} + +function createBuiltinSourceResult(context: ExecutionContext): ExecutorExecutionResult { + const assets = context.assets ?? []; + return { + result: { + assetCount: assets.length, + assets: assets.map((asset) => ({ + id: asset.id, + displayName: asset.displayName, + sourcePath: asset.sourcePath, + detectedFormats: asset.detectedFormats ?? [], + topLevelPaths: asset.topLevelPaths ?? [], + summary: asset.summary ?? {}, + })), + }, + stdoutLines: [`loaded ${assets.length} bound asset${assets.length === 1 ? "" : "s"}`], + stderrLines: [], + }; +} + +async function countVideoFiles(sourcePath: string): Promise { + const current = await stat(sourcePath); + if (current.isFile()) { + return sourcePath.toLowerCase().endsWith(".mp4") ? 1 : 0; + } + + if (!current.isDirectory()) { + return 0; + } + + let count = 0; + const entries = await readdir(sourcePath, { withFileTypes: true }); + for (const entry of entries) { + const entryPath = path.join(sourcePath, entry.name); + if (entry.isDirectory()) { + count += await countVideoFiles(entryPath); + continue; + } + if (entry.isFile() && entry.name.toLowerCase().endsWith(".mp4")) { + count += 1; + } + } + + return count; +} + +async function summarizeAssetValidation(asset: ExecutionAsset) { + const topLevelPaths = asset.topLevelPaths ?? []; + const missingRequiredFiles = REQUIRED_DELIVERY_FILES.filter((required) => !topLevelPaths.includes(required)); + const videoFileCount = asset.sourcePath ? await countVideoFiles(asset.sourcePath) : 0; + + return { + id: asset.id, + displayName: asset.displayName, + sourcePath: asset.sourcePath, + detectedFormats: asset.detectedFormats ?? [], + valid: missingRequiredFiles.length === 0 && videoFileCount > 0, + missingRequiredFiles, + videoFileCount, + }; +} + +async function createBuiltinValidateResult(context: ExecutionContext): Promise { + const assets = context.assets ?? []; + const assetSummaries = await Promise.all(assets.map((asset) => summarizeAssetValidation(asset))); + const missingRequiredFiles = Array.from( + new Set(assetSummaries.flatMap((asset) => asset.missingRequiredFiles)), + ); + const videoFileCount = assetSummaries.reduce((total, asset) => total + asset.videoFileCount, 0); + const valid = assetSummaries.length > 0 && assetSummaries.every((asset) => asset.valid); + + return { + result: { + assetCount: assets.length, + valid, + requiredFiles: [...REQUIRED_DELIVERY_FILES], + videoFileCount, + ...(missingRequiredFiles.length > 0 ? { missingRequiredFiles } : {}), + }, + stdoutLines: [`validated ${assets.length} asset${assets.length === 1 ? "" : "s"}`], + stderrLines: [], + }; +} + export class PythonExecutor { executionCount = 0; async execute(task: TaskRecord, context: ExecutionContext): Promise { this.executionCount += 1; - if (!task.codeHookSpec?.source) { - return createDefaultResult(task); - } + if (task.codeHookSpec?.source) { + const tempDir = await mkdtemp(path.join(os.tmpdir(), "emboflow-python-executor-")); + const inputPath = path.join(tempDir, "input.json"); + const resultPath = path.join(tempDir, "result.json"); + const runnerPath = path.join(tempDir, "runner.py"); - const tempDir = await mkdtemp(path.join(os.tmpdir(), "emboflow-python-executor-")); - const inputPath = path.join(tempDir, "input.json"); - const resultPath = path.join(tempDir, "result.json"); - const runnerPath = path.join(tempDir, "runner.py"); + await writeFile( + inputPath, + JSON.stringify({ + task, + context, + entrypoint: task.codeHookSpec.entrypoint ?? "process", + source: task.codeHookSpec.source, + resultPath, + }), + ); + await writeFile(runnerPath, createPythonHarness()); - await writeFile( - inputPath, - JSON.stringify({ - task, - context, - entrypoint: task.codeHookSpec.entrypoint ?? "process", - source: task.codeHookSpec.source, - resultPath, - }), - ); - await writeFile(runnerPath, createPythonHarness()); + const child = spawn("python3", [runnerPath, inputPath], { + stdio: ["ignore", "pipe", "pipe"], + }); - const child = spawn("python3", [runnerPath, inputPath], { - stdio: ["ignore", "pipe", "pipe"], - }); + let stdout = ""; + let stderr = ""; + child.stdout.on("data", (chunk) => { + stdout += String(chunk); + }); + child.stderr.on("data", (chunk) => { + stderr += String(chunk); + }); - let stdout = ""; - let stderr = ""; - child.stdout.on("data", (chunk) => { - stdout += String(chunk); - }); - child.stderr.on("data", (chunk) => { - stderr += String(chunk); - }); + const exitCode = await new Promise((resolve, reject) => { + child.on("error", reject); + child.on("close", (code) => resolve(code ?? 1)); + }); - const exitCode = await new Promise((resolve, reject) => { - child.on("error", reject); - child.on("close", (code) => resolve(code ?? 1)); - }); + try { + if (exitCode !== 0) { + throw Object.assign(new Error(`python executor failed with exit code ${exitCode}`), { + stdoutLines: splitOutputLines(stdout), + stderrLines: splitOutputLines(stderr), + }); + } - try { - if (exitCode !== 0) { - throw Object.assign(new Error(`python executor failed with exit code ${exitCode}`), { + const resultPayload = JSON.parse(await readFile(resultPath, "utf8")) as { result?: unknown }; + return { + result: resultPayload.result, stdoutLines: splitOutputLines(stdout), stderrLines: splitOutputLines(stderr), - }); + }; + } finally { + await rm(tempDir, { recursive: true, force: true }); } + } - const resultPayload = JSON.parse(await readFile(resultPath, "utf8")) as { result?: unknown }; - return { - result: resultPayload.result, - stdoutLines: splitOutputLines(stdout), - stderrLines: splitOutputLines(stderr), - }; - } finally { - await rm(tempDir, { recursive: true, force: true }); + switch (getEffectiveNodeDefinitionId(task)) { + case "source-asset": + return createBuiltinSourceResult(context); + case "validate-structure": + return createBuiltinValidateResult(context); + default: + return createDefaultResult(task); } } } diff --git a/apps/worker/src/runtime/mongo-worker-store.ts b/apps/worker/src/runtime/mongo-worker-store.ts index 16d6432..10f8fe4 100644 --- a/apps/worker/src/runtime/mongo-worker-store.ts +++ b/apps/worker/src/runtime/mongo-worker-store.ts @@ -4,6 +4,7 @@ import type { Db } from "mongodb"; import type { CodeHookSpec, + ExecutionAsset, ExecutorType, NodeRuntimeConfig, RunExecutionSummary, @@ -74,6 +75,15 @@ type RunTaskDocument = { updatedAt: string; }; +type AssetDocument = { + _id: string; + displayName?: string; + sourcePath?: string; + topLevelPaths?: string[]; + detectedFormats?: string[]; + summary?: Record; +}; + function nowIso() { return new Date().toISOString(); } @@ -256,6 +266,34 @@ export class MongoWorkerStore { .findOne({ _id: workflowVersionId }); } + async getAssetsByIds(assetIds: string[]): Promise { + if (assetIds.length === 0) { + return []; + } + + const assets = await this.db + .collection("assets") + .find({ _id: { $in: assetIds } }) + .toArray(); + const assetMap = new Map( + assets.map((asset) => [ + asset._id, + { + id: asset._id, + displayName: asset.displayName ?? asset._id, + sourcePath: asset.sourcePath, + topLevelPaths: asset.topLevelPaths ?? [], + detectedFormats: asset.detectedFormats ?? [], + summary: asset.summary ?? {}, + } satisfies ExecutionAsset, + ]), + ); + + return assetIds + .map((assetId) => assetMap.get(assetId)) + .filter((asset): asset is ExecutionAsset => Boolean(asset)); + } + async createTaskArtifact(task: TaskRecord, payload: Record) { const artifact = { _id: `artifact-${randomUUID()}`, diff --git a/apps/worker/src/runtime/worker-runtime.ts b/apps/worker/src/runtime/worker-runtime.ts index e191bd7..2106e2e 100644 --- a/apps/worker/src/runtime/worker-runtime.ts +++ b/apps/worker/src/runtime/worker-runtime.ts @@ -38,6 +38,7 @@ export class WorkerRuntime { return undefined; } const startedAt = task.startedAt ?? new Date().toISOString(); + const assets = await this.store.getAssetsByIds(task.assetIds ?? []); const context: ExecutionContext = { taskId: task.id, @@ -46,6 +47,7 @@ export class WorkerRuntime { nodeId: task.nodeId, nodeDefinitionId: task.nodeDefinitionId, assetIds: task.assetIds, + assets, }; try { diff --git a/apps/worker/test/mongo-worker-runtime.spec.ts b/apps/worker/test/mongo-worker-runtime.spec.ts index c0d014b..1fe4a5b 100644 --- a/apps/worker/test/mongo-worker-runtime.spec.ts +++ b/apps/worker/test/mongo-worker-runtime.spec.ts @@ -1,6 +1,9 @@ import test from "node:test"; import assert from "node:assert/strict"; import { spawnSync } from "node:child_process"; +import { mkdtemp, mkdir, writeFile } from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; import { MongoMemoryServer } from "mongodb-memory-server"; import { MongoClient } from "mongodb"; @@ -580,3 +583,158 @@ test("worker executes a queued docker task inside a real container", { hooked: true, }); }); + +test("worker loads bound asset metadata into the execution context for built-in source nodes", async (t) => { + let capturedContext: ExecutionContext | null = null; + const fixture = await createRuntimeFixture("emboflow-worker-source-context", { + executors: { + python: { + async execute(_task: TaskRecord, context: ExecutionContext) { + capturedContext = context; + return { + result: { assetCount: context.assets?.length ?? 0 }, + stdoutLines: ["custom source executor"], + stderrLines: [], + }; + }, + }, + }, + }); + t.after(async () => { + await fixture.close(); + }); + + await fixture.db.collection("assets").insertOne({ + _id: "asset-context-1", + workspaceId: "workspace-1", + projectId: "project-1", + type: "folder", + sourceType: "registered_path", + displayName: "Sample Asset", + sourcePath: "/tmp/sample-asset", + status: "probed", + storageRef: {}, + topLevelPaths: ["meta.json"], + detectedFormats: ["delivery_package"], + fileCount: 4, + summary: { kind: "delivery_package" }, + createdBy: "local-user", + createdAt: new Date().toISOString(), + updatedAt: new Date().toISOString(), + }); + + await fixture.db.collection("workflow_runs").insertOne({ + _id: "run-source-context", + workflowDefinitionId: "workflow-source-context", + workflowVersionId: "workflow-source-context-v1", + status: "queued", + triggeredBy: "local-user", + assetIds: ["asset-context-1"], + createdAt: new Date().toISOString(), + updatedAt: new Date().toISOString(), + }); + + await fixture.db.collection("run_tasks").insertOne({ + _id: "task-source-context", + workflowRunId: "run-source-context", + workflowVersionId: "workflow-source-context-v1", + nodeId: "source-asset", + nodeDefinitionId: "source-asset", + nodeType: "source", + executorType: "python", + status: "queued", + attempt: 1, + assetIds: ["asset-context-1"], + upstreamNodeIds: [], + outputArtifactIds: [], + createdAt: new Date().toISOString(), + updatedAt: new Date().toISOString(), + }); + + await fixture.runtime.runNextTask(); + + assert.equal(capturedContext?.assets?.[0]?.id, "asset-context-1"); + assert.equal(capturedContext?.assets?.[0]?.displayName, "Sample Asset"); + assert.equal(capturedContext?.assets?.[0]?.sourcePath, "/tmp/sample-asset"); +}); + +test("worker validates delivery structure against the bound asset path for validate-structure", async (t) => { + const sourceDir = await mkdtemp(path.join(os.tmpdir(), "emboflow-worker-validate-")); + await mkdir(path.join(sourceDir, "DJI_001")); + await writeFile(path.join(sourceDir, "meta.json"), "{}"); + await writeFile(path.join(sourceDir, "intrinsics.json"), "{}"); + await writeFile(path.join(sourceDir, "video_meta.json"), "{}"); + await writeFile(path.join(sourceDir, "DJI_001", "DJI_001.mp4"), ""); + + const fixture = await createRuntimeFixture("emboflow-worker-validate-structure"); + t.after(async () => { + await fixture.close(); + }); + + await fixture.db.collection("assets").insertOne({ + _id: "asset-validate-1", + workspaceId: "workspace-1", + projectId: "project-1", + type: "folder", + sourceType: "registered_path", + displayName: "Validation Asset", + sourcePath: sourceDir, + status: "probed", + storageRef: {}, + topLevelPaths: ["DJI_001", "meta.json", "intrinsics.json", "video_meta.json"], + detectedFormats: ["delivery_package"], + fileCount: 4, + summary: { kind: "delivery_package" }, + createdBy: "local-user", + createdAt: new Date().toISOString(), + updatedAt: new Date().toISOString(), + }); + + await fixture.db.collection("workflow_runs").insertOne({ + _id: "run-validate-structure", + workflowDefinitionId: "workflow-validate-structure", + workflowVersionId: "workflow-validate-structure-v1", + status: "queued", + triggeredBy: "local-user", + assetIds: ["asset-validate-1"], + createdAt: new Date().toISOString(), + updatedAt: new Date().toISOString(), + }); + + await fixture.db.collection("run_tasks").insertOne({ + _id: "task-validate-structure", + workflowRunId: "run-validate-structure", + workflowVersionId: "workflow-validate-structure-v1", + nodeId: "validate-structure", + nodeDefinitionId: "validate-structure", + nodeType: "inspect", + executorType: "python", + status: "queued", + attempt: 1, + assetIds: ["asset-validate-1"], + upstreamNodeIds: [], + outputArtifactIds: [], + createdAt: new Date().toISOString(), + updatedAt: new Date().toISOString(), + }); + + await fixture.runtime.runNextTask(); + + const task = await fixture.store.getRunTask("task-validate-structure"); + const artifact = await fixture.db.collection("artifacts").findOne({ producerId: "task-validate-structure" }); + + assert.equal(task?.status, "success"); + assert.match(task?.stdoutLines?.[0] ?? "", /validated 1 asset/i); + assert.deepEqual(task?.lastResultPreview, { + assetCount: 1, + valid: true, + requiredFiles: ["meta.json", "intrinsics.json", "video_meta.json"], + videoFileCount: 1, + }); + assert.deepEqual((artifact?.payload as { result?: Record } | undefined)?.result, { + assetCount: 1, + valid: true, + requiredFiles: ["meta.json", "intrinsics.json", "video_meta.json"], + videoFileCount: 1, + }); +}); diff --git a/design/03-workflows/workflow-execution-model.md b/design/03-workflows/workflow-execution-model.md index 5789d29..5e8fe55 100644 --- a/design/03-workflows/workflow-execution-model.md +++ b/design/03-workflows/workflow-execution-model.md @@ -259,6 +259,8 @@ Each task receives a normalized execution context containing: - workflow run id - task id - actor id +- bound asset ids +- bound asset metadata summary, including display name, detected formats, top-level paths, and local source path when available - node config - code hook content - input references @@ -316,6 +318,15 @@ The worker-backed runtime now persists task execution summaries directly on `run This makes the run detail view stable even when artifacts are large or delayed and keeps task-level observability queryable without reopening every artifact payload. +The current built-in Python path now also has first-pass node semantics for two delivery-focused nodes when no custom code hook is present: + +- `source-asset` + Emits a normalized summary of the bound assets from Mongo-backed asset metadata, so downstream nodes and operators see concrete display names, detected formats, top-level paths, and local source paths instead of only opaque asset ids. +- `validate-structure` + Inspects the bound asset source paths, checks the delivery-required files `meta.json`, `intrinsics.json`, and `video_meta.json`, recursively counts `.mp4` files, and emits a stable validation summary with `valid`, `requiredFiles`, `missingRequiredFiles`, and `videoFileCount`. + +This replaces the earlier placeholder `"python executor processed ..."` behavior for those built-in nodes and makes the default worker output useful even before custom hooks are authored. + The current runtime also aggregates execution state back onto `workflow_runs`. Each refresh computes: - run-level `startedAt` and `finishedAt` diff --git a/docs/plans/2026-03-26-emboflow-v1-foundation-and-mvp.md b/docs/plans/2026-03-26-emboflow-v1-foundation-and-mvp.md index 81469e5..3ada1d3 100644 --- a/docs/plans/2026-03-26-emboflow-v1-foundation-and-mvp.md +++ b/docs/plans/2026-03-26-emboflow-v1-foundation-and-mvp.md @@ -25,6 +25,7 @@ - `2026-03-27`: The current run-control pass adds run cancellation, run retry from immutable snapshots, and task retry for failed/cancelled nodes with downstream reset semantics. - `2026-03-27`: The current runtime-config pass freezes per-node executor config into `workflow_runs` and `run_tasks`, exposes runtime editing controls in the React workflow editor, and executes trusted-local Python code hooks from the task snapshot. - `2026-03-27`: The current Docker-runtime pass upgrades `executorType=docker` from a pure stub to a real local container execution path whenever `executorConfig.image` is provided, while retaining a compatibility fallback for older demo tasks without an image. +- `2026-03-27`: The current built-in-node pass enriches the worker execution context with bound asset metadata and gives the default Python implementations for `source-asset` and `validate-structure` real delivery-oriented behavior instead of placeholder output. --- diff --git a/scripts/check_doc_code_sync.py b/scripts/check_doc_code_sync.py index 8502c68..8b50952 100755 --- a/scripts/check_doc_code_sync.py +++ b/scripts/check_doc_code_sync.py @@ -54,10 +54,10 @@ def classify(path_text: str) -> str: lower = path_text.lower() path = Path(path_text) - if any(token in lower for token in DOC_PATTERNS) or path.suffix == ".md": - return "docs" if any(token in lower for token in TEST_HINTS): return "tests" + if any(token in lower for token in DOC_PATTERNS) or path.suffix == ".md": + return "docs" if any(token in lower for token in CODE_HINTS): return "code" if path.suffix in CODE_SUFFIXES: diff --git a/tests/test_doc_code_sync.py b/tests/test_doc_code_sync.py index 56c071f..19bfb88 100644 --- a/tests/test_doc_code_sync.py +++ b/tests/test_doc_code_sync.py @@ -25,6 +25,18 @@ class DocCodeSyncAssessmentTests(unittest.TestCase): def test_classifies_app_paths_as_code(self): self.assertEqual(MODULE.classify("apps/web/package.json"), "code") + def test_classifies_spec_files_under_test_directories_as_tests(self): + self.assertEqual( + MODULE.classify("apps/worker/test/mongo-worker-runtime.spec.ts"), + "tests", + ) + + def test_classifies_markdown_plan_files_as_docs(self): + self.assertEqual( + MODULE.classify("docs/plans/2026-03-26-emboflow-v1-foundation-and-mvp.md"), + "docs", + ) + def test_classifies_env_example_as_config(self): self.assertEqual(MODULE.classify(".env.example"), "config")