From 668602de228227dc69fb439fdba884bfd3b4ca74 Mon Sep 17 00:00:00 2001 From: eust-w Date: Thu, 26 Mar 2026 17:41:32 +0800 Subject: [PATCH] :sparkles: feat: add api control plane flows and local worker runtime --- .../schemas/asset-probe-report.schema.ts | 10 ++ .../common/mongo/schemas/run-task.schema.ts | 13 ++ .../workflow-definition-version.schema.ts | 13 ++ .../mongo/schemas/workflow-run.schema.ts | 11 ++ .../src/modules/assets/assets.controller.ts | 21 +++ apps/api/src/modules/assets/assets.module.ts | 16 +++ apps/api/src/modules/assets/assets.service.ts | 65 +++++++++ .../src/modules/assets/probe/probe.service.ts | 48 +++++++ apps/api/src/modules/auth/auth.controller.ts | 11 ++ apps/api/src/modules/auth/auth.module.ts | 9 ++ .../modules/projects/projects.controller.ts | 21 +++ .../src/modules/projects/projects.module.ts | 13 ++ .../src/modules/projects/projects.service.ts | 57 ++++++++ apps/api/src/modules/runs/runs.controller.ts | 21 +++ apps/api/src/modules/runs/runs.module.ts | 13 ++ apps/api/src/modules/runs/runs.service.ts | 68 ++++++++++ .../api/src/modules/storage/storage.module.ts | 9 ++ .../src/modules/storage/storage.service.ts | 15 +++ .../modules/workflows/workflows.controller.ts | 29 +++++ .../src/modules/workflows/workflows.module.ts | 12 ++ .../modules/workflows/workflows.service.ts | 105 +++++++++++++++ .../workspaces/workspaces.controller.ts | 18 +++ .../modules/workspaces/workspaces.module.ts | 12 ++ .../modules/workspaces/workspaces.service.ts | 53 ++++++++ apps/api/test/assets.e2e-spec.ts | 96 ++++++++++++++ apps/api/test/projects.e2e-spec.ts | 52 ++++++++ apps/api/test/workflow-runs.e2e-spec.ts | 123 ++++++++++++++++++ apps/worker/package.json | 6 +- .../worker/src/contracts/execution-context.ts | 13 ++ apps/worker/src/executors/docker-executor.ts | 10 ++ apps/worker/src/executors/http-executor.ts | 10 ++ apps/worker/src/executors/python-executor.ts | 10 ++ apps/worker/src/main.ts | 5 + apps/worker/src/runner/task-runner.ts | 45 +++++++ apps/worker/src/scheduler/local-scheduler.ts | 32 +++++ apps/worker/test/task-runner.spec.ts | 88 +++++++++++++ ...26-03-26-emboflow-v1-foundation-and-mvp.md | 8 ++ pnpm-lock.yaml | 17 +++ 38 files changed, 1176 insertions(+), 2 deletions(-) create mode 100644 apps/api/src/common/mongo/schemas/asset-probe-report.schema.ts create mode 100644 apps/api/src/common/mongo/schemas/run-task.schema.ts create mode 100644 apps/api/src/common/mongo/schemas/workflow-definition-version.schema.ts create mode 100644 apps/api/src/common/mongo/schemas/workflow-run.schema.ts create mode 100644 apps/api/src/modules/assets/assets.controller.ts create mode 100644 apps/api/src/modules/assets/assets.module.ts create mode 100644 apps/api/src/modules/assets/assets.service.ts create mode 100644 apps/api/src/modules/assets/probe/probe.service.ts create mode 100644 apps/api/src/modules/auth/auth.controller.ts create mode 100644 apps/api/src/modules/auth/auth.module.ts create mode 100644 apps/api/src/modules/projects/projects.controller.ts create mode 100644 apps/api/src/modules/projects/projects.module.ts create mode 100644 apps/api/src/modules/projects/projects.service.ts create mode 100644 apps/api/src/modules/runs/runs.controller.ts create mode 100644 apps/api/src/modules/runs/runs.module.ts create mode 100644 apps/api/src/modules/runs/runs.service.ts create mode 100644 apps/api/src/modules/storage/storage.module.ts create mode 100644 apps/api/src/modules/storage/storage.service.ts create mode 100644 apps/api/src/modules/workflows/workflows.controller.ts create mode 100644 apps/api/src/modules/workflows/workflows.module.ts create mode 100644 apps/api/src/modules/workflows/workflows.service.ts create mode 100644 apps/api/src/modules/workspaces/workspaces.controller.ts create mode 100644 apps/api/src/modules/workspaces/workspaces.module.ts create mode 100644 apps/api/src/modules/workspaces/workspaces.service.ts create mode 100644 apps/api/test/assets.e2e-spec.ts create mode 100644 apps/api/test/projects.e2e-spec.ts create mode 100644 apps/api/test/workflow-runs.e2e-spec.ts create mode 100644 apps/worker/src/contracts/execution-context.ts create mode 100644 apps/worker/src/executors/docker-executor.ts create mode 100644 apps/worker/src/executors/http-executor.ts create mode 100644 apps/worker/src/executors/python-executor.ts create mode 100644 apps/worker/src/main.ts create mode 100644 apps/worker/src/runner/task-runner.ts create mode 100644 apps/worker/src/scheduler/local-scheduler.ts create mode 100644 apps/worker/test/task-runner.spec.ts create mode 100644 pnpm-lock.yaml diff --git a/apps/api/src/common/mongo/schemas/asset-probe-report.schema.ts b/apps/api/src/common/mongo/schemas/asset-probe-report.schema.ts new file mode 100644 index 0000000..686b749 --- /dev/null +++ b/apps/api/src/common/mongo/schemas/asset-probe-report.schema.ts @@ -0,0 +1,10 @@ +export const ASSET_PROBE_REPORT_COLLECTION_NAME = "asset_probe_reports"; + +export const assetProbeReportSchemaDefinition = { + assetId: { type: "string", required: true }, + detectedFormatCandidates: { type: "array", required: true, default: [] }, + structureSummary: { type: "object", required: true, default: {} }, + warnings: { type: "array", required: true, default: [] }, + recommendedNextNodes: { type: "array", required: true, default: [] }, + createdAt: { type: "date", required: true }, +} as const; diff --git a/apps/api/src/common/mongo/schemas/run-task.schema.ts b/apps/api/src/common/mongo/schemas/run-task.schema.ts new file mode 100644 index 0000000..089ee4f --- /dev/null +++ b/apps/api/src/common/mongo/schemas/run-task.schema.ts @@ -0,0 +1,13 @@ +export const RUN_TASK_COLLECTION_NAME = "run_tasks"; + +export const runTaskSchemaDefinition = { + workflowRunId: { type: "string", required: true }, + workflowVersionId: { type: "string", required: true }, + nodeId: { type: "string", required: true }, + nodeType: { type: "string", required: true }, + status: { type: "string", required: true }, + attempt: { type: "number", required: true, default: 1 }, + inputRefs: { type: "array", required: true, default: [] }, + outputRefs: { type: "array", required: true, default: [] }, + createdAt: { type: "date", required: true }, +} as const; diff --git a/apps/api/src/common/mongo/schemas/workflow-definition-version.schema.ts b/apps/api/src/common/mongo/schemas/workflow-definition-version.schema.ts new file mode 100644 index 0000000..1756b83 --- /dev/null +++ b/apps/api/src/common/mongo/schemas/workflow-definition-version.schema.ts @@ -0,0 +1,13 @@ +export const WORKFLOW_DEFINITION_VERSION_COLLECTION_NAME = + "workflow_definition_versions"; + +export const workflowDefinitionVersionSchemaDefinition = { + workflowDefinitionId: { type: "string", required: true }, + versionNumber: { type: "number", required: true }, + visualGraph: { type: "object", required: true }, + logicGraph: { type: "object", required: true }, + runtimeGraph: { type: "object", required: true }, + pluginRefs: { type: "array", required: true, default: [] }, + createdBy: { type: "string", required: true }, + createdAt: { type: "date", required: true }, +} as const; diff --git a/apps/api/src/common/mongo/schemas/workflow-run.schema.ts b/apps/api/src/common/mongo/schemas/workflow-run.schema.ts new file mode 100644 index 0000000..daf4483 --- /dev/null +++ b/apps/api/src/common/mongo/schemas/workflow-run.schema.ts @@ -0,0 +1,11 @@ +export const WORKFLOW_RUN_COLLECTION_NAME = "workflow_runs"; + +export const workflowRunSchemaDefinition = { + workflowDefinitionId: { type: "string", required: true }, + workflowVersionId: { type: "string", required: true }, + status: { type: "string", required: true }, + triggeredBy: { type: "string", required: true }, + startedAt: { type: "date", required: false, default: null }, + finishedAt: { type: "date", required: false, default: null }, + createdAt: { type: "date", required: true }, +} as const; diff --git a/apps/api/src/modules/assets/assets.controller.ts b/apps/api/src/modules/assets/assets.controller.ts new file mode 100644 index 0000000..646548c --- /dev/null +++ b/apps/api/src/modules/assets/assets.controller.ts @@ -0,0 +1,21 @@ +import type { LocalAuthContext } from "../auth/auth.controller.ts"; +import { + AssetsService, + type CreateAssetInput, +} from "./assets.service.ts"; + +export class AssetsController { + private readonly service: AssetsService; + + constructor(service: AssetsService) { + this.service = service; + } + + createAsset(auth: LocalAuthContext, input: CreateAssetInput) { + return this.service.createAsset(auth, input); + } + + probeAsset(assetId: string) { + return this.service.probeAsset(assetId); + } +} diff --git a/apps/api/src/modules/assets/assets.module.ts b/apps/api/src/modules/assets/assets.module.ts new file mode 100644 index 0000000..e049e65 --- /dev/null +++ b/apps/api/src/modules/assets/assets.module.ts @@ -0,0 +1,16 @@ +import type { StorageService } from "../storage/storage.service.ts"; +import { AssetsController } from "./assets.controller.ts"; +import { AssetsService } from "./assets.service.ts"; +import { ProbeService } from "./probe/probe.service.ts"; + +export function createAssetsModule(storageService: StorageService) { + const probeService = new ProbeService(); + const service = new AssetsService(storageService, probeService); + const controller = new AssetsController(service); + + return { + service, + controller, + probeService, + }; +} diff --git a/apps/api/src/modules/assets/assets.service.ts b/apps/api/src/modules/assets/assets.service.ts new file mode 100644 index 0000000..6263a5a --- /dev/null +++ b/apps/api/src/modules/assets/assets.service.ts @@ -0,0 +1,65 @@ +import type { LocalAuthContext } from "../auth/auth.controller.ts"; +import type { AssetType } from "../../../../../packages/contracts/src/domain.ts"; +import type { StorageService } from "../storage/storage.service.ts"; +import { + ProbeService, + type ProbeReport, +} from "./probe/probe.service.ts"; + +export type CreateAssetInput = { + workspaceId: string; + projectId: string; + type: AssetType; + sourceType: string; + displayName: string; +}; + +export type AssetRecord = CreateAssetInput & { + id: string; + status: "pending"; + storageRef: ReturnType; + createdBy: string; +}; + +export class AssetsService { + private readonly storageService: StorageService; + private readonly probeService: ProbeService; + private readonly assets: AssetRecord[] = []; + private readonly reports: ProbeReport[] = []; + + constructor(storageService: StorageService, probeService: ProbeService) { + this.storageService = storageService; + this.probeService = probeService; + } + + createAsset(auth: LocalAuthContext, input: CreateAssetInput): AssetRecord { + const asset: AssetRecord = { + id: `asset-${this.assets.length + 1}`, + workspaceId: input.workspaceId, + projectId: input.projectId, + type: input.type, + sourceType: input.sourceType, + displayName: input.displayName, + status: "pending", + storageRef: this.storageService.createUploadStorageRef(input.displayName), + createdBy: auth.userId, + }; + this.assets.push(asset); + return asset; + } + + getRequiredAsset(assetId: string): AssetRecord { + const asset = this.assets.find((item) => item.id === assetId); + if (!asset) { + throw new Error(`asset not found: ${assetId}`); + } + return asset; + } + + probeAsset(assetId: string): ProbeReport { + const asset = this.getRequiredAsset(assetId); + const report = this.probeService.probe(asset); + this.reports.push(report); + return report; + } +} diff --git a/apps/api/src/modules/assets/probe/probe.service.ts b/apps/api/src/modules/assets/probe/probe.service.ts new file mode 100644 index 0000000..cd7aeca --- /dev/null +++ b/apps/api/src/modules/assets/probe/probe.service.ts @@ -0,0 +1,48 @@ +import type { AssetRecord } from "../assets.service.ts"; + +export type ProbeReport = { + assetId: string; + detectedFormatCandidates: string[]; + structureSummary: Record; + warnings: string[]; + recommendedNextNodes: string[]; + createdAt: string; +}; + +export class ProbeService { + probe(asset: AssetRecord): ProbeReport { + if (asset.type === "archive") { + return { + assetId: asset.id, + detectedFormatCandidates: ["compressed_archive", "delivery_package"], + structureSummary: { assetType: asset.type, displayName: asset.displayName }, + warnings: [], + recommendedNextNodes: ["extract_archive", "validate_structure"], + createdAt: new Date().toISOString(), + }; + } + + if ( + asset.type === "folder" && + asset.displayName.startsWith("BJ_") + ) { + return { + assetId: asset.id, + detectedFormatCandidates: ["delivery_package"], + structureSummary: { assetType: asset.type, displayName: asset.displayName }, + warnings: [], + recommendedNextNodes: ["validate_structure", "validate_metadata"], + createdAt: new Date().toISOString(), + }; + } + + return { + assetId: asset.id, + detectedFormatCandidates: [asset.type], + structureSummary: { assetType: asset.type, displayName: asset.displayName }, + warnings: [], + recommendedNextNodes: ["inspect_asset"], + createdAt: new Date().toISOString(), + }; + } +} diff --git a/apps/api/src/modules/auth/auth.controller.ts b/apps/api/src/modules/auth/auth.controller.ts new file mode 100644 index 0000000..cba05c3 --- /dev/null +++ b/apps/api/src/modules/auth/auth.controller.ts @@ -0,0 +1,11 @@ +export type LocalAuthContext = { + userId: string; +}; + +export function createLocalAuthContext(userId: string): LocalAuthContext { + if (!userId) { + throw new Error("userId is required"); + } + + return { userId }; +} diff --git a/apps/api/src/modules/auth/auth.module.ts b/apps/api/src/modules/auth/auth.module.ts new file mode 100644 index 0000000..5b19bad --- /dev/null +++ b/apps/api/src/modules/auth/auth.module.ts @@ -0,0 +1,9 @@ +import { createLocalAuthContext } from "./auth.controller.ts"; + +export function createAuthModule() { + return { + controller: { + createLocalAuthContext, + }, + }; +} diff --git a/apps/api/src/modules/projects/projects.controller.ts b/apps/api/src/modules/projects/projects.controller.ts new file mode 100644 index 0000000..770e52a --- /dev/null +++ b/apps/api/src/modules/projects/projects.controller.ts @@ -0,0 +1,21 @@ +import type { LocalAuthContext } from "../auth/auth.controller.ts"; +import { + ProjectsService, + type CreateProjectInput, +} from "./projects.service.ts"; + +export class ProjectsController { + private readonly service: ProjectsService; + + constructor(service: ProjectsService) { + this.service = service; + } + + createProject(auth: LocalAuthContext, input: CreateProjectInput) { + return this.service.createProject(auth, input); + } + + listProjects(workspaceId: string) { + return this.service.listProjects(workspaceId); + } +} diff --git a/apps/api/src/modules/projects/projects.module.ts b/apps/api/src/modules/projects/projects.module.ts new file mode 100644 index 0000000..daeb90f --- /dev/null +++ b/apps/api/src/modules/projects/projects.module.ts @@ -0,0 +1,13 @@ +import type { WorkspacesService } from "../workspaces/workspaces.service.ts"; +import { ProjectsController } from "./projects.controller.ts"; +import { ProjectsService } from "./projects.service.ts"; + +export function createProjectsModule(workspacesService: WorkspacesService) { + const service = new ProjectsService(workspacesService); + const controller = new ProjectsController(service); + + return { + service, + controller, + }; +} diff --git a/apps/api/src/modules/projects/projects.service.ts b/apps/api/src/modules/projects/projects.service.ts new file mode 100644 index 0000000..4ec6359 --- /dev/null +++ b/apps/api/src/modules/projects/projects.service.ts @@ -0,0 +1,57 @@ +import type { LocalAuthContext } from "../auth/auth.controller.ts"; +import type { WorkspacesService } from "../workspaces/workspaces.service.ts"; + +export type CreateProjectInput = { + workspaceId: string; + name: string; + slug: string; + description: string; +}; + +export type ProjectRecord = CreateProjectInput & { + id: string; + status: "active"; + createdBy: string; +}; + +export class ProjectsService { + private readonly projects: ProjectRecord[] = []; + private readonly workspacesService: WorkspacesService; + + constructor(workspacesService: WorkspacesService) { + this.workspacesService = workspacesService; + } + + createProject( + auth: LocalAuthContext, + input: CreateProjectInput, + ): ProjectRecord { + if (!input.workspaceId) { + throw new Error("workspaceId is required"); + } + if (!input.name) { + throw new Error("name is required"); + } + if (!input.slug) { + throw new Error("slug is required"); + } + + this.workspacesService.getRequiredWorkspace(input.workspaceId); + + const project: ProjectRecord = { + id: `project-${input.slug}`, + workspaceId: input.workspaceId, + name: input.name, + slug: input.slug, + description: input.description, + status: "active", + createdBy: auth.userId, + }; + this.projects.push(project); + return project; + } + + listProjects(workspaceId: string): ProjectRecord[] { + return this.projects.filter((project) => project.workspaceId === workspaceId); + } +} diff --git a/apps/api/src/modules/runs/runs.controller.ts b/apps/api/src/modules/runs/runs.controller.ts new file mode 100644 index 0000000..7aaa2d0 --- /dev/null +++ b/apps/api/src/modules/runs/runs.controller.ts @@ -0,0 +1,21 @@ +import type { LocalAuthContext } from "../auth/auth.controller.ts"; +import { RunsService } from "./runs.service.ts"; + +export class RunsController { + private readonly service: RunsService; + + constructor(service: RunsService) { + this.service = service; + } + + createWorkflowRun( + auth: LocalAuthContext, + input: { workflowDefinitionId: string; workflowVersionId: string }, + ) { + return this.service.createWorkflowRun(auth, input); + } + + listRunTasks(workflowRunId: string) { + return this.service.listRunTasks(workflowRunId); + } +} diff --git a/apps/api/src/modules/runs/runs.module.ts b/apps/api/src/modules/runs/runs.module.ts new file mode 100644 index 0000000..5876624 --- /dev/null +++ b/apps/api/src/modules/runs/runs.module.ts @@ -0,0 +1,13 @@ +import type { WorkflowsService } from "../workflows/workflows.service.ts"; +import { RunsController } from "./runs.controller.ts"; +import { RunsService } from "./runs.service.ts"; + +export function createRunsModule(workflowsService: WorkflowsService) { + const service = new RunsService(workflowsService); + const controller = new RunsController(service); + + return { + service, + controller, + }; +} diff --git a/apps/api/src/modules/runs/runs.service.ts b/apps/api/src/modules/runs/runs.service.ts new file mode 100644 index 0000000..386e00c --- /dev/null +++ b/apps/api/src/modules/runs/runs.service.ts @@ -0,0 +1,68 @@ +import type { LocalAuthContext } from "../auth/auth.controller.ts"; +import type { WorkflowsService } from "../workflows/workflows.service.ts"; + +export type WorkflowRunRecord = { + id: string; + workflowDefinitionId: string; + workflowVersionId: string; + status: "queued"; + triggeredBy: string; +}; + +export type RunTaskRecord = { + id: string; + workflowRunId: string; + workflowVersionId: string; + nodeId: string; + nodeType: string; + status: "queued" | "pending"; + attempt: number; +}; + +export class RunsService { + private readonly workflowsService: WorkflowsService; + private readonly runs: WorkflowRunRecord[] = []; + private readonly tasks: RunTaskRecord[] = []; + + constructor(workflowsService: WorkflowsService) { + this.workflowsService = workflowsService; + } + + createWorkflowRun( + auth: LocalAuthContext, + input: { workflowDefinitionId: string; workflowVersionId: string }, + ): WorkflowRunRecord { + this.workflowsService.getRequiredWorkflowDefinition(input.workflowDefinitionId); + const version = this.workflowsService.getRequiredWorkflowVersion( + input.workflowVersionId, + ); + + const run: WorkflowRunRecord = { + id: `run-${this.runs.length + 1}`, + workflowDefinitionId: input.workflowDefinitionId, + workflowVersionId: input.workflowVersionId, + status: "queued", + triggeredBy: auth.userId, + }; + this.runs.push(run); + + const targetNodes = new Set(version.logicGraph.edges.map((edge) => edge.to)); + version.logicGraph.nodes.forEach((node, index) => { + this.tasks.push({ + id: `task-${run.id}-${node.id}`, + workflowRunId: run.id, + workflowVersionId: version.id, + nodeId: node.id, + nodeType: node.type, + status: targetNodes.has(node.id) ? "pending" : "queued", + attempt: 1, + }); + }); + + return run; + } + + listRunTasks(workflowRunId: string): RunTaskRecord[] { + return this.tasks.filter((task) => task.workflowRunId === workflowRunId); + } +} diff --git a/apps/api/src/modules/storage/storage.module.ts b/apps/api/src/modules/storage/storage.module.ts new file mode 100644 index 0000000..93c36f8 --- /dev/null +++ b/apps/api/src/modules/storage/storage.module.ts @@ -0,0 +1,9 @@ +import { StorageService } from "./storage.service.ts"; + +export function createStorageModule() { + const service = new StorageService(); + + return { + service, + }; +} diff --git a/apps/api/src/modules/storage/storage.service.ts b/apps/api/src/modules/storage/storage.service.ts new file mode 100644 index 0000000..39a9d43 --- /dev/null +++ b/apps/api/src/modules/storage/storage.service.ts @@ -0,0 +1,15 @@ +export type StorageRef = { + provider: "minio"; + bucket: string; + key: string; +}; + +export class StorageService { + createUploadStorageRef(displayName: string): StorageRef { + return { + provider: "minio", + bucket: "raw-assets", + key: `uploads/${displayName}`, + }; + } +} diff --git a/apps/api/src/modules/workflows/workflows.controller.ts b/apps/api/src/modules/workflows/workflows.controller.ts new file mode 100644 index 0000000..578dc6f --- /dev/null +++ b/apps/api/src/modules/workflows/workflows.controller.ts @@ -0,0 +1,29 @@ +import type { LocalAuthContext } from "../auth/auth.controller.ts"; +import { + WorkflowsService, + type WorkflowVersionInput, +} from "./workflows.service.ts"; + +export class WorkflowsController { + private readonly service: WorkflowsService; + + constructor(service: WorkflowsService) { + this.service = service; + } + + createWorkflowDefinition( + auth: LocalAuthContext, + input: { + workspaceId: string; + projectId: string; + name: string; + slug: string; + }, + ) { + return this.service.createWorkflowDefinition(auth, input); + } + + saveWorkflowVersion(auth: LocalAuthContext, input: WorkflowVersionInput) { + return this.service.saveWorkflowVersion(auth, input); + } +} diff --git a/apps/api/src/modules/workflows/workflows.module.ts b/apps/api/src/modules/workflows/workflows.module.ts new file mode 100644 index 0000000..b158306 --- /dev/null +++ b/apps/api/src/modules/workflows/workflows.module.ts @@ -0,0 +1,12 @@ +import { WorkflowsController } from "./workflows.controller.ts"; +import { WorkflowsService } from "./workflows.service.ts"; + +export function createWorkflowsModule() { + const service = new WorkflowsService(); + const controller = new WorkflowsController(service); + + return { + service, + controller, + }; +} diff --git a/apps/api/src/modules/workflows/workflows.service.ts b/apps/api/src/modules/workflows/workflows.service.ts new file mode 100644 index 0000000..54bbb19 --- /dev/null +++ b/apps/api/src/modules/workflows/workflows.service.ts @@ -0,0 +1,105 @@ +import type { LocalAuthContext } from "../auth/auth.controller.ts"; + +export type WorkflowNode = { + id: string; + type: string; +}; + +export type WorkflowEdge = { + from: string; + to: string; +}; + +export type WorkflowVersionInput = { + workflowDefinitionId: string; + visualGraph: Record; + logicGraph: { + nodes: WorkflowNode[]; + edges: WorkflowEdge[]; + }; + runtimeGraph: Record; + pluginRefs: string[]; +}; + +export type WorkflowDefinitionRecord = { + id: string; + workspaceId: string; + projectId: string; + name: string; + slug: string; + status: "draft"; + latestVersionNumber: number; + publishedVersionNumber: number | null; + createdBy: string; +}; + +export type WorkflowVersionRecord = WorkflowVersionInput & { + id: string; + versionNumber: number; + createdBy: string; +}; + +export class WorkflowsService { + private readonly definitions: WorkflowDefinitionRecord[] = []; + private readonly versions: WorkflowVersionRecord[] = []; + + createWorkflowDefinition( + auth: LocalAuthContext, + input: Omit< + WorkflowDefinitionRecord, + "id" | "status" | "latestVersionNumber" | "publishedVersionNumber" | "createdBy" + >, + ): WorkflowDefinitionRecord { + const definition: WorkflowDefinitionRecord = { + id: `workflow-${input.slug}`, + workspaceId: input.workspaceId, + projectId: input.projectId, + name: input.name, + slug: input.slug, + status: "draft", + latestVersionNumber: 0, + publishedVersionNumber: null, + createdBy: auth.userId, + }; + this.definitions.push(definition); + return definition; + } + + getRequiredWorkflowDefinition( + workflowDefinitionId: string, + ): WorkflowDefinitionRecord { + const definition = this.definitions.find( + (item) => item.id === workflowDefinitionId, + ); + if (!definition) { + throw new Error(`workflow definition not found: ${workflowDefinitionId}`); + } + return definition; + } + + saveWorkflowVersion( + auth: LocalAuthContext, + input: WorkflowVersionInput, + ): WorkflowVersionRecord { + const definition = this.getRequiredWorkflowDefinition(input.workflowDefinitionId); + const versionNumber = definition.latestVersionNumber + 1; + definition.latestVersionNumber = versionNumber; + + const version: WorkflowVersionRecord = { + ...input, + id: `${definition.id}-v${versionNumber}`, + versionNumber, + createdBy: auth.userId, + }; + this.versions.push(version); + return version; + } + + getRequiredWorkflowVersion(workflowVersionId: string): WorkflowVersionRecord { + const version = this.versions.find((item) => item.id === workflowVersionId); + if (!version) { + throw new Error(`workflow version not found: ${workflowVersionId}`); + } + return version; + } +} diff --git a/apps/api/src/modules/workspaces/workspaces.controller.ts b/apps/api/src/modules/workspaces/workspaces.controller.ts new file mode 100644 index 0000000..5f9ccc7 --- /dev/null +++ b/apps/api/src/modules/workspaces/workspaces.controller.ts @@ -0,0 +1,18 @@ +import type { LocalAuthContext } from "../auth/auth.controller.ts"; +import { WorkspacesService } from "./workspaces.service.ts"; + +export class WorkspacesController { + private readonly service: WorkspacesService; + + constructor(service: WorkspacesService) { + this.service = service; + } + + bootstrapPersonalWorkspace(auth: LocalAuthContext) { + return this.service.bootstrapPersonalWorkspace(auth); + } + + listMyWorkspaces(auth: LocalAuthContext) { + return this.service.listByOwner(auth.userId); + } +} diff --git a/apps/api/src/modules/workspaces/workspaces.module.ts b/apps/api/src/modules/workspaces/workspaces.module.ts new file mode 100644 index 0000000..d260557 --- /dev/null +++ b/apps/api/src/modules/workspaces/workspaces.module.ts @@ -0,0 +1,12 @@ +import { WorkspacesController } from "./workspaces.controller.ts"; +import { WorkspacesService } from "./workspaces.service.ts"; + +export function createWorkspaceModule() { + const service = new WorkspacesService(); + const controller = new WorkspacesController(service); + + return { + service, + controller, + }; +} diff --git a/apps/api/src/modules/workspaces/workspaces.service.ts b/apps/api/src/modules/workspaces/workspaces.service.ts new file mode 100644 index 0000000..e1af374 --- /dev/null +++ b/apps/api/src/modules/workspaces/workspaces.service.ts @@ -0,0 +1,53 @@ +import type { LocalAuthContext } from "../auth/auth.controller.ts"; +import type { WorkspaceType } from "../../../../../../packages/contracts/src/domain.ts"; + +export type WorkspaceRecord = { + id: string; + type: WorkspaceType; + name: string; + slug: string; + ownerId: string; + status: "active"; +}; + +function personalWorkspaceSlug(userId: string): string { + return `${userId}-personal`; +} + +export class WorkspacesService { + private readonly workspaces: WorkspaceRecord[] = []; + + bootstrapPersonalWorkspace(auth: LocalAuthContext): WorkspaceRecord { + const existing = this.workspaces.find( + (workspace) => + workspace.ownerId === auth.userId && workspace.type === "personal", + ); + + if (existing) { + return existing; + } + + const workspace: WorkspaceRecord = { + id: `workspace-${personalWorkspaceSlug(auth.userId)}`, + type: "personal", + name: `${auth.userId} Personal`, + slug: personalWorkspaceSlug(auth.userId), + ownerId: auth.userId, + status: "active", + }; + this.workspaces.push(workspace); + return workspace; + } + + listByOwner(ownerId: string): WorkspaceRecord[] { + return this.workspaces.filter((workspace) => workspace.ownerId === ownerId); + } + + getRequiredWorkspace(workspaceId: string): WorkspaceRecord { + const workspace = this.workspaces.find((item) => item.id === workspaceId); + if (!workspace) { + throw new Error(`workspace not found: ${workspaceId}`); + } + return workspace; + } +} diff --git a/apps/api/test/assets.e2e-spec.ts b/apps/api/test/assets.e2e-spec.ts new file mode 100644 index 0000000..39b25ca --- /dev/null +++ b/apps/api/test/assets.e2e-spec.ts @@ -0,0 +1,96 @@ +import test from "node:test"; +import assert from "node:assert/strict"; + +import { createLocalAuthContext } from "../src/modules/auth/auth.controller.ts"; +import { createWorkspaceModule } from "../src/modules/workspaces/workspaces.module.ts"; +import { createProjectsModule } from "../src/modules/projects/projects.module.ts"; +import { createStorageModule } from "../src/modules/storage/storage.module.ts"; +import { createAssetsModule } from "../src/modules/assets/assets.module.ts"; + +test("register an uploaded asset record", () => { + const auth = createLocalAuthContext("user-4"); + const workspaces = createWorkspaceModule(); + const projects = createProjectsModule(workspaces.service); + const storage = createStorageModule(); + const assets = createAssetsModule(storage.service); + + const workspace = workspaces.controller.bootstrapPersonalWorkspace(auth); + const project = projects.controller.createProject(auth, { + workspaceId: workspace.id, + name: "Assets", + slug: "assets", + description: "Asset intake project", + }); + + const asset = assets.controller.createAsset(auth, { + workspaceId: workspace.id, + projectId: project.id, + type: "archive", + sourceType: "upload", + displayName: "demo.zip", + }); + + assert.equal(asset.type, "archive"); + assert.equal(asset.projectId, project.id); + assert.equal(asset.status, "pending"); +}); + +test("create a probe report for a raw asset", () => { + const auth = createLocalAuthContext("user-5"); + const workspaces = createWorkspaceModule(); + const projects = createProjectsModule(workspaces.service); + const storage = createStorageModule(); + const assets = createAssetsModule(storage.service); + + const workspace = workspaces.controller.bootstrapPersonalWorkspace(auth); + const project = projects.controller.createProject(auth, { + workspaceId: workspace.id, + name: "Probe", + slug: "probe", + description: "Probe test project", + }); + + const asset = assets.controller.createAsset(auth, { + workspaceId: workspace.id, + projectId: project.id, + type: "folder", + sourceType: "upload", + displayName: "BJ_001_0001_OsmoNano_2026-03-19_14-51-43", + }); + + const report = assets.controller.probeAsset(asset.id); + + assert.equal(report.assetId, asset.id); + assert.equal(report.detectedFormatCandidates.includes("delivery_package"), true); +}); + +test("return recommended next actions from probe metadata", () => { + const auth = createLocalAuthContext("user-6"); + const workspaces = createWorkspaceModule(); + const projects = createProjectsModule(workspaces.service); + const storage = createStorageModule(); + const assets = createAssetsModule(storage.service); + + const workspace = workspaces.controller.bootstrapPersonalWorkspace(auth); + const project = projects.controller.createProject(auth, { + workspaceId: workspace.id, + name: "Recommendations", + slug: "recommendations", + description: "Recommendation test project", + }); + + const asset = assets.controller.createAsset(auth, { + workspaceId: workspace.id, + projectId: project.id, + type: "archive", + sourceType: "upload", + displayName: "20260324-data-demo.zip", + }); + + const report = assets.controller.probeAsset(asset.id); + + assert.deepEqual(report.recommendedNextNodes, [ + "extract_archive", + "validate_structure", + ]); +}); diff --git a/apps/api/test/projects.e2e-spec.ts b/apps/api/test/projects.e2e-spec.ts new file mode 100644 index 0000000..10d1386 --- /dev/null +++ b/apps/api/test/projects.e2e-spec.ts @@ -0,0 +1,52 @@ +import test from "node:test"; +import assert from "node:assert/strict"; + +import { createLocalAuthContext } from "../src/modules/auth/auth.controller.ts"; +import { createWorkspaceModule } from "../src/modules/workspaces/workspaces.module.ts"; +import { createProjectsModule } from "../src/modules/projects/projects.module.ts"; + +test("bootstrap personal workspace flow creates a personal workspace for the user", () => { + const auth = createLocalAuthContext("user-1"); + const workspaces = createWorkspaceModule(); + + const workspace = workspaces.controller.bootstrapPersonalWorkspace(auth); + + assert.equal(workspace.type, "personal"); + assert.equal(workspace.ownerId, "user-1"); + assert.equal(workspace.slug, "user-1-personal"); +}); + +test("create project under a workspace", () => { + const auth = createLocalAuthContext("user-2"); + const workspaces = createWorkspaceModule(); + const projects = createProjectsModule(workspaces.service); + + const workspace = workspaces.controller.bootstrapPersonalWorkspace(auth); + const project = projects.controller.createProject(auth, { + workspaceId: workspace.id, + name: "Dataset Ops", + slug: "dataset-ops", + description: "Project for embodied data workflows", + }); + + assert.equal(project.workspaceId, workspace.id); + assert.equal(project.name, "Dataset Ops"); + assert.equal(project.createdBy, "user-2"); +}); + +test("reject project creation without a workspace id", () => { + const auth = createLocalAuthContext("user-3"); + const workspaces = createWorkspaceModule(); + const projects = createProjectsModule(workspaces.service); + + assert.throws( + () => + projects.controller.createProject(auth, { + workspaceId: "", + name: "Invalid", + slug: "invalid", + description: "Missing workspace id", + }), + /workspaceId is required/, + ); +}); diff --git a/apps/api/test/workflow-runs.e2e-spec.ts b/apps/api/test/workflow-runs.e2e-spec.ts new file mode 100644 index 0000000..0c527ff --- /dev/null +++ b/apps/api/test/workflow-runs.e2e-spec.ts @@ -0,0 +1,123 @@ +import test from "node:test"; +import assert from "node:assert/strict"; + +import { createLocalAuthContext } from "../src/modules/auth/auth.controller.ts"; +import { createWorkspaceModule } from "../src/modules/workspaces/workspaces.module.ts"; +import { createProjectsModule } from "../src/modules/projects/projects.module.ts"; +import { createWorkflowsModule } from "../src/modules/workflows/workflows.module.ts"; +import { createRunsModule } from "../src/modules/runs/runs.module.ts"; + +test("create workflow definition", () => { + const auth = createLocalAuthContext("user-7"); + const workspaces = createWorkspaceModule(); + const projects = createProjectsModule(workspaces.service); + const workflows = createWorkflowsModule(); + + const workspace = workspaces.controller.bootstrapPersonalWorkspace(auth); + const project = projects.controller.createProject(auth, { + workspaceId: workspace.id, + name: "Workflow", + slug: "workflow", + description: "Workflow project", + }); + + const definition = workflows.controller.createWorkflowDefinition(auth, { + workspaceId: workspace.id, + projectId: project.id, + name: "Delivery Flow", + slug: "delivery-flow", + }); + + assert.equal(definition.name, "Delivery Flow"); + assert.equal(definition.status, "draft"); +}); + +test("save workflow version", () => { + const auth = createLocalAuthContext("user-8"); + const workspaces = createWorkspaceModule(); + const projects = createProjectsModule(workspaces.service); + const workflows = createWorkflowsModule(); + + const workspace = workspaces.controller.bootstrapPersonalWorkspace(auth); + const project = projects.controller.createProject(auth, { + workspaceId: workspace.id, + name: "Versions", + slug: "versions", + description: "Workflow version project", + }); + const definition = workflows.controller.createWorkflowDefinition(auth, { + workspaceId: workspace.id, + projectId: project.id, + name: "Versioned Flow", + slug: "versioned-flow", + }); + + const version = workflows.controller.saveWorkflowVersion(auth, { + workflowDefinitionId: definition.id, + visualGraph: { nodes: [], edges: [] }, + logicGraph: { + nodes: [ + { id: "source", type: "Source" }, + { id: "validate", type: "Inspect" }, + ], + edges: [{ from: "source", to: "validate" }], + }, + runtimeGraph: { nodes: [] }, + pluginRefs: [], + }); + + assert.equal(version.versionNumber, 1); + assert.equal(version.workflowDefinitionId, definition.id); +}); + +test("create workflow run from saved version and derive initial run tasks", () => { + const auth = createLocalAuthContext("user-9"); + const workspaces = createWorkspaceModule(); + const projects = createProjectsModule(workspaces.service); + const workflows = createWorkflowsModule(); + const runs = createRunsModule(workflows.service); + + const workspace = workspaces.controller.bootstrapPersonalWorkspace(auth); + const project = projects.controller.createProject(auth, { + workspaceId: workspace.id, + name: "Runs", + slug: "runs", + description: "Run project", + }); + const definition = workflows.controller.createWorkflowDefinition(auth, { + workspaceId: workspace.id, + projectId: project.id, + name: "Run Flow", + slug: "run-flow", + }); + const version = workflows.controller.saveWorkflowVersion(auth, { + workflowDefinitionId: definition.id, + visualGraph: { nodes: [], edges: [] }, + logicGraph: { + nodes: [ + { id: "source", type: "Source" }, + { id: "rename", type: "Transform" }, + { id: "export", type: "Export" }, + ], + edges: [ + { from: "source", to: "rename" }, + { from: "rename", to: "export" }, + ], + }, + runtimeGraph: { nodes: [] }, + pluginRefs: [], + }); + + const run = runs.controller.createWorkflowRun(auth, { + workflowDefinitionId: definition.id, + workflowVersionId: version.id, + }); + + const tasks = runs.controller.listRunTasks(run.id); + + assert.equal(run.status, "queued"); + assert.equal(tasks.length, 3); + assert.equal(tasks[0].nodeId, "source"); + assert.equal(tasks[0].status, "queued"); + assert.equal(tasks[1].status, "pending"); +}); diff --git a/apps/worker/package.json b/apps/worker/package.json index 4bec2d2..b0eab84 100644 --- a/apps/worker/package.json +++ b/apps/worker/package.json @@ -1,8 +1,10 @@ { - "name": "@emboflow/worker", + "name": "worker", "private": true, "version": "0.1.0", + "type": "module", "scripts": { - "dev": "echo 'worker app scaffold pending'" + "dev": "echo 'worker app scaffold pending'", + "test": "node --test --experimental-strip-types" } } diff --git a/apps/worker/src/contracts/execution-context.ts b/apps/worker/src/contracts/execution-context.ts new file mode 100644 index 0000000..afb5fa9 --- /dev/null +++ b/apps/worker/src/contracts/execution-context.ts @@ -0,0 +1,13 @@ +export type ExecutorType = "python" | "docker" | "http"; + +export type TaskRecord = { + id: string; + nodeId: string; + executorType: ExecutorType; + status: "pending" | "running" | "success" | "failed"; +}; + +export type ExecutionContext = { + taskId: string; + nodeId: string; +}; diff --git a/apps/worker/src/executors/docker-executor.ts b/apps/worker/src/executors/docker-executor.ts new file mode 100644 index 0000000..f8b4c74 --- /dev/null +++ b/apps/worker/src/executors/docker-executor.ts @@ -0,0 +1,10 @@ +import type { ExecutionContext, TaskRecord } from "../contracts/execution-context.ts"; + +export class DockerExecutor { + executionCount = 0; + + async execute(task: TaskRecord, _context: ExecutionContext) { + this.executionCount += 1; + return { taskId: task.id, executor: "docker" as const }; + } +} diff --git a/apps/worker/src/executors/http-executor.ts b/apps/worker/src/executors/http-executor.ts new file mode 100644 index 0000000..608520a --- /dev/null +++ b/apps/worker/src/executors/http-executor.ts @@ -0,0 +1,10 @@ +import type { ExecutionContext, TaskRecord } from "../contracts/execution-context.ts"; + +export class HttpExecutor { + executionCount = 0; + + async execute(task: TaskRecord, _context: ExecutionContext) { + this.executionCount += 1; + return { taskId: task.id, executor: "http" as const }; + } +} diff --git a/apps/worker/src/executors/python-executor.ts b/apps/worker/src/executors/python-executor.ts new file mode 100644 index 0000000..d19cf89 --- /dev/null +++ b/apps/worker/src/executors/python-executor.ts @@ -0,0 +1,10 @@ +import type { ExecutionContext, TaskRecord } from "../contracts/execution-context.ts"; + +export class PythonExecutor { + executionCount = 0; + + async execute(task: TaskRecord, _context: ExecutionContext) { + this.executionCount += 1; + return { taskId: task.id, executor: "python" as const }; + } +} diff --git a/apps/worker/src/main.ts b/apps/worker/src/main.ts new file mode 100644 index 0000000..e67a376 --- /dev/null +++ b/apps/worker/src/main.ts @@ -0,0 +1,5 @@ +export function bootstrapWorker() { + return { + status: "ready" as const, + }; +} diff --git a/apps/worker/src/runner/task-runner.ts b/apps/worker/src/runner/task-runner.ts new file mode 100644 index 0000000..d4c840d --- /dev/null +++ b/apps/worker/src/runner/task-runner.ts @@ -0,0 +1,45 @@ +import type { + ExecutionContext, + ExecutorType, + TaskRecord, +} from "../contracts/execution-context.ts"; +import { LocalScheduler } from "../scheduler/local-scheduler.ts"; +import { DockerExecutor } from "../executors/docker-executor.ts"; +import { HttpExecutor } from "../executors/http-executor.ts"; +import { PythonExecutor } from "../executors/python-executor.ts"; + +type ExecutorMap = { + python: PythonExecutor; + docker: DockerExecutor; + http: HttpExecutor; +}; + +export class TaskRunner { + private readonly scheduler: LocalScheduler; + private readonly executors: ExecutorMap; + + constructor(config: { scheduler: LocalScheduler; executors: ExecutorMap }) { + this.scheduler = config.scheduler; + this.executors = config.executors; + } + + async runNextTask(): Promise { + const task = this.scheduler.nextPendingTask(); + if (!task) { + return undefined; + } + + this.scheduler.transition(task.id, "running"); + const context: ExecutionContext = { + taskId: task.id, + nodeId: task.nodeId, + }; + await this.executors[task.executorType as ExecutorType].execute(task, context); + this.scheduler.transition(task.id, "success"); + + return { + ...task, + status: "success", + }; + } +} diff --git a/apps/worker/src/scheduler/local-scheduler.ts b/apps/worker/src/scheduler/local-scheduler.ts new file mode 100644 index 0000000..ae1b2f3 --- /dev/null +++ b/apps/worker/src/scheduler/local-scheduler.ts @@ -0,0 +1,32 @@ +import type { TaskRecord } from "../contracts/execution-context.ts"; + +export class LocalScheduler { + private readonly tasks: TaskRecord[]; + private readonly statusTimeline = new Map(); + + constructor(tasks: TaskRecord[]) { + this.tasks = tasks.map((task) => ({ ...task })); + tasks.forEach((task) => { + this.statusTimeline.set(task.id, [task.status]); + }); + } + + nextPendingTask(): TaskRecord | undefined { + return this.tasks.find((task) => task.status === "pending"); + } + + transition(taskId: string, status: TaskRecord["status"]) { + const task = this.tasks.find((item) => item.id === taskId); + if (!task) { + throw new Error(`task not found: ${taskId}`); + } + task.status = status; + const timeline = this.statusTimeline.get(taskId) ?? []; + timeline.push(status); + this.statusTimeline.set(taskId, timeline); + } + + getStatusTimeline(taskId: string): string[] { + return this.statusTimeline.get(taskId) ?? []; + } +} diff --git a/apps/worker/test/task-runner.spec.ts b/apps/worker/test/task-runner.spec.ts new file mode 100644 index 0000000..2b9e55d --- /dev/null +++ b/apps/worker/test/task-runner.spec.ts @@ -0,0 +1,88 @@ +import test from "node:test"; +import assert from "node:assert/strict"; + +import { TaskRunner } from "../src/runner/task-runner.ts"; +import { LocalScheduler } from "../src/scheduler/local-scheduler.ts"; +import { PythonExecutor } from "../src/executors/python-executor.ts"; +import { DockerExecutor } from "../src/executors/docker-executor.ts"; +import { HttpExecutor } from "../src/executors/http-executor.ts"; + +test("worker loads pending tasks", async () => { + const scheduler = new LocalScheduler([ + { + id: "task-1", + nodeId: "source", + executorType: "python", + status: "pending", + }, + ]); + + const runner = new TaskRunner({ + scheduler, + executors: { + python: new PythonExecutor(), + docker: new DockerExecutor(), + http: new HttpExecutor(), + }, + }); + + const task = await runner.runNextTask(); + + assert.equal(task?.id, "task-1"); + assert.equal(task?.status, "success"); +}); + +test("worker marks task running then success", async () => { + const scheduler = new LocalScheduler([ + { + id: "task-2", + nodeId: "transform", + executorType: "docker", + status: "pending", + }, + ]); + + const runner = new TaskRunner({ + scheduler, + executors: { + python: new PythonExecutor(), + docker: new DockerExecutor(), + http: new HttpExecutor(), + }, + }); + + await runner.runNextTask(); + + const timeline = scheduler.getStatusTimeline("task-2"); + assert.deepEqual(timeline, ["pending", "running", "success"]); +}); + +test("worker chooses executor based on runtime config", async () => { + const scheduler = new LocalScheduler([ + { + id: "task-3", + nodeId: "export", + executorType: "http", + status: "pending", + }, + ]); + + const pythonExecutor = new PythonExecutor(); + const dockerExecutor = new DockerExecutor(); + const httpExecutor = new HttpExecutor(); + + const runner = new TaskRunner({ + scheduler, + executors: { + python: pythonExecutor, + docker: dockerExecutor, + http: httpExecutor, + }, + }); + + await runner.runNextTask(); + + assert.equal(pythonExecutor.executionCount, 0); + assert.equal(dockerExecutor.executionCount, 0); + assert.equal(httpExecutor.executionCount, 1); +}); diff --git a/docs/plans/2026-03-26-emboflow-v1-foundation-and-mvp.md b/docs/plans/2026-03-26-emboflow-v1-foundation-and-mvp.md index c955b7f..d8f768c 100644 --- a/docs/plans/2026-03-26-emboflow-v1-foundation-and-mvp.md +++ b/docs/plans/2026-03-26-emboflow-v1-foundation-and-mvp.md @@ -10,6 +10,14 @@ --- +## Progress Notes + +- `2026-03-26`: Tasks 1 and 2 are complete and committed. +- `2026-03-26`: Tasks 3 through 6 are implemented against in-memory V1 control-plane services so the API and worker contracts can stabilize before persistence and framework wiring are deepened. +- `2026-03-26`: Package-level verification continues to use the Node 22 built-in test runner with direct file targets such as `pnpm --filter api test test/projects.e2e-spec.ts` and `pnpm --filter worker test test/task-runner.spec.ts`. + +--- + ### Task 1: Bootstrap The Monorepo And Runtime Skeleton **Files:** diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml new file mode 100644 index 0000000..25c73ae --- /dev/null +++ b/pnpm-lock.yaml @@ -0,0 +1,17 @@ +lockfileVersion: '9.0' + +settings: + autoInstallPeers: true + excludeLinksFromLockfile: false + +importers: + + .: {} + + apps/api: {} + + apps/web: {} + + apps/worker: {} + + packages/contracts: {}