feat: bind workflow runs to project assets

This commit is contained in:
eust-w 2026-03-26 21:38:55 +08:00
parent a6ee6cfa38
commit 6a3ce185f1
15 changed files with 246 additions and 8 deletions

View File

@ -65,6 +65,7 @@ The local validation path currently used for embodied data testing is:
```
You can register that directory from the Assets page or via `POST /api/assets/register`.
The workflow editor currently requires selecting at least one registered asset before a run can be created.
## Repository Structure

View File

@ -94,6 +94,7 @@ type WorkflowRunDocument = Timestamped & {
workflowVersionId: string;
status: "queued";
triggeredBy: string;
assetIds: string[];
};
type RunTaskDocument = Timestamped & {
@ -105,6 +106,7 @@ type RunTaskDocument = Timestamped & {
executorType: ExecutorType;
status: "queued" | "pending";
attempt: number;
assetIds: string[];
upstreamNodeIds: string[];
outputArtifactIds: string[];
};
@ -437,11 +439,26 @@ export class MongoAppStore {
workflowDefinitionId: string;
workflowVersionId: string;
triggeredBy: string;
assetIds: string[];
}) {
const version = await this.getWorkflowVersion(input.workflowVersionId);
if (!version) {
throw new Error(`workflow version not found: ${input.workflowVersionId}`);
}
const assetIds = Array.from(new Set((input.assetIds ?? []).filter(Boolean)));
if (assetIds.length === 0) {
throw new Error("assetIds must include at least one asset");
}
const assets = await this.db
.collection<AssetDocument>("assets")
.find({ _id: { $in: assetIds } })
.toArray();
if (assets.length !== assetIds.length) {
throw new Error("one or more bound assets do not exist");
}
if (assets.some((asset) => asset.projectId !== version.projectId)) {
throw new Error("bound assets must belong to the workflow project");
}
const run: WorkflowRunDocument = {
_id: `run-${randomUUID()}`,
@ -449,6 +466,7 @@ export class MongoAppStore {
workflowVersionId: input.workflowVersionId,
status: "queued",
triggeredBy: input.triggeredBy,
assetIds,
createdAt: nowIso(),
updatedAt: nowIso(),
};
@ -464,6 +482,7 @@ export class MongoAppStore {
executorType: "python",
status: targetNodes.has(node.id) ? "pending" : "queued",
attempt: 1,
assetIds,
upstreamNodeIds: version.logicGraph.edges
.filter((edge) => edge.to === node.id)
.map((edge) => edge.from),

View File

@ -230,6 +230,7 @@ export async function createApiRuntime(config = resolveApiRuntimeConfig()) {
workflowDefinitionId: request.body.workflowDefinitionId,
workflowVersionId: request.body.workflowVersionId,
triggeredBy: request.body.triggeredBy ?? "local-user",
assetIds: request.body.assetIds ?? [],
}),
);
} catch (error) {

View File

@ -198,6 +198,7 @@ test("mongo-backed runtime persists probed assets and workflow runs through the
body: JSON.stringify({
workflowDefinitionId: workflow._id,
workflowVersionId: version._id,
assetIds: [asset._id],
}),
}),
);
@ -207,6 +208,7 @@ test("mongo-backed runtime persists probed assets and workflow runs through the
status: string;
executorType: string;
upstreamNodeIds: string[];
assetIds: string[];
}>
>(
await fetch(`${server.baseUrl}/api/runs/${run._id}/tasks`),
@ -219,11 +221,88 @@ test("mongo-backed runtime persists probed assets and workflow runs through the
assert.deepEqual(assets[0]?.detectedFormats.includes("delivery_package"), true);
assert.equal(version.versionNumber, 1);
assert.equal(run.status, "queued");
assert.deepEqual((run as { assetIds?: string[] }).assetIds, [asset._id]);
assert.equal(tasks.length, 3);
assert.equal(tasks[0]?.nodeId, "source-asset");
assert.equal(tasks[0]?.executorType, "python");
assert.deepEqual(tasks[0]?.assetIds, [asset._id]);
assert.deepEqual(tasks[0]?.upstreamNodeIds, []);
assert.equal(tasks[0]?.status, "queued");
assert.deepEqual(tasks[1]?.assetIds, [asset._id]);
assert.deepEqual(tasks[1]?.upstreamNodeIds, ["source-asset"]);
assert.equal(tasks[1]?.status, "pending");
});
test("mongo-backed runtime rejects workflow runs without bound assets", async (t) => {
const mongod = await MongoMemoryServer.create({
instance: {
ip: "127.0.0.1",
port: 27119,
},
});
t.after(async () => {
await mongod.stop();
});
const server = await startRuntimeServer({
host: "127.0.0.1",
port: 0,
mongoUri: mongod.getUri(),
database: "emboflow-runtime-run-inputs",
corsOrigin: "http://127.0.0.1:3000",
});
t.after(async () => {
await server.close();
});
const bootstrap = await readJson<{
workspace: { _id: string };
project: { _id: string };
}>(
await fetch(`${server.baseUrl}/api/dev/bootstrap`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({ userId: "run-input-user", projectName: "Run Inputs" }),
}),
);
const workflow = await readJson<{ _id: string }>(
await fetch(`${server.baseUrl}/api/workflows`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({
workspaceId: bootstrap.workspace._id,
projectId: bootstrap.project._id,
name: "Run Input Workflow",
}),
}),
);
const version = await readJson<{ _id: string }>(
await fetch(`${server.baseUrl}/api/workflows/${workflow._id}/versions`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({
visualGraph: { viewport: { x: 0, y: 0, zoom: 1 } },
logicGraph: {
nodes: [{ id: "source-asset", type: "source" }],
edges: [],
},
runtimeGraph: { selectedPreset: "delivery-normalization" },
pluginRefs: ["builtin:delivery-nodes"],
}),
}),
);
const response = await fetch(`${server.baseUrl}/api/runs`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({
workflowDefinitionId: workflow._id,
workflowVersionId: version._id,
}),
});
assert.equal(response.status, 400);
assert.match(await response.text(), /assetIds/i);
});

View File

@ -87,6 +87,12 @@ export class ApiClient {
);
}
async getWorkflowDefinition(workflowDefinitionId: string) {
return readJson<any>(
await fetch(`${this.baseUrl}/api/workflows/${workflowDefinitionId}`),
);
}
async saveWorkflowVersion(workflowDefinitionId: string, payload: Record<string, unknown>) {
return readJson<any>(
await fetch(`${this.baseUrl}/api/workflows/${workflowDefinitionId}/versions`, {
@ -104,6 +110,7 @@ export class ApiClient {
async createRun(input: {
workflowDefinitionId: string;
workflowVersionId: string;
assetIds: string[];
}) {
return readJson<any>(
await fetch(`${this.baseUrl}/api/runs`, {

View File

@ -339,7 +339,10 @@ function WorkflowEditorPage(props: {
api: ApiClient;
workflowId: string;
}) {
const [workflow, setWorkflow] = useState<any | null>(null);
const [nodes, setNodes] = useState<any[]>([]);
const [assets, setAssets] = useState<any[]>([]);
const [selectedAssetId, setSelectedAssetId] = useState<string | null>(null);
const [versions, setVersions] = useState<any[]>([]);
const [draft, setDraft] = useState<WorkflowDraft>(() => createDefaultWorkflowDraft());
const [selectedNodeId, setSelectedNodeId] = useState("rename-folder");
@ -350,11 +353,21 @@ function WorkflowEditorPage(props: {
useEffect(() => {
void (async () => {
try {
const [nodeDefs, workflowVersions] = await Promise.all([
const workflowDefinition = await props.api.getWorkflowDefinition(props.workflowId);
const [nodeDefs, workflowVersions, workflowAssets] = await Promise.all([
props.api.listNodeDefinitions(),
props.api.listWorkflowVersions(props.workflowId),
props.api.listAssets(workflowDefinition.projectId),
]);
setWorkflow(workflowDefinition);
setNodes(nodeDefs);
setAssets(workflowAssets);
setSelectedAssetId((previous) => {
if (previous && workflowAssets.some((asset) => asset._id === previous)) {
return previous;
}
return workflowAssets[0]?._id ?? null;
});
setVersions(workflowVersions);
const nextDraft = workflowDraftFromVersion(workflowVersions[0] ?? null);
setDraft(nextDraft);
@ -386,7 +399,21 @@ function WorkflowEditorPage(props: {
<div className="page-stack">
<section className="panel">
<div className="toolbar">
<h1 style={{ margin: 0 }}>Workflow Editor</h1>
<h1 style={{ margin: 0 }}>{workflow?.name ?? "Workflow Editor"}</h1>
<label style={{ display: "flex", alignItems: "center", gap: 8 }}>
<span>Run Asset</span>
<select
value={selectedAssetId ?? ""}
onChange={(event) => setSelectedAssetId(event.target.value || null)}
>
{assets.length === 0 ? <option value="">No assets available</option> : null}
{assets.map((asset) => (
<option key={asset._id} value={asset._id}>
{asset.displayName}
</option>
))}
</select>
</label>
<button
className="button-primary"
onClick={async () => {
@ -398,15 +425,21 @@ function WorkflowEditorPage(props: {
<button
className="button-secondary"
onClick={async () => {
if (!selectedAssetId) {
setError("Select an asset before triggering a workflow run.");
return;
}
const latestVersion = dirty || versions.length === 0
? await saveCurrentDraft()
: versions[0];
const run = await props.api.createRun({
workflowDefinitionId: props.workflowId,
workflowVersionId: latestVersion._id,
assetIds: [selectedAssetId],
});
setLastRunId(run._id);
}}
disabled={!selectedAssetId}
>
Trigger Workflow Run
</button>
@ -545,6 +578,12 @@ function RunDetailPage(props: {
<h1>Run Detail</h1>
<p>Run ID: {run._id}</p>
<p>Status: {run.status}</p>
<p>
Input assets:{" "}
{(run.assetIds ?? []).length > 0
? run.assetIds.map((assetId: string) => assetId).join(", ")
: "none"}
</p>
</section>
<section className="two-column">
<div className="panel">
@ -559,6 +598,7 @@ function RunDetailPage(props: {
</span>
</div>
<p>Node type: {task.nodeType}</p>
<p>Bound assets: {(task.assetIds ?? []).join(", ") || "none"}</p>
</article>
))}
</div>

View File

@ -10,6 +10,7 @@ export type TaskRecord = {
executorType: ExecutorType;
status: TaskStatus;
attempt?: number;
assetIds?: string[];
upstreamNodeIds?: string[];
outputArtifactIds?: string[];
errorMessage?: string;
@ -20,4 +21,5 @@ export type ExecutionContext = {
workflowRunId?: string;
workflowVersionId?: string;
nodeId: string;
assetIds?: string[];
};

View File

@ -10,6 +10,7 @@ type WorkflowRunDocument = {
workflowVersionId: string;
status: "queued" | "running" | "success" | "failed";
triggeredBy: string;
assetIds: string[];
createdAt: string;
updatedAt: string;
};
@ -33,6 +34,7 @@ type RunTaskDocument = {
executorType: ExecutorType;
status: TaskStatus;
attempt: number;
assetIds: string[];
upstreamNodeIds: string[];
outputArtifactIds: string[];
errorMessage?: string;
@ -54,6 +56,7 @@ function toTaskRecord(task: RunTaskDocument): TaskRecord {
executorType: task.executorType,
status: task.status,
attempt: task.attempt,
assetIds: task.assetIds,
upstreamNodeIds: task.upstreamNodeIds,
outputArtifactIds: task.outputArtifactIds,
errorMessage: task.errorMessage,

View File

@ -37,6 +37,7 @@ export class WorkerRuntime {
workflowRunId: task.workflowRunId,
workflowVersionId: task.workflowVersionId,
nodeId: task.nodeId,
assetIds: task.assetIds,
};
try {
@ -45,6 +46,7 @@ export class WorkerRuntime {
nodeId: task.nodeId,
nodeType: task.nodeType,
executorType: task.executorType,
assetIds: task.assetIds,
result,
});
await this.store.markTaskSuccess(task.id);

View File

@ -6,8 +6,14 @@ import { MongoClient } from "mongodb";
import { MongoWorkerStore } from "../src/runtime/mongo-worker-store.ts";
import { WorkerRuntime } from "../src/runtime/worker-runtime.ts";
import type { ExecutionContext, TaskRecord } from "../src/contracts/execution-context.ts";
async function createRuntimeFixture(database: string) {
async function createRuntimeFixture(
database: string,
options: {
executors?: ConstructorParameters<typeof WorkerRuntime>[0]["executors"];
} = {},
) {
const mongod = await MongoMemoryServer.create({
instance: {
ip: "127.0.0.1",
@ -17,7 +23,7 @@ async function createRuntimeFixture(database: string) {
await client.connect();
const db = client.db(database);
const store = new MongoWorkerStore(db);
const runtime = new WorkerRuntime({ store });
const runtime = new WorkerRuntime({ store, executors: options.executors });
return {
db,
@ -82,6 +88,7 @@ test("worker claims a queued task, creates an artifact, and queues the dependent
executorType: "python",
status: "queued",
attempt: 1,
assetIds: ["asset-1"],
upstreamNodeIds: [],
outputArtifactIds: [],
createdAt: new Date().toISOString(),
@ -96,6 +103,7 @@ test("worker claims a queued task, creates an artifact, and queues the dependent
executorType: "python",
status: "pending",
attempt: 1,
assetIds: ["asset-1"],
upstreamNodeIds: ["source-asset"],
outputArtifactIds: [],
createdAt: new Date().toISOString(),
@ -110,7 +118,9 @@ test("worker claims a queued task, creates an artifact, and queues the dependent
assert.equal(task?.id, "task-source");
assert.equal(tasks[0]?.status, "success");
assert.deepEqual(tasks[0]?.assetIds, ["asset-1"]);
assert.equal(tasks[1]?.status, "queued");
assert.deepEqual(tasks[1]?.assetIds, ["asset-1"]);
assert.equal(artifacts.length, 1);
assert.equal(run?.status, "running");
});
@ -162,6 +172,7 @@ test("worker marks the run successful after the final queued task completes", as
executorType: "python",
status: "queued",
attempt: 1,
assetIds: ["asset-final"],
upstreamNodeIds: [],
outputArtifactIds: [],
createdAt: new Date().toISOString(),
@ -176,3 +187,65 @@ test("worker marks the run successful after the final queued task completes", as
assert.equal(task?.status, "success");
assert.equal(run?.status, "success");
});
test("worker passes bound asset ids into the execution context and task artifacts", async (t) => {
let capturedTask: TaskRecord | null = null;
let capturedContext: ExecutionContext | null = null;
const fixture = await createRuntimeFixture("emboflow-worker-asset-context", {
executors: {
python: {
async execute(task: TaskRecord, context: ExecutionContext) {
capturedTask = task;
capturedContext = context;
return {
taskId: task.id,
assetIds: context.assetIds,
};
},
},
},
});
t.after(async () => {
await fixture.close();
});
await fixture.db.collection("workflow_runs").insertOne({
_id: "run-asset",
workflowDefinitionId: "workflow-asset",
workflowVersionId: "workflow-asset-v1",
status: "queued",
triggeredBy: "local-user",
assetIds: ["asset-42"],
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
});
await fixture.db.collection("run_tasks").insertOne({
_id: "task-asset",
workflowRunId: "run-asset",
workflowVersionId: "workflow-asset-v1",
nodeId: "source-asset",
nodeType: "source",
executorType: "python",
status: "queued",
attempt: 1,
assetIds: ["asset-42"],
upstreamNodeIds: [],
outputArtifactIds: [],
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
});
await fixture.runtime.runNextTask();
const storedTask = await fixture.store.getRunTask("task-asset");
const artifact = await fixture.db.collection("artifacts").findOne({ producerId: "task-asset" });
assert.deepEqual(capturedTask?.assetIds, ["asset-42"]);
assert.deepEqual(capturedContext?.assetIds, ["asset-42"]);
assert.deepEqual(storedTask?.assetIds, ["asset-42"]);
assert.deepEqual(
(artifact?.payload as { assetIds?: string[] } | undefined)?.assetIds,
["asset-42"],
);
});

View File

@ -88,6 +88,7 @@ Workflow definitions are versioned and contain:
Workflow execution produces immutable workflow runs. A run snapshots:
- Workflow version
- Bound asset references
- Node configuration
- Injected code
- Executor settings

View File

@ -59,7 +59,7 @@ Used for execution:
Visual changes must not change workflow semantics. Runtime changes must produce a new workflow version.
The current V1 editor implementation keeps a mutable local draft that is initialized from the latest saved workflow version. Saving the draft creates a new immutable workflow version. Triggering a run from a dirty draft first saves a fresh workflow version, then creates the run from that saved snapshot.
The current V1 editor implementation keeps a mutable local draft that is initialized from the latest saved workflow version. Saving the draft creates a new immutable workflow version. Triggering a run from a dirty draft first saves a fresh workflow version, then creates the run from that saved snapshot. The V1 editor also requires binding at least one project asset before run creation, and the selected asset ids are persisted with the run snapshot.
## Node Categories
@ -158,7 +158,7 @@ Validation failure must block run creation.
When a user executes a workflow:
1. resolve workflow version
2. snapshot all runtime-relevant inputs
2. validate and snapshot all runtime-relevant inputs, including bound asset references
3. resolve plugin versions
4. freeze node config and code hooks
5. compile graph into a DAG
@ -271,13 +271,14 @@ The persisted local runtime now covers:
- asset registration and probe reporting
- workflow definition and immutable version snapshots
- workflow runs and task creation with worker-consumable dependency snapshots
- workflow run asset bindings persisted on both runs and tasks
- worker polling of queued tasks from Mongo-backed `run_tasks`
- run-task status transitions from `queued/pending` to `running/success/failed`
- downstream task promotion when upstream nodes succeed
- artifact registration and producer lookup
- task-level artifact creation by the worker runtime
The React workflow editor now loads the latest persisted version from the Mongo-backed API instead of rendering only a fixed starter graph. Draft edits are local editor state until the user saves, at which point the draft is serialized into a new workflow version document.
The React workflow editor now loads the latest persisted version from the Mongo-backed API instead of rendering only a fixed starter graph. Draft edits are local editor state until the user saves, at which point the draft is serialized into a new workflow version document. Before a run is created, the editor loads project assets, requires one to be selected, and passes that binding to the API.
The API and worker runtimes now both have direct integration coverage against a real Mongo runtime through `mongodb-memory-server`, in addition to the older in-memory contract tests.

View File

@ -142,6 +142,7 @@ Supports:
The current V1 implementation is simpler than the target canvas UX, but it already follows the same persistence model:
- load the latest saved workflow version when the editor opens
- load project assets so the run entrypoint can bind a concrete input asset
- keep an unsaved draft in local editor state
- allow node add and remove operations on the draft
- save the current draft as a new workflow version
@ -177,6 +178,7 @@ Purpose:
Recommended layout:
- top: run summary and status
- top: bound asset summary and links back to input assets
- center: workflow graph with execution overlays
- bottom or side drawer: logs and artifacts for selected node
@ -278,7 +280,7 @@ The current local runtime now exposes these surfaces as a real React application
The current implementation uses direct API-driven page loads and lightweight route handling instead of a deeper client-side state framework.
The workflow editor surface now reflects persisted workflow versions instead of a hardcoded sample graph. It exposes draft status, node add and remove actions, reload-latest behavior, and version-save / run-trigger controls against the live API.
The workflow editor surface now reflects persisted workflow versions instead of a hardcoded sample graph. It exposes draft status, node add and remove actions, reload-latest behavior, asset selection for run binding, and version-save / run-trigger controls against the live API.
Do not rename the same concept differently across pages.
@ -286,6 +288,7 @@ Do not rename the same concept differently across pages.
Before users run a workflow, the editor should visibly show:
- which project asset is currently selected as the run input
- missing config
- invalid schema connections
- unsupported executor choices

View File

@ -282,12 +282,14 @@ Splitting versions from workflow head metadata avoids oversized documents and si
Purpose:
- store execution runs
- snapshot the asset bindings chosen at run creation time
Core fields:
- `_id`
- `workflowDefinitionId`
- `workflowVersionId`
- `assetIds`
- `workspaceId`
- `projectId`
- `triggeredBy`
@ -303,6 +305,7 @@ Core fields:
Purpose:
- store one execution unit per node per run
- keep bound asset context available to the worker at dequeue time
Core fields:
@ -314,6 +317,7 @@ Core fields:
- `executorType`
- `status`
- `attempt`
- `assetIds`
- `upstreamNodeIds`
- `outputArtifactIds`
- `logRef`
@ -329,6 +333,7 @@ This collection should remain separate from `workflow_runs` because task volume
The current executable worker path expects `run_tasks` to be self-sufficient enough for dequeue and dependency promotion. That means V1 runtime tasks already persist:
- executor choice
- bound asset ids
- upstream node dependencies
- produced artifact ids
- per-task status and error message

View File

@ -18,6 +18,7 @@
- `2026-03-26`: Tasks 7 through 10 add the first web shell, workflow editor surfaces, artifact explore renderers, developer entry commands, and CI/pre-push test execution through `make test`.
- `2026-03-26`: The next runtime pass adds a Mongo-backed HTTP API, a real React and Vite web runtime, and local data validation against `/Users/longtaowu/workspace/emboldata/data`.
- `2026-03-26`: The follow-up runtime pass adds Mongo-backed HTTP integration tests and converts the workflow editor from a fixed sample graph to a persisted draft-and-version model.
- `2026-03-26`: The current runtime pass binds workflow runs to registered project assets so run snapshots, run tasks, worker execution context, and the editor all agree on the concrete input asset being processed.
---