From 7d7cd142330228fe24427f716bc8b00398dbf0c1 Mon Sep 17 00:00:00 2001
From: eust-w
Date: Mon, 30 Mar 2026 14:18:57 +0800
Subject: [PATCH] :sparkles: feat: add dataset-aware workflow inputs
---
README.md | 6 +-
.../common/mongo/schemas/run-task.schema.ts | 2 +
.../mongo/schemas/workflow-run.schema.ts | 2 +
apps/api/src/runtime/mongo-store.ts | 190 +++++++++++++-----
apps/api/src/runtime/server.ts | 4 +
apps/api/test/domain-contracts.spec.ts | 37 ++++
.../api/test/runtime-http.integration.spec.ts | 163 +++++++++++++++
.../web/src/features/runs/run-detail-page.tsx | 4 +
apps/web/src/features/runs/runs-page.tsx | 4 +
.../workflows/workflow-editor-page.test.tsx | 10 +
apps/web/src/runtime/api-client.ts | 10 +-
apps/web/src/runtime/app.tsx | 162 +++++++++++++--
apps/web/src/runtime/i18n.test.ts | 4 +
apps/web/src/runtime/i18n.tsx | 33 ++-
.../worker/src/contracts/execution-context.ts | 20 ++
apps/worker/src/runtime/mongo-worker-store.ts | 47 +++++
apps/worker/src/runtime/worker-runtime.ts | 7 +
.../03-workflows/workflow-execution-model.md | 10 +-
...nformation-architecture-and-key-screens.md | 13 +-
design/05-data/mongodb-data-model.md | 13 ++
packages/contracts/src/workflow-input.ts | 74 +++++++
21 files changed, 730 insertions(+), 85 deletions(-)
create mode 100644 packages/contracts/src/workflow-input.ts
diff --git a/README.md b/README.md
index 198fc13..10278b5 100644
--- a/README.md
+++ b/README.md
@@ -76,7 +76,7 @@ The local validation path currently used for embodied data testing is:
```
You can register that directory from the Assets page or via `POST /api/assets/register`.
-The workflow editor currently requires selecting at least one registered asset before a run can be created.
+The workflow editor now supports workflow input bindings for both registered assets and project datasets. Dataset bindings are expanded into runnable asset ids during preflight and run creation, and run detail shows `input sources`, `input assets`, and `input datasets` separately.
The editor now also persists per-node runtime config in workflow versions, including executor overrides, optional artifact title overrides, and Python code-hook source for inspect and transform style nodes.
The runtime web shell now exposes a visible `中文 / English` language toggle. The core workspace shell and workflow authoring surface are translated through a lightweight i18n layer.
The shell now also exposes a dedicated Projects page plus an active project selector, so assets, datasets, workflow templates, workflows, and runs all switch together at the project boundary.
@@ -86,9 +86,9 @@ The Workflows workspace now includes a template gallery. Projects can start from
The workflow editor center panel now uses a real draggable node canvas with zoom, pan, mini-map, dotted background, handle-based edge creation, persisted node positions, and localized validation feedback instead of a static list of node cards.
The workflow editor right panel now also supports saving the current workflow draft as a reusable workflow template, in addition to editing per-node runtime settings and Python hooks.
When a custom node is selected on the canvas, the right panel now also exposes its declared input contract, output contract, artifact type, and container source so the operator can confirm compatibility without leaving the editor.
-The workflow editor now also exposes a workflow-level preflight panel. Saved workflow versions can be checked against the selected bound asset before execution, and run creation is blocked when the current version still has graph, executor, or asset-binding errors.
+The workflow editor now also exposes a workflow-level preflight panel. Saved workflow versions can be checked against the selected asset or dataset binding before execution, and run creation is blocked when the current version still has graph, executor, or input-binding errors.
The node library now supports both click-to-append and drag-and-drop placement into the canvas. When a node is inserted from the library, the editor now seeds its default runtime contract directly into the workflow draft, so custom Docker nodes keep their declared executor type and I/O contract without extra manual edits. V1 connection rules block self-edges, duplicate edges, cycles, incoming edges into source nodes, outgoing edges from export nodes, and multiple upstream edges into ordinary nodes, while allowing multi-input set nodes such as `union-assets`, `intersect-assets`, and `difference-assets` plus any custom node whose runtime contract declares `inputMode=multi_asset_set`.
-The Runs workspace now shows project-scoped run history, run-level aggregated summaries, cancel/retry controls, and run detail views with persisted task summaries, stdout/stderr sections, result previews, and artifact links into Explore.
+The Runs workspace now shows project-scoped run history, run-level aggregated summaries, cancel/retry controls, and run detail views with persisted task summaries, stdout/stderr sections, result previews, artifact links into Explore, plus explicit input-source visibility for both assets and datasets.
Selected run tasks now expose the frozen node definition id, executor config snapshot, and code-hook metadata that were captured when the run was created.
Most built-in delivery nodes now default to `executorType=docker`. When a node uses `executorType=docker` and provides `executorConfig.image`, the worker runs a real local Docker container with mounted `input.json` / `output.json` exchange files plus read-only mounts for bound asset paths. If no image is configured, the executor falls back to the lightweight simulated behavior used by older demo tasks.
The Docker runner now treats missing or `null` `codeHookSpec` values as “no hook configured”, so built-in Docker nodes and custom container nodes can share the same task envelope without crashing on optional hook fields.
diff --git a/apps/api/src/common/mongo/schemas/run-task.schema.ts b/apps/api/src/common/mongo/schemas/run-task.schema.ts
index b1719c9..0b3ee6d 100644
--- a/apps/api/src/common/mongo/schemas/run-task.schema.ts
+++ b/apps/api/src/common/mongo/schemas/run-task.schema.ts
@@ -13,7 +13,9 @@ export const runTaskSchemaDefinition = {
artifactTitle: { type: "string", required: false, default: null },
status: { type: "string", required: true },
attempt: { type: "number", required: true, default: 1 },
+ inputBindings: { type: "array", required: true, default: [] },
assetIds: { type: "array", required: true, default: [] },
+ datasetIds: { type: "array", required: true, default: [] },
upstreamNodeIds: { type: "array", required: true, default: [] },
outputArtifactIds: { type: "array", required: true, default: [] },
logLines: { type: "array", required: true, default: [] },
diff --git a/apps/api/src/common/mongo/schemas/workflow-run.schema.ts b/apps/api/src/common/mongo/schemas/workflow-run.schema.ts
index 42985e7..4a52085 100644
--- a/apps/api/src/common/mongo/schemas/workflow-run.schema.ts
+++ b/apps/api/src/common/mongo/schemas/workflow-run.schema.ts
@@ -5,7 +5,9 @@ export const workflowRunSchemaDefinition = {
workflowVersionId: { type: "string", required: true },
status: { type: "string", required: true },
triggeredBy: { type: "string", required: true },
+ inputBindings: { type: "array", required: true, default: [] },
assetIds: { type: "array", required: true, default: [] },
+ datasetIds: { type: "array", required: true, default: [] },
runtimeSnapshot: { type: "object", required: false, default: null },
summary: { type: "object", required: false, default: null },
startedAt: { type: "date", required: false, default: null },
diff --git a/apps/api/src/runtime/mongo-store.ts b/apps/api/src/runtime/mongo-store.ts
index 8f8508e..698bc32 100644
--- a/apps/api/src/runtime/mongo-store.ts
+++ b/apps/api/src/runtime/mongo-store.ts
@@ -11,6 +11,11 @@ import {
type CustomNodeInputMode,
type CustomNodeOutputMode,
} from "../../../../packages/contracts/src/custom-node.ts";
+import {
+ normalizeWorkflowInputBindings,
+ splitWorkflowInputBindings,
+ type WorkflowInputBinding,
+} from "../../../../packages/contracts/src/workflow-input.ts";
import {
buildDefaultNodeRuntimeConfig,
DELIVERY_NODE_DEFINITIONS,
@@ -24,6 +29,7 @@ import type {
RunRuntimeSnapshot,
TaskExecutionSummary,
TaskStatusCounts,
+ WorkflowInputBinding as RuntimeWorkflowInputBinding,
} from "../../../worker/src/contracts/execution-context.ts";
type Timestamped = {
@@ -206,7 +212,9 @@ type WorkflowRunDocument = Timestamped & {
projectId: string;
status: "queued" | "running" | "success" | "failed" | "cancelled";
triggeredBy: string;
+ inputBindings?: RuntimeWorkflowInputBinding[];
assetIds: string[];
+ datasetIds?: string[];
runtimeSnapshot?: RunRuntimeSnapshot;
startedAt?: string;
finishedAt?: string;
@@ -228,7 +236,9 @@ type RunTaskDocument = Timestamped & {
artifactTitle?: string;
status: "queued" | "pending" | "running" | "success" | "failed" | "cancelled";
attempt: number;
+ inputBindings?: RuntimeWorkflowInputBinding[];
assetIds: string[];
+ datasetIds?: string[];
upstreamNodeIds: string[];
outputArtifactIds: string[];
startedAt?: string;
@@ -665,6 +675,119 @@ function createStorageSnapshot(
export class MongoAppStore {
constructor(private readonly db: Db) {}
+ private normalizeRunInputBindings(input: {
+ inputBindings?: WorkflowInputBinding[];
+ assetIds?: string[];
+ datasetIds?: string[];
+ }) {
+ return normalizeWorkflowInputBindings(input);
+ }
+
+ private async resolveRunInputSelection(
+ projectId: string,
+ input: {
+ inputBindings?: WorkflowInputBinding[];
+ assetIds?: string[];
+ datasetIds?: string[];
+ },
+ ) {
+ const inputBindings = this.normalizeRunInputBindings(input);
+ const issues: WorkflowPreflightIssue[] = [];
+
+ if (inputBindings.length === 0) {
+ issues.push({
+ severity: "error",
+ code: "asset_binding_missing",
+ message: "assetIds must include at least one asset or dataset binding",
+ });
+ return {
+ inputBindings,
+ assetIds: [] as string[],
+ datasetIds: [] as string[],
+ assets: [] as AssetDocument[],
+ datasets: [] as DatasetDocument[],
+ issues,
+ };
+ }
+
+ const { assetIds: directAssetIds, datasetIds } = splitWorkflowInputBindings(inputBindings);
+
+ const [directAssets, datasets] = await Promise.all([
+ directAssetIds.length > 0
+ ? this.db.collection("assets").find({ _id: { $in: directAssetIds } }).toArray()
+ : Promise.resolve([]),
+ datasetIds.length > 0
+ ? this.db.collection("datasets").find({ _id: { $in: datasetIds } }).toArray()
+ : Promise.resolve([]),
+ ]);
+
+ const directAssetIdSet = new Set(directAssets.map((asset) => asset._id));
+ for (const assetId of directAssetIds) {
+ if (!directAssetIdSet.has(assetId)) {
+ issues.push({
+ severity: "error",
+ code: "bound_asset_missing",
+ message: `bound asset does not exist: ${assetId}`,
+ });
+ }
+ }
+ if (directAssets.some((asset) => asset.projectId !== projectId)) {
+ issues.push({
+ severity: "error",
+ code: "bound_asset_project_mismatch",
+ message: "bound assets must belong to the workflow project",
+ });
+ }
+
+ const datasetIdSet = new Set(datasets.map((dataset) => dataset._id));
+ for (const datasetId of datasetIds) {
+ if (!datasetIdSet.has(datasetId)) {
+ issues.push({
+ severity: "error",
+ code: "bound_dataset_missing",
+ message: `bound dataset does not exist: ${datasetId}`,
+ });
+ }
+ }
+ if (datasets.some((dataset) => dataset.projectId !== projectId)) {
+ issues.push({
+ severity: "error",
+ code: "bound_dataset_project_mismatch",
+ message: "bound datasets must belong to the workflow project",
+ });
+ }
+
+ const resolvedAssetIds = Array.from(
+ new Set([
+ ...directAssetIds,
+ ...datasets.flatMap((dataset) => dataset.sourceAssetIds ?? []),
+ ]),
+ );
+
+ const assetIds = issues.some((issue) => issue.severity === "error")
+ ? directAssets
+ .map((asset) => asset._id)
+ .filter((assetId) => resolvedAssetIds.includes(assetId))
+ : resolvedAssetIds;
+
+ if (assetIds.length === 0) {
+ issues.push({
+ severity: "error",
+ code: "resolved_asset_binding_missing",
+ message: "the selected assets or datasets do not resolve to any runnable assets",
+ });
+ }
+
+ return {
+ inputBindings,
+ assetIds,
+ datasetIds,
+ assets: directAssets,
+ datasets,
+ issues,
+ };
+ }
+
private async ensureDefaultStorageConnection(workspaceId: string, createdBy: string) {
const collection = this.db.collection("storage_connections");
const existing = await collection.findOne({
@@ -1344,7 +1467,9 @@ export class MongoAppStore {
async preflightRun(input: {
workflowDefinitionId: string;
workflowVersionId: string;
- assetIds: string[];
+ inputBindings?: WorkflowInputBinding[];
+ assetIds?: string[];
+ datasetIds?: string[];
}) {
const version = await this.getWorkflowVersion(input.workflowVersionId);
if (!version) {
@@ -1355,35 +1480,8 @@ export class MongoAppStore {
}
const issues: WorkflowPreflightIssue[] = [];
- const assetIds = Array.from(new Set((input.assetIds ?? []).filter(Boolean)));
- if (assetIds.length === 0) {
- issues.push({
- severity: "error",
- code: "asset_binding_missing",
- message: "assetIds must include at least one asset",
- });
- }
-
- const assets = assetIds.length > 0
- ? await this.db.collection("assets").find({ _id: { $in: assetIds } }).toArray()
- : [];
- const assetIdSet = new Set(assets.map((asset) => asset._id));
- for (const assetId of assetIds) {
- if (!assetIdSet.has(assetId)) {
- issues.push({
- severity: "error",
- code: "bound_asset_missing",
- message: `bound asset does not exist: ${assetId}`,
- });
- }
- }
- if (assets.some((asset) => asset.projectId !== version.projectId)) {
- issues.push({
- severity: "error",
- code: "bound_asset_project_mismatch",
- message: "bound assets must belong to the workflow project",
- });
- }
+ const inputSelection = await this.resolveRunInputSelection(version.projectId, input);
+ issues.push(...inputSelection.issues);
const customNodes = await this.listCustomNodes(version.projectId);
const runtimeSnapshot = buildRuntimeSnapshot(
@@ -1501,12 +1599,16 @@ export class MongoAppStore {
workflowDefinitionId: string;
workflowVersionId: string;
triggeredBy: string;
- assetIds: string[];
+ inputBindings?: WorkflowInputBinding[];
+ assetIds?: string[];
+ datasetIds?: string[];
}) {
const preflight = await this.preflightRun({
workflowDefinitionId: input.workflowDefinitionId,
workflowVersionId: input.workflowVersionId,
+ inputBindings: input.inputBindings,
assetIds: input.assetIds,
+ datasetIds: input.datasetIds,
});
if (!preflight.ok) {
throw new Error(preflight.issues[0]?.message ?? "workflow run preflight failed");
@@ -1516,19 +1618,9 @@ export class MongoAppStore {
if (!version) {
throw new Error(`workflow version not found: ${input.workflowVersionId}`);
}
- const assetIds = Array.from(new Set((input.assetIds ?? []).filter(Boolean)));
- if (assetIds.length === 0) {
- throw new Error("assetIds must include at least one asset");
- }
- const assets = await this.db
- .collection("assets")
- .find({ _id: { $in: assetIds } })
- .toArray();
- if (assets.length !== assetIds.length) {
- throw new Error("one or more bound assets do not exist");
- }
- if (assets.some((asset) => asset.projectId !== version.projectId)) {
- throw new Error("bound assets must belong to the workflow project");
+ const inputSelection = await this.resolveRunInputSelection(version.projectId, input);
+ if (inputSelection.issues.length > 0) {
+ throw new Error(inputSelection.issues[0]?.message ?? "invalid workflow input bindings");
}
const customNodes = await this.listCustomNodes(version.projectId);
@@ -1547,7 +1639,9 @@ export class MongoAppStore {
projectId: version.projectId,
status: "queued",
triggeredBy: input.triggeredBy,
- assetIds,
+ inputBindings: inputSelection.inputBindings,
+ assetIds: inputSelection.assetIds,
+ datasetIds: inputSelection.datasetIds,
runtimeSnapshot,
createdAt: nowIso(),
updatedAt: nowIso(),
@@ -1570,7 +1664,9 @@ export class MongoAppStore {
artifactTitle: config?.artifactTitle,
status: targetNodes.has(node.id) ? "pending" : "queued",
attempt: 1,
- assetIds,
+ inputBindings: inputSelection.inputBindings,
+ assetIds: inputSelection.assetIds,
+ datasetIds: inputSelection.datasetIds,
upstreamNodeIds: version.logicGraph.edges
.filter((edge) => edge.to === node.id)
.map((edge) => edge.from),
@@ -1713,7 +1809,9 @@ export class MongoAppStore {
workflowDefinitionId: run.workflowDefinitionId,
workflowVersionId: run.workflowVersionId,
triggeredBy,
+ inputBindings: run.inputBindings,
assetIds: run.assetIds,
+ datasetIds: run.datasetIds,
});
}
diff --git a/apps/api/src/runtime/server.ts b/apps/api/src/runtime/server.ts
index e8c2360..6013b0b 100644
--- a/apps/api/src/runtime/server.ts
+++ b/apps/api/src/runtime/server.ts
@@ -402,7 +402,9 @@ export async function createApiRuntime(config = resolveApiRuntimeConfig()) {
await store.preflightRun({
workflowDefinitionId: request.body.workflowDefinitionId,
workflowVersionId: request.body.workflowVersionId,
+ inputBindings: request.body.inputBindings ?? [],
assetIds: request.body.assetIds ?? [],
+ datasetIds: request.body.datasetIds ?? [],
}),
);
} catch (error) {
@@ -417,7 +419,9 @@ export async function createApiRuntime(config = resolveApiRuntimeConfig()) {
workflowDefinitionId: request.body.workflowDefinitionId,
workflowVersionId: request.body.workflowVersionId,
triggeredBy: request.body.triggeredBy ?? "local-user",
+ inputBindings: request.body.inputBindings ?? [],
assetIds: request.body.assetIds ?? [],
+ datasetIds: request.body.datasetIds ?? [],
}),
);
} catch (error) {
diff --git a/apps/api/test/domain-contracts.spec.ts b/apps/api/test/domain-contracts.spec.ts
index 0779a54..483069a 100644
--- a/apps/api/test/domain-contracts.spec.ts
+++ b/apps/api/test/domain-contracts.spec.ts
@@ -11,6 +11,10 @@ import {
formatCustomNodeValidationIssue,
validateCustomNodeDefinition,
} from "../../../packages/contracts/src/custom-node.ts";
+import {
+ normalizeWorkflowInputBindings,
+ splitWorkflowInputBindings,
+} from "../../../packages/contracts/src/workflow-input.ts";
import { createMongoConnectionUri } from "../src/common/mongo/mongo.module.ts";
import {
ASSET_COLLECTION_NAME,
@@ -110,3 +114,36 @@ test("custom node envelope preview reflects the declared input and output contra
assert.equal(preview.output.result.artifactType, "json");
assert.equal(typeof preview.output.result.report, "object");
});
+
+test("workflow input bindings normalize legacy asset and dataset inputs into one contract", () => {
+ const bindings = normalizeWorkflowInputBindings({
+ assetIds: ["asset-1", "asset-1", ""],
+ datasetIds: ["dataset-1", "dataset-2", "dataset-1"],
+ });
+
+ assert.deepEqual(bindings, [
+ { kind: "asset", id: "asset-1" },
+ { kind: "dataset", id: "dataset-1" },
+ { kind: "dataset", id: "dataset-2" },
+ ]);
+ assert.deepEqual(splitWorkflowInputBindings(bindings), {
+ assetIds: ["asset-1"],
+ datasetIds: ["dataset-1", "dataset-2"],
+ });
+});
+
+test("workflow input bindings prefer explicit bindings over legacy fallback arrays", () => {
+ const bindings = normalizeWorkflowInputBindings({
+ inputBindings: [
+ { kind: "dataset", id: "dataset-9" },
+ { kind: "dataset", id: "dataset-9" },
+ { kind: "asset", id: "asset-2" },
+ ],
+ assetIds: ["asset-legacy"],
+ });
+
+ assert.deepEqual(bindings, [
+ { kind: "dataset", id: "dataset-9" },
+ { kind: "asset", id: "asset-2" },
+ ]);
+});
diff --git a/apps/api/test/runtime-http.integration.spec.ts b/apps/api/test/runtime-http.integration.spec.ts
index d51f961..c338cc2 100644
--- a/apps/api/test/runtime-http.integration.spec.ts
+++ b/apps/api/test/runtime-http.integration.spec.ts
@@ -543,6 +543,169 @@ test("mongo-backed runtime rejects workflow runs without bound assets", async (t
assert.match(await response.text(), /assetIds/i);
});
+test("mongo-backed runtime accepts dataset bindings and resolves them into run assets", async (t) => {
+ const sourceDir = await mkdtemp(path.join(os.tmpdir(), "emboflow-runtime-dataset-inputs-"));
+ await mkdir(path.join(sourceDir, "DJI_001"));
+ await writeFile(path.join(sourceDir, "meta.json"), "{}");
+ await writeFile(path.join(sourceDir, "intrinsics.json"), "{}");
+ await writeFile(path.join(sourceDir, "video_meta.json"), "{}");
+
+ const mongod = await MongoMemoryServer.create({
+ instance: {
+ ip: "127.0.0.1",
+ port: 27129,
+ },
+ });
+ t.after(async () => {
+ await mongod.stop();
+ });
+
+ const server = await startRuntimeServer({
+ host: "127.0.0.1",
+ port: 0,
+ mongoUri: mongod.getUri(),
+ database: "emboflow-runtime-dataset-inputs",
+ corsOrigin: "http://127.0.0.1:3000",
+ });
+ t.after(async () => {
+ await server.close();
+ });
+
+ const bootstrap = await readJson<{
+ workspace: { _id: string };
+ project: { _id: string };
+ }>(
+ await fetch(`${server.baseUrl}/api/dev/bootstrap`, {
+ method: "POST",
+ headers: { "content-type": "application/json" },
+ body: JSON.stringify({ userId: "dataset-run-user", projectName: "Dataset Run Project" }),
+ }),
+ );
+
+ const storageConnection = await readJson<{ _id: string }>(
+ await fetch(`${server.baseUrl}/api/storage-connections`, {
+ method: "POST",
+ headers: { "content-type": "application/json" },
+ body: JSON.stringify({
+ workspaceId: bootstrap.workspace._id,
+ name: "Local Dataset Storage",
+ provider: "local",
+ rootPath: sourceDir,
+ createdBy: "dataset-run-user",
+ }),
+ }),
+ );
+
+ const asset = await readJson<{ _id: string }>(
+ await fetch(`${server.baseUrl}/api/assets/register`, {
+ method: "POST",
+ headers: { "content-type": "application/json" },
+ body: JSON.stringify({
+ workspaceId: bootstrap.workspace._id,
+ projectId: bootstrap.project._id,
+ sourcePath: sourceDir,
+ }),
+ }),
+ );
+
+ const dataset = await readJson<{ _id: string }>(
+ await fetch(`${server.baseUrl}/api/datasets`, {
+ method: "POST",
+ headers: { "content-type": "application/json" },
+ body: JSON.stringify({
+ workspaceId: bootstrap.workspace._id,
+ projectId: bootstrap.project._id,
+ name: "Customer Delivery Dataset",
+ sourceAssetIds: [asset._id],
+ storageConnectionId: storageConnection._id,
+ storagePath: "datasets/customer-delivery",
+ createdBy: "dataset-run-user",
+ }),
+ }),
+ );
+
+ const workflow = await readJson<{ _id: string }>(
+ await fetch(`${server.baseUrl}/api/workflows`, {
+ method: "POST",
+ headers: { "content-type": "application/json" },
+ body: JSON.stringify({
+ workspaceId: bootstrap.workspace._id,
+ projectId: bootstrap.project._id,
+ name: "Dataset Bound Workflow",
+ }),
+ }),
+ );
+
+ const version = await readJson<{ _id: string }>(
+ await fetch(`${server.baseUrl}/api/workflows/${workflow._id}/versions`, {
+ method: "POST",
+ headers: { "content-type": "application/json" },
+ body: JSON.stringify({
+ visualGraph: { viewport: { x: 0, y: 0, zoom: 1 } },
+ logicGraph: {
+ nodes: [
+ { id: "source-asset", type: "source" },
+ { id: "validate-structure", type: "inspect" },
+ ],
+ edges: [{ from: "source-asset", to: "validate-structure" }],
+ },
+ runtimeGraph: { selectedPreset: "delivery-normalization" },
+ pluginRefs: ["builtin:delivery-nodes"],
+ }),
+ }),
+ );
+
+ const preflight = await readJson<{
+ ok: boolean;
+ issues: Array<{ code: string }>;
+ }>(
+ await fetch(`${server.baseUrl}/api/runs/preflight`, {
+ method: "POST",
+ headers: { "content-type": "application/json" },
+ body: JSON.stringify({
+ workflowDefinitionId: workflow._id,
+ workflowVersionId: version._id,
+ inputBindings: [{ kind: "dataset", id: dataset._id }],
+ }),
+ }),
+ );
+
+ const run = await readJson<{
+ _id: string;
+ assetIds: string[];
+ datasetIds: string[];
+ inputBindings: Array<{ kind: string; id: string }>;
+ }>(
+ await fetch(`${server.baseUrl}/api/runs`, {
+ method: "POST",
+ headers: { "content-type": "application/json" },
+ body: JSON.stringify({
+ workflowDefinitionId: workflow._id,
+ workflowVersionId: version._id,
+ inputBindings: [{ kind: "dataset", id: dataset._id }],
+ }),
+ }),
+ );
+
+ const tasks = await readJson<
+ Array<{
+ nodeId: string;
+ assetIds: string[];
+ datasetIds: string[];
+ }>
+ >(await fetch(`${server.baseUrl}/api/runs/${run._id}/tasks`));
+
+ assert.equal(preflight.ok, true);
+ assert.deepEqual(preflight.issues, []);
+ assert.deepEqual(run.inputBindings, [{ kind: "dataset", id: dataset._id }]);
+ assert.deepEqual(run.datasetIds, [dataset._id]);
+ assert.deepEqual(run.assetIds, [asset._id]);
+ assert.deepEqual(tasks[0]?.datasetIds, [dataset._id]);
+ assert.deepEqual(tasks[0]?.assetIds, [asset._id]);
+ assert.deepEqual(tasks[1]?.datasetIds, [dataset._id]);
+ assert.deepEqual(tasks[1]?.assetIds, [asset._id]);
+});
+
test("mongo-backed runtime lists recent runs for a project", async (t) => {
const sourceDir = await mkdtemp(path.join(os.tmpdir(), "emboflow-runtime-runs-"));
await mkdir(path.join(sourceDir, "DJI_001"));
diff --git a/apps/web/src/features/runs/run-detail-page.tsx b/apps/web/src/features/runs/run-detail-page.tsx
index bf05ea4..145f72a 100644
--- a/apps/web/src/features/runs/run-detail-page.tsx
+++ b/apps/web/src/features/runs/run-detail-page.tsx
@@ -12,7 +12,9 @@ export type RunDetailPageInput = {
id: string;
workflowName: string;
status: string;
+ inputBindings?: Array<{ kind: "asset" | "dataset"; id: string }>;
assetIds?: string[];
+ datasetIds?: string[];
durationMs?: number;
summaryLabel?: string;
canCancelRun?: boolean;
@@ -33,7 +35,9 @@ export function renderRunDetailPage(input: RunDetailPageInput): string {
${input.run.workflowName}
Run ${input.run.id}
Status: ${input.run.status}
+ Input sources: ${(input.run.inputBindings ?? []).map((binding) => `${binding.kind}:${binding.id}`).join(", ") || "none"}
Input assets: ${(input.run.assetIds ?? []).join(", ") || "none"}
+ Input datasets: ${(input.run.datasetIds ?? []).join(", ") || "none"}
Run duration: ${typeof input.run.durationMs === "number" ? `${input.run.durationMs} ms` : "n/a"}
${input.run.summaryLabel ? `Run summary: ${input.run.summaryLabel}
` : ""}
${input.run.canCancelRun ? `` : ""}
diff --git a/apps/web/src/features/runs/runs-page.tsx b/apps/web/src/features/runs/runs-page.tsx
index c50ef68..4c9f30f 100644
--- a/apps/web/src/features/runs/runs-page.tsx
+++ b/apps/web/src/features/runs/runs-page.tsx
@@ -7,7 +7,9 @@ export type RunsPageInput = {
id: string;
workflowName: string;
status: string;
+ inputBindings?: Array<{ kind: "asset" | "dataset"; id: string }>;
assetIds: string[];
+ datasetIds?: string[];
}>;
};
@@ -21,7 +23,9 @@ export function renderRunsPage(input: RunsPageInput): string {
${run.workflowName}
Status: ${run.status}
+ Input sources: ${(run.inputBindings ?? []).map((binding) => `${binding.kind}:${binding.id}`).join(", ") || "none"}
Input assets: ${run.assetIds.join(", ") || "none"}
+ Input datasets: ${(run.datasetIds ?? []).join(", ") || "none"}
`,
)
diff --git a/apps/web/src/features/workflows/workflow-editor-page.test.tsx b/apps/web/src/features/workflows/workflow-editor-page.test.tsx
index 5ef91b0..d6fa7aa 100644
--- a/apps/web/src/features/workflows/workflow-editor-page.test.tsx
+++ b/apps/web/src/features/workflows/workflow-editor-page.test.tsx
@@ -39,7 +39,9 @@ test("run detail view shows node status badges from run data", () => {
id: "run-1",
workflowName: "Delivery Normalize",
status: "running",
+ inputBindings: [{ kind: "dataset", id: "dataset-1" }],
assetIds: ["asset-1"],
+ datasetIds: ["dataset-1"],
durationMs: 2450,
summaryLabel: "2 tasks complete, 1 running, 1 stdout line",
canCancelRun: true,
@@ -85,7 +87,9 @@ test("run detail view shows node status badges from run data", () => {
assert.match(html, /Validate Structure/);
assert.match(html, /running/);
assert.match(html, /Checking metadata/);
+ assert.match(html, /Input sources: dataset:dataset-1/);
assert.match(html, /Input assets: asset-1/);
+ assert.match(html, /Input datasets: dataset-1/);
assert.match(html, /Run duration: 2450 ms/);
assert.match(html, /2 tasks complete, 1 running, 1 stdout line/);
assert.match(html, /Cancel Run/);
@@ -109,13 +113,17 @@ test("runs page renders project-scoped run history with workflow links", () => {
id: "run-1",
workflowName: "Delivery Normalize",
status: "success",
+ inputBindings: [{ kind: "asset", id: "asset-1" }],
assetIds: ["asset-1"],
+ datasetIds: [],
},
{
id: "run-2",
workflowName: "Archive Extract",
status: "running",
+ inputBindings: [{ kind: "dataset", id: "dataset-3" }],
assetIds: ["asset-2", "asset-3"],
+ datasetIds: ["dataset-3"],
},
],
});
@@ -123,6 +131,8 @@ test("runs page renders project-scoped run history with workflow links", () => {
assert.match(html, /Recent workflow executions/);
assert.match(html, /Delivery Normalize/);
assert.match(html, /Archive Extract/);
+ assert.match(html, /Input sources: dataset:dataset-3/);
assert.match(html, /Input assets: asset-2, asset-3/);
+ assert.match(html, /Input datasets: dataset-3/);
assert.match(html, /\/runs\/run-2/);
});
diff --git a/apps/web/src/runtime/api-client.ts b/apps/web/src/runtime/api-client.ts
index c2541e4..5206964 100644
--- a/apps/web/src/runtime/api-client.ts
+++ b/apps/web/src/runtime/api-client.ts
@@ -1,3 +1,5 @@
+import type { WorkflowInputBinding } from "../../../../packages/contracts/src/workflow-input.ts";
+
export type BootstrapContext = {
userId: string;
workspace: { _id: string; name: string };
@@ -283,7 +285,9 @@ export class ApiClient {
async createRun(input: {
workflowDefinitionId: string;
workflowVersionId: string;
- assetIds: string[];
+ inputBindings?: WorkflowInputBinding[];
+ assetIds?: string[];
+ datasetIds?: string[];
}) {
return readJson(
await fetch(`${this.baseUrl}/api/runs`, {
@@ -297,7 +301,9 @@ export class ApiClient {
async preflightRun(input: {
workflowDefinitionId: string;
workflowVersionId: string;
- assetIds: string[];
+ inputBindings?: WorkflowInputBinding[];
+ assetIds?: string[];
+ datasetIds?: string[];
}) {
return readJson<{
ok: boolean;
diff --git a/apps/web/src/runtime/app.tsx b/apps/web/src/runtime/app.tsx
index d4b8a7d..2fbd48e 100644
--- a/apps/web/src/runtime/app.tsx
+++ b/apps/web/src/runtime/app.tsx
@@ -19,6 +19,7 @@ import {
type CustomNodeValidationIssue,
validateCustomNodeDefinition,
} from "../../../../packages/contracts/src/custom-node.ts";
+import type { WorkflowInputBinding } from "../../../../packages/contracts/src/workflow-input.ts";
import {
formatCustomNodeInputModeKey,
formatCustomNodeOutputModeKey,
@@ -172,6 +173,8 @@ type WorkflowPreflightResult = {
};
};
+type WorkflowInputKind = WorkflowInputBinding["kind"];
+
function translateStatus(status: string | undefined, t: ReturnType["t"]) {
switch (status) {
case "success":
@@ -227,6 +230,14 @@ function formatExecutorConfigLabel(config?: Record) {
return JSON.stringify(config);
}
+function formatInputBindingLabel(
+ binding: WorkflowInputBinding,
+ t: ReturnType["t"],
+) {
+ const kindLabel = binding.kind === "dataset" ? t("datasetInputKind") : t("assetInputKind");
+ return `${kindLabel}: ${binding.id}`;
+}
+
function getDefaultExecutorType(definition?: { defaultExecutorType?: "python" | "docker" | "http" } | null) {
return definition?.defaultExecutorType ?? "python";
}
@@ -1284,7 +1295,9 @@ function WorkflowEditorPage(props: {
const [workflow, setWorkflow] = useState(null);
const [nodes, setNodes] = useState([]);
const [assets, setAssets] = useState([]);
- const [selectedAssetId, setSelectedAssetId] = useState(null);
+ const [datasets, setDatasets] = useState([]);
+ const [selectedInputKind, setSelectedInputKind] = useState("asset");
+ const [selectedInputId, setSelectedInputId] = useState(null);
const [versions, setVersions] = useState([]);
const [draft, setDraft] = useState(() => createDefaultWorkflowDraft());
const [selectedNodeId, setSelectedNodeId] = useState("rename-folder");
@@ -1304,19 +1317,36 @@ function WorkflowEditorPage(props: {
void (async () => {
try {
const workflowDefinition = await props.api.getWorkflowDefinition(props.workflowId);
- const [nodeDefs, workflowVersions, workflowAssets] = await Promise.all([
+ const [nodeDefs, workflowVersions, workflowAssets, workflowDatasets] = await Promise.all([
props.api.listNodeDefinitions(workflowDefinition.projectId),
props.api.listWorkflowVersions(props.workflowId),
props.api.listAssets(workflowDefinition.projectId),
+ props.api.listDatasets(workflowDefinition.projectId),
]);
setWorkflow(workflowDefinition);
setNodes(nodeDefs);
setAssets(workflowAssets);
- setSelectedAssetId((previous) => {
+ setDatasets(workflowDatasets);
+ setSelectedInputKind((previousKind) => {
+ if (previousKind === "dataset" && workflowDatasets.length > 0) {
+ return "dataset";
+ }
+ if (workflowAssets.length > 0) {
+ return "asset";
+ }
+ if (workflowDatasets.length > 0) {
+ return "dataset";
+ }
+ return previousKind;
+ });
+ setSelectedInputId((previous) => {
if (previous && workflowAssets.some((asset) => asset._id === previous)) {
return previous;
}
- return workflowAssets[0]?._id ?? null;
+ if (previous && workflowDatasets.some((dataset) => dataset._id === previous)) {
+ return previous;
+ }
+ return workflowAssets[0]?._id ?? workflowDatasets[0]?._id ?? null;
});
setVersions(workflowVersions);
const nextDraft = workflowDraftFromVersion(workflowVersions[0] ?? null);
@@ -1409,6 +1439,31 @@ function WorkflowEditorPage(props: {
artifactType: selectedNodeContract.artifactType as "json" | "directory" | "video",
});
}, [selectedNodeContract]);
+ const availableRunInputs = useMemo(
+ () =>
+ selectedInputKind === "dataset"
+ ? datasets.map((dataset) => ({
+ id: dataset._id as string,
+ label: `${dataset.name} (${t("datasetInputKind")})`,
+ }))
+ : assets.map((asset) => ({
+ id: asset._id as string,
+ label: `${asset.displayName} (${t("assetInputKind")})`,
+ })),
+ [assets, datasets, selectedInputKind, t],
+ );
+ const selectedInputBindings = useMemo(
+ () =>
+ selectedInputId
+ ? [
+ {
+ kind: selectedInputKind,
+ id: selectedInputId,
+ },
+ ]
+ : [],
+ [selectedInputId, selectedInputKind],
+ );
const canvasNodes = useMemo>(
() =>
draft.logicGraph.nodes.map((node) => {
@@ -1456,6 +1511,30 @@ function WorkflowEditorPage(props: {
[draft.logicGraph.edges, selectedNodeId],
);
+ useEffect(() => {
+ if (selectedInputKind === "asset" && assets.length === 0 && datasets.length > 0) {
+ setSelectedInputKind("dataset");
+ return;
+ }
+ if (selectedInputKind === "dataset" && datasets.length === 0 && assets.length > 0) {
+ setSelectedInputKind("asset");
+ }
+ }, [assets.length, datasets.length, selectedInputKind]);
+
+ useEffect(() => {
+ const stillExists = selectedInputKind === "dataset"
+ ? datasets.some((dataset) => dataset._id === selectedInputId)
+ : assets.some((asset) => asset._id === selectedInputId);
+ if (stillExists) {
+ return;
+ }
+ setSelectedInputId(
+ selectedInputKind === "dataset"
+ ? (datasets[0]?._id ?? assets[0]?._id ?? null)
+ : (assets[0]?._id ?? datasets[0]?._id ?? null),
+ );
+ }, [assets, datasets, selectedInputId, selectedInputKind]);
+
const onCanvasNodesChange = useCallback((changes: NodeChange[]) => {
const positionChanges = changes.filter(
(change): change is NodeChange & { id: string; position: { x: number; y: number } } =>
@@ -1567,9 +1646,9 @@ function WorkflowEditorPage(props: {
}
async function runWorkflowChecks(versionId: string) {
- if (!selectedAssetId) {
+ if (!selectedInputId) {
setPreflightResult(null);
- setError(t("selectAssetBeforeRun"));
+ setError(t("selectInputBeforeRun"));
return null;
}
@@ -1578,7 +1657,7 @@ function WorkflowEditorPage(props: {
const result = await props.api.preflightRun({
workflowDefinitionId: props.workflowId,
workflowVersionId: versionId,
- assetIds: [selectedAssetId],
+ inputBindings: selectedInputBindings,
});
setPreflightResult(result);
return result;
@@ -1597,7 +1676,7 @@ function WorkflowEditorPage(props: {
);
setVersions((previous) => [version, ...previous.filter((item) => item._id !== version._id)]);
setDirty(false);
- if (selectedAssetId) {
+ if (selectedInputId) {
await runWorkflowChecks(version._id);
} else {
setPreflightResult(null);
@@ -1606,12 +1685,12 @@ function WorkflowEditorPage(props: {
}
useEffect(() => {
- if (dirty || !selectedAssetId || versions.length === 0) {
+ if (dirty || !selectedInputId || versions.length === 0) {
return;
}
void runWorkflowChecks(versions[0]._id);
- }, [dirty, selectedAssetId, versions, props.workflowId]);
+ }, [dirty, selectedInputBindings, selectedInputId, versions, props.workflowId]);
return (
@@ -1619,15 +1698,29 @@ function WorkflowEditorPage(props: {
{workflow?.name ?? t("workflowEditor")}
+
{t("type")}: {task.nodeType}
{t("boundAssets")}: {(task.assetIds ?? []).join(", ") || t("none")}
+
{t("boundDatasets")}: {(task.datasetIds ?? []).join(", ") || t("none")}
))}
@@ -2260,6 +2377,7 @@ function RunDetailPage(props: {
: t("none")}
{t("inputAssets")}: {(selectedTask.assetIds ?? []).join(", ") || t("none")}
+ {t("inputDatasets")}: {(selectedTask.datasetIds ?? []).join(", ") || t("none")}
{t("startedAt")}: {selectedTask.startedAt ?? t("notAvailable")}
{t("finishedAt")}: {selectedTask.finishedAt ?? t("notAvailable")}
diff --git a/apps/web/src/runtime/i18n.test.ts b/apps/web/src/runtime/i18n.test.ts
index d0a6a58..91d75f6 100644
--- a/apps/web/src/runtime/i18n.test.ts
+++ b/apps/web/src/runtime/i18n.test.ts
@@ -12,6 +12,10 @@ test("translate returns chinese and english labels for shared frontend keys", ()
assert.equal(translate("zh", "navWorkflows"), "工作流");
assert.equal(translate("en", "navNodes"), "Nodes");
assert.equal(translate("zh", "navNodes"), "节点");
+ assert.equal(translate("en", "runInputType"), "Run Input Type");
+ assert.equal(translate("zh", "runInputType"), "运行输入类型");
+ assert.equal(translate("en", "datasetInputKind"), "Dataset");
+ assert.equal(translate("zh", "datasetInputKind"), "数据集");
assert.equal(
translate("en", "invalidConnectionCycle"),
"This edge would create a cycle.",
diff --git a/apps/web/src/runtime/i18n.tsx b/apps/web/src/runtime/i18n.tsx
index 390df74..dfb99e6 100644
--- a/apps/web/src/runtime/i18n.tsx
+++ b/apps/web/src/runtime/i18n.tsx
@@ -125,12 +125,15 @@ export type TranslationKey =
| "checkErrors"
| "checkWarnings"
| "noChecksRunYet"
- | "runAsset"
+ | "runInputType"
+ | "runInput"
+ | "assetInputKind"
+ | "datasetInputKind"
| "saveWorkflowVersion"
| "triggerWorkflowRun"
| "reloadLatestSaved"
| "openLatestRun"
- | "selectAssetBeforeRun"
+ | "selectInputBeforeRun"
| "nodeLibrary"
| "nodeLibraryHint"
| "canvas"
@@ -154,11 +157,14 @@ export type TranslationKey =
| "removeNode"
| "workflowCreatedName"
| "noAssetsAvailable"
+ | "noDatasetsAvailable"
| "runsTitle"
| "runsDescription"
| "noRunsYet"
| "createdAt"
+ | "inputSources"
| "inputAssets"
+ | "inputDatasets"
| "runDetail"
| "workflow"
| "startedAt"
@@ -169,6 +175,7 @@ export type TranslationKey =
| "retryRun"
| "runGraph"
| "boundAssets"
+ | "boundDatasets"
| "selectedTask"
| "executor"
| "executorConfig"
@@ -372,12 +379,15 @@ const TRANSLATIONS: Record> = {
checkErrors: "{count} errors",
checkWarnings: "{count} warnings",
noChecksRunYet: "Run checks to validate the saved workflow version before execution.",
- runAsset: "Run Asset",
+ runInputType: "Run Input Type",
+ runInput: "Run Input",
+ assetInputKind: "Asset",
+ datasetInputKind: "Dataset",
saveWorkflowVersion: "Save Workflow Version",
triggerWorkflowRun: "Trigger Workflow Run",
reloadLatestSaved: "Reload Latest Saved",
openLatestRun: "Open Latest Run",
- selectAssetBeforeRun: "Select an asset before triggering a workflow run.",
+ selectInputBeforeRun: "Select an asset or dataset before triggering a workflow run.",
nodeLibrary: "Node Library",
nodeLibraryHint: "Click to append or drag a node onto the canvas.",
canvas: "Canvas",
@@ -401,11 +411,14 @@ const TRANSLATIONS: Record> = {
removeNode: "Remove Node",
workflowCreatedName: "Delivery Normalize {count}",
noAssetsAvailable: "No assets available",
+ noDatasetsAvailable: "No datasets available",
runsTitle: "Runs",
runsDescription: "Recent workflow executions for the current project.",
noRunsYet: "No workflow runs yet.",
createdAt: "Created at",
+ inputSources: "Input sources",
inputAssets: "Input assets",
+ inputDatasets: "Input datasets",
runDetail: "Run Detail",
workflow: "Workflow",
startedAt: "Started at",
@@ -416,6 +429,7 @@ const TRANSLATIONS: Record> = {
retryRun: "Retry Run",
runGraph: "Run Graph",
boundAssets: "Bound assets",
+ boundDatasets: "Bound datasets",
selectedTask: "Selected Task",
executor: "Executor",
executorConfig: "Executor config",
@@ -612,12 +626,15 @@ const TRANSLATIONS: Record> = {
checkErrors: "{count} 个错误",
checkWarnings: "{count} 个警告",
noChecksRunYet: "先执行检查,再触发运行已保存的工作流版本。",
- runAsset: "运行资产",
+ runInputType: "运行输入类型",
+ runInput: "运行输入",
+ assetInputKind: "资产",
+ datasetInputKind: "数据集",
saveWorkflowVersion: "保存工作流版本",
triggerWorkflowRun: "触发工作流运行",
reloadLatestSaved: "重新加载最新保存版本",
openLatestRun: "打开最新运行",
- selectAssetBeforeRun: "触发工作流运行前请先选择资产。",
+ selectInputBeforeRun: "触发工作流运行前请先选择资产或数据集。",
nodeLibrary: "节点面板",
nodeLibraryHint: "支持点击追加,也支持将节点拖入画布指定位置。",
canvas: "画布",
@@ -641,11 +658,14 @@ const TRANSLATIONS: Record> = {
removeNode: "删除节点",
workflowCreatedName: "交付标准化 {count}",
noAssetsAvailable: "没有可用资产",
+ noDatasetsAvailable: "没有可用数据集",
runsTitle: "运行记录",
runsDescription: "当前项目最近的工作流执行记录。",
noRunsYet: "还没有工作流运行记录。",
createdAt: "创建时间",
+ inputSources: "输入源",
inputAssets: "输入资产",
+ inputDatasets: "输入数据集",
runDetail: "运行详情",
workflow: "工作流",
startedAt: "开始时间",
@@ -656,6 +676,7 @@ const TRANSLATIONS: Record> = {
retryRun: "重试运行",
runGraph: "运行图",
boundAssets: "绑定资产",
+ boundDatasets: "绑定数据集",
selectedTask: "当前任务",
executor: "执行器",
executorConfig: "执行器配置",
diff --git a/apps/worker/src/contracts/execution-context.ts b/apps/worker/src/contracts/execution-context.ts
index 498a8ff..68ced2d 100644
--- a/apps/worker/src/contracts/execution-context.ts
+++ b/apps/worker/src/contracts/execution-context.ts
@@ -1,5 +1,10 @@
export type ExecutorType = "python" | "docker" | "http";
export type ArtifactType = "json" | "directory" | "video";
// Discriminator for the two supported run input sources.
export type WorkflowInputBindingKind = "asset" | "dataset";
// One operator-selected run input: a registered asset or a project dataset,
// referenced by id. Mirrors the shape declared in
// packages/contracts/src/workflow-input.ts.
export type WorkflowInputBinding = {
  kind: WorkflowInputBindingKind;
  id: string;
};
export type TaskStatus = "pending" | "queued" | "running" | "success" | "failed" | "cancelled";
export type TaskStatusCounts = {
pending: number;
@@ -72,7 +77,9 @@ export type TaskRecord = {
artifactTitle?: string;
status: TaskStatus;
attempt?: number;
+ inputBindings?: WorkflowInputBinding[];
assetIds?: string[];
+ datasetIds?: string[];
upstreamNodeIds?: string[];
outputArtifactIds?: string[];
errorMessage?: string;
@@ -95,6 +102,16 @@ export type ExecutionAsset = {
summary?: Record;
};
// Dataset payload handed to node executors when a run input binding of kind
// "dataset" is resolved (see MongoWorkerStore.getDatasetsByIds).
// NOTE(review): `summary`'s annotation reads bare `Record` in this view — its
// generic arguments appear stripped from the patch; confirm upstream.
export type ExecutionDataset = {
  id: string;
  name: string;
  storagePath?: string;
  sourceAssetIds: string[];
  latestVersionId?: string;
  latestVersionNumber?: number;
  summary?: Record;
};
+
export type UpstreamExecutionResult = {
taskId: string;
nodeId: string;
@@ -108,8 +125,11 @@ export type ExecutionContext = {
workflowRunId?: string;
workflowVersionId?: string;
nodeId: string;
+ inputBindings?: WorkflowInputBinding[];
assetIds?: string[];
assets?: ExecutionAsset[];
+ datasetIds?: string[];
+ datasets?: ExecutionDataset[];
nodeDefinitionId?: string;
upstreamResults?: UpstreamExecutionResult[];
};
diff --git a/apps/worker/src/runtime/mongo-worker-store.ts b/apps/worker/src/runtime/mongo-worker-store.ts
index 404b3ad..d1fc062 100644
--- a/apps/worker/src/runtime/mongo-worker-store.ts
+++ b/apps/worker/src/runtime/mongo-worker-store.ts
@@ -5,6 +5,7 @@ import type { Db } from "mongodb";
import type {
CodeHookSpec,
ExecutionAsset,
+ ExecutionDataset,
ExecutorType,
NodeRuntimeConfig,
RunExecutionSummary,
@@ -12,6 +13,7 @@ import type {
TaskRecord,
TaskStatusCounts,
TaskStatus,
+ WorkflowInputBinding,
} from "../contracts/execution-context.ts";
type WorkflowRunDocument = {
@@ -20,7 +22,9 @@ type WorkflowRunDocument = {
workflowVersionId: string;
status: "queued" | "running" | "success" | "failed" | "cancelled";
triggeredBy: string;
+ inputBindings?: WorkflowInputBinding[];
assetIds: string[];
+ datasetIds?: string[];
runtimeSnapshot?: {
selectedPreset?: string;
nodeBindings?: Record;
@@ -59,7 +63,9 @@ type RunTaskDocument = {
artifactTitle?: string;
status: TaskStatus;
attempt: number;
+ inputBindings?: WorkflowInputBinding[];
assetIds: string[];
+ datasetIds?: string[];
upstreamNodeIds: string[];
outputArtifactIds: string[];
errorMessage?: string;
@@ -84,6 +90,16 @@ type AssetDocument = {
summary?: Record;
};
// Shape of documents in the Mongo "datasets" collection as read by the worker.
// Most fields are optional — presumably to tolerate documents written before
// dataset versioning landed; verify against the API-side schema.
// NOTE(review): `summary`'s `Record` generic arguments are stripped in this view.
type DatasetDocument = {
  _id: string;
  name: string;
  storagePath?: string;
  sourceAssetIds?: string[];
  latestVersionId?: string;
  latestVersionNumber?: number;
  summary?: Record;
};
+
// Current wall-clock time as an ISO-8601 string; Date#toISOString is always UTC.
function nowIso() {
  return new Date().toISOString();
}
@@ -103,7 +119,9 @@ function toTaskRecord(task: RunTaskDocument): TaskRecord {
artifactTitle: task.artifactTitle,
status: task.status,
attempt: task.attempt,
+ inputBindings: task.inputBindings ?? [],
assetIds: task.assetIds,
+ datasetIds: task.datasetIds ?? [],
upstreamNodeIds: task.upstreamNodeIds,
outputArtifactIds: task.outputArtifactIds,
errorMessage: task.errorMessage,
@@ -294,6 +312,35 @@ export class MongoWorkerStore {
.filter((asset): asset is ExecutionAsset => Boolean(asset));
}
  // Load ExecutionDataset payloads for the given dataset ids, preserving the
  // caller's id order and silently dropping ids with no matching document.
  // NOTE(review): the return annotation reads bare `Promise` in this view —
  // the generic argument (ExecutionDataset[]) appears stripped; confirm upstream.
  async getDatasetsByIds(datasetIds: string[]): Promise {
    if (datasetIds.length === 0) {
      return [];
    }

    // $in returns matches in arbitrary order, so collect them into a map first.
    const datasets = await this.db
      .collection("datasets")
      .find({ _id: { $in: datasetIds } })
      .toArray();
    const datasetMap = new Map(
      datasets.map((dataset) => [
        dataset._id,
        {
          id: dataset._id,
          name: dataset.name,
          storagePath: dataset.storagePath,
          // Older documents may omit these fields; default to safe empties.
          sourceAssetIds: dataset.sourceAssetIds ?? [],
          latestVersionId: dataset.latestVersionId,
          latestVersionNumber: dataset.latestVersionNumber,
          summary: dataset.summary ?? {},
        } satisfies ExecutionDataset,
      ]),
    );

    // Re-walk the input ids so output order matches the requested order.
    return datasetIds
      .map((datasetId) => datasetMap.get(datasetId))
      .filter((dataset): dataset is ExecutionDataset => Boolean(dataset));
  }
+
async createTaskArtifact(task: TaskRecord, payload: Record) {
const artifact = {
_id: `artifact-${randomUUID()}`,
diff --git a/apps/worker/src/runtime/worker-runtime.ts b/apps/worker/src/runtime/worker-runtime.ts
index 8d1a9b8..3b31eb6 100644
--- a/apps/worker/src/runtime/worker-runtime.ts
+++ b/apps/worker/src/runtime/worker-runtime.ts
@@ -47,8 +47,11 @@ export class WorkerRuntime {
workflowVersionId: task.workflowVersionId,
nodeId: task.nodeId,
nodeDefinitionId: task.nodeDefinitionId,
+ inputBindings: task.inputBindings,
assetIds: executionInput.assetIds,
assets: executionInput.assets,
+ datasetIds: executionInput.datasetIds,
+ datasets: executionInput.datasets,
upstreamResults: executionInput.upstreamResults,
};
@@ -61,7 +64,9 @@ export class WorkerRuntime {
nodeType: task.nodeType,
nodeDefinitionId: task.nodeDefinitionId,
executorType: task.executorType,
+ inputBindings: task.inputBindings,
assetIds: executionInput.assetIds,
+ datasetIds: executionInput.datasetIds,
result: execution.result,
});
const finishedAt = new Date().toISOString();
@@ -132,6 +137,8 @@ export class WorkerRuntime {
return {
assetIds,
assets,
+ datasetIds: task.datasetIds ?? [],
+ datasets: await this.store.getDatasetsByIds(task.datasetIds ?? []),
upstreamResults,
};
}
diff --git a/design/03-workflows/workflow-execution-model.md b/design/03-workflows/workflow-execution-model.md
index e00b3d1..9fa395c 100644
--- a/design/03-workflows/workflow-execution-model.md
+++ b/design/03-workflows/workflow-execution-model.md
@@ -235,14 +235,14 @@ Workflow execution must validate in this order:
Validation failure must block run creation.
-The current V1 API now exposes this as a real preflight step, not only as an editor convention. `POST /api/runs/preflight` evaluates the saved workflow version against the selected bound assets and frozen runtime snapshot. `POST /api/runs` reuses the same checks and rejects run creation when any blocking issue remains.
+The current V1 API now exposes this as a real preflight step, not only as an editor convention. `POST /api/runs/preflight` evaluates the saved workflow version against the selected workflow input bindings and frozen runtime snapshot. `POST /api/runs` reuses the same checks and rejects run creation when any blocking issue remains.
## Run Lifecycle
When a user executes a workflow:
1. resolve workflow version
-2. validate and snapshot all runtime-relevant inputs, including bound asset references
+2. validate and snapshot all runtime-relevant inputs, including bound asset and dataset references
3. resolve plugin versions
4. freeze node config and code hooks
5. compile graph into a DAG
@@ -255,7 +255,10 @@ When a user executes a workflow:
The current preflight checks include:
- workflow definition and version linkage
+- workflow input binding presence
- bound asset existence and project match
+- bound dataset existence and project match
+- resolution of dataset bindings into runnable asset ids
- resolved node definition existence
- source and export edge direction rules
- multi-input eligibility
@@ -367,7 +370,8 @@ The persisted local runtime now covers:
- asset registration and probe reporting
- workflow definition and immutable version snapshots
- workflow runs and task creation with worker-consumable dependency snapshots
-- workflow run asset bindings persisted on both runs and tasks
+- workflow run input bindings persisted on both runs and tasks
+- resolved asset ids and explicit dataset ids persisted separately on both runs and tasks
- project-scoped run history queries from Mongo-backed `workflow_runs`
- worker polling of queued tasks from Mongo-backed `run_tasks`
- run-task status transitions from `queued/pending` to `running/success/failed`
diff --git a/design/04-ui-ux/information-architecture-and-key-screens.md b/design/04-ui-ux/information-architecture-and-key-screens.md
index 8adf95f..3ef8203 100644
--- a/design/04-ui-ux/information-architecture-and-key-screens.md
+++ b/design/04-ui-ux/information-architecture-and-key-screens.md
@@ -159,12 +159,12 @@ Supports:
The current V1 implementation is simpler than the target canvas UX, but it already follows the same persistence model:
- load the latest saved workflow version when the editor opens
-- load project assets so the run entrypoint can bind a concrete input asset
+- load project assets and datasets so the run entrypoint can bind a concrete input source
- keep an unsaved draft in local editor state
- allow node add and remove operations on the draft
- save the current draft as a new workflow version
- auto-save a dirty draft before triggering a run
-- run a workflow-level preflight check against the latest saved version and selected bound asset before execution
+- run a workflow-level preflight check against the latest saved version and selected bound asset or dataset before execution
The current runtime implementation now also renders the center surface as a real node canvas instead of a static placeholder list:
@@ -178,6 +178,13 @@ The current runtime implementation now also renders the center surface as a real
- persisted node positions and viewport in `visualGraph`
- localized inline validation feedback when a connection is rejected
+The current V1 runtime header in the workflow editor now also treats run input selection as a first-class control:
+
+- choose input source type as `asset` or `dataset`
+- choose a concrete project asset or dataset inside that type
+- pass the selected source through preflight and run creation
+- show the resolved source again in run detail as `input sources`, `input assets`, and `input datasets`
+
The current V1 authoring rules intentionally keep the graph model constrained so the workflow stays legible and executable:
- source nodes do not accept inbound edges
@@ -253,7 +260,7 @@ Purpose:
Recommended layout:
- top: run summary and status
-- top: bound asset summary and links back to input assets
+- top: bound input source summary and links back to input assets or datasets
- center: workflow graph with execution overlays
- bottom or side drawer: logs and artifacts for selected node
diff --git a/design/05-data/mongodb-data-model.md b/design/05-data/mongodb-data-model.md
index 5989faf..6420b60 100644
--- a/design/05-data/mongodb-data-model.md
+++ b/design/05-data/mongodb-data-model.md
@@ -431,6 +431,19 @@ Core fields:
- `createdBy`
- `createdAt`
+### workflow_runs and run_tasks input binding note
+
+The current V1 runtime now stores workflow input selection in three layers:
+
+- `inputBindings`
+  The explicit operator-facing selection, for example `[{ kind: "dataset", id: "dataset-..." }]`.
+- `assetIds`
+  The resolved, runnable asset ids after dataset expansion and deduplication.
+- `datasetIds`
+  The explicit dataset ids that participated in the run or task.
+
+This keeps execution backward-compatible for asset-oriented nodes while preserving the higher-level project data model in run history and task detail.
+
### annotation_tasks
Purpose:
diff --git a/packages/contracts/src/workflow-input.ts b/packages/contracts/src/workflow-input.ts
new file mode 100644
index 0000000..35d496e
--- /dev/null
+++ b/packages/contracts/src/workflow-input.ts
@@ -0,0 +1,74 @@
// Canonical list of supported workflow input kinds; the derived union type
// stays in sync with the runtime constant automatically.
export const WORKFLOW_INPUT_BINDING_KINDS = ["asset", "dataset"] as const;
export type WorkflowInputBindingKind = (typeof WORKFLOW_INPUT_BINDING_KINDS)[number];

// A single operator-selected run input: either a registered asset or a
// project dataset, referenced by id.
export type WorkflowInputBinding = {
  kind: WorkflowInputBindingKind;
  id: string;
};
+
+export function normalizeWorkflowInputBindings(input: {
+ inputBindings?: WorkflowInputBinding[];
+ assetIds?: string[];
+ datasetIds?: string[];
+}): WorkflowInputBinding[] {
+ const explicitBindings = (input.inputBindings ?? []).filter(
+ (binding): binding is WorkflowInputBinding =>
+ (binding?.kind === "asset" || binding?.kind === "dataset") &&
+ typeof binding.id === "string" &&
+ binding.id.trim().length > 0,
+ );
+ if (explicitBindings.length > 0) {
+ return dedupeWorkflowInputBindings(explicitBindings);
+ }
+
+ return dedupeWorkflowInputBindings([
+ ...(input.assetIds ?? []).map((id) => ({ kind: "asset" as const, id })),
+ ...(input.datasetIds ?? []).map((id) => ({ kind: "dataset" as const, id })),
+ ]);
+}
+
+export function dedupeWorkflowInputBindings(
+ inputBindings: WorkflowInputBinding[],
+): WorkflowInputBinding[] {
+ const seen = new Set();
+ const result: WorkflowInputBinding[] = [];
+
+ for (const binding of inputBindings) {
+ const normalizedId = binding.id.trim();
+ if (!normalizedId) {
+ continue;
+ }
+ const key = `${binding.kind}:${normalizedId}`;
+ if (seen.has(key)) {
+ continue;
+ }
+ seen.add(key);
+ result.push({
+ kind: binding.kind,
+ id: normalizedId,
+ });
+ }
+
+ return result;
+}
+
+export function splitWorkflowInputBindings(inputBindings: WorkflowInputBinding[]): {
+ assetIds: string[];
+ datasetIds: string[];
+} {
+ const assetIds: string[] = [];
+ const datasetIds: string[] = [];
+
+ for (const binding of inputBindings) {
+ if (binding.kind === "asset") {
+ assetIds.push(binding.id);
+ continue;
+ }
+ datasetIds.push(binding.id);
+ }
+
+ return {
+ assetIds,
+ datasetIds,
+ };
+}