EmboFlow/apps/worker/test/task-runner.spec.ts

89 lines
2.2 KiB
TypeScript

import test from "node:test";
import assert from "node:assert/strict";
import { TaskRunner } from "../src/runner/task-runner.ts";
import { LocalScheduler } from "../src/scheduler/local-scheduler.ts";
import { PythonExecutor } from "../src/executors/python-executor.ts";
import { DockerExecutor } from "../src/executors/docker-executor.ts";
import { HttpExecutor } from "../src/executors/http-executor.ts";
// Runs the single queued task through the runner and checks that the task
// handed back is the one that was queued, now reporting a "success" status.
test("worker loads pending tasks", async () => {
  // The task literal stays inline so its string-literal types are checked
  // directly against the scheduler's constructor parameter.
  const scheduler = new LocalScheduler([
    { id: "task-1", nodeId: "source", executorType: "python", status: "pending" },
  ]);

  // One executor instance per executor type, keyed the way TaskRunner expects.
  const executors = {
    python: new PythonExecutor(),
    docker: new DockerExecutor(),
    http: new HttpExecutor(),
  };
  const runner = new TaskRunner({ scheduler, executors });

  const finishedTask = await runner.runNextTask();

  // Optional chaining: if nothing was returned, the comparison against
  // `undefined` fails the assertion instead of throwing a TypeError.
  assert.strictEqual(finishedTask?.id, "task-1");
  assert.strictEqual(finishedTask?.status, "success");
});
// Runs one queued task and checks the status history the scheduler recorded
// for it: pending, then running, then success.
test("worker marks task running then success", async () => {
  const scheduler = new LocalScheduler([
    { id: "task-2", nodeId: "transform", executorType: "docker", status: "pending" },
  ]);

  // One executor instance per executor type, keyed the way TaskRunner expects.
  const executors = {
    python: new PythonExecutor(),
    docker: new DockerExecutor(),
    http: new HttpExecutor(),
  };
  const runner = new TaskRunner({ scheduler, executors });

  // The returned task is not needed here; only the recorded timeline matters.
  await runner.runNextTask();

  assert.deepStrictEqual(scheduler.getStatusTimeline("task-2"), [
    "pending",
    "running",
    "success",
  ]);
});
// Queues a task whose executorType is "http" and checks, via each executor's
// executionCount, that only the HTTP executor was invoked.
test("worker chooses executor based on runtime config", async () => {
  const scheduler = new LocalScheduler([
    { id: "task-3", nodeId: "export", executorType: "http", status: "pending" },
  ]);

  // Keep a handle on every executor so their counters can be inspected after
  // the run; the same object is passed to the runner as its executor map.
  const executors = {
    python: new PythonExecutor(),
    docker: new DockerExecutor(),
    http: new HttpExecutor(),
  };
  const runner = new TaskRunner({ scheduler, executors });

  await runner.runNextTask();

  assert.strictEqual(executors.python.executionCount, 0);
  assert.strictEqual(executors.docker.executionCount, 0);
  assert.strictEqual(executors.http.executionCount, 1);
});