feat: add built-in asset context and validation semantics

This commit is contained in:
eust-w 2026-03-27 11:19:57 +08:00
parent cb462464e6
commit 1cf1f45690
10 changed files with 378 additions and 48 deletions

View File

@ -70,6 +70,7 @@ The editor now also persists per-node runtime config in workflow versions, inclu
The Runs workspace now shows project-scoped run history, run-level aggregated summaries, cancel/retry controls, and run detail views with persisted task summaries, stdout/stderr sections, result previews, and artifact links into Explore.
Selected run tasks now expose the frozen node definition id, executor config snapshot, and code-hook metadata that were captured when the run was created.
When a node uses `executorType=docker` and provides `executorConfig.image`, the worker now runs a real local Docker container with mounted `input.json` / `output.json` exchange files. If no image is configured, the executor falls back to the lightweight simulated behavior used by older demo tasks.
When a node uses the built-in Python path without a custom hook, `source-asset` now emits bound asset metadata from Mongo-backed asset records and `validate-structure` now performs a real directory validation pass against local source paths. On the current sample path `/Users/longtaowu/workspace/emboldata/data`, that validation reports `valid=false`, `videoFileCount=407`, and missing delivery files because the sample root is a mixed dataset collection rather than a delivery package.
## Repository Structure

View File

@ -86,11 +86,21 @@ export type TaskRecord = {
lastResultPreview?: Record<string, unknown>;
};
/**
 * Normalized metadata for an asset bound to a run task, handed to executors
 * via `ExecutionContext.assets` so built-in nodes can see concrete names,
 * paths, and probe results instead of only opaque asset ids.
 */
export type ExecutionAsset = {
  // Asset record id (matches the Mongo asset document `_id`).
  id: string;
  // Human-readable asset name.
  displayName: string;
  // Local filesystem path the asset was registered from, when available.
  sourcePath?: string;
  // Entry names at the asset root (e.g. "meta.json") captured on the record.
  topLevelPaths?: string[];
  // Detected format labels (e.g. "delivery_package").
  detectedFormats?: string[];
  // Free-form summary payload copied from the asset record.
  summary?: Record<string, unknown>;
};
/**
 * Per-execution context passed to an executor alongside the task record.
 * Identifies the task, its run and version, the node, and the bound assets.
 */
export type ExecutionContext = {
  taskId: string;
  workflowRunId?: string;
  workflowVersionId?: string;
  nodeId: string;
  // Ids of the assets bound to the task.
  assetIds?: string[];
  // Resolved metadata for those assets.
  assets?: ExecutionAsset[];
  // Frozen node definition id captured when the run was created.
  nodeDefinitionId?: string;
};

View File

@ -1,9 +1,10 @@
import { spawn } from "node:child_process";
import { mkdtemp, readFile, rm, writeFile } from "node:fs/promises";
import { mkdtemp, readFile, readdir, rm, stat, writeFile } from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import type {
ExecutionAsset,
ExecutionContext,
ExecutorExecutionResult,
TaskRecord,
@ -46,67 +47,163 @@ function createPythonHarness() {
].join("\n");
}
// Top-level files every delivery package must contain at the asset root.
const REQUIRED_DELIVERY_FILES = ["meta.json", "intrinsics.json", "video_meta.json"] as const;

/**
 * Returns the node definition id frozen on the task, falling back to the
 * node id when no definition id was captured.
 */
function getEffectiveNodeDefinitionId(task: TaskRecord) {
  if (task.nodeDefinitionId != null) {
    return task.nodeDefinitionId;
  }
  return task.nodeId;
}
/**
 * Built-in `source-asset` semantics: emit a normalized summary of the assets
 * bound to the task so downstream nodes see concrete metadata, not just ids.
 */
function createBuiltinSourceResult(context: ExecutionContext): ExecutorExecutionResult {
  const boundAssets = context.assets ?? [];
  const normalizedAssets = [];
  for (const boundAsset of boundAssets) {
    normalizedAssets.push({
      id: boundAsset.id,
      displayName: boundAsset.displayName,
      sourcePath: boundAsset.sourcePath,
      detectedFormats: boundAsset.detectedFormats ?? [],
      topLevelPaths: boundAsset.topLevelPaths ?? [],
      summary: boundAsset.summary ?? {},
    });
  }
  const noun = boundAssets.length === 1 ? "asset" : "assets";
  return {
    result: {
      assetCount: boundAssets.length,
      assets: normalizedAssets,
    },
    stdoutLines: [`loaded ${boundAssets.length} bound ${noun}`],
    stderrLines: [],
  };
}
/**
 * Recursively counts `.mp4` files (case-insensitive suffix) under `sourcePath`.
 *
 * A file path counts as 1 when it has an `.mp4` suffix; a directory is walked
 * recursively; any other node kind counts 0. Directory entries that are
 * neither files nor directories (e.g. symlinks under `withFileTypes`) are
 * skipped, matching the original sequential implementation.
 *
 * Subtrees and entries are now counted in parallel with `Promise.all` instead
 * of awaiting one entry at a time — same result, fewer round trips on deep
 * asset layouts.
 *
 * @throws if `sourcePath` does not exist or is unreadable; callers gate on
 *         `asset.sourcePath` being set before calling.
 */
async function countVideoFiles(sourcePath: string): Promise<number> {
  const info = await stat(sourcePath);
  if (info.isFile()) {
    return sourcePath.toLowerCase().endsWith(".mp4") ? 1 : 0;
  }
  if (!info.isDirectory()) {
    return 0;
  }
  const entries = await readdir(sourcePath, { withFileTypes: true });
  const counts = await Promise.all(
    entries.map((entry) => {
      if (entry.isDirectory()) {
        return countVideoFiles(path.join(sourcePath, entry.name));
      }
      return entry.isFile() && entry.name.toLowerCase().endsWith(".mp4") ? 1 : 0;
    }),
  );
  return counts.reduce((total, count) => total + count, 0);
}
/**
 * Validates one bound asset as a delivery package: every required top-level
 * file must be listed on the asset record and the source tree must contain
 * at least one `.mp4` file.
 */
async function summarizeAssetValidation(asset: ExecutionAsset) {
  const knownTopLevel = new Set(asset.topLevelPaths ?? []);
  const missingRequiredFiles: string[] = [];
  for (const required of REQUIRED_DELIVERY_FILES) {
    if (!knownTopLevel.has(required)) {
      missingRequiredFiles.push(required);
    }
  }
  let videoFileCount = 0;
  if (asset.sourcePath) {
    videoFileCount = await countVideoFiles(asset.sourcePath);
  }
  return {
    id: asset.id,
    displayName: asset.displayName,
    sourcePath: asset.sourcePath,
    detectedFormats: asset.detectedFormats ?? [],
    valid: missingRequiredFiles.length === 0 && videoFileCount > 0,
    missingRequiredFiles,
    videoFileCount,
  };
}
/**
 * Built-in `validate-structure` semantics: validate every bound asset and
 * roll the per-asset summaries up into a single result. The run is `valid`
 * only when at least one asset is bound and every asset passes; the
 * `missingRequiredFiles` key is present only when something is missing.
 */
async function createBuiltinValidateResult(context: ExecutionContext): Promise<ExecutorExecutionResult> {
  const boundAssets = context.assets ?? [];
  const assetSummaries = await Promise.all(boundAssets.map(summarizeAssetValidation));

  const missingSet = new Set<string>();
  let videoFileCount = 0;
  let valid = assetSummaries.length > 0;
  for (const summary of assetSummaries) {
    for (const missing of summary.missingRequiredFiles) {
      missingSet.add(missing);
    }
    videoFileCount += summary.videoFileCount;
    if (!summary.valid) {
      valid = false;
    }
  }
  const missingRequiredFiles = [...missingSet];

  const result: Record<string, unknown> = {
    assetCount: boundAssets.length,
    valid,
    requiredFiles: [...REQUIRED_DELIVERY_FILES],
    videoFileCount,
  };
  if (missingRequiredFiles.length > 0) {
    result.missingRequiredFiles = missingRequiredFiles;
  }

  const noun = boundAssets.length === 1 ? "asset" : "assets";
  return {
    result,
    stdoutLines: [`validated ${boundAssets.length} ${noun}`],
    stderrLines: [],
  };
}
export class PythonExecutor {
executionCount = 0;
async execute(task: TaskRecord, context: ExecutionContext): Promise<ExecutorExecutionResult> {
this.executionCount += 1;
if (!task.codeHookSpec?.source) {
return createDefaultResult(task);
}
if (task.codeHookSpec?.source) {
const tempDir = await mkdtemp(path.join(os.tmpdir(), "emboflow-python-executor-"));
const inputPath = path.join(tempDir, "input.json");
const resultPath = path.join(tempDir, "result.json");
const runnerPath = path.join(tempDir, "runner.py");
const tempDir = await mkdtemp(path.join(os.tmpdir(), "emboflow-python-executor-"));
const inputPath = path.join(tempDir, "input.json");
const resultPath = path.join(tempDir, "result.json");
const runnerPath = path.join(tempDir, "runner.py");
await writeFile(
inputPath,
JSON.stringify({
task,
context,
entrypoint: task.codeHookSpec.entrypoint ?? "process",
source: task.codeHookSpec.source,
resultPath,
}),
);
await writeFile(runnerPath, createPythonHarness());
await writeFile(
inputPath,
JSON.stringify({
task,
context,
entrypoint: task.codeHookSpec.entrypoint ?? "process",
source: task.codeHookSpec.source,
resultPath,
}),
);
await writeFile(runnerPath, createPythonHarness());
const child = spawn("python3", [runnerPath, inputPath], {
stdio: ["ignore", "pipe", "pipe"],
});
const child = spawn("python3", [runnerPath, inputPath], {
stdio: ["ignore", "pipe", "pipe"],
});
let stdout = "";
let stderr = "";
child.stdout.on("data", (chunk) => {
stdout += String(chunk);
});
child.stderr.on("data", (chunk) => {
stderr += String(chunk);
});
let stdout = "";
let stderr = "";
child.stdout.on("data", (chunk) => {
stdout += String(chunk);
});
child.stderr.on("data", (chunk) => {
stderr += String(chunk);
});
const exitCode = await new Promise<number>((resolve, reject) => {
child.on("error", reject);
child.on("close", (code) => resolve(code ?? 1));
});
const exitCode = await new Promise<number>((resolve, reject) => {
child.on("error", reject);
child.on("close", (code) => resolve(code ?? 1));
});
try {
if (exitCode !== 0) {
throw Object.assign(new Error(`python executor failed with exit code ${exitCode}`), {
stdoutLines: splitOutputLines(stdout),
stderrLines: splitOutputLines(stderr),
});
}
try {
if (exitCode !== 0) {
throw Object.assign(new Error(`python executor failed with exit code ${exitCode}`), {
const resultPayload = JSON.parse(await readFile(resultPath, "utf8")) as { result?: unknown };
return {
result: resultPayload.result,
stdoutLines: splitOutputLines(stdout),
stderrLines: splitOutputLines(stderr),
});
};
} finally {
await rm(tempDir, { recursive: true, force: true });
}
}
const resultPayload = JSON.parse(await readFile(resultPath, "utf8")) as { result?: unknown };
return {
result: resultPayload.result,
stdoutLines: splitOutputLines(stdout),
stderrLines: splitOutputLines(stderr),
};
} finally {
await rm(tempDir, { recursive: true, force: true });
switch (getEffectiveNodeDefinitionId(task)) {
case "source-asset":
return createBuiltinSourceResult(context);
case "validate-structure":
return createBuiltinValidateResult(context);
default:
return createDefaultResult(task);
}
}
}

View File

@ -4,6 +4,7 @@ import type { Db } from "mongodb";
import type {
CodeHookSpec,
ExecutionAsset,
ExecutorType,
NodeRuntimeConfig,
RunExecutionSummary,
@ -74,6 +75,15 @@ type RunTaskDocument = {
updatedAt: string;
};
/**
 * Shape of a persisted asset document in the Mongo "assets" collection, as
 * read by the worker store. Only the fields the worker consumes are declared;
 * asset documents may carry additional fields.
 */
type AssetDocument = {
  _id: string;
  displayName?: string;
  sourcePath?: string;
  topLevelPaths?: string[];
  detectedFormats?: string[];
  summary?: Record<string, unknown>;
};
/** Current wall-clock time as an ISO-8601 UTC string. */
function nowIso() {
  const timestamp = new Date();
  return timestamp.toISOString();
}
@ -256,6 +266,34 @@ export class MongoWorkerStore {
.findOne({ _id: workflowVersionId });
}
/**
 * Resolves bound asset ids into normalized execution-asset metadata.
 * Preserves the caller's id order and silently skips ids with no matching
 * asset document.
 */
async getAssetsByIds(assetIds: string[]): Promise<ExecutionAsset[]> {
  if (assetIds.length === 0) {
    return [];
  }
  const documents = await this.db
    .collection<AssetDocument>("assets")
    .find({ _id: { $in: assetIds } })
    .toArray();
  // Index by id so output order follows assetIds, not Mongo's result order.
  const byId = new Map<string, ExecutionAsset>();
  for (const doc of documents) {
    byId.set(doc._id, {
      id: doc._id,
      displayName: doc.displayName ?? doc._id,
      sourcePath: doc.sourcePath,
      topLevelPaths: doc.topLevelPaths ?? [],
      detectedFormats: doc.detectedFormats ?? [],
      summary: doc.summary ?? {},
    });
  }
  const resolved: ExecutionAsset[] = [];
  for (const assetId of assetIds) {
    const asset = byId.get(assetId);
    if (asset) {
      resolved.push(asset);
    }
  }
  return resolved;
}
async createTaskArtifact(task: TaskRecord, payload: Record<string, unknown>) {
const artifact = {
_id: `artifact-${randomUUID()}`,

View File

@ -38,6 +38,7 @@ export class WorkerRuntime {
return undefined;
}
const startedAt = task.startedAt ?? new Date().toISOString();
const assets = await this.store.getAssetsByIds(task.assetIds ?? []);
const context: ExecutionContext = {
taskId: task.id,
@ -46,6 +47,7 @@ export class WorkerRuntime {
nodeId: task.nodeId,
nodeDefinitionId: task.nodeDefinitionId,
assetIds: task.assetIds,
assets,
};
try {

View File

@ -1,6 +1,9 @@
import test from "node:test";
import assert from "node:assert/strict";
import { spawnSync } from "node:child_process";
import { mkdtemp, mkdir, writeFile } from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { MongoMemoryServer } from "mongodb-memory-server";
import { MongoClient } from "mongodb";
@ -580,3 +583,158 @@ test("worker executes a queued docker task inside a real container", {
hooked: true,
});
});
// Verifies the worker runtime resolves a task's assetIds into full
// ExecutionAsset metadata on the execution context before invoking the
// (here stubbed) python executor.
test("worker loads bound asset metadata into the execution context for built-in source nodes", async (t) => {
  let capturedContext: ExecutionContext | null = null;
  // Stub the python executor so the test can capture the context it receives.
  const fixture = await createRuntimeFixture("emboflow-worker-source-context", {
    executors: {
      python: {
        async execute(_task: TaskRecord, context: ExecutionContext) {
          capturedContext = context;
          return {
            result: { assetCount: context.assets?.length ?? 0 },
            stdoutLines: ["custom source executor"],
            stderrLines: [],
          };
        },
      },
    },
  });
  t.after(async () => {
    await fixture.close();
  });
  // Seed one probed asset record the task will bind to.
  await fixture.db.collection("assets").insertOne({
    _id: "asset-context-1",
    workspaceId: "workspace-1",
    projectId: "project-1",
    type: "folder",
    sourceType: "registered_path",
    displayName: "Sample Asset",
    sourcePath: "/tmp/sample-asset",
    status: "probed",
    storageRef: {},
    topLevelPaths: ["meta.json"],
    detectedFormats: ["delivery_package"],
    fileCount: 4,
    summary: { kind: "delivery_package" },
    createdBy: "local-user",
    createdAt: new Date().toISOString(),
    updatedAt: new Date().toISOString(),
  });
  // Seed a queued run referencing the asset.
  await fixture.db.collection("workflow_runs").insertOne({
    _id: "run-source-context",
    workflowDefinitionId: "workflow-source-context",
    workflowVersionId: "workflow-source-context-v1",
    status: "queued",
    triggeredBy: "local-user",
    assetIds: ["asset-context-1"],
    createdAt: new Date().toISOString(),
    updatedAt: new Date().toISOString(),
  });
  // Seed the queued source-asset task with the asset bound via assetIds.
  await fixture.db.collection("run_tasks").insertOne({
    _id: "task-source-context",
    workflowRunId: "run-source-context",
    workflowVersionId: "workflow-source-context-v1",
    nodeId: "source-asset",
    nodeDefinitionId: "source-asset",
    nodeType: "source",
    executorType: "python",
    status: "queued",
    attempt: 1,
    assetIds: ["asset-context-1"],
    upstreamNodeIds: [],
    outputArtifactIds: [],
    createdAt: new Date().toISOString(),
    updatedAt: new Date().toISOString(),
  });
  await fixture.runtime.runNextTask();
  // The executor must have seen the resolved asset metadata, not just the id.
  assert.equal(capturedContext?.assets?.[0]?.id, "asset-context-1");
  assert.equal(capturedContext?.assets?.[0]?.displayName, "Sample Asset");
  assert.equal(capturedContext?.assets?.[0]?.sourcePath, "/tmp/sample-asset");
});
// End-to-end check of the built-in validate-structure semantics: builds a
// minimal valid delivery package on disk (required jsons + one .mp4), binds
// it as an asset, runs the task, and asserts the persisted summary and
// artifact payload.
test("worker validates delivery structure against the bound asset path for validate-structure", async (t) => {
  // Create a real on-disk delivery package fixture.
  const sourceDir = await mkdtemp(path.join(os.tmpdir(), "emboflow-worker-validate-"));
  await mkdir(path.join(sourceDir, "DJI_001"));
  await writeFile(path.join(sourceDir, "meta.json"), "{}");
  await writeFile(path.join(sourceDir, "intrinsics.json"), "{}");
  await writeFile(path.join(sourceDir, "video_meta.json"), "{}");
  await writeFile(path.join(sourceDir, "DJI_001", "DJI_001.mp4"), "");
  const fixture = await createRuntimeFixture("emboflow-worker-validate-structure");
  t.after(async () => {
    await fixture.close();
  });
  // Asset record whose topLevelPaths include every required delivery file.
  await fixture.db.collection("assets").insertOne({
    _id: "asset-validate-1",
    workspaceId: "workspace-1",
    projectId: "project-1",
    type: "folder",
    sourceType: "registered_path",
    displayName: "Validation Asset",
    sourcePath: sourceDir,
    status: "probed",
    storageRef: {},
    topLevelPaths: ["DJI_001", "meta.json", "intrinsics.json", "video_meta.json"],
    detectedFormats: ["delivery_package"],
    fileCount: 4,
    summary: { kind: "delivery_package" },
    createdBy: "local-user",
    createdAt: new Date().toISOString(),
    updatedAt: new Date().toISOString(),
  });
  await fixture.db.collection("workflow_runs").insertOne({
    _id: "run-validate-structure",
    workflowDefinitionId: "workflow-validate-structure",
    workflowVersionId: "workflow-validate-structure-v1",
    status: "queued",
    triggeredBy: "local-user",
    assetIds: ["asset-validate-1"],
    createdAt: new Date().toISOString(),
    updatedAt: new Date().toISOString(),
  });
  // Queued validate-structure task with no code hook, so the built-in path runs.
  await fixture.db.collection("run_tasks").insertOne({
    _id: "task-validate-structure",
    workflowRunId: "run-validate-structure",
    workflowVersionId: "workflow-validate-structure-v1",
    nodeId: "validate-structure",
    nodeDefinitionId: "validate-structure",
    nodeType: "inspect",
    executorType: "python",
    status: "queued",
    attempt: 1,
    assetIds: ["asset-validate-1"],
    upstreamNodeIds: [],
    outputArtifactIds: [],
    createdAt: new Date().toISOString(),
    updatedAt: new Date().toISOString(),
  });
  await fixture.runtime.runNextTask();
  const task = await fixture.store.getRunTask("task-validate-structure");
  const artifact = await fixture.db.collection("artifacts").findOne({ producerId: "task-validate-structure" });
  assert.equal(task?.status, "success");
  assert.match(task?.stdoutLines?.[0] ?? "", /validated 1 asset/i);
  // Summary persisted on the task (no missingRequiredFiles key when valid).
  assert.deepEqual(task?.lastResultPreview, {
    assetCount: 1,
    valid: true,
    requiredFiles: ["meta.json", "intrinsics.json", "video_meta.json"],
    videoFileCount: 1,
  });
  // The produced artifact payload carries the same validation result.
  assert.deepEqual((artifact?.payload as { result?: Record<string, unknown> } | undefined)?.result, {
    assetCount: 1,
    valid: true,
    requiredFiles: ["meta.json", "intrinsics.json", "video_meta.json"],
    videoFileCount: 1,
  });
});

View File

@ -259,6 +259,8 @@ Each task receives a normalized execution context containing:
- workflow run id
- task id
- actor id
- bound asset ids
- bound asset metadata summary, including display name, detected formats, top-level paths, and local source path when available
- node config
- code hook content
- input references
@ -316,6 +318,15 @@ The worker-backed runtime now persists task execution summaries directly on `run
This makes the run detail view stable even when artifacts are large or delayed and keeps task-level observability queryable without reopening every artifact payload.
The current built-in Python path now also has first-pass node semantics for two delivery-focused nodes when no custom code hook is present:
- `source-asset`
Emits a normalized summary of the bound assets from Mongo-backed asset metadata, so downstream nodes and operators see concrete display names, detected formats, top-level paths, and local source paths instead of only opaque asset ids.
- `validate-structure`
Inspects the bound asset source paths, checks the delivery-required files `meta.json`, `intrinsics.json`, and `video_meta.json`, recursively counts `.mp4` files, and emits a stable validation summary with `valid`, `requiredFiles`, `missingRequiredFiles`, and `videoFileCount`.
This replaces the earlier placeholder `"python executor processed ..."` behavior for those built-in nodes and makes the default worker output useful even before custom hooks are authored.
The current runtime also aggregates execution state back onto `workflow_runs`. Each refresh computes:
- run-level `startedAt` and `finishedAt`

View File

@ -25,6 +25,7 @@
- `2026-03-27`: The current run-control pass adds run cancellation, run retry from immutable snapshots, and task retry for failed/cancelled nodes with downstream reset semantics.
- `2026-03-27`: The current runtime-config pass freezes per-node executor config into `workflow_runs` and `run_tasks`, exposes runtime editing controls in the React workflow editor, and executes trusted-local Python code hooks from the task snapshot.
- `2026-03-27`: The current Docker-runtime pass upgrades `executorType=docker` from a pure stub to a real local container execution path whenever `executorConfig.image` is provided, while retaining a compatibility fallback for older demo tasks without an image.
- `2026-03-27`: The current built-in-node pass enriches the worker execution context with bound asset metadata and gives the default Python implementations for `source-asset` and `validate-structure` real delivery-oriented behavior instead of placeholder output.
---

View File

@ -54,10 +54,10 @@ def classify(path_text: str) -> str:
lower = path_text.lower()
path = Path(path_text)
if any(token in lower for token in DOC_PATTERNS) or path.suffix == ".md":
return "docs"
if any(token in lower for token in TEST_HINTS):
return "tests"
if any(token in lower for token in DOC_PATTERNS) or path.suffix == ".md":
return "docs"
if any(token in lower for token in CODE_HINTS):
return "code"
if path.suffix in CODE_SUFFIXES:

View File

@ -25,6 +25,18 @@ class DocCodeSyncAssessmentTests(unittest.TestCase):
def test_classifies_app_paths_as_code(self):
    """A package manifest under the app tree classifies as "code"."""
    self.assertEqual(MODULE.classify("apps/web/package.json"), "code")
def test_classifies_spec_files_under_test_directories_as_tests(self):
    """A .spec.ts file in a test directory classifies as "tests"."""
    self.assertEqual(
        MODULE.classify("apps/worker/test/mongo-worker-runtime.spec.ts"),
        "tests",
    )
def test_classifies_markdown_plan_files_as_docs(self):
    """A markdown plan file under docs/ classifies as "docs"."""
    self.assertEqual(
        MODULE.classify("docs/plans/2026-03-26-emboflow-v1-foundation-and-mvp.md"),
        "docs",
    )
def test_classifies_env_example_as_config(self):
    """A dotfile env template classifies as "config"."""
    self.assertEqual(MODULE.classify(".env.example"), "config")