workflow-studio/compute-worker
SQLite control plane, API routes, and idempotency
Overview
This document covers the internal architecture of the @workflow-studio/compute-worker package, which provides the HTTP API server for remote workflow execution.
Package Structure
packages/compute-worker/
├── src/
│ ├── index.ts # Main server implementation
│ ├── index.test.ts # Server tests
│ └── types.ts # TypeScript interfaces
└── package.jsonDatabase Schema
The worker uses SQLite for persistence. Key tables:
API Keys
CREATE TABLE IF NOT EXISTS api_keys (
id TEXT PRIMARY KEY,
project_id TEXT NOT NULL,
environment TEXT NOT NULL,
scopes TEXT NOT NULL, -- JSON array of scopes
secret_hash TEXT NOT NULL, -- SHA-256 of secret
revoked_at TEXT,
created_at TEXT NOT NULL
);API keys are seeded at startup via options.seedApiKeys:
export type ComputeWorkerOptions = {
seedApiKeys?: SeedApiKey[];
// ...
};
type SeedApiKey = {
keyId: string;
projectId: string;
environment: string;
scopes: ComputeScope[];
secret: string;
};Idempotency
CREATE TABLE IF NOT EXISTS idempotency (
project_id TEXT NOT NULL,
idempotency_key TEXT NOT NULL,
endpoint TEXT NOT NULL,
status_code INTEGER NOT NULL,
body TEXT NOT NULL,
created_at INTEGER NOT NULL,
state TEXT, -- 'in_progress' | 'completed' (used by /v1/runs)
request_hash TEXT, -- Canonical JSON hash for /v1/runs payload
response_run_id TEXT, -- Stable run ID across retries
PRIMARY KEY (project_id, idempotency_key, endpoint)
);Keys expire after 24 hours. The endpoint is part of the primary key, so the same idempotency key can be used for different endpoints.
Deployments
CREATE TABLE IF NOT EXISTS deployments (
deployment_id TEXT PRIMARY KEY,
manifest_json TEXT NOT NULL,
artifact_path TEXT NOT NULL,
status TEXT NOT NULL, -- 'created', 'active', 'inactive'
created_at TEXT NOT NULL,
activated_at TEXT
);Settings
CREATE TABLE IF NOT EXISTS settings (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
);
-- Active deployment pointer
-- key = 'active_deployment', value = deployment_idAudit Logs
CREATE TABLE IF NOT EXISTS audit_logs (
id TEXT PRIMARY KEY,
key_id TEXT,
project_id TEXT,
action TEXT NOT NULL,
resource TEXT NOT NULL,
result TEXT NOT NULL, -- 'success' or 'failure'
ip TEXT,
user_agent TEXT,
metadata_json TEXT, -- JSON object with lane, deduped, etc.
created_at TEXT NOT NULL
);Authentication
Authentication uses Bearer tokens verified against api_keys table:
function authorize(
db: SqliteDB,
req: Request,
requiredScope: ComputeScope
): AuthContext | Response {
const token = readBearer(req.headers);
if (!token) {
return json({ code: 'unauthorized', message: 'Missing API key' }, { status: 401 });
}
const row = verifyToken(db, token);
if (!row) {
return json({ code: 'unauthorized', message: 'Invalid API key' }, { status: 401 });
}
const auth: AuthContext = {
keyId: row.id,
projectId: row.projectId,
scopes: new Set(JSON.parse(row.scopes) as ComputeScope[]),
};
const scopeError = ensureScope(auth, requiredScope);
if (scopeError) return scopeError;
return auth;
}Token verification uses constant-time comparison:
function verifyToken(db: SqliteDB, secret: string): ApiKeyRecord | null {
const rows = db.prepare(/* ... */).all() as ApiKeyRecord[];
const provided = Buffer.from(sha256(secret));
for (const row of rows) {
const stored = Buffer.from(row.secretHash);
if (stored.length === provided.length && timingSafeEqual(stored, provided)) {
return row;
}
}
return null;
}API Routes
Health Check
GET /v1/health
Authorization: Bearer <token> # optional
Response: { healthy: true, timestamp: "2024-01-01T00:00:00.000Z" }Active Deployment
GET /v1/deployments/active
Authorization: Bearer <token> # requires 'deploy:read' scope
Response: { deploymentId: "dep_abc123" }
## or 409 { code: "no_active_deployment" }Trigger Run
POST /v1/runs
Authorization: Bearer <token> # requires 'trigger:write' scope
Idempotency-Key: <key> # required
Content-Type: application/json
Body: {
workflowName: "send-email",
input?: {...},
runId?: "wrun_...", // optional, auto-generated if omitted
deploymentId?: "dep_...", // optional, uses active if omitted
specVersion?: 1
}
Response: {
runId: "wrun_...",
status: "pending",
deploymentId: "dep_..."
}Idempotency for /v1/runs uses canonical JSON hashing:
- same
Idempotency-Key+ semantically equal payload => same logical request/replay - same
Idempotency-Key+ different payload =>409 { code: "idempotency_conflict" }
Resume Run
POST /v1/runs/resume
Authorization: Bearer <token> # requires 'runs:write' scope
Content-Type: application/json
Body: {
runId: "wrun_...",
reason?: "manual-retry",
payload?: {...}
}
Response: {
accepted: true
}When present, reason and payload are forwarded to queue metadata headers:
x-workflow-resume-reasonx-workflow-resume-payload(JSON string)
World Deployment ID
GET /v1/world/deployment-id
Authorization: Bearer <token> # requires 'world:proxy' scope
Response: { deploymentId: "dep_abc123" }
## or 409 { code: "no_active_deployment" }Queue Publish (World Proxy)
POST /v1/queue/publish
Authorization: Bearer <token> # requires 'world:proxy' scope
Content-Type: application/json
Body: {
queueName: "__wkf_workflow_send-email",
message: { runId: "wrun_..." },
opts?: {
deploymentId?: "dep_...",
idempotencyKey?: "key-123" // explicit only
}
}
Response: { messageId: "msg_..." }Deduplication only applies when opts.idempotencyKey is provided.
Create Deployment
deploymentId from manifest is validated with ^[A-Za-z0-9_-]+$ and rejected if invalid or path-unsafe:
{
"code": "invalid_deployment_id",
"message": "deploymentId must match ^[A-Za-z0-9_-]+$ and cannot contain path separators."
}Strict Remote Guard
The worker enforces WORKFLOW_TARGET_WORLD in production:
function enforceRemoteTargetGuard(env: NodeJS.ProcessEnv): void {
const mode = executionMode ?? env.WORKFLOW_EXECUTION_MODE;
if (mode !== 'remote' || !shouldApplyStrictRemoteGuard(env)) {
return;
}
const expectedTarget = env.WORKFLOW_STUDIO_REMOTE_WORLD_TARGET ?? 'workflow-studio/world-remote';
const actualTarget = env.WORKFLOW_TARGET_WORLD;
if (actualTarget !== expectedTarget) {
throw new Error(
`Remote execution guard failed: WORKFLOW_TARGET_WORLD must equal "${expectedTarget}"...`
);
}
}This runs at worker startup in createComputeWorker().
Rate Limiting
Per-key rate limiting with 1-minute windows:
function applyRateLimit(
db: SqliteDB,
auth: AuthContext,
limit: number // default 120/minute
): Response | undefined {
const windowStart = Math.floor(Date.now() / 60_000) * 60_000;
// Upsert count
db.prepare(`
INSERT INTO rate_limits (key_id, project_id, window_start, count)
VALUES (?, ?, ?, 1)
ON CONFLICT(key_id, project_id, window_start)
DO UPDATE SET count = count + 1
`).run(auth.keyId, auth.projectId, windowStart);
// Check limit
const row = db.prepare(/* select count */).get(/* ... */);
if ((row?.count ?? 0) > limit) {
return json({ code: 'rate_limited', message: 'Rate limit exceeded' }, { status: 429 });
}
}HTTPS Enforcement
Production deployments should enforce HTTPS:
function enforceHttps(req: Request, requireHttps: boolean): Response | undefined {
if (!requireHttps) return undefined;
const protocol = req.headers.get('x-forwarded-proto');
if (protocol && protocol !== 'https') {
return json(
{ code: 'https_required', message: 'HTTPS is required' },
{ status: 400 }
);
}
return undefined;
}