Composable Topology
Configuring queue, stream, and execution adapters
Overview
Workflow Studio exposes composable adapters so you can choose your execution topology independently. Configure queue backends, stream backends, and execution location separately.
Queue Adapters
Queue adapters determine where workflow messages are persisted and consumed.
Postgres Queue
import { createPostgresQueueAdapter } from 'workflow-studio';
const queue = createPostgresQueueAdapter({
connectionString: process.env.DATABASE_URL,
});Redis Queue
import { createRedisQueueAdapter } from 'workflow-studio';
const queue = createRedisQueueAdapter({
url: process.env.REDIS_URL,
});MongoDB Queue
import { createMongoQueueAdapter } from 'workflow-studio';
const queue = createMongoQueueAdapter({
uri: process.env.MONGODB_URI,
});Custom Queue
import { createCustomQueueAdapter } from 'workflow-studio';
const queue = createCustomQueueAdapter({
providerName: 'my-provider',
options: { /* custom options */ },
compute: {
publish: async (queueName, message, opts) => {
// Custom publish implementation
return { messageId: 'custom-id' };
},
createConsumer: async (prefix, handler) => {
// Custom consumer implementation
return async (req) => new Response('ok');
},
},
});Stream Adapters
Stream adapters handle real-time workflow output streams.
Local Stream
import { createLocalStreamAdapter } from 'workflow-studio';
const streams = createLocalStreamAdapter();Redis Stream
import { createRedisStreamAdapter } from 'workflow-studio';
const streams = createRedisStreamAdapter({
url: process.env.REDIS_URL,
});Custom Stream
import { createCustomStreamAdapter } from 'workflow-studio';
const streams = createCustomStreamAdapter({
options: { /* custom options */ },
});Execution Adapters
Execution adapters determine where workflow functions actually run.
Local Execution
Default behavior - workflows execute on the same host as the app:
import { localExecutorAdapter } from 'workflow-studio';
const adapter = localExecutorAdapter({
trigger: async (req) => ({ runId: 'local', status: 'queued', deploymentId: 'local' }),
resume: async (req) => ({ accepted: true }),
getDeploymentId: async () => 'local-deployment',
});Remote Execution
Workflows execute on a remote compute worker:
import { remoteExecutorAdapter } from 'workflow-studio';
const adapter = remoteExecutorAdapter({
baseUrl: process.env.WORKFLOW_COMPUTE_BASE_URL,
apiKey: process.env.WORKFLOW_COMPUTE_API_KEY,
timeoutMs: 10_000,
retries: 2,
failPolicy: 'error', // or 'fallback-local'
});Failure Semantics
Local Execution
Uses existing local behavior - workflows run on your app host.
Remote Execution with failPolicy: 'error' (default)
Remote failures do not silently fall back to local execution. If the remote worker is unreachable, the operation fails explicitly.
Remote Execution with failPolicy: 'fallback-local'
If remote execution fails and local handlers are provided, operations can fall back to local execution:
const adapter = remoteExecutorAdapter({
baseUrl: process.env.WORKFLOW_COMPUTE_BASE_URL,
failPolicy: 'fallback-local',
localFallback: {
trigger: async (req) => {
// Local fallback implementation
},
resume: async (req) => {
// Local fallback implementation
},
getDeploymentId: async () => 'local-deployment',
},
});Configuration Examples
External Queue + Local Execution
Use when you want external queueing but still execute functions on app host:
import { defineWorkflowStudioConfig, createPostgresQueueAdapter } from 'workflow-studio';
export default defineWorkflowStudioConfig({
queue: createPostgresQueueAdapter({
connectionString: process.env.DATABASE_URL,
}),
execution: { mode: 'local' },
});External Queue + Remote Execution
Use when you want app host and workflow execution separated:
import {
defineWorkflowStudioConfig,
createPostgresQueueAdapter,
createRedisStreamAdapter,
} from 'workflow-studio';
export default defineWorkflowStudioConfig({
queue: createPostgresQueueAdapter({
connectionString: process.env.DATABASE_URL,
}),
streams: createRedisStreamAdapter({
url: process.env.REDIS_URL,
}),
execution: {
mode: 'remote',
remoteWorkerUrl: process.env.WORKFLOW_COMPUTE_BASE_URL,
},
});Default Queue + Remote Execution
Use when queue backend is managed by your remote compute setup:
import { defineWorkflowStudioConfig } from 'workflow-studio';
export default defineWorkflowStudioConfig({
execution: {
mode: 'remote',
remoteWorkerUrl: process.env.WORKFLOW_COMPUTE_BASE_URL,
},
});