feat: default delivery nodes to docker and add set operations

This commit is contained in:
eust-w 2026-03-30 02:42:00 +08:00
parent 71c5fd5995
commit 3d5400da67
19 changed files with 948 additions and 34 deletions

View File

@@ -7,7 +7,7 @@ EmboFlow is a B/S embodied-data workflow platform for raw asset ingestion, deliv
- Project-scoped workspace shell with a dedicated Projects page and active project selector in the header
- Asset workspace that supports local asset registration, probe summaries, storage connection management, and dataset creation
- Workflow templates as first-class objects, including default project templates and creating project workflows from a template
- - Blank workflow creation and a large React Flow editor with drag-and-drop nodes, free canvas movement, edge validation, node runtime presets, and Python code-hook injection
+ - Blank workflow creation and a large React Flow editor with drag-and-drop nodes, free canvas movement, edge validation, Docker-first node runtime presets, and Python code-hook injection
- Workflow-level `Save As Template` so edited graphs can be promoted into reusable project templates
- Mongo-backed run orchestration, worker execution, run history, task detail, logs, stdout/stderr, artifacts, cancel, retry, and task retry
- Runtime shell-level Chinese and English language switching
@@ -83,11 +83,12 @@ The Assets workspace now includes first-class storage connections and datasets.
The Workflows workspace now includes a template gallery. Projects can start from default or saved templates, or create a blank workflow directly.
The workflow editor center panel now uses a real draggable node canvas with zoom, pan, mini-map, dotted background, handle-based edge creation, persisted node positions, and localized validation feedback instead of a static list of node cards.
The workflow editor right panel now also supports saving the current workflow draft as a reusable workflow template, in addition to editing per-node runtime settings and Python hooks.
- The node library now supports both click-to-append and drag-and-drop placement into the canvas. V1 connection rules block self-edges, duplicate edges, cycles, incoming edges into source nodes, outgoing edges from export nodes, and multiple upstream edges into a single node.
+ The node library now supports both click-to-append and drag-and-drop placement into the canvas. V1 connection rules block self-edges, duplicate edges, cycles, incoming edges into source nodes, outgoing edges from export nodes, and multiple upstream edges into ordinary nodes, while allowing multi-input set nodes such as `union-assets`, `intersect-assets`, and `difference-assets`.
The Runs workspace now shows project-scoped run history, run-level aggregated summaries, cancel/retry controls, and run detail views with persisted task summaries, stdout/stderr sections, result previews, and artifact links into Explore.
Selected run tasks now expose the frozen node definition id, executor config snapshot, and code-hook metadata that were captured when the run was created.
- When a node uses `executorType=docker` and provides `executorConfig.image`, the worker now runs a real local Docker container with mounted `input.json` / `output.json` exchange files. If no image is configured, the executor falls back to the lightweight simulated behavior used by older demo tasks.
+ Most built-in delivery nodes now default to `executorType=docker`. When a node uses `executorType=docker` and provides `executorConfig.image`, the worker runs a real local Docker container with mounted `input.json` / `output.json` exchange files plus read-only mounts for bound asset paths. If no image is configured, the executor falls back to the lightweight simulated behavior used by older demo tasks.
When a node uses the built-in Python path without a custom hook, `source-asset` now emits bound asset metadata from Mongo-backed asset records and `validate-structure` now performs a real directory validation pass against local source paths. On the current sample path `/Users/longtaowu/workspace/emboldata/data`, that validation reports `valid=false`, `videoFileCount=407`, and missing delivery files because the sample root is a mixed dataset collection rather than a delivery package.
The worker now also carries direct upstream task results into execution context so set-operation utility nodes can compute narrowed asset sets and pass those effective asset ids to downstream tasks.
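The set semantics the worker applies can be sketched in TypeScript (a minimal illustrative model; these function names are not the worker's actual API):

```typescript
// Minimal model of the set-operation nodes described above (illustrative
// names, not the worker's actual API). Dedupe keeps first-seen order.
function dedupe(ids: string[]): string[] {
  return Array.from(new Set(ids));
}

// union-assets: merge every upstream set, deduplicated.
function unionAssets(sets: string[][]): string[] {
  return dedupe(sets.flat());
}

// intersect-assets: keep only ids present in every upstream set.
function intersectAssets(sets: string[][]): string[] {
  if (sets.length === 0) return [];
  return sets.reduce((kept, next) => kept.filter((id) => next.includes(id)));
}

// difference-assets: first upstream set minus all subsequent upstream sets.
function differenceAssets(sets: string[][]): string[] {
  const [head = [], ...rest] = sets;
  const subtract = new Set(rest.flat());
  return head.filter((id) => !subtract.has(id));
}
```

With upstream sets `["a","b"]` and `["b","c"]`, union yields `["a","b","c"]`, intersection `["b"]`, and difference `["a"]`.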
## Repository Structure

View File

@@ -1,38 +1,128 @@
export const DELIVERY_NODE_DEFINITIONS = [
import type { ExecutorType, NodeRuntimeConfig } from "../../../../worker/src/contracts/execution-context.ts";
export type DeliveryNodeDefinition = {
id: string;
name: string;
category: "Source" | "Transform" | "Inspect" | "Annotate" | "Export" | "Utility";
description: string;
defaultExecutorType?: ExecutorType;
defaultExecutorConfig?: Record<string, unknown>;
supportsCodeHook?: boolean;
allowsMultipleIncoming?: boolean;
};
const DEFAULT_DOCKER_EXECUTOR_CONFIG = {
image: "python:3.11-alpine",
networkMode: "none",
} as const;
function createDockerDefaults(): Pick<DeliveryNodeDefinition, "defaultExecutorType" | "defaultExecutorConfig"> {
return {
defaultExecutorType: "docker",
defaultExecutorConfig: { ...DEFAULT_DOCKER_EXECUTOR_CONFIG },
};
}
export const DELIVERY_NODE_DEFINITIONS: readonly DeliveryNodeDefinition[] = [
{
id: "source-asset",
name: "Source Asset",
category: "Source",
description: "Load the uploaded asset or registered path into the workflow.",
...createDockerDefaults(),
},
{
id: "extract-archive",
name: "Extract Archive",
category: "Transform",
description: "Unpack tar, zip, or zst archives for downstream processing.",
...createDockerDefaults(),
supportsCodeHook: true,
},
{
id: "rename-folder",
name: "Rename Delivery Folder",
category: "Transform",
description: "Rename the top-level folder to the delivery naming convention.",
...createDockerDefaults(),
supportsCodeHook: true,
},
{
id: "validate-structure",
name: "Validate Structure",
category: "Inspect",
description: "Check required directories and metadata files.",
...createDockerDefaults(),
supportsCodeHook: true,
},
{
id: "validate-metadata",
name: "Validate Metadata",
category: "Inspect",
description: "Validate meta.json, intrinsics.json, and video_meta.json.",
...createDockerDefaults(),
supportsCodeHook: true,
},
{
id: "union-assets",
name: "Union Assets",
category: "Utility",
description: "Merge multiple upstream asset sets into one deduplicated asset set.",
...createDockerDefaults(),
supportsCodeHook: true,
allowsMultipleIncoming: true,
},
{
id: "intersect-assets",
name: "Intersect Assets",
category: "Utility",
description: "Keep only the assets that exist in every upstream asset set.",
...createDockerDefaults(),
supportsCodeHook: true,
allowsMultipleIncoming: true,
},
{
id: "difference-assets",
name: "Difference Assets",
category: "Utility",
description: "Subtract the subsequent upstream asset sets from the first upstream asset set.",
...createDockerDefaults(),
supportsCodeHook: true,
allowsMultipleIncoming: true,
},
{
id: "export-delivery-package",
name: "Export Delivery Package",
category: "Export",
description: "Publish the normalized package for downstream upload or handoff.",
...createDockerDefaults(),
},
] as const;
const DELIVERY_NODE_DEFINITION_BY_ID = new Map(
DELIVERY_NODE_DEFINITIONS.map((definition) => [definition.id, definition]),
);
export function getDeliveryNodeDefinition(definitionId: string) {
return DELIVERY_NODE_DEFINITION_BY_ID.get(definitionId);
}
export function buildDefaultNodeRuntimeConfig(definitionId: string): NodeRuntimeConfig | undefined {
const definition = getDeliveryNodeDefinition(definitionId);
if (!definition) {
return undefined;
}
const config: NodeRuntimeConfig = {};
if (definition.defaultExecutorType) {
config.executorType = definition.defaultExecutorType;
}
if (definition.defaultExecutorConfig) {
config.executorConfig = { ...definition.defaultExecutorConfig };
}
return Object.keys(config).length > 0 ? config : undefined;
}
export function deliveryNodeAllowsMultipleIncoming(definitionId: string) {
return Boolean(getDeliveryNodeDefinition(definitionId)?.allowsMultipleIncoming);
}
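A standalone sketch of what the helpers above produce for a docker-defaulted definition (types reduced for illustration; the real types are in the module above):

```typescript
// Reduced standalone model of the module above: a definition's docker
// defaults become the runtime config a freshly placed node starts with.
type MiniDefinition = {
  id: string;
  defaultExecutorType?: "python" | "docker" | "http";
  defaultExecutorConfig?: Record<string, unknown>;
  allowsMultipleIncoming?: boolean;
};

const MINI_DEFINITIONS: MiniDefinition[] = [
  {
    id: "source-asset",
    defaultExecutorType: "docker",
    defaultExecutorConfig: { image: "python:3.11-alpine", networkMode: "none" },
  },
  { id: "union-assets", defaultExecutorType: "docker", allowsMultipleIncoming: true },
];

function miniDefaultRuntimeConfig(definitionId: string) {
  const definition = MINI_DEFINITIONS.find((candidate) => candidate.id === definitionId);
  if (!definition?.defaultExecutorType) return undefined;
  return {
    executorType: definition.defaultExecutorType,
    // copy so later per-node edits cannot mutate the shared defaults
    executorConfig: definition.defaultExecutorConfig
      ? { ...definition.defaultExecutorConfig }
      : undefined,
  };
}
```

Unknown definition ids yield `undefined`, which is why callers treat the default config as optional.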

View File

@@ -3,7 +3,10 @@ import { randomUUID } from "node:crypto";
import type { Db, Document, WithId } from "mongodb";
import type { AssetType } from "../../../../packages/contracts/src/domain.ts";
-import { DELIVERY_NODE_DEFINITIONS } from "../modules/plugins/builtin/delivery-nodes.ts";
+import {
+buildDefaultNodeRuntimeConfig,
+DELIVERY_NODE_DEFINITIONS,
+} from "../modules/plugins/builtin/delivery-nodes.ts";
import { probeLocalSourcePath } from "./local-source-probe.ts";
import type {
CodeHookSpec,
@@ -351,6 +354,50 @@ function sanitizeNodeRuntimeConfig(value: unknown, fallbackDefinitionId: string)
return config;
}
function mergeNodeRuntimeConfigs(
fallbackDefinitionId: string,
defaults: NodeRuntimeConfig | undefined,
explicit: NodeRuntimeConfig | undefined,
): NodeRuntimeConfig | undefined {
if (!defaults && !explicit) {
return undefined;
}
const definitionId = explicit?.definitionId ?? defaults?.definitionId ?? fallbackDefinitionId;
const executorType = explicit?.executorType ?? defaults?.executorType;
const executorConfig = defaults?.executorConfig || explicit?.executorConfig
? {
...(defaults?.executorConfig ?? {}),
...(explicit?.executorConfig ?? {}),
}
: undefined;
const codeHookSpec = explicit?.codeHookSpec ?? defaults?.codeHookSpec;
const artifactType = explicit?.artifactType ?? defaults?.artifactType;
const artifactTitle = explicit?.artifactTitle ?? defaults?.artifactTitle;
const config: NodeRuntimeConfig = {};
if (definitionId !== fallbackDefinitionId) {
config.definitionId = definitionId;
}
if (executorType) {
config.executorType = executorType;
}
if (executorConfig && Object.keys(executorConfig).length > 0) {
config.executorConfig = executorConfig;
}
if (codeHookSpec) {
config.codeHookSpec = codeHookSpec;
}
if (artifactType) {
config.artifactType = artifactType;
}
if (artifactTitle) {
config.artifactTitle = artifactTitle;
}
return Object.keys(config).length > 0 ? config : undefined;
}
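The precedence here is: explicit per-node values win, and `executorConfig` merges key by key, so a node can override just the image while keeping the default `networkMode`. A reduced sketch of that merge:

```typescript
// Reduced sketch of the merge precedence above: explicit node settings win
// over definition defaults; executorConfig objects merge key by key.
type MiniRuntimeConfig = {
  executorType?: string;
  executorConfig?: Record<string, unknown>;
};

function miniMerge(defaults?: MiniRuntimeConfig, explicit?: MiniRuntimeConfig): MiniRuntimeConfig {
  return {
    executorType: explicit?.executorType ?? defaults?.executorType,
    executorConfig: {
      ...(defaults?.executorConfig ?? {}), // defaults first ...
      ...(explicit?.executorConfig ?? {}), // ... then explicit keys override
    },
  };
}
```

So a node that only sets `image: "custom:latest"` still inherits `networkMode: "none"` from the docker defaults.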
function buildRuntimeSnapshot(
runtimeGraph: Record<string, unknown>,
logicGraph: WorkflowDefinitionVersionDocument["logicGraph"],
@@ -363,7 +410,11 @@ function buildRuntimeSnapshot(
for (const node of logicGraph.nodes) {
const definitionId = graph.nodeBindings?.[node.id] ?? inferDefinitionId(node.id);
nodeBindings[node.id] = definitionId;
-const config = sanitizeNodeRuntimeConfig(graph.nodeConfigs?.[node.id], definitionId);
+const config = mergeNodeRuntimeConfigs(
+definitionId,
+buildDefaultNodeRuntimeConfig(definitionId),
+sanitizeNodeRuntimeConfig(graph.nodeConfigs?.[node.id], definitionId),
+);
if (config) {
nodeConfigs[node.id] = config;
}

View File

@@ -286,7 +286,7 @@ test("mongo-backed runtime persists probed assets and workflow runs through the
assert.deepEqual((run as { assetIds?: string[] }).assetIds, [asset._id]);
assert.equal(tasks.length, 3);
assert.equal(tasks[0]?.nodeId, "source-asset");
-assert.equal(tasks[0]?.executorType, "python");
+assert.equal(tasks[0]?.executorType, "docker");
assert.deepEqual(tasks[0]?.assetIds, [asset._id]);
assert.deepEqual(tasks[0]?.upstreamNodeIds, []);
assert.equal(tasks[0]?.status, "queued");

View File

@@ -15,7 +15,7 @@ export const WORKFLOW_NODE_DEFINITIONS: WorkflowNodeDefinition[] = [
name: "Source Asset",
category: "Source",
description: "Load an uploaded asset or registered storage path.",
-executorType: "python",
+executorType: "docker",
inputSchemaSummary: "assetRef",
outputSchemaSummary: "assetRef",
},
@@ -34,7 +34,7 @@ export const WORKFLOW_NODE_DEFINITIONS: WorkflowNodeDefinition[] = [
name: "Rename Delivery Folder",
category: "Transform",
description: "Rename the top-level delivery folder to the business naming convention.",
-executorType: "python",
+executorType: "docker",
inputSchemaSummary: "artifactRef",
outputSchemaSummary: "artifactRef",
supportsCodeHook: true,
@@ -44,17 +44,47 @@ export const WORKFLOW_NODE_DEFINITIONS: WorkflowNodeDefinition[] = [
name: "Validate Structure",
category: "Inspect",
description: "Validate required directories and metadata files.",
-executorType: "python",
+executorType: "docker",
inputSchemaSummary: "artifactRef",
outputSchemaSummary: "artifactRef + report",
supportsCodeHook: true,
},
{
id: "union-assets",
name: "Union Assets",
category: "Utility",
description: "Merge multiple upstream asset sets into one deduplicated asset set.",
executorType: "docker",
inputSchemaSummary: "assetSet + assetSet",
outputSchemaSummary: "assetSet",
supportsCodeHook: true,
},
{
id: "intersect-assets",
name: "Intersect Assets",
category: "Utility",
description: "Keep only the assets shared by every upstream asset set.",
executorType: "docker",
inputSchemaSummary: "assetSet + assetSet",
outputSchemaSummary: "assetSet",
supportsCodeHook: true,
},
{
id: "difference-assets",
name: "Difference Assets",
category: "Utility",
description: "Subtract the subsequent upstream asset sets from the first upstream asset set.",
executorType: "docker",
inputSchemaSummary: "assetSet + assetSet",
outputSchemaSummary: "assetSet",
supportsCodeHook: true,
},
{
id: "export-delivery-package",
name: "Export Delivery Package",
category: "Export",
description: "Produce the final delivery package artifact for upload.",
-executorType: "http",
+executorType: "docker",
inputSchemaSummary: "artifactRef",
outputSchemaSummary: "artifactRef",
},

View File

@@ -170,6 +170,34 @@ function formatExecutorConfigLabel(config?: Record<string, unknown>) {
return JSON.stringify(config);
}
function getDefaultExecutorType(definition?: { defaultExecutorType?: "python" | "docker" | "http" } | null) {
return definition?.defaultExecutorType ?? "python";
}
function getDefaultExecutorConfig(definition?: { defaultExecutorConfig?: Record<string, unknown> } | null) {
return definition?.defaultExecutorConfig ? { ...definition.defaultExecutorConfig } : undefined;
}
function getEffectiveNodeRuntimeConfig(
definition: { id: string; defaultExecutorType?: "python" | "docker" | "http"; defaultExecutorConfig?: Record<string, unknown> } | null,
runtimeConfig: WorkflowNodeRuntimeConfig | undefined,
): WorkflowNodeRuntimeConfig {
const executorType = runtimeConfig?.executorType ?? getDefaultExecutorType(definition);
const executorConfig = {
...(getDefaultExecutorConfig(definition) ?? {}),
...(runtimeConfig?.executorConfig ?? {}),
};
return {
definitionId: runtimeConfig?.definitionId ?? definition?.id,
executorType,
executorConfig: Object.keys(executorConfig).length > 0 ? executorConfig : undefined,
codeHookSpec: runtimeConfig?.codeHookSpec,
artifactType: runtimeConfig?.artifactType,
artifactTitle: runtimeConfig?.artifactTitle,
};
}
function usePathname() {
const [pathname, setPathname] = useState(
typeof window === "undefined" ? "/projects" : window.location.pathname || "/projects",
@@ -1027,6 +1055,10 @@ function WorkflowEditorPage(props: {
() => getNodeRuntimeConfig(draft, selectedNodeId),
[draft, selectedNodeId],
);
const selectedNodeEffectiveRuntimeConfig = useMemo(
() => getEffectiveNodeRuntimeConfig(selectedNodeRaw, selectedNodeRuntimeConfig),
[selectedNodeRaw, selectedNodeRuntimeConfig],
);
const canvasNodes = useMemo<Array<Node>>(
() =>
draft.logicGraph.nodes.map((node) => {
@@ -1175,10 +1207,10 @@ function WorkflowEditorPage(props: {
if (!selectedNodeId) {
return;
}
const currentConfig = {
definitionId: resolveDefinitionIdForNode(draft, selectedNodeId),
...(getNodeRuntimeConfig(draft, selectedNodeId) ?? {}),
};
const currentConfig = getEffectiveNodeRuntimeConfig(
selectedNodeRaw,
getNodeRuntimeConfig(draft, selectedNodeId),
);
const resolved = typeof nextConfig === "function" ? nextConfig(currentConfig) : nextConfig;
setDraft(setNodeRuntimeConfig(draft, selectedNodeId, resolved));
setDirty(true);
@@ -1348,7 +1380,7 @@
<label>
{t("executorType")}
<select
-value={selectedNodeRuntimeConfig?.executorType ?? "python"}
+value={selectedNodeEffectiveRuntimeConfig.executorType ?? "python"}
onChange={(event) =>
updateSelectedNodeRuntimeConfig((current) => ({
...current,
@@ -1365,17 +1397,17 @@
{t("runtimeTarget")}
<input
value={
-selectedNodeRuntimeConfig?.executorType === "http"
-? String(selectedNodeRuntimeConfig?.executorConfig?.url ?? "")
-: selectedNodeRuntimeConfig?.executorType === "docker"
-? String(selectedNodeRuntimeConfig?.executorConfig?.image ?? "")
+selectedNodeEffectiveRuntimeConfig.executorType === "http"
+? String(selectedNodeEffectiveRuntimeConfig.executorConfig?.url ?? "")
+: selectedNodeEffectiveRuntimeConfig.executorType === "docker"
+? String(selectedNodeEffectiveRuntimeConfig.executorConfig?.image ?? "")
: ""
}
placeholder={
-selectedNodeRuntimeConfig?.executorType === "http"
+selectedNodeEffectiveRuntimeConfig.executorType === "http"
? "http://127.0.0.1:3010/mock-executor"
-: selectedNodeRuntimeConfig?.executorType === "docker"
-? "python:3.11"
+: selectedNodeEffectiveRuntimeConfig.executorType === "docker"
+? "python:3.11-alpine"
: "python executor uses inline hook or default"
}
onChange={(event) =>

View File

@@ -610,6 +610,18 @@ const BUILTIN_NODE_TRANSLATIONS: Record<string, { en: { name: string; descriptio
en: { name: "Validate Metadata", description: "Validate meta.json, intrinsics.json, and video_meta.json." },
zh: { name: "校验元数据", description: "校验 meta.json、intrinsics.json 和 video_meta.json。" },
},
"union-assets": {
en: { name: "Union Assets", description: "Merge multiple upstream asset sets into one deduplicated asset set." },
zh: { name: "资产并集", description: "将多个上游资产集合合并为一个去重后的资产集合。" },
},
"intersect-assets": {
en: { name: "Intersect Assets", description: "Keep only the assets that exist in every upstream asset set." },
zh: { name: "资产交集", description: "只保留所有上游资产集合共同包含的资产。" },
},
"difference-assets": {
en: { name: "Difference Assets", description: "Subtract the subsequent upstream asset sets from the first upstream asset set." },
zh: { name: "资产差集", description: "从第一个上游资产集合中减去后续上游资产集合。" },
},
"export-delivery-package": {
en: { name: "Export Delivery Package", description: "Produce the final delivery package artifact for upload." },
zh: { name: "导出交付包", description: "生成最终交付包产物用于上传或交付。" },

View File

@@ -256,3 +256,34 @@ test("reject connections that would form a back edge cycle when target accepts i
"cycle",
);
});
test("allow multi-inbound utility set nodes while still blocking cycles", () => {
const draft = workflowDraftFromVersion({
logicGraph: {
nodes: [
{ id: "source-a", type: "source" },
{ id: "source-b", type: "source" },
{ id: "intersect-assets-1", type: "utility" },
],
edges: [],
},
runtimeGraph: {
nodeBindings: {
"source-a": "source-asset",
"source-b": "source-asset",
"intersect-assets-1": "intersect-assets",
},
},
});
const withFirst = connectNodesInDraft(draft, "source-a", "intersect-assets-1");
assert.equal(
canConnectNodesInDraft(withFirst, "source-b", "intersect-assets-1").ok,
true,
);
assert.equal(
canConnectNodesInDraft(withFirst, "intersect-assets-1", "source-a").reason,
"target_disallows_incoming",
);
});

View File

@@ -26,6 +26,10 @@ export type WorkflowNodeDefinitionSummary = {
id: string;
name: string;
category?: string;
defaultExecutorType?: "python" | "docker" | "http";
defaultExecutorConfig?: Record<string, unknown>;
allowsMultipleIncoming?: boolean;
supportsCodeHook?: boolean;
};
export type WorkflowCodeHookSpec = {
@@ -79,6 +83,7 @@ const DEFAULT_NODE_LAYOUT: Record<string, WorkflowPoint> = {
"rename-folder": { x: 430, y: 280 },
"validate-structure": { x: 760, y: 450 },
};
const MULTI_INPUT_NODE_DEFINITION_IDS = new Set(["union-assets", "intersect-assets", "difference-assets"]);
function createDefaultNodePosition(index: number): WorkflowPoint {
const column = index % 3;
@@ -344,6 +349,11 @@ function nodeDisallowsIncoming(node: WorkflowLogicNode) {
return node.type === "source";
}
function allowsMultipleIncoming(draft: WorkflowDraft, nodeId: string) {
const definitionId = resolveDefinitionIdForNode(draft, nodeId);
return MULTI_INPUT_NODE_DEFINITION_IDS.has(definitionId);
}
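Combined with the single-incoming check later in this file, the rule reduces to: ordinary nodes accept at most one upstream edge, while the three set nodes accept any number. A condensed sketch (edge shape simplified, names illustrative):

```typescript
// Condensed sketch of the incoming-edge rule: ordinary nodes accept a single
// upstream edge; multi-input set nodes accept any number.
const MULTI_INPUT_IDS = new Set(["union-assets", "intersect-assets", "difference-assets"]);

type MiniEdge = { from: string; to: string };

function canAcceptAnotherIncoming(definitionId: string, targetNodeId: string, edges: MiniEdge[]): boolean {
  if (MULTI_INPUT_IDS.has(definitionId)) return true;
  // ordinary nodes: reject if any existing edge already targets this node
  return !edges.some((edge) => edge.to === targetNodeId);
}
```

Cycle and self-edge checks still apply to set nodes; only the single-upstream restriction is lifted.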
function wouldCreateCycle(draft: WorkflowDraft, sourceNodeId: string, targetNodeId: string) {
const adjacency = new Map<string, string[]>();
for (const edge of draft.logicGraph.edges) {
@@ -405,7 +415,7 @@ export function canConnectNodesInDraft(
if (nodeDisallowsIncoming(targetNode)) {
return { ok: false, reason: "target_disallows_incoming" };
}
-if (draft.logicGraph.edges.some((edge) => edge.to === targetNodeId)) {
+if (!allowsMultipleIncoming(draft, targetNodeId) && draft.logicGraph.edges.some((edge) => edge.to === targetNodeId)) {
return { ok: false, reason: "target_already_has_incoming" };
}
if (wouldCreateCycle(draft, sourceNodeId, targetNodeId)) {

View File

@@ -95,6 +95,14 @@ export type ExecutionAsset = {
summary?: Record<string, unknown>;
};
export type UpstreamExecutionResult = {
taskId: string;
nodeId: string;
nodeDefinitionId?: string;
assetIds: string[];
result?: Record<string, unknown>;
};
export type ExecutionContext = {
taskId: string;
workflowRunId?: string;
@@ -103,4 +111,5 @@ export type ExecutionContext = {
assetIds?: string[];
assets?: ExecutionAsset[];
nodeDefinitionId?: string;
upstreamResults?: UpstreamExecutionResult[];
};

View File

@@ -4,6 +4,7 @@ import os from "node:os";
import path from "node:path";
import type {
ExecutionAsset,
ExecutionContext,
ExecutorExecutionResult,
TaskRecord,
@@ -45,6 +46,198 @@ function parseDockerResult(payload: unknown) {
return payload;
}
function buildContainerAssetContext(workdir: string, assets: ExecutionAsset[] = []) {
const volumeArgs: string[] = [];
const containerAssets = assets.map((asset) => {
if (!asset.sourcePath || !path.isAbsolute(asset.sourcePath)) {
return asset;
}
const containerSourcePath = path.posix.join(workdir, "mounted-assets", asset.id);
volumeArgs.push("--volume", `${asset.sourcePath}:${containerSourcePath}:ro`);
return {
...asset,
sourcePath: containerSourcePath,
};
});
return {
assets: containerAssets,
volumeArgs,
};
}
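For a hypothetical asset with an absolute host path, the mapping above produces one read-only bind mount and rewrites the asset's `sourcePath` to the container location; a standalone sketch with illustrative values:

```typescript
import path from "node:path";

// Standalone sketch of the mount mapping above: an absolutely-pathed asset is
// bind-mounted read-only at <workdir>/mounted-assets/<assetId> inside the
// container, and the asset record is rewritten to point at that path.
function miniMountArgs(workdir: string, assetId: string, hostSourcePath: string) {
  const containerSourcePath = path.posix.join(workdir, "mounted-assets", assetId);
  return {
    containerSourcePath,
    volumeArgs: ["--volume", `${hostSourcePath}:${containerSourcePath}:ro`],
  };
}
```

The `:ro` suffix keeps containers from mutating registered source data while still letting the runner walk the directory tree.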
function createDockerRunner() {
return [
"import contextlib",
"import io",
"import json",
"import pathlib",
"import sys",
"",
"REQUIRED_DELIVERY_FILES = ['meta.json', 'intrinsics.json', 'video_meta.json']",
"",
"def dedupe_asset_ids(asset_ids):",
" return list(dict.fromkeys([asset_id for asset_id in asset_ids if isinstance(asset_id, str) and asset_id]))",
"",
"def count_video_files(source_path):",
" path = pathlib.Path(source_path)",
" if not path.exists():",
" return 0",
" if path.is_file():",
" return 1 if path.name.lower().endswith('.mp4') else 0",
" return sum(1 for child in path.rglob('*') if child.is_file() and child.name.lower().endswith('.mp4'))",
"",
"def build_source_result(context):",
" assets = context.get('assets', [])",
" asset_ids = [asset.get('id') for asset in assets if asset.get('id')]",
" print(f\"loaded {len(asset_ids)} bound asset{'s' if len(asset_ids) != 1 else ''}\")",
" return {",
" 'assetIds': asset_ids,",
" 'assetCount': len(asset_ids),",
" 'assets': assets,",
" }",
"",
"def build_validate_structure_result(context):",
" assets = context.get('assets', [])",
" summaries = []",
" for asset in assets:",
" top_level_paths = asset.get('topLevelPaths') or []",
" missing_required_files = [required for required in REQUIRED_DELIVERY_FILES if required not in top_level_paths]",
" video_file_count = count_video_files(asset.get('sourcePath')) if asset.get('sourcePath') else 0",
" summaries.append({",
" 'id': asset.get('id'),",
" 'displayName': asset.get('displayName'),",
" 'sourcePath': asset.get('sourcePath'),",
" 'detectedFormats': asset.get('detectedFormats') or [],",
" 'missingRequiredFiles': missing_required_files,",
" 'videoFileCount': video_file_count,",
" 'valid': len(missing_required_files) == 0 and video_file_count > 0,",
" })",
" asset_ids = [asset.get('id') for asset in assets if asset.get('id')]",
" missing_required = sorted({value for summary in summaries for value in summary['missingRequiredFiles']})",
" print(f\"validated {len(asset_ids)} asset{'s' if len(asset_ids) != 1 else ''}\")",
" result = {",
" 'assetCount': len(asset_ids),",
" 'requiredFiles': REQUIRED_DELIVERY_FILES,",
" 'videoFileCount': sum(summary['videoFileCount'] for summary in summaries),",
" 'valid': len(summaries) > 0 and all(summary['valid'] for summary in summaries),",
" }",
" if missing_required:",
" result['missingRequiredFiles'] = missing_required",
" return result",
"",
"def build_validate_metadata_result(context):",
" assets = context.get('assets', [])",
" summaries = []",
" for asset in assets:",
" top_level_paths = asset.get('topLevelPaths') or []",
" missing_required_files = [required for required in REQUIRED_DELIVERY_FILES if required not in top_level_paths]",
" summaries.append({",
" 'id': asset.get('id'),",
" 'displayName': asset.get('displayName'),",
" 'missingRequiredFiles': missing_required_files,",
" 'valid': len(missing_required_files) == 0,",
" })",
" asset_ids = [asset.get('id') for asset in assets if asset.get('id')]",
" missing_required = sorted({value for summary in summaries for value in summary['missingRequiredFiles']})",
" print(f\"validated metadata for {len(asset_ids)} asset{'s' if len(asset_ids) != 1 else ''}\")",
" result = {",
" 'assetIds': asset_ids,",
" 'assetCount': len(asset_ids),",
" 'requiredFiles': REQUIRED_DELIVERY_FILES,",
" 'valid': len(summaries) > 0 and all(summary['valid'] for summary in summaries),",
" 'assets': summaries,",
" }",
" if missing_required:",
" result['missingRequiredFiles'] = missing_required",
" return result",
"",
"def build_pass_through_result(context, operation):",
" asset_ids = dedupe_asset_ids(context.get('assetIds') or [])",
" print(f\"{operation} processed {len(asset_ids)} asset{'s' if len(asset_ids) != 1 else ''}\")",
" return {",
" 'operation': operation,",
" 'assetIds': asset_ids,",
" 'assetCount': len(asset_ids),",
" }",
"",
"def build_asset_set_result(context, operation):",
" upstream_results = context.get('upstreamResults') or []",
" upstream_sets = [dedupe_asset_ids(result.get('assetIds') or []) for result in upstream_results]",
" if operation == 'union':",
" asset_ids = dedupe_asset_ids([asset_id for asset_set in upstream_sets for asset_id in asset_set])",
" elif operation == 'intersect':",
" asset_ids = [] if not upstream_sets else list(upstream_sets[0])",
" for asset_set in upstream_sets[1:]:",
" asset_ids = [asset_id for asset_id in asset_ids if asset_id in asset_set]",
" else:",
" head = list(upstream_sets[0]) if upstream_sets else []",
" subtract = {asset_id for asset_set in upstream_sets[1:] for asset_id in asset_set}",
" asset_ids = [asset_id for asset_id in head if asset_id not in subtract]",
" assets_by_id = {asset.get('id'): asset for asset in context.get('assets', []) if asset.get('id')}",
" operation_label = 'intersection' if operation == 'intersect' else operation",
" print(f\"{operation_label} resolved {len(asset_ids)} asset{'s' if len(asset_ids) != 1 else ''}\")",
" return {",
" 'operation': operation,",
" 'upstreamCount': len(upstream_results),",
" 'assetIds': asset_ids,",
" 'assetCount': len(asset_ids),",
" 'assets': [assets_by_id[asset_id] for asset_id in asset_ids if asset_id in assets_by_id],",
" }",
"",
"def execute_hook(task, context):",
" hook = task.get('codeHookSpec') or {}",
" namespace = {}",
" stdout_buffer = io.StringIO()",
" entrypoint = hook.get('entrypoint') or 'process'",
" with contextlib.redirect_stdout(stdout_buffer):",
" exec(hook.get('source') or '', namespace)",
" candidate = namespace.get(entrypoint)",
" if not callable(candidate):",
" raise RuntimeError(f'Python hook entrypoint not found: {entrypoint}')",
" result = candidate(task, context)",
" stdout = stdout_buffer.getvalue()",
" if stdout:",
" sys.stdout.write(stdout)",
" return result",
"",
"def main():",
" payload = json.loads(pathlib.Path(sys.argv[1]).read_text())",
" output_path = pathlib.Path(sys.argv[2])",
" task = payload.get('task') or {}",
" context = payload.get('context') or {}",
" definition_id = task.get('nodeDefinitionId') or task.get('nodeId')",
" if (task.get('codeHookSpec') or {}).get('source'):",
" result = execute_hook(task, context)",
" elif definition_id == 'source-asset':",
" result = build_source_result(context)",
" elif definition_id == 'validate-structure':",
" result = build_validate_structure_result(context)",
" elif definition_id == 'validate-metadata':",
" result = build_validate_metadata_result(context)",
" elif definition_id == 'union-assets':",
" result = build_asset_set_result(context, 'union')",
" elif definition_id == 'intersect-assets':",
" result = build_asset_set_result(context, 'intersect')",
" elif definition_id == 'difference-assets':",
" result = build_asset_set_result(context, 'difference')",
" elif definition_id in {'extract-archive', 'rename-folder', 'export-delivery-package'}:",
" result = build_pass_through_result(context, definition_id)",
" else:",
" print(f\"docker executor processed {task.get('nodeId')} with {task.get('executorConfig', {}).get('image', 'docker://local-simulated')}\")",
" result = {",
" 'taskId': task.get('id'),",
" 'executor': 'docker',",
" 'image': task.get('executorConfig', {}).get('image'),",
" }",
" output_path.write_text(json.dumps({'result': result}))",
"",
"if __name__ == '__main__':",
" main()",
].join("\n");
}
export class DockerExecutor {
executionCount = 0;
@@ -77,13 +270,20 @@
const tempDir = await mkdtemp(path.join(os.tmpdir(), "emboflow-docker-executor-"));
const inputPath = path.join(tempDir, "input.json");
const outputPath = path.join(tempDir, "output.json");
const runnerPath = path.join(tempDir, "runner.py");
const { assets: containerAssets, volumeArgs } = buildContainerAssetContext(workdir, context.assets ?? []);
await writeFile(
inputPath,
JSON.stringify({
task,
-context,
+context: {
+...context,
+assets: containerAssets,
+},
}),
);
await writeFile(runnerPath, createDockerRunner());
const dockerArgs = [
"run",
@@ -94,6 +294,7 @@
workdir,
"--volume",
`${tempDir}:${workdir}`,
...volumeArgs,
"--env",
`EMBOFLOW_INPUT_PATH=${workdir}/input.json`,
"--env",
@@ -106,7 +307,9 @@
`EMBOFLOW_WORKFLOW_RUN_ID=${context.workflowRunId ?? ""}`,
...envVars.flatMap(([key, value]) => ["--env", `${key}=${value}`]),
image,
-...(command.length > 0 ? command : ["sh", "-lc", "cat \"$EMBOFLOW_INPUT_PATH\" > \"$EMBOFLOW_OUTPUT_PATH\""]),
+...(command.length > 0
+? command
+: ["python3", `${workdir}/runner.py`, `${workdir}/input.json`, `${workdir}/output.json`]),
];
const child = spawn("docker", dockerArgs, {

View File

@@ -8,6 +8,7 @@ import type {
ExecutionContext,
ExecutorExecutionResult,
TaskRecord,
UpstreamExecutionResult,
} from "../contracts/execution-context.ts";
function splitOutputLines(output: string) {
@@ -53,10 +54,67 @@ function getEffectiveNodeDefinitionId(task: TaskRecord) {
return task.nodeDefinitionId ?? task.nodeId;
}
function dedupeAssetIds(assetIds: string[] = []) {
return Array.from(new Set(assetIds.filter((assetId) => typeof assetId === "string" && assetId.length > 0)));
}
function getResultAssetIds(result: UpstreamExecutionResult) {
return dedupeAssetIds(result.assetIds ?? []);
}
function createAssetSetResult(
operation: "union" | "intersect" | "difference",
context: ExecutionContext,
): ExecutorExecutionResult {
const upstreamResults = context.upstreamResults ?? [];
const upstreamSets = upstreamResults.map((result) => getResultAssetIds(result));
let assetIds: string[] = [];
if (operation === "union") {
assetIds = dedupeAssetIds(upstreamSets.flat());
} else if (operation === "intersect") {
assetIds = upstreamSets.length === 0
? []
: upstreamSets.reduce<string[]>(
(current, next) => current.filter((assetId) => next.includes(assetId)),
[...upstreamSets[0]!],
);
} else {
const [head = [], ...rest] = upstreamSets;
const subtract = new Set(rest.flat());
assetIds = head.filter((assetId) => !subtract.has(assetId));
}
const assetsById = new Map((context.assets ?? []).map((asset) => [asset.id, asset]));
const assets = assetIds
.map((assetId) => assetsById.get(assetId))
.filter((asset): asset is ExecutionAsset => Boolean(asset))
.map((asset) => ({
id: asset.id,
displayName: asset.displayName,
sourcePath: asset.sourcePath,
}));
const operationLabel = operation === "intersect" ? "intersection" : operation;
return {
result: {
operation,
upstreamCount: upstreamResults.length,
assetIds,
assetCount: assetIds.length,
assets,
},
stdoutLines: [`${operationLabel} resolved ${assetIds.length} asset${assetIds.length === 1 ? "" : "s"}`],
stderrLines: [],
};
}
function createBuiltinSourceResult(context: ExecutionContext): ExecutorExecutionResult {
const assets = context.assets ?? [];
return {
result: {
assetIds: assets.map((asset) => asset.id),
assetCount: assets.length,
assets: assets.map((asset) => ({
id: asset.id,
@ -136,6 +194,52 @@ async function createBuiltinValidateResult(context: ExecutionContext): Promise<E
};
}
function createBuiltinMetadataResult(context: ExecutionContext): ExecutorExecutionResult {
const assets = context.assets ?? [];
const assetSummaries = assets.map((asset) => {
const topLevelPaths = asset.topLevelPaths ?? [];
const missingRequiredFiles = REQUIRED_DELIVERY_FILES.filter((required) => !topLevelPaths.includes(required));
return {
id: asset.id,
displayName: asset.displayName,
missingRequiredFiles,
valid: missingRequiredFiles.length === 0,
};
});
const missingRequiredFiles = Array.from(new Set(assetSummaries.flatMap((asset) => asset.missingRequiredFiles)));
const valid = assetSummaries.length > 0 && assetSummaries.every((asset) => asset.valid);
return {
result: {
assetIds: assets.map((asset) => asset.id),
assetCount: assets.length,
valid,
requiredFiles: [...REQUIRED_DELIVERY_FILES],
assets: assetSummaries,
...(missingRequiredFiles.length > 0 ? { missingRequiredFiles } : {}),
},
stdoutLines: [`validated metadata for ${assets.length} asset${assets.length === 1 ? "" : "s"}`],
stderrLines: [],
};
}
function createBuiltinPassThroughResult(
_task: TaskRecord,
context: ExecutionContext,
operation: string,
): ExecutorExecutionResult {
const assetIds = dedupeAssetIds(context.assetIds ?? []);
return {
result: {
operation,
assetIds,
assetCount: assetIds.length,
},
stdoutLines: [`${operation} processed ${assetIds.length} asset${assetIds.length === 1 ? "" : "s"}`],
stderrLines: [],
};
}
export class PythonExecutor {
executionCount = 0;
@ -200,8 +304,20 @@ export class PythonExecutor {
switch (getEffectiveNodeDefinitionId(task)) {
case "source-asset":
return createBuiltinSourceResult(context);
case "extract-archive":
return createBuiltinPassThroughResult(task, context, "extract-archive");
case "rename-folder":
return createBuiltinPassThroughResult(task, context, "rename-folder");
case "validate-structure":
return createBuiltinValidateResult(context);
case "validate-metadata":
return createBuiltinMetadataResult(context);
case "union-assets":
return createAssetSetResult("union", context);
case "intersect-assets":
return createAssetSetResult("intersect", context);
case "difference-assets":
return createAssetSetResult("difference", context);
default:
return createDefaultResult(task);
}

View File

@ -319,6 +319,7 @@ export class MongoWorkerStore {
async markTaskSuccess(
taskId: string,
input: {
assetIds: string[];
finishedAt: string;
durationMs: number;
summary: TaskExecutionSummary;
@ -332,6 +333,7 @@ export class MongoWorkerStore {
{ _id: taskId },
{
$set: {
assetIds: input.assetIds,
status: "success",
finishedAt: input.finishedAt,
durationMs: input.durationMs,
@ -354,6 +356,7 @@ export class MongoWorkerStore {
taskId: string,
errorMessage: string,
input: {
assetIds: string[];
finishedAt: string;
durationMs: number;
summary: TaskExecutionSummary;
@ -366,6 +369,7 @@ export class MongoWorkerStore {
{ _id: taskId },
{
$set: {
assetIds: input.assetIds,
status: "failed",
errorMessage,
finishedAt: input.finishedAt,

View File

@ -3,6 +3,7 @@ import { HttpExecutor } from "../executors/http-executor.ts";
import { PythonExecutor } from "../executors/python-executor.ts";
import type {
ExecutionContext,
UpstreamExecutionResult,
ExecutorExecutionResult,
ExecutorType,
TaskExecutionSummary,
@ -38,7 +39,7 @@ export class WorkerRuntime {
return undefined;
}
const startedAt = task.startedAt ?? new Date().toISOString();
const assets = await this.store.getAssetsByIds(task.assetIds ?? []);
const executionInput = await this.resolveExecutionInput(task);
const context: ExecutionContext = {
taskId: task.id,
@ -46,8 +47,9 @@ export class WorkerRuntime {
workflowVersionId: task.workflowVersionId,
nodeId: task.nodeId,
nodeDefinitionId: task.nodeDefinitionId,
assetIds: task.assetIds,
assets,
assetIds: executionInput.assetIds,
assets: executionInput.assets,
upstreamResults: executionInput.upstreamResults,
};
try {
@ -59,19 +61,20 @@ export class WorkerRuntime {
nodeType: task.nodeType,
nodeDefinitionId: task.nodeDefinitionId,
executorType: task.executorType,
assetIds: task.assetIds,
assetIds: executionInput.assetIds,
result: execution.result,
});
const finishedAt = new Date().toISOString();
const summary: TaskExecutionSummary = {
outcome: "success",
executorType: task.executorType,
assetCount: task.assetIds?.length ?? 0,
assetCount: executionInput.assetIds.length,
artifactIds: [artifact._id],
stdoutLineCount: execution.stdoutLines.length,
stderrLineCount: execution.stderrLines.length,
};
await this.store.markTaskSuccess(task.id, {
assetIds: executionInput.assetIds,
finishedAt,
durationMs: this.computeDurationMs(startedAt, finishedAt),
summary,
@ -95,13 +98,14 @@ export class WorkerRuntime {
const summary: TaskExecutionSummary = {
outcome: "failed",
executorType: task.executorType,
assetCount: task.assetIds?.length ?? 0,
assetCount: executionInput.assetIds.length,
artifactIds: [],
stdoutLineCount: executionError.stdoutLines.length,
stderrLineCount: executionError.stderrLines.length,
errorMessage: executionError.message,
};
await this.store.markTaskFailed(task.id, executionError.message, {
assetIds: executionInput.assetIds,
finishedAt,
durationMs: this.computeDurationMs(startedAt, finishedAt),
summary,
@ -120,6 +124,54 @@ export class WorkerRuntime {
}
}
private async resolveExecutionInput(task: TaskRecord) {
const upstreamResults = await this.collectUpstreamResults(task);
const assetIds = this.resolveEffectiveAssetIds(task, upstreamResults);
const assets = await this.store.getAssetsByIds(assetIds);
return {
assetIds,
assets,
upstreamResults,
};
}
private async collectUpstreamResults(task: TaskRecord): Promise<UpstreamExecutionResult[]> {
if (!task.workflowRunId || (task.upstreamNodeIds?.length ?? 0) === 0) {
return [];
}
const runTasks = await this.store.listRunTasks(task.workflowRunId);
const directUpstreamNodeIds = new Set(task.upstreamNodeIds ?? []);
return runTasks
.filter((candidate) => candidate.status === "success" && directUpstreamNodeIds.has(candidate.nodeId))
.map((candidate) => ({
taskId: candidate.id,
nodeId: candidate.nodeId,
nodeDefinitionId: candidate.nodeDefinitionId,
assetIds: this.readAssetIdsFromResult(candidate.lastResultPreview) ?? candidate.assetIds ?? [],
result: candidate.lastResultPreview,
}));
}
private resolveEffectiveAssetIds(task: TaskRecord, upstreamResults: UpstreamExecutionResult[]) {
if (upstreamResults.length === 1) {
return this.dedupeAssetIds(upstreamResults[0].assetIds);
}
return this.dedupeAssetIds(task.assetIds ?? []);
}
private readAssetIdsFromResult(result: Record<string, unknown> | undefined) {
if (!result || !Array.isArray(result.assetIds)) {
return undefined;
}
return result.assetIds.filter((assetId): assetId is string => typeof assetId === "string");
}
private dedupeAssetIds(assetIds: string[]) {
return Array.from(new Set(assetIds.filter((assetId) => assetId.length > 0)));
}
private computeDurationMs(startedAt: string, finishedAt: string) {
const duration = Date.parse(finishedAt) - Date.parse(startedAt);
return Number.isFinite(duration) && duration >= 0 ? duration : 0;

View File

@ -738,3 +738,250 @@ test("worker validates delivery structure against the bound asset path for valid
videoFileCount: 1,
});
});
test("worker applies intersect-assets and narrows the downstream effective asset set", async (t) => {
const sourceDirA = await mkdtemp(path.join(os.tmpdir(), "emboflow-worker-intersect-a-"));
const sourceDirB = await mkdtemp(path.join(os.tmpdir(), "emboflow-worker-intersect-b-"));
await mkdir(path.join(sourceDirA, "DJI_A"));
await mkdir(path.join(sourceDirB, "DJI_B"));
for (const root of [sourceDirA, sourceDirB]) {
await writeFile(path.join(root, "meta.json"), "{}");
await writeFile(path.join(root, "intrinsics.json"), "{}");
await writeFile(path.join(root, "video_meta.json"), "{}");
}
await writeFile(path.join(sourceDirA, "DJI_A", "A.mp4"), "");
await writeFile(path.join(sourceDirB, "DJI_B", "B.mp4"), "");
const fixture = await createRuntimeFixture("emboflow-worker-intersect-assets");
t.after(async () => {
await fixture.close();
});
await fixture.db.collection("assets").insertMany([
{
_id: "asset-intersect-a",
workspaceId: "workspace-1",
projectId: "project-1",
type: "folder",
sourceType: "registered_path",
displayName: "Intersect Asset A",
sourcePath: sourceDirA,
status: "probed",
storageRef: {},
topLevelPaths: ["DJI_A", "meta.json", "intrinsics.json", "video_meta.json"],
detectedFormats: ["delivery_package"],
fileCount: 4,
summary: { kind: "delivery_package" },
createdBy: "local-user",
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
},
{
_id: "asset-intersect-b",
workspaceId: "workspace-1",
projectId: "project-1",
type: "folder",
sourceType: "registered_path",
displayName: "Intersect Asset B",
sourcePath: sourceDirB,
status: "probed",
storageRef: {},
topLevelPaths: ["DJI_B", "meta.json", "intrinsics.json", "video_meta.json"],
detectedFormats: ["delivery_package"],
fileCount: 4,
summary: { kind: "delivery_package" },
createdBy: "local-user",
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
},
]);
await fixture.db.collection("workflow_runs").insertOne({
_id: "run-intersect-assets",
workflowDefinitionId: "workflow-intersect-assets",
workflowVersionId: "workflow-intersect-assets-v1",
status: "queued",
triggeredBy: "local-user",
assetIds: ["asset-intersect-a", "asset-intersect-b"],
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
});
await fixture.db.collection("run_tasks").insertMany([
{
_id: "task-upstream-a",
workflowRunId: "run-intersect-assets",
workflowVersionId: "workflow-intersect-assets-v1",
nodeId: "source-assets-a",
nodeDefinitionId: "source-asset",
nodeType: "source",
executorType: "python",
status: "success",
attempt: 1,
assetIds: ["asset-intersect-a", "asset-intersect-b"],
upstreamNodeIds: [],
outputArtifactIds: [],
lastResultPreview: { assetIds: ["asset-intersect-a", "asset-intersect-b"] },
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
},
{
_id: "task-upstream-b",
workflowRunId: "run-intersect-assets",
workflowVersionId: "workflow-intersect-assets-v1",
nodeId: "source-assets-b",
nodeDefinitionId: "source-asset",
nodeType: "source",
executorType: "python",
status: "success",
attempt: 1,
assetIds: ["asset-intersect-b"],
upstreamNodeIds: [],
outputArtifactIds: [],
lastResultPreview: { assetIds: ["asset-intersect-b"] },
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
},
{
_id: "task-intersect-assets",
workflowRunId: "run-intersect-assets",
workflowVersionId: "workflow-intersect-assets-v1",
nodeId: "intersect-assets-1",
nodeDefinitionId: "intersect-assets",
nodeType: "utility",
executorType: "python",
status: "queued",
attempt: 1,
assetIds: ["asset-intersect-a", "asset-intersect-b"],
upstreamNodeIds: ["source-assets-a", "source-assets-b"],
outputArtifactIds: [],
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
},
{
_id: "task-downstream-validate",
workflowRunId: "run-intersect-assets",
workflowVersionId: "workflow-intersect-assets-v1",
nodeId: "validate-structure",
nodeDefinitionId: "validate-structure",
nodeType: "inspect",
executorType: "python",
status: "pending",
attempt: 1,
assetIds: ["asset-intersect-a", "asset-intersect-b"],
upstreamNodeIds: ["intersect-assets-1"],
outputArtifactIds: [],
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
},
]);
await fixture.runtime.runNextTask();
const intersectTask = await fixture.store.getRunTask("task-intersect-assets");
const queuedValidate = await fixture.store.getRunTask("task-downstream-validate");
assert.equal(intersectTask?.status, "success");
assert.deepEqual(intersectTask?.lastResultPreview?.assetIds, ["asset-intersect-b"]);
assert.match(intersectTask?.stdoutLines?.[0] ?? "", /intersection resolved 1 asset/i);
assert.equal(queuedValidate?.status, "queued");
await fixture.runtime.runNextTask();
const validateTask = await fixture.store.getRunTask("task-downstream-validate");
assert.equal(validateTask?.status, "success");
assert.equal(validateTask?.summary?.assetCount, 1);
assert.deepEqual(validateTask?.lastResultPreview, {
assetCount: 1,
valid: true,
requiredFiles: ["meta.json", "intrinsics.json", "video_meta.json"],
videoFileCount: 1,
});
});
test("worker executes built-in union-assets inside docker when docker is available", async (t) => {
if (!hasDockerRuntime()) {
t.diagnostic("docker runtime unavailable; skipping built-in docker union-assets test");
return;
}
ensureDockerImage("python:3.11-alpine");
const fixture = await createRuntimeFixture("emboflow-worker-docker-union-assets");
t.after(async () => {
await fixture.close();
});
await fixture.db.collection("workflow_runs").insertOne({
_id: "run-docker-union-assets",
workflowDefinitionId: "workflow-docker-union-assets",
workflowVersionId: "workflow-docker-union-assets-v1",
status: "queued",
triggeredBy: "local-user",
assetIds: ["asset-union-a", "asset-union-b"],
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
});
await fixture.db.collection("run_tasks").insertMany([
{
_id: "task-union-upstream-a",
workflowRunId: "run-docker-union-assets",
workflowVersionId: "workflow-docker-union-assets-v1",
nodeId: "source-assets-a",
nodeDefinitionId: "source-asset",
nodeType: "source",
executorType: "python",
status: "success",
attempt: 1,
assetIds: ["asset-union-a"],
upstreamNodeIds: [],
outputArtifactIds: [],
lastResultPreview: { assetIds: ["asset-union-a"] },
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
},
{
_id: "task-union-upstream-b",
workflowRunId: "run-docker-union-assets",
workflowVersionId: "workflow-docker-union-assets-v1",
nodeId: "source-assets-b",
nodeDefinitionId: "source-asset",
nodeType: "source",
executorType: "python",
status: "success",
attempt: 1,
assetIds: ["asset-union-b"],
upstreamNodeIds: [],
outputArtifactIds: [],
lastResultPreview: { assetIds: ["asset-union-b"] },
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
},
{
_id: "task-union-docker",
workflowRunId: "run-docker-union-assets",
workflowVersionId: "workflow-docker-union-assets-v1",
nodeId: "union-assets-1",
nodeDefinitionId: "union-assets",
nodeType: "utility",
executorType: "docker",
executorConfig: {
image: "python:3.11-alpine",
},
status: "queued",
attempt: 1,
assetIds: ["asset-union-a", "asset-union-b"],
upstreamNodeIds: ["source-assets-a", "source-assets-b"],
outputArtifactIds: [],
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
},
]);
await fixture.runtime.runNextTask();
const task = await fixture.store.getRunTask("task-union-docker");
assert.equal(task?.status, "success");
assert.equal(task?.summary?.executorType, "docker");
assert.match(task?.stdoutLines?.[0] ?? "", /union resolved 2 assets/i);
assert.deepEqual(task?.lastResultPreview?.assetIds, ["asset-union-a", "asset-union-b"]);
});

View File

@ -63,6 +63,20 @@ The current V1 editor implementation keeps a mutable local draft that is initial
The current local runtime also persists per-node runtime config under `runtimeGraph.nodeConfigs`. That config includes executor overrides, executor-specific config payloads, optional artifact metadata, and Python code-hook source for supported node categories. When a run is created, the API freezes those node configs into `workflow_runs.runtimeSnapshot` and copies the effective executor choice plus code-hook snapshot onto each `run_task`.
The current built-in delivery node library is now Docker-first by default. Unless a workflow author overrides a node runtime config, these built-ins resolve to `executorType=docker` with a local Python container image and `networkMode=none`:
- `source-asset`
- `extract-archive`
- `rename-folder`
- `validate-structure`
- `validate-metadata`
- `union-assets`
- `intersect-assets`
- `difference-assets`
- `export-delivery-package`
This keeps most default processing isolated from the API and worker host processes while still letting individual workflows opt back into `python` or `http`.
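The default-resolution behavior described above can be sketched as follows. The names `resolveExecutorType` and `NodeRuntimeOverride` are illustrative, not the runtime's actual exports, and the `python` fallback for non-built-in nodes is an assumption for this sketch:

```typescript
type ExecutorType = "docker" | "python" | "http";

// Built-in delivery nodes that resolve to an isolated Docker runtime by default.
const DOCKER_FIRST_BUILTINS = new Set([
  "source-asset",
  "extract-archive",
  "rename-folder",
  "validate-structure",
  "validate-metadata",
  "union-assets",
  "intersect-assets",
  "difference-assets",
  "export-delivery-package",
]);

interface NodeRuntimeOverride {
  executorType?: ExecutorType; // explicit per-node author override, if any
}

// An explicit author override always wins; otherwise Docker-first built-ins
// get "docker", and everything else falls back to "python" in this sketch.
function resolveExecutorType(
  nodeDefinitionId: string,
  override?: NodeRuntimeOverride,
): ExecutorType {
  if (override?.executorType) {
    return override.executorType;
  }
  return DOCKER_FIRST_BUILTINS.has(nodeDefinitionId) ? "docker" : "python";
}
```

For example, `resolveExecutorType("union-assets")` yields `"docker"`, while an author override of `{ executorType: "python" }` opts that node back out of the container runtime.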
## Node Categories
V1 node categories:
@ -144,6 +158,14 @@ In real container mode the worker:
The default Docker runtime policy is `--network none`. This keeps V1 safer for local processing nodes unless a later phase deliberately opens network access for containerized tasks.
The V1 worker now also carries direct upstream task previews into the execution context. This is what makes multi-input set nodes executable instead of purely visual:
- `union-assets` merges all upstream asset ids
- `intersect-assets` keeps only the shared asset ids
- `difference-assets` subtracts later upstream sets from the first upstream set
When one upstream node produces a narrowed asset set, the worker treats that effective asset set as the execution input for the downstream task and writes it back to the successful `run_task`.
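The three set semantics above mirror the worker's built-in `createAssetSetResult` logic and can be sketched in isolation (the function names here are illustrative; each upstream node contributes one ordered list of asset ids):

```typescript
type UpstreamSet = string[];

const dedupe = (ids: string[]) => Array.from(new Set(ids));

// union: every id seen in any upstream set, in first-seen order.
function unionAssets(sets: UpstreamSet[]): string[] {
  return dedupe(sets.flat());
}

// intersect: only the ids present in every upstream set.
function intersectAssets(sets: UpstreamSet[]): string[] {
  if (sets.length === 0) return [];
  return sets.reduce(
    (current, next) => current.filter((id) => next.includes(id)),
    dedupe(sets[0]!),
  );
}

// difference: the first upstream set minus the ids of all later sets.
function differenceAssets(sets: UpstreamSet[]): string[] {
  const [head = [], ...rest] = sets;
  const subtract = new Set(rest.flat());
  return head.filter((id) => !subtract.has(id));
}
```

So `intersectAssets([["a", "b"], ["b"]])` narrows to `["b"]`, which matches the intersect-assets worker test where two upstream sources reduce the downstream effective asset set to the single shared asset.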
## Data Flow Contract
Tasks should exchange managed references, not loose file paths.

View File

@ -181,9 +181,12 @@ The current V1 authoring rules intentionally keep the graph model constrained so
- export nodes do not emit outbound edges
- duplicate edges are blocked
- self-edges are blocked
- a node may only keep one inbound edge
- ordinary nodes may only keep one inbound edge
- set-operation utility nodes may accept multiple inbound edges
- cycles are blocked
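A subset of the guards above (duplicates, self-edges, the inbound-edge limit with the set-node exception, and cycles) could be sketched like this; the source/export direction rules are omitted for brevity, and `validateEdge` and its error keys are hypothetical names, not the editor's actual API:

```typescript
interface Edge {
  source: string;
  target: string;
}

// Only set-operation utility nodes may accept multiple inbound edges.
const SET_NODE_IDS = new Set(["union-assets", "intersect-assets", "difference-assets"]);

// Returns an error key when a proposed edge violates a V1 authoring rule, else null.
function validateEdge(
  edges: Edge[],
  proposed: Edge,
  nodeDefinitionId: (nodeId: string) => string,
): string | null {
  if (proposed.source === proposed.target) return "self-edge";
  if (edges.some((e) => e.source === proposed.source && e.target === proposed.target)) {
    return "duplicate-edge";
  }
  const targetIsSetNode = SET_NODE_IDS.has(nodeDefinitionId(proposed.target));
  const inboundCount = edges.filter((e) => e.target === proposed.target).length;
  if (!targetIsSetNode && inboundCount >= 1) return "multiple-inbound";
  if (createsCycle(edges, proposed)) return "cycle";
  return null;
}

// Adding source→target closes a cycle iff target can already reach source.
function createsCycle(edges: Edge[], proposed: Edge): boolean {
  const adjacency = new Map<string, string[]>();
  for (const e of edges) {
    adjacency.set(e.source, [...(adjacency.get(e.source) ?? []), e.target]);
  }
  const seen = new Set<string>();
  const stack = [proposed.target];
  while (stack.length > 0) {
    const node = stack.pop()!;
    if (node === proposed.source) return true;
    if (seen.has(node)) continue;
    seen.add(node);
    stack.push(...(adjacency.get(node) ?? []));
  }
  return false;
}
```

With this shape, a second inbound edge into an ordinary node is rejected as `"multiple-inbound"`, while the same edge into a node whose definition id is `union-assets` passes the inbound check.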
The current built-in node library also exposes Docker-first runtime defaults in the editor. Most built-ins now render with `docker` preselected, while still allowing the user to override the executor, image, and optional Python code hook from the right-side configuration panel.
The runtime header now also exposes a visible `中文 / English` language toggle, and both the main shell and the workflow authoring surface are translated through a lightweight i18n layer.
The workflow entry surface that leads into this editor is also now template-aware:

View File

@ -347,7 +347,7 @@ The current executable worker path expects `run_tasks` to be self-sufficient eno
- executor choice
- node definition id and frozen per-node runtime config
- bound asset ids
- bound asset ids at run creation time, then the effective asset ids that were actually executed after any upstream set-operation narrowing
- upstream node dependencies
- produced artifact ids
- per-task status and error message
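Taken together, those fields suggest a `run_tasks` document shape along these lines. The field names mirror the documents seeded by the worker tests in this commit, but which fields are optional is an illustrative guess, not the real stored contract:

```typescript
interface RunTaskRecord {
  _id: string;
  workflowRunId: string;
  workflowVersionId: string;
  nodeId: string;
  nodeDefinitionId: string;
  nodeType: string;
  executorType: "docker" | "python" | "http";
  executorConfig?: Record<string, unknown>; // e.g. { image: "python:3.11-alpine" }
  status: "pending" | "queued" | "running" | "success" | "failed";
  attempt: number;
  assetIds: string[]; // bound at run creation, rewritten to the effective ids on completion
  upstreamNodeIds: string[]; // direct dependencies whose result previews the worker collects
  outputArtifactIds: string[];
  lastResultPreview?: Record<string, unknown>;
  errorMessage?: string;
  createdAt: string;
  updatedAt: string;
}

// A queued set-operation task in this shape, modeled on the docker union test fixture.
const exampleTask: RunTaskRecord = {
  _id: "task-union-docker",
  workflowRunId: "run-docker-union-assets",
  workflowVersionId: "workflow-docker-union-assets-v1",
  nodeId: "union-assets-1",
  nodeDefinitionId: "union-assets",
  nodeType: "utility",
  executorType: "docker",
  executorConfig: { image: "python:3.11-alpine" },
  status: "queued",
  attempt: 1,
  assetIds: ["asset-union-a", "asset-union-b"],
  upstreamNodeIds: ["source-assets-a", "source-assets-b"],
  outputArtifactIds: [],
  createdAt: new Date().toISOString(),
  updatedAt: new Date().toISOString(),
};
```

Note the dual role of `assetIds`: it holds the bound ids at run creation, and the worker overwrites it with the effective executed ids when it marks the task success or failed.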

View File

@ -29,6 +29,7 @@
- `2026-03-27`: The current web-authoring pass adds a visible zh/en language switcher, a lightweight i18n layer for the runtime shell, and a real React Flow canvas with persisted node positions and viewport instead of the earlier static node list.
- `2026-03-27`: The follow-up canvas pass adds left-panel drag-and-drop node placement, localized canvas feedback, and V1 connection guards for self-edges, duplicates, cycles, invalid source/export directions, and multiple inbound edges.
- `2026-03-30`: The current product-integration pass promotes projects, datasets, storage connections, and workflow templates into first-class runtime flows. The shell now has a dedicated Projects page, project switching, workflow template gallery, workflow creation from templates, and workflow-level save-as-template support.
- `2026-03-30`: The current docker-defaults pass makes most built-in delivery nodes Docker-first by default, adds `union-assets` / `intersect-assets` / `difference-assets` utility nodes, permits multi-input edges only for those set nodes, and propagates narrowed upstream asset sets through downstream task execution.
---