344 lines
9.4 KiB
TypeScript

import process from "node:process";
import cors from "cors";
import express from "express";
import { MongoClient } from "mongodb";
import { createMongoConnectionUri } from "../common/mongo/mongo.module.ts";
import { MongoAppStore } from "./mongo-store.ts";
import { detectAssetShapeFromLocalPath } from "./local-source-probe.ts";
/**
 * Settings required to boot the HTTP API together with its MongoDB connection.
 *
 * Produced by `resolveApiRuntimeConfig` and consumed by `createApiRuntime` /
 * `startApiServer`.
 */
export type ApiRuntimeConfig = {
/** Host name or address handed to `app.listen` when the server starts. */
host: string;
/** TCP port handed to `app.listen` when the server starts. */
port: number;
/** Connection string passed to the `MongoClient` constructor. */
mongoUri: string;
/** Name of the database selected via `client.db(...)`. */
database: string;
/** Value used as the `origin` option of the CORS middleware. */
corsOrigin: string;
};
/**
 * Parses a TCP port supplied through an environment variable.
 *
 * @param name Name of the variable, used only in the error message.
 * @param raw Raw value of the variable; `undefined` or blank means "not set".
 * @param fallback Port returned when the variable is not set.
 * @returns The parsed port, or `fallback` when `raw` is missing or blank.
 * @throws RangeError when `raw` is not an integer in the range 0-65535.
 */
function parsePortFromEnv(
  name: string,
  raw: string | undefined,
  fallback: number,
): number {
  const text = raw?.trim() ?? "";
  // `Number("")` is 0, so a blank variable must be treated as "unset" explicitly.
  if (text === "") {
    return fallback;
  }
  const port = Number(text);
  if (!Number.isInteger(port) || port < 0 || port > 65535) {
    throw new RangeError(
      `${name} must be an integer between 0 and 65535, received "${raw}"`,
    );
  }
  return port;
}

/**
 * Builds the API runtime configuration from environment variables.
 *
 * Recognised variables and their defaults:
 * - `API_HOST` (`127.0.0.1`), `API_PORT` (`3001`)
 * - `CORS_ORIGIN` (`http://127.0.0.1:3000`)
 * - `MONGO_DB` (`emboflow`)
 * - `MONGO_URI`; when absent the URI is assembled by `createMongoConnectionUri`
 *   from `MONGO_ROOT_USERNAME`, `MONGO_ROOT_PASSWORD`, `MONGO_HOST`,
 *   `MONGO_PORT` and `MONGO_DB`.
 *
 * @param env Environment to read from; defaults to `process.env`.
 * @returns The resolved configuration.
 * @throws RangeError when `API_PORT` (or `MONGO_PORT`, if it is needed to
 *   build the URI) is set to something that is not a valid port number.
 */
export function resolveApiRuntimeConfig(
  env: NodeJS.ProcessEnv = process.env,
): ApiRuntimeConfig {
  const database = env.MONGO_DB ?? "emboflow";

  // An explicit MONGO_URI wins; the individual MONGO_* parts are only read
  // (and MONGO_PORT only validated) when the URI has to be assembled.
  const mongoUri =
    env.MONGO_URI ??
    createMongoConnectionUri({
      username: env.MONGO_ROOT_USERNAME ?? "emboflow",
      password: env.MONGO_ROOT_PASSWORD ?? "emboflow",
      host: env.MONGO_HOST ?? "127.0.0.1",
      port: parsePortFromEnv("MONGO_PORT", env.MONGO_PORT, 27017),
      database,
    });

  return {
    host: env.API_HOST ?? "127.0.0.1",
    port: parsePortFromEnv("API_PORT", env.API_PORT, 3001),
    mongoUri,
    database,
    corsOrigin: env.CORS_ORIGIN ?? "http://127.0.0.1:3000",
  };
}
/**
 * Connects to MongoDB and assembles the Express application that exposes the
 * `MongoAppStore` over HTTP.
 *
 * The returned app is not listening yet; the caller decides when and where to
 * bind it. Every async route forwards failures to the error middleware
 * registered at the end, which answers with HTTP 400 and the error message.
 *
 * @param config Runtime settings; defaults to `resolveApiRuntimeConfig()`.
 * @returns The Express app plus the Mongo client, database handle, store and
 *   the configuration that was used. The caller owns `client` and must close it.
 * @throws Whatever `MongoClient.connect()` rejects with when the connection
 *   cannot be established.
 */
export async function createApiRuntime(config = resolveApiRuntimeConfig()) {
  const client = new MongoClient(config.mongoUri);
  await client.connect();
  const db = client.db(config.database);
  const store = new MongoAppStore(db);

  const app = express();
  app.use(cors({ origin: config.corsOrigin, credentials: true }));
  app.use(express.json());

  // --- Health -------------------------------------------------------------

  // Pings the database. A failed ping is forwarded to the error middleware
  // like every other route, instead of being left as an unhandled rejection.
  app.get("/api/health", async (_request, response, next) => {
    try {
      await db.command({ ping: 1 });
      response.json({ status: "ok" });
    } catch (error) {
      next(error);
    }
  });

  // --- Dev bootstrap & workspaces ----------------------------------------

  app.post("/api/dev/bootstrap", async (request, response, next) => {
    try {
      const result = await store.bootstrapDevContext(request.body ?? {});
      response.json(result);
    } catch (error) {
      next(error);
    }
  });

  app.get("/api/workspaces", async (request, response, next) => {
    try {
      const ownerId = String(request.query.ownerId ?? "local-user");
      response.json(await store.listWorkspaces(ownerId));
    } catch (error) {
      next(error);
    }
  });

  // --- Projects -----------------------------------------------------------

  app.post("/api/projects", async (request, response, next) => {
    try {
      response.json(
        await store.createProject({
          workspaceId: request.body.workspaceId,
          name: request.body.name,
          description: request.body.description,
          createdBy: request.body.createdBy ?? "local-user",
        }),
      );
    } catch (error) {
      next(error);
    }
  });

  // NOTE(review): a missing `workspaceId` query parameter is stringified to
  // the literal "undefined" before it reaches the store. The same applies to
  // the other `String(request.query.*)` lookups below.
  app.get("/api/projects", async (request, response, next) => {
    try {
      response.json(await store.listProjects(String(request.query.workspaceId)));
    } catch (error) {
      next(error);
    }
  });

  // --- Assets -------------------------------------------------------------

  // Fields omitted from the body are filled in from what
  // `detectAssetShapeFromLocalPath` reports for `sourcePath` (when one is
  // given), and finally from fixed defaults.
  app.post("/api/assets/register", async (request, response, next) => {
    try {
      const sourcePath = request.body.sourcePath as string | undefined;
      const inferred = sourcePath
        ? detectAssetShapeFromLocalPath(sourcePath)
        : undefined;
      response.json(
        await store.registerAsset({
          workspaceId: request.body.workspaceId,
          projectId: request.body.projectId,
          type: request.body.type ?? inferred?.type ?? "folder",
          sourceType: request.body.sourceType ?? inferred?.sourceType ?? "upload",
          displayName: request.body.displayName ?? inferred?.displayName ?? "asset",
          sourcePath,
          createdBy: request.body.createdBy ?? "local-user",
        }),
      );
    } catch (error) {
      next(error);
    }
  });

  app.get("/api/assets", async (request, response, next) => {
    try {
      response.json(await store.listAssets(String(request.query.projectId)));
    } catch (error) {
      next(error);
    }
  });

  app.get("/api/assets/:assetId", async (request, response, next) => {
    try {
      const asset = await store.getAsset(request.params.assetId);
      if (!asset) {
        response.status(404).json({ message: "asset not found" });
        return;
      }
      response.json(asset);
    } catch (error) {
      next(error);
    }
  });

  app.post("/api/assets/:assetId/probe", async (request, response, next) => {
    try {
      response.json(await store.probeAsset(request.params.assetId));
    } catch (error) {
      next(error);
    }
  });

  app.get("/api/assets/:assetId/probe-report", async (request, response, next) => {
    try {
      const report = await store.getLatestProbeReport(request.params.assetId);
      if (!report) {
        response.status(404).json({ message: "probe report not found" });
        return;
      }
      response.json(report);
    } catch (error) {
      next(error);
    }
  });

  // --- Node definitions ---------------------------------------------------

  // `listNodeDefinitions` is called synchronously, so no async error
  // forwarding is needed here.
  app.get("/api/node-definitions", (_request, response) => {
    response.json(store.listNodeDefinitions());
  });

  // --- Workflows ----------------------------------------------------------

  app.post("/api/workflows", async (request, response, next) => {
    try {
      response.json(
        await store.createWorkflowDefinition({
          workspaceId: request.body.workspaceId,
          projectId: request.body.projectId,
          name: request.body.name,
          createdBy: request.body.createdBy ?? "local-user",
        }),
      );
    } catch (error) {
      next(error);
    }
  });

  app.get("/api/workflows", async (request, response, next) => {
    try {
      response.json(await store.listWorkflowDefinitions(String(request.query.projectId)));
    } catch (error) {
      next(error);
    }
  });

  app.get("/api/workflows/:workflowDefinitionId", async (request, response, next) => {
    try {
      const definition = await store.getWorkflowDefinition(request.params.workflowDefinitionId);
      if (!definition) {
        response.status(404).json({ message: "workflow definition not found" });
        return;
      }
      response.json(definition);
    } catch (error) {
      next(error);
    }
  });

  // `logicGraph` is passed through as-is; the other graph fields fall back to
  // empty containers when omitted.
  app.post("/api/workflows/:workflowDefinitionId/versions", async (request, response, next) => {
    try {
      response.json(
        await store.saveWorkflowVersion({
          workflowDefinitionId: request.params.workflowDefinitionId,
          visualGraph: request.body.visualGraph ?? {},
          logicGraph: request.body.logicGraph,
          runtimeGraph: request.body.runtimeGraph ?? {},
          pluginRefs: request.body.pluginRefs ?? [],
          createdBy: request.body.createdBy ?? "local-user",
        }),
      );
    } catch (error) {
      next(error);
    }
  });

  app.get("/api/workflows/:workflowDefinitionId/versions", async (request, response, next) => {
    try {
      response.json(await store.listWorkflowVersions(request.params.workflowDefinitionId));
    } catch (error) {
      next(error);
    }
  });

  // --- Runs ---------------------------------------------------------------

  app.post("/api/runs", async (request, response, next) => {
    try {
      response.json(
        await store.createRun({
          workflowDefinitionId: request.body.workflowDefinitionId,
          workflowVersionId: request.body.workflowVersionId,
          triggeredBy: request.body.triggeredBy ?? "local-user",
        }),
      );
    } catch (error) {
      next(error);
    }
  });

  app.get("/api/runs/:runId", async (request, response, next) => {
    try {
      const run = await store.getRun(request.params.runId);
      if (!run) {
        response.status(404).json({ message: "run not found" });
        return;
      }
      response.json(run);
    } catch (error) {
      next(error);
    }
  });

  app.get("/api/runs/:runId/tasks", async (request, response, next) => {
    try {
      response.json(await store.listRunTasks(request.params.runId));
    } catch (error) {
      next(error);
    }
  });

  // --- Artifacts ----------------------------------------------------------

  app.post("/api/artifacts", async (request, response, next) => {
    try {
      response.json(
        await store.createArtifact({
          type: request.body.type,
          title: request.body.title,
          producerType: request.body.producerType,
          producerId: request.body.producerId,
          payload: request.body.payload ?? {},
        }),
      );
    } catch (error) {
      next(error);
    }
  });

  app.get("/api/artifacts", async (request, response, next) => {
    try {
      response.json(
        await store.listArtifactsByProducer(
          String(request.query.producerType),
          String(request.query.producerId),
        ),
      );
    } catch (error) {
      next(error);
    }
  });

  app.get("/api/artifacts/:artifactId", async (request, response, next) => {
    try {
      const artifact = await store.getArtifact(request.params.artifactId);
      if (!artifact) {
        response.status(404).json({ message: "artifact not found" });
        return;
      }
      response.json(artifact);
    } catch (error) {
      next(error);
    }
  });

  // --- Error middleware ---------------------------------------------------

  // Must be registered last and keep all four parameters so Express treats it
  // as an error handler. Every forwarded error is reported as HTTP 400.
  app.use(
    (
      error: unknown,
      _request: express.Request,
      response: express.Response,
      next: express.NextFunction,
    ) => {
      // Once a response has started, a second one cannot be written; hand the
      // error to Express's default handler so the connection is torn down.
      if (response.headersSent) {
        next(error);
        return;
      }
      const message = error instanceof Error ? error.message : "unknown error";
      response.status(400).json({ message });
    },
  );

  return { app, client, db, store, config };
}
/**
 * Creates the API runtime and starts listening on `config.host:config.port`.
 *
 * @param config Runtime settings; defaults to `resolveApiRuntimeConfig()`.
 * @returns A handle with the port the server is actually bound to (which
 *   differs from `config.port` when port 0 was requested) and a `close()`
 *   function that stops the HTTP server and then closes the Mongo client.
 * @throws Whatever `createApiRuntime` rejects with, or the listen error (for
 *   example `EADDRINUSE`). In the latter case the Mongo client is closed
 *   before the error is rethrown.
 */
export async function startApiServer(config = resolveApiRuntimeConfig()) {
  const runtime = await createApiRuntime(config);

  try {
    return await new Promise<{
      close: () => Promise<void>;
      port: number;
    }>((resolve, reject) => {
      const onListenError = (error: Error) => {
        reject(error);
      };

      const server = runtime.app.listen(config.port, config.host, (error?: Error) => {
        // Express 5 reports listen failures through this callback; Express 4
        // only emits them as an "error" event (handled below).
        if (error) {
          reject(error);
          return;
        }

        // Startup succeeded: stop intercepting later server errors.
        server.off("error", onListenError);

        // Report the port that was really bound so that port 0 works.
        const address = server.address();
        const port =
          typeof address === "object" && address !== null
            ? address.port
            : config.port;

        resolve({
          port,
          close: async () => {
            try {
              await new Promise<void>((done, fail) => {
                server.close((closeError) => {
                  if (closeError) {
                    fail(closeError);
                    return;
                  }
                  done();
                });
              });
            } finally {
              // Release the database connection even if the HTTP server
              // failed to shut down cleanly.
              await runtime.client.close();
            }
          },
        });
      });

      // Without a listener a failed bind would surface as an uncaught
      // exception and this promise would never settle.
      server.once("error", onListenError);
    });
  } catch (error) {
    // Do not leak the already-connected client when the server cannot start.
    // A failure while closing is ignored so the listen error is what the
    // caller sees.
    await runtime.client.close().catch(() => undefined);
    throw error;
  }
}