feat: add workflow run preflight and custom node envelopes

This commit is contained in:
eust-w 2026-03-30 03:28:10 +08:00
parent 91e7d6a802
commit 93eb07d7d7
11 changed files with 651 additions and 1 deletions

View File

@ -86,11 +86,12 @@ The Workflows workspace now includes a template gallery. Projects can start from
The workflow editor center panel now uses a real draggable node canvas with zoom, pan, mini-map, dotted background, handle-based edge creation, persisted node positions, and localized validation feedback instead of a static list of node cards.
The workflow editor right panel now also supports saving the current workflow draft as a reusable workflow template, in addition to editing per-node runtime settings and Python hooks.
When a custom node is selected on the canvas, the right panel now also exposes its declared input contract, output contract, artifact type, and container source so the operator can confirm compatibility without leaving the editor.
The workflow editor now also exposes a workflow-level preflight panel. Saved workflow versions can be checked against the selected bound asset before execution, and run creation is blocked when the current version still has graph, executor, or asset-binding errors.
The node library now supports both click-to-append and drag-and-drop placement into the canvas. When a node is inserted from the library, the editor now seeds its default runtime contract directly into the workflow draft, so custom Docker nodes keep their declared executor type and I/O contract without extra manual edits. V1 connection rules block self-edges, duplicate edges, cycles, incoming edges into source nodes, outgoing edges from export nodes, and multiple upstream edges into ordinary nodes, while allowing multi-input set nodes such as `union-assets`, `intersect-assets`, and `difference-assets` plus any custom node whose runtime contract declares `inputMode=multi_asset_set`.
The Runs workspace now shows project-scoped run history, run-level aggregated summaries, cancel/retry controls, and run detail views with persisted task summaries, stdout/stderr sections, result previews, and artifact links into Explore.
Selected run tasks now expose the frozen node definition id, executor config snapshot, and code-hook metadata that were captured when the run was created.
Most built-in delivery nodes now default to `executorType=docker`. When a node uses `executorType=docker` and provides `executorConfig.image`, the worker runs a real local Docker container with mounted `input.json` / `output.json` exchange files plus read-only mounts for bound asset paths. If no image is configured, the executor falls back to the lightweight simulated behavior used by older demo tasks.
Custom Docker nodes follow the same runtime contract. The container reads the task snapshot and execution context from `EMBOFLOW_INPUT_PATH`, writes `{\"result\": ...}` JSON to `EMBOFLOW_OUTPUT_PATH`, and if it declares an asset-set output contract it must return `result.assetIds` as a string array. Dockerfile-based custom nodes are built locally on first execution and then reused by tag. The Nodes page and API now share the same validation rules, including required names, valid source kinds, a mandatory `FROM` instruction for Dockerfiles, and rejection of `Source` category nodes that incorrectly declare `inputMode=multi_asset_set`. The editor also renders the standard EmboFlow input and output envelope preview for custom nodes so users can align container code to the actual runtime JSON shape.
When a node uses the built-in Python path without a custom hook, `source-asset` now emits bound asset metadata from Mongo-backed asset records and `validate-structure` now performs a real directory validation pass against local source paths. On the current sample path `/Users/longtaowu/workspace/emboldata/data`, that validation reports `valid=false`, `videoFileCount=407`, and missing delivery files because the sample root is a mixed dataset collection rather than a delivery package.
The worker now also carries direct upstream task results into execution context so set-operation utility nodes can compute narrowed asset sets and pass those effective asset ids to downstream tasks.

View File

@ -242,6 +242,23 @@ type RunTaskDocument = Timestamped & {
errorMessage?: string; errorMessage?: string;
}; };
type WorkflowPreflightIssue = {
severity: "error" | "warning";
code: string;
message: string;
nodeId?: string;
nodeDefinitionId?: string;
};
type WorkflowPreflightResult = {
ok: boolean;
issues: WorkflowPreflightIssue[];
summary: {
errorCount: number;
warningCount: number;
};
};
type ArtifactDocument = Timestamped & { type ArtifactDocument = Timestamped & {
_id: string; _id: string;
type: "json" | "directory" | "video"; type: "json" | "directory" | "video";
@ -321,6 +338,19 @@ function buildRunExecutionSummary(tasks: RunTaskDocument[]): RunExecutionSummary
}; };
} }
function buildWorkflowPreflightResult(issues: WorkflowPreflightIssue[]): WorkflowPreflightResult {
const errorCount = issues.filter((issue) => issue.severity === "error").length;
const warningCount = issues.filter((issue) => issue.severity === "warning").length;
return {
ok: errorCount === 0,
issues,
summary: {
errorCount,
warningCount,
},
};
}
function inferDefinitionId(nodeId: string) { function inferDefinitionId(nodeId: string) {
return nodeId.replace(/-\d+$/, ""); return nodeId.replace(/-\d+$/, "");
} }
@ -586,6 +616,19 @@ function buildNodeRuntimeResolver(customNodes: CustomNodeDocument[]) {
}; };
} }
function allowsMultipleIncomingEdges(
definition: ReturnType<typeof mapCustomNodeToDefinition> | (typeof DELIVERY_NODE_DEFINITIONS)[number] | undefined,
config: NodeRuntimeConfig | undefined,
) {
if (definition?.allowsMultipleIncoming) {
return true;
}
const contract = isRecord(config?.executorConfig)
? (config.executorConfig.contract as { inputMode?: string } | undefined)
: undefined;
return contract?.inputMode === "multi_asset_set";
}
function collectRetryNodeIds(tasks: RunTaskDocument[], rootNodeId: string) { function collectRetryNodeIds(tasks: RunTaskDocument[], rootNodeId: string) {
const pending = [rootNodeId]; const pending = [rootNodeId];
const collected = new Set<string>([rootNodeId]); const collected = new Set<string>([rootNodeId]);
@ -1298,12 +1341,177 @@ export class MongoAppStore {
.findOne({ _id: workflowVersionId }); .findOne({ _id: workflowVersionId });
} }
async preflightRun(input: {
workflowDefinitionId: string;
workflowVersionId: string;
assetIds: string[];
}) {
const version = await this.getWorkflowVersion(input.workflowVersionId);
if (!version) {
throw new Error(`workflow version not found: ${input.workflowVersionId}`);
}
if (version.workflowDefinitionId !== input.workflowDefinitionId) {
throw new Error("workflow version does not belong to the provided workflow definition");
}
const issues: WorkflowPreflightIssue[] = [];
const assetIds = Array.from(new Set((input.assetIds ?? []).filter(Boolean)));
if (assetIds.length === 0) {
issues.push({
severity: "error",
code: "asset_binding_missing",
message: "assetIds must include at least one asset",
});
}
const assets = assetIds.length > 0
? await this.db.collection<AssetDocument>("assets").find({ _id: { $in: assetIds } }).toArray()
: [];
const assetIdSet = new Set(assets.map((asset) => asset._id));
for (const assetId of assetIds) {
if (!assetIdSet.has(assetId)) {
issues.push({
severity: "error",
code: "bound_asset_missing",
message: `bound asset does not exist: ${assetId}`,
});
}
}
if (assets.some((asset) => asset.projectId !== version.projectId)) {
issues.push({
severity: "error",
code: "bound_asset_project_mismatch",
message: "bound assets must belong to the workflow project",
});
}
const customNodes = await this.listCustomNodes(version.projectId);
const runtimeSnapshot = buildRuntimeSnapshot(
version.runtimeGraph,
version.logicGraph,
version.pluginRefs,
buildNodeRuntimeResolver(customNodes),
);
const definitions = [
...DELIVERY_NODE_DEFINITIONS,
...customNodes.map((node) => mapCustomNodeToDefinition(node)),
];
const definitionMap = new Map(definitions.map((definition) => [definition.id, definition]));
const incomingEdgeCount = new Map<string, number>();
const outgoingEdgeCount = new Map<string, number>();
for (const edge of version.logicGraph.edges) {
incomingEdgeCount.set(edge.to, (incomingEdgeCount.get(edge.to) ?? 0) + 1);
outgoingEdgeCount.set(edge.from, (outgoingEdgeCount.get(edge.from) ?? 0) + 1);
}
for (const node of version.logicGraph.nodes) {
const definitionId = runtimeSnapshot.nodeBindings?.[node.id] ?? inferDefinitionId(node.id);
const definition = definitionMap.get(definitionId);
if (!definition) {
issues.push({
severity: "error",
code: "node_definition_missing",
message: `node ${node.id} references an unknown definition ${definitionId}`,
nodeId: node.id,
nodeDefinitionId: definitionId,
});
continue;
}
const config = runtimeSnapshot.nodeConfigs?.[node.id];
const executorType = config?.executorType ?? definition.defaultExecutorType ?? "python";
const executorConfig = isRecord(config?.executorConfig)
? config.executorConfig
: isRecord(definition.defaultExecutorConfig)
? definition.defaultExecutorConfig
: undefined;
const incomingCount = incomingEdgeCount.get(node.id) ?? 0;
const outgoingCount = outgoingEdgeCount.get(node.id) ?? 0;
if (node.type === "source" && incomingCount > 0) {
issues.push({
severity: "error",
code: "source_has_incoming_edges",
message: `source node ${node.id} cannot accept incoming edges`,
nodeId: node.id,
nodeDefinitionId: definitionId,
});
}
if (node.type === "export" && outgoingCount > 0) {
issues.push({
severity: "error",
code: "export_has_outgoing_edges",
message: `export node ${node.id} cannot emit outgoing edges`,
nodeId: node.id,
nodeDefinitionId: definitionId,
});
}
if (incomingCount > 1 && !allowsMultipleIncomingEdges(definition, config)) {
issues.push({
severity: "error",
code: "node_requires_single_input",
message: `node ${node.id} accepts only one upstream edge in V1`,
nodeId: node.id,
nodeDefinitionId: definitionId,
});
}
if (executorType === "docker") {
const hasDockerTarget = Boolean(
typeof executorConfig?.image === "string" && executorConfig.image.trim().length > 0 ||
typeof executorConfig?.imageTag === "string" && executorConfig.imageTag.trim().length > 0 ||
typeof executorConfig?.dockerfileContent === "string" && executorConfig.dockerfileContent.trim().length > 0,
);
if (!hasDockerTarget) {
issues.push({
severity: "error",
code: "docker_executor_missing_image",
message: `node ${node.id} uses the docker executor without an image or Dockerfile`,
nodeId: node.id,
nodeDefinitionId: definitionId,
});
}
}
if (executorType === "http") {
const url = typeof executorConfig?.url === "string" ? executorConfig.url.trim() : "";
if (!url) {
issues.push({
severity: "error",
code: "http_executor_missing_url",
message: `node ${node.id} uses the http executor without a url`,
nodeId: node.id,
nodeDefinitionId: definitionId,
});
}
}
if (config?.codeHookSpec && config.codeHookSpec.source.trim().length === 0) {
issues.push({
severity: "error",
code: "code_hook_missing_source",
message: `node ${node.id} has an empty python code hook`,
nodeId: node.id,
nodeDefinitionId: definitionId,
});
}
}
return buildWorkflowPreflightResult(issues);
}
async createRun(input: { async createRun(input: {
workflowDefinitionId: string; workflowDefinitionId: string;
workflowVersionId: string; workflowVersionId: string;
triggeredBy: string; triggeredBy: string;
assetIds: string[]; assetIds: string[];
}) { }) {
const preflight = await this.preflightRun({
workflowDefinitionId: input.workflowDefinitionId,
workflowVersionId: input.workflowVersionId,
assetIds: input.assetIds,
});
if (!preflight.ok) {
throw new Error(preflight.issues[0]?.message ?? "workflow run preflight failed");
}
const version = await this.getWorkflowVersion(input.workflowVersionId); const version = await this.getWorkflowVersion(input.workflowVersionId);
if (!version) { if (!version) {
throw new Error(`workflow version not found: ${input.workflowVersionId}`); throw new Error(`workflow version not found: ${input.workflowVersionId}`);

View File

@ -396,6 +396,20 @@ export async function createApiRuntime(config = resolveApiRuntimeConfig()) {
} }
}); });
app.post("/api/runs/preflight", async (request, response, next) => {
try {
response.json(
await store.preflightRun({
workflowDefinitionId: request.body.workflowDefinitionId,
workflowVersionId: request.body.workflowVersionId,
assetIds: request.body.assetIds ?? [],
}),
);
} catch (error) {
next(error);
}
});
app.post("/api/runs", async (request, response, next) => { app.post("/api/runs", async (request, response, next) => {
try { try {
response.json( response.json(

View File

@ -7,6 +7,7 @@ import {
WORKSPACE_TYPES, WORKSPACE_TYPES,
} from "../../../packages/contracts/src/domain.ts"; } from "../../../packages/contracts/src/domain.ts";
import { import {
buildCustomNodeEnvelopePreview,
formatCustomNodeValidationIssue, formatCustomNodeValidationIssue,
validateCustomNodeDefinition, validateCustomNodeDefinition,
} from "../../../packages/contracts/src/custom-node.ts"; } from "../../../packages/contracts/src/custom-node.ts";
@ -95,3 +96,17 @@ test("custom node validation rejects invalid dockerfile and impossible source co
"custom node dockerfile must include a FROM instruction", "custom node dockerfile must include a FROM instruction",
); );
}); });
test("custom node envelope preview reflects the declared input and output contract", () => {
const preview = buildCustomNodeEnvelopePreview({
inputMode: "multi_asset_set",
outputMode: "asset_set_with_report",
artifactType: "json",
});
assert.deepEqual(preview.input.context.assetIds, ["asset-123"]);
assert.deepEqual(preview.input.context.upstreamResults[0]?.result?.assetIds, ["asset-123"]);
assert.deepEqual(preview.output.result.assetIds, ["asset-123"]);
assert.equal(preview.output.result.artifactType, "json");
assert.equal(typeof preview.output.result.report, "object");
});

View File

@ -1502,3 +1502,131 @@ test("mongo-backed runtime rejects invalid custom node definitions with a 400 er
const payload = (await response.json()) as { message: string }; const payload = (await response.json()) as { message: string };
assert.equal(payload.message, "source category custom nodes cannot declare multi_asset_set input"); assert.equal(payload.message, "source category custom nodes cannot declare multi_asset_set input");
}); });
test("mongo-backed runtime preflights workflow runs before creation and blocks invalid executor config", async (t) => {
const sourceDir = await mkdtemp(path.join(os.tmpdir(), "emboflow-runtime-preflight-"));
await mkdir(path.join(sourceDir, "DJI_001"));
await writeFile(path.join(sourceDir, "meta.json"), "{}");
await writeFile(path.join(sourceDir, "intrinsics.json"), "{}");
await writeFile(path.join(sourceDir, "video_meta.json"), "{}");
await writeFile(path.join(sourceDir, "DJI_001", "DJI_001.mp4"), "");
const mongod = await MongoMemoryServer.create();
t.after(async () => {
await mongod.stop();
});
const server = await startRuntimeServer({
host: "127.0.0.1",
port: 0,
mongoUri: mongod.getUri(),
database: "emboflow-runtime-preflight",
corsOrigin: "http://127.0.0.1:3000",
});
t.after(async () => {
await server.close();
});
const bootstrap = await readJson<{
workspace: { _id: string };
project: { _id: string };
}>(
await fetch(`${server.baseUrl}/api/dev/bootstrap`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({ userId: "preflight-user", projectName: "Preflight Project" }),
}),
);
const asset = await readJson<{ _id: string }>(
await fetch(`${server.baseUrl}/api/assets/register`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({
workspaceId: bootstrap.workspace._id,
projectId: bootstrap.project._id,
sourcePath: sourceDir,
}),
}),
);
await readJson(await fetch(`${server.baseUrl}/api/assets/${asset._id}/probe`, { method: "POST" }));
const workflow = await readJson<{ _id: string }>(
await fetch(`${server.baseUrl}/api/workflows`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({
workspaceId: bootstrap.workspace._id,
projectId: bootstrap.project._id,
name: "Preflight Flow",
}),
}),
);
const version = await readJson<{ _id: string }>(
await fetch(`${server.baseUrl}/api/workflows/${workflow._id}/versions`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({
visualGraph: { viewport: { x: 0, y: 0, zoom: 1 } },
logicGraph: {
nodes: [
{ id: "source-asset", type: "source" },
{ id: "validate-structure", type: "inspect" },
{ id: "export-delivery-package", type: "export" },
],
edges: [
{ from: "source-asset", to: "validate-structure" },
{ from: "validate-structure", to: "export-delivery-package" },
],
},
runtimeGraph: {
nodeConfigs: {
"validate-structure": {
executorType: "http",
executorConfig: {
method: "POST",
},
},
},
},
pluginRefs: ["builtin:delivery-nodes"],
}),
}),
);
const preflight = await readJson<{
ok: boolean;
issues: Array<{ code: string; message: string; nodeId?: string; severity: string }>;
summary: { errorCount: number; warningCount: number };
}>(
await fetch(`${server.baseUrl}/api/runs/preflight`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({
workflowDefinitionId: workflow._id,
workflowVersionId: version._id,
assetIds: [asset._id],
}),
}),
);
assert.equal(preflight.ok, false);
assert.equal(preflight.summary.errorCount, 1);
assert.equal(preflight.issues[0]?.code, "http_executor_missing_url");
assert.equal(preflight.issues[0]?.nodeId, "validate-structure");
const runResponse = await fetch(`${server.baseUrl}/api/runs`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({
workflowDefinitionId: workflow._id,
workflowVersionId: version._id,
assetIds: [asset._id],
}),
});
assert.equal(runResponse.status, 400);
const runPayload = (await runResponse.json()) as { message: string };
assert.equal(runPayload.message, "node validate-structure uses the http executor without a url");
});

View File

@ -294,6 +294,33 @@ export class ApiClient {
); );
} }
async preflightRun(input: {
workflowDefinitionId: string;
workflowVersionId: string;
assetIds: string[];
}) {
return readJson<{
ok: boolean;
issues: Array<{
severity: "error" | "warning";
code: string;
message: string;
nodeId?: string;
nodeDefinitionId?: string;
}>;
summary: {
errorCount: number;
warningCount: number;
};
}>(
await fetch(`${this.baseUrl}/api/runs/preflight`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify(input),
}),
);
}
async getRun(runId: string) { async getRun(runId: string) {
return readJson<any>(await fetch(`${this.baseUrl}/api/runs/${runId}`)); return readJson<any>(await fetch(`${this.baseUrl}/api/runs/${runId}`));
} }

View File

@ -15,6 +15,7 @@ import {
import { ApiClient } from "./api-client.ts"; import { ApiClient } from "./api-client.ts";
import { import {
buildCustomNodeEnvelopePreview,
type CustomNodeValidationIssue, type CustomNodeValidationIssue,
validateCustomNodeDefinition, validateCustomNodeDefinition,
} from "../../../../packages/contracts/src/custom-node.ts"; } from "../../../../packages/contracts/src/custom-node.ts";
@ -156,6 +157,21 @@ type AppProps = {
apiBaseUrl: string; apiBaseUrl: string;
}; };
type WorkflowPreflightResult = {
ok: boolean;
issues: Array<{
severity: "error" | "warning";
code: string;
message: string;
nodeId?: string;
nodeDefinitionId?: string;
}>;
summary: {
errorCount: number;
warningCount: number;
};
};
function translateStatus(status: string | undefined, t: ReturnType<typeof useI18n>["t"]) { function translateStatus(status: string | undefined, t: ReturnType<typeof useI18n>["t"]) {
switch (status) { switch (status) {
case "success": case "success":
@ -1281,6 +1297,8 @@ function WorkflowEditorPage(props: {
const [canvasFeedbackKey, setCanvasFeedbackKey] = useState<TranslationKey | null>(null); const [canvasFeedbackKey, setCanvasFeedbackKey] = useState<TranslationKey | null>(null);
const [canvasDropActive, setCanvasDropActive] = useState(false); const [canvasDropActive, setCanvasDropActive] = useState(false);
const [flowInstance, setFlowInstance] = useState<ReactFlowInstance<Node, Edge> | null>(null); const [flowInstance, setFlowInstance] = useState<ReactFlowInstance<Node, Edge> | null>(null);
const [preflightResult, setPreflightResult] = useState<WorkflowPreflightResult | null>(null);
const [preflightBusy, setPreflightBusy] = useState(false);
useEffect(() => { useEffect(() => {
void (async () => { void (async () => {
@ -1309,6 +1327,7 @@ function WorkflowEditorPage(props: {
setSavedTemplateName(null); setSavedTemplateName(null);
setDirty(false); setDirty(false);
setCanvasFeedbackKey(null); setCanvasFeedbackKey(null);
setPreflightResult(null);
} catch (loadError) { } catch (loadError) {
setError(loadError instanceof Error ? loadError.message : t("failedLoadWorkflow")); setError(loadError instanceof Error ? loadError.message : t("failedLoadWorkflow"));
} }
@ -1375,6 +1394,21 @@ function WorkflowEditorPage(props: {
} | undefined)?.kind, } | undefined)?.kind,
[selectedNodeEffectiveRuntimeConfig], [selectedNodeEffectiveRuntimeConfig],
); );
const selectedNodeEnvelopePreview = useMemo(() => {
if (
!selectedNodeContract ||
typeof selectedNodeContract.inputMode !== "string" ||
typeof selectedNodeContract.outputMode !== "string" ||
typeof selectedNodeContract.artifactType !== "string"
) {
return null;
}
return buildCustomNodeEnvelopePreview({
inputMode: selectedNodeContract.inputMode as "single_asset_set" | "multi_asset_set",
outputMode: selectedNodeContract.outputMode as "report" | "asset_set" | "asset_set_with_report",
artifactType: selectedNodeContract.artifactType as "json" | "directory" | "video",
});
}, [selectedNodeContract]);
const canvasNodes = useMemo<Array<Node>>( const canvasNodes = useMemo<Array<Node>>(
() => () =>
draft.logicGraph.nodes.map((node) => { draft.logicGraph.nodes.map((node) => {
@ -1532,6 +1566,30 @@ function WorkflowEditorPage(props: {
setDirty(true); setDirty(true);
} }
async function runWorkflowChecks(versionId: string) {
if (!selectedAssetId) {
setPreflightResult(null);
setError(t("selectAssetBeforeRun"));
return null;
}
setPreflightBusy(true);
try {
const result = await props.api.preflightRun({
workflowDefinitionId: props.workflowId,
workflowVersionId: versionId,
assetIds: [selectedAssetId],
});
setPreflightResult(result);
return result;
} catch (preflightError) {
setError(preflightError instanceof Error ? preflightError.message : t("checksBlocked"));
return null;
} finally {
setPreflightBusy(false);
}
}
async function saveCurrentDraft() { async function saveCurrentDraft() {
const version = await props.api.saveWorkflowVersion( const version = await props.api.saveWorkflowVersion(
props.workflowId, props.workflowId,
@ -1539,9 +1597,22 @@ function WorkflowEditorPage(props: {
); );
setVersions((previous) => [version, ...previous.filter((item) => item._id !== version._id)]); setVersions((previous) => [version, ...previous.filter((item) => item._id !== version._id)]);
setDirty(false); setDirty(false);
if (selectedAssetId) {
await runWorkflowChecks(version._id);
} else {
setPreflightResult(null);
}
return version; return version;
} }
useEffect(() => {
if (dirty || !selectedAssetId || versions.length === 0) {
return;
}
void runWorkflowChecks(versions[0]._id);
}, [dirty, selectedAssetId, versions, props.workflowId]);
return ( return (
<div className="page-stack"> <div className="page-stack">
<section className="panel"> <section className="panel">
@ -1561,6 +1632,18 @@ function WorkflowEditorPage(props: {
))} ))}
</select> </select>
</label> </label>
<button
className="button-secondary"
onClick={async () => {
const latestVersion = dirty || versions.length === 0
? await saveCurrentDraft()
: versions[0];
await runWorkflowChecks(latestVersion._id);
}}
disabled={!selectedAssetId || preflightBusy}
>
{t("runChecks")}
</button>
<button <button
className="button-primary" className="button-primary"
onClick={async () => { onClick={async () => {
@ -1579,6 +1662,11 @@ function WorkflowEditorPage(props: {
const latestVersion = dirty || versions.length === 0 const latestVersion = dirty || versions.length === 0
? await saveCurrentDraft() ? await saveCurrentDraft()
: versions[0]; : versions[0];
const preflight = await runWorkflowChecks(latestVersion._id);
if (!preflight?.ok) {
setError(preflight?.issues[0]?.message ?? t("checksBlocked"));
return;
}
const run = await props.api.createRun({ const run = await props.api.createRun({
workflowDefinitionId: props.workflowId, workflowDefinitionId: props.workflowId,
workflowVersionId: latestVersion._id, workflowVersionId: latestVersion._id,
@ -1604,6 +1692,33 @@ function WorkflowEditorPage(props: {
{lastRunId ? <a href={`/runs/${lastRunId}`}>{t("openLatestRun")}</a> : null} {lastRunId ? <a href={`/runs/${lastRunId}`}>{t("openLatestRun")}</a> : null}
</div> </div>
{error ? <p>{error}</p> : null} {error ? <p>{error}</p> : null}
<div style={{ marginTop: 12, display: "grid", gap: 8 }}>
<div className="toolbar">
<strong>{t("workflowChecks")}</strong>
{preflightBusy ? <span>{t("running")}</span> : null}
</div>
{preflightResult ? (
<>
<p>
{preflightResult.ok ? t("checksPassed") : t("checksBlocked")} ·{" "}
{t("checkErrors", { count: preflightResult.summary.errorCount })} ·{" "}
{t("checkWarnings", { count: preflightResult.summary.warningCount })}
</p>
{preflightResult.issues.length > 0 ? (
<ul>
{preflightResult.issues.map((issue, index) => (
<li key={`${issue.code}-${issue.nodeId ?? "global"}-${index}`}>
{issue.message}
{issue.nodeId ? ` (${issue.nodeId})` : ""}
</li>
))}
</ul>
) : null}
</>
) : (
<p className="empty-state">{t("noChecksRunYet")}</p>
)}
</div>
</section> </section>
<section className="editor-layout"> <section className="editor-layout">
@ -1698,6 +1813,18 @@ function WorkflowEditorPage(props: {
<p>{t("customNodeOutputMode")}: {t(formatCustomNodeOutputModeKey(selectedNodeContract.outputMode))}</p> <p>{t("customNodeOutputMode")}: {t(formatCustomNodeOutputModeKey(selectedNodeContract.outputMode))}</p>
<p>{t("customNodeArtifactType")}: {selectedNodeContract.artifactType ?? t("none")}</p> <p>{t("customNodeArtifactType")}: {selectedNodeContract.artifactType ?? t("none")}</p>
<p>{t("customNodeSourceKind")}: {t(formatCustomNodeSourceKindKey(selectedNodeSourceKind))}</p> <p>{t("customNodeSourceKind")}: {t(formatCustomNodeSourceKindKey(selectedNodeSourceKind))}</p>
{selectedNodeEnvelopePreview ? (
<>
<label style={{ display: "grid", gap: 8 }}>
<span>{t("inputEnvelope")}</span>
<pre>{JSON.stringify(selectedNodeEnvelopePreview.input, null, 2)}</pre>
</label>
<label style={{ display: "grid", gap: 8 }}>
<span>{t("outputEnvelope")}</span>
<pre>{JSON.stringify(selectedNodeEnvelopePreview.output, null, 2)}</pre>
</label>
</>
) : null}
</> </>
) : null} ) : null}
<div className="field-grid"> <div className="field-grid">

View File

@ -118,6 +118,13 @@ export type TranslationKey =
| "noWorkflowsYet" | "noWorkflowsYet"
| "latestVersion" | "latestVersion"
| "workflowEditor" | "workflowEditor"
| "workflowChecks"
| "runChecks"
| "checksPassed"
| "checksBlocked"
| "checkErrors"
| "checkWarnings"
| "noChecksRunYet"
| "runAsset" | "runAsset"
| "saveWorkflowVersion" | "saveWorkflowVersion"
| "triggerWorkflowRun" | "triggerWorkflowRun"
@ -134,6 +141,8 @@ export type TranslationKey =
| "draftSynced" | "draftSynced"
| "draftUnsaved" | "draftUnsaved"
| "nodeConfiguration" | "nodeConfiguration"
| "inputEnvelope"
| "outputEnvelope"
| "category" | "category"
| "definition" | "definition"
| "executorType" | "executorType"
@ -356,6 +365,13 @@ const TRANSLATIONS: Record<Language, Record<TranslationKey, string>> = {
noWorkflowsYet: "No workflows yet.", noWorkflowsYet: "No workflows yet.",
latestVersion: "Latest version", latestVersion: "Latest version",
workflowEditor: "Workflow Editor", workflowEditor: "Workflow Editor",
workflowChecks: "Workflow Checks",
runChecks: "Run Checks",
checksPassed: "Checks passed",
checksBlocked: "Blocking issues found",
checkErrors: "{count} errors",
checkWarnings: "{count} warnings",
noChecksRunYet: "Run checks to validate the saved workflow version before execution.",
runAsset: "Run Asset", runAsset: "Run Asset",
saveWorkflowVersion: "Save Workflow Version", saveWorkflowVersion: "Save Workflow Version",
triggerWorkflowRun: "Trigger Workflow Run", triggerWorkflowRun: "Trigger Workflow Run",
@ -372,6 +388,8 @@ const TRANSLATIONS: Record<Language, Record<TranslationKey, string>> = {
draftSynced: "synced", draftSynced: "synced",
draftUnsaved: "unsaved changes", draftUnsaved: "unsaved changes",
nodeConfiguration: "Node Configuration", nodeConfiguration: "Node Configuration",
inputEnvelope: "Input Envelope",
outputEnvelope: "Output Envelope",
category: "Category", category: "Category",
definition: "Definition", definition: "Definition",
executorType: "Executor Type", executorType: "Executor Type",
@ -587,6 +605,13 @@ const TRANSLATIONS: Record<Language, Record<TranslationKey, string>> = {
noWorkflowsYet: "还没有工作流。", noWorkflowsYet: "还没有工作流。",
latestVersion: "最新版本", latestVersion: "最新版本",
workflowEditor: "工作流编辑器", workflowEditor: "工作流编辑器",
workflowChecks: "工作流检查",
runChecks: "执行检查",
checksPassed: "检查通过",
checksBlocked: "发现阻塞问题",
checkErrors: "{count} 个错误",
checkWarnings: "{count} 个警告",
noChecksRunYet: "先执行检查,再触发运行已保存的工作流版本。",
runAsset: "运行资产", runAsset: "运行资产",
saveWorkflowVersion: "保存工作流版本", saveWorkflowVersion: "保存工作流版本",
triggerWorkflowRun: "触发工作流运行", triggerWorkflowRun: "触发工作流运行",
@ -603,6 +628,8 @@ const TRANSLATIONS: Record<Language, Record<TranslationKey, string>> = {
draftSynced: "已同步", draftSynced: "已同步",
draftUnsaved: "有未保存修改", draftUnsaved: "有未保存修改",
nodeConfiguration: "节点配置", nodeConfiguration: "节点配置",
inputEnvelope: "输入 Envelope",
outputEnvelope: "输出 Envelope",
category: "分类", category: "分类",
definition: "定义", definition: "定义",
executorType: "执行器类型", executorType: "执行器类型",

View File

@ -233,6 +233,8 @@ Workflow execution must validate in this order:
Validation failure must block run creation. Validation failure must block run creation.
The current V1 API now exposes this as a real preflight step, not only as an editor convention. `POST /api/runs/preflight` evaluates the saved workflow version against the selected bound assets and frozen runtime snapshot. `POST /api/runs` reuses the same checks and rejects run creation when any blocking issue remains.
## Run Lifecycle ## Run Lifecycle
When a user executes a workflow: When a user executes a workflow:
@ -248,6 +250,16 @@ When a user executes a workflow:
9. collect outputs, logs, and task state 9. collect outputs, logs, and task state
10. finalize run status and summary 10. finalize run status and summary
The current preflight checks include:
- workflow definition and version linkage
- bound asset existence and project match
- resolved node definition existence
- source and export edge direction rules
- multi-input eligibility
- executor-specific required config such as Docker image or HTTP URL
- non-empty code hook source when a hook is present
## Run State Model ## Run State Model
### WorkflowRun Status ### WorkflowRun Status

View File

@ -164,6 +164,7 @@ The current V1 implementation is simpler than the target canvas UX, but it alrea
- allow node add and remove operations on the draft - allow node add and remove operations on the draft
- save the current draft as a new workflow version - save the current draft as a new workflow version
- auto-save a dirty draft before triggering a run - auto-save a dirty draft before triggering a run
- run a workflow-level preflight check against the latest saved version and selected bound asset before execution
The current runtime implementation now also renders the center surface as a real node canvas instead of a static placeholder list: The current runtime implementation now also renders the center surface as a real node canvas instead of a static placeholder list:
@ -237,6 +238,7 @@ This panel is critical. It should feel like a structured system console, not a g
The current right panel also includes a workflow-level `Save As Template` section so an edited graph can be published back into the project template library. The current right panel also includes a workflow-level `Save As Template` section so an edited graph can be published back into the project template library.
For project-scoped custom nodes, the right panel should also surface the declared contract summary directly from the node definition, including input mode, output mode, artifact type, and whether the backing runtime came from a Docker image or Dockerfile definition. For project-scoped custom nodes, the right panel should also surface the declared contract summary directly from the node definition, including input mode, output mode, artifact type, and whether the backing runtime came from a Docker image or Dockerfile definition.
The current V1 direction now also renders the standard EmboFlow input and output envelope preview for selected custom nodes, so container authors can see the exact JSON shape expected by the runtime without leaving the editor.
## Screen 5: Workflow Run Detail ## Screen 5: Workflow Run Detail

View File

@ -27,6 +27,32 @@ export type CustomNodeContract = {
artifactType: CustomNodeArtifactType; artifactType: CustomNodeArtifactType;
}; };
/**
 * Sample input/output payload pair rendered in the workflow editor so custom
 * node (container) authors can see the exact JSON envelope shapes the runtime
 * exchanges with their node, without leaving the editor.
 */
export type CustomNodeEnvelopePreview = {
  /** Sample of the JSON document handed to the node when a task starts. */
  input: {
    // Task descriptor identifying which node instance is being executed.
    task: {
      nodeId: string;
      nodeDefinitionId: string;
      // Preview is only produced for Docker-backed custom nodes, hence the literal.
      executorType: "docker";
      assetIds: string[];
    };
    // Execution context: the resolved assets plus results emitted by upstream nodes.
    context: {
      assetIds: string[];
      assets: Array<{
        _id: string;
        displayName: string;
        sourcePath: string;
      }>;
      upstreamResults: Array<{
        nodeId: string;
        result: Record<string, unknown>;
      }>;
    };
  };
  /** Sample of the JSON document the node is expected to emit when it finishes. */
  output: {
    result: Record<string, unknown>;
  };
};
export type CustomNodeSource = export type CustomNodeSource =
| { | {
kind: "image"; kind: "image";
@ -166,3 +192,66 @@ export function validateCustomNodeDefinition(input: CustomNodeValidationInput):
return issues; return issues;
} }
/**
 * Builds the static sample input/output envelope shown in the editor for a
 * selected custom node.
 *
 * The preview is derived from the node's declared contract:
 * - `inputMode` picks which kind of upstream result is illustrated (a set
 *   node for `multi_asset_set`, a plain source node otherwise);
 * - `outputMode` decides whether the sample result carries `assetIds`,
 *   a `report`, or both;
 * - `artifactType` is echoed verbatim into the sample result.
 *
 * @param contract - The declared runtime contract of the custom node.
 * @returns A deterministic `CustomNodeEnvelopePreview` using placeholder ids.
 */
export function buildCustomNodeEnvelopePreview(contract: CustomNodeContract): CustomNodeEnvelopePreview {
  // Placeholder identifiers reused across the whole preview.
  const sampleAssetId = "asset-123";

  // Illustrate the upstream result shape that matches the node's input mode.
  const upstreamSample =
    contract.inputMode === "multi_asset_set"
      ? {
          nodeId: "upstream-union-assets",
          result: {
            assetIds: [sampleAssetId],
            summary: {
              keptAssetCount: 1,
            },
          },
        }
      : {
          nodeId: "upstream-source-asset",
          result: {
            assetIds: [sampleAssetId],
          },
        };

  // Which optional sections the sample output must carry, per output mode.
  const emitsAssetSet = contract.outputMode === "asset_set" || contract.outputMode === "asset_set_with_report";
  const emitsReport = contract.outputMode === "report" || contract.outputMode === "asset_set_with_report";

  const sampleResult: Record<string, unknown> = {
    summary: {
      outcome: "success",
      processedAssetCount: 1,
    },
    artifactType: contract.artifactType,
  };
  if (emitsAssetSet) {
    sampleResult.assetIds = [sampleAssetId];
  }
  if (emitsReport) {
    sampleResult.report = {
      status: "ok",
      findings: [],
    };
  }

  return {
    input: {
      task: {
        nodeId: "custom-node-1",
        nodeDefinitionId: "custom-example",
        executorType: "docker",
        assetIds: [sampleAssetId],
      },
      context: {
        assetIds: [sampleAssetId],
        assets: [
          {
            _id: sampleAssetId,
            displayName: "Sample Asset",
            sourcePath: "/data/sample-asset",
          },
        ],
        upstreamResults: [upstreamSample],
      },
    },
    output: {
      result: sampleResult,
    },
  };
}