feat: add api control plane flows and local worker runtime

This commit is contained in:
eust-w 2026-03-26 17:41:32 +08:00
parent 4a3c5a1431
commit 668602de22
38 changed files with 1176 additions and 2 deletions

View File

@ -0,0 +1,10 @@
export const ASSET_PROBE_REPORT_COLLECTION_NAME = "asset_probe_reports";
export const assetProbeReportSchemaDefinition = {
assetId: { type: "string", required: true },
detectedFormatCandidates: { type: "array", required: true, default: [] },
structureSummary: { type: "object", required: true, default: {} },
warnings: { type: "array", required: true, default: [] },
recommendedNextNodes: { type: "array", required: true, default: [] },
createdAt: { type: "date", required: true },
} as const;

View File

@ -0,0 +1,13 @@
export const RUN_TASK_COLLECTION_NAME = "run_tasks";
export const runTaskSchemaDefinition = {
workflowRunId: { type: "string", required: true },
workflowVersionId: { type: "string", required: true },
nodeId: { type: "string", required: true },
nodeType: { type: "string", required: true },
status: { type: "string", required: true },
attempt: { type: "number", required: true, default: 1 },
inputRefs: { type: "array", required: true, default: [] },
outputRefs: { type: "array", required: true, default: [] },
createdAt: { type: "date", required: true },
} as const;

View File

@ -0,0 +1,13 @@
export const WORKFLOW_DEFINITION_VERSION_COLLECTION_NAME =
"workflow_definition_versions";
export const workflowDefinitionVersionSchemaDefinition = {
workflowDefinitionId: { type: "string", required: true },
versionNumber: { type: "number", required: true },
visualGraph: { type: "object", required: true },
logicGraph: { type: "object", required: true },
runtimeGraph: { type: "object", required: true },
pluginRefs: { type: "array", required: true, default: [] },
createdBy: { type: "string", required: true },
createdAt: { type: "date", required: true },
} as const;

View File

@ -0,0 +1,11 @@
export const WORKFLOW_RUN_COLLECTION_NAME = "workflow_runs";
export const workflowRunSchemaDefinition = {
workflowDefinitionId: { type: "string", required: true },
workflowVersionId: { type: "string", required: true },
status: { type: "string", required: true },
triggeredBy: { type: "string", required: true },
startedAt: { type: "date", required: false, default: null },
finishedAt: { type: "date", required: false, default: null },
createdAt: { type: "date", required: true },
} as const;

View File

@ -0,0 +1,21 @@
import type { LocalAuthContext } from "../auth/auth.controller.ts";
import {
AssetsService,
type CreateAssetInput,
} from "./assets.service.ts";
export class AssetsController {
private readonly service: AssetsService;
constructor(service: AssetsService) {
this.service = service;
}
createAsset(auth: LocalAuthContext, input: CreateAssetInput) {
return this.service.createAsset(auth, input);
}
probeAsset(assetId: string) {
return this.service.probeAsset(assetId);
}
}

View File

@ -0,0 +1,16 @@
import type { StorageService } from "../storage/storage.service.ts";
import { AssetsController } from "./assets.controller.ts";
import { AssetsService } from "./assets.service.ts";
import { ProbeService } from "./probe/probe.service.ts";
export function createAssetsModule(storageService: StorageService) {
const probeService = new ProbeService();
const service = new AssetsService(storageService, probeService);
const controller = new AssetsController(service);
return {
service,
controller,
probeService,
};
}

View File

@ -0,0 +1,65 @@
import type { LocalAuthContext } from "../auth/auth.controller.ts";
import type { AssetType } from "../../../../../packages/contracts/src/domain.ts";
import type { StorageService } from "../storage/storage.service.ts";
import {
ProbeService,
type ProbeReport,
} from "./probe/probe.service.ts";
export type CreateAssetInput = {
workspaceId: string;
projectId: string;
type: AssetType;
sourceType: string;
displayName: string;
};
export type AssetRecord = CreateAssetInput & {
id: string;
status: "pending";
storageRef: ReturnType<StorageService["createUploadStorageRef"]>;
createdBy: string;
};
export class AssetsService {
private readonly storageService: StorageService;
private readonly probeService: ProbeService;
private readonly assets: AssetRecord[] = [];
private readonly reports: ProbeReport[] = [];
constructor(storageService: StorageService, probeService: ProbeService) {
this.storageService = storageService;
this.probeService = probeService;
}
createAsset(auth: LocalAuthContext, input: CreateAssetInput): AssetRecord {
const asset: AssetRecord = {
id: `asset-${this.assets.length + 1}`,
workspaceId: input.workspaceId,
projectId: input.projectId,
type: input.type,
sourceType: input.sourceType,
displayName: input.displayName,
status: "pending",
storageRef: this.storageService.createUploadStorageRef(input.displayName),
createdBy: auth.userId,
};
this.assets.push(asset);
return asset;
}
getRequiredAsset(assetId: string): AssetRecord {
const asset = this.assets.find((item) => item.id === assetId);
if (!asset) {
throw new Error(`asset not found: ${assetId}`);
}
return asset;
}
probeAsset(assetId: string): ProbeReport {
const asset = this.getRequiredAsset(assetId);
const report = this.probeService.probe(asset);
this.reports.push(report);
return report;
}
}

View File

@ -0,0 +1,48 @@
import type { AssetRecord } from "../assets.service.ts";
export type ProbeReport = {
assetId: string;
detectedFormatCandidates: string[];
structureSummary: Record<string, string>;
warnings: string[];
recommendedNextNodes: string[];
createdAt: string;
};
export class ProbeService {
probe(asset: AssetRecord): ProbeReport {
if (asset.type === "archive") {
return {
assetId: asset.id,
detectedFormatCandidates: ["compressed_archive", "delivery_package"],
structureSummary: { assetType: asset.type, displayName: asset.displayName },
warnings: [],
recommendedNextNodes: ["extract_archive", "validate_structure"],
createdAt: new Date().toISOString(),
};
}
if (
asset.type === "folder" &&
asset.displayName.startsWith("BJ_")
) {
return {
assetId: asset.id,
detectedFormatCandidates: ["delivery_package"],
structureSummary: { assetType: asset.type, displayName: asset.displayName },
warnings: [],
recommendedNextNodes: ["validate_structure", "validate_metadata"],
createdAt: new Date().toISOString(),
};
}
return {
assetId: asset.id,
detectedFormatCandidates: [asset.type],
structureSummary: { assetType: asset.type, displayName: asset.displayName },
warnings: [],
recommendedNextNodes: ["inspect_asset"],
createdAt: new Date().toISOString(),
};
}
}

View File

@ -0,0 +1,11 @@
export type LocalAuthContext = {
userId: string;
};
export function createLocalAuthContext(userId: string): LocalAuthContext {
if (!userId) {
throw new Error("userId is required");
}
return { userId };
}

View File

@ -0,0 +1,9 @@
import { createLocalAuthContext } from "./auth.controller.ts";
export function createAuthModule() {
return {
controller: {
createLocalAuthContext,
},
};
}

View File

@ -0,0 +1,21 @@
import type { LocalAuthContext } from "../auth/auth.controller.ts";
import {
ProjectsService,
type CreateProjectInput,
} from "./projects.service.ts";
export class ProjectsController {
private readonly service: ProjectsService;
constructor(service: ProjectsService) {
this.service = service;
}
createProject(auth: LocalAuthContext, input: CreateProjectInput) {
return this.service.createProject(auth, input);
}
listProjects(workspaceId: string) {
return this.service.listProjects(workspaceId);
}
}

View File

@ -0,0 +1,13 @@
import type { WorkspacesService } from "../workspaces/workspaces.service.ts";
import { ProjectsController } from "./projects.controller.ts";
import { ProjectsService } from "./projects.service.ts";
export function createProjectsModule(workspacesService: WorkspacesService) {
const service = new ProjectsService(workspacesService);
const controller = new ProjectsController(service);
return {
service,
controller,
};
}

View File

@ -0,0 +1,57 @@
import type { LocalAuthContext } from "../auth/auth.controller.ts";
import type { WorkspacesService } from "../workspaces/workspaces.service.ts";
export type CreateProjectInput = {
workspaceId: string;
name: string;
slug: string;
description: string;
};
export type ProjectRecord = CreateProjectInput & {
id: string;
status: "active";
createdBy: string;
};
export class ProjectsService {
private readonly projects: ProjectRecord[] = [];
private readonly workspacesService: WorkspacesService;
constructor(workspacesService: WorkspacesService) {
this.workspacesService = workspacesService;
}
createProject(
auth: LocalAuthContext,
input: CreateProjectInput,
): ProjectRecord {
if (!input.workspaceId) {
throw new Error("workspaceId is required");
}
if (!input.name) {
throw new Error("name is required");
}
if (!input.slug) {
throw new Error("slug is required");
}
this.workspacesService.getRequiredWorkspace(input.workspaceId);
const project: ProjectRecord = {
id: `project-${input.slug}`,
workspaceId: input.workspaceId,
name: input.name,
slug: input.slug,
description: input.description,
status: "active",
createdBy: auth.userId,
};
this.projects.push(project);
return project;
}
listProjects(workspaceId: string): ProjectRecord[] {
return this.projects.filter((project) => project.workspaceId === workspaceId);
}
}

View File

@ -0,0 +1,21 @@
import type { LocalAuthContext } from "../auth/auth.controller.ts";
import { RunsService } from "./runs.service.ts";
export class RunsController {
private readonly service: RunsService;
constructor(service: RunsService) {
this.service = service;
}
createWorkflowRun(
auth: LocalAuthContext,
input: { workflowDefinitionId: string; workflowVersionId: string },
) {
return this.service.createWorkflowRun(auth, input);
}
listRunTasks(workflowRunId: string) {
return this.service.listRunTasks(workflowRunId);
}
}

View File

@ -0,0 +1,13 @@
import type { WorkflowsService } from "../workflows/workflows.service.ts";
import { RunsController } from "./runs.controller.ts";
import { RunsService } from "./runs.service.ts";
export function createRunsModule(workflowsService: WorkflowsService) {
const service = new RunsService(workflowsService);
const controller = new RunsController(service);
return {
service,
controller,
};
}

View File

@ -0,0 +1,68 @@
import type { LocalAuthContext } from "../auth/auth.controller.ts";
import type { WorkflowsService } from "../workflows/workflows.service.ts";
export type WorkflowRunRecord = {
id: string;
workflowDefinitionId: string;
workflowVersionId: string;
status: "queued";
triggeredBy: string;
};
export type RunTaskRecord = {
id: string;
workflowRunId: string;
workflowVersionId: string;
nodeId: string;
nodeType: string;
status: "queued" | "pending";
attempt: number;
};
export class RunsService {
private readonly workflowsService: WorkflowsService;
private readonly runs: WorkflowRunRecord[] = [];
private readonly tasks: RunTaskRecord[] = [];
constructor(workflowsService: WorkflowsService) {
this.workflowsService = workflowsService;
}
createWorkflowRun(
auth: LocalAuthContext,
input: { workflowDefinitionId: string; workflowVersionId: string },
): WorkflowRunRecord {
this.workflowsService.getRequiredWorkflowDefinition(input.workflowDefinitionId);
const version = this.workflowsService.getRequiredWorkflowVersion(
input.workflowVersionId,
);
const run: WorkflowRunRecord = {
id: `run-${this.runs.length + 1}`,
workflowDefinitionId: input.workflowDefinitionId,
workflowVersionId: input.workflowVersionId,
status: "queued",
triggeredBy: auth.userId,
};
this.runs.push(run);
const targetNodes = new Set(version.logicGraph.edges.map((edge) => edge.to));
version.logicGraph.nodes.forEach((node, index) => {
this.tasks.push({
id: `task-${run.id}-${node.id}`,
workflowRunId: run.id,
workflowVersionId: version.id,
nodeId: node.id,
nodeType: node.type,
status: targetNodes.has(node.id) ? "pending" : "queued",
attempt: 1,
});
});
return run;
}
listRunTasks(workflowRunId: string): RunTaskRecord[] {
return this.tasks.filter((task) => task.workflowRunId === workflowRunId);
}
}

View File

@ -0,0 +1,9 @@
import { StorageService } from "./storage.service.ts";
export function createStorageModule() {
const service = new StorageService();
return {
service,
};
}

View File

@ -0,0 +1,15 @@
export type StorageRef = {
provider: "minio";
bucket: string;
key: string;
};
export class StorageService {
createUploadStorageRef(displayName: string): StorageRef {
return {
provider: "minio",
bucket: "raw-assets",
key: `uploads/${displayName}`,
};
}
}

View File

@ -0,0 +1,29 @@
import type { LocalAuthContext } from "../auth/auth.controller.ts";
import {
WorkflowsService,
type WorkflowVersionInput,
} from "./workflows.service.ts";
export class WorkflowsController {
private readonly service: WorkflowsService;
constructor(service: WorkflowsService) {
this.service = service;
}
createWorkflowDefinition(
auth: LocalAuthContext,
input: {
workspaceId: string;
projectId: string;
name: string;
slug: string;
},
) {
return this.service.createWorkflowDefinition(auth, input);
}
saveWorkflowVersion(auth: LocalAuthContext, input: WorkflowVersionInput) {
return this.service.saveWorkflowVersion(auth, input);
}
}

View File

@ -0,0 +1,12 @@
import { WorkflowsController } from "./workflows.controller.ts";
import { WorkflowsService } from "./workflows.service.ts";
export function createWorkflowsModule() {
const service = new WorkflowsService();
const controller = new WorkflowsController(service);
return {
service,
controller,
};
}

View File

@ -0,0 +1,105 @@
import type { LocalAuthContext } from "../auth/auth.controller.ts";
export type WorkflowNode = {
id: string;
type: string;
};
export type WorkflowEdge = {
from: string;
to: string;
};
export type WorkflowVersionInput = {
workflowDefinitionId: string;
visualGraph: Record<string, unknown>;
logicGraph: {
nodes: WorkflowNode[];
edges: WorkflowEdge[];
};
runtimeGraph: Record<string, unknown>;
pluginRefs: string[];
};
export type WorkflowDefinitionRecord = {
id: string;
workspaceId: string;
projectId: string;
name: string;
slug: string;
status: "draft";
latestVersionNumber: number;
publishedVersionNumber: number | null;
createdBy: string;
};
export type WorkflowVersionRecord = WorkflowVersionInput & {
id: string;
versionNumber: number;
createdBy: string;
};
export class WorkflowsService {
private readonly definitions: WorkflowDefinitionRecord[] = [];
private readonly versions: WorkflowVersionRecord[] = [];
createWorkflowDefinition(
auth: LocalAuthContext,
input: Omit<
WorkflowDefinitionRecord,
"id" | "status" | "latestVersionNumber" | "publishedVersionNumber" | "createdBy"
>,
): WorkflowDefinitionRecord {
const definition: WorkflowDefinitionRecord = {
id: `workflow-${input.slug}`,
workspaceId: input.workspaceId,
projectId: input.projectId,
name: input.name,
slug: input.slug,
status: "draft",
latestVersionNumber: 0,
publishedVersionNumber: null,
createdBy: auth.userId,
};
this.definitions.push(definition);
return definition;
}
getRequiredWorkflowDefinition(
workflowDefinitionId: string,
): WorkflowDefinitionRecord {
const definition = this.definitions.find(
(item) => item.id === workflowDefinitionId,
);
if (!definition) {
throw new Error(`workflow definition not found: ${workflowDefinitionId}`);
}
return definition;
}
saveWorkflowVersion(
auth: LocalAuthContext,
input: WorkflowVersionInput,
): WorkflowVersionRecord {
const definition = this.getRequiredWorkflowDefinition(input.workflowDefinitionId);
const versionNumber = definition.latestVersionNumber + 1;
definition.latestVersionNumber = versionNumber;
const version: WorkflowVersionRecord = {
...input,
id: `${definition.id}-v${versionNumber}`,
versionNumber,
createdBy: auth.userId,
};
this.versions.push(version);
return version;
}
getRequiredWorkflowVersion(workflowVersionId: string): WorkflowVersionRecord {
const version = this.versions.find((item) => item.id === workflowVersionId);
if (!version) {
throw new Error(`workflow version not found: ${workflowVersionId}`);
}
return version;
}
}

View File

@ -0,0 +1,18 @@
import type { LocalAuthContext } from "../auth/auth.controller.ts";
import { WorkspacesService } from "./workspaces.service.ts";
export class WorkspacesController {
private readonly service: WorkspacesService;
constructor(service: WorkspacesService) {
this.service = service;
}
bootstrapPersonalWorkspace(auth: LocalAuthContext) {
return this.service.bootstrapPersonalWorkspace(auth);
}
listMyWorkspaces(auth: LocalAuthContext) {
return this.service.listByOwner(auth.userId);
}
}

View File

@ -0,0 +1,12 @@
import { WorkspacesController } from "./workspaces.controller.ts";
import { WorkspacesService } from "./workspaces.service.ts";
export function createWorkspaceModule() {
const service = new WorkspacesService();
const controller = new WorkspacesController(service);
return {
service,
controller,
};
}

View File

@ -0,0 +1,53 @@
import type { LocalAuthContext } from "../auth/auth.controller.ts";
import type { WorkspaceType } from "../../../../../../packages/contracts/src/domain.ts";
export type WorkspaceRecord = {
id: string;
type: WorkspaceType;
name: string;
slug: string;
ownerId: string;
status: "active";
};
function personalWorkspaceSlug(userId: string): string {
return `${userId}-personal`;
}
export class WorkspacesService {
private readonly workspaces: WorkspaceRecord[] = [];
bootstrapPersonalWorkspace(auth: LocalAuthContext): WorkspaceRecord {
const existing = this.workspaces.find(
(workspace) =>
workspace.ownerId === auth.userId && workspace.type === "personal",
);
if (existing) {
return existing;
}
const workspace: WorkspaceRecord = {
id: `workspace-${personalWorkspaceSlug(auth.userId)}`,
type: "personal",
name: `${auth.userId} Personal`,
slug: personalWorkspaceSlug(auth.userId),
ownerId: auth.userId,
status: "active",
};
this.workspaces.push(workspace);
return workspace;
}
listByOwner(ownerId: string): WorkspaceRecord[] {
return this.workspaces.filter((workspace) => workspace.ownerId === ownerId);
}
getRequiredWorkspace(workspaceId: string): WorkspaceRecord {
const workspace = this.workspaces.find((item) => item.id === workspaceId);
if (!workspace) {
throw new Error(`workspace not found: ${workspaceId}`);
}
return workspace;
}
}

View File

@ -0,0 +1,96 @@
import test from "node:test";
import assert from "node:assert/strict";
import { createLocalAuthContext } from "../src/modules/auth/auth.controller.ts";
import { createWorkspaceModule } from "../src/modules/workspaces/workspaces.module.ts";
import { createProjectsModule } from "../src/modules/projects/projects.module.ts";
import { createStorageModule } from "../src/modules/storage/storage.module.ts";
import { createAssetsModule } from "../src/modules/assets/assets.module.ts";
test("register an uploaded asset record", () => {
const auth = createLocalAuthContext("user-4");
const workspaces = createWorkspaceModule();
const projects = createProjectsModule(workspaces.service);
const storage = createStorageModule();
const assets = createAssetsModule(storage.service);
const workspace = workspaces.controller.bootstrapPersonalWorkspace(auth);
const project = projects.controller.createProject(auth, {
workspaceId: workspace.id,
name: "Assets",
slug: "assets",
description: "Asset intake project",
});
const asset = assets.controller.createAsset(auth, {
workspaceId: workspace.id,
projectId: project.id,
type: "archive",
sourceType: "upload",
displayName: "demo.zip",
});
assert.equal(asset.type, "archive");
assert.equal(asset.projectId, project.id);
assert.equal(asset.status, "pending");
});
test("create a probe report for a raw asset", () => {
const auth = createLocalAuthContext("user-5");
const workspaces = createWorkspaceModule();
const projects = createProjectsModule(workspaces.service);
const storage = createStorageModule();
const assets = createAssetsModule(storage.service);
const workspace = workspaces.controller.bootstrapPersonalWorkspace(auth);
const project = projects.controller.createProject(auth, {
workspaceId: workspace.id,
name: "Probe",
slug: "probe",
description: "Probe test project",
});
const asset = assets.controller.createAsset(auth, {
workspaceId: workspace.id,
projectId: project.id,
type: "folder",
sourceType: "upload",
displayName: "BJ_001_0001_OsmoNano_2026-03-19_14-51-43",
});
const report = assets.controller.probeAsset(asset.id);
assert.equal(report.assetId, asset.id);
assert.equal(report.detectedFormatCandidates.includes("delivery_package"), true);
});
test("return recommended next actions from probe metadata", () => {
const auth = createLocalAuthContext("user-6");
const workspaces = createWorkspaceModule();
const projects = createProjectsModule(workspaces.service);
const storage = createStorageModule();
const assets = createAssetsModule(storage.service);
const workspace = workspaces.controller.bootstrapPersonalWorkspace(auth);
const project = projects.controller.createProject(auth, {
workspaceId: workspace.id,
name: "Recommendations",
slug: "recommendations",
description: "Recommendation test project",
});
const asset = assets.controller.createAsset(auth, {
workspaceId: workspace.id,
projectId: project.id,
type: "archive",
sourceType: "upload",
displayName: "20260324-data-demo.zip",
});
const report = assets.controller.probeAsset(asset.id);
assert.deepEqual(report.recommendedNextNodes, [
"extract_archive",
"validate_structure",
]);
});

View File

@ -0,0 +1,52 @@
import test from "node:test";
import assert from "node:assert/strict";
import { createLocalAuthContext } from "../src/modules/auth/auth.controller.ts";
import { createWorkspaceModule } from "../src/modules/workspaces/workspaces.module.ts";
import { createProjectsModule } from "../src/modules/projects/projects.module.ts";
test("bootstrap personal workspace flow creates a personal workspace for the user", () => {
const auth = createLocalAuthContext("user-1");
const workspaces = createWorkspaceModule();
const workspace = workspaces.controller.bootstrapPersonalWorkspace(auth);
assert.equal(workspace.type, "personal");
assert.equal(workspace.ownerId, "user-1");
assert.equal(workspace.slug, "user-1-personal");
});
test("create project under a workspace", () => {
const auth = createLocalAuthContext("user-2");
const workspaces = createWorkspaceModule();
const projects = createProjectsModule(workspaces.service);
const workspace = workspaces.controller.bootstrapPersonalWorkspace(auth);
const project = projects.controller.createProject(auth, {
workspaceId: workspace.id,
name: "Dataset Ops",
slug: "dataset-ops",
description: "Project for embodied data workflows",
});
assert.equal(project.workspaceId, workspace.id);
assert.equal(project.name, "Dataset Ops");
assert.equal(project.createdBy, "user-2");
});
test("reject project creation without a workspace id", () => {
const auth = createLocalAuthContext("user-3");
const workspaces = createWorkspaceModule();
const projects = createProjectsModule(workspaces.service);
assert.throws(
() =>
projects.controller.createProject(auth, {
workspaceId: "",
name: "Invalid",
slug: "invalid",
description: "Missing workspace id",
}),
/workspaceId is required/,
);
});

View File

@ -0,0 +1,123 @@
import test from "node:test";
import assert from "node:assert/strict";
import { createLocalAuthContext } from "../src/modules/auth/auth.controller.ts";
import { createWorkspaceModule } from "../src/modules/workspaces/workspaces.module.ts";
import { createProjectsModule } from "../src/modules/projects/projects.module.ts";
import { createWorkflowsModule } from "../src/modules/workflows/workflows.module.ts";
import { createRunsModule } from "../src/modules/runs/runs.module.ts";
test("create workflow definition", () => {
const auth = createLocalAuthContext("user-7");
const workspaces = createWorkspaceModule();
const projects = createProjectsModule(workspaces.service);
const workflows = createWorkflowsModule();
const workspace = workspaces.controller.bootstrapPersonalWorkspace(auth);
const project = projects.controller.createProject(auth, {
workspaceId: workspace.id,
name: "Workflow",
slug: "workflow",
description: "Workflow project",
});
const definition = workflows.controller.createWorkflowDefinition(auth, {
workspaceId: workspace.id,
projectId: project.id,
name: "Delivery Flow",
slug: "delivery-flow",
});
assert.equal(definition.name, "Delivery Flow");
assert.equal(definition.status, "draft");
});
test("save workflow version", () => {
const auth = createLocalAuthContext("user-8");
const workspaces = createWorkspaceModule();
const projects = createProjectsModule(workspaces.service);
const workflows = createWorkflowsModule();
const workspace = workspaces.controller.bootstrapPersonalWorkspace(auth);
const project = projects.controller.createProject(auth, {
workspaceId: workspace.id,
name: "Versions",
slug: "versions",
description: "Workflow version project",
});
const definition = workflows.controller.createWorkflowDefinition(auth, {
workspaceId: workspace.id,
projectId: project.id,
name: "Versioned Flow",
slug: "versioned-flow",
});
const version = workflows.controller.saveWorkflowVersion(auth, {
workflowDefinitionId: definition.id,
visualGraph: { nodes: [], edges: [] },
logicGraph: {
nodes: [
{ id: "source", type: "Source" },
{ id: "validate", type: "Inspect" },
],
edges: [{ from: "source", to: "validate" }],
},
runtimeGraph: { nodes: [] },
pluginRefs: [],
});
assert.equal(version.versionNumber, 1);
assert.equal(version.workflowDefinitionId, definition.id);
});
test("create workflow run from saved version and derive initial run tasks", () => {
const auth = createLocalAuthContext("user-9");
const workspaces = createWorkspaceModule();
const projects = createProjectsModule(workspaces.service);
const workflows = createWorkflowsModule();
const runs = createRunsModule(workflows.service);
const workspace = workspaces.controller.bootstrapPersonalWorkspace(auth);
const project = projects.controller.createProject(auth, {
workspaceId: workspace.id,
name: "Runs",
slug: "runs",
description: "Run project",
});
const definition = workflows.controller.createWorkflowDefinition(auth, {
workspaceId: workspace.id,
projectId: project.id,
name: "Run Flow",
slug: "run-flow",
});
const version = workflows.controller.saveWorkflowVersion(auth, {
workflowDefinitionId: definition.id,
visualGraph: { nodes: [], edges: [] },
logicGraph: {
nodes: [
{ id: "source", type: "Source" },
{ id: "rename", type: "Transform" },
{ id: "export", type: "Export" },
],
edges: [
{ from: "source", to: "rename" },
{ from: "rename", to: "export" },
],
},
runtimeGraph: { nodes: [] },
pluginRefs: [],
});
const run = runs.controller.createWorkflowRun(auth, {
workflowDefinitionId: definition.id,
workflowVersionId: version.id,
});
const tasks = runs.controller.listRunTasks(run.id);
assert.equal(run.status, "queued");
assert.equal(tasks.length, 3);
assert.equal(tasks[0].nodeId, "source");
assert.equal(tasks[0].status, "queued");
assert.equal(tasks[1].status, "pending");
});

View File

@ -1,8 +1,10 @@
{ {
"name": "@emboflow/worker", "name": "worker",
"private": true, "private": true,
"version": "0.1.0", "version": "0.1.0",
"type": "module",
"scripts": { "scripts": {
"dev": "echo 'worker app scaffold pending'" "dev": "echo 'worker app scaffold pending'",
"test": "node --test --experimental-strip-types"
} }
} }

View File

@ -0,0 +1,13 @@
export type ExecutorType = "python" | "docker" | "http";
export type TaskRecord = {
id: string;
nodeId: string;
executorType: ExecutorType;
status: "pending" | "running" | "success" | "failed";
};
export type ExecutionContext = {
taskId: string;
nodeId: string;
};

View File

@ -0,0 +1,10 @@
import type { ExecutionContext, TaskRecord } from "../contracts/execution-context.ts";
export class DockerExecutor {
executionCount = 0;
async execute(task: TaskRecord, _context: ExecutionContext) {
this.executionCount += 1;
return { taskId: task.id, executor: "docker" as const };
}
}

View File

@ -0,0 +1,10 @@
import type { ExecutionContext, TaskRecord } from "../contracts/execution-context.ts";
export class HttpExecutor {
executionCount = 0;
async execute(task: TaskRecord, _context: ExecutionContext) {
this.executionCount += 1;
return { taskId: task.id, executor: "http" as const };
}
}

View File

@ -0,0 +1,10 @@
import type { ExecutionContext, TaskRecord } from "../contracts/execution-context.ts";
export class PythonExecutor {
executionCount = 0;
async execute(task: TaskRecord, _context: ExecutionContext) {
this.executionCount += 1;
return { taskId: task.id, executor: "python" as const };
}
}

5
apps/worker/src/main.ts Normal file
View File

@ -0,0 +1,5 @@
export function bootstrapWorker() {
return {
status: "ready" as const,
};
}

View File

@ -0,0 +1,45 @@
import type {
ExecutionContext,
ExecutorType,
TaskRecord,
} from "../contracts/execution-context.ts";
import { LocalScheduler } from "../scheduler/local-scheduler.ts";
import { DockerExecutor } from "../executors/docker-executor.ts";
import { HttpExecutor } from "../executors/http-executor.ts";
import { PythonExecutor } from "../executors/python-executor.ts";
type ExecutorMap = {
python: PythonExecutor;
docker: DockerExecutor;
http: HttpExecutor;
};
export class TaskRunner {
private readonly scheduler: LocalScheduler;
private readonly executors: ExecutorMap;
constructor(config: { scheduler: LocalScheduler; executors: ExecutorMap }) {
this.scheduler = config.scheduler;
this.executors = config.executors;
}
async runNextTask(): Promise<TaskRecord | undefined> {
const task = this.scheduler.nextPendingTask();
if (!task) {
return undefined;
}
this.scheduler.transition(task.id, "running");
const context: ExecutionContext = {
taskId: task.id,
nodeId: task.nodeId,
};
await this.executors[task.executorType as ExecutorType].execute(task, context);
this.scheduler.transition(task.id, "success");
return {
...task,
status: "success",
};
}
}

View File

@ -0,0 +1,32 @@
import type { TaskRecord } from "../contracts/execution-context.ts";
export class LocalScheduler {
private readonly tasks: TaskRecord[];
private readonly statusTimeline = new Map<string, string[]>();
constructor(tasks: TaskRecord[]) {
this.tasks = tasks.map((task) => ({ ...task }));
tasks.forEach((task) => {
this.statusTimeline.set(task.id, [task.status]);
});
}
nextPendingTask(): TaskRecord | undefined {
return this.tasks.find((task) => task.status === "pending");
}
transition(taskId: string, status: TaskRecord["status"]) {
const task = this.tasks.find((item) => item.id === taskId);
if (!task) {
throw new Error(`task not found: ${taskId}`);
}
task.status = status;
const timeline = this.statusTimeline.get(taskId) ?? [];
timeline.push(status);
this.statusTimeline.set(taskId, timeline);
}
getStatusTimeline(taskId: string): string[] {
return this.statusTimeline.get(taskId) ?? [];
}
}

View File

@ -0,0 +1,88 @@
import test from "node:test";
import assert from "node:assert/strict";
import { TaskRunner } from "../src/runner/task-runner.ts";
import { LocalScheduler } from "../src/scheduler/local-scheduler.ts";
import { PythonExecutor } from "../src/executors/python-executor.ts";
import { DockerExecutor } from "../src/executors/docker-executor.ts";
import { HttpExecutor } from "../src/executors/http-executor.ts";
test("worker loads pending tasks", async () => {
const scheduler = new LocalScheduler([
{
id: "task-1",
nodeId: "source",
executorType: "python",
status: "pending",
},
]);
const runner = new TaskRunner({
scheduler,
executors: {
python: new PythonExecutor(),
docker: new DockerExecutor(),
http: new HttpExecutor(),
},
});
const task = await runner.runNextTask();
assert.equal(task?.id, "task-1");
assert.equal(task?.status, "success");
});
test("worker marks task running then success", async () => {
const scheduler = new LocalScheduler([
{
id: "task-2",
nodeId: "transform",
executorType: "docker",
status: "pending",
},
]);
const runner = new TaskRunner({
scheduler,
executors: {
python: new PythonExecutor(),
docker: new DockerExecutor(),
http: new HttpExecutor(),
},
});
await runner.runNextTask();
const timeline = scheduler.getStatusTimeline("task-2");
assert.deepEqual(timeline, ["pending", "running", "success"]);
});
test("worker chooses executor based on runtime config", async () => {
const scheduler = new LocalScheduler([
{
id: "task-3",
nodeId: "export",
executorType: "http",
status: "pending",
},
]);
const pythonExecutor = new PythonExecutor();
const dockerExecutor = new DockerExecutor();
const httpExecutor = new HttpExecutor();
const runner = new TaskRunner({
scheduler,
executors: {
python: pythonExecutor,
docker: dockerExecutor,
http: httpExecutor,
},
});
await runner.runNextTask();
assert.equal(pythonExecutor.executionCount, 0);
assert.equal(dockerExecutor.executionCount, 0);
assert.equal(httpExecutor.executionCount, 1);
});

View File

@ -10,6 +10,14 @@
--- ---
## Progress Notes
- `2026-03-26`: Tasks 1 and 2 are complete and committed.
- `2026-03-26`: Tasks 3 through 6 are implemented against in-memory V1 control-plane services so the API and worker contracts can stabilize before persistence and framework wiring are deepened.
- `2026-03-26`: Package-level verification continues to use the Node 22 built-in test runner with direct file targets such as `pnpm --filter api test test/projects.e2e-spec.ts` and `pnpm --filter worker test test/task-runner.spec.ts`.
---
### Task 1: Bootstrap The Monorepo And Runtime Skeleton ### Task 1: Bootstrap The Monorepo And Runtime Skeleton
**Files:** **Files:**

17
pnpm-lock.yaml generated Normal file
View File

@ -0,0 +1,17 @@
lockfileVersion: '9.0'
settings:
autoInstallPeers: true
excludeLinksFromLockfile: false
importers:
.: {}
apps/api: {}
apps/web: {}
apps/worker: {}
packages/contracts: {}