diff --git a/README.md b/README.md index 3617274..3ab8ce3 100644 --- a/README.md +++ b/README.md @@ -89,7 +89,7 @@ The node library now supports both click-to-append and drag-and-drop placement i The Runs workspace now shows project-scoped run history, run-level aggregated summaries, cancel/retry controls, and run detail views with persisted task summaries, stdout/stderr sections, result previews, and artifact links into Explore. Selected run tasks now expose the frozen node definition id, executor config snapshot, and code-hook metadata that were captured when the run was created. Most built-in delivery nodes now default to `executorType=docker`. When a node uses `executorType=docker` and provides `executorConfig.image`, the worker runs a real local Docker container with mounted `input.json` / `output.json` exchange files plus read-only mounts for bound asset paths. If no image is configured, the executor falls back to the lightweight simulated behavior used by older demo tasks. -Custom Docker nodes follow the same runtime contract. The container reads the task snapshot and execution context from `EMBOFLOW_INPUT_PATH`, writes `{\"result\": ...}` JSON to `EMBOFLOW_OUTPUT_PATH`, and if it declares an asset-set output contract it must return `result.assetIds` as a string array. Dockerfile-based custom nodes are built locally on first execution and then reused by tag. +Custom Docker nodes follow the same runtime contract. The container reads the task snapshot and execution context from `EMBOFLOW_INPUT_PATH`, writes `{\"result\": ...}` JSON to `EMBOFLOW_OUTPUT_PATH`, and if it declares an asset-set output contract it must return `result.assetIds` as a string array. Dockerfile-based custom nodes are built locally on first execution and then reused by tag. 
The Nodes page and API now share the same validation rules, including required names, valid source kinds, a mandatory `FROM` instruction for Dockerfiles, and rejection of `Source` category nodes that incorrectly declare `inputMode=multi_asset_set`. When a node uses the built-in Python path without a custom hook, `source-asset` now emits bound asset metadata from Mongo-backed asset records and `validate-structure` now performs a real directory validation pass against local source paths. On the current sample path `/Users/longtaowu/workspace/emboldata/data`, that validation reports `valid=false`, `videoFileCount=407`, and missing delivery files because the sample root is a mixed dataset collection rather than a delivery package. The worker now also carries direct upstream task results into execution context so set-operation utility nodes can compute narrowed asset sets and pass those effective asset ids to downstream tasks. diff --git a/apps/api/src/runtime/mongo-store.ts b/apps/api/src/runtime/mongo-store.ts index d40ac5a..cf08b50 100644 --- a/apps/api/src/runtime/mongo-store.ts +++ b/apps/api/src/runtime/mongo-store.ts @@ -3,6 +3,14 @@ import { randomUUID } from "node:crypto"; import type { Db, Document, WithId } from "mongodb"; import type { AssetType } from "../../../../packages/contracts/src/domain.ts"; +import { + formatCustomNodeValidationIssue, + validateCustomNodeDefinition, + type CustomNodeArtifactType, + type CustomNodeCategory, + type CustomNodeInputMode, + type CustomNodeOutputMode, +} from "../../../../packages/contracts/src/custom-node.ts"; import { buildDefaultNodeRuntimeConfig, DELIVERY_NODE_DEFINITIONS, @@ -88,15 +96,11 @@ type StorageConnectionDocument = Timestamped & { createdBy: string; }; -type CustomNodeCategory = "Source" | "Transform" | "Inspect" | "Annotate" | "Export" | "Utility"; -type CustomNodeInputMode = "single_asset_set" | "multi_asset_set"; -type CustomNodeOutputMode = "report" | "asset_set" | "asset_set_with_report"; - type 
CustomNodeContractDocument = { version: "emboflow.node.v1"; inputMode: CustomNodeInputMode; outputMode: CustomNodeOutputMode; - artifactType: "json" | "directory" | "video"; + artifactType: CustomNodeArtifactType; }; type CustomNodeSourceDocument = @@ -865,7 +869,18 @@ export class MongoAppStore { contract: unknown; createdBy: string; }) { - const baseSlug = slugify(input.name); + const validationIssues = validateCustomNodeDefinition({ + name: input.name, + category: input.category, + source: input.source, + contract: input.contract, + }); + if (validationIssues.length > 0) { + throw new Error(formatCustomNodeValidationIssue(validationIssues[0])); + } + + const normalizedName = input.name.trim(); + const baseSlug = slugify(normalizedName); const collection = this.db.collection("custom_nodes"); let slug = baseSlug; let definitionId = `custom-${slug}`; @@ -882,7 +897,7 @@ export class MongoAppStore { definitionId, workspaceId: input.workspaceId, projectId: input.projectId, - name: input.name, + name: normalizedName, slug, description: input.description ?? 
"", category: sanitizeCustomNodeCategory(input.category), diff --git a/apps/api/test/domain-contracts.spec.ts b/apps/api/test/domain-contracts.spec.ts index 07522dd..92fa8f2 100644 --- a/apps/api/test/domain-contracts.spec.ts +++ b/apps/api/test/domain-contracts.spec.ts @@ -6,6 +6,10 @@ import { WORKFLOW_RUN_STATUSES, WORKSPACE_TYPES, } from "../../../packages/contracts/src/domain.ts"; +import { + formatCustomNodeValidationIssue, + validateCustomNodeDefinition, +} from "../../../packages/contracts/src/custom-node.ts"; import { createMongoConnectionUri } from "../src/common/mongo/mongo.module.ts"; import { ASSET_COLLECTION_NAME, @@ -50,3 +54,44 @@ test("schema collection names match the core domain objects", () => { assert.equal(ASSET_COLLECTION_NAME, "assets"); assert.equal(WORKFLOW_DEFINITION_COLLECTION_NAME, "workflow_definitions"); }); + +test("custom node validation accepts a valid docker image utility node", () => { + const issues = validateCustomNodeDefinition({ + name: "Merge Assets", + category: "Utility", + source: { + kind: "image", + image: "python:3.11-alpine", + command: ["python3", "-c", "print('merge')"], + }, + contract: { + inputMode: "multi_asset_set", + outputMode: "asset_set", + artifactType: "json", + }, + }); + + assert.deepEqual(issues, []); +}); + +test("custom node validation rejects invalid dockerfile and impossible source contract combinations", () => { + const issues = validateCustomNodeDefinition({ + name: "Bad Source", + category: "Source", + source: { + kind: "dockerfile", + dockerfileContent: "CMD [\"python3\"]", + }, + contract: { + inputMode: "multi_asset_set", + outputMode: "report", + artifactType: "json", + }, + }); + + assert.deepEqual(issues, ["source_cannot_be_multi_input", "dockerfile_missing_from"]); + assert.equal( + formatCustomNodeValidationIssue("dockerfile_missing_from"), + "custom node dockerfile must include a FROM instruction", + ); +}); diff --git a/apps/api/test/runtime-http.integration.spec.ts 
b/apps/api/test/runtime-http.integration.spec.ts index 312d198..93ec8d4 100644 --- a/apps/api/test/runtime-http.integration.spec.ts +++ b/apps/api/test/runtime-http.integration.spec.ts @@ -1447,3 +1447,58 @@ test("mongo-backed runtime manages custom docker nodes and exposes them as proje "asset_set", ); }); + +test("mongo-backed runtime rejects invalid custom node definitions with a 400 error", async (t) => { + const { MongoMemoryServer } = await import("mongodb-memory-server"); + const mongod = await MongoMemoryServer.create(); + t.after(async () => { + await mongod.stop(); + }); + + const server = await startRuntimeServer({ + host: "127.0.0.1", + port: 0, + mongoUri: mongod.getUri(), + database: "emboflow-runtime-custom-node-validation", + corsOrigin: "http://127.0.0.1:3000", + }); + t.after(async () => { + await server.close(); + }); + + const bootstrap = await readJson<{ + workspace: { _id: string }; + project: { _id: string }; + }>( + await fetch(`${server.baseUrl}/api/dev/bootstrap`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ userId: "custom-node-validation-user", projectName: "Validation Project" }), + }), + ); + + const response = await fetch(`${server.baseUrl}/api/custom-nodes`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ + workspaceId: bootstrap.workspace._id, + projectId: bootstrap.project._id, + name: "Broken Source", + category: "Source", + source: { + kind: "dockerfile", + dockerfileContent: "CMD [\"python3\"]", + }, + contract: { + inputMode: "multi_asset_set", + outputMode: "report", + artifactType: "json", + }, + createdBy: "custom-node-validation-user", + }), + }); + + assert.equal(response.status, 400); + const payload = (await response.json()) as { message: string }; + assert.equal(payload.message, "source category custom nodes cannot declare multi_asset_set input"); +}); diff --git a/apps/web/src/runtime/app.tsx 
b/apps/web/src/runtime/app.tsx index 804233e..9eab12c 100644 --- a/apps/web/src/runtime/app.tsx +++ b/apps/web/src/runtime/app.tsx @@ -14,6 +14,10 @@ import { } from "@xyflow/react"; import { ApiClient } from "./api-client.ts"; +import { + type CustomNodeValidationIssue, + validateCustomNodeDefinition, +} from "../../../../packages/contracts/src/custom-node.ts"; import { localizeNodeDefinition, type TranslationKey, @@ -98,6 +102,35 @@ function mapConnectionValidationReasonToKey( } } +function mapCustomNodeValidationIssueToKey(issue: CustomNodeValidationIssue): TranslationKey { + switch (issue) { + case "name_required": + return "customNodeValidationNameRequired"; + case "name_too_long": + return "customNodeValidationNameTooLong"; + case "invalid_category": + return "customNodeValidationInvalidCategory"; + case "invalid_source_kind": + return "customNodeValidationInvalidSourceKind"; + case "image_required": + return "customNodeValidationImageRequired"; + case "dockerfile_required": + return "customNodeValidationDockerfileRequired"; + case "dockerfile_missing_from": + return "customNodeValidationDockerfileMissingFrom"; + case "invalid_command": + return "customNodeValidationInvalidCommand"; + case "invalid_input_mode": + return "customNodeValidationInvalidInputMode"; + case "invalid_output_mode": + return "customNodeValidationInvalidOutputMode"; + case "invalid_artifact_type": + return "customNodeValidationInvalidArtifactType"; + case "source_cannot_be_multi_input": + return "customNodeValidationSourceCannotBeMultiInput"; + } +} + type NavItem = "Projects" | "Assets" | "Nodes" | "Workflows" | "Runs" | "Explore" | "Labels" | "Admin"; type BootstrapContext = { @@ -780,6 +813,31 @@ function NodesPage(props: { const [outputMode, setOutputMode] = useState<"report" | "asset_set" | "asset_set_with_report">("report"); const [artifactType, setArtifactType] = useState<"json" | "directory" | "video">("json"); const [error, setError] = useState(null); + const 
customNodeValidationIssues = useMemo( + () => + validateCustomNodeDefinition({ + name, + category, + source: + sourceKind === "image" + ? { + kind: "image", + image, + command: parseCommandLines(commandText), + } + : { + kind: "dockerfile", + dockerfileContent, + command: parseCommandLines(commandText), + }, + contract: { + inputMode, + outputMode, + artifactType, + }, + }), + [artifactType, category, commandText, dockerfileContent, image, inputMode, name, outputMode, sourceKind], + ); const loadCustomNodes = useCallback(async () => { try { @@ -890,9 +948,20 @@ function NodesPage(props: { )}
+ {customNodeValidationIssues.length > 0 ? ( +
+ {customNodeValidationIssues.map((issue) => ( +

{t(mapCustomNodeValidationIssueToKey(issue))}

+ ))} +
+ ) : null} diff --git a/apps/web/src/runtime/i18n.tsx b/apps/web/src/runtime/i18n.tsx index f89cbd4..8a6edcd 100644 --- a/apps/web/src/runtime/i18n.tsx +++ b/apps/web/src/runtime/i18n.tsx @@ -60,6 +60,18 @@ export type TranslationKey = | "customNodeReport" | "customNodeAssetSet" | "customNodeAssetSetWithReport" + | "customNodeValidationNameRequired" + | "customNodeValidationNameTooLong" + | "customNodeValidationInvalidCategory" + | "customNodeValidationInvalidSourceKind" + | "customNodeValidationImageRequired" + | "customNodeValidationDockerfileRequired" + | "customNodeValidationDockerfileMissingFrom" + | "customNodeValidationInvalidCommand" + | "customNodeValidationInvalidInputMode" + | "customNodeValidationInvalidOutputMode" + | "customNodeValidationInvalidArtifactType" + | "customNodeValidationSourceCannotBeMultiInput" | "datasetsTitle" | "datasetsDescription" | "datasetName" @@ -275,6 +287,19 @@ const TRANSLATIONS: Record> = { customNodeReport: "Report only", customNodeAssetSet: "Asset set", customNodeAssetSetWithReport: "Asset set with report", + customNodeValidationNameRequired: "Node name is required.", + customNodeValidationNameTooLong: "Node name must be 80 characters or fewer.", + customNodeValidationInvalidCategory: "Node category is invalid.", + customNodeValidationInvalidSourceKind: "Container source must be Docker image or Dockerfile.", + customNodeValidationImageRequired: "Docker image is required.", + customNodeValidationDockerfileRequired: "Dockerfile content is required.", + customNodeValidationDockerfileMissingFrom: "Dockerfile must include a FROM instruction.", + customNodeValidationInvalidCommand: "Command must contain non-empty arguments only.", + customNodeValidationInvalidInputMode: "Input contract is invalid.", + customNodeValidationInvalidOutputMode: "Output contract is invalid.", + customNodeValidationInvalidArtifactType: "Artifact type is invalid.", + customNodeValidationSourceCannotBeMultiInput: + "Source category nodes cannot declare 
multiple upstream asset sets.", createStorageConnection: "Create Storage Connection", storageProvider: "Storage Provider", bucket: "Bucket", @@ -496,6 +521,18 @@ const TRANSLATIONS: Record> = { customNodeReport: "仅报告", customNodeAssetSet: "资产集", customNodeAssetSetWithReport: "资产集加报告", + customNodeValidationNameRequired: "必须填写节点名称。", + customNodeValidationNameTooLong: "节点名称长度不能超过 80 个字符。", + customNodeValidationInvalidCategory: "节点分类无效。", + customNodeValidationInvalidSourceKind: "容器来源必须是 Docker 镜像或 Dockerfile。", + customNodeValidationImageRequired: "必须填写 Docker 镜像。", + customNodeValidationDockerfileRequired: "必须填写 Dockerfile 内容。", + customNodeValidationDockerfileMissingFrom: "Dockerfile 必须包含 FROM 指令。", + customNodeValidationInvalidCommand: "启动命令只能包含非空参数。", + customNodeValidationInvalidInputMode: "输入契约无效。", + customNodeValidationInvalidOutputMode: "输出契约无效。", + customNodeValidationInvalidArtifactType: "产物类型无效。", + customNodeValidationSourceCannotBeMultiInput: "Source 分类节点不能声明多资产集输入。", createStorageConnection: "创建存储连接", storageProvider: "存储提供方", bucket: "Bucket", diff --git a/design/03-workflows/workflow-execution-model.md b/design/03-workflows/workflow-execution-model.md index fba3453..3e69373 100644 --- a/design/03-workflows/workflow-execution-model.md +++ b/design/03-workflows/workflow-execution-model.md @@ -176,6 +176,8 @@ If the custom node declares an `asset_set` style output contract, `result.assetI If the custom node declares `contract.inputMode = "multi_asset_set"`, the canvas should treat that node as multi-input at authoring time instead of forcing the user through single-input validation rules. The graph validator should derive this capability from the seeded runtime contract, not from a hardcoded node id list alone. +The current V1 validation boundary now rejects structurally invalid custom nodes before they enter the project registry. 
This includes missing names, unsupported source kinds, Dockerfiles without a `FROM` instruction, and `Source` category nodes that incorrectly declare `multi_asset_set` input. + The current V1 worker executes trusted-local Python hooks when a `run_task` carries a `codeHookSpec`. The hook is executed through a constrained Python harness with the task snapshot and execution context passed in as JSON. Hook stdout is captured into `stdoutLines`, hook failures populate `stderrLines`, and the returned object becomes the task artifact payload. The current V1 Docker executor now has two modes: diff --git a/design/04-ui-ux/information-architecture-and-key-screens.md b/design/04-ui-ux/information-architecture-and-key-screens.md index 0922347..671746e 100644 --- a/design/04-ui-ux/information-architecture-and-key-screens.md +++ b/design/04-ui-ux/information-architecture-and-key-screens.md @@ -214,6 +214,8 @@ Core regions: - left/top: creation form - bottom/right: existing custom node list for the active project +The creation form should validate node definitions before submit, not only after an API round-trip. In the current V1 direction the UI and API share the same rules for required names, valid image or Dockerfile sources, mandatory `FROM` instructions in Dockerfiles, and invalid category-contract combinations such as `Source + multi_asset_set`. + The current V1 direction treats custom nodes as project-scoped runtime extensions, not global plugins. That keeps tenancy and lifecycle simpler while still giving teams a controlled way to bring containerized processing into the canvas. 
/*
 * Patch context carried over verbatim from the collapsed diff text that
 * preceded this file's content:
 *
 *   ### Right Configuration Panel
 *   diff --git a/packages/contracts/src/custom-node.ts b/packages/contracts/src/custom-node.ts
 *   new file mode 100644
 *   index 0000000..caf256c
 *   --- /dev/null
 *   +++ b/packages/contracts/src/custom-node.ts
 *   @@ -0,0 +1,168 @@
 */

/**
 * Custom-node definition contract: the allowed enum values, the definition
 * shapes, and a validator that reports machine-readable issue codes.
 *
 * The validator accepts `unknown` fields on purpose so that it can be run on
 * unvalidated request bodies as well as on already-typed form state.
 */

/** Categories a custom node may declare. */
export const CUSTOM_NODE_CATEGORIES = [
  "Source",
  "Transform",
  "Inspect",
  "Annotate",
  "Export",
  "Utility",
] as const;
export type CustomNodeCategory = (typeof CUSTOM_NODE_CATEGORIES)[number];

/** Input contract modes a custom node may declare. */
export const CUSTOM_NODE_INPUT_MODES = ["single_asset_set", "multi_asset_set"] as const;
export type CustomNodeInputMode = (typeof CUSTOM_NODE_INPUT_MODES)[number];

/** Output contract modes a custom node may declare. */
export const CUSTOM_NODE_OUTPUT_MODES = [
  "report",
  "asset_set",
  "asset_set_with_report",
] as const;
export type CustomNodeOutputMode = (typeof CUSTOM_NODE_OUTPUT_MODES)[number];

/** Artifact types a custom node may declare. */
export const CUSTOM_NODE_ARTIFACT_TYPES = ["json", "directory", "video"] as const;
export type CustomNodeArtifactType = (typeof CUSTOM_NODE_ARTIFACT_TYPES)[number];

/** Longest accepted node name, measured on the trimmed string's `length`. */
export const CUSTOM_NODE_NAME_MAX_LENGTH = 80;

export type CustomNodeContract = {
  inputMode: CustomNodeInputMode;
  outputMode: CustomNodeOutputMode;
  artifactType: CustomNodeArtifactType;
};

export type CustomNodeSource =
  | {
      kind: "image";
      image: string;
      command?: string[];
    }
  | {
      kind: "dockerfile";
      dockerfileContent: string;
      imageTag?: string;
      command?: string[];
    };

/** Machine-readable codes returned by {@link validateCustomNodeDefinition}. */
export type CustomNodeValidationIssue =
  | "name_required"
  | "name_too_long"
  | "invalid_category"
  | "invalid_source_kind"
  | "image_required"
  | "dockerfile_required"
  | "dockerfile_missing_from"
  | "invalid_command"
  | "invalid_input_mode"
  | "invalid_output_mode"
  | "invalid_artifact_type"
  | "source_cannot_be_multi_input";

type CustomNodeValidationInput = {
  name?: unknown;
  category?: unknown;
  source?: unknown;
  contract?: unknown;
};

/**
 * One message per issue code. Typing the table as a full `Record` makes a
 * newly added issue code without a message a compile error.
 */
const VALIDATION_ISSUE_MESSAGES: Record<CustomNodeValidationIssue, string> = {
  name_required: "custom node name is required",
  name_too_long: `custom node name must be ${CUSTOM_NODE_NAME_MAX_LENGTH} characters or fewer`,
  invalid_category: "custom node category is invalid",
  invalid_source_kind: "custom node source kind must be image or dockerfile",
  image_required: "custom node image is required",
  dockerfile_required: "custom node dockerfileContent is required",
  dockerfile_missing_from: "custom node dockerfile must include a FROM instruction",
  invalid_command: "custom node command must be an array of non-empty arguments",
  invalid_input_mode: "custom node input mode is invalid",
  invalid_output_mode: "custom node output mode is invalid",
  invalid_artifact_type: "custom node artifact type is invalid",
  source_cannot_be_multi_input: "source category custom nodes cannot declare multi_asset_set input",
};

/**
 * Matches a line that starts with `FROM` (any letter case, optional leading
 * blanks) followed by at least one argument on the same line.
 *
 * Only spaces and tabs are allowed between `FROM` and its argument: using `\s`
 * there would let the match run across a line break, so a bare `FROM` line
 * followed by any other instruction would be accepted. This is a heuristic
 * line check, not a Dockerfile parser.
 */
const DOCKERFILE_FROM_PATTERN = /^[ \t]*FROM[ \t]+\S/imu;

/** Narrows `value` to a non-null object (arrays included). */
function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === "object" && value !== null;
}

/** Narrows `candidate` to one of the string literals listed in `values`. */
function includesValue<T extends readonly string[]>(values: T, candidate: unknown): candidate is T[number] {
  return typeof candidate === "string" && values.includes(candidate);
}

/** True when `value` is an array whose items are all non-blank strings (an empty array passes). */
function isCommandArray(value: unknown): value is string[] {
  return Array.isArray(value) && value.every((item) => typeof item === "string" && item.trim().length > 0);
}

/** True when some line of `value` is a `FROM` instruction that names an argument. */
function hasDockerfileFromInstruction(value: string): boolean {
  return DOCKERFILE_FROM_PATTERN.test(value);
}

/**
 * Validates the `source` part of a definition.
 *
 * An unknown or missing `kind` short-circuits with `invalid_source_kind`
 * because none of the kind-specific checks can be applied.
 */
function collectSourceIssues(source: unknown): CustomNodeValidationIssue[] {
  if (!isRecord(source) || (source.kind !== "image" && source.kind !== "dockerfile")) {
    return ["invalid_source_kind"];
  }

  const issues: CustomNodeValidationIssue[] = [];

  // `command` is optional for both kinds; validate it only when supplied.
  if (source.command !== undefined && !isCommandArray(source.command)) {
    issues.push("invalid_command");
  }

  if (source.kind === "image") {
    if (typeof source.image !== "string" || source.image.trim().length === 0) {
      issues.push("image_required");
    }
    return issues;
  }

  const dockerfileContent =
    typeof source.dockerfileContent === "string" ? source.dockerfileContent.trim() : "";
  if (dockerfileContent.length === 0) {
    issues.push("dockerfile_required");
  } else if (!hasDockerfileFromInstruction(dockerfileContent)) {
    issues.push("dockerfile_missing_from");
  }

  return issues;
}

/**
 * Returns the English message for an issue code.
 *
 * @param issue - Code produced by {@link validateCustomNodeDefinition}.
 * @returns The message text for that code.
 */
export function formatCustomNodeValidationIssue(issue: CustomNodeValidationIssue): string {
  return VALIDATION_ISSUE_MESSAGES[issue];
}

/**
 * Checks a custom-node definition and reports every problem found.
 *
 * Issues are returned in a fixed order: name, category, input mode, output
 * mode, artifact type, the Source/multi-input combination, then source issues.
 *
 * @param input - Definition fields; every field is treated as untrusted.
 * @returns Issue codes in the order above; an empty array means the definition is valid.
 */
export function validateCustomNodeDefinition(input: CustomNodeValidationInput): CustomNodeValidationIssue[] {
  const issues: CustomNodeValidationIssue[] = [];

  // Non-string names are treated the same as an empty name.
  const name = typeof input.name === "string" ? input.name.trim() : "";
  if (name.length === 0) {
    issues.push("name_required");
  } else if (name.length > CUSTOM_NODE_NAME_MAX_LENGTH) {
    issues.push("name_too_long");
  }

  if (!includesValue(CUSTOM_NODE_CATEGORIES, input.category)) {
    issues.push("invalid_category");
  }

  // A missing or non-object contract yields all three contract issues.
  const contract = isRecord(input.contract) ? input.contract : null;
  const inputMode = contract?.inputMode;
  const outputMode = contract?.outputMode;
  const artifactType = contract?.artifactType;

  if (!includesValue(CUSTOM_NODE_INPUT_MODES, inputMode)) {
    issues.push("invalid_input_mode");
  }
  if (!includesValue(CUSTOM_NODE_OUTPUT_MODES, outputMode)) {
    issues.push("invalid_output_mode");
  }
  if (!includesValue(CUSTOM_NODE_ARTIFACT_TYPES, artifactType)) {
    issues.push("invalid_artifact_type");
  }

  if (input.category === "Source" && inputMode === "multi_asset_set") {
    issues.push("source_cannot_be_multi_input");
  }

  issues.push(...collectSourceIssues(input.source));
  return issues;
}