✨ feat: persist task execution summaries and logs
This commit is contained in:
parent
ce7ec0aee4
commit
22efdbcf3b
@ -66,7 +66,7 @@ The local validation path currently used for embodied data testing is:
|
|||||||
|
|
||||||
You can register that directory from the Assets page or via `POST /api/assets/register`.
|
You can register that directory from the Assets page or via `POST /api/assets/register`.
|
||||||
The workflow editor currently requires selecting at least one registered asset before a run can be created.
|
The workflow editor currently requires selecting at least one registered asset before a run can be created.
|
||||||
The Runs workspace now shows project-scoped run history, and each run detail view links task artifacts into Explore.
|
The Runs workspace now shows project-scoped run history, and each run detail view surfaces persisted task summaries, log lines, result previews, and artifact links into Explore.
|
||||||
|
|
||||||
## Repository Structure
|
## Repository Structure
|
||||||
|
|
||||||
|
|||||||
@ -5,9 +5,19 @@ export const runTaskSchemaDefinition = {
|
|||||||
workflowVersionId: { type: "string", required: true },
|
workflowVersionId: { type: "string", required: true },
|
||||||
nodeId: { type: "string", required: true },
|
nodeId: { type: "string", required: true },
|
||||||
nodeType: { type: "string", required: true },
|
nodeType: { type: "string", required: true },
|
||||||
|
executorType: { type: "string", required: true },
|
||||||
status: { type: "string", required: true },
|
status: { type: "string", required: true },
|
||||||
attempt: { type: "number", required: true, default: 1 },
|
attempt: { type: "number", required: true, default: 1 },
|
||||||
inputRefs: { type: "array", required: true, default: [] },
|
assetIds: { type: "array", required: true, default: [] },
|
||||||
outputRefs: { type: "array", required: true, default: [] },
|
upstreamNodeIds: { type: "array", required: true, default: [] },
|
||||||
|
outputArtifactIds: { type: "array", required: true, default: [] },
|
||||||
|
logLines: { type: "array", required: true, default: [] },
|
||||||
|
errorMessage: { type: "string", required: false, default: null },
|
||||||
|
summary: { type: "object", required: false, default: null },
|
||||||
|
lastResultPreview: { type: "object", required: false, default: null },
|
||||||
|
startedAt: { type: "date", required: false, default: null },
|
||||||
|
finishedAt: { type: "date", required: false, default: null },
|
||||||
|
durationMs: { type: "number", required: false, default: null },
|
||||||
createdAt: { type: "date", required: true },
|
createdAt: { type: "date", required: true },
|
||||||
|
updatedAt: { type: "date", required: true },
|
||||||
} as const;
|
} as const;
|
||||||
|
|||||||
@ -5,7 +5,10 @@ import type { Db, Document, WithId } from "mongodb";
|
|||||||
import type { AssetType } from "../../../../packages/contracts/src/domain.ts";
|
import type { AssetType } from "../../../../packages/contracts/src/domain.ts";
|
||||||
import { DELIVERY_NODE_DEFINITIONS } from "../modules/plugins/builtin/delivery-nodes.ts";
|
import { DELIVERY_NODE_DEFINITIONS } from "../modules/plugins/builtin/delivery-nodes.ts";
|
||||||
import { probeLocalSourcePath } from "./local-source-probe.ts";
|
import { probeLocalSourcePath } from "./local-source-probe.ts";
|
||||||
import type { ExecutorType } from "../../../worker/src/contracts/execution-context.ts";
|
import type {
|
||||||
|
ExecutorType,
|
||||||
|
TaskExecutionSummary,
|
||||||
|
} from "../../../worker/src/contracts/execution-context.ts";
|
||||||
|
|
||||||
type Timestamped = {
|
type Timestamped = {
|
||||||
createdAt: string;
|
createdAt: string;
|
||||||
@ -111,6 +114,13 @@ type RunTaskDocument = Timestamped & {
|
|||||||
assetIds: string[];
|
assetIds: string[];
|
||||||
upstreamNodeIds: string[];
|
upstreamNodeIds: string[];
|
||||||
outputArtifactIds: string[];
|
outputArtifactIds: string[];
|
||||||
|
startedAt?: string;
|
||||||
|
finishedAt?: string;
|
||||||
|
durationMs?: number;
|
||||||
|
logLines?: string[];
|
||||||
|
summary?: TaskExecutionSummary;
|
||||||
|
lastResultPreview?: Record<string, unknown>;
|
||||||
|
errorMessage?: string;
|
||||||
};
|
};
|
||||||
|
|
||||||
type ArtifactDocument = Timestamped & {
|
type ArtifactDocument = Timestamped & {
|
||||||
@ -491,6 +501,7 @@ export class MongoAppStore {
|
|||||||
.filter((edge) => edge.to === node.id)
|
.filter((edge) => edge.to === node.id)
|
||||||
.map((edge) => edge.from),
|
.map((edge) => edge.from),
|
||||||
outputArtifactIds: [],
|
outputArtifactIds: [],
|
||||||
|
logLines: [],
|
||||||
createdAt: nowIso(),
|
createdAt: nowIso(),
|
||||||
updatedAt: nowIso(),
|
updatedAt: nowIso(),
|
||||||
}));
|
}));
|
||||||
|
|||||||
@ -5,6 +5,7 @@ import os from "node:os";
|
|||||||
import path from "node:path";
|
import path from "node:path";
|
||||||
|
|
||||||
import { MongoMemoryServer } from "mongodb-memory-server";
|
import { MongoMemoryServer } from "mongodb-memory-server";
|
||||||
|
import { MongoClient } from "mongodb";
|
||||||
|
|
||||||
import { createApiRuntime, type ApiRuntimeConfig } from "../src/runtime/server.ts";
|
import { createApiRuntime, type ApiRuntimeConfig } from "../src/runtime/server.ts";
|
||||||
|
|
||||||
@ -421,3 +422,170 @@ test("mongo-backed runtime lists recent runs for a project", async (t) => {
|
|||||||
assert.deepEqual(runs[0]?.assetIds, [asset._id]);
|
assert.deepEqual(runs[0]?.assetIds, [asset._id]);
|
||||||
assert.equal(runs[0]?.status, "queued");
|
assert.equal(runs[0]?.status, "queued");
|
||||||
});
|
});
|
||||||
|
|
||||||
|
test("mongo-backed runtime exposes persisted task execution summaries and logs", async (t) => {
|
||||||
|
const mongod = await MongoMemoryServer.create({
|
||||||
|
instance: {
|
||||||
|
ip: "127.0.0.1",
|
||||||
|
port: 27121,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
t.after(async () => {
|
||||||
|
await mongod.stop();
|
||||||
|
});
|
||||||
|
|
||||||
|
const database = "emboflow-runtime-task-summaries";
|
||||||
|
const server = await startRuntimeServer({
|
||||||
|
host: "127.0.0.1",
|
||||||
|
port: 0,
|
||||||
|
mongoUri: mongod.getUri(),
|
||||||
|
database,
|
||||||
|
corsOrigin: "http://127.0.0.1:3000",
|
||||||
|
});
|
||||||
|
t.after(async () => {
|
||||||
|
await server.close();
|
||||||
|
});
|
||||||
|
|
||||||
|
const bootstrap = await readJson<{
|
||||||
|
workspace: { _id: string };
|
||||||
|
project: { _id: string };
|
||||||
|
}>(
|
||||||
|
await fetch(`${server.baseUrl}/api/dev/bootstrap`, {
|
||||||
|
method: "POST",
|
||||||
|
headers: { "content-type": "application/json" },
|
||||||
|
body: JSON.stringify({ userId: "task-summary-user", projectName: "Task Summary Project" }),
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
|
||||||
|
const workflow = await readJson<{ _id: string }>(
|
||||||
|
await fetch(`${server.baseUrl}/api/workflows`, {
|
||||||
|
method: "POST",
|
||||||
|
headers: { "content-type": "application/json" },
|
||||||
|
body: JSON.stringify({
|
||||||
|
workspaceId: bootstrap.workspace._id,
|
||||||
|
projectId: bootstrap.project._id,
|
||||||
|
name: "Task Summary Flow",
|
||||||
|
}),
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
|
||||||
|
const version = await readJson<{ _id: string }>(
|
||||||
|
await fetch(`${server.baseUrl}/api/workflows/${workflow._id}/versions`, {
|
||||||
|
method: "POST",
|
||||||
|
headers: { "content-type": "application/json" },
|
||||||
|
body: JSON.stringify({
|
||||||
|
visualGraph: { viewport: { x: 0, y: 0, zoom: 1 } },
|
||||||
|
logicGraph: {
|
||||||
|
nodes: [{ id: "source-asset", type: "source" }],
|
||||||
|
edges: [],
|
||||||
|
},
|
||||||
|
runtimeGraph: { selectedPreset: "delivery-normalization" },
|
||||||
|
pluginRefs: ["builtin:delivery-nodes"],
|
||||||
|
}),
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
|
||||||
|
const assetId = "asset-task-summary";
|
||||||
|
const client = new MongoClient(mongod.getUri());
|
||||||
|
await client.connect();
|
||||||
|
t.after(async () => {
|
||||||
|
await client.close();
|
||||||
|
});
|
||||||
|
const db = client.db(database);
|
||||||
|
await db.collection("assets").insertOne({
|
||||||
|
_id: assetId,
|
||||||
|
workspaceId: bootstrap.workspace._id,
|
||||||
|
projectId: bootstrap.project._id,
|
||||||
|
type: "folder",
|
||||||
|
sourceType: "local_path",
|
||||||
|
displayName: "Summary Asset",
|
||||||
|
status: "probed",
|
||||||
|
storageRef: {},
|
||||||
|
topLevelPaths: ["DJI_001"],
|
||||||
|
detectedFormats: ["delivery_package"],
|
||||||
|
fileCount: 1,
|
||||||
|
summary: {},
|
||||||
|
createdBy: "task-summary-user",
|
||||||
|
createdAt: new Date().toISOString(),
|
||||||
|
updatedAt: new Date().toISOString(),
|
||||||
|
});
|
||||||
|
|
||||||
|
const run = await readJson<{ _id: string }>(
|
||||||
|
await fetch(`${server.baseUrl}/api/runs`, {
|
||||||
|
method: "POST",
|
||||||
|
headers: { "content-type": "application/json" },
|
||||||
|
body: JSON.stringify({
|
||||||
|
workflowDefinitionId: workflow._id,
|
||||||
|
workflowVersionId: version._id,
|
||||||
|
assetIds: [assetId],
|
||||||
|
}),
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
|
||||||
|
const [task] = await db
|
||||||
|
.collection("run_tasks")
|
||||||
|
.find({ workflowRunId: run._id })
|
||||||
|
.sort({ createdAt: 1 })
|
||||||
|
.toArray();
|
||||||
|
|
||||||
|
await db.collection("run_tasks").updateOne(
|
||||||
|
{ _id: task?._id },
|
||||||
|
{
|
||||||
|
$set: {
|
||||||
|
status: "success",
|
||||||
|
startedAt: "2026-03-27T10:00:00.000Z",
|
||||||
|
finishedAt: "2026-03-27T10:00:02.500Z",
|
||||||
|
durationMs: 2500,
|
||||||
|
logLines: ["Task claimed by worker", "Executor completed successfully"],
|
||||||
|
summary: {
|
||||||
|
outcome: "success",
|
||||||
|
executorType: "python",
|
||||||
|
assetCount: 1,
|
||||||
|
artifactIds: ["artifact-1"],
|
||||||
|
},
|
||||||
|
lastResultPreview: {
|
||||||
|
taskId: task?._id,
|
||||||
|
executor: "python",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
const tasks = await readJson<
|
||||||
|
Array<{
|
||||||
|
_id: string;
|
||||||
|
status: string;
|
||||||
|
startedAt?: string;
|
||||||
|
finishedAt?: string;
|
||||||
|
durationMs?: number;
|
||||||
|
logLines?: string[];
|
||||||
|
summary?: {
|
||||||
|
outcome?: string;
|
||||||
|
executorType?: string;
|
||||||
|
assetCount?: number;
|
||||||
|
artifactIds?: string[];
|
||||||
|
};
|
||||||
|
lastResultPreview?: {
|
||||||
|
taskId?: string;
|
||||||
|
executor?: string;
|
||||||
|
};
|
||||||
|
}>
|
||||||
|
>(await fetch(`${server.baseUrl}/api/runs/${run._id}/tasks`));
|
||||||
|
|
||||||
|
assert.equal(tasks.length, 1);
|
||||||
|
assert.equal(tasks[0]?._id, task?._id);
|
||||||
|
assert.equal(tasks[0]?.status, "success");
|
||||||
|
assert.equal(tasks[0]?.durationMs, 2500);
|
||||||
|
assert.deepEqual(tasks[0]?.logLines, [
|
||||||
|
"Task claimed by worker",
|
||||||
|
"Executor completed successfully",
|
||||||
|
]);
|
||||||
|
assert.equal(tasks[0]?.summary?.outcome, "success");
|
||||||
|
assert.equal(tasks[0]?.summary?.executorType, "python");
|
||||||
|
assert.equal(tasks[0]?.summary?.assetCount, 1);
|
||||||
|
assert.deepEqual(tasks[0]?.summary?.artifactIds, ["artifact-1"]);
|
||||||
|
assert.deepEqual(tasks[0]?.lastResultPreview, {
|
||||||
|
taskId: task?._id,
|
||||||
|
executor: "python",
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|||||||
@ -5,6 +5,9 @@ export type RunTaskView = {
|
|||||||
status: string;
|
status: string;
|
||||||
assetIds?: string[];
|
assetIds?: string[];
|
||||||
artifactIds?: string[];
|
artifactIds?: string[];
|
||||||
|
durationMs?: number;
|
||||||
|
summaryLabel?: string;
|
||||||
|
errorMessage?: string;
|
||||||
logLines: string[];
|
logLines: string[];
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@ -15,6 +15,9 @@ export function renderTaskLogPanel(
|
|||||||
<h2>${task.nodeName}</h2>
|
<h2>${task.nodeName}</h2>
|
||||||
<p>Status: ${task.status}</p>
|
<p>Status: ${task.status}</p>
|
||||||
<p>Input assets: ${(task.assetIds ?? []).join(", ") || "none"}</p>
|
<p>Input assets: ${(task.assetIds ?? []).join(", ") || "none"}</p>
|
||||||
|
<p>Duration: ${typeof task.durationMs === "number" ? `${task.durationMs} ms` : "n/a"}</p>
|
||||||
|
${task.summaryLabel ? `<p>Summary: ${task.summaryLabel}</p>` : ""}
|
||||||
|
${task.errorMessage ? `<p>Error: ${task.errorMessage}</p>` : ""}
|
||||||
<p>Artifacts: ${(task.artifactIds ?? []).length}</p>
|
<p>Artifacts: ${(task.artifactIds ?? []).length}</p>
|
||||||
${
|
${
|
||||||
(task.artifactIds ?? []).length > 0
|
(task.artifactIds ?? []).length > 0
|
||||||
|
|||||||
@ -46,6 +46,8 @@ test("run detail view shows node status badges from run data", () => {
|
|||||||
status: "success",
|
status: "success",
|
||||||
assetIds: ["asset-1"],
|
assetIds: ["asset-1"],
|
||||||
artifactIds: ["artifact-1"],
|
artifactIds: ["artifact-1"],
|
||||||
|
durationMs: 1200,
|
||||||
|
summaryLabel: "Processed 1 asset",
|
||||||
logLines: ["Asset loaded"],
|
logLines: ["Asset loaded"],
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
@ -55,6 +57,8 @@ test("run detail view shows node status badges from run data", () => {
|
|||||||
status: "running",
|
status: "running",
|
||||||
assetIds: ["asset-1"],
|
assetIds: ["asset-1"],
|
||||||
artifactIds: ["artifact-2"],
|
artifactIds: ["artifact-2"],
|
||||||
|
durationMs: 2450,
|
||||||
|
summaryLabel: "Validated delivery package structure",
|
||||||
logLines: ["Checking metadata"],
|
logLines: ["Checking metadata"],
|
||||||
},
|
},
|
||||||
],
|
],
|
||||||
@ -67,6 +71,8 @@ test("run detail view shows node status badges from run data", () => {
|
|||||||
assert.match(html, /running/);
|
assert.match(html, /running/);
|
||||||
assert.match(html, /Checking metadata/);
|
assert.match(html, /Checking metadata/);
|
||||||
assert.match(html, /Input assets: asset-1/);
|
assert.match(html, /Input assets: asset-1/);
|
||||||
|
assert.match(html, /Duration: 2450 ms/);
|
||||||
|
assert.match(html, /Validated delivery package structure/);
|
||||||
assert.match(html, /\/explore\/artifact-2/);
|
assert.match(html, /\/explore\/artifact-2/);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
@ -23,6 +23,17 @@ type AppProps = {
|
|||||||
apiBaseUrl: string;
|
apiBaseUrl: string;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
function formatTaskSummary(task: any) {
|
||||||
|
if (task?.summary?.errorMessage) {
|
||||||
|
return task.summary.errorMessage;
|
||||||
|
}
|
||||||
|
const outcome = task?.summary?.outcome ?? task?.status ?? "unknown";
|
||||||
|
const executor = task?.summary?.executorType ?? task?.executorType ?? "unknown";
|
||||||
|
const assetCount = task?.summary?.assetCount ?? task?.assetIds?.length ?? 0;
|
||||||
|
const artifactCount = task?.summary?.artifactIds?.length ?? task?.outputArtifactIds?.length ?? 0;
|
||||||
|
return `${outcome} via ${executor}; assets ${assetCount}; artifacts ${artifactCount}`;
|
||||||
|
}
|
||||||
|
|
||||||
function usePathname() {
|
function usePathname() {
|
||||||
const [pathname, setPathname] = useState(
|
const [pathname, setPathname] = useState(
|
||||||
typeof window === "undefined" ? "/assets" : window.location.pathname || "/assets",
|
typeof window === "undefined" ? "/assets" : window.location.pathname || "/assets",
|
||||||
@ -729,7 +740,18 @@ function RunDetailPage(props: {
|
|||||||
<>
|
<>
|
||||||
<p>Node: {selectedTask.nodeId}</p>
|
<p>Node: {selectedTask.nodeId}</p>
|
||||||
<p>Status: {selectedTask.status}</p>
|
<p>Status: {selectedTask.status}</p>
|
||||||
|
<p>Executor: {selectedTask.executorType}</p>
|
||||||
<p>Input assets: {(selectedTask.assetIds ?? []).join(", ") || "none"}</p>
|
<p>Input assets: {(selectedTask.assetIds ?? []).join(", ") || "none"}</p>
|
||||||
|
<p>Started at: {selectedTask.startedAt ?? "n/a"}</p>
|
||||||
|
<p>Finished at: {selectedTask.finishedAt ?? "n/a"}</p>
|
||||||
|
<p>
|
||||||
|
Duration:{" "}
|
||||||
|
{typeof selectedTask.durationMs === "number"
|
||||||
|
? `${selectedTask.durationMs} ms`
|
||||||
|
: "n/a"}
|
||||||
|
</p>
|
||||||
|
<p>Summary: {formatTaskSummary(selectedTask)}</p>
|
||||||
|
{selectedTask.errorMessage ? <p>Error: {selectedTask.errorMessage}</p> : null}
|
||||||
<p>Artifacts: {artifacts.length}</p>
|
<p>Artifacts: {artifacts.length}</p>
|
||||||
{artifacts.length > 0 ? (
|
{artifacts.length > 0 ? (
|
||||||
<ul>
|
<ul>
|
||||||
@ -742,6 +764,28 @@ function RunDetailPage(props: {
|
|||||||
) : (
|
) : (
|
||||||
<p className="empty-state">No task artifacts yet.</p>
|
<p className="empty-state">No task artifacts yet.</p>
|
||||||
)}
|
)}
|
||||||
|
<div className="page-stack">
|
||||||
|
<div>
|
||||||
|
<strong>Execution Log</strong>
|
||||||
|
{(selectedTask.logLines ?? []).length > 0 ? (
|
||||||
|
<ul>
|
||||||
|
{(selectedTask.logLines ?? []).map((line: string, index: number) => (
|
||||||
|
<li key={`${selectedTask._id}-log-${index}`}>{line}</li>
|
||||||
|
))}
|
||||||
|
</ul>
|
||||||
|
) : (
|
||||||
|
<p className="empty-state">No task logs yet.</p>
|
||||||
|
)}
|
||||||
|
</div>
|
||||||
|
{selectedTask.lastResultPreview ? (
|
||||||
|
<div>
|
||||||
|
<strong>Result Preview</strong>
|
||||||
|
<pre className="mono-block">
|
||||||
|
{JSON.stringify(selectedTask.lastResultPreview, null, 2)}
|
||||||
|
</pre>
|
||||||
|
</div>
|
||||||
|
) : null}
|
||||||
|
</div>
|
||||||
<pre className="mono-block">{JSON.stringify(selectedTask, null, 2)}</pre>
|
<pre className="mono-block">{JSON.stringify(selectedTask, null, 2)}</pre>
|
||||||
</>
|
</>
|
||||||
) : (
|
) : (
|
||||||
|
|||||||
@ -1,6 +1,14 @@
|
|||||||
export type ExecutorType = "python" | "docker" | "http";
|
export type ExecutorType = "python" | "docker" | "http";
|
||||||
export type TaskStatus = "pending" | "queued" | "running" | "success" | "failed";
|
export type TaskStatus = "pending" | "queued" | "running" | "success" | "failed";
|
||||||
|
|
||||||
|
export type TaskExecutionSummary = {
|
||||||
|
outcome: "success" | "failed";
|
||||||
|
executorType: ExecutorType;
|
||||||
|
assetCount: number;
|
||||||
|
artifactIds: string[];
|
||||||
|
errorMessage?: string;
|
||||||
|
};
|
||||||
|
|
||||||
export type TaskRecord = {
|
export type TaskRecord = {
|
||||||
id: string;
|
id: string;
|
||||||
workflowRunId?: string;
|
workflowRunId?: string;
|
||||||
@ -14,6 +22,12 @@ export type TaskRecord = {
|
|||||||
upstreamNodeIds?: string[];
|
upstreamNodeIds?: string[];
|
||||||
outputArtifactIds?: string[];
|
outputArtifactIds?: string[];
|
||||||
errorMessage?: string;
|
errorMessage?: string;
|
||||||
|
startedAt?: string;
|
||||||
|
finishedAt?: string;
|
||||||
|
durationMs?: number;
|
||||||
|
logLines?: string[];
|
||||||
|
summary?: TaskExecutionSummary;
|
||||||
|
lastResultPreview?: Record<string, unknown>;
|
||||||
};
|
};
|
||||||
|
|
||||||
export type ExecutionContext = {
|
export type ExecutionContext = {
|
||||||
|
|||||||
@ -2,7 +2,12 @@ import { randomUUID } from "node:crypto";
|
|||||||
|
|
||||||
import type { Db } from "mongodb";
|
import type { Db } from "mongodb";
|
||||||
|
|
||||||
import type { ExecutorType, TaskRecord, TaskStatus } from "../contracts/execution-context.ts";
|
import type {
|
||||||
|
ExecutorType,
|
||||||
|
TaskExecutionSummary,
|
||||||
|
TaskRecord,
|
||||||
|
TaskStatus,
|
||||||
|
} from "../contracts/execution-context.ts";
|
||||||
|
|
||||||
type WorkflowRunDocument = {
|
type WorkflowRunDocument = {
|
||||||
_id: string;
|
_id: string;
|
||||||
@ -38,6 +43,12 @@ type RunTaskDocument = {
|
|||||||
upstreamNodeIds: string[];
|
upstreamNodeIds: string[];
|
||||||
outputArtifactIds: string[];
|
outputArtifactIds: string[];
|
||||||
errorMessage?: string;
|
errorMessage?: string;
|
||||||
|
startedAt?: string;
|
||||||
|
finishedAt?: string;
|
||||||
|
durationMs?: number;
|
||||||
|
logLines?: string[];
|
||||||
|
summary?: TaskExecutionSummary;
|
||||||
|
lastResultPreview?: Record<string, unknown>;
|
||||||
createdAt: string;
|
createdAt: string;
|
||||||
updatedAt: string;
|
updatedAt: string;
|
||||||
};
|
};
|
||||||
@ -60,6 +71,12 @@ function toTaskRecord(task: RunTaskDocument): TaskRecord {
|
|||||||
upstreamNodeIds: task.upstreamNodeIds,
|
upstreamNodeIds: task.upstreamNodeIds,
|
||||||
outputArtifactIds: task.outputArtifactIds,
|
outputArtifactIds: task.outputArtifactIds,
|
||||||
errorMessage: task.errorMessage,
|
errorMessage: task.errorMessage,
|
||||||
|
startedAt: task.startedAt,
|
||||||
|
finishedAt: task.finishedAt,
|
||||||
|
durationMs: task.durationMs,
|
||||||
|
logLines: task.logLines ?? [],
|
||||||
|
summary: task.summary,
|
||||||
|
lastResultPreview: task.lastResultPreview,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -71,12 +88,19 @@ export class MongoWorkerStore {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async claimNextQueuedTask(): Promise<TaskRecord | undefined> {
|
async claimNextQueuedTask(): Promise<TaskRecord | undefined> {
|
||||||
|
const startedAt = nowIso();
|
||||||
const task = await this.db.collection<RunTaskDocument>("run_tasks").findOneAndUpdate(
|
const task = await this.db.collection<RunTaskDocument>("run_tasks").findOneAndUpdate(
|
||||||
{ status: "queued" },
|
{ status: "queued" },
|
||||||
{
|
{
|
||||||
$set: {
|
$set: {
|
||||||
status: "running",
|
status: "running",
|
||||||
updatedAt: nowIso(),
|
startedAt,
|
||||||
|
updatedAt: startedAt,
|
||||||
|
},
|
||||||
|
$push: {
|
||||||
|
logLines: {
|
||||||
|
$each: ["Task claimed by worker"],
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
@ -143,26 +167,61 @@ export class MongoWorkerStore {
|
|||||||
return artifact;
|
return artifact;
|
||||||
}
|
}
|
||||||
|
|
||||||
async markTaskSuccess(taskId: string) {
|
async markTaskSuccess(
|
||||||
|
taskId: string,
|
||||||
|
input: {
|
||||||
|
finishedAt: string;
|
||||||
|
durationMs: number;
|
||||||
|
summary: TaskExecutionSummary;
|
||||||
|
logLine: string;
|
||||||
|
lastResultPreview?: Record<string, unknown>;
|
||||||
|
},
|
||||||
|
) {
|
||||||
await this.db.collection<RunTaskDocument>("run_tasks").updateOne(
|
await this.db.collection<RunTaskDocument>("run_tasks").updateOne(
|
||||||
{ _id: taskId },
|
{ _id: taskId },
|
||||||
{
|
{
|
||||||
$set: {
|
$set: {
|
||||||
status: "success",
|
status: "success",
|
||||||
updatedAt: nowIso(),
|
finishedAt: input.finishedAt,
|
||||||
|
durationMs: input.durationMs,
|
||||||
|
summary: input.summary,
|
||||||
|
lastResultPreview: input.lastResultPreview,
|
||||||
|
updatedAt: input.finishedAt,
|
||||||
|
},
|
||||||
|
$push: {
|
||||||
|
logLines: {
|
||||||
|
$each: [input.logLine],
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
async markTaskFailed(taskId: string, errorMessage: string) {
|
async markTaskFailed(
|
||||||
|
taskId: string,
|
||||||
|
errorMessage: string,
|
||||||
|
input: {
|
||||||
|
finishedAt: string;
|
||||||
|
durationMs: number;
|
||||||
|
summary: TaskExecutionSummary;
|
||||||
|
logLine: string;
|
||||||
|
},
|
||||||
|
) {
|
||||||
await this.db.collection<RunTaskDocument>("run_tasks").updateOne(
|
await this.db.collection<RunTaskDocument>("run_tasks").updateOne(
|
||||||
{ _id: taskId },
|
{ _id: taskId },
|
||||||
{
|
{
|
||||||
$set: {
|
$set: {
|
||||||
status: "failed",
|
status: "failed",
|
||||||
errorMessage,
|
errorMessage,
|
||||||
updatedAt: nowIso(),
|
finishedAt: input.finishedAt,
|
||||||
|
durationMs: input.durationMs,
|
||||||
|
summary: input.summary,
|
||||||
|
updatedAt: input.finishedAt,
|
||||||
|
},
|
||||||
|
$push: {
|
||||||
|
logLines: {
|
||||||
|
$each: [input.logLine],
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
|||||||
@ -1,7 +1,12 @@
|
|||||||
import { DockerExecutor } from "../executors/docker-executor.ts";
|
import { DockerExecutor } from "../executors/docker-executor.ts";
|
||||||
import { HttpExecutor } from "../executors/http-executor.ts";
|
import { HttpExecutor } from "../executors/http-executor.ts";
|
||||||
import { PythonExecutor } from "../executors/python-executor.ts";
|
import { PythonExecutor } from "../executors/python-executor.ts";
|
||||||
import type { ExecutionContext, ExecutorType, TaskRecord } from "../contracts/execution-context.ts";
|
import type {
|
||||||
|
ExecutionContext,
|
||||||
|
ExecutorType,
|
||||||
|
TaskExecutionSummary,
|
||||||
|
TaskRecord,
|
||||||
|
} from "../contracts/execution-context.ts";
|
||||||
import { MongoWorkerStore } from "./mongo-worker-store.ts";
|
import { MongoWorkerStore } from "./mongo-worker-store.ts";
|
||||||
|
|
||||||
type ExecutorMap = {
|
type ExecutorMap = {
|
||||||
@ -31,6 +36,7 @@ export class WorkerRuntime {
|
|||||||
if (!task) {
|
if (!task) {
|
||||||
return undefined;
|
return undefined;
|
||||||
}
|
}
|
||||||
|
const startedAt = task.startedAt ?? new Date().toISOString();
|
||||||
|
|
||||||
const context: ExecutionContext = {
|
const context: ExecutionContext = {
|
||||||
taskId: task.id,
|
taskId: task.id,
|
||||||
@ -42,14 +48,27 @@ export class WorkerRuntime {
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
const result = await this.executors[task.executorType as ExecutorType].execute(task, context);
|
const result = await this.executors[task.executorType as ExecutorType].execute(task, context);
|
||||||
await this.store.createTaskArtifact(task, {
|
const artifact = await this.store.createTaskArtifact(task, {
|
||||||
nodeId: task.nodeId,
|
nodeId: task.nodeId,
|
||||||
nodeType: task.nodeType,
|
nodeType: task.nodeType,
|
||||||
executorType: task.executorType,
|
executorType: task.executorType,
|
||||||
assetIds: task.assetIds,
|
assetIds: task.assetIds,
|
||||||
result,
|
result,
|
||||||
});
|
});
|
||||||
await this.store.markTaskSuccess(task.id);
|
const finishedAt = new Date().toISOString();
|
||||||
|
const summary: TaskExecutionSummary = {
|
||||||
|
outcome: "success",
|
||||||
|
executorType: task.executorType,
|
||||||
|
assetCount: task.assetIds?.length ?? 0,
|
||||||
|
artifactIds: [artifact._id],
|
||||||
|
};
|
||||||
|
await this.store.markTaskSuccess(task.id, {
|
||||||
|
finishedAt,
|
||||||
|
durationMs: this.computeDurationMs(startedAt, finishedAt),
|
||||||
|
summary,
|
||||||
|
logLine: "Executor completed successfully",
|
||||||
|
lastResultPreview: this.createResultPreview(result),
|
||||||
|
});
|
||||||
if (task.workflowRunId) {
|
if (task.workflowRunId) {
|
||||||
await this.store.queueReadyDependents(task.workflowRunId);
|
await this.store.queueReadyDependents(task.workflowRunId);
|
||||||
await this.store.refreshRunStatus(task.workflowRunId);
|
await this.store.refreshRunStatus(task.workflowRunId);
|
||||||
@ -57,11 +76,39 @@ export class WorkerRuntime {
|
|||||||
return this.store.getRunTask(task.id) ?? { ...task, status: "success" };
|
return this.store.getRunTask(task.id) ?? { ...task, status: "success" };
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
const message = error instanceof Error ? error.message : "worker execution failed";
|
const message = error instanceof Error ? error.message : "worker execution failed";
|
||||||
await this.store.markTaskFailed(task.id, message);
|
const finishedAt = new Date().toISOString();
|
||||||
|
const summary: TaskExecutionSummary = {
|
||||||
|
outcome: "failed",
|
||||||
|
executorType: task.executorType,
|
||||||
|
assetCount: task.assetIds?.length ?? 0,
|
||||||
|
artifactIds: [],
|
||||||
|
errorMessage: message,
|
||||||
|
};
|
||||||
|
await this.store.markTaskFailed(task.id, message, {
|
||||||
|
finishedAt,
|
||||||
|
durationMs: this.computeDurationMs(startedAt, finishedAt),
|
||||||
|
summary,
|
||||||
|
logLine: `Execution failed: ${message}`,
|
||||||
|
});
|
||||||
if (task.workflowRunId) {
|
if (task.workflowRunId) {
|
||||||
await this.store.refreshRunStatus(task.workflowRunId);
|
await this.store.refreshRunStatus(task.workflowRunId);
|
||||||
}
|
}
|
||||||
throw error;
|
throw error;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private computeDurationMs(startedAt: string, finishedAt: string) {
|
||||||
|
const duration = Date.parse(finishedAt) - Date.parse(startedAt);
|
||||||
|
return Number.isFinite(duration) && duration >= 0 ? duration : 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
private createResultPreview(result: unknown): Record<string, unknown> | undefined {
|
||||||
|
if (result && typeof result === "object" && !Array.isArray(result)) {
|
||||||
|
return result as Record<string, unknown>;
|
||||||
|
}
|
||||||
|
if (typeof result === "string" || typeof result === "number" || typeof result === "boolean") {
|
||||||
|
return { value: result };
|
||||||
|
}
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -186,6 +186,17 @@ test("worker marks the run successful after the final queued task completes", as
|
|||||||
|
|
||||||
assert.equal(task?.status, "success");
|
assert.equal(task?.status, "success");
|
||||||
assert.equal(run?.status, "success");
|
assert.equal(run?.status, "success");
|
||||||
|
assert.ok(typeof task?.startedAt === "string");
|
||||||
|
assert.ok(typeof task?.finishedAt === "string");
|
||||||
|
assert.ok(typeof task?.durationMs === "number");
|
||||||
|
assert.equal(task?.summary?.outcome, "success");
|
||||||
|
assert.equal(task?.summary?.executorType, "python");
|
||||||
|
assert.match(task?.logLines?.[0] ?? "", /claimed/i);
|
||||||
|
assert.match(task?.logLines?.at(-1) ?? "", /completed/i);
|
||||||
|
assert.deepEqual(task?.lastResultPreview, {
|
||||||
|
taskId: "task-export",
|
||||||
|
executor: "python",
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
test("worker passes bound asset ids into the execution context and task artifacts", async (t) => {
|
test("worker passes bound asset ids into the execution context and task artifacts", async (t) => {
|
||||||
@ -248,4 +259,66 @@ test("worker passes bound asset ids into the execution context and task artifact
|
|||||||
(artifact?.payload as { assetIds?: string[] } | undefined)?.assetIds,
|
(artifact?.payload as { assetIds?: string[] } | undefined)?.assetIds,
|
||||||
["asset-42"],
|
["asset-42"],
|
||||||
);
|
);
|
||||||
|
assert.equal(storedTask?.summary?.outcome, "success");
|
||||||
|
assert.equal(storedTask?.summary?.assetCount, 1);
|
||||||
|
assert.deepEqual(storedTask?.summary?.artifactIds, [artifact?._id]);
|
||||||
|
});
|
||||||
|
|
||||||
|
test("worker persists failure summaries and task log lines when execution throws", async (t) => {
|
||||||
|
const fixture = await createRuntimeFixture("emboflow-worker-failure-summary", {
|
||||||
|
executors: {
|
||||||
|
python: {
|
||||||
|
async execute() {
|
||||||
|
throw new Error("intentional executor failure");
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
});
|
||||||
|
t.after(async () => {
|
||||||
|
await fixture.close();
|
||||||
|
});
|
||||||
|
|
||||||
|
await fixture.db.collection("workflow_runs").insertOne({
|
||||||
|
_id: "run-failure",
|
||||||
|
workflowDefinitionId: "workflow-failure",
|
||||||
|
workflowVersionId: "workflow-failure-v1",
|
||||||
|
status: "queued",
|
||||||
|
triggeredBy: "local-user",
|
||||||
|
assetIds: ["asset-9"],
|
||||||
|
createdAt: new Date().toISOString(),
|
||||||
|
updatedAt: new Date().toISOString(),
|
||||||
|
});
|
||||||
|
|
||||||
|
await fixture.db.collection("run_tasks").insertOne({
|
||||||
|
_id: "task-failure",
|
||||||
|
workflowRunId: "run-failure",
|
||||||
|
workflowVersionId: "workflow-failure-v1",
|
||||||
|
nodeId: "validate-structure",
|
||||||
|
nodeType: "inspect",
|
||||||
|
executorType: "python",
|
||||||
|
status: "queued",
|
||||||
|
attempt: 1,
|
||||||
|
assetIds: ["asset-9"],
|
||||||
|
upstreamNodeIds: [],
|
||||||
|
outputArtifactIds: [],
|
||||||
|
createdAt: new Date().toISOString(),
|
||||||
|
updatedAt: new Date().toISOString(),
|
||||||
|
});
|
||||||
|
|
||||||
|
await assert.rejects(() => fixture.runtime.runNextTask(), /intentional executor failure/);
|
||||||
|
|
||||||
|
const task = await fixture.db.collection("run_tasks").findOne({ _id: "task-failure" });
|
||||||
|
const run = await fixture.store.getRun("run-failure");
|
||||||
|
|
||||||
|
assert.equal(task?.status, "failed");
|
||||||
|
assert.equal(run?.status, "failed");
|
||||||
|
assert.equal(task?.errorMessage, "intentional executor failure");
|
||||||
|
assert.equal(task?.summary?.outcome, "failed");
|
||||||
|
assert.equal(task?.summary?.executorType, "python");
|
||||||
|
assert.equal(task?.summary?.artifactIds?.length ?? 0, 0);
|
||||||
|
assert.ok(typeof task?.startedAt === "string");
|
||||||
|
assert.ok(typeof task?.finishedAt === "string");
|
||||||
|
assert.ok(typeof task?.durationMs === "number");
|
||||||
|
assert.match(task?.logLines?.[0] ?? "", /claimed/i);
|
||||||
|
assert.match(task?.logLines?.at(-1) ?? "", /intentional executor failure/i);
|
||||||
});
|
});
|
||||||
|
|||||||
@ -283,6 +283,16 @@ The React workflow editor now loads the latest persisted version from the Mongo-
|
|||||||
|
|
||||||
The runtime Runs workspace now loads recent runs for the active project. Run detail views poll active runs until they settle and let the operator inspect task-level artifacts directly through Explore links.
|
The runtime Runs workspace now loads recent runs for the active project. Run detail views poll active runs until they settle and let the operator inspect task-level artifacts directly through Explore links.
|
||||||
|
|
||||||
|
The worker-backed runtime now persists task execution summaries directly on `run_tasks` instead of treating artifacts as the only observable output. Each completed or failed task records:
|
||||||
|
|
||||||
|
- `startedAt` and `finishedAt`
|
||||||
|
- `durationMs`
|
||||||
|
- appended `logLines`
|
||||||
|
- structured `summary` with outcome, executor, asset count, artifact ids, and failure text when present
|
||||||
|
- `lastResultPreview` for a lightweight selected-task preview in the Runs workspace
|
||||||
|
|
||||||
|
This makes the run detail view stable even when artifacts are large or delayed and keeps task-level observability queryable without reopening every artifact payload.
|
||||||
|
|
||||||
The API and worker runtimes now both have direct integration coverage against a real Mongo runtime through `mongodb-memory-server`, in addition to the older in-memory contract tests.
|
The API and worker runtimes now both have direct integration coverage against a real Mongo runtime through `mongodb-memory-server`, in addition to the older in-memory contract tests.
|
||||||
|
|
||||||
The first web authoring surface already follows the three-pane layout contract with:
|
The first web authoring surface already follows the three-pane layout contract with:
|
||||||
|
|||||||
@ -182,6 +182,15 @@ Recommended layout:
|
|||||||
- center: workflow graph with execution overlays
|
- center: workflow graph with execution overlays
|
||||||
- bottom or side drawer: logs and artifacts for selected node
|
- bottom or side drawer: logs and artifacts for selected node
|
||||||
|
|
||||||
|
V1 run detail should render the selected task as a stable operations panel, not just a raw document dump. The panel should show:
|
||||||
|
|
||||||
|
- task status and executor
|
||||||
|
- task duration and timestamps
|
||||||
|
- a concise execution summary
|
||||||
|
- appended log lines
|
||||||
|
- artifact links into Explore
|
||||||
|
- a lightweight result preview for quick inspection
|
||||||
|
|
||||||
## Screen 6: Explore Workspace
|
## Screen 6: Explore Workspace
|
||||||
|
|
||||||
Purpose:
|
Purpose:
|
||||||
|
|||||||
@ -324,9 +324,13 @@ Core fields:
|
|||||||
- `logRef`
|
- `logRef`
|
||||||
- `cacheKey`
|
- `cacheKey`
|
||||||
- `cacheHit`
|
- `cacheHit`
|
||||||
|
- `logLines`
|
||||||
- `errorMessage`
|
- `errorMessage`
|
||||||
|
- `summary`
|
||||||
|
- `lastResultPreview`
|
||||||
- `startedAt`
|
- `startedAt`
|
||||||
- `finishedAt`
|
- `finishedAt`
|
||||||
|
- `durationMs`
|
||||||
- `createdAt`
|
- `createdAt`
|
||||||
|
|
||||||
This collection should remain separate from `workflow_runs` because task volume grows quickly.
|
This collection should remain separate from `workflow_runs` because task volume grows quickly.
|
||||||
@ -338,6 +342,8 @@ The current executable worker path expects `run_tasks` to be self-sufficient eno
|
|||||||
- upstream node dependencies
|
- upstream node dependencies
|
||||||
- produced artifact ids
|
- produced artifact ids
|
||||||
- per-task status and error message
|
- per-task status and error message
|
||||||
|
- task log lines and result preview
|
||||||
|
- structured task summaries with executor, outcome, asset count, and artifact ids
|
||||||
|
|
||||||
### artifacts
|
### artifacts
|
||||||
|
|
||||||
|
|||||||
@ -20,6 +20,7 @@
|
|||||||
- `2026-03-26`: The follow-up runtime pass adds Mongo-backed HTTP integration tests and converts the workflow editor from a fixed sample graph to a persisted draft-and-version model.
|
- `2026-03-26`: The follow-up runtime pass adds Mongo-backed HTTP integration tests and converts the workflow editor from a fixed sample graph to a persisted draft-and-version model.
|
||||||
- `2026-03-26`: The current runtime pass binds workflow runs to registered project assets so run snapshots, run tasks, worker execution context, and the editor all agree on the concrete input asset being processed.
|
- `2026-03-26`: The current runtime pass binds workflow runs to registered project assets so run snapshots, run tasks, worker execution context, and the editor all agree on the concrete input asset being processed.
|
||||||
- `2026-03-26`: The current UI/runtime pass turns Runs into a real project-scoped workspace with run history queries, active-run polling, and task artifact links into Explore.
|
- `2026-03-26`: The current UI/runtime pass turns Runs into a real project-scoped workspace with run history queries, active-run polling, and task artifact links into Explore.
|
||||||
|
- `2026-03-27`: The current observability pass persists task execution summaries, timestamps, log lines, and result previews on Mongo-backed `run_tasks`, and surfaces those fields in the React run detail view.
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user