424 lines
13 KiB
TypeScript
424 lines
13 KiB
TypeScript
import test from "node:test";
|
|
import assert from "node:assert/strict";
|
|
import { mkdtemp, mkdir, writeFile } from "node:fs/promises";
|
|
import os from "node:os";
|
|
import path from "node:path";
|
|
|
|
import { MongoMemoryServer } from "mongodb-memory-server";
|
|
|
|
import { createApiRuntime, type ApiRuntimeConfig } from "../src/runtime/server.ts";
|
|
|
|
async function readJson<T>(response: Response): Promise<T> {
|
|
const payload = (await response.json()) as T;
|
|
if (!response.ok) {
|
|
throw new Error(`request failed with status ${response.status}: ${JSON.stringify(payload)}`);
|
|
}
|
|
return payload;
|
|
}
|
|
|
|
async function startRuntimeServer(config: ApiRuntimeConfig) {
|
|
const runtime = await createApiRuntime(config);
|
|
const server = await new Promise<import("node:http").Server>((resolve) => {
|
|
const listening = runtime.app.listen(0, config.host, () => resolve(listening));
|
|
});
|
|
const address = server.address();
|
|
if (!address || typeof address === "string") {
|
|
throw new Error("failed to start runtime server");
|
|
}
|
|
|
|
return {
|
|
baseUrl: `http://${config.host}:${address.port}`,
|
|
close: async () => {
|
|
await new Promise<void>((resolve, reject) => {
|
|
server.close((error) => {
|
|
if (error) {
|
|
reject(error);
|
|
return;
|
|
}
|
|
resolve();
|
|
});
|
|
});
|
|
await runtime.client.close();
|
|
},
|
|
};
|
|
}
|
|
|
|
test("mongo-backed runtime reuses bootstrapped workspace and project across restarts", async (t) => {
|
|
const mongod = await MongoMemoryServer.create({
|
|
instance: {
|
|
ip: "127.0.0.1",
|
|
port: 27117,
|
|
},
|
|
});
|
|
t.after(async () => {
|
|
await mongod.stop();
|
|
});
|
|
|
|
const config: ApiRuntimeConfig = {
|
|
host: "127.0.0.1",
|
|
port: 0,
|
|
mongoUri: mongod.getUri(),
|
|
database: "emboflow-runtime-reuse",
|
|
corsOrigin: "http://127.0.0.1:3000",
|
|
};
|
|
|
|
const first = await startRuntimeServer(config);
|
|
|
|
const bootstrap = await readJson<{
|
|
workspace: { _id: string; name: string };
|
|
project: { _id: string; name: string };
|
|
}>(
|
|
await fetch(`${first.baseUrl}/api/dev/bootstrap`, {
|
|
method: "POST",
|
|
headers: { "content-type": "application/json" },
|
|
body: JSON.stringify({ userId: "runtime-user", projectName: "Runtime Demo" }),
|
|
}),
|
|
);
|
|
|
|
await first.close();
|
|
|
|
const second = await startRuntimeServer(config);
|
|
t.after(async () => {
|
|
await second.close();
|
|
});
|
|
|
|
const workspaces = await readJson<Array<{ _id: string }>>(
|
|
await fetch(`${second.baseUrl}/api/workspaces?ownerId=runtime-user`),
|
|
);
|
|
const projects = await readJson<Array<{ _id: string }>>(
|
|
await fetch(`${second.baseUrl}/api/projects?workspaceId=${bootstrap.workspace._id}`),
|
|
);
|
|
|
|
assert.equal(workspaces.length, 1);
|
|
assert.equal(projects.length, 1);
|
|
assert.equal(workspaces[0]?._id, bootstrap.workspace._id);
|
|
assert.equal(projects[0]?._id, bootstrap.project._id);
|
|
});
|
|
|
|
test("mongo-backed runtime persists probed assets and workflow runs through the HTTP API", async (t) => {
|
|
const sourceDir = await mkdtemp(path.join(os.tmpdir(), "emboflow-runtime-"));
|
|
await mkdir(path.join(sourceDir, "DJI_001"));
|
|
await writeFile(path.join(sourceDir, "meta.json"), "{}");
|
|
await writeFile(path.join(sourceDir, "intrinsics.json"), "{}");
|
|
await writeFile(path.join(sourceDir, "video_meta.json"), "{}");
|
|
await writeFile(path.join(sourceDir, "DJI_001", "DJI_001.mp4"), "");
|
|
|
|
const mongod = await MongoMemoryServer.create({
|
|
instance: {
|
|
ip: "127.0.0.1",
|
|
port: 27118,
|
|
},
|
|
});
|
|
t.after(async () => {
|
|
await mongod.stop();
|
|
});
|
|
|
|
const server = await startRuntimeServer({
|
|
host: "127.0.0.1",
|
|
port: 0,
|
|
mongoUri: mongod.getUri(),
|
|
database: "emboflow-runtime-flow",
|
|
corsOrigin: "http://127.0.0.1:3000",
|
|
});
|
|
t.after(async () => {
|
|
await server.close();
|
|
});
|
|
|
|
const bootstrap = await readJson<{
|
|
workspace: { _id: string };
|
|
project: { _id: string };
|
|
}>(
|
|
await fetch(`${server.baseUrl}/api/dev/bootstrap`, {
|
|
method: "POST",
|
|
headers: { "content-type": "application/json" },
|
|
body: JSON.stringify({ userId: "flow-user", projectName: "Flow Project" }),
|
|
}),
|
|
);
|
|
|
|
const asset = await readJson<{ _id: string }>(
|
|
await fetch(`${server.baseUrl}/api/assets/register`, {
|
|
method: "POST",
|
|
headers: { "content-type": "application/json" },
|
|
body: JSON.stringify({
|
|
workspaceId: bootstrap.workspace._id,
|
|
projectId: bootstrap.project._id,
|
|
sourcePath: sourceDir,
|
|
}),
|
|
}),
|
|
);
|
|
|
|
const probe = await readJson<{ detectedFormatCandidates: string[]; recommendedNextNodes: string[] }>(
|
|
await fetch(`${server.baseUrl}/api/assets/${asset._id}/probe`, { method: "POST" }),
|
|
);
|
|
const assets = await readJson<Array<{ _id: string; status: string; detectedFormats: string[] }>>(
|
|
await fetch(
|
|
`${server.baseUrl}/api/assets?projectId=${encodeURIComponent(bootstrap.project._id)}`,
|
|
),
|
|
);
|
|
|
|
const workflow = await readJson<{ _id: string }>(
|
|
await fetch(`${server.baseUrl}/api/workflows`, {
|
|
method: "POST",
|
|
headers: { "content-type": "application/json" },
|
|
body: JSON.stringify({
|
|
workspaceId: bootstrap.workspace._id,
|
|
projectId: bootstrap.project._id,
|
|
name: "Delivery Flow",
|
|
}),
|
|
}),
|
|
);
|
|
|
|
const version = await readJson<{ _id: string; versionNumber: number }>(
|
|
await fetch(`${server.baseUrl}/api/workflows/${workflow._id}/versions`, {
|
|
method: "POST",
|
|
headers: { "content-type": "application/json" },
|
|
body: JSON.stringify({
|
|
visualGraph: { viewport: { x: 0, y: 0, zoom: 1 } },
|
|
logicGraph: {
|
|
nodes: [
|
|
{ id: "source-asset", type: "source" },
|
|
{ id: "validate-structure", type: "inspect" },
|
|
{ id: "export-delivery-package", type: "export" },
|
|
],
|
|
edges: [
|
|
{ from: "source-asset", to: "validate-structure" },
|
|
{ from: "validate-structure", to: "export-delivery-package" },
|
|
],
|
|
},
|
|
runtimeGraph: { selectedPreset: "delivery-normalization" },
|
|
pluginRefs: ["builtin:delivery-nodes"],
|
|
}),
|
|
}),
|
|
);
|
|
|
|
const run = await readJson<{ _id: string; status: string }>(
|
|
await fetch(`${server.baseUrl}/api/runs`, {
|
|
method: "POST",
|
|
headers: { "content-type": "application/json" },
|
|
body: JSON.stringify({
|
|
workflowDefinitionId: workflow._id,
|
|
workflowVersionId: version._id,
|
|
assetIds: [asset._id],
|
|
}),
|
|
}),
|
|
);
|
|
const tasks = await readJson<
|
|
Array<{
|
|
nodeId: string;
|
|
status: string;
|
|
executorType: string;
|
|
upstreamNodeIds: string[];
|
|
assetIds: string[];
|
|
}>
|
|
>(
|
|
await fetch(`${server.baseUrl}/api/runs/${run._id}/tasks`),
|
|
);
|
|
|
|
assert.deepEqual(probe.detectedFormatCandidates.includes("delivery_package"), true);
|
|
assert.deepEqual(probe.recommendedNextNodes.includes("validate_structure"), true);
|
|
assert.equal(assets[0]?._id, asset._id);
|
|
assert.equal(assets[0]?.status, "probed");
|
|
assert.deepEqual(assets[0]?.detectedFormats.includes("delivery_package"), true);
|
|
assert.equal(version.versionNumber, 1);
|
|
assert.equal(run.status, "queued");
|
|
assert.deepEqual((run as { assetIds?: string[] }).assetIds, [asset._id]);
|
|
assert.equal(tasks.length, 3);
|
|
assert.equal(tasks[0]?.nodeId, "source-asset");
|
|
assert.equal(tasks[0]?.executorType, "python");
|
|
assert.deepEqual(tasks[0]?.assetIds, [asset._id]);
|
|
assert.deepEqual(tasks[0]?.upstreamNodeIds, []);
|
|
assert.equal(tasks[0]?.status, "queued");
|
|
assert.deepEqual(tasks[1]?.assetIds, [asset._id]);
|
|
assert.deepEqual(tasks[1]?.upstreamNodeIds, ["source-asset"]);
|
|
assert.equal(tasks[1]?.status, "pending");
|
|
});
|
|
|
|
test("mongo-backed runtime rejects workflow runs without bound assets", async (t) => {
|
|
const mongod = await MongoMemoryServer.create({
|
|
instance: {
|
|
ip: "127.0.0.1",
|
|
port: 27119,
|
|
},
|
|
});
|
|
t.after(async () => {
|
|
await mongod.stop();
|
|
});
|
|
|
|
const server = await startRuntimeServer({
|
|
host: "127.0.0.1",
|
|
port: 0,
|
|
mongoUri: mongod.getUri(),
|
|
database: "emboflow-runtime-run-inputs",
|
|
corsOrigin: "http://127.0.0.1:3000",
|
|
});
|
|
t.after(async () => {
|
|
await server.close();
|
|
});
|
|
|
|
const bootstrap = await readJson<{
|
|
workspace: { _id: string };
|
|
project: { _id: string };
|
|
}>(
|
|
await fetch(`${server.baseUrl}/api/dev/bootstrap`, {
|
|
method: "POST",
|
|
headers: { "content-type": "application/json" },
|
|
body: JSON.stringify({ userId: "run-input-user", projectName: "Run Inputs" }),
|
|
}),
|
|
);
|
|
|
|
const workflow = await readJson<{ _id: string }>(
|
|
await fetch(`${server.baseUrl}/api/workflows`, {
|
|
method: "POST",
|
|
headers: { "content-type": "application/json" },
|
|
body: JSON.stringify({
|
|
workspaceId: bootstrap.workspace._id,
|
|
projectId: bootstrap.project._id,
|
|
name: "Run Input Workflow",
|
|
}),
|
|
}),
|
|
);
|
|
|
|
const version = await readJson<{ _id: string }>(
|
|
await fetch(`${server.baseUrl}/api/workflows/${workflow._id}/versions`, {
|
|
method: "POST",
|
|
headers: { "content-type": "application/json" },
|
|
body: JSON.stringify({
|
|
visualGraph: { viewport: { x: 0, y: 0, zoom: 1 } },
|
|
logicGraph: {
|
|
nodes: [{ id: "source-asset", type: "source" }],
|
|
edges: [],
|
|
},
|
|
runtimeGraph: { selectedPreset: "delivery-normalization" },
|
|
pluginRefs: ["builtin:delivery-nodes"],
|
|
}),
|
|
}),
|
|
);
|
|
|
|
const response = await fetch(`${server.baseUrl}/api/runs`, {
|
|
method: "POST",
|
|
headers: { "content-type": "application/json" },
|
|
body: JSON.stringify({
|
|
workflowDefinitionId: workflow._id,
|
|
workflowVersionId: version._id,
|
|
}),
|
|
});
|
|
|
|
assert.equal(response.status, 400);
|
|
assert.match(await response.text(), /assetIds/i);
|
|
});
|
|
|
|
test("mongo-backed runtime lists recent runs for a project", async (t) => {
|
|
const sourceDir = await mkdtemp(path.join(os.tmpdir(), "emboflow-runtime-runs-"));
|
|
await mkdir(path.join(sourceDir, "DJI_001"));
|
|
await writeFile(path.join(sourceDir, "meta.json"), "{}");
|
|
await writeFile(path.join(sourceDir, "intrinsics.json"), "{}");
|
|
await writeFile(path.join(sourceDir, "video_meta.json"), "{}");
|
|
|
|
const mongod = await MongoMemoryServer.create({
|
|
instance: {
|
|
ip: "127.0.0.1",
|
|
port: 27120,
|
|
},
|
|
});
|
|
t.after(async () => {
|
|
await mongod.stop();
|
|
});
|
|
|
|
const server = await startRuntimeServer({
|
|
host: "127.0.0.1",
|
|
port: 0,
|
|
mongoUri: mongod.getUri(),
|
|
database: "emboflow-runtime-run-list",
|
|
corsOrigin: "http://127.0.0.1:3000",
|
|
});
|
|
t.after(async () => {
|
|
await server.close();
|
|
});
|
|
|
|
const bootstrap = await readJson<{
|
|
workspace: { _id: string };
|
|
project: { _id: string };
|
|
}>(
|
|
await fetch(`${server.baseUrl}/api/dev/bootstrap`, {
|
|
method: "POST",
|
|
headers: { "content-type": "application/json" },
|
|
body: JSON.stringify({ userId: "run-list-user", projectName: "Run List Project" }),
|
|
}),
|
|
);
|
|
|
|
const asset = await readJson<{ _id: string }>(
|
|
await fetch(`${server.baseUrl}/api/assets/register`, {
|
|
method: "POST",
|
|
headers: { "content-type": "application/json" },
|
|
body: JSON.stringify({
|
|
workspaceId: bootstrap.workspace._id,
|
|
projectId: bootstrap.project._id,
|
|
sourcePath: sourceDir,
|
|
}),
|
|
}),
|
|
);
|
|
await readJson(await fetch(`${server.baseUrl}/api/assets/${asset._id}/probe`, { method: "POST" }));
|
|
|
|
const workflow = await readJson<{ _id: string; name: string }>(
|
|
await fetch(`${server.baseUrl}/api/workflows`, {
|
|
method: "POST",
|
|
headers: { "content-type": "application/json" },
|
|
body: JSON.stringify({
|
|
workspaceId: bootstrap.workspace._id,
|
|
projectId: bootstrap.project._id,
|
|
name: "Recent Runs Flow",
|
|
}),
|
|
}),
|
|
);
|
|
|
|
const version = await readJson<{ _id: string }>(
|
|
await fetch(`${server.baseUrl}/api/workflows/${workflow._id}/versions`, {
|
|
method: "POST",
|
|
headers: { "content-type": "application/json" },
|
|
body: JSON.stringify({
|
|
visualGraph: { viewport: { x: 0, y: 0, zoom: 1 } },
|
|
logicGraph: {
|
|
nodes: [{ id: "source-asset", type: "source" }],
|
|
edges: [],
|
|
},
|
|
runtimeGraph: { selectedPreset: "delivery-normalization" },
|
|
pluginRefs: ["builtin:delivery-nodes"],
|
|
}),
|
|
}),
|
|
);
|
|
|
|
await readJson(
|
|
await fetch(`${server.baseUrl}/api/runs`, {
|
|
method: "POST",
|
|
headers: { "content-type": "application/json" },
|
|
body: JSON.stringify({
|
|
workflowDefinitionId: workflow._id,
|
|
workflowVersionId: version._id,
|
|
assetIds: [asset._id],
|
|
}),
|
|
}),
|
|
);
|
|
|
|
const runs = await readJson<
|
|
Array<{
|
|
_id: string;
|
|
projectId: string;
|
|
workflowDefinitionId: string;
|
|
workflowName: string;
|
|
assetIds: string[];
|
|
status: string;
|
|
}>
|
|
>(
|
|
await fetch(
|
|
`${server.baseUrl}/api/runs?projectId=${encodeURIComponent(bootstrap.project._id)}`,
|
|
),
|
|
);
|
|
|
|
assert.equal(runs.length, 1);
|
|
assert.equal(runs[0]?.projectId, bootstrap.project._id);
|
|
assert.equal(runs[0]?.workflowDefinitionId, workflow._id);
|
|
assert.equal(runs[0]?.workflowName, "Recent Runs Flow");
|
|
assert.deepEqual(runs[0]?.assetIds, [asset._id]);
|
|
assert.equal(runs[0]?.status, "queued");
|
|
});
|