Scheduler/Orchestrator
Purpose
The Scheduler/Orchestrator is NOT an agent - it is a long-running BullMQ worker that monitors the database for schedules and executes them automatically. The orchestrator is completely generic and database-driven—it has no hardcoded agent configurations. Admins create schedules and pipelines through the admin interface, and the orchestrator dynamically discovers agents and executes jobs based on database configurations.
The scheduler runs continuously as a BullMQ worker, polling the database for schedules that need to be executed (once or repeating with cron/interval). When a schedule's time arrives, the orchestrator invokes agents via HTTP endpoints. Agents are language-agnostic and can run anywhere (containers, cloud functions, n8n workflows, etc.) as long as they follow the Agent Specification. Agents execute asynchronously, reading from and writing to the database. The orchestrator does not wait for agent completion but tracks job status through the database.
Note: Agents are one-off executions. When a schedule triggers, the scheduler invokes the agent's HTTP endpoint. The agent runs, completes its job, writes results to the database, and then shuts down. There are no long-running agent instances to manage.
Key Responsibilities:
- Dynamically discover agents from the database (no hardcoded agent list)
- Execute schedules created by admins (stored in Schedule table)
- Support data source expansion for any agent parameter:
  - Parse data source strings (e.g., db:ticker:all:id?enabled=true or db:user:all:email?active=true) to query database tables
  - Extract values from any specified column (not limited to id fields)
  - Create multiple job inputs from expanded values, processing in batches with stagger delay
  - Support complex filters via query string parameters or JSON filter objects
- Orchestrate multi-agent pipelines with job dependencies (stored in Pipeline table)
- Manage different schedule types: once or repeating (cron/interval)
- Poll database continuously for schedules that need execution
- Invoke agent HTTP endpoints with proper parameters and authentication
- Monitor job status through database queries (AgentJobExecution table)
- Handle retries for failed jobs by re-invoking agent endpoints
- Validate agent responses and handle agent failures gracefully
Database Schema
The scheduler uses the following database tables (stored in PostgreSQL):
AgentRegistry
Stores agent type metadata (not version-specific), including agent HTTP endpoint URLs and configuration. The scheduler uses this to know how to invoke each agent type. See Agent Registry API Documentation for the complete schema definition and Agent Specification for full registration details.
AgentVersionDeployment
Determines which version is active in production for each agent type. Only one version per agentId can have environment: 'production'.
{
agentId: string; // Agent type (e.g., 'data-collection')
version: string; // Semantic version (e.g., '1.2.3')
environment: 'production' | 'staging' | 'development';
deployedAt: Date;
deployedBy: string; // Admin user ID
}
Schedule
Stores all schedules created by admins. No hardcoded schedules. A schedule can run a single agent or a pipeline of agents, once or repeatedly.
{
id: string;
name: string;
description?: string;
// Schedule timing
repeat: 'once' | 'repeating'; // 'once' for one-time execution, 'repeating' for recurring
cronExpression?: string; // For repeating schedules (e.g., '0 2 * * 0' for weekly)
interval?: number; // For repeating schedules (milliseconds, e.g., 3600000 for hourly)
timezone?: string; // Default: 'America/New_York'
startAt?: Date; // Optional: when to start the schedule (for 'once' or first run of 'repeating')
// What to execute
targetType: 'agent' | 'pipeline';
agentId?: string; // If targetType is 'agent', references AgentRegistry
pipelineId?: string; // If targetType is 'pipeline', references Pipeline
// Agent parameters (any parameter with a data source string will be expanded)
// Data source strings use URI scheme format: <source>:<table>:<filter>:<field>[?options]
// Examples:
// - "db:ticker:all:id" - All IDs from ticker table
// - "db:ticker:AAPL:id" - Specific ticker ID
// - "db:ticker:all:id?enabled=true" - All enabled tickers
// - "db:ticker:all:id?enabled=true&batchSize=10&staggerDelay=2000" - With batch options
// - "db:ticker:all:id?filter={\"enabled\":true,\"status\":\"active\"}" - Complex filter
params?: Record<string, any>; // Parameters to pass to agent(s)
// Retry configuration
retryConfig?: {
maxRetries: number;
backoff: 'exponential' | 'linear' | 'fixed';
delay: number;
};
// Timeout
timeout?: number; // Job timeout in milliseconds
// Priority
priority?: number; // Job priority (default: 0)
// Status
enabled: boolean;
// Metadata
createdAt: Date;
updatedAt: Date;
createdBy: string; // Admin user ID
}
Pipeline
Defines multi-agent workflows with dependencies. Pipelines can be scheduled just like agents.
{
id: string;
name: string;
description?: string;
steps: Array<{
stepId: string; // Unique ID within pipeline
agentId: string; // References AgentRegistry
dependsOn?: string[]; // Step IDs this step depends on
parallel: boolean; // Can run in parallel with other steps
params?: Record<string, any>; // Step-specific parameters
retryConfig?: {
maxRetries: number;
backoff: 'exponential' | 'linear' | 'fixed';
delay: number;
};
timeout?: number;
}>;
enabled: boolean;
createdAt: Date;
updatedAt: Date;
createdBy: string; // Admin user ID
}
ScheduleExecution
Tracks schedule execution history.
{
id: string;
scheduleId: string; // References Schedule
executionTime: Date;
status: 'success' | 'partial' | 'failed';
jobsCreated: number;
jobsEnqueued: number;
errors?: Array<{
message: string;
timestamp: Date;
}>;
metadata?: object; // Additional execution metadata
}
Configurations
This is the minimal configuration for the orchestrator. It is stored in the AgentConfig table, key: scheduler.
{
// Global scheduler settings
scheduler: {
enabled: true,
timezone: 'America/New_York', // Default timezone for schedules
checkInterval: 60000 // How often to check for due schedules (ms)
},
// HTTP client configuration for agent invocations
httpClient: {
timeout: 300000, // Default timeout (ms) - can be overridden per agent
maxConcurrentRequests: 50, // Max concurrent agent invocations
retryConfig: {
maxRetries: 3,
backoff: { type: 'exponential', delay: 5000 }
},
connectionPool: {
maxSockets: 100,
keepAlive: true
}
},
// Default batch processing settings (can be overridden via query params in data source strings)
defaultBatchProcessing: {
batchSize: 10, // Default batch size for data source expansions
staggerDelay: 1000 // Default delay between batches (ms)
},
// Monitoring and notifications
monitoring: {
enabled: true,
notifyOnFailure: true,
notificationChannels: ['email', 'slack'] // Optional
}
}
Note: All agent-specific schedules and pipelines are stored in the database tables above, not in the orchestrator configuration. The orchestrator dynamically loads and executes them.
Execution Results
As a BullMQ worker, the scheduler doesn't return outputs directly. Instead, it writes execution results to the database. The following information is tracked:
ScheduleExecution Table - Records each schedule execution:
- scheduleId: Reference to the executed schedule
- executionTime: When the schedule was executed
- status: 'success' | 'partial' | 'failed'
- jobsCreated: Number of jobs created from this execution
- jobsEnqueued: Number of jobs successfully enqueued
- errors: Array of any errors encountered
- metadata: Additional execution metadata including:
  - Data source expansion details (total IDs, batches processed, batch size)
  - Pipeline execution details (if pipeline was executed)
AgentJobExecution Table - Records each individual agent job:
- jobId: Unique job identifier
- agentId: Agent that will execute the job
- scheduleId: Schedule that triggered this job (if applicable)
- pipelineId: Pipeline this job is part of (if applicable)
- pipelineStepId: Pipeline step this job is part of (if applicable)
- status: 'pending' | 'running' | 'completed' | 'failed'
- priority: Job priority
- enqueuedAt: When the job was enqueued
- dependencies: Job IDs this job depends on
- params: Agent-specific parameters (merged from schedule + expanded values)
Note: The scheduler writes execution records to the database immediately after invoking agent HTTP endpoints. Actual job and pipeline status is tracked in the database (AgentJobExecution and ScheduleExecution tables) and can be queried separately. Agents update their job status in the database as they execute.
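For example, job status for a given schedule might be queried like this (a minimal sketch using the pg client; the exact table and column quoting depends on how the tables are actually migrated):
import { Pool } from 'pg';

// Illustrative status query against the AgentJobExecution table.
// Column names follow the fields listed above.
async function getScheduleJobStatuses(pool: Pool, scheduleId: string) {
  const { rows } = await pool.query(
    `SELECT "jobId", "agentId", "status", "enqueuedAt"
       FROM "AgentJobExecution"
      WHERE "scheduleId" = $1
      ORDER BY "enqueuedAt" DESC`,
    [scheduleId]
  );
  return rows; // each row's status is 'pending' | 'running' | 'completed' | 'failed'
}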
Process
1. Initialize (BullMQ Worker Startup; see the startup sketch after this list):
- Start as a long-running BullMQ worker process
- Load orchestrator config from database (AgentConfig table, key: scheduler)
- Load all enabled schedules from Schedule table
- Load all enabled pipelines from Pipeline table
- Load agent registry from AgentRegistry table (discover available agents)
- Validate agent endpoints are accessible (health checks)
- Initialize BullMQ queues for schedule execution
- Register BullMQ workers for processing schedules
- Initialize job dependency tracking (for pipelines)
- Initialize data source parser for ID expansion strings
- Initialize ID cache (optional, for performance when processing many IDs)
- Initialize HTTP client pool for agent invocations
- Start polling loop to check for schedules that need execution
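A minimal sketch of this startup flow, assuming BullMQ with a Redis connection; loadSchedulerConfig and executeDueSchedules are hypothetical stand-ins for the database access described above:
import { Queue, Worker } from 'bullmq';

const connection = { host: process.env.REDIS_HOST ?? 'localhost', port: 6379 };

// Hypothetical helpers standing in for the database access described above.
async function loadSchedulerConfig(): Promise<{ scheduler: { checkInterval: number } }> {
  return { scheduler: { checkInterval: 60_000 } }; // would read AgentConfig, key: 'scheduler'
}
async function executeDueSchedules(): Promise<void> {
  // would query the Schedule table for due schedules and invoke agents/pipelines
}

async function startScheduler(): Promise<void> {
  const config = await loadSchedulerConfig();

  // Repeatable job that drives the "check for due schedules" tick.
  const pollQueue = new Queue('scheduler-poll', { connection });
  await pollQueue.add(
    'check-due-schedules',
    {},
    { repeat: { every: config.scheduler.checkInterval } } // e.g. 60000 ms
  );

  // Worker that runs on every tick.
  new Worker('scheduler-poll', async () => executeDueSchedules(), { connection });
}

startScheduler().catch((err) => {
  console.error('Scheduler failed to start', err);
  process.exit(1);
});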
2. Agent Discovery:
- Query AgentRegistry table for all enabled agents (agent type metadata)
- Query AgentVersionDeployment table to determine the active production version for each agent
- Validate that agents referenced in schedules and pipelines exist in the registry
- Load agent HTTP endpoint URLs and configuration from AgentRegistry
- Load agent input schemas to validate job parameters
- Cache agent metadata (endpoints, authentication, timeouts) for performance
3. Schedule Polling & Execution:
- Worker picks up schedule from queue
- Load schedule configuration from database
- Execute schedule (see section 4)
- For one-time schedules: After execution, mark schedule as disabled or delete it (configurable)
- For repeating schedules: Schedule next execution based on cron/interval
4. Data Source Expansion (for any agent parameter):
The orchestrator expands parameters based on data source strings in params. Any parameter value that matches the data source string format will be expanded, extracting values from the specified database column (not limited to ID fields).
4a. Data Source String Format:
Format: <source>:<table>:<filter>:<field>[?<options>]
Components:
- <source>: Data source identifier (currently db for database, extensible for future sources like api, file, cache)
- <table>: Database table name (e.g., ticker, user)
- <filter>: all to get all rows, or a specific ID value (e.g., AAPL)
- <field>: Column name to extract (e.g., id, tickerId)
- <options>: Optional query string parameters:
  - Simple filters: enabled=true, status=active (multiple conditions: enabled=true&status=active)
  - Batch options: batchSize=10, staggerDelay=2000
  - Complex filters: filter={"enabled":true,"status":"active"} (JSON object for complex conditions)
Security: All database queries use parameterized queries to prevent SQL injection. Filter values are validated and sanitized.
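A minimal TypeScript sketch of parsing this format; the DataSourceRef shape and the parseDataSource name are illustrative, not part of the spec:
// Parsed form of <source>:<table>:<filter>:<field>[?<options>] (illustrative shape).
interface DataSourceRef {
  source: string;                       // e.g. 'db'
  table: string;                        // e.g. 'ticker'
  filter: string;                       // 'all' or a specific ID value
  field: string;                        // column to extract, e.g. 'id'
  where: Record<string, string>;        // simple key=value filters
  jsonFilter?: Record<string, unknown>; // complex filter={...} JSON object
  batchSize?: number;
  staggerDelay?: number;
}

function parseDataSource(value: string): DataSourceRef | null {
  const qIndex = value.indexOf('?');
  const spec = qIndex === -1 ? value : value.slice(0, qIndex);
  const query = qIndex === -1 ? '' : value.slice(qIndex + 1);

  const parts = spec.split(':');
  if (parts.length !== 4) return null; // not a data source string: leave the value as-is

  const [source, table, filter, field] = parts;
  const ref: DataSourceRef = { source, table, filter, field, where: {} };

  for (const [key, raw] of new URLSearchParams(query)) {
    if (key === 'batchSize') ref.batchSize = Number(raw);
    else if (key === 'staggerDelay') ref.staggerDelay = Number(raw);
    else if (key === 'filter') ref.jsonFilter = JSON.parse(raw); // complex JSON filter
    else ref.where[key] = raw;                                   // simple key=value filter
  }
  return ref;
}

// parseDataSource('db:ticker:all:id?enabled=true&batchSize=10')
// -> { source: 'db', table: 'ticker', filter: 'all', field: 'id',
//      where: { enabled: 'true' }, batchSize: 10 }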
4b. ID Expansion Process (see the sketch after this list):
- For each parameter in params:
  - Check if the value is a data source string (matches the format <source>:<table>:<filter>:<field>)
  - If yes, parse the string:
    - Extract source, table, filter, field, and query parameters
    - If filter = 'all':
      - Build parameterized SQL query with WHERE conditions from query string
      - Query the specified table from database (e.g., Ticker, User)
      - Apply filters from query parameters (e.g., enabled=true)
      - Get all matching IDs from the specified field
    - If filter is a specific ID value:
      - Use that single ID (still applies WHERE filters if provided)
  - If not a data source string:
    - Use the value as-is (no expansion)
- If multiple IDs are found (from 'all' expansion):
  - Group IDs into batches based on batchSize from query params or default (10)
  - Create multiple job inputs, one per ID
  - Apply stagger delays between batches based on staggerDelay from query params or default (1000ms)
- If single ID or no expansion:
  - Create single job input
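The expansion itself can be sketched as follows; parseDataSource is the parser sketched above, and queryColumn stands in for a parameterized database lookup (both names are illustrative):
// Hypothetical parameterized lookup: SELECT <field> FROM <table> WHERE <where...>.
declare function queryColumn(
  table: string,
  field: string,
  where: Record<string, string>
): Promise<string[]>;

// Expand schedule params into concrete job inputs (one per expanded value).
async function expandParams(
  params: Record<string, unknown>
): Promise<Array<Record<string, unknown>>> {
  let inputs: Array<Record<string, unknown>> = [{ ...params }];

  for (const [key, value] of Object.entries(params)) {
    const ref = typeof value === 'string' ? parseDataSource(value) : null;
    if (!ref) continue; // not a data source string: keep the value as-is

    const values =
      ref.filter === 'all'
        ? await queryColumn(ref.table, ref.field, ref.where) // all matching rows
        : [ref.filter];                                       // single specific value

    // Fan out: one job input per expanded value, other params carried over.
    inputs = inputs.flatMap((input) => values.map((v) => ({ ...input, [key]: v })));
  }
  return inputs;
}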
4c. Example: All Tickers with Filter:
- Schedule has params: { tickerId: "db:ticker:all:id?enabled=true&batchSize=10&staggerDelay=1000" }
- Scheduler parses the string and queries the Ticker table with WHERE enabled = true
- Finds 100 enabled tickers
- Creates 100 job inputs (one per ticker)
- Processes in batches of 10 with 1 second delay between batches
- Each job gets params: { tickerId: '<specific-ticker-id>', ...otherParams }
4d. Example: Specific ID:
- Schedule has params: { tickerId: "db:ticker:AAPL:id" }
- Creates single job input with params: { tickerId: 'AAPL', ...otherParams }
4e. Example: Complex Filter:
- Schedule has params: { tickerId: "db:ticker:all:id?filter={\"enabled\":true,\"status\":\"active\",\"createdAt\":{\"$gte\":\"2024-01-01\"}}" }
- Scheduler parses the JSON filter and builds a parameterized query
- Queries tickers matching all conditions
- Creates job inputs for each matching ticker
4f. Example: Multiple Filters:
- Schedule has params: { tickerId: "db:ticker:all:id?enabled=true&active=1&batchSize=5" }
- Scheduler queries with WHERE enabled = true AND active = 1
- Processes in batches of 5 (overrides default batchSize)
5. Job Creation:
For each schedule execution:
5a. Load Schedule Configuration:
- Load schedule from Schedule table
- Determine target: agentId (single agent) or pipelineId (pipeline of agents)
- Validate that target exists in AgentRegistry or Pipeline table
5b. ID Expansion:
- For each parameter in params, check if the value is a data source string
- For each data source string found, expand IDs (see section 4)
- If multiple IDs found, create multiple job inputs (one per ID)
- Replace data source strings with actual ID values in job parameters
5c. Batch Processing:
- If multiple job inputs created (from data source expansion):
  - Group job inputs into batches based on batchSize from query params or default
  - Process batches sequentially with staggerDelay between batches (from query params or default)
  - Within each batch, process jobs in parallel (up to batch size)
- If single job input:
  - Process immediately
5d. Job Distribution (see the sketch after this list):
- For each job input:
  - Generate unique jobId and executionId
  - Create execution record in AgentJobExecution table with status: 'pending'
  - Build execution request with merged parameters (schedule params + expanded values)
  - Validate parameters against agent's inputSchema
  - Get agent HTTP endpoint URL from AgentRegistry table
  - Invoke agent HTTP endpoint (POST to agent endpoint URL)
    - Include authentication headers if configured
    - Set timeout from schedule config or agent default
    - Include X-Job-Id and X-Execution-Id headers
  - Handle agent response:
    - Agent accepts the job and starts execution
    - Agent updates AgentJobExecution table as it runs
    - If agent fails to start: Mark execution as failed, log error
  - Apply stagger delays between batches (if configured for data source expansion)
- Track job creation in ScheduleExecution table
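A sketch of the invocation step for a single job input; recordExecution and markFailed are hypothetical helpers around the AgentJobExecution table, the endpoint URL comes from AgentRegistry, and the bearer-token header is just one possible authentication scheme:
import { randomUUID } from 'node:crypto';

// Hypothetical helpers around the AgentJobExecution table.
declare function recordExecution(row: {
  jobId: string;
  executionId: string;
  params: Record<string, unknown>;
  status: 'pending';
}): Promise<void>;
declare function markFailed(jobId: string, error: string): Promise<void>;

async function dispatchJob(
  endpointUrl: string,              // from AgentRegistry
  params: Record<string, unknown>,  // schedule params + expanded values
  authToken?: string,
  timeoutMs = 300_000
): Promise<void> {
  const jobId = randomUUID();
  const executionId = randomUUID();

  // Create the execution record before invoking the agent.
  await recordExecution({ jobId, executionId, params, status: 'pending' });

  const res = await fetch(endpointUrl, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'X-Job-Id': jobId,
      'X-Execution-Id': executionId,
      ...(authToken ? { Authorization: `Bearer ${authToken}` } : {}),
    },
    body: JSON.stringify(params),
    signal: AbortSignal.timeout(timeoutMs),
  });

  if (!res.ok) {
    // Agent refused or failed to start: mark the execution failed and log.
    await markFailed(jobId, `Agent responded with HTTP ${res.status}`);
  }
}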
6. Pipeline Orchestration:
For pipeline execution (from the Pipeline table, or a schedule with targetType: 'pipeline'):
6a. Load Pipeline:
- Load pipeline definition from Pipeline table
- Validate all referenced agentId values exist in AgentRegistry
6b. Data Source Expansion (if data source strings in schedule params):
- Parse data source strings from schedule params (same as single-agent expansion)
- Expand values for each parameter that contains a data source string
- If multiple values found, create multiple pipeline execution contexts (one per value)
6c. Build Dependency Graph:
- Parse pipeline steps to build dependency graph
- Identify steps that can run in parallel (parallel: true)
- Identify steps that must run sequentially (via dependsOn)
6d. Execute Pipeline Steps (see the sketch after this list):
- For each expanded parameter set (if any):
  - Create execution records for all pipeline steps in AgentJobExecution table
  - Build dependency graph based on dependsOn relationships
  - For steps without dependencies: Invoke agent endpoints immediately
  - For steps with dependencies: Wait for dependency jobs to complete (check AgentJobExecution.status)
  - Merge step-specific params with expanded values and schedule params
  - Invoke agent endpoints with proper sequencing
- Track pipeline execution in database
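A sketch of the dependency-ordered execution; PipelineStep mirrors the Pipeline schema above, and runStep stands in for invoking the step's agent and waiting for its AgentJobExecution record to reach a terminal status:
interface PipelineStep {
  stepId: string;
  agentId: string;
  dependsOn?: string[];
  params?: Record<string, unknown>;
}

// Hypothetical: invoke the step's agent, then poll AgentJobExecution until done.
declare function runStep(step: PipelineStep): Promise<void>;

async function executePipeline(steps: PipelineStep[]): Promise<void> {
  const started = new Map<string, Promise<void>>();

  const start = (step: PipelineStep): Promise<void> => {
    if (!started.has(step.stepId)) {
      started.set(
        step.stepId,
        (async () => {
          // Wait for every upstream step before invoking this one.
          await Promise.all(
            (step.dependsOn ?? [])
              .map((id) => steps.find((s) => s.stepId === id))
              .filter((s): s is PipelineStep => Boolean(s))
              .map(start)
          );
          await runStep(step);
        })()
      );
    }
    return started.get(step.stepId)!;
  };

  // Steps with no mutual dependencies run concurrently; this sketch does not
  // guard against circular dependsOn definitions.
  await Promise.all(steps.map(start));
}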
7. Agent Execution:
- Orchestrator invokes agent HTTP endpoints asynchronously
- Agent accepts the job and starts execution
- Agent executes work and updates AgentJobExecution table with status updates
- Agent reads from and writes to database independently
- No direct agent-to-agent communication
- All coordination through database state
- When agent completes, it updates AgentJobExecution.status to 'completed' or 'failed'
- Agent shuts down after completing its job
- Orchestrator monitors agent execution by querying AgentJobExecution table
8. Job Dependencies & Status Tracking (see the retry sketch after this list):
- Track job status in AgentJobExecution table (pending, running, completed, failed)
- For pipeline dependencies: Poll AgentJobExecution.status to wait for upstream jobs
- Monitor job progress by querying AgentJobExecution table periodically
- Handle failed jobs with retry logic:
  - Check AgentJobExecution.error.retryable flag
  - If retryable and retry count < max: Re-invoke agent HTTP endpoint after backoff delay (from schedule retryConfig)
  - If not retryable: Mark as permanently failed, notify admins
- Use exponential backoff from schedule config for retries
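The backoff calculation can be as simple as the following sketch; RetryConfig follows the shape defined in the Schedule schema, and invoke is any function that performs the HTTP re-invocation and throws on failure:
interface RetryConfig {
  maxRetries: number;
  backoff: 'exponential' | 'linear' | 'fixed';
  delay: number; // base delay in milliseconds
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

function backoffDelay(cfg: RetryConfig, attempt: number): number {
  switch (cfg.backoff) {
    case 'exponential': return cfg.delay * 2 ** attempt;  // delay, 2x, 4x, ...
    case 'linear':      return cfg.delay * (attempt + 1); // delay, 2x, 3x, ...
    case 'fixed':       return cfg.delay;
  }
}

async function invokeWithRetry(invoke: () => Promise<void>, cfg: RetryConfig): Promise<void> {
  for (let attempt = 0; ; attempt++) {
    try {
      return await invoke();
    } catch (err) {
      // Out of retries (or, in the real worker, error.retryable === false):
      // mark as permanently failed and notify admins.
      if (attempt >= cfg.maxRetries) throw err;
      await sleep(backoffDelay(cfg, attempt)); // wait before re-invoking the agent
    }
  }
}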
9. Batch Processing & Parallelization:
- Data Source Expansion with Batching (see the sketch after this list):
  - When data source strings create multiple job inputs (e.g., 100 tickers):
    - Jobs are grouped into batches based on batchSize from query params or default (10)
    - Within each batch, jobs are processed in parallel (up to batch size)
    - Between batches, staggerDelay is applied (from query params or default: 1000ms) to avoid overloading agents
    - Example: 100 tickers with batchSize=10 in query string = 10 batches, each batch processes 10 jobs in parallel with 1 second delay between batches
  - Agents execute work asynchronously after accepting jobs
- Single Value Jobs (no expansion or specific value):
  - Single job processes with one parameter value
  - Can run concurrently with other agent jobs
  - Invoked via HTTP endpoint when needed
- Pipeline Steps:
  - Steps with parallel: true can run concurrently
  - Steps with dependsOn run sequentially after dependencies complete
  - If data source expansion creates multiple contexts (e.g., 100 tickers), each context runs the pipeline independently
  - Example: Pipeline with 100 ticker expansions = 100 parallel pipeline executions
  - Each execution invokes agent HTTP endpoints as needed
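A sketch of the batch/stagger behaviour described above; run is whatever dispatches a single job, and the defaults mirror the defaultBatchProcessing configuration:
const pause = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Process items in sequential batches; jobs inside a batch run in parallel.
async function processInBatches<T>(
  items: T[],
  run: (item: T) => Promise<void>,
  batchSize = 10,      // overridable via batchSize= in the data source string
  staggerDelay = 1000  // overridable via staggerDelay= in the data source string
): Promise<void> {
  for (let i = 0; i < items.length; i += batchSize) {
    const batch = items.slice(i, i + batchSize);
    // allSettled: a failed job for one item does not block the others.
    await Promise.allSettled(batch.map(run));
    if (i + batchSize < items.length) await pause(staggerDelay);
  }
}

// Example: 100 tickers with batchSize=10 -> 10 batches of 10 parallel
// invocations, with a 1 second pause between batches.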
10. Scalability Considerations:
- Large ID Sets: When processing hundreds of IDs (e.g., all tickers):
  - Batch processing prevents overwhelming agent endpoints
  - Stagger delays prevent API rate limits and agent overload
  - Multiple agent invocations happen concurrently (up to batchSize)
  - Failed jobs for one ID don't block others
  - Partial failures are handled gracefully (continue with remaining items)
  - Agents can scale independently to handle their own load
- Dynamic Management:
  - New IDs are automatically included in the next scheduled run when filter: 'all' is used
  - Filters (e.g., enabled=true) automatically exclude disabled/removed IDs from processing
  - Priority can be set per schedule to influence job priority
- Resource Management:
  - batchSize in query params (or default) controls concurrent job execution per batch
  - staggerDelay in query params (or default) controls rate of batch processing
  - Queue concurrency settings limit total parallel workers
  - Memory and CPU usage scales with batch size and concurrency
11. Error Handling:
- Agent Invocation Errors (network, timeout, agent unavailable):
  - Retry with exponential backoff (configured in schedule retryConfig)
  - Log errors to database for monitoring (ScheduleExecution table)
  - If agent endpoint is unreachable: Mark execution as failed, log error
- Agent Execution Errors (agent fails during execution):
  - Agent updates AgentJobExecution.status: 'failed' with error details
  - Orchestrator checks error.retryable flag
  - If retryable: Re-invoke agent HTTP endpoint after backoff delay (from schedule retryConfig)
  - If not retryable: Mark as permanently failed, notify admins
- Pipeline Dependencies:
  - Job dependencies can timeout if upstream jobs fail repeatedly
  - Pipeline continues with partial results if some jobs fail (non-blocking)
  - Each pipeline step can have its own retry configuration
- General:
  - Notify on critical failures (via configured notification channels)
  - Failed jobs for one schedule don't block others
  - Agents can implement their own retry logic internally
  - Retry configuration from schedule is applied to all job attempts
12. Metrics Collection:
- Agents write metrics to database upon completion
- Orchestrator queries database for job and pipeline status
- Track schedule execution history in ScheduleExecution table
- Learning Agent reads metrics from database independently (runs on its own schedule)
Admin Interface Usage
Admins can create and manage schedules and pipelines through the admin interface. The orchestrator dynamically loads and executes these configurations without any code changes.
Creating a Schedule
1. Navigate to Schedules:
- Go to /admin/scheduler/schedules
- Click "Create Schedule"
2. Configure Schedule:
- Basic Info: Name, description
- Target: Choose targetType:
  - Agent: Select agentId from available agents
  - Pipeline: Select pipelineId from available pipelines
- Timing:
  - Repeat: Choose 'once' for one-time execution or 'repeating' for recurring
  - For Repeating:
    - Cron: Provide cron expression (e.g., "0 2 * * 0" for weekly Sunday 2 AM)
    - Interval: Provide interval in milliseconds (e.g., 3600000 for hourly)
  - Timezone: Default is 'America/New_York'
  - Start At: Optional start time (for one-time or first run of repeating)
- Parameters: Set agent parameters (JSON object)
  - For data source expansion, use data source strings in parameter values:
    - Format: <source>:<table>:<filter>:<field>[?<options>]
    - Examples:
      - "db:ticker:all:id" - All IDs from ticker table
      - "db:ticker:AAPL:id" - Specific ticker ID
      - "db:ticker:all:id?enabled=true" - All enabled tickers
      - "db:ticker:all:id?enabled=true&batchSize=10&staggerDelay=2000" - With batch options
      - "db:ticker:all:id?filter={\"enabled\":true,\"status\":\"active\"}" - Complex filter
    - Simple filters: Use query string format key=value&key2=value2
    - Complex filters: Use JSON in the filter parameter
    - Batch options: batchSize and staggerDelay in query string override defaults
- Retry Config: Configure retry behavior (max retries, backoff, delay)
- Timeout: Job timeout in milliseconds
- Priority: Job priority (default: 0)
- Enabled: Toggle to enable/disable schedule
3. Save the Schedule
Creating a Pipeline
1. Navigate to Pipelines:
- Go to /admin/scheduler/pipelines
- Click "Create Pipeline"
2. Configure Pipeline:
- Basic Info: Name, description
- Steps: Add pipeline steps:
  - Agent: Select agentId for each step
  - Parameters: Step-specific parameters (JSON object)
  - Dependencies: Define dependsOn (step IDs this step depends on)
  - Parallel: Toggle if step can run in parallel with others
  - Retry Config: Step-specific retry configuration
  - Timeout: Step-specific timeout
- Enabled: Toggle to enable/disable pipeline
3. Save the Pipeline
4. Create a Schedule for the Pipeline:
- Create a schedule with targetType: 'pipeline' and select the pipeline
- Add data source strings to parameters if needed (applies to all pipeline steps)
Example: Query Strategy Agent Schedule
Schedule:
name: "Weekly Entity Graph Refresh"repeat:'repeating'cronExpression:"0 2 * * 0"(Sunday 2 AM)targetType:'agent'agentId:"query-strategy"params:{ "tickerId": "db:ticker:all:id?enabled=true&batchSize=10&staggerDelay=1000", "refreshEntityGraph": true }
Result: Every Sunday at 2 AM, the scheduler:
- Parses the tickerId data source string: db:ticker:all:id?enabled=true&batchSize=10&staggerDelay=1000
- Queries the Ticker table with WHERE enabled = true and finds 100 enabled tickers
- Creates 100 job inputs (one per ticker)
- Processes in batches of 10 with 1 second delay between batches
- Each job gets params: { tickerId: '<specific-ticker-id>', refreshEntityGraph: true }
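For reference, the schedule above would be stored roughly as the following Schedule record (illustrative; id, audit fields, and other defaults omitted):
const weeklyEntityGraphRefresh = {
  name: 'Weekly Entity Graph Refresh',
  repeat: 'repeating',
  cronExpression: '0 2 * * 0',   // Sunday 2 AM
  timezone: 'America/New_York',  // default timezone
  targetType: 'agent',
  agentId: 'query-strategy',
  params: {
    tickerId: 'db:ticker:all:id?enabled=true&batchSize=10&staggerDelay=1000',
    refreshEntityGraph: true,
  },
  enabled: true,
};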
Example: One-time Agent Execution
Schedule:
name: "One-time Data Collection for AAPL"repeat:'once'startAt:"2024-01-15T10:00:00Z"targetType:'agent'agentId:"data-collection"params:{ "tickerId": "db:ticker:AAPL:id", "source": "news" }
Result: On January 15, 2024 at 10:00 AM, the scheduler:
- Parses the tickerId data source string: db:ticker:AAPL:id
- Creates a single job with params: { tickerId: "AAPL", source: "news" }
Example: Pipeline Schedule with Data Source Expansion
Pipeline:
name: "Newsletter Generation Pipeline"steps: Array of step objects:{ stepId: "collect", agentId: "data-collection", parallel: false }{ stepId: "analyze", agentId: "analysis", dependsOn: ["collect"], parallel: false }{ stepId: "generate", agentId: "content-generation", dependsOn: ["analyze"], parallel: false }
Schedule:
name: "Daily Newsletter for All Tickers"repeat:'repeating'cronExpression:"0 8 * * *"(Daily at 8 AM)targetType:'pipeline'pipelineId:"newsletter-generation-pipeline"params:{ "tickerId": "db:ticker:all:id?enabled=true&batchSize=5&staggerDelay=2000" }
Result: Every day at 8 AM, the scheduler:
- Parses the tickerId data source string: db:ticker:all:id?enabled=true&batchSize=5&staggerDelay=2000
- Queries the Ticker table with WHERE enabled = true and finds 50 enabled tickers
- Creates 50 pipeline execution contexts (one per ticker)
- Processes in batches of 5 with 2 second delay between batches
- Each context runs the pipeline: collect → analyze → generate (sequential)
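For reference, the pipeline above would be stored roughly as the following Pipeline record (illustrative; id and audit fields omitted):
const newsletterGenerationPipeline = {
  name: 'Newsletter Generation Pipeline',
  steps: [
    { stepId: 'collect', agentId: 'data-collection', parallel: false },
    { stepId: 'analyze', agentId: 'analysis', dependsOn: ['collect'], parallel: false },
    { stepId: 'generate', agentId: 'content-generation', dependsOn: ['analyze'], parallel: false },
  ],
  enabled: true,
};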