feat: add custom docker node registry tab

eust-w 2026-03-30 03:02:44 +08:00
parent 3d5400da67
commit 0bdef113e7
16 changed files with 970 additions and 41 deletions

View File

@ -6,6 +6,7 @@ EmboFlow is a B/S embodied-data workflow platform for raw asset ingestion, deliv
- Project-scoped workspace shell with a dedicated Projects page and active project selector in the header
- Asset workspace that supports local asset registration, probe summaries, storage connection management, and dataset creation
- Project-scoped custom node registry with Docker image and Dockerfile based node definitions
- Workflow templates as first-class objects, including default project templates and creating project workflows from a template
- Blank workflow creation and a large React Flow editor with drag-and-drop nodes, free canvas movement, edge validation, Docker-first node runtime presets, and Python code-hook injection
- Workflow-level `Save As Template` so edited graphs can be promoted into reusable project templates
@ -80,6 +81,7 @@ The editor now also persists per-node runtime config in workflow versions, inclu
The runtime web shell now exposes a visible `中文 / English` language toggle. The core workspace shell and workflow authoring surface are translated through a lightweight i18n layer.
The shell now also exposes a dedicated Projects page plus an active project selector, so assets, datasets, workflow templates, workflows, and runs all switch together at the project boundary.
The Assets workspace now includes first-class storage connections and datasets. A dataset is distinct from a raw asset and binds project source assets to a selected local or object-storage-backed destination.
The shell now also exposes a dedicated Nodes page for project-scoped custom container nodes. Custom nodes can be registered from an existing Docker image or a self-contained Dockerfile, and each node declares whether it consumes a single asset set or multiple upstream asset sets, as well as the kind of output it produces.
The Workflows workspace now includes a template gallery. Projects can start from default or saved templates, or create a blank workflow directly.
The workflow editor center panel now uses a real draggable node canvas with zoom, pan, mini-map, dotted background, handle-based edge creation, persisted node positions, and localized validation feedback instead of a static list of node cards.
The workflow editor right panel now also supports saving the current workflow draft as a reusable workflow template, in addition to editing per-node runtime settings and Python hooks.
@ -87,6 +89,7 @@ The node library now supports both click-to-append and drag-and-drop placement i
The Runs workspace now shows project-scoped run history, run-level aggregated summaries, cancel/retry controls, and run detail views with persisted task summaries, stdout/stderr sections, result previews, and artifact links into Explore.
Selected run tasks now expose the frozen node definition id, executor config snapshot, and code-hook metadata that were captured when the run was created.
Most built-in delivery nodes now default to `executorType=docker`. When a node uses `executorType=docker` and provides `executorConfig.image`, the worker runs a real local Docker container with mounted `input.json` / `output.json` exchange files plus read-only mounts for bound asset paths. If no image is configured, the executor falls back to the lightweight simulated behavior used by older demo tasks.
Custom Docker nodes follow the same runtime contract. The container reads the task snapshot and execution context from `EMBOFLOW_INPUT_PATH`, writes `{"result": ...}` JSON to `EMBOFLOW_OUTPUT_PATH`, and if it declares an asset-set output contract it must return `result.assetIds` as a string array. Dockerfile-based custom nodes are built locally on first execution and then reused by tag.
When a node uses the built-in Python path without a custom hook, `source-asset` now emits bound asset metadata from Mongo-backed asset records and `validate-structure` now performs a real directory validation pass against local source paths. On the current sample path `/Users/longtaowu/workspace/emboldata/data`, that validation reports `valid=false`, `videoFileCount=407`, and missing delivery files because the sample root is a mixed dataset collection rather than a delivery package.
The worker now also carries direct upstream task results into execution context so set-operation utility nodes can compute narrowed asset sets and pass those effective asset ids to downstream tasks.
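The container contract above can be sketched as a minimal Python entrypoint. This is an illustrative sketch, not the shipped worker protocol: only the `EMBOFLOW_INPUT_PATH` / `EMBOFLOW_OUTPUT_PATH` environment variables, the `{"result": ...}` envelope, and `result.assetIds` for asset-set outputs come from the contract described here; the snapshot field names (`context.assetIds`) are assumptions for illustration.

```python
import json
import os
import tempfile


def run_custom_node() -> None:
    """Sketch of a custom-node container entrypoint.

    The worker provides the task snapshot at EMBOFLOW_INPUT_PATH and expects
    a JSON object with a top-level "result" key at EMBOFLOW_OUTPUT_PATH.
    """
    with open(os.environ["EMBOFLOW_INPUT_PATH"], encoding="utf-8") as handle:
        payload = json.load(handle)

    # Hypothetical snapshot shape: upstream asset ids under context.assetIds.
    upstream_ids = payload.get("context", {}).get("assetIds", [])

    # An asset-set output contract requires result.assetIds as a string array.
    output = {"result": {"assetIds": [str(item) for item in upstream_ids]}}
    with open(os.environ["EMBOFLOW_OUTPUT_PATH"], "w", encoding="utf-8") as handle:
        json.dump(output, handle)


# Simulate the worker's mounted exchange files so the sketch runs outside Docker.
with tempfile.TemporaryDirectory() as workdir:
    os.environ["EMBOFLOW_INPUT_PATH"] = os.path.join(workdir, "input.json")
    os.environ["EMBOFLOW_OUTPUT_PATH"] = os.path.join(workdir, "output.json")
    with open(os.environ["EMBOFLOW_INPUT_PATH"], "w", encoding="utf-8") as handle:
        json.dump({"context": {"assetIds": ["asset-1", "asset-2"]}}, handle)
    run_custom_node()
    with open(os.environ["EMBOFLOW_OUTPUT_PATH"], encoding="utf-8") as handle:
        print(json.load(handle))  # → {'result': {'assetIds': ['asset-1', 'asset-2']}}
```

In the real runtime the two files are mounted into the container by the worker; the `tempfile` scaffolding here only stands in for that wiring.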

View File

@ -88,6 +88,45 @@ type StorageConnectionDocument = Timestamped & {
createdBy: string;
};
type CustomNodeCategory = "Source" | "Transform" | "Inspect" | "Annotate" | "Export" | "Utility";
type CustomNodeInputMode = "single_asset_set" | "multi_asset_set";
type CustomNodeOutputMode = "report" | "asset_set" | "asset_set_with_report";
type CustomNodeContractDocument = {
version: "emboflow.node.v1";
inputMode: CustomNodeInputMode;
outputMode: CustomNodeOutputMode;
artifactType: "json" | "directory" | "video";
};
type CustomNodeSourceDocument =
| {
kind: "image";
image: string;
command?: string[];
}
| {
kind: "dockerfile";
imageTag: string;
dockerfileContent: string;
command?: string[];
};
type CustomNodeDocument = Timestamped & {
_id: string;
definitionId: string;
workspaceId: string;
projectId: string;
name: string;
slug: string;
description: string;
category: CustomNodeCategory;
status: "active";
contract: CustomNodeContractDocument;
source: CustomNodeSourceDocument;
createdBy: string;
};
type DatasetDocument = Timestamped & {
_id: string;
workspaceId: string;
@ -402,6 +441,7 @@ function buildRuntimeSnapshot(
runtimeGraph: Record<string, unknown>,
logicGraph: WorkflowDefinitionVersionDocument["logicGraph"],
pluginRefs: string[],
resolveDefaults: (definitionId: string) => NodeRuntimeConfig | undefined = buildDefaultNodeRuntimeConfig,
): RunRuntimeSnapshot {
const graph = runtimeGraph as WorkflowRuntimeGraph;
const nodeBindings: Record<string, string> = {};
@ -412,7 +452,7 @@ function buildRuntimeSnapshot(
nodeBindings[node.id] = definitionId;
const config = mergeNodeRuntimeConfigs(
definitionId,
buildDefaultNodeRuntimeConfig(definitionId),
resolveDefaults(definitionId),
sanitizeNodeRuntimeConfig(graph.nodeConfigs?.[node.id], definitionId),
);
if (config) {
@ -428,6 +468,120 @@ function buildRuntimeSnapshot(
};
}
function sanitizeStringArray(value: unknown) {
return Array.isArray(value)
? value.filter((item): item is string => typeof item === "string" && item.trim().length > 0)
: undefined;
}
function sanitizeCustomNodeCategory(value: unknown): CustomNodeCategory {
return value === "Source" || value === "Transform" || value === "Inspect" || value === "Annotate" ||
value === "Export" || value === "Utility"
? value
: "Utility";
}
function sanitizeCustomNodeContract(value: unknown): CustomNodeContractDocument {
const input = isRecord(value) ? value : {};
const inputMode = input.inputMode === "multi_asset_set" ? "multi_asset_set" : "single_asset_set";
const outputMode =
input.outputMode === "asset_set" || input.outputMode === "asset_set_with_report"
? input.outputMode
: "report";
const artifactType = sanitizeArtifactType(input.artifactType) ?? "json";
return {
version: "emboflow.node.v1",
inputMode,
outputMode,
artifactType,
};
}
function sanitizeCustomNodeSource(value: unknown): CustomNodeSourceDocument {
if (!isRecord(value)) {
throw new Error("custom node source is required");
}
const command = sanitizeStringArray(value.command);
if (value.kind === "image") {
const image = typeof value.image === "string" ? value.image.trim() : "";
if (!image) {
throw new Error("custom node image is required");
}
return {
kind: "image",
image,
...(command && command.length > 0 ? { command } : {}),
};
}
if (value.kind === "dockerfile") {
const dockerfileContent = typeof value.dockerfileContent === "string" ? value.dockerfileContent.trim() : "";
if (!dockerfileContent) {
throw new Error("custom node dockerfileContent is required");
}
const imageTag = typeof value.imageTag === "string" && value.imageTag.trim().length > 0
? value.imageTag.trim()
: `emboflow/custom-node:${slugify(String(Date.now()))}`;
return {
kind: "dockerfile",
imageTag,
dockerfileContent,
...(command && command.length > 0 ? { command } : {}),
};
}
throw new Error("custom node source kind must be image or dockerfile");
}
function mapCustomNodeToDefinition(node: CustomNodeDocument) {
return {
id: node.definitionId,
name: node.name,
category: node.category,
description: node.description,
defaultExecutorType: "docker" as const,
defaultExecutorConfig: {
...(node.source.kind === "image"
? {
image: node.source.image,
}
: {
imageTag: node.source.imageTag,
dockerfileContent: node.source.dockerfileContent,
}),
...(node.source.command ? { command: [...node.source.command] } : {}),
contract: node.contract,
sourceKind: node.source.kind,
},
allowsMultipleIncoming: node.contract.inputMode === "multi_asset_set",
supportsCodeHook: false,
customNodeId: node._id,
};
}
function buildNodeRuntimeResolver(customNodes: CustomNodeDocument[]) {
const customDefinitionMap = new Map(customNodes.map((node) => [node.definitionId, node]));
return (definitionId: string) => {
const builtin = buildDefaultNodeRuntimeConfig(definitionId);
if (builtin) {
return builtin;
}
const customNode = customDefinitionMap.get(definitionId);
if (!customNode) {
return undefined;
}
return {
executorType: "docker",
executorConfig: mapCustomNodeToDefinition(customNode).defaultExecutorConfig,
} satisfies NodeRuntimeConfig;
};
}
function collectRetryNodeIds(tasks: RunTaskDocument[], rootNodeId: string) {
const pending = [rootNodeId];
const collected = new Set<string>([rootNodeId]);
@ -701,6 +855,57 @@ export class MongoAppStore {
.findOne({ _id: storageConnectionId });
}
async createCustomNode(input: {
workspaceId: string;
projectId: string;
name: string;
description?: string;
category?: CustomNodeCategory;
source: unknown;
contract: unknown;
createdBy: string;
}) {
const baseSlug = slugify(input.name);
const collection = this.db.collection<CustomNodeDocument>("custom_nodes");
let slug = baseSlug;
let definitionId = `custom-${slug}`;
let suffix = 1;
while (await collection.findOne({ projectId: input.projectId, definitionId })) {
suffix += 1;
slug = `${baseSlug}-${suffix}`;
definitionId = `custom-${slug}`;
}
const node: CustomNodeDocument = {
_id: `custom-node-${randomUUID()}`,
definitionId,
workspaceId: input.workspaceId,
projectId: input.projectId,
name: input.name,
slug,
description: input.description ?? "",
category: sanitizeCustomNodeCategory(input.category),
status: "active",
contract: sanitizeCustomNodeContract(input.contract),
source: sanitizeCustomNodeSource(input.source),
createdBy: input.createdBy,
createdAt: nowIso(),
updatedAt: nowIso(),
};
await collection.insertOne(node);
return node;
}
async listCustomNodes(projectId: string) {
return this.db
.collection<CustomNodeDocument>("custom_nodes")
.find({ projectId, status: "active" })
.sort({ createdAt: 1 })
.toArray();
}
async registerAsset(input: {
workspaceId: string;
projectId: string;
@ -1103,10 +1308,12 @@ export class MongoAppStore {
throw new Error("bound assets must belong to the workflow project");
}
const customNodes = await this.listCustomNodes(version.projectId);
const runtimeSnapshot = buildRuntimeSnapshot(
version.runtimeGraph,
version.logicGraph,
version.pluginRefs,
buildNodeRuntimeResolver(customNodes),
);
const run: WorkflowRunDocument = {
@ -1422,7 +1629,13 @@ export class MongoAppStore {
.toArray();
}
listNodeDefinitions() {
return DELIVERY_NODE_DEFINITIONS;
async listNodeDefinitions(projectId?: string) {
const builtinDefinitions = [...DELIVERY_NODE_DEFINITIONS];
if (!projectId) {
return builtinDefinitions;
}
const customNodes = await this.listCustomNodes(projectId);
return [...builtinDefinitions, ...customNodes.map((node) => mapCustomNodeToDefinition(node))];
}
}

View File

@ -234,8 +234,43 @@ export async function createApiRuntime(config = resolveApiRuntimeConfig()) {
}
});
app.get("/api/node-definitions", (_request, response) => {
response.json(store.listNodeDefinitions());
app.post("/api/custom-nodes", async (request, response, next) => {
try {
response.json(
await store.createCustomNode({
workspaceId: request.body.workspaceId,
projectId: request.body.projectId,
name: request.body.name,
description: request.body.description,
category: request.body.category,
source: request.body.source,
contract: request.body.contract,
createdBy: request.body.createdBy ?? "local-user",
}),
);
} catch (error) {
next(error);
}
});
app.get("/api/custom-nodes", async (request, response, next) => {
try {
const projectId = typeof request.query.projectId === "string" ? request.query.projectId : "";
if (!projectId) {
response.status(400).json({ error: "projectId query parameter is required" });
return;
}
response.json(await store.listCustomNodes(projectId));
} catch (error) {
next(error);
}
});
app.get("/api/node-definitions", async (request, response, next) => {
try {
response.json(
await store.listNodeDefinitions(
request.query.projectId ? String(request.query.projectId) : undefined,
),
);
} catch (error) {
next(error);
}
});
app.post("/api/workflow-templates", async (request, response, next) => {

View File

@ -1327,3 +1327,123 @@ test("mongo-backed runtime can cancel a run, retry a run snapshot, and retry a f
assert.match(refreshedFailedTask?.logLines?.[0] ?? "", /retry/i);
assert.equal(refreshedDownstreamTask?.status, "pending");
});
test("mongo-backed runtime manages custom docker nodes and exposes them as project node definitions", async (t) => {
const mongod = await MongoMemoryServer.create({
instance: {
ip: "127.0.0.1",
port: 27131,
},
});
t.after(async () => {
await mongod.stop();
});
const server = await startRuntimeServer({
host: "127.0.0.1",
port: 0,
mongoUri: mongod.getUri(),
database: "emboflow-runtime-custom-nodes",
corsOrigin: "http://127.0.0.1:3000",
});
t.after(async () => {
await server.close();
});
const bootstrap = await readJson<{
workspace: { _id: string };
project: { _id: string };
}>(
await fetch(`${server.baseUrl}/api/dev/bootstrap`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({ userId: "custom-node-user", projectName: "Custom Node Project" }),
}),
);
const imageNode = await readJson<{ _id: string; definitionId: string }>(
await fetch(`${server.baseUrl}/api/custom-nodes`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({
workspaceId: bootstrap.workspace._id,
projectId: bootstrap.project._id,
name: "Merge Labels",
description: "Combine label reports from upstream nodes",
category: "Utility",
source: {
kind: "image",
image: "python:3.11-alpine",
command: ["python3", "-c", "print('custom image node')"],
},
contract: {
inputMode: "multi_asset_set",
outputMode: "asset_set_with_report",
artifactType: "json",
},
createdBy: "custom-node-user",
}),
}),
);
const dockerfileNode = await readJson<{ _id: string; definitionId: string }>(
await fetch(`${server.baseUrl}/api/custom-nodes`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({
workspaceId: bootstrap.workspace._id,
projectId: bootstrap.project._id,
name: "Dockerfile Union",
description: "Union assets with a self-contained Dockerfile",
category: "Utility",
source: {
kind: "dockerfile",
imageTag: "emboflow-test/dockerfile-union:latest",
dockerfileContent: [
"FROM python:3.11-alpine",
"CMD [\"python3\", \"-c\", \"print('dockerfile custom node')\"]",
].join("\n"),
},
contract: {
inputMode: "multi_asset_set",
outputMode: "asset_set",
artifactType: "json",
},
createdBy: "custom-node-user",
}),
}),
);
const nodeDefinitions = await readJson<
Array<{
id: string;
defaultExecutorType?: string;
defaultExecutorConfig?: Record<string, unknown>;
allowsMultipleIncoming?: boolean;
}>
>(
await fetch(
`${server.baseUrl}/api/node-definitions?projectId=${encodeURIComponent(bootstrap.project._id)}`,
),
);
const imageDefinition = nodeDefinitions.find((definition) => definition.id === imageNode.definitionId);
const dockerfileDefinition = nodeDefinitions.find((definition) => definition.id === dockerfileNode.definitionId);
assert.equal(imageDefinition?.defaultExecutorType, "docker");
assert.equal(imageDefinition?.allowsMultipleIncoming, true);
assert.equal(imageDefinition?.defaultExecutorConfig?.image, "python:3.11-alpine");
assert.equal(
(imageDefinition?.defaultExecutorConfig?.contract as { outputMode?: string } | undefined)?.outputMode,
"asset_set_with_report",
);
assert.equal(dockerfileDefinition?.defaultExecutorType, "docker");
assert.equal(
typeof dockerfileDefinition?.defaultExecutorConfig?.dockerfileContent,
"string",
);
assert.equal(
(dockerfileDefinition?.defaultExecutorConfig?.contract as { outputMode?: string } | undefined)?.outputMode,
"asset_set",
);
});

View File

@ -13,6 +13,7 @@ test("app shell renders primary navigation", () => {
});
assert.match(html, /Assets/);
assert.match(html, /Nodes/);
assert.match(html, /Workflows/);
assert.match(html, /Runs/);
assert.match(html, /Explore/);

View File

@ -3,6 +3,7 @@ import { renderWorkspaceSwitcher } from "../workspaces/workspace-switcher.tsx";
export const PRIMARY_NAV_ITEMS = [
"Assets",
"Nodes",
"Workflows",
"Runs",
"Explore",

View File

@ -187,8 +187,43 @@ export class ApiClient {
);
}
async listNodeDefinitions() {
return readJson<any[]>(await fetch(`${this.baseUrl}/api/node-definitions`));
async listNodeDefinitions(projectId?: string) {
const search = new URLSearchParams();
if (projectId) {
search.set("projectId", projectId);
}
return readJson<any[]>(
await fetch(`${this.baseUrl}/api/node-definitions${search.toString() ? `?${search.toString()}` : ""}`),
);
}
async listCustomNodes(projectId: string) {
return readJson<any[]>(
await fetch(`${this.baseUrl}/api/custom-nodes?projectId=${encodeURIComponent(projectId)}`),
);
}
async createCustomNode(input: {
workspaceId: string;
projectId: string;
name: string;
description?: string;
category?: "Source" | "Transform" | "Inspect" | "Annotate" | "Export" | "Utility";
source: Record<string, unknown>;
contract: {
inputMode: "single_asset_set" | "multi_asset_set";
outputMode: "report" | "asset_set" | "asset_set_with_report";
artifactType: "json" | "directory" | "video";
};
createdBy?: string;
}) {
return readJson<any>(
await fetch(`${this.baseUrl}/api/custom-nodes`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify(input),
}),
);
}
async listWorkflowTemplates(input: {

View File

@ -60,6 +60,9 @@ function normalizePathnameForProjectSwitch(pathname: string) {
if (pathname.startsWith("/assets/")) {
return "/assets";
}
if (pathname.startsWith("/nodes")) {
return "/nodes";
}
if (pathname.startsWith("/workflows/")) {
return "/workflows";
}
@ -95,7 +98,7 @@ function mapConnectionValidationReasonToKey(
}
}
type NavItem = "Projects" | "Assets" | "Workflows" | "Runs" | "Explore" | "Labels" | "Admin";
type NavItem = "Projects" | "Assets" | "Nodes" | "Workflows" | "Runs" | "Explore" | "Labels" | "Admin";
type BootstrapContext = {
userId: string;
@ -220,9 +223,10 @@ function AppShell(props: {
children: React.ReactNode;
}) {
const { language, setLanguage, t } = useI18n();
const navItems: Array<{ label: NavItem; href: string; key: "navProjects" | "navAssets" | "navWorkflows" | "navRuns" | "navExplore" | "navLabels" | "navAdmin" }> = [
const navItems: Array<{ label: NavItem; href: string; key: "navProjects" | "navAssets" | "navNodes" | "navWorkflows" | "navRuns" | "navExplore" | "navLabels" | "navAdmin" }> = [
{ label: "Projects", href: "/projects", key: "navProjects" },
{ label: "Assets", href: "/assets", key: "navAssets" },
{ label: "Nodes", href: "/nodes", key: "navNodes" },
{ label: "Workflows", href: "/workflows", key: "navWorkflows" },
{ label: "Runs", href: "/runs", key: "navRuns" },
{ label: "Explore", href: "/explore", key: "navExplore" },
@ -393,6 +397,13 @@ function ProjectsPage(props: {
);
}
function parseCommandLines(value: string) {
return value
.split(/\r?\n/u)
.map((line) => line.trim())
.filter((line) => line.length > 0);
}
function AssetsPage(props: {
api: ApiClient;
bootstrap: BootstrapContext;
@ -752,6 +763,206 @@ function AssetsPage(props: {
);
}
function NodesPage(props: {
api: ApiClient;
bootstrap: BootstrapContext;
}) {
const { t } = useI18n();
const [nodes, setNodes] = useState<any[]>([]);
const [name, setName] = useState("");
const [description, setDescription] = useState("");
const [category, setCategory] = useState<"Transform" | "Inspect" | "Utility" | "Export">("Utility");
const [sourceKind, setSourceKind] = useState<"image" | "dockerfile">("image");
const [image, setImage] = useState("python:3.11-alpine");
const [dockerfileContent, setDockerfileContent] = useState("");
const [commandText, setCommandText] = useState("");
const [inputMode, setInputMode] = useState<"single_asset_set" | "multi_asset_set">("single_asset_set");
const [outputMode, setOutputMode] = useState<"report" | "asset_set" | "asset_set_with_report">("report");
const [artifactType, setArtifactType] = useState<"json" | "directory" | "video">("json");
const [error, setError] = useState<string | null>(null);
const loadCustomNodes = useCallback(async () => {
try {
setNodes(await props.api.listCustomNodes(props.bootstrap.project._id));
} catch (loadError) {
setError(loadError instanceof Error ? loadError.message : t("failedLoadCustomNodes"));
}
}, [props.api, props.bootstrap.project._id, t]);
useEffect(() => {
void loadCustomNodes();
}, [loadCustomNodes]);
return (
<div className="page-stack">
<section className="panel">
<h1>{t("nodesTitle")}</h1>
<p>{t("nodesDescription")}</p>
{error ? <p>{error}</p> : null}
<div className="field-grid">
<label>
{t("customNodeName")}
<input value={name} onChange={(event) => setName(event.target.value)} placeholder="Asset Merge" />
</label>
<label>
{t("customNodeCategory")}
<select value={category} onChange={(event) => setCategory(event.target.value as typeof category)}>
<option value="Transform">Transform</option>
<option value="Inspect">Inspect</option>
<option value="Utility">Utility</option>
<option value="Export">Export</option>
</select>
</label>
<label>
{t("customNodeDescription")}
<textarea value={description} rows={3} onChange={(event) => setDescription(event.target.value)} />
</label>
<label>
{t("customNodeSourceKind")}
<select value={sourceKind} onChange={(event) => setSourceKind(event.target.value as typeof sourceKind)}>
<option value="image">{t("customNodeSourceImage")}</option>
<option value="dockerfile">{t("customNodeSourceDockerfile")}</option>
</select>
</label>
<label>
{t("customNodeInputMode")}
<select value={inputMode} onChange={(event) => setInputMode(event.target.value as typeof inputMode)}>
<option value="single_asset_set">{t("customNodeSingleAssetSet")}</option>
<option value="multi_asset_set">{t("customNodeMultiAssetSet")}</option>
</select>
</label>
<label>
{t("customNodeOutputMode")}
<select value={outputMode} onChange={(event) => setOutputMode(event.target.value as typeof outputMode)}>
<option value="report">{t("customNodeReport")}</option>
<option value="asset_set">{t("customNodeAssetSet")}</option>
<option value="asset_set_with_report">{t("customNodeAssetSetWithReport")}</option>
</select>
</label>
<label>
{t("customNodeArtifactType")}
<select value={artifactType} onChange={(event) => setArtifactType(event.target.value as typeof artifactType)}>
<option value="json">json</option>
<option value="directory">directory</option>
<option value="video">video</option>
</select>
</label>
<label>
{t("customNodeCommand")}
<textarea
rows={4}
value={commandText}
onChange={(event) => setCommandText(event.target.value)}
placeholder={["python3", "-c", "print('custom node')"].join("\n")}
/>
</label>
{sourceKind === "image" ? (
<label>
{t("customNodeImage")}
<input value={image} onChange={(event) => setImage(event.target.value)} placeholder="python:3.11-alpine" />
</label>
) : (
<>
<label>
{t("customNodeDockerfile")}
<textarea
rows={10}
value={dockerfileContent}
onChange={(event) => setDockerfileContent(event.target.value)}
placeholder={["FROM python:3.11-alpine", "CMD [\"python3\", \"-c\", \"print('custom node')\"]"].join("\n")}
/>
</label>
<label>
{t("customNodeDockerfileUpload")}
<input
type="file"
accept=".dockerfile,.Dockerfile,text/plain"
onChange={async (event) => {
const file = event.target.files?.[0];
if (!file) {
return;
}
setDockerfileContent(await file.text());
}}
/>
</label>
</>
)}
</div>
<div className="button-row" style={{ marginTop: 12 }}>
<button
className="button-primary"
onClick={async () => {
try {
setError(null);
await props.api.createCustomNode({
workspaceId: props.bootstrap.workspace._id,
projectId: props.bootstrap.project._id,
name,
description,
category,
source:
sourceKind === "image"
? {
kind: "image",
image,
command: parseCommandLines(commandText),
}
: {
kind: "dockerfile",
dockerfileContent,
command: parseCommandLines(commandText),
},
contract: {
inputMode,
outputMode,
artifactType,
},
createdBy: props.bootstrap.userId,
});
setName("");
setDescription("");
setCommandText("");
if (sourceKind === "dockerfile") {
setDockerfileContent("");
}
await loadCustomNodes();
} catch (createError) {
setError(createError instanceof Error ? createError.message : t("failedCreateCustomNode"));
}
}}
disabled={!name.trim() || (sourceKind === "image" ? !image.trim() : !dockerfileContent.trim())}
>
{t("createCustomNode")}
</button>
</div>
</section>
<section className="panel">
<h2>{t("nodesTitle")}</h2>
{nodes.length === 0 ? (
<p className="empty-state">{t("noCustomNodesYet")}</p>
) : (
<div className="list-grid">
{nodes.map((node) => (
<article key={node._id} className="node-card">
<strong>{node.name}</strong>
<p>{node.description || t("none")}</p>
<p>{t("category")}: {node.category}</p>
<p>{t("customNodeSourceKind")}: {node.source?.kind}</p>
<p>{t("customNodeInputMode")}: {node.contract?.inputMode}</p>
<p>{t("customNodeOutputMode")}: {node.contract?.outputMode}</p>
<p>{t("customNodeArtifactType")}: {node.contract?.artifactType}</p>
<p>{t("definition")}: {node.definitionId}</p>
</article>
))}
</div>
)}
</section>
</div>
);
}
function AssetDetailPage(props: {
api: ApiClient;
assetId: string;
@ -1002,7 +1213,7 @@ function WorkflowEditorPage(props: {
try {
const workflowDefinition = await props.api.getWorkflowDefinition(props.workflowId);
const [nodeDefs, workflowVersions, workflowAssets] = await Promise.all([
props.api.listNodeDefinitions(),
props.api.listNodeDefinitions(workflowDefinition.projectId),
props.api.listWorkflowVersions(props.workflowId),
props.api.listAssets(workflowDefinition.projectId),
]);
@ -1051,6 +1262,19 @@ function WorkflowEditorPage(props: {
() => (selectedNodeRaw ? localizeNodeDefinition(language, selectedNodeRaw) : null),
[language, selectedNodeRaw],
);
const selectedNodeSupportsCodeHook = useMemo(
() => {
if (typeof selectedNodeRaw?.supportsCodeHook === "boolean") {
return selectedNodeRaw.supportsCodeHook;
}
return (
selectedNodeRaw?.category === "Transform" ||
selectedNodeRaw?.category === "Inspect" ||
selectedNodeRaw?.category === "Utility"
);
},
[selectedNodeRaw],
);
const selectedNodeRuntimeConfig = useMemo(
() => getNodeRuntimeConfig(draft, selectedNodeId),
[draft, selectedNodeId],
@ -1400,7 +1624,11 @@ function WorkflowEditorPage(props: {
selectedNodeEffectiveRuntimeConfig.executorType === "http"
? String(selectedNodeEffectiveRuntimeConfig.executorConfig?.url ?? "")
: selectedNodeEffectiveRuntimeConfig.executorType === "docker"
? String(selectedNodeEffectiveRuntimeConfig.executorConfig?.image ?? "")
? String(
selectedNodeEffectiveRuntimeConfig.executorConfig?.image ??
selectedNodeEffectiveRuntimeConfig.executorConfig?.imageTag ??
"",
)
: ""
}
placeholder={
@ -1444,9 +1672,7 @@ function WorkflowEditorPage(props: {
/>
</label>
</div>
{selectedNodeRaw?.category === "Transform" ||
selectedNodeRaw?.category === "Inspect" ||
selectedNodeRaw?.category === "Utility" ? (
{selectedNodeSupportsCodeHook ? (
<label style={{ display: "grid", gap: 8 }}>
<span>{t("pythonCodeHook")}</span>
<textarea
@ -2066,6 +2292,9 @@ export function App(props: AppProps) {
if (pathname === "/workflows") {
active = "Workflows";
content = <WorkflowsPage api={api} bootstrap={activeBootstrap} />;
} else if (pathname === "/nodes") {
active = "Nodes";
content = <NodesPage api={api} bootstrap={activeBootstrap} />;
} else if (workflowMatch) {
active = "Workflows";
content = <WorkflowEditorPage api={api} workflowId={workflowMatch[1]} />;

View File

@ -10,6 +10,8 @@ test("translate returns chinese and english labels for shared frontend keys", ()
assert.equal(translate("zh", "templateSaved"), "已保存模板");
assert.equal(translate("en", "navWorkflows"), "Workflows");
assert.equal(translate("zh", "navWorkflows"), "工作流");
assert.equal(translate("en", "navNodes"), "Nodes");
assert.equal(translate("zh", "navNodes"), "节点");
assert.equal(
translate("en", "invalidConnectionCycle"),
"This edge would create a cycle.",

View File

@ -9,6 +9,7 @@ export type TranslationKey =
| "localDev"
| "navProjects"
| "navAssets"
| "navNodes"
| "navWorkflows"
| "navRuns"
| "navExplore"
@ -37,6 +38,28 @@ export type TranslationKey =
| "basePath"
| "rootPath"
| "noStorageConnectionsYet"
| "nodesTitle"
| "nodesDescription"
| "createCustomNode"
| "noCustomNodesYet"
| "customNodeName"
| "customNodeDescription"
| "customNodeCategory"
| "customNodeSourceKind"
| "customNodeSourceImage"
| "customNodeSourceDockerfile"
| "customNodeImage"
| "customNodeDockerfile"
| "customNodeDockerfileUpload"
| "customNodeCommand"
| "customNodeInputMode"
| "customNodeOutputMode"
| "customNodeArtifactType"
| "customNodeSingleAssetSet"
| "customNodeMultiAssetSet"
| "customNodeReport"
| "customNodeAssetSet"
| "customNodeAssetSetWithReport"
| "datasetsTitle"
| "datasetsDescription"
| "datasetName"
@ -153,6 +176,8 @@ export type TranslationKey =
| "failedCreateStorageConnection"
| "failedLoadDatasets"
| "failedCreateDataset"
| "failedLoadCustomNodes"
| "failedCreateCustomNode"
| "failedRegisterAsset"
| "failedLoadWorkflows"
| "failedLoadTemplates"
@ -203,6 +228,7 @@ const TRANSLATIONS: Record<Language, Record<TranslationKey, string>> = {
localDev: "Local Dev",
navProjects: "Projects",
navAssets: "Assets",
navNodes: "Nodes",
navWorkflows: "Workflows",
navRuns: "Runs",
navExplore: "Explore",
@ -226,6 +252,29 @@ const TRANSLATIONS: Record<Language, Record<TranslationKey, string>> = {
storageConnectionsTitle: "Storage Connections",
storageConnectionsDescription:
"Define where project datasets are stored, including local paths and object storage providers.",
nodesTitle: "Custom Nodes",
nodesDescription:
"Register project-level Docker nodes from an image or a self-contained Dockerfile. Containers must read the EmboFlow input path and write a result object to the EmboFlow output path.",
createCustomNode: "Create Custom Node",
noCustomNodesYet: "No custom nodes have been created yet.",
customNodeName: "Node Name",
customNodeDescription: "Node Description",
customNodeCategory: "Node Category",
customNodeSourceKind: "Container Source",
customNodeSourceImage: "Docker Image",
customNodeSourceDockerfile: "Dockerfile",
customNodeImage: "Image",
customNodeDockerfile: "Dockerfile Content",
customNodeDockerfileUpload: "Upload Dockerfile",
customNodeCommand: "Command (one argument per line)",
customNodeInputMode: "Input Contract",
customNodeOutputMode: "Output Contract",
customNodeArtifactType: "Artifact Type",
customNodeSingleAssetSet: "Single asset set",
customNodeMultiAssetSet: "Multiple asset sets",
customNodeReport: "Report only",
customNodeAssetSet: "Asset set",
customNodeAssetSetWithReport: "Asset set with report",
createStorageConnection: "Create Storage Connection",
storageProvider: "Storage Provider",
bucket: "Bucket",
@ -352,6 +401,8 @@ const TRANSLATIONS: Record<Language, Record<TranslationKey, string>> = {
failedCreateStorageConnection: "Failed to create storage connection",
failedLoadDatasets: "Failed to load datasets",
failedCreateDataset: "Failed to create dataset",
failedLoadCustomNodes: "Failed to load custom nodes",
failedCreateCustomNode: "Failed to create custom node",
failedRegisterAsset: "Failed to register local asset",
failedLoadWorkflows: "Failed to load workflows",
failedLoadTemplates: "Failed to load workflow templates",
@@ -401,6 +452,7 @@ const TRANSLATIONS: Record<Language, Record<TranslationKey, string>> = {
localDev: "本地开发",
navProjects: "项目",
navAssets: "数据资产",
navNodes: "节点",
navWorkflows: "工作流",
navRuns: "运行记录",
navExplore: "查看",
@@ -421,6 +473,29 @@ const TRANSLATIONS: Record<Language, Record<TranslationKey, string>> = {
openProject: "打开项目",
storageConnectionsTitle: "存储连接",
storageConnectionsDescription: "定义项目数据集的存储位置,包括本地路径和对象存储提供方。",
nodesTitle: "自定义节点",
nodesDescription:
"通过镜像或自包含 Dockerfile 注册项目级 Docker 节点。容器必须读取 EmboFlow 输入路径,并把结果对象写入 EmboFlow 输出路径。",
createCustomNode: "创建自定义节点",
noCustomNodesYet: "当前还没有自定义节点。",
customNodeName: "节点名称",
customNodeDescription: "节点描述",
customNodeCategory: "节点分类",
customNodeSourceKind: "容器来源",
customNodeSourceImage: "Docker 镜像",
customNodeSourceDockerfile: "Dockerfile",
customNodeImage: "镜像",
customNodeDockerfile: "Dockerfile 内容",
customNodeDockerfileUpload: "上传 Dockerfile",
customNodeCommand: "启动命令(每行一个参数)",
customNodeInputMode: "输入契约",
customNodeOutputMode: "输出契约",
customNodeArtifactType: "产物类型",
customNodeSingleAssetSet: "单资产集",
customNodeMultiAssetSet: "多资产集",
customNodeReport: "仅报告",
customNodeAssetSet: "资产集",
customNodeAssetSetWithReport: "资产集加报告",
createStorageConnection: "创建存储连接",
storageProvider: "存储提供方",
bucket: "Bucket",
@@ -545,6 +620,8 @@ const TRANSLATIONS: Record<Language, Record<TranslationKey, string>> = {
failedCreateStorageConnection: "创建存储连接失败",
failedLoadDatasets: "加载数据集失败",
failedCreateDataset: "创建数据集失败",
failedLoadCustomNodes: "加载自定义节点失败",
failedCreateCustomNode: "创建自定义节点失败",
failedRegisterAsset: "注册本地资产失败",
failedLoadWorkflows: "加载工作流失败",
failedLoadTemplates: "加载工作流模板失败",

View File

@@ -1,3 +1,4 @@
import { createHash } from "node:crypto";
import { spawn } from "node:child_process";
import { mkdtemp, readFile, rm, stat, writeFile } from "node:fs/promises";
import os from "node:os";
@@ -46,6 +47,77 @@ function parseDockerResult(payload: unknown) {
return payload;
}
async function runProcess(command: string, args: string[]) {
const child = spawn(command, args, {
stdio: ["ignore", "pipe", "pipe"],
});
let stdout = "";
let stderr = "";
child.stdout.on("data", (chunk) => {
stdout += String(chunk);
});
child.stderr.on("data", (chunk) => {
stderr += String(chunk);
});
const exitCode = await new Promise<number>((resolve, reject) => {
child.on("error", reject);
child.on("close", (code) => resolve(code ?? 1));
});
return {
exitCode,
stdoutLines: splitOutputLines(stdout),
stderrLines: splitOutputLines(stderr),
};
}
async function ensureImageFromDockerfile(dockerfileContent: string, requestedTag?: string) {
const imageTag = requestedTag && requestedTag.trim().length > 0
? requestedTag.trim()
: `emboflow/custom-node:${createHash("sha256").update(dockerfileContent).digest("hex").slice(0, 16)}`;
const inspected = await runProcess("docker", ["image", "inspect", imageTag]);
if (inspected.exitCode === 0) {
return imageTag;
}
const buildDir = await mkdtemp(path.join(os.tmpdir(), "emboflow-custom-node-build-"));
try {
await writeFile(path.join(buildDir, "Dockerfile"), dockerfileContent);
const build = await runProcess("docker", ["build", "-t", imageTag, buildDir]);
if (build.exitCode !== 0) {
throw Object.assign(new Error(`docker build failed for ${imageTag}`), {
stdoutLines: build.stdoutLines,
stderrLines: build.stderrLines,
});
}
return imageTag;
} finally {
await rm(buildDir, { recursive: true, force: true });
}
}
function validateCustomNodeResult(task: TaskRecord, result: unknown) {
const contract =
task.executorConfig?.contract && typeof task.executorConfig.contract === "object" && !Array.isArray(task.executorConfig.contract)
? task.executorConfig.contract as { outputMode?: string }
: undefined;
if (!contract) {
return result;
}
if (!result || typeof result !== "object" || Array.isArray(result)) {
throw new Error("custom docker nodes must write an object result");
}
if (
(contract.outputMode === "asset_set" || contract.outputMode === "asset_set_with_report") &&
!Array.isArray((result as { assetIds?: unknown }).assetIds)
) {
throw new Error("custom asset-set nodes must return result.assetIds as a string array");
}
return result;
}
function buildContainerAssetContext(workdir: string, assets: ExecutionAsset[] = []) {
const volumeArgs: string[] = [];
const containerAssets = assets.map((asset) => {
@@ -244,12 +316,18 @@ export class DockerExecutor {
async execute(task: TaskRecord, context: ExecutionContext): Promise<ExecutorExecutionResult> {
this.executionCount += 1;
const dockerfileContent =
typeof task.executorConfig?.dockerfileContent === "string" ? task.executorConfig.dockerfileContent.trim() : "";
const requestedImageTag =
typeof task.executorConfig?.imageTag === "string" ? task.executorConfig.imageTag.trim() : "";
const image = typeof task.executorConfig?.image === "string" ? task.executorConfig.image.trim() : "";
const command = Array.isArray(task.executorConfig?.command)
? task.executorConfig.command.filter((item): item is string => typeof item === "string")
: [];
if (!image) {
const resolvedImage = image || (dockerfileContent ? await ensureImageFromDockerfile(dockerfileContent, requestedImageTag) : "");
if (!resolvedImage) {
return buildFallbackResult(task, "docker://local-simulated", command);
}
@@ -272,6 +350,8 @@ export class DockerExecutor {
const outputPath = path.join(tempDir, "output.json");
const runnerPath = path.join(tempDir, "runner.py");
const { assets: containerAssets, volumeArgs } = buildContainerAssetContext(workdir, context.assets ?? []);
const isCustomContainerNode = Boolean(task.executorConfig?.contract) ||
String(task.nodeDefinitionId ?? "").startsWith("custom-");
await writeFile(
inputPath,
@@ -306,53 +386,38 @@ export class DockerExecutor {
"--env",
`EMBOFLOW_WORKFLOW_RUN_ID=${context.workflowRunId ?? ""}`,
...envVars.flatMap(([key, value]) => ["--env", `${key}=${value}`]),
image,
resolvedImage,
...(command.length > 0
? command
: ["python3", `${workdir}/runner.py`, `${workdir}/input.json`, `${workdir}/output.json`]),
: isCustomContainerNode
? []
: ["python3", `${workdir}/runner.py`, `${workdir}/input.json`, `${workdir}/output.json`]),
];
const child = spawn("docker", dockerArgs, {
stdio: ["ignore", "pipe", "pipe"],
});
let stdout = "";
let stderr = "";
child.stdout.on("data", (chunk) => {
stdout += String(chunk);
});
child.stderr.on("data", (chunk) => {
stderr += String(chunk);
});
const exitCode = await new Promise<number>((resolve, reject) => {
child.on("error", reject);
child.on("close", (code) => resolve(code ?? 1));
});
const { exitCode, stdoutLines, stderrLines } = await runProcess("docker", dockerArgs);
try {
if (exitCode !== 0) {
throw Object.assign(new Error(`docker executor failed with exit code ${exitCode}`), {
stdoutLines: splitOutputLines(stdout),
stderrLines: splitOutputLines(stderr),
stdoutLines,
stderrLines,
});
}
let result: unknown = {
taskId: task.id,
executor: "docker" as const,
image,
image: resolvedImage,
command,
};
if (await fileExists(outputPath)) {
const outputPayload = JSON.parse(await readFile(outputPath, "utf8")) as unknown;
result = parseDockerResult(outputPayload);
result = validateCustomNodeResult(task, parseDockerResult(outputPayload));
}
return {
result,
stdoutLines: splitOutputLines(stdout),
stderrLines: splitOutputLines(stderr),
stdoutLines,
stderrLines,
};
} finally {
await rm(tempDir, { recursive: true, force: true });

View File

@@ -985,3 +985,68 @@ test("worker executes built-in union-assets inside docker when docker is availab
assert.match(task?.stdoutLines?.[0] ?? "", /union resolved 2 assets/i);
assert.deepEqual(task?.lastResultPreview?.assetIds, ["asset-union-a", "asset-union-b"]);
});
test("worker builds and executes a custom dockerfile node when docker is available", async (t) => {
if (!hasDockerRuntime()) {
t.diagnostic("docker runtime unavailable; skipping custom dockerfile node test");
return;
}
ensureDockerImage("python:3.11-alpine");
const fixture = await createRuntimeFixture("emboflow-worker-custom-dockerfile-node");
t.after(async () => {
await fixture.close();
});
await fixture.db.collection("workflow_runs").insertOne({
_id: "run-custom-dockerfile-node",
workflowDefinitionId: "workflow-custom-dockerfile-node",
workflowVersionId: "workflow-custom-dockerfile-node-v1",
status: "queued",
triggeredBy: "local-user",
assetIds: ["asset-custom-1", "asset-custom-2"],
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
});
await fixture.db.collection("run_tasks").insertOne({
_id: "task-custom-dockerfile-node",
workflowRunId: "run-custom-dockerfile-node",
workflowVersionId: "workflow-custom-dockerfile-node-v1",
nodeId: "custom-node-merge-assets-1",
nodeDefinitionId: "custom-merge-assets",
nodeType: "utility",
executorType: "docker",
executorConfig: {
imageTag: "emboflow-test/custom-merge-assets:latest",
dockerfileContent: [
"FROM python:3.11-alpine",
"CMD [\"python3\", \"-c\", \"import json,os,pathlib; payload=json.loads(pathlib.Path(os.environ['EMBOFLOW_INPUT_PATH']).read_text()); asset_ids=payload['context'].get('assetIds', []); pathlib.Path(os.environ['EMBOFLOW_OUTPUT_PATH']).write_text(json.dumps({'result': {'assetIds': asset_ids, 'assetCount': len(asset_ids), 'kind': 'custom-dockerfile'}})); print(f'custom dockerfile handled {len(asset_ids)} assets')\"]",
].join("\n"),
contract: {
inputMode: "single_asset_set",
outputMode: "asset_set",
artifactType: "json",
},
},
status: "queued",
attempt: 1,
assetIds: ["asset-custom-1", "asset-custom-2"],
upstreamNodeIds: [],
outputArtifactIds: [],
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
});
await fixture.runtime.runNextTask();
const task = await fixture.store.getRunTask("task-custom-dockerfile-node");
assert.equal(task?.status, "success");
assert.equal(task?.summary?.executorType, "docker");
assert.match(task?.stdoutLines?.[0] ?? "", /custom dockerfile handled 2 assets/i);
assert.deepEqual(task?.lastResultPreview, {
assetIds: ["asset-custom-1", "asset-custom-2"],
assetCount: 2,
kind: "custom-dockerfile",
});
});

View File

@@ -102,6 +102,8 @@ V1 node categories:
- training config export
- Python processing node
The current V1 runtime also supports project-level custom Docker nodes. A custom node is registered separately from the workflow graph, then exposed through the same node-definition surface as built-in nodes.
## Node Definition Contract
Each node definition must expose:
@@ -140,6 +142,36 @@ def process(input_data, context):
This keeps serialization, logging, and runtime control predictable.
### Custom Docker Node Contract
Custom containerized nodes must implement the EmboFlow runtime contract instead of inventing their own I/O shape.
Container input:
- `EMBOFLOW_INPUT_PATH` points to a JSON file containing the frozen `task` snapshot and the execution `context`
- `EMBOFLOW_OUTPUT_PATH` points to the JSON file the container must write before exit
Expected `context` shape:
- `assetIds`
- `assets`
- `upstreamResults`
- run and node identifiers
Expected output shape:
```json
{
"result": {
"...": "..."
}
}
```
If the custom node declares an `asset_set` style output contract, `result.assetIds` must be a string array. This is what allows downstream nodes to inherit the narrowed asset set.
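The contract above can be sketched as a minimal container entrypoint. This is an illustrative sketch, not the shipped runner: it assumes only the documented pieces (the two environment variables, a `context.assetIds` string array, and the `{"result": {...}}` output envelope); the `assetCount` field is an invented example.

```python
# Sketch of a custom-node container entrypoint honoring the EmboFlow
# runtime contract. Assumes EMBOFLOW_INPUT_PATH / EMBOFLOW_OUTPUT_PATH
# env vars and a context.assetIds string array; assetCount is illustrative.
import json
import os
from pathlib import Path


def run_custom_node() -> dict:
    # Read the frozen task snapshot and execution context from the input file.
    payload = json.loads(Path(os.environ["EMBOFLOW_INPUT_PATH"]).read_text())
    asset_ids = payload.get("context", {}).get("assetIds", [])

    # An asset_set output contract requires result.assetIds to be a string
    # array so downstream nodes can inherit the narrowed asset set.
    result = {"assetIds": asset_ids, "assetCount": len(asset_ids)}
    Path(os.environ["EMBOFLOW_OUTPUT_PATH"]).write_text(
        json.dumps({"result": result})
    )
    return result
```

A container built around this would invoke it from its `CMD` and exit zero after writing the output file, exactly as the built-in runner path does.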
The current V1 worker executes trusted-local Python hooks when a `run_task` carries a `codeHookSpec`. The hook is executed through a constrained Python harness with the task snapshot and execution context passed in as JSON. Hook stdout is captured into `stdoutLines`, hook failures populate `stderrLines`, and the returned object becomes the task artifact payload.
The current V1 Docker executor now has two modes:

View File

@@ -14,6 +14,7 @@ Top-level product areas:
- Project selector
- Projects
- Asset Workspace
- Node Registry Workspace
- Canvas Workspace
- Explore Workspace
- Label Workspace
@@ -38,6 +39,7 @@ Recommended primary navigation:
- Projects
- Assets
- Nodes
- Workflows
- Runs
- Explore
@@ -195,6 +197,23 @@ The workflow entry surface that leads into this editor is also now template-awar
- the user can create a project workflow from a template
- the user can still create a blank workflow directly
## Screen 4A: Node Registry Workspace
Purpose:
- create project-level custom nodes
- choose Docker image or Dockerfile source
- declare input and output contract
- publish the node into the workflow editor node library
Core regions:
- top: contract summary and authoring guidance
- left/top: creation form
- bottom/right: existing custom node list for the active project
The current V1 direction treats custom nodes as project-scoped runtime extensions, not global plugins. That keeps tenancy and lifecycle simpler while still giving teams a controlled way to bring containerized processing into the canvas.
### Right Configuration Panel
The right panel is schema-driven.

View File

@@ -55,6 +55,7 @@ while still targeting the collection model below as the persistent shape.
- `annotations`
- `plugins`
- `storage_connections`
- `custom_nodes`
- `audit_logs`
## Collection Design
@@ -209,6 +210,36 @@ Core fields:
- `createdAt`
- `updatedAt`
### custom_nodes
Purpose:
- store project-scoped custom container node definitions
Core fields:
- `_id`
- `definitionId`
- `workspaceId`
- `projectId`
- `name`
- `slug`
- `description`
- `category`
- `status`
- `contract`
- `source`
- `createdBy`
- `createdAt`
- `updatedAt`
The current V1 implementation stores the custom node source as either:
- an existing Docker image reference
- a self-contained Dockerfile body plus an image tag
The node contract is persisted with the node definition so the API can expose correct node metadata to the editor and the worker can validate runtime outputs.
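Putting the fields above together, a persisted `custom_nodes` document for a Dockerfile-sourced node might look like the following. Every value here is illustrative; the `contract` values mirror the runtime test fixture, and the exact shape of `source` is an assumption beyond the two source kinds named above.

```json
{
  "_id": "custom-node-0001",
  "definitionId": "custom-merge-assets",
  "workspaceId": "ws-local",
  "projectId": "project-demo",
  "name": "Merge Assets",
  "slug": "merge-assets",
  "description": "Merges upstream asset sets into one narrowed set.",
  "category": "utility",
  "status": "active",
  "contract": {
    "inputMode": "single_asset_set",
    "outputMode": "asset_set",
    "artifactType": "json"
  },
  "source": {
    "kind": "dockerfile",
    "imageTag": "emboflow/custom-merge-assets:latest",
    "dockerfileContent": "FROM python:3.11-alpine\n..."
  },
  "createdBy": "local-user",
  "createdAt": "2026-03-30T00:00:00.000Z",
  "updatedAt": "2026-03-30T00:00:00.000Z"
}
```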
### dataset_versions
Purpose:

View File

@@ -30,6 +30,7 @@
- `2026-03-27`: The follow-up canvas pass adds left-panel drag-and-drop node placement, localized canvas feedback, and V1 connection guards for self-edges, duplicates, cycles, invalid source/export directions, and multiple inbound edges.
- `2026-03-30`: The current product-integration pass promotes projects, datasets, storage connections, and workflow templates into first-class runtime flows. The shell now has a dedicated Projects page, project switching, workflow template gallery, workflow creation from templates, and workflow-level save-as-template support.
- `2026-03-30`: The current docker-defaults pass makes most built-in delivery nodes Docker-first by default, adds `union-assets` / `intersect-assets` / `difference-assets` utility nodes, permits multi-input edges only for those set nodes, and propagates narrowed upstream asset sets through downstream task execution.
- `2026-03-30`: The current custom-node pass adds a project-scoped `Nodes` tab, custom Docker node registration from image or self-contained Dockerfile, a persisted custom node collection, workflow-editor visibility through `/api/node-definitions`, and worker-side Dockerfile build plus output-contract validation.
---