From 3d5400da6790f02422af3751b16252bd425aa06d Mon Sep 17 00:00:00 2001 From: eust-w Date: Mon, 30 Mar 2026 02:42:00 +0800 Subject: [PATCH] :sparkles: feat: default delivery nodes to docker and add set operations --- README.md | 7 +- .../modules/plugins/builtin/delivery-nodes.ts | 92 ++++++- apps/api/src/runtime/mongo-store.ts | 55 +++- .../api/test/runtime-http.integration.spec.ts | 2 +- .../workflows/components/node-library.tsx | 38 ++- apps/web/src/runtime/app.tsx | 56 +++- apps/web/src/runtime/i18n.tsx | 12 + .../src/runtime/workflow-editor-state.test.ts | 31 +++ apps/web/src/runtime/workflow-editor-state.ts | 12 +- .../worker/src/contracts/execution-context.ts | 9 + apps/worker/src/executors/docker-executor.ts | 207 ++++++++++++++- apps/worker/src/executors/python-executor.ts | 116 ++++++++ apps/worker/src/runtime/mongo-worker-store.ts | 4 + apps/worker/src/runtime/worker-runtime.ts | 64 ++++- apps/worker/test/mongo-worker-runtime.spec.ts | 247 ++++++++++++++++++ .../03-workflows/workflow-execution-model.md | 22 ++ ...nformation-architecture-and-key-screens.md | 5 +- design/05-data/mongodb-data-model.md | 2 +- ...26-03-26-emboflow-v1-foundation-and-mvp.md | 1 + 19 files changed, 948 insertions(+), 34 deletions(-) diff --git a/README.md b/README.md index bb7fd86..c1473d4 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ EmboFlow is a B/S embodied-data workflow platform for raw asset ingestion, deliv - Project-scoped workspace shell with a dedicated Projects page and active project selector in the header - Asset workspace that supports local asset registration, probe summaries, storage connection management, and dataset creation - Workflow templates as first-class objects, including default project templates and creating project workflows from a template -- Blank workflow creation and a large React Flow editor with drag-and-drop nodes, free canvas movement, edge validation, node runtime presets, and Python code-hook injection +- Blank workflow creation and a large React Flow editor with drag-and-drop nodes, free canvas movement, edge validation, Docker-first node runtime presets, and Python code-hook injection - Workflow-level `Save As Template` so edited graphs can be promoted into reusable project templates - Mongo-backed run orchestration, worker execution, run history, task detail, logs, stdout/stderr, artifacts, cancel, retry, and task retry - Runtime shell level Chinese and English switching @@ -83,11 +83,12 @@ The Assets workspace now includes first-class storage connections and datasets. The Workflows workspace now includes a template gallery. Projects can start from default or saved templates, or create a blank workflow directly. The workflow editor center panel now uses a real draggable node canvas with zoom, pan, mini-map, dotted background, handle-based edge creation, persisted node positions, and localized validation feedback instead of a static list of node cards. The workflow editor right panel now also supports saving the current workflow draft as a reusable workflow template, in addition to editing per-node runtime settings and Python hooks. -The node library now supports both click-to-append and drag-and-drop placement into the canvas. V1 connection rules block self-edges, duplicate edges, cycles, incoming edges into source nodes, outgoing edges from export nodes, and multiple upstream edges into a single node. +The node library now supports both click-to-append and drag-and-drop placement into the canvas. V1 connection rules block self-edges, duplicate edges, cycles, incoming edges into source nodes, outgoing edges from export nodes, and multiple upstream edges into ordinary nodes, while allowing multi-input set nodes such as `union-assets`, `intersect-assets`, and `difference-assets`. The Runs workspace now shows project-scoped run history, run-level aggregated summaries, cancel/retry controls, and run detail views with persisted task summaries, stdout/stderr sections, result previews, and artifact links into Explore. Selected run tasks now expose the frozen node definition id, executor config snapshot, and code-hook metadata that were captured when the run was created. -When a node uses `executorType=docker` and provides `executorConfig.image`, the worker now runs a real local Docker container with mounted `input.json` / `output.json` exchange files. If no image is configured, the executor falls back to the lightweight simulated behavior used by older demo tasks. +Most built-in delivery nodes now default to `executorType=docker`. When a node uses `executorType=docker` and provides `executorConfig.image`, the worker runs a real local Docker container with mounted `input.json` / `output.json` exchange files plus read-only mounts for bound asset paths. If no image is configured, the executor falls back to the lightweight simulated behavior used by older demo tasks. When a node uses the built-in Python path without a custom hook, `source-asset` now emits bound asset metadata from Mongo-backed asset records and `validate-structure` now performs a real directory validation pass against local source paths. On the current sample path `/Users/longtaowu/workspace/emboldata/data`, that validation reports `valid=false`, `videoFileCount=407`, and missing delivery files because the sample root is a mixed dataset collection rather than a delivery package. +The worker now also carries direct upstream task results into execution context so set-operation utility nodes can compute narrowed asset sets and pass those effective asset ids to downstream tasks. ## Repository Structure diff --git a/apps/api/src/modules/plugins/builtin/delivery-nodes.ts b/apps/api/src/modules/plugins/builtin/delivery-nodes.ts index d58231b..6cdf308 100644 --- a/apps/api/src/modules/plugins/builtin/delivery-nodes.ts +++ b/apps/api/src/modules/plugins/builtin/delivery-nodes.ts @@ -1,38 +1,128 @@ -export const DELIVERY_NODE_DEFINITIONS = [ +import type { ExecutorType, NodeRuntimeConfig } from "../../../../worker/src/contracts/execution-context.ts"; + +export type DeliveryNodeDefinition = { + id: string; + name: string; + category: "Source" | "Transform" | "Inspect" | "Annotate" | "Export" | "Utility"; + description: string; + defaultExecutorType?: ExecutorType; + defaultExecutorConfig?: Record; + supportsCodeHook?: boolean; + allowsMultipleIncoming?: boolean; +}; + +const DEFAULT_DOCKER_EXECUTOR_CONFIG = { + image: "python:3.11-alpine", + networkMode: "none", +} as const; + +function createDockerDefaults(): Pick { + return { + defaultExecutorType: "docker", + defaultExecutorConfig: { ...DEFAULT_DOCKER_EXECUTOR_CONFIG }, + }; +} + +export const DELIVERY_NODE_DEFINITIONS: readonly DeliveryNodeDefinition[] = [ { id: "source-asset", name: "Source Asset", category: "Source", description: "Load the uploaded asset or registered path into the workflow.", + ...createDockerDefaults(), }, { id: "extract-archive", name: "Extract Archive", category: "Transform", description: "Unpack tar, zip, or zst archives for downstream processing.", + ...createDockerDefaults(), + supportsCodeHook: true, }, { id: "rename-folder", name: "Rename Delivery Folder", category: "Transform", description: "Rename the top-level folder to the delivery naming convention.", + ...createDockerDefaults(), + supportsCodeHook: true, }, { id: "validate-structure", name: "Validate Structure", category: "Inspect", description: "Check required directories and metadata files.", + ...createDockerDefaults(), + supportsCodeHook: true, }, { id: "validate-metadata", name: "Validate Metadata", category: "Inspect", description: "Validate meta.json, intrinsics.json, and video_meta.json.", + ...createDockerDefaults(), + supportsCodeHook: true, + }, + { + id: "union-assets", + name: "Union Assets", + category: "Utility", + description: "Merge multiple upstream asset sets into one deduplicated asset set.", + ...createDockerDefaults(), + supportsCodeHook: true, + allowsMultipleIncoming: true, + }, + { + id: "intersect-assets", + name: "Intersect Assets", + category: "Utility", + description: "Keep only the assets that exist in every upstream asset set.", + ...createDockerDefaults(), + supportsCodeHook: true, + allowsMultipleIncoming: true, + }, + { + id: "difference-assets", + name: "Difference Assets", + category: "Utility", + description: "Subtract downstream asset sets from the first upstream asset set.", + ...createDockerDefaults(), + supportsCodeHook: true, + allowsMultipleIncoming: true, }, { id: "export-delivery-package", name: "Export Delivery Package", category: "Export", description: "Publish the normalized package for downstream upload or handoff.", + ...createDockerDefaults(), }, ] as const; + +const DELIVERY_NODE_DEFINITION_BY_ID = new Map( + DELIVERY_NODE_DEFINITIONS.map((definition) => [definition.id, definition]), +); + +export function getDeliveryNodeDefinition(definitionId: string) { + return DELIVERY_NODE_DEFINITION_BY_ID.get(definitionId); +} + +export function buildDefaultNodeRuntimeConfig(definitionId: string): NodeRuntimeConfig | undefined { + const definition = getDeliveryNodeDefinition(definitionId); + if (!definition) { + return undefined; + } + + const config: NodeRuntimeConfig = {}; + if (definition.defaultExecutorType) { + config.executorType = definition.defaultExecutorType; + } + if (definition.defaultExecutorConfig) { + config.executorConfig = { ...definition.defaultExecutorConfig }; + } + return Object.keys(config).length > 0 ? config : undefined; +} + +export function deliveryNodeAllowsMultipleIncoming(definitionId: string) { + return Boolean(getDeliveryNodeDefinition(definitionId)?.allowsMultipleIncoming); +} diff --git a/apps/api/src/runtime/mongo-store.ts b/apps/api/src/runtime/mongo-store.ts index b1f85bc..e515f52 100644 --- a/apps/api/src/runtime/mongo-store.ts +++ b/apps/api/src/runtime/mongo-store.ts @@ -3,7 +3,10 @@ import { randomUUID } from "node:crypto"; import type { Db, Document, WithId } from "mongodb"; import type { AssetType } from "../../../../packages/contracts/src/domain.ts"; -import { DELIVERY_NODE_DEFINITIONS } from "../modules/plugins/builtin/delivery-nodes.ts"; +import { + buildDefaultNodeRuntimeConfig, + DELIVERY_NODE_DEFINITIONS, +} from "../modules/plugins/builtin/delivery-nodes.ts"; import { probeLocalSourcePath } from "./local-source-probe.ts"; import type { CodeHookSpec, @@ -351,6 +354,50 @@ function sanitizeNodeRuntimeConfig(value: unknown, fallbackDefinitionId: string) return config; } +function mergeNodeRuntimeConfigs( + fallbackDefinitionId: string, + defaults: NodeRuntimeConfig | undefined, + explicit: NodeRuntimeConfig | undefined, +): NodeRuntimeConfig | undefined { + if (!defaults && !explicit) { + return undefined; + } + + const definitionId = explicit?.definitionId ?? defaults?.definitionId ?? fallbackDefinitionId; + const executorType = explicit?.executorType ?? defaults?.executorType; + const executorConfig = defaults?.executorConfig || explicit?.executorConfig + ? { + ...(defaults?.executorConfig ?? {}), + ...(explicit?.executorConfig ?? {}), + } + : undefined; + const codeHookSpec = explicit?.codeHookSpec ?? defaults?.codeHookSpec; + const artifactType = explicit?.artifactType ?? defaults?.artifactType; + const artifactTitle = explicit?.artifactTitle ?? defaults?.artifactTitle; + + const config: NodeRuntimeConfig = {}; + if (definitionId !== fallbackDefinitionId) { + config.definitionId = definitionId; + } + if (executorType) { + config.executorType = executorType; + } + if (executorConfig && Object.keys(executorConfig).length > 0) { + config.executorConfig = executorConfig; + } + if (codeHookSpec) { + config.codeHookSpec = codeHookSpec; + } + if (artifactType) { + config.artifactType = artifactType; + } + if (artifactTitle) { + config.artifactTitle = artifactTitle; + } + + return Object.keys(config).length > 0 ? config : undefined; +} + function buildRuntimeSnapshot( runtimeGraph: Record, logicGraph: WorkflowDefinitionVersionDocument["logicGraph"], @@ -363,7 +410,11 @@ function buildRuntimeSnapshot( for (const node of logicGraph.nodes) { const definitionId = graph.nodeBindings?.[node.id] ?? inferDefinitionId(node.id); nodeBindings[node.id] = definitionId; - const config = sanitizeNodeRuntimeConfig(graph.nodeConfigs?.[node.id], definitionId); + const config = mergeNodeRuntimeConfigs( + definitionId, + buildDefaultNodeRuntimeConfig(definitionId), + sanitizeNodeRuntimeConfig(graph.nodeConfigs?.[node.id], definitionId), + ); if (config) { nodeConfigs[node.id] = config; } diff --git a/apps/api/test/runtime-http.integration.spec.ts b/apps/api/test/runtime-http.integration.spec.ts index b202cb3..1a39380 100644 --- a/apps/api/test/runtime-http.integration.spec.ts +++ b/apps/api/test/runtime-http.integration.spec.ts @@ -286,7 +286,7 @@ test("mongo-backed runtime persists probed assets and workflow runs through the assert.deepEqual((run as { assetIds?: string[] }).assetIds, [asset._id]); assert.equal(tasks.length, 3); assert.equal(tasks[0]?.nodeId, "source-asset"); - assert.equal(tasks[0]?.executorType, "python"); + assert.equal(tasks[0]?.executorType, "docker"); assert.deepEqual(tasks[0]?.assetIds, [asset._id]); assert.deepEqual(tasks[0]?.upstreamNodeIds, []); assert.equal(tasks[0]?.status, "queued"); diff --git a/apps/web/src/features/workflows/components/node-library.tsx b/apps/web/src/features/workflows/components/node-library.tsx index f6e45e9..99f390b 100644 --- a/apps/web/src/features/workflows/components/node-library.tsx +++ b/apps/web/src/features/workflows/components/node-library.tsx @@ -15,7 +15,7 @@ export const WORKFLOW_NODE_DEFINITIONS: WorkflowNodeDefinition[] = [ name: "Source Asset", category: "Source", description: "Load an uploaded asset or registered storage path.", - executorType: "python", + executorType: "docker", inputSchemaSummary: "assetRef", outputSchemaSummary: "assetRef", }, @@ -34,7 +34,7 @@ export const WORKFLOW_NODE_DEFINITIONS: WorkflowNodeDefinition[] = [ name: "Rename Delivery Folder", category: "Transform", description: "Rename the top-level delivery folder to the business naming convention.", - executorType: "python", + executorType: "docker", inputSchemaSummary: "artifactRef", outputSchemaSummary: "artifactRef", supportsCodeHook: true, @@ -44,17 +44,47 @@ export const WORKFLOW_NODE_DEFINITIONS: WorkflowNodeDefinition[] = [ name: "Validate Structure", category: "Inspect", description: "Validate required directories and metadata files.", - executorType: "python", + executorType: "docker", inputSchemaSummary: "artifactRef", outputSchemaSummary: "artifactRef + report", supportsCodeHook: true, }, + { + id: "union-assets", + name: "Union Assets", + category: "Utility", + description: "Merge multiple upstream asset sets into one deduplicated asset set.", + executorType: "docker", + inputSchemaSummary: "assetSet + assetSet", + outputSchemaSummary: "assetSet", + supportsCodeHook: true, + }, + { + id: "intersect-assets", + name: "Intersect Assets", + category: "Utility", + description: "Keep only the assets shared by every upstream asset set.", + executorType: "docker", + inputSchemaSummary: "assetSet + assetSet", + outputSchemaSummary: "assetSet", + supportsCodeHook: true, + }, + { + id: "difference-assets", + name: "Difference Assets", + category: "Utility", + description: "Subtract downstream asset sets from the first upstream asset set.", + executorType: "docker", + inputSchemaSummary: "assetSet + assetSet", + outputSchemaSummary: "assetSet", + supportsCodeHook: true, + }, { id: "export-delivery-package", name: "Export Delivery Package", category: "Export", description: "Produce the final delivery package artifact for upload.", - executorType: "http", + executorType: "docker", inputSchemaSummary: "artifactRef", outputSchemaSummary: "artifactRef", }, diff --git a/apps/web/src/runtime/app.tsx b/apps/web/src/runtime/app.tsx index 8307cd0..e956b78 100644 --- a/apps/web/src/runtime/app.tsx +++ b/apps/web/src/runtime/app.tsx @@ -170,6 +170,34 @@ function formatExecutorConfigLabel(config?: Record) { return JSON.stringify(config); } +function getDefaultExecutorType(definition?: { defaultExecutorType?: "python" | "docker" | "http" } | null) { + return definition?.defaultExecutorType ?? "python"; +} + +function getDefaultExecutorConfig(definition?: { defaultExecutorConfig?: Record } | null) { + return definition?.defaultExecutorConfig ? { ...definition.defaultExecutorConfig } : undefined; +} + +function getEffectiveNodeRuntimeConfig( + definition: { id: string; defaultExecutorType?: "python" | "docker" | "http"; defaultExecutorConfig?: Record } | null, + runtimeConfig: WorkflowNodeRuntimeConfig | undefined, +): WorkflowNodeRuntimeConfig { + const executorType = runtimeConfig?.executorType ?? getDefaultExecutorType(definition); + const executorConfig = { + ...(getDefaultExecutorConfig(definition) ?? {}), + ...(runtimeConfig?.executorConfig ?? {}), + }; + + return { + definitionId: runtimeConfig?.definitionId ?? definition?.id, + executorType, + executorConfig: Object.keys(executorConfig).length > 0 ? executorConfig : undefined, + codeHookSpec: runtimeConfig?.codeHookSpec, + artifactType: runtimeConfig?.artifactType, + artifactTitle: runtimeConfig?.artifactTitle, + }; +} + function usePathname() { const [pathname, setPathname] = useState( typeof window === "undefined" ? "/projects" : window.location.pathname || "/projects", @@ -1027,6 +1055,10 @@ function WorkflowEditorPage(props: { () => getNodeRuntimeConfig(draft, selectedNodeId), [draft, selectedNodeId], ); + const selectedNodeEffectiveRuntimeConfig = useMemo( + () => getEffectiveNodeRuntimeConfig(selectedNodeRaw, selectedNodeRuntimeConfig), + [selectedNodeRaw, selectedNodeRuntimeConfig], + ); const canvasNodes = useMemo>( () => draft.logicGraph.nodes.map((node) => { @@ -1175,10 +1207,10 @@ function WorkflowEditorPage(props: { if (!selectedNodeId) { return; } - const currentConfig = { - definitionId: resolveDefinitionIdForNode(draft, selectedNodeId), - ...(getNodeRuntimeConfig(draft, selectedNodeId) ?? {}), - }; + const currentConfig = getEffectiveNodeRuntimeConfig( + selectedNodeRaw, + getNodeRuntimeConfig(draft, selectedNodeId), + ); const resolved = typeof nextConfig === "function" ? nextConfig(currentConfig) : nextConfig; setDraft(setNodeRuntimeConfig(draft, selectedNodeId, resolved)); setDirty(true); @@ -1348,7 +1380,7 @@ function WorkflowEditorPage(props: {