feat: add run control retry and cancellation flows

This commit is contained in:
eust-w 2026-03-27 02:40:16 +08:00
parent 526317681d
commit 45b4a99af1
17 changed files with 766 additions and 40 deletions

View File

@ -66,7 +66,7 @@ The local validation path currently used for embodied data testing is:
You can register that directory from the Assets page or via `POST /api/assets/register`.
The workflow editor currently requires selecting at least one registered asset before a run can be created.
The Runs workspace now shows project-scoped run history, run-level aggregated summaries, and run detail views with persisted task summaries, stdout/stderr sections, result previews, and artifact links into Explore.
The Runs workspace now shows project-scoped run history, run-level aggregated summaries, cancel/retry controls, and run detail views with persisted task summaries, stdout/stderr sections, result previews, and artifact links into Explore.
## Repository Structure

View File

@ -99,7 +99,7 @@ type WorkflowRunDocument = Timestamped & {
workflowVersionId: string;
workspaceId: string;
projectId: string;
status: "queued" | "running" | "success" | "failed";
status: "queued" | "running" | "success" | "failed" | "cancelled";
triggeredBy: string;
assetIds: string[];
startedAt?: string;
@ -115,7 +115,7 @@ type RunTaskDocument = Timestamped & {
nodeId: string;
nodeType: string;
executorType: ExecutorType;
status: "queued" | "pending" | "running" | "success" | "failed";
status: "queued" | "pending" | "running" | "success" | "failed" | "cancelled";
attempt: number;
assetIds: string[];
upstreamNodeIds: string[];
@ -163,6 +163,7 @@ function buildTaskStatusCounts(tasks: RunTaskDocument[]): TaskStatusCounts {
running: 0,
success: 0,
failed: 0,
cancelled: 0,
};
for (const task of tasks) {
@ -176,7 +177,7 @@ function buildRunExecutionSummary(tasks: RunTaskDocument[]): RunExecutionSummary
const taskCounts = buildTaskStatusCounts(tasks);
return {
totalTaskCount: tasks.length,
completedTaskCount: taskCounts.success + taskCounts.failed,
completedTaskCount: taskCounts.success + taskCounts.failed + taskCounts.cancelled,
artifactCount: tasks.reduce((total, task) => total + task.outputArtifactIds.length, 0),
stdoutLineCount: tasks.reduce((total, task) => total + (task.stdoutLines?.length ?? 0), 0),
stderrLineCount: tasks.reduce((total, task) => total + (task.stderrLines?.length ?? 0), 0),
@ -185,6 +186,23 @@ function buildRunExecutionSummary(tasks: RunTaskDocument[]): RunExecutionSummary
};
}
function collectRetryNodeIds(tasks: RunTaskDocument[], rootNodeId: string) {
const pending = [rootNodeId];
const collected = new Set<string>([rootNodeId]);
while (pending.length > 0) {
const currentNodeId = pending.shift()!;
for (const task of tasks) {
if (task.upstreamNodeIds.includes(currentNodeId) && !collected.has(task.nodeId)) {
collected.add(task.nodeId);
pending.push(task.nodeId);
}
}
}
return collected;
}
export class MongoAppStore {
constructor(private readonly db: Db) {}
@ -599,6 +617,187 @@ export class MongoAppStore {
.toArray();
}
async cancelRun(runId: string) {
const run = await this.db.collection<WorkflowRunDocument>("workflow_runs").findOne({ _id: runId });
if (!run) {
throw new Error(`run not found: ${runId}`);
}
if (run.status === "success" || run.status === "failed" || run.status === "cancelled") {
throw new Error("run is already in a terminal state");
}
const finishedAt = nowIso();
await this.db.collection<RunTaskDocument>("run_tasks").updateMany(
{
workflowRunId: runId,
status: { $in: ["queued", "pending"] },
},
{
$set: {
status: "cancelled",
finishedAt,
updatedAt: finishedAt,
},
$push: {
logLines: {
$each: ["Task cancelled before execution"],
},
},
},
);
await this.db.collection<WorkflowRunDocument>("workflow_runs").updateOne(
{ _id: runId },
{
$set: {
status: "cancelled",
finishedAt,
updatedAt: finishedAt,
},
},
);
const tasks = await this.listRunTasks(runId);
const startedAt = tasks
.map((task) => task.startedAt)
.filter((value): value is string => Boolean(value))
.sort()[0];
const summary = buildRunExecutionSummary(tasks);
await this.db.collection<WorkflowRunDocument>("workflow_runs").updateOne(
{ _id: runId },
{
$set: {
summary,
startedAt,
durationMs: startedAt ? Math.max(Date.parse(finishedAt) - Date.parse(startedAt), 0) : 0,
updatedAt: nowIso(),
},
},
);
return this.getRun(runId);
}
async retryRun(runId: string, triggeredBy = "local-user") {
const run = await this.db.collection<WorkflowRunDocument>("workflow_runs").findOne({ _id: runId });
if (!run) {
throw new Error(`run not found: ${runId}`);
}
if (run.status === "queued" || run.status === "running") {
throw new Error("run must be terminal before retry");
}
return this.createRun({
workflowDefinitionId: run.workflowDefinitionId,
workflowVersionId: run.workflowVersionId,
triggeredBy,
assetIds: run.assetIds,
});
}
async retryRunTask(runId: string, taskId: string) {
const run = await this.db.collection<WorkflowRunDocument>("workflow_runs").findOne({ _id: runId });
if (!run) {
throw new Error(`run not found: ${runId}`);
}
if (run.status === "running") {
throw new Error("cannot retry a task while the run is active");
}
const tasks = await this.listRunTasks(runId);
const targetTask = tasks.find((task) => task._id === taskId);
if (!targetTask) {
throw new Error(`task not found: ${taskId}`);
}
if (targetTask.status !== "failed" && targetTask.status !== "cancelled") {
throw new Error("only failed or cancelled tasks can be retried");
}
const retryNodeIds = collectRetryNodeIds(tasks, targetTask.nodeId);
const retryTaskIds = tasks
.filter((task) => retryNodeIds.has(task.nodeId))
.map((task) => task._id);
const retryStartedAt = nowIso();
await this.db.collection<RunTaskDocument>("run_tasks").updateMany(
{ _id: { $in: retryTaskIds } },
{
$set: {
updatedAt: retryStartedAt,
outputArtifactIds: [],
logLines: [],
stdoutLines: [],
stderrLines: [],
},
$unset: {
errorMessage: "",
summary: "",
lastResultPreview: "",
startedAt: "",
finishedAt: "",
durationMs: "",
},
},
);
await this.db.collection<RunTaskDocument>("run_tasks").updateOne(
{ _id: targetTask._id },
{
$set: {
status: "queued",
updatedAt: retryStartedAt,
logLines: ["Task scheduled for retry"],
stdoutLines: [],
stderrLines: [],
},
$inc: {
attempt: 1,
},
},
);
await this.db.collection<RunTaskDocument>("run_tasks").updateMany(
{
workflowRunId: runId,
_id: { $in: retryTaskIds.filter((retryTaskId) => retryTaskId !== targetTask._id) },
},
{
$set: {
status: "pending",
updatedAt: retryStartedAt,
},
},
);
await this.db.collection<WorkflowRunDocument>("workflow_runs").updateOne(
{ _id: runId },
{
$set: {
status: "queued",
updatedAt: retryStartedAt,
},
$unset: {
finishedAt: "",
durationMs: "",
},
},
);
const refreshedTasks = await this.listRunTasks(runId);
await this.db.collection<WorkflowRunDocument>("workflow_runs").updateOne(
{ _id: runId },
{
$set: {
summary: buildRunExecutionSummary(refreshedTasks),
updatedAt: nowIso(),
},
},
);
return this.getRun(runId);
}
async createArtifact(input: {
type: "json" | "directory" | "video";
title: string;

View File

@ -274,6 +274,32 @@ export async function createApiRuntime(config = resolveApiRuntimeConfig()) {
}
});
app.post("/api/runs/:runId/cancel", async (request, response, next) => {
try {
response.json(await store.cancelRun(request.params.runId));
} catch (error) {
next(error);
}
});
app.post("/api/runs/:runId/retry", async (request, response, next) => {
try {
response.json(
await store.retryRun(request.params.runId, request.body.triggeredBy ?? "local-user"),
);
} catch (error) {
next(error);
}
});
app.post("/api/runs/:runId/tasks/:taskId/retry", async (request, response, next) => {
try {
response.json(await store.retryRunTask(request.params.runId, request.params.taskId));
} catch (error) {
next(error);
}
});
app.post("/api/artifacts", async (request, response, next) => {
try {
response.json(

View File

@ -651,3 +651,259 @@ test("mongo-backed runtime exposes persisted task execution summaries and logs",
executor: "python",
});
});
test("mongo-backed runtime can cancel a run, retry a run snapshot, and retry a failed task", async (t) => {
const mongod = await MongoMemoryServer.create({
instance: {
ip: "127.0.0.1",
port: 27122,
},
});
t.after(async () => {
await mongod.stop();
});
const server = await startRuntimeServer({
host: "127.0.0.1",
port: 0,
mongoUri: mongod.getUri(),
database: "emboflow-runtime-run-controls",
corsOrigin: "http://127.0.0.1:3000",
});
t.after(async () => {
await server.close();
});
const bootstrap = await readJson<{
workspace: { _id: string };
project: { _id: string };
}>(
await fetch(`${server.baseUrl}/api/dev/bootstrap`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({ userId: "run-controls-user", projectName: "Run Controls Project" }),
}),
);
const sourceDir = await mkdtemp(path.join(os.tmpdir(), "emboflow-runtime-controls-"));
await mkdir(path.join(sourceDir, "DJI_001"));
await writeFile(path.join(sourceDir, "meta.json"), "{}");
await writeFile(path.join(sourceDir, "intrinsics.json"), "{}");
await writeFile(path.join(sourceDir, "video_meta.json"), "{}");
const asset = await readJson<{ _id: string }>(
await fetch(`${server.baseUrl}/api/assets/register`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({
workspaceId: bootstrap.workspace._id,
projectId: bootstrap.project._id,
sourcePath: sourceDir,
}),
}),
);
await readJson(await fetch(`${server.baseUrl}/api/assets/${asset._id}/probe`, { method: "POST" }));
const workflow = await readJson<{ _id: string }>(
await fetch(`${server.baseUrl}/api/workflows`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({
workspaceId: bootstrap.workspace._id,
projectId: bootstrap.project._id,
name: "Run Control Flow",
}),
}),
);
const version = await readJson<{ _id: string }>(
await fetch(`${server.baseUrl}/api/workflows/${workflow._id}/versions`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({
visualGraph: { viewport: { x: 0, y: 0, zoom: 1 } },
logicGraph: {
nodes: [
{ id: "source-asset", type: "source" },
{ id: "validate-structure", type: "inspect" },
{ id: "export-delivery-package", type: "export" },
],
edges: [
{ from: "source-asset", to: "validate-structure" },
{ from: "validate-structure", to: "export-delivery-package" },
],
},
runtimeGraph: { selectedPreset: "delivery-normalization" },
pluginRefs: ["builtin:delivery-nodes"],
}),
}),
);
const run = await readJson<{ _id: string }>(
await fetch(`${server.baseUrl}/api/runs`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({
workflowDefinitionId: workflow._id,
workflowVersionId: version._id,
assetIds: [asset._id],
}),
}),
);
const cancelledRun = await readJson<{
_id: string;
status: string;
summary?: { taskCounts?: { cancelled?: number } };
}>(
await fetch(`${server.baseUrl}/api/runs/${run._id}/cancel`, { method: "POST" }),
);
const cancelledTasks = await readJson<Array<{ status: string }>>(
await fetch(`${server.baseUrl}/api/runs/${run._id}/tasks`),
);
assert.equal(cancelledRun.status, "cancelled");
assert.equal(cancelledRun.summary?.taskCounts?.cancelled, 3);
assert.deepEqual(
cancelledTasks.map((task) => task.status),
["cancelled", "cancelled", "cancelled"],
);
const retriedRun = await readJson<{
_id: string;
workflowVersionId: string;
assetIds: string[];
status: string;
}>(
await fetch(`${server.baseUrl}/api/runs/${run._id}/retry`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({ triggeredBy: "run-controls-user" }),
}),
);
assert.notEqual(retriedRun._id, run._id);
assert.equal(retriedRun.workflowVersionId, version._id);
assert.deepEqual(retriedRun.assetIds, [asset._id]);
assert.equal(retriedRun.status, "queued");
const client = new MongoClient(mongod.getUri());
await client.connect();
t.after(async () => {
await client.close();
});
const db = client.db("emboflow-runtime-run-controls");
const retriedRunTasks = await db
.collection("run_tasks")
.find({ workflowRunId: retriedRun._id })
.sort({ createdAt: 1 })
.toArray();
const sourceTask = retriedRunTasks.find((task) => task.nodeId === "source-asset");
const failedTask = retriedRunTasks.find((task) => task.nodeId === "validate-structure");
const downstreamTask = retriedRunTasks.find((task) => task.nodeId === "export-delivery-package");
await db.collection("run_tasks").updateOne(
{ _id: sourceTask?._id },
{
$set: {
status: "success",
summary: {
outcome: "success",
executorType: "python",
assetCount: 1,
artifactIds: [],
stdoutLineCount: 1,
stderrLineCount: 0,
},
stdoutLines: ["source completed"],
logLines: ["Task claimed by worker", "stdout: source completed", "Executor completed successfully"],
},
},
);
await db.collection("run_tasks").updateOne(
{ _id: failedTask?._id },
{
$set: {
status: "failed",
errorMessage: "validation failed",
summary: {
outcome: "failed",
executorType: "python",
assetCount: 1,
artifactIds: [],
stdoutLineCount: 0,
stderrLineCount: 1,
errorMessage: "validation failed",
},
stderrLines: ["validation failed"],
logLines: ["Task claimed by worker", "stderr: validation failed", "Execution failed: validation failed"],
},
},
);
await db.collection("run_tasks").updateOne(
{ _id: downstreamTask?._id },
{
$set: {
status: "cancelled",
logLines: ["Task cancelled after upstream failure"],
},
},
);
await db.collection("workflow_runs").updateOne(
{ _id: retriedRun._id },
{
$set: {
status: "failed",
summary: {
totalTaskCount: 3,
completedTaskCount: 2,
artifactCount: 0,
stdoutLineCount: 0,
stderrLineCount: 1,
failedTaskIds: [failedTask?._id],
taskCounts: {
pending: 0,
queued: 0,
running: 0,
success: 1,
failed: 1,
cancelled: 1,
},
},
},
},
);
const retriedTaskRun = await readJson<{
_id: string;
status: string;
summary?: { taskCounts?: { queued?: number; pending?: number } };
}>(
await fetch(`${server.baseUrl}/api/runs/${retriedRun._id}/tasks/${failedTask?._id}/retry`, {
method: "POST",
}),
);
const refreshedTasks = await readJson<
Array<{
_id: string;
status: string;
attempt: number;
errorMessage?: string;
logLines?: string[];
stderrLines?: string[];
}>
>(await fetch(`${server.baseUrl}/api/runs/${retriedRun._id}/tasks`));
const refreshedFailedTask = refreshedTasks.find((task) => task._id === failedTask?._id);
const refreshedDownstreamTask = refreshedTasks.find((task) => task._id === downstreamTask?._id);
assert.equal(retriedTaskRun.status, "queued");
assert.equal(retriedTaskRun.summary?.taskCounts?.queued, 1);
assert.equal(retriedTaskRun.summary?.taskCounts?.pending, 1);
assert.equal(refreshedFailedTask?.status, "queued");
assert.equal(refreshedFailedTask?.attempt, 2);
assert.equal(refreshedFailedTask?.errorMessage, undefined);
assert.deepEqual(refreshedFailedTask?.stderrLines, []);
assert.match(refreshedFailedTask?.logLines?.[0] ?? "", /retry/i);
assert.equal(refreshedDownstreamTask?.status, "pending");
});

View File

@ -10,6 +10,7 @@ export type RunTaskView = {
errorMessage?: string;
stdoutLines?: string[];
stderrLines?: string[];
canRetry?: boolean;
logLines: string[];
};

View File

@ -18,6 +18,7 @@ export function renderTaskLogPanel(
<p>Duration: ${typeof task.durationMs === "number" ? `${task.durationMs} ms` : "n/a"}</p>
${task.summaryLabel ? `<p>Summary: ${task.summaryLabel}</p>` : ""}
${task.errorMessage ? `<p>Error: ${task.errorMessage}</p>` : ""}
${task.canRetry ? `<button type="button">Retry Task</button>` : ""}
<p>Artifacts: ${(task.artifactIds ?? []).length}</p>
${
(task.artifactIds ?? []).length > 0

View File

@ -15,6 +15,8 @@ export type RunDetailPageInput = {
assetIds?: string[];
durationMs?: number;
summaryLabel?: string;
canCancelRun?: boolean;
canRetryRun?: boolean;
};
tasks: RunTaskView[];
selectedTaskId?: string;
@ -34,6 +36,8 @@ export function renderRunDetailPage(input: RunDetailPageInput): string {
<p>Input assets: ${(input.run.assetIds ?? []).join(", ") || "none"}</p>
<p>Run duration: ${typeof input.run.durationMs === "number" ? `${input.run.durationMs} ms` : "n/a"}</p>
${input.run.summaryLabel ? `<p>Run summary: ${input.run.summaryLabel}</p>` : ""}
${input.run.canCancelRun ? `<button type="button">Cancel Run</button>` : ""}
${input.run.canRetryRun ? `<button type="button">Retry Run</button>` : ""}
</header>
${renderRunGraphView(input.tasks)}
${renderTaskLogPanel(input.tasks, input.selectedTaskId)}

View File

@ -39,6 +39,8 @@ test("run detail view shows node status badges from run data", () => {
assetIds: ["asset-1"],
durationMs: 2450,
summaryLabel: "2 tasks complete, 1 running, 1 stdout line",
canCancelRun: true,
canRetryRun: false,
},
tasks: [
{
@ -66,6 +68,7 @@ test("run detail view shows node status badges from run data", () => {
stdoutLines: ["Checking metadata"],
stderrLines: ["Minor warning"],
logLines: ["Checking metadata"],
canRetry: true,
},
],
selectedTaskId: "task-2",
@ -79,10 +82,12 @@ test("run detail view shows node status badges from run data", () => {
assert.match(html, /Input assets: asset-1/);
assert.match(html, /Run duration: 2450 ms/);
assert.match(html, /2 tasks complete, 1 running, 1 stdout line/);
assert.match(html, /Cancel Run/);
assert.match(html, /Duration: 2450 ms/);
assert.match(html, /Validated delivery package structure/);
assert.match(html, /Stdout/);
assert.match(html, /Minor warning/);
assert.match(html, /Retry Task/);
assert.match(html, /\/explore\/artifact-2/);
});

View File

@ -140,6 +140,32 @@ export class ApiClient {
return readJson<any[]>(await fetch(`${this.baseUrl}/api/runs/${runId}/tasks`));
}
async cancelRun(runId: string) {
return readJson<any>(
await fetch(`${this.baseUrl}/api/runs/${runId}/cancel`, {
method: "POST",
}),
);
}
async retryRun(runId: string, triggeredBy = "local-user") {
return readJson<any>(
await fetch(`${this.baseUrl}/api/runs/${runId}/retry`, {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({ triggeredBy }),
}),
);
}
async retryRunTask(runId: string, taskId: string) {
return readJson<any>(
await fetch(`${this.baseUrl}/api/runs/${runId}/tasks/${taskId}/retry`, {
method: "POST",
}),
);
}
async createArtifact(input: {
type: "json" | "directory" | "video";
title: string;

View File

@ -39,9 +39,10 @@ function formatRunSummary(run: any) {
const successCount = run?.summary?.taskCounts?.success ?? 0;
const failedCount = run?.summary?.taskCounts?.failed ?? 0;
const runningCount = run?.summary?.taskCounts?.running ?? 0;
const cancelledCount = run?.summary?.taskCounts?.cancelled ?? 0;
const stdoutLineCount = run?.summary?.stdoutLineCount ?? 0;
const stderrLineCount = run?.summary?.stderrLineCount ?? 0;
return `${successCount} success, ${failedCount} failed, ${runningCount} running, ${stdoutLineCount} stdout lines, ${stderrLineCount} stderr lines, ${totalTaskCount} total tasks`;
return `${successCount} success, ${failedCount} failed, ${runningCount} running, ${cancelledCount} cancelled, ${stdoutLineCount} stdout lines, ${stderrLineCount} stderr lines, ${totalTaskCount} total tasks`;
}
function usePathname() {
@ -631,6 +632,7 @@ function RunDetailPage(props: {
const [selectedTaskId, setSelectedTaskId] = useState<string | null>(null);
const [artifacts, setArtifacts] = useState<any[]>([]);
const [error, setError] = useState<string | null>(null);
const [refreshKey, setRefreshKey] = useState(0);
useEffect(() => {
let cancelled = false;
@ -667,7 +669,7 @@ function RunDetailPage(props: {
clearTimeout(timer);
}
};
}, [props.api, props.runId]);
}, [props.api, props.runId, refreshKey]);
useEffect(() => {
if (tasks.length === 0) {
@ -723,6 +725,41 @@ function RunDetailPage(props: {
<p>Finished at: {run.finishedAt ?? "n/a"}</p>
<p>Run duration: {typeof run.durationMs === "number" ? `${run.durationMs} ms` : "n/a"}</p>
<p>Run summary: {formatRunSummary(run)}</p>
<div className="button-row" style={{ marginTop: 12 }}>
{run.status === "queued" || run.status === "running" ? (
<button
className="button-secondary"
onClick={async () => {
try {
setError(null);
await props.api.cancelRun(run._id);
setRefreshKey((value) => value + 1);
} catch (actionError) {
setError(actionError instanceof Error ? actionError.message : "Failed to cancel run");
}
}}
>
Cancel Run
</button>
) : null}
{run.status !== "queued" && run.status !== "running" ? (
<button
className="button-primary"
onClick={async () => {
try {
setError(null);
const retriedRun = await props.api.retryRun(run._id);
window.history.pushState({}, "", `/runs/${retriedRun._id}`);
window.dispatchEvent(new PopStateEvent("popstate"));
} catch (actionError) {
setError(actionError instanceof Error ? actionError.message : "Failed to retry run");
}
}}
>
Retry Run
</button>
) : null}
</div>
</section>
<section className="two-column">
<div className="panel">
@ -766,6 +803,28 @@ function RunDetailPage(props: {
</p>
<p>Summary: {formatTaskSummary(selectedTask)}</p>
{selectedTask.errorMessage ? <p>Error: {selectedTask.errorMessage}</p> : null}
{selectedTask.status === "failed" || selectedTask.status === "cancelled" ? (
<div className="button-row" style={{ marginTop: 12 }}>
<button
className="button-primary"
onClick={async () => {
try {
setError(null);
await props.api.retryRunTask(run._id, selectedTask._id);
setRefreshKey((value) => value + 1);
} catch (actionError) {
setError(
actionError instanceof Error
? actionError.message
: "Failed to retry task",
);
}
}}
>
Retry Task
</button>
</div>
) : null}
<p>Artifacts: {artifacts.length}</p>
{artifacts.length > 0 ? (
<ul>

View File

@ -1,11 +1,12 @@
export type ExecutorType = "python" | "docker" | "http";
export type TaskStatus = "pending" | "queued" | "running" | "success" | "failed";
export type TaskStatus = "pending" | "queued" | "running" | "success" | "failed" | "cancelled";
export type TaskStatusCounts = {
pending: number;
queued: number;
running: number;
success: number;
failed: number;
cancelled: number;
};
export type TaskExecutionSummary = {

View File

@ -15,7 +15,7 @@ type WorkflowRunDocument = {
_id: string;
workflowDefinitionId: string;
workflowVersionId: string;
status: "queued" | "running" | "success" | "failed";
status: "queued" | "running" | "success" | "failed" | "cancelled";
triggeredBy: string;
assetIds: string[];
startedAt?: string;
@ -97,6 +97,7 @@ function buildTaskStatusCounts(tasks: TaskRecord[]): TaskStatusCounts {
running: 0,
success: 0,
failed: 0,
cancelled: 0,
};
for (const task of tasks) {
@ -143,37 +144,74 @@ export class MongoWorkerStore {
}
async claimNextQueuedTask(): Promise<TaskRecord | undefined> {
const startedAt = nowIso();
const task = await this.db.collection<RunTaskDocument>("run_tasks").findOneAndUpdate(
{ status: "queued" },
{
$set: {
status: "running",
startedAt,
updatedAt: startedAt,
},
$push: {
logLines: {
$each: ["Task claimed by worker"],
for (;;) {
const candidate = await this.db
.collection<RunTaskDocument>("run_tasks")
.find({ status: "queued" })
.sort({ createdAt: 1 })
.limit(1)
.next();
if (!candidate) {
return undefined;
}
const run = await this.db
.collection<WorkflowRunDocument>("workflow_runs")
.findOne({ _id: candidate.workflowRunId });
if (run?.status === "cancelled") {
const finishedAt = nowIso();
await this.db.collection<RunTaskDocument>("run_tasks").updateOne(
{ _id: candidate._id, status: "queued" },
{
$set: {
status: "cancelled",
finishedAt,
updatedAt: finishedAt,
},
$push: {
logLines: {
$each: ["Task cancelled before execution"],
},
},
},
);
await this.refreshRunStatus(candidate.workflowRunId);
continue;
}
const startedAt = nowIso();
const task = await this.db.collection<RunTaskDocument>("run_tasks").findOneAndUpdate(
{ _id: candidate._id, status: "queued" },
{
$set: {
status: "running",
startedAt,
updatedAt: startedAt,
},
$push: {
logLines: {
$each: ["Task claimed by worker"],
},
},
},
},
{
sort: { createdAt: 1 },
returnDocument: "after",
},
);
{
returnDocument: "after",
},
);
if (!task) {
return undefined;
if (!task) {
continue;
}
await this.db.collection<WorkflowRunDocument>("workflow_runs").updateOne(
{ _id: task.workflowRunId },
{ $set: { status: "running", updatedAt: nowIso() } },
);
return toTaskRecord(task);
}
await this.db.collection<WorkflowRunDocument>("workflow_runs").updateOne(
{ _id: task.workflowRunId },
{ $set: { status: "running", updatedAt: nowIso() } },
);
return toTaskRecord(task);
}
async getRun(runId: string) {
@ -291,6 +329,11 @@ export class MongoWorkerStore {
}
async queueReadyDependents(runId: string) {
const run = await this.getRun(runId);
if (run?.status === "cancelled") {
return;
}
const tasks = await this.db
.collection<RunTaskDocument>("run_tasks")
.find({ workflowRunId: runId })
@ -327,18 +370,27 @@ export class MongoWorkerStore {
return;
}
const currentRun = await this.getRun(runId);
if (!currentRun) {
return;
}
let status: WorkflowRunDocument["status"] = "queued";
if (tasks.some((task) => task.status === "failed")) {
status = "failed";
if (currentRun.status === "cancelled") {
status = "cancelled";
} else if (tasks.every((task) => task.status === "success")) {
status = "success";
} else if (tasks.some((task) => task.status === "running" || task.status === "success")) {
} else if (tasks.some((task) => task.status === "running")) {
status = "running";
} else if (tasks.some((task) => task.status === "queued")) {
status = "queued";
} else if (tasks.some((task) => task.status === "failed")) {
status = "failed";
}
const startedAt = minIso(tasks.map((task) => task.startedAt));
const finishedAt =
status === "success" || status === "failed"
status === "success" || status === "failed" || status === "cancelled"
? maxIso(tasks.map((task) => task.finishedAt))
: undefined;
const summary = buildRunExecutionSummary(tasks);

View File

@ -122,7 +122,7 @@ test("worker claims a queued task, creates an artifact, and queues the dependent
assert.equal(tasks[1]?.status, "queued");
assert.deepEqual(tasks[1]?.assetIds, ["asset-1"]);
assert.equal(artifacts.length, 1);
assert.equal(run?.status, "running");
assert.equal(run?.status, "queued");
});
test("worker marks the run successful after the final queued task completes", async (t) => {
@ -355,3 +355,75 @@ test("worker persists failure summaries and task log lines when execution throws
assert.equal(run?.summary?.stderrLineCount, 1);
assert.deepEqual(run?.summary?.failedTaskIds, ["task-failure"]);
});
test("worker skips queued tasks that belong to a cancelled run", async (t) => {
const fixture = await createRuntimeFixture("emboflow-worker-cancelled-run-skip");
t.after(async () => {
await fixture.close();
});
await fixture.db.collection("workflow_runs").insertMany([
{
_id: "run-cancelled",
workflowDefinitionId: "workflow-cancelled",
workflowVersionId: "workflow-cancelled-v1",
status: "cancelled",
triggeredBy: "local-user",
assetIds: ["asset-cancelled"],
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
},
{
_id: "run-active",
workflowDefinitionId: "workflow-active",
workflowVersionId: "workflow-active-v1",
status: "queued",
triggeredBy: "local-user",
assetIds: ["asset-active"],
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
},
]);
await fixture.db.collection("run_tasks").insertMany([
{
_id: "task-cancelled",
workflowRunId: "run-cancelled",
workflowVersionId: "workflow-cancelled-v1",
nodeId: "source-asset",
nodeType: "source",
executorType: "python",
status: "queued",
attempt: 1,
assetIds: ["asset-cancelled"],
upstreamNodeIds: [],
outputArtifactIds: [],
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
},
{
_id: "task-active",
workflowRunId: "run-active",
workflowVersionId: "workflow-active-v1",
nodeId: "source-asset",
nodeType: "source",
executorType: "python",
status: "queued",
attempt: 1,
assetIds: ["asset-active"],
upstreamNodeIds: [],
outputArtifactIds: [],
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
},
]);
const claimedTask = await fixture.runtime.runNextTask();
const cancelledTask = await fixture.store.getRunTask("task-cancelled");
const activeTask = await fixture.store.getRunTask("task-active");
assert.equal(claimedTask?.id, "task-active");
assert.equal(cancelledTask?.status, "cancelled");
assert.match(cancelledTask?.logLines?.at(-1) ?? "", /cancelled/i);
assert.equal(activeTask?.status, "success");
});

View File

@ -308,6 +308,17 @@ The current runtime also aggregates execution state back onto `workflow_runs`. E
This allows the Runs workspace to render a stable top-level run summary without client-side recomputation across every task document.
The current V1 runtime also implements the first run-control loop:
- `POST /api/runs/:runId/cancel`
Cancels queued and pending tasks for that run and prevents downstream promotion.
- `POST /api/runs/:runId/retry`
Creates a brand-new run from the original run snapshot, keeping workflow version and bound asset ids.
- `POST /api/runs/:runId/tasks/:taskId/retry`
Resets the failed or cancelled task plus its downstream subtree, increments the target task attempt count, and requeues from that node.
V1 cancellation is scheduler-level only. It does not attempt to hard-stop an executor that is already running inside the local worker loop.
The API and worker runtimes now both have direct integration coverage against a real Mongo runtime through `mongodb-memory-server`, in addition to the older in-memory contract tests.
The first web authoring surface already follows the three-pane layout contract with:

View File

@ -199,6 +199,12 @@ The run header itself should also show an aggregated summary:
- total stdout/stderr line counts
- failed task ids when present
V1 run detail should expose direct operator actions:
- `Cancel Run` while queued or running
- `Retry Run` after a run reaches a terminal state
- `Retry Task` on a selected failed or cancelled task
## Screen 6: Explore Workspace
Purpose:

View File

@ -357,6 +357,12 @@ The current runtime also aggregates task execution back onto `workflow_runs`, so
- failed task ids
- derived run duration
The current runtime control loop also mutates these collections in place for retry/cancel operations:
- cancelling a run marks queued and pending `run_tasks` as `cancelled`
- retrying a run creates a new `workflow_runs` document plus a fresh set of `run_tasks`
- retrying a task resets the target node and downstream subtree on the existing run, clears task execution fields, and increments the retried task attempt count
### artifacts
Purpose:

View File

@ -22,6 +22,7 @@
- `2026-03-26`: The current UI/runtime pass turns Runs into a real project-scoped workspace with run history queries, active-run polling, and task artifact links into Explore.
- `2026-03-27`: The current observability pass persists task execution summaries, timestamps, log lines, and result previews on Mongo-backed `run_tasks`, and surfaces those fields in the React run detail view.
- `2026-03-27`: The current follow-up observability pass adds persisted stdout/stderr fields on `run_tasks` plus aggregated run summaries, durations, and task counts on `workflow_runs`.
- `2026-03-27`: The current run-control pass adds run cancellation, run retry from immutable snapshots, and task retry for failed/cancelled nodes with downstream reset semantics.
---