diff --git a/README.md b/README.md index 98e0791..bac8b3a 100644 --- a/README.md +++ b/README.md @@ -66,7 +66,7 @@ The local validation path currently used for embodied data testing is: You can register that directory from the Assets page or via `POST /api/assets/register`. The workflow editor currently requires selecting at least one registered asset before a run can be created. -The Runs workspace now shows project-scoped run history, run-level aggregated summaries, and run detail views with persisted task summaries, stdout/stderr sections, result previews, and artifact links into Explore. +The Runs workspace now shows project-scoped run history, run-level aggregated summaries, cancel/retry controls, and run detail views with persisted task summaries, stdout/stderr sections, result previews, and artifact links into Explore. ## Repository Structure diff --git a/apps/api/src/runtime/mongo-store.ts b/apps/api/src/runtime/mongo-store.ts index 795fb4d..6006d7d 100644 --- a/apps/api/src/runtime/mongo-store.ts +++ b/apps/api/src/runtime/mongo-store.ts @@ -99,7 +99,7 @@ type WorkflowRunDocument = Timestamped & { workflowVersionId: string; workspaceId: string; projectId: string; - status: "queued" | "running" | "success" | "failed"; + status: "queued" | "running" | "success" | "failed" | "cancelled"; triggeredBy: string; assetIds: string[]; startedAt?: string; @@ -115,7 +115,7 @@ type RunTaskDocument = Timestamped & { nodeId: string; nodeType: string; executorType: ExecutorType; - status: "queued" | "pending" | "running" | "success" | "failed"; + status: "queued" | "pending" | "running" | "success" | "failed" | "cancelled"; attempt: number; assetIds: string[]; upstreamNodeIds: string[]; @@ -163,6 +163,7 @@ function buildTaskStatusCounts(tasks: RunTaskDocument[]): TaskStatusCounts { running: 0, success: 0, failed: 0, + cancelled: 0, }; for (const task of tasks) { @@ -176,7 +177,7 @@ function buildRunExecutionSummary(tasks: RunTaskDocument[]): RunExecutionSummary const taskCounts = buildTaskStatusCounts(tasks); return { totalTaskCount: tasks.length, - completedTaskCount: taskCounts.success + taskCounts.failed, + completedTaskCount: taskCounts.success + taskCounts.failed + taskCounts.cancelled, artifactCount: tasks.reduce((total, task) => total + task.outputArtifactIds.length, 0), stdoutLineCount: tasks.reduce((total, task) => total + (task.stdoutLines?.length ?? 0), 0), stderrLineCount: tasks.reduce((total, task) => total + (task.stderrLines?.length ?? 0), 0), @@ -185,6 +186,23 @@ function buildRunExecutionSummary(tasks: RunTaskDocument[]): RunExecutionSummary }; } +function collectRetryNodeIds(tasks: RunTaskDocument[], rootNodeId: string) { + const pending = [rootNodeId]; + const collected = new Set([rootNodeId]); + + while (pending.length > 0) { + const currentNodeId = pending.shift()!; + for (const task of tasks) { + if (task.upstreamNodeIds.includes(currentNodeId) && !collected.has(task.nodeId)) { + collected.add(task.nodeId); + pending.push(task.nodeId); + } + } + } + + return collected; +} + export class MongoAppStore { constructor(private readonly db: Db) {} @@ -599,6 +617,187 @@ export class MongoAppStore { .toArray(); } + async cancelRun(runId: string) { + const run = await this.db.collection("workflow_runs").findOne({ _id: runId }); + if (!run) { + throw new Error(`run not found: ${runId}`); + } + if (run.status === "success" || run.status === "failed" || run.status === "cancelled") { + throw new Error("run is already in a terminal state"); + } + + const finishedAt = nowIso(); + await this.db.collection("run_tasks").updateMany( + { + workflowRunId: runId, + status: { $in: ["queued", "pending"] }, + }, + { + $set: { + status: "cancelled", + finishedAt, + updatedAt: finishedAt, + }, + $push: { + logLines: { + $each: ["Task cancelled before execution"], + }, + }, + }, + ); + + await this.db.collection("workflow_runs").updateOne( + { _id: runId }, + { + $set: { + status: "cancelled", + finishedAt, + updatedAt: finishedAt, + }, + }, + ); + + const tasks = await this.listRunTasks(runId); + const startedAt = tasks + .map((task) => task.startedAt) + .filter((value): value is string => Boolean(value)) + .sort()[0]; + const summary = buildRunExecutionSummary(tasks); + + await this.db.collection("workflow_runs").updateOne( + { _id: runId }, + { + $set: { + summary, + startedAt, + durationMs: startedAt ? Math.max(Date.parse(finishedAt) - Date.parse(startedAt), 0) : 0, + updatedAt: nowIso(), + }, + }, + ); + + return this.getRun(runId); + } + + async retryRun(runId: string, triggeredBy = "local-user") { + const run = await this.db.collection("workflow_runs").findOne({ _id: runId }); + if (!run) { + throw new Error(`run not found: ${runId}`); + } + if (run.status === "queued" || run.status === "running") { + throw new Error("run must be terminal before retry"); + } + + return this.createRun({ + workflowDefinitionId: run.workflowDefinitionId, + workflowVersionId: run.workflowVersionId, + triggeredBy, + assetIds: run.assetIds, + }); + } + + async retryRunTask(runId: string, taskId: string) { + const run = await this.db.collection("workflow_runs").findOne({ _id: runId }); + if (!run) { + throw new Error(`run not found: ${runId}`); + } + if (run.status === "running") { + throw new Error("cannot retry a task while the run is active"); + } + + const tasks = await this.listRunTasks(runId); + const targetTask = tasks.find((task) => task._id === taskId); + if (!targetTask) { + throw new Error(`task not found: ${taskId}`); + } + if (targetTask.status !== "failed" && targetTask.status !== "cancelled") { + throw new Error("only failed or cancelled tasks can be retried"); + } + + const retryNodeIds = collectRetryNodeIds(tasks, targetTask.nodeId); + const retryTaskIds = tasks + .filter((task) => retryNodeIds.has(task.nodeId)) + .map((task) => task._id); + const retryStartedAt = nowIso(); + + await this.db.collection("run_tasks").updateMany( + { _id: { $in: retryTaskIds } }, + { + $set: { + updatedAt: retryStartedAt, + outputArtifactIds: [], + logLines: [], + stdoutLines: [], + stderrLines: [], + }, + $unset: { + errorMessage: "", + summary: "", + lastResultPreview: "", + startedAt: "", + finishedAt: "", + durationMs: "", + }, + }, + ); + + await this.db.collection("run_tasks").updateOne( + { _id: targetTask._id }, + { + $set: { + status: "queued", + updatedAt: retryStartedAt, + logLines: ["Task scheduled for retry"], + stdoutLines: [], + stderrLines: [], + }, + $inc: { + attempt: 1, + }, + }, + ); + + await this.db.collection("run_tasks").updateMany( + { + workflowRunId: runId, + _id: { $in: retryTaskIds.filter((retryTaskId) => retryTaskId !== targetTask._id) }, + }, + { + $set: { + status: "pending", + updatedAt: retryStartedAt, + }, + }, + ); + + await this.db.collection("workflow_runs").updateOne( + { _id: runId }, + { + $set: { + status: "queued", + updatedAt: retryStartedAt, + }, + $unset: { + finishedAt: "", + durationMs: "", + }, + }, + ); + + const refreshedTasks = await this.listRunTasks(runId); + await this.db.collection("workflow_runs").updateOne( + { _id: runId }, + { + $set: { + summary: buildRunExecutionSummary(refreshedTasks), + updatedAt: nowIso(), + }, + }, + ); + + return this.getRun(runId); + } + async createArtifact(input: { type: "json" | "directory" | "video"; title: string; diff --git a/apps/api/src/runtime/server.ts b/apps/api/src/runtime/server.ts index 2633534..1b2e123 100644 --- a/apps/api/src/runtime/server.ts +++ b/apps/api/src/runtime/server.ts @@ -274,6 +274,32 @@ export async function createApiRuntime(config = resolveApiRuntimeConfig()) { } }); + app.post("/api/runs/:runId/cancel", async (request, response, next) => { + try { + response.json(await store.cancelRun(request.params.runId)); + } catch (error) { + next(error); + } + }); + + app.post("/api/runs/:runId/retry", async (request, response, next) => { + try { + response.json( + await store.retryRun(request.params.runId, request.body.triggeredBy ?? "local-user"), + ); + } catch (error) { + next(error); + } + }); + + app.post("/api/runs/:runId/tasks/:taskId/retry", async (request, response, next) => { + try { + response.json(await store.retryRunTask(request.params.runId, request.params.taskId)); + } catch (error) { + next(error); + } + }); + app.post("/api/artifacts", async (request, response, next) => { try { response.json( diff --git a/apps/api/test/runtime-http.integration.spec.ts b/apps/api/test/runtime-http.integration.spec.ts index 8467623..67f238e 100644 --- a/apps/api/test/runtime-http.integration.spec.ts +++ b/apps/api/test/runtime-http.integration.spec.ts @@ -651,3 +651,259 @@ test("mongo-backed runtime exposes persisted task execution summaries and logs", executor: "python", }); }); + +test("mongo-backed runtime can cancel a run, retry a run snapshot, and retry a failed task", async (t) => { + const mongod = await MongoMemoryServer.create({ + instance: { + ip: "127.0.0.1", + port: 27122, + }, + }); + t.after(async () => { + await mongod.stop(); + }); + + const server = await startRuntimeServer({ + host: "127.0.0.1", + port: 0, + mongoUri: mongod.getUri(), + database: "emboflow-runtime-run-controls", + corsOrigin: "http://127.0.0.1:3000", + }); + t.after(async () => { + await server.close(); + }); + + const bootstrap = await readJson<{ + workspace: { _id: string }; + project: { _id: string }; + }>( + await fetch(`${server.baseUrl}/api/dev/bootstrap`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ userId: "run-controls-user", projectName: "Run Controls Project" }), + }), + ); + + const sourceDir = await mkdtemp(path.join(os.tmpdir(), "emboflow-runtime-controls-")); + await mkdir(path.join(sourceDir, "DJI_001")); + await writeFile(path.join(sourceDir, "meta.json"), "{}"); + await writeFile(path.join(sourceDir, "intrinsics.json"), "{}"); + await writeFile(path.join(sourceDir, "video_meta.json"), "{}"); + + const asset = await readJson<{ _id: string }>( + await fetch(`${server.baseUrl}/api/assets/register`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ + workspaceId: bootstrap.workspace._id, + projectId: bootstrap.project._id, + sourcePath: sourceDir, + }), + }), + ); + await readJson(await fetch(`${server.baseUrl}/api/assets/${asset._id}/probe`, { method: "POST" })); + + const workflow = await readJson<{ _id: string }>( + await fetch(`${server.baseUrl}/api/workflows`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ + workspaceId: bootstrap.workspace._id, + projectId: bootstrap.project._id, + name: "Run Control Flow", + }), + }), + ); + + const version = await readJson<{ _id: string }>( + await fetch(`${server.baseUrl}/api/workflows/${workflow._id}/versions`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ + visualGraph: { viewport: { x: 0, y: 0, zoom: 1 } }, + logicGraph: { + nodes: [ + { id: "source-asset", type: "source" }, + { id: "validate-structure", type: "inspect" }, + { id: "export-delivery-package", type: "export" }, + ], + edges: [ + { from: "source-asset", to: "validate-structure" }, + { from: "validate-structure", to: "export-delivery-package" }, + ], + }, + runtimeGraph: { selectedPreset: "delivery-normalization" }, + pluginRefs: ["builtin:delivery-nodes"], + }), + }), + ); + + const run = await readJson<{ _id: string }>( + await fetch(`${server.baseUrl}/api/runs`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ + workflowDefinitionId: workflow._id, + workflowVersionId: version._id, + assetIds: [asset._id], + }), + }), + ); + + const cancelledRun = await readJson<{ + _id: string; + status: string; + summary?: { taskCounts?: { cancelled?: number } }; + }>( + await fetch(`${server.baseUrl}/api/runs/${run._id}/cancel`, { method: "POST" }), + ); + const cancelledTasks = await readJson>( + await fetch(`${server.baseUrl}/api/runs/${run._id}/tasks`), + ); + + assert.equal(cancelledRun.status, "cancelled"); + assert.equal(cancelledRun.summary?.taskCounts?.cancelled, 3); + assert.deepEqual( + cancelledTasks.map((task) => task.status), + ["cancelled", "cancelled", "cancelled"], + ); + + const retriedRun = await readJson<{ + _id: string; + workflowVersionId: string; + assetIds: string[]; + status: string; + }>( + await fetch(`${server.baseUrl}/api/runs/${run._id}/retry`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ triggeredBy: "run-controls-user" }), + }), + ); + + assert.notEqual(retriedRun._id, run._id); + assert.equal(retriedRun.workflowVersionId, version._id); + assert.deepEqual(retriedRun.assetIds, [asset._id]); + assert.equal(retriedRun.status, "queued"); + + const client = new MongoClient(mongod.getUri()); + await client.connect(); + t.after(async () => { + await client.close(); + }); + const db = client.db("emboflow-runtime-run-controls"); + + const retriedRunTasks = await db + .collection("run_tasks") + .find({ workflowRunId: retriedRun._id }) + .sort({ createdAt: 1 }) + .toArray(); + const sourceTask = retriedRunTasks.find((task) => task.nodeId === "source-asset"); + const failedTask = retriedRunTasks.find((task) => task.nodeId === "validate-structure"); + const downstreamTask = retriedRunTasks.find((task) => task.nodeId === "export-delivery-package"); + await db.collection("run_tasks").updateOne( + { _id: sourceTask?._id }, + { + $set: { + status: "success", + summary: { + outcome: "success", + executorType: "python", + assetCount: 1, + artifactIds: [], + stdoutLineCount: 1, + stderrLineCount: 0, + }, + stdoutLines: ["source completed"], + logLines: ["Task claimed by worker", "stdout: source completed", "Executor completed successfully"], + }, + }, + ); + await db.collection("run_tasks").updateOne( + { _id: failedTask?._id }, + { + $set: { + status: "failed", + errorMessage: "validation failed", + summary: { + outcome: "failed", + executorType: "python", + assetCount: 1, + artifactIds: [], + stdoutLineCount: 0, + stderrLineCount: 1, + errorMessage: "validation failed", + }, + stderrLines: ["validation failed"], + logLines: ["Task claimed by worker", "stderr: validation failed", "Execution failed: validation failed"], + }, + }, + ); + await db.collection("run_tasks").updateOne( + { _id: downstreamTask?._id }, + { + $set: { + status: "cancelled", + logLines: ["Task cancelled after upstream failure"], + }, + }, + ); + await db.collection("workflow_runs").updateOne( + { _id: retriedRun._id }, + { + $set: { + status: "failed", + summary: { + totalTaskCount: 3, + completedTaskCount: 2, + artifactCount: 0, + stdoutLineCount: 0, + stderrLineCount: 1, + failedTaskIds: [failedTask?._id], + taskCounts: { + pending: 0, + queued: 0, + running: 0, + success: 1, + failed: 1, + cancelled: 1, + }, + }, + }, + }, + ); + + const retriedTaskRun = await readJson<{ + _id: string; + status: string; + summary?: { taskCounts?: { queued?: number; pending?: number } }; + }>( + await fetch(`${server.baseUrl}/api/runs/${retriedRun._id}/tasks/${failedTask?._id}/retry`, { + method: "POST", + }), + ); + const refreshedTasks = await readJson< + Array<{ + _id: string; + status: string; + attempt: number; + errorMessage?: string; + logLines?: string[]; + stderrLines?: string[]; + }> + >(await fetch(`${server.baseUrl}/api/runs/${retriedRun._id}/tasks`)); + + const refreshedFailedTask = refreshedTasks.find((task) => task._id === failedTask?._id); + const refreshedDownstreamTask = refreshedTasks.find((task) => task._id === downstreamTask?._id); + + assert.equal(retriedTaskRun.status, "queued"); + assert.equal(retriedTaskRun.summary?.taskCounts?.queued, 1); + assert.equal(retriedTaskRun.summary?.taskCounts?.pending, 1); + assert.equal(refreshedFailedTask?.status, "queued"); + assert.equal(refreshedFailedTask?.attempt, 2); + assert.equal(refreshedFailedTask?.errorMessage, undefined); + assert.deepEqual(refreshedFailedTask?.stderrLines, []); + assert.match(refreshedFailedTask?.logLines?.[0] ?? "", /retry/i); + assert.equal(refreshedDownstreamTask?.status, "pending"); +}); diff --git a/apps/web/src/features/runs/components/run-graph-view.tsx b/apps/web/src/features/runs/components/run-graph-view.tsx index 532dd47..c62bdb5 100644 --- a/apps/web/src/features/runs/components/run-graph-view.tsx +++ b/apps/web/src/features/runs/components/run-graph-view.tsx @@ -10,6 +10,7 @@ export type RunTaskView = { errorMessage?: string; stdoutLines?: string[]; stderrLines?: string[]; + canRetry?: boolean; logLines: string[]; }; diff --git a/apps/web/src/features/runs/components/task-log-panel.tsx b/apps/web/src/features/runs/components/task-log-panel.tsx index 70a0587..d5ca857 100644 --- a/apps/web/src/features/runs/components/task-log-panel.tsx +++ b/apps/web/src/features/runs/components/task-log-panel.tsx @@ -18,6 +18,7 @@ export function renderTaskLogPanel(

Duration: ${typeof task.durationMs === "number" ? `${task.durationMs} ms` : "n/a"}

${task.summaryLabel ? `

Summary: ${task.summaryLabel}

` : ""} ${task.errorMessage ? `

Error: ${task.errorMessage}

` : ""} + ${task.canRetry ? `` : ""}

Artifacts: ${(task.artifactIds ?? []).length}

${ (task.artifactIds ?? []).length > 0 diff --git a/apps/web/src/features/runs/run-detail-page.tsx b/apps/web/src/features/runs/run-detail-page.tsx index e75d441..bf05ea4 100644 --- a/apps/web/src/features/runs/run-detail-page.tsx +++ b/apps/web/src/features/runs/run-detail-page.tsx @@ -15,6 +15,8 @@ export type RunDetailPageInput = { assetIds?: string[]; durationMs?: number; summaryLabel?: string; + canCancelRun?: boolean; + canRetryRun?: boolean; }; tasks: RunTaskView[]; selectedTaskId?: string; @@ -34,6 +36,8 @@ export function renderRunDetailPage(input: RunDetailPageInput): string {

Input assets: ${(input.run.assetIds ?? []).join(", ") || "none"}

Run duration: ${typeof input.run.durationMs === "number" ? `${input.run.durationMs} ms` : "n/a"}

${input.run.summaryLabel ? `

Run summary: ${input.run.summaryLabel}

` : ""} + ${input.run.canCancelRun ? `` : ""} + ${input.run.canRetryRun ? `` : ""} ${renderRunGraphView(input.tasks)} ${renderTaskLogPanel(input.tasks, input.selectedTaskId)} diff --git a/apps/web/src/features/workflows/workflow-editor-page.test.tsx b/apps/web/src/features/workflows/workflow-editor-page.test.tsx index db4bf58..ec8723f 100644 --- a/apps/web/src/features/workflows/workflow-editor-page.test.tsx +++ b/apps/web/src/features/workflows/workflow-editor-page.test.tsx @@ -39,6 +39,8 @@ test("run detail view shows node status badges from run data", () => { assetIds: ["asset-1"], durationMs: 2450, summaryLabel: "2 tasks complete, 1 running, 1 stdout line", + canCancelRun: true, + canRetryRun: false, }, tasks: [ { @@ -66,6 +68,7 @@ test("run detail view shows node status badges from run data", () => { stdoutLines: ["Checking metadata"], stderrLines: ["Minor warning"], logLines: ["Checking metadata"], + canRetry: true, }, ], selectedTaskId: "task-2", @@ -79,10 +82,12 @@ test("run detail view shows node status badges from run data", () => { assert.match(html, /Input assets: asset-1/); assert.match(html, /Run duration: 2450 ms/); assert.match(html, /2 tasks complete, 1 running, 1 stdout line/); + assert.match(html, /Cancel Run/); assert.match(html, /Duration: 2450 ms/); assert.match(html, /Validated delivery package structure/); assert.match(html, /Stdout/); assert.match(html, /Minor warning/); + assert.match(html, /Retry Task/); assert.match(html, /\/explore\/artifact-2/); }); diff --git a/apps/web/src/runtime/api-client.ts b/apps/web/src/runtime/api-client.ts index 3c6e58b..bc1fc36 100644 --- a/apps/web/src/runtime/api-client.ts +++ b/apps/web/src/runtime/api-client.ts @@ -140,6 +140,32 @@ export class ApiClient { return readJson(await fetch(`${this.baseUrl}/api/runs/${runId}/tasks`)); } + async cancelRun(runId: string) { + return readJson( + await fetch(`${this.baseUrl}/api/runs/${runId}/cancel`, { + method: "POST", + }), + ); + } + + async retryRun(runId: string, triggeredBy = "local-user") { + return readJson( + await fetch(`${this.baseUrl}/api/runs/${runId}/retry`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ triggeredBy }), + }), + ); + } + + async retryRunTask(runId: string, taskId: string) { + return readJson( + await fetch(`${this.baseUrl}/api/runs/${runId}/tasks/${taskId}/retry`, { + method: "POST", + }), + ); + } + async createArtifact(input: { type: "json" | "directory" | "video"; title: string; diff --git a/apps/web/src/runtime/app.tsx b/apps/web/src/runtime/app.tsx index 20bbf1b..5538dcf 100644 --- a/apps/web/src/runtime/app.tsx +++ b/apps/web/src/runtime/app.tsx @@ -39,9 +39,10 @@ function formatRunSummary(run: any) { const successCount = run?.summary?.taskCounts?.success ?? 0; const failedCount = run?.summary?.taskCounts?.failed ?? 0; const runningCount = run?.summary?.taskCounts?.running ?? 0; + const cancelledCount = run?.summary?.taskCounts?.cancelled ?? 0; const stdoutLineCount = run?.summary?.stdoutLineCount ?? 0; const stderrLineCount = run?.summary?.stderrLineCount ?? 0; - return `${successCount} success, ${failedCount} failed, ${runningCount} running, ${stdoutLineCount} stdout lines, ${stderrLineCount} stderr lines, ${totalTaskCount} total tasks`; + return `${successCount} success, ${failedCount} failed, ${runningCount} running, ${cancelledCount} cancelled, ${stdoutLineCount} stdout lines, ${stderrLineCount} stderr lines, ${totalTaskCount} total tasks`; } function usePathname() { @@ -631,6 +632,7 @@ function RunDetailPage(props: { const [selectedTaskId, setSelectedTaskId] = useState(null); const [artifacts, setArtifacts] = useState([]); const [error, setError] = useState(null); + const [refreshKey, setRefreshKey] = useState(0); useEffect(() => { let cancelled = false; @@ -667,7 +669,7 @@ function RunDetailPage(props: { clearTimeout(timer); } }; - }, [props.api, props.runId]); + }, [props.api, props.runId, refreshKey]); useEffect(() => { if (tasks.length === 0) { @@ -723,6 +725,41 @@ function RunDetailPage(props: {

Finished at: {run.finishedAt ?? "n/a"}

Run duration: {typeof run.durationMs === "number" ? `${run.durationMs} ms` : "n/a"}

Run summary: {formatRunSummary(run)}

+
+ {run.status === "queued" || run.status === "running" ? ( + + ) : null} + {run.status !== "queued" && run.status !== "running" ? ( + + ) : null} +
@@ -766,6 +803,28 @@ function RunDetailPage(props: {

Summary: {formatTaskSummary(selectedTask)}

{selectedTask.errorMessage ?

Error: {selectedTask.errorMessage}

: null} + {selectedTask.status === "failed" || selectedTask.status === "cancelled" ? ( +
+ +
+ ) : null}

Artifacts: {artifacts.length}

{artifacts.length > 0 ? (
    diff --git a/apps/worker/src/contracts/execution-context.ts b/apps/worker/src/contracts/execution-context.ts index ca80862..76fc846 100644 --- a/apps/worker/src/contracts/execution-context.ts +++ b/apps/worker/src/contracts/execution-context.ts @@ -1,11 +1,12 @@ export type ExecutorType = "python" | "docker" | "http"; -export type TaskStatus = "pending" | "queued" | "running" | "success" | "failed"; +export type TaskStatus = "pending" | "queued" | "running" | "success" | "failed" | "cancelled"; export type TaskStatusCounts = { pending: number; queued: number; running: number; success: number; failed: number; + cancelled: number; }; export type TaskExecutionSummary = { diff --git a/apps/worker/src/runtime/mongo-worker-store.ts b/apps/worker/src/runtime/mongo-worker-store.ts index 3b8ee21..cb3c7aa 100644 --- a/apps/worker/src/runtime/mongo-worker-store.ts +++ b/apps/worker/src/runtime/mongo-worker-store.ts @@ -15,7 +15,7 @@ type WorkflowRunDocument = { _id: string; workflowDefinitionId: string; workflowVersionId: string; - status: "queued" | "running" | "success" | "failed"; + status: "queued" | "running" | "success" | "failed" | "cancelled"; triggeredBy: string; assetIds: string[]; startedAt?: string; @@ -97,6 +97,7 @@ function buildTaskStatusCounts(tasks: TaskRecord[]): TaskStatusCounts { running: 0, success: 0, failed: 0, + cancelled: 0, }; for (const task of tasks) { @@ -143,37 +144,74 @@ export class MongoWorkerStore { } async claimNextQueuedTask(): Promise { - const startedAt = nowIso(); - const task = await this.db.collection("run_tasks").findOneAndUpdate( - { status: "queued" }, - { - $set: { - status: "running", - startedAt, - updatedAt: startedAt, - }, - $push: { - logLines: { - $each: ["Task claimed by worker"], + for (;;) { + const candidate = await this.db + .collection("run_tasks") + .find({ status: "queued" }) + .sort({ createdAt: 1 }) + .limit(1) + .next(); + + if (!candidate) { + return undefined; + } + + const run = await this.db + .collection("workflow_runs") + .findOne({ _id: candidate.workflowRunId }); + + if (run?.status === "cancelled") { + const finishedAt = nowIso(); + await this.db.collection("run_tasks").updateOne( + { _id: candidate._id, status: "queued" }, + { + $set: { + status: "cancelled", + finishedAt, + updatedAt: finishedAt, + }, + $push: { + logLines: { + $each: ["Task cancelled before execution"], + }, + }, + }, + ); + await this.refreshRunStatus(candidate.workflowRunId); + continue; + } + + const startedAt = nowIso(); + const task = await this.db.collection("run_tasks").findOneAndUpdate( + { _id: candidate._id, status: "queued" }, + { + $set: { + status: "running", + startedAt, + updatedAt: startedAt, + }, + $push: { + logLines: { + $each: ["Task claimed by worker"], + }, }, }, - }, - { - sort: { createdAt: 1 }, - returnDocument: "after", - }, - ); + { + returnDocument: "after", + }, + ); - if (!task) { - return undefined; + if (!task) { + continue; + } + + await this.db.collection("workflow_runs").updateOne( + { _id: task.workflowRunId }, + { $set: { status: "running", updatedAt: nowIso() } }, + ); + + return toTaskRecord(task); } - - await this.db.collection("workflow_runs").updateOne( - { _id: task.workflowRunId }, - { $set: { status: "running", updatedAt: nowIso() } }, - ); - - return toTaskRecord(task); } async getRun(runId: string) { @@ -291,6 +329,11 @@ export class MongoWorkerStore { } async queueReadyDependents(runId: string) { + const run = await this.getRun(runId); + if (run?.status === "cancelled") { + return; + } + const tasks = await this.db .collection("run_tasks") .find({ workflowRunId: runId }) @@ -327,18 +370,27 @@ export class MongoWorkerStore { return; } + const currentRun = await this.getRun(runId); + if (!currentRun) { + return; + } + let status: WorkflowRunDocument["status"] = "queued"; - if (tasks.some((task) => task.status === "failed")) { - status = "failed"; + if (currentRun.status === "cancelled") { + status = "cancelled"; } else if (tasks.every((task) => task.status === "success")) { status = "success"; - } else if (tasks.some((task) => task.status === "running" || task.status === "success")) { + } else if (tasks.some((task) => task.status === "running")) { status = "running"; + } else if (tasks.some((task) => task.status === "queued")) { + status = "queued"; + } else if (tasks.some((task) => task.status === "failed")) { + status = "failed"; } const startedAt = minIso(tasks.map((task) => task.startedAt)); const finishedAt = - status === "success" || status === "failed" + status === "success" || status === "failed" || status === "cancelled" ? maxIso(tasks.map((task) => task.finishedAt)) : undefined; const summary = buildRunExecutionSummary(tasks); diff --git a/apps/worker/test/mongo-worker-runtime.spec.ts b/apps/worker/test/mongo-worker-runtime.spec.ts index 7f8a1e1..6d3442e 100644 --- a/apps/worker/test/mongo-worker-runtime.spec.ts +++ b/apps/worker/test/mongo-worker-runtime.spec.ts @@ -122,7 +122,7 @@ test("worker claims a queued task, creates an artifact, and queues the dependent assert.equal(tasks[1]?.status, "queued"); assert.deepEqual(tasks[1]?.assetIds, ["asset-1"]); assert.equal(artifacts.length, 1); - assert.equal(run?.status, "running"); + assert.equal(run?.status, "queued"); }); test("worker marks the run successful after the final queued task completes", async (t) => { @@ -355,3 +355,75 @@ test("worker persists failure summaries and task log lines when execution throws assert.equal(run?.summary?.stderrLineCount, 1); assert.deepEqual(run?.summary?.failedTaskIds, ["task-failure"]); }); + +test("worker skips queued tasks that belong to a cancelled run", async (t) => { + const fixture = await createRuntimeFixture("emboflow-worker-cancelled-run-skip"); + t.after(async () => { + await fixture.close(); + }); + + await fixture.db.collection("workflow_runs").insertMany([ + { + _id: "run-cancelled", + workflowDefinitionId: "workflow-cancelled", + workflowVersionId: "workflow-cancelled-v1", + status: "cancelled", + triggeredBy: "local-user", + assetIds: ["asset-cancelled"], + createdAt: new Date().toISOString(), + updatedAt: new Date().toISOString(), + }, + { + _id: "run-active", + workflowDefinitionId: "workflow-active", + workflowVersionId: "workflow-active-v1", + status: "queued", + triggeredBy: "local-user", + assetIds: ["asset-active"], + createdAt: new Date().toISOString(), + updatedAt: new Date().toISOString(), + }, + ]); + + await fixture.db.collection("run_tasks").insertMany([ + { + _id: "task-cancelled", + workflowRunId: "run-cancelled", + workflowVersionId: "workflow-cancelled-v1", + nodeId: "source-asset", + nodeType: "source", + executorType: "python", + status: "queued", + attempt: 1, + assetIds: ["asset-cancelled"], + upstreamNodeIds: [], + outputArtifactIds: [], + createdAt: new Date().toISOString(), + updatedAt: new Date().toISOString(), + }, + { + _id: "task-active", + workflowRunId: "run-active", + workflowVersionId: "workflow-active-v1", + nodeId: "source-asset", + nodeType: "source", + executorType: "python", + status: "queued", + attempt: 1, + assetIds: ["asset-active"], + upstreamNodeIds: [], + outputArtifactIds: [], + createdAt: new Date().toISOString(), + updatedAt: new Date().toISOString(), + }, + ]); + + const claimedTask = await fixture.runtime.runNextTask(); + const cancelledTask = await fixture.store.getRunTask("task-cancelled"); + const activeTask = await fixture.store.getRunTask("task-active"); + + assert.equal(claimedTask?.id, "task-active"); + assert.equal(cancelledTask?.status, "cancelled"); + assert.match(cancelledTask?.logLines?.at(-1) ?? "", /cancelled/i); + assert.equal(activeTask?.status, "success"); +}); diff --git a/design/03-workflows/workflow-execution-model.md b/design/03-workflows/workflow-execution-model.md index 8acfd63..fbbf0f8 100644 --- a/design/03-workflows/workflow-execution-model.md +++ b/design/03-workflows/workflow-execution-model.md @@ -308,6 +308,17 @@ The current runtime also aggregates execution state back onto `workflow_runs`. E This allows the Runs workspace to render a stable top-level run summary without client-side recomputation across every task document. +The current V1 runtime also implements the first run-control loop: + +- `POST /api/runs/:runId/cancel` + Cancels queued and pending tasks for that run and prevents downstream promotion. +- `POST /api/runs/:runId/retry` + Creates a brand-new run from the original run snapshot, keeping workflow version and bound asset ids. +- `POST /api/runs/:runId/tasks/:taskId/retry` + Resets the failed or cancelled task plus its downstream subtree, increments the target task attempt count, and requeues from that node. + +V1 cancellation is scheduler-level only. It does not attempt to hard-stop an executor that is already running inside the local worker loop. + The API and worker runtimes now both have direct integration coverage against a real Mongo runtime through `mongodb-memory-server`, in addition to the older in-memory contract tests. The first web authoring surface already follows the three-pane layout contract with: diff --git a/design/04-ui-ux/information-architecture-and-key-screens.md b/design/04-ui-ux/information-architecture-and-key-screens.md index 6bb514a..d08c3fe 100644 --- a/design/04-ui-ux/information-architecture-and-key-screens.md +++ b/design/04-ui-ux/information-architecture-and-key-screens.md @@ -199,6 +199,12 @@ The run header itself should also show an aggregated summary: - total stdout/stderr line counts - failed task ids when present +V1 run detail should expose direct operator actions: + +- `Cancel Run` while queued or running +- `Retry Run` after a run reaches a terminal state +- `Retry Task` on a selected failed or cancelled task + ## Screen 6: Explore Workspace Purpose: diff --git a/design/05-data/mongodb-data-model.md b/design/05-data/mongodb-data-model.md index 87fad10..c069f8f 100644 --- a/design/05-data/mongodb-data-model.md +++ b/design/05-data/mongodb-data-model.md @@ -357,6 +357,12 @@ The current runtime also aggregates task execution back onto `workflow_runs`, so - failed task ids - derived run duration +The current runtime control loop also mutates these collections in place for retry/cancel operations: + +- cancelling a run marks queued and pending `run_tasks` as `cancelled` +- retrying a run creates a new `workflow_runs` document plus a fresh set of `run_tasks` +- retrying a task resets the target node and downstream subtree on the existing run, clears task execution fields, and increments the retried task attempt count + ### artifacts Purpose: diff --git a/docs/plans/2026-03-26-emboflow-v1-foundation-and-mvp.md b/docs/plans/2026-03-26-emboflow-v1-foundation-and-mvp.md index f16defb..311fd12 100644 --- a/docs/plans/2026-03-26-emboflow-v1-foundation-and-mvp.md +++ b/docs/plans/2026-03-26-emboflow-v1-foundation-and-mvp.md @@ -22,6 +22,7 @@ - `2026-03-26`: The current UI/runtime pass turns Runs into a real project-scoped workspace with run history queries, active-run polling, and task artifact links into Explore. - `2026-03-27`: The current observability pass persists task execution summaries, timestamps, log lines, and result previews on Mongo-backed `run_tasks`, and surfaces those fields in the React run detail view. - `2026-03-27`: The current follow-up observability pass adds persisted stdout/stderr fields on `run_tasks` plus aggregated run summaries, durations, and task counts on `workflow_runs`. +- `2026-03-27`: The current run-control pass adds run cancellation, run retry from immutable snapshots, and task retry for failed/cancelled nodes with downstream reset semantics. ---