From c59fba1af1e217411e98790bd10266dabdd6f36b Mon Sep 17 00:00:00 2001 From: eust-w Date: Thu, 26 Mar 2026 18:41:26 +0800 Subject: [PATCH] :sparkles: feat: persist workflow drafts and runtime tests --- apps/api/package.json | 5 +- .../api/test/runtime-http.integration.spec.ts | 209 ++++ apps/web/src/runtime/app.tsx | 107 +- .../src/runtime/workflow-editor-state.test.ts | 72 ++ apps/web/src/runtime/workflow-editor-state.ts | 177 +++ design/02-architecture/system-architecture.md | 2 +- .../03-workflows/workflow-execution-model.md | 8 + ...nformation-architecture-and-key-screens.md | 11 +- ...26-03-26-emboflow-v1-foundation-and-mvp.md | 3 +- pnpm-lock.yaml | 1043 ++++++++++++++++- 10 files changed, 1595 insertions(+), 42 deletions(-) create mode 100644 apps/api/test/runtime-http.integration.spec.ts create mode 100644 apps/web/src/runtime/workflow-editor-state.test.ts create mode 100644 apps/web/src/runtime/workflow-editor-state.ts diff --git a/apps/api/package.json b/apps/api/package.json index 343279d..f01e494 100644 --- a/apps/api/package.json +++ b/apps/api/package.json @@ -6,11 +6,14 @@ "scripts": { "dev": "tsx watch src/main.ts", "start": "tsx src/main.ts", - "test": "node --test --experimental-strip-types" + "test": "tsx --test" }, "dependencies": { "cors": "^2.8.6", "express": "^5.2.1", "mongodb": "^7.1.1" + }, + "devDependencies": { + "mongodb-memory-server": "^11.0.1" } } diff --git a/apps/api/test/runtime-http.integration.spec.ts b/apps/api/test/runtime-http.integration.spec.ts new file mode 100644 index 0000000..1a5ba30 --- /dev/null +++ b/apps/api/test/runtime-http.integration.spec.ts @@ -0,0 +1,209 @@ +import test from "node:test"; +import assert from "node:assert/strict"; +import { mkdtemp, mkdir, writeFile } from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; + +import { MongoMemoryServer } from "mongodb-memory-server"; + +import { createApiRuntime, type ApiRuntimeConfig } from "../src/runtime/server.ts"; + +async function readJson(response: Response): Promise { + const payload = (await response.json()) as T; + if (!response.ok) { + throw new Error(`request failed with status ${response.status}: ${JSON.stringify(payload)}`); + } + return payload; +} + +async function startRuntimeServer(config: ApiRuntimeConfig) { + const runtime = await createApiRuntime(config); + const server = await new Promise((resolve) => { + const listening = runtime.app.listen(0, config.host, () => resolve(listening)); + }); + const address = server.address(); + if (!address || typeof address === "string") { + throw new Error("failed to start runtime server"); + } + + return { + baseUrl: `http://${config.host}:${address.port}`, + close: async () => { + await new Promise((resolve, reject) => { + server.close((error) => { + if (error) { + reject(error); + return; + } + resolve(); + }); + }); + await runtime.client.close(); + }, + }; +} + +test("mongo-backed runtime reuses bootstrapped workspace and project across restarts", async (t) => { + const mongod = await MongoMemoryServer.create(); + t.after(async () => { + await mongod.stop(); + }); + + const config: ApiRuntimeConfig = { + host: "127.0.0.1", + port: 0, + mongoUri: mongod.getUri(), + database: "emboflow-runtime-reuse", + corsOrigin: "http://127.0.0.1:3000", + }; + + const first = await startRuntimeServer(config); + + const bootstrap = await readJson<{ + workspace: { _id: string; name: string }; + project: { _id: string; name: string }; + }>( + await fetch(`${first.baseUrl}/api/dev/bootstrap`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ userId: "runtime-user", projectName: "Runtime Demo" }), + }), + ); + + await first.close(); + + const second = await startRuntimeServer(config); + t.after(async () => { + await second.close(); + }); + + const workspaces = await readJson>( + await fetch(`${second.baseUrl}/api/workspaces?ownerId=runtime-user`), + ); + const projects = await readJson>( + await fetch(`${second.baseUrl}/api/projects?workspaceId=${bootstrap.workspace._id}`), + ); + + assert.equal(workspaces.length, 1); + assert.equal(projects.length, 1); + assert.equal(workspaces[0]?._id, bootstrap.workspace._id); + assert.equal(projects[0]?._id, bootstrap.project._id); +}); + +test("mongo-backed runtime persists probed assets and workflow runs through the HTTP API", async (t) => { + const sourceDir = await mkdtemp(path.join(os.tmpdir(), "emboflow-runtime-")); + await mkdir(path.join(sourceDir, "DJI_001")); + await writeFile(path.join(sourceDir, "meta.json"), "{}"); + await writeFile(path.join(sourceDir, "intrinsics.json"), "{}"); + await writeFile(path.join(sourceDir, "video_meta.json"), "{}"); + await writeFile(path.join(sourceDir, "DJI_001", "DJI_001.mp4"), ""); + + const mongod = await MongoMemoryServer.create(); + t.after(async () => { + await mongod.stop(); + }); + + const server = await startRuntimeServer({ + host: "127.0.0.1", + port: 0, + mongoUri: mongod.getUri(), + database: "emboflow-runtime-flow", + corsOrigin: "http://127.0.0.1:3000", + }); + t.after(async () => { + await server.close(); + }); + + const bootstrap = await readJson<{ + workspace: { _id: string }; + project: { _id: string }; + }>( + await fetch(`${server.baseUrl}/api/dev/bootstrap`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ userId: "flow-user", projectName: "Flow Project" }), + }), + ); + + const asset = await readJson<{ _id: string }>( + await fetch(`${server.baseUrl}/api/assets/register`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ + workspaceId: bootstrap.workspace._id, + projectId: bootstrap.project._id, + sourcePath: sourceDir, + }), + }), + ); + + const probe = await readJson<{ detectedFormatCandidates: string[]; recommendedNextNodes: string[] }>( + await fetch(`${server.baseUrl}/api/assets/${asset._id}/probe`, { method: "POST" }), + ); + const assets = await readJson>( + await fetch( + `${server.baseUrl}/api/assets?projectId=${encodeURIComponent(bootstrap.project._id)}`, + ), + ); + + const workflow = await readJson<{ _id: string }>( + await fetch(`${server.baseUrl}/api/workflows`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ + workspaceId: bootstrap.workspace._id, + projectId: bootstrap.project._id, + name: "Delivery Flow", + }), + }), + ); + + const version = await readJson<{ _id: string; versionNumber: number }>( + await fetch(`${server.baseUrl}/api/workflows/${workflow._id}/versions`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ + visualGraph: { viewport: { x: 0, y: 0, zoom: 1 } }, + logicGraph: { + nodes: [ + { id: "source-asset", type: "source" }, + { id: "validate-structure", type: "inspect" }, + { id: "export-delivery-package", type: "export" }, + ], + edges: [ + { from: "source-asset", to: "validate-structure" }, + { from: "validate-structure", to: "export-delivery-package" }, + ], + }, + runtimeGraph: { selectedPreset: "delivery-normalization" }, + pluginRefs: ["builtin:delivery-nodes"], + }), + }), + ); + + const run = await readJson<{ _id: string; status: string }>( + await fetch(`${server.baseUrl}/api/runs`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ + workflowDefinitionId: workflow._id, + workflowVersionId: version._id, + }), + }), + ); + const tasks = await readJson>( + await fetch(`${server.baseUrl}/api/runs/${run._id}/tasks`), + ); + + assert.deepEqual(probe.detectedFormatCandidates.includes("delivery_package"), true); + assert.deepEqual(probe.recommendedNextNodes.includes("validate_structure"), true); + assert.equal(assets[0]?._id, asset._id); + assert.equal(assets[0]?.status, "probed"); + assert.deepEqual(assets[0]?.detectedFormats.includes("delivery_package"), true); + assert.equal(version.versionNumber, 1); + assert.equal(run.status, "queued"); + assert.equal(tasks.length, 3); + assert.equal(tasks[0]?.nodeId, "source-asset"); + assert.equal(tasks[0]?.status, "queued"); + assert.equal(tasks[1]?.status, "pending"); +}); diff --git a/apps/web/src/runtime/app.tsx b/apps/web/src/runtime/app.tsx index 2504db6..12c3d34 100644 --- a/apps/web/src/runtime/app.tsx +++ b/apps/web/src/runtime/app.tsx @@ -1,6 +1,15 @@ import React, { useEffect, useMemo, useState } from "react"; import { ApiClient } from "./api-client.ts"; +import { + addNodeToDraft, + createDefaultWorkflowDraft, + removeNodeFromDraft, + resolveDefinitionIdForNode, + serializeWorkflowDraft, + workflowDraftFromVersion, + type WorkflowDraft, +} from "./workflow-editor-state.ts"; type NavItem = "Assets" | "Workflows" | "Runs" | "Explore" | "Labels" | "Admin"; @@ -264,29 +273,6 @@ function AssetDetailPage(props: { ); } -const SAMPLE_WORKFLOW_VERSION = { - visualGraph: { - viewport: { x: 0, y: 0, zoom: 1 }, - }, - logicGraph: { - nodes: [ - { id: "source-asset", type: "source" }, - { id: "rename-folder", type: "transform" }, - { id: "validate-structure", type: "inspect" }, - { id: "export-delivery-package", type: "export" }, - ], - edges: [ - { from: "source-asset", to: "rename-folder" }, - { from: "rename-folder", to: "validate-structure" }, - { from: "validate-structure", to: "export-delivery-package" }, - ], - }, - runtimeGraph: { - selectedPreset: "delivery-normalization", - }, - pluginRefs: ["builtin:delivery-nodes"], -}; - function WorkflowsPage(props: { api: ApiClient; bootstrap: BootstrapContext; @@ -355,8 +341,10 @@ function WorkflowEditorPage(props: { }) { const [nodes, setNodes] = useState([]); const [versions, setVersions] = useState([]); + const [draft, setDraft] = useState(() => createDefaultWorkflowDraft()); const [selectedNodeId, setSelectedNodeId] = useState("rename-folder"); const [lastRunId, setLastRunId] = useState(null); + const [dirty, setDirty] = useState(false); const [error, setError] = useState(null); useEffect(() => { @@ -368,6 +356,10 @@ function WorkflowEditorPage(props: { ]); setNodes(nodeDefs); setVersions(workflowVersions); + const nextDraft = workflowDraftFromVersion(workflowVersions[0] ?? null); + setDraft(nextDraft); + setSelectedNodeId(nextDraft.logicGraph.nodes[0]?.id ?? "rename-folder"); + setDirty(false); } catch (loadError) { setError(loadError instanceof Error ? loadError.message : "Failed to load workflow"); } @@ -375,10 +367,21 @@ function WorkflowEditorPage(props: { }, [props.workflowId]); const selectedNode = useMemo( - () => nodes.find((node) => node.id === selectedNodeId) ?? null, - [nodes, selectedNodeId], + () => + nodes.find((node) => node.id === resolveDefinitionIdForNode(draft, selectedNodeId)) ?? null, + [draft, nodes, selectedNodeId], ); + async function saveCurrentDraft() { + const version = await props.api.saveWorkflowVersion( + props.workflowId, + serializeWorkflowDraft(draft), + ); + setVersions((previous) => [version, ...previous.filter((item) => item._id !== version._id)]); + setDirty(false); + return version; + } + return (
@@ -387,11 +390,7 @@ function WorkflowEditorPage(props: { + {lastRunId ? Open Latest Run : null}
{error ?

{error}

: null} @@ -423,7 +434,12 @@ function WorkflowEditorPage(props: { @@ -434,16 +450,37 @@ function WorkflowEditorPage(props: {

Canvas

- {SAMPLE_WORKFLOW_VERSION.logicGraph.nodes.map((node) => ( -
+ {draft.logicGraph.nodes.map((node) => ( +
setSelectedNodeId(node.id)} + style={{ cursor: "pointer" }} + > {node.id}

Type: {node.type}

+

Definition: {resolveDefinitionIdForNode(draft, node.id)}

+
))}

Latest saved versions: {versions.length > 0 ? versions.map((item) => item.versionNumber).join(", ") : "none"}

+

Draft status: {dirty ? "unsaved changes" : "synced"}