API Reference

workflow-studio/compute-worker

SQLite control plane, API routes, and idempotency

Overview

This document covers the internal architecture of the @workflow-studio/compute-worker package, which provides the HTTP API server for remote workflow execution.

Package Structure

packages/compute-worker/
├── src/
│   ├── index.ts           # Main server implementation
│   ├── index.test.ts      # Server tests
│   └── types.ts           # TypeScript interfaces
└── package.json

Database Schema

The worker uses SQLite for persistence. Key tables:

API Keys

CREATE TABLE IF NOT EXISTS api_keys (
  id TEXT PRIMARY KEY,
  project_id TEXT NOT NULL,
  environment TEXT NOT NULL,
  scopes TEXT NOT NULL,  -- JSON array of scopes
  secret_hash TEXT NOT NULL,  -- SHA-256 of secret
  revoked_at TEXT,
  created_at TEXT NOT NULL
);

API keys are seeded at startup via options.seedApiKeys:

export type ComputeWorkerOptions = {
  seedApiKeys?: SeedApiKey[];
  // ...
};

type SeedApiKey = {
  keyId: string;
  projectId: string;
  environment: string;
  scopes: ComputeScope[];
  secret: string;
};

Idempotency

CREATE TABLE IF NOT EXISTS idempotency (
  project_id TEXT NOT NULL,
  idempotency_key TEXT NOT NULL,
  endpoint TEXT NOT NULL,
  status_code INTEGER NOT NULL,
  body TEXT NOT NULL,
  created_at INTEGER NOT NULL,
  state TEXT,             -- 'in_progress' | 'completed' (used by /v1/runs)
  request_hash TEXT,      -- Canonical JSON hash for /v1/runs payload
  response_run_id TEXT,   -- Stable run ID across retries
  PRIMARY KEY (project_id, idempotency_key, endpoint)
);

Keys expire after 24 hours. The endpoint is part of the primary key, so the same idempotency key can be used for different endpoints.

Deployments

CREATE TABLE IF NOT EXISTS deployments (
  deployment_id TEXT PRIMARY KEY,
  manifest_json TEXT NOT NULL,
  artifact_path TEXT NOT NULL,
  status TEXT NOT NULL,  -- 'created', 'active', 'inactive'
  created_at TEXT NOT NULL,
  activated_at TEXT
);

Settings

CREATE TABLE IF NOT EXISTS settings (
  key TEXT PRIMARY KEY,
  value TEXT NOT NULL
);

-- Active deployment pointer
-- key = 'active_deployment', value = deployment_id

Audit Logs

CREATE TABLE IF NOT EXISTS audit_logs (
  id TEXT PRIMARY KEY,
  key_id TEXT,
  project_id TEXT,
  action TEXT NOT NULL,
  resource TEXT NOT NULL,
  result TEXT NOT NULL,  -- 'success' or 'failure'
  ip TEXT,
  user_agent TEXT,
  metadata_json TEXT,  -- JSON object with lane, deduped, etc.
  created_at TEXT NOT NULL
);

Authentication

Authentication uses Bearer tokens verified against api_keys table:

function authorize(
  db: SqliteDB,
  req: Request,
  requiredScope: ComputeScope
): AuthContext | Response {
  const token = readBearer(req.headers);
  if (!token) {
    return json({ code: 'unauthorized', message: 'Missing API key' }, { status: 401 });
  }
  
  const row = verifyToken(db, token);
  if (!row) {
    return json({ code: 'unauthorized', message: 'Invalid API key' }, { status: 401 });
  }
  
  const auth: AuthContext = {
    keyId: row.id,
    projectId: row.projectId,
    scopes: new Set(JSON.parse(row.scopes) as ComputeScope[]),
  };
  
  const scopeError = ensureScope(auth, requiredScope);
  if (scopeError) return scopeError;
  
  return auth;
}

Token verification uses constant-time comparison:

function verifyToken(db: SqliteDB, secret: string): ApiKeyRecord | null {
  const rows = db.prepare(/* ... */).all() as ApiKeyRecord[];
  const provided = Buffer.from(sha256(secret));
  
  for (const row of rows) {
    const stored = Buffer.from(row.secretHash);
    if (stored.length === provided.length && timingSafeEqual(stored, provided)) {
      return row;
    }
  }
  return null;
}

API Routes

Health Check

GET /v1/health
Authorization: Bearer <token>  # optional

Response: { healthy: true, timestamp: "2024-01-01T00:00:00.000Z" }

Active Deployment

GET /v1/deployments/active
Authorization: Bearer <token>  # requires 'deploy:read' scope

Response: { deploymentId: "dep_abc123" }
## or 409 { code: "no_active_deployment" }

Trigger Run

POST /v1/runs
Authorization: Bearer <token>  # requires 'trigger:write' scope
Idempotency-Key: <key>  # required
Content-Type: application/json

Body: {
  workflowName: "send-email",
  input?: {...},
  runId?: "wrun_...",  // optional, auto-generated if omitted
  deploymentId?: "dep_...",  // optional, uses active if omitted
  specVersion?: 1
}

Response: { 
  runId: "wrun_...", 
  status: "pending", 
  deploymentId: "dep_..." 
}

Idempotency for /v1/runs uses canonical JSON hashing:

  • same Idempotency-Key + semantically equal payload => same logical request/replay
  • same Idempotency-Key + different payload => 409 { code: "idempotency_conflict" }

Resume Run

POST /v1/runs/resume
Authorization: Bearer <token>  # requires 'runs:write' scope
Content-Type: application/json

Body: {
  runId: "wrun_...",
  reason?: "manual-retry",
  payload?: {...}
}

Response: {
  accepted: true
}

When present, reason and payload are forwarded to queue metadata headers:

  • x-workflow-resume-reason
  • x-workflow-resume-payload (JSON string)

World Deployment ID

GET /v1/world/deployment-id
Authorization: Bearer <token>  # requires 'world:proxy' scope

Response: { deploymentId: "dep_abc123" }
## or 409 { code: "no_active_deployment" }

Queue Publish (World Proxy)

POST /v1/queue/publish
Authorization: Bearer <token>  # requires 'world:proxy' scope
Content-Type: application/json

Body: {
  queueName: "__wkf_workflow_send-email",
  message: { runId: "wrun_..." },
  opts?: {
    deploymentId?: "dep_...",
    idempotencyKey?: "key-123"  // explicit only
  }
}

Response: { messageId: "msg_..." }

Deduplication only applies when opts.idempotencyKey is provided.

Create Deployment

deploymentId from manifest is validated with ^[A-Za-z0-9_-]+$ and rejected if invalid or path-unsafe:

{
  "code": "invalid_deployment_id",
  "message": "deploymentId must match ^[A-Za-z0-9_-]+$ and cannot contain path separators."
}

Strict Remote Guard

The worker enforces WORKFLOW_TARGET_WORLD in production:

function enforceRemoteTargetGuard(env: NodeJS.ProcessEnv): void {
  const mode = executionMode ?? env.WORKFLOW_EXECUTION_MODE;
  if (mode !== 'remote' || !shouldApplyStrictRemoteGuard(env)) {
    return;
  }
  
  const expectedTarget = env.WORKFLOW_STUDIO_REMOTE_WORLD_TARGET ?? 'workflow-studio/world-remote';
  const actualTarget = env.WORKFLOW_TARGET_WORLD;
  
  if (actualTarget !== expectedTarget) {
    throw new Error(
      `Remote execution guard failed: WORKFLOW_TARGET_WORLD must equal "${expectedTarget}"...`
    );
  }
}

This runs at worker startup in createComputeWorker().

Rate Limiting

Per-key rate limiting with 1-minute windows:

function applyRateLimit(
  db: SqliteDB,
  auth: AuthContext,
  limit: number  // default 120/minute
): Response | undefined {
  const windowStart = Math.floor(Date.now() / 60_000) * 60_000;
  
  // Upsert count
  db.prepare(`
    INSERT INTO rate_limits (key_id, project_id, window_start, count)
    VALUES (?, ?, ?, 1)
    ON CONFLICT(key_id, project_id, window_start)
    DO UPDATE SET count = count + 1
  `).run(auth.keyId, auth.projectId, windowStart);
  
  // Check limit
  const row = db.prepare(/* select count */).get(/* ... */);
  if ((row?.count ?? 0) > limit) {
    return json({ code: 'rate_limited', message: 'Rate limit exceeded' }, { status: 429 });
  }
}

HTTPS Enforcement

Production deployments should enforce HTTPS:

function enforceHttps(req: Request, requireHttps: boolean): Response | undefined {
  if (!requireHttps) return undefined;
  
  const protocol = req.headers.get('x-forwarded-proto');
  if (protocol && protocol !== 'https') {
    return json(
      { code: 'https_required', message: 'HTTPS is required' },
      { status: 400 }
    );
  }
  return undefined;
}