MediaPulse

Scheduler/Orchestrator

Purpose

The Scheduler/Orchestrator is NOT an agent - it is a long-running BullMQ worker that monitors the database for schedules and executes them automatically. The orchestrator is completely generic and database-driven—it has no hardcoded agent configurations. Admins create schedules and pipelines through the admin interface, and the orchestrator dynamically discovers agents and executes jobs based on database configurations.

The scheduler runs continuously as a BullMQ worker, polling the database for schedules that need to be executed (once or repeating with cron/interval). When a schedule's time arrives, the orchestrator invokes agents via HTTP endpoints. Agents are language-agnostic and can run anywhere (containers, cloud functions, n8n workflows, etc.) as long as they follow the Agent Specification. Agents execute asynchronously, reading from and writing to the database. The orchestrator does not wait for agent completion but tracks job status through the database.

Note: Agents are one-off executions. When a schedule triggers, the scheduler invokes the agent's HTTP endpoint. The agent runs, completes its job, writes results to the database, and then shuts down. There are no long-running agent instances to manage.

Key Responsibilities:

  • Dynamically discover agents from the database (no hardcoded agent list)
  • Execute schedules created by admins (stored in Schedule table)
  • Support data source expansion for any agent parameter
    • Parse data source strings (e.g., db:ticker:all:id?enabled=true or db:user:all:email?active=true) to query database tables
    • Extract values from any specified column (not limited to id fields)
    • Create multiple job inputs from expanded values, processing in batches with stagger delay
    • Support complex filters via query string parameters or JSON filter objects
  • Orchestrate multi-agent pipelines with job dependencies (stored in Pipeline table)
  • Manage different schedule types: once or repeating (cron/interval)
  • Poll database continuously for schedules that need execution
  • Invoke agent HTTP endpoints with proper parameters and authentication
  • Monitor job status through database queries (AgentJobExecution table)
  • Handle retries for failed jobs by re-invoking agent endpoints
  • Validate agent responses and handle agent failures gracefully

Database Schema

The scheduler uses the following database tables (stored in PostgreSQL):

AgentRegistry

Stores agent type metadata (not version-specific), including agent HTTP endpoint URLs and configuration. The scheduler uses this to know how to invoke each agent type. See Agent Registry API Documentation for the complete schema definition and Agent Specification for full registration details.

AgentVersionDeployment

Determines which version is active in production for each agent type. Only one version per agentId can have environment: 'production'.

{
  agentId: string; // Agent type (e.g., 'data-collection')
  version: string; // Semantic version (e.g., '1.2.3')
  environment: 'production' | 'staging' | 'development';
  deployedAt: Date;
  deployedBy: string; // Admin user ID
}

Schedule

Stores all schedules created by admins. No hardcoded schedules. A schedule can run a single agent or a pipeline of agents, once or repeatedly.

{
  id: string;
  name: string;
  description?: string;
  
  // Schedule timing
  repeat: 'once' | 'repeating'; // 'once' for one-time execution, 'repeating' for recurring
  cronExpression?: string; // For repeating schedules (e.g., '0 2 * * 0' for weekly)
  interval?: number; // For repeating schedules (milliseconds, e.g., 3600000 for hourly)
  timezone?: string; // Default: 'America/New_York'
  startAt?: Date; // Optional: when to start the schedule (for 'once' or first run of 'repeating')
  
  // What to execute
  targetType: 'agent' | 'pipeline';
  agentId?: string; // If targetType is 'agent', references AgentRegistry
  pipelineId?: string; // If targetType is 'pipeline', references Pipeline
  
  // Agent parameters (any parameter with a data source string will be expanded)
  // Data source strings use URI scheme format: <source>:<table>:<filter>:<field>[?options]
  // Examples:
  //   - "db:ticker:all:id" - All IDs from ticker table
  //   - "db:ticker:AAPL:id" - Specific ticker ID
  //   - "db:ticker:all:id?enabled=true" - All enabled tickers
  //   - "db:ticker:all:id?enabled=true&batchSize=10&staggerDelay=2000" - With batch options
  //   - "db:ticker:all:id?filter={\"enabled\":true,\"status\":\"active\"}" - Complex filter
  params?: Record<string, any>; // Parameters to pass to agent(s)
  
  // Retry configuration
  retryConfig?: {
    maxRetries: number;
    backoff: 'exponential' | 'linear' | 'fixed';
    delay: number;
  };
  
  // Timeout
  timeout?: number; // Job timeout in milliseconds
  
  // Priority
  priority?: number; // Job priority (default: 0)
  
  // Status
  enabled: boolean;
  
  // Metadata
  createdAt: Date;
  updatedAt: Date;
  createdBy: string; // Admin user ID
}

Pipeline

Defines multi-agent workflows with dependencies. Pipelines can be scheduled just like agents.

{
  id: string;
  name: string;
  description?: string;
  steps: Array<{
    stepId: string; // Unique ID within pipeline
    agentId: string; // References AgentRegistry
    dependsOn?: string[]; // Step IDs this step depends on
    parallel: boolean; // Can run in parallel with other steps
    params?: Record<string, any>; // Step-specific parameters
    retryConfig?: {
      maxRetries: number;
      backoff: 'exponential' | 'linear' | 'fixed';
      delay: number;
    };
    timeout?: number;
  }>;
  enabled: boolean;
  createdAt: Date;
  updatedAt: Date;
  createdBy: string; // Admin user ID
}

ScheduleExecution

Tracks schedule execution history.

{
  id: string;
  scheduleId: string; // References Schedule
  executionTime: Date;
  status: 'success' | 'partial' | 'failed';
  jobsCreated: number;
  jobsEnqueued: number;
  errors?: Array<{
    message: string;
    timestamp: Date;
  }>;
  metadata?: object; // Additional execution metadata
}

Configurations

This is the minimal configuration for the orchestrator, stored in the AgentConfig table under the key scheduler.

{
  // Global scheduler settings
  scheduler: {
    enabled: true,
    timezone: 'America/New_York', // Default timezone for schedules
    checkInterval: 60000 // How often to check for due schedules (ms)
  },
  
  // HTTP client configuration for agent invocations
  httpClient: {
    timeout: 300000, // Default timeout (ms) - can be overridden per agent
    maxConcurrentRequests: 50, // Max concurrent agent invocations
    retryConfig: {
      maxRetries: 3,
      backoff: { type: 'exponential', delay: 5000 }
    },
    connectionPool: {
      maxSockets: 100,
      keepAlive: true
    }
  },
  
  // Default batch processing settings (can be overridden via query params in data source strings)
  defaultBatchProcessing: {
    batchSize: 10, // Default batch size for data source expansions
    staggerDelay: 1000 // Default delay between batches (ms)
  },
  
  // Monitoring and notifications
  monitoring: {
    enabled: true,
    notifyOnFailure: true,
    notificationChannels: ['email', 'slack'] // Optional
  }
}

Note: All agent-specific schedules and pipelines are stored in the database tables above, not in the orchestrator configuration. The orchestrator dynamically loads and executes them.

Execution Results

As a BullMQ worker, the scheduler doesn't return outputs directly. Instead, it writes execution results to the database. The following information is tracked:

ScheduleExecution Table - Records each schedule execution:

  • scheduleId: Reference to the executed schedule
  • executionTime: When the schedule was executed
  • status: 'success' | 'partial' | 'failed'
  • jobsCreated: Number of jobs created from this execution
  • jobsEnqueued: Number of jobs successfully enqueued
  • errors: Array of any errors encountered
  • metadata: Additional execution metadata including:
    • Data source expansion details (total IDs, batches processed, batch size)
    • Pipeline execution details (if pipeline was executed)

AgentJobExecution Table - Records each individual agent job:

  • jobId: Unique job identifier
  • agentId: Agent that will execute the job
  • scheduleId: Schedule that triggered this job (if applicable)
  • pipelineId: Pipeline this job is part of (if applicable)
  • pipelineStepId: Pipeline step this job is part of (if applicable)
  • status: 'pending' | 'running' | 'completed' | 'failed'
  • priority: Job priority
  • enqueuedAt: When the job was enqueued
  • dependencies: Job IDs this job depends on
  • params: Agent-specific parameters (merged from schedule + expanded values)

Note: The scheduler writes execution records to the database immediately after invoking agent HTTP endpoints. Actual job and pipeline status is tracked in the database (AgentJobExecution and ScheduleExecution tables) and can be queried separately. Agents update their job status in the database as they execute.
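
For monitoring, these records can be queried directly. A minimal sketch using node-postgres, assuming the physical column names match the fields listed above:

import { Pool } from 'pg';

const pool = new Pool({ connectionString: process.env.DATABASE_URL });

// Summarize job status counts for one schedule.
// Table and column names mirror the fields above and are assumptions about the physical schema.
async function jobStatusSummary(scheduleId: string) {
  const { rows } = await pool.query(
    `SELECT status, COUNT(*) AS count
       FROM "AgentJobExecution"
      WHERE "scheduleId" = $1
      GROUP BY status`,
    [scheduleId]
  );
  return rows; // e.g., [{ status: 'completed', count: '42' }, { status: 'failed', count: '3' }]
}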

Process

  1. Initialize (BullMQ Worker Startup):
    • Start as a long-running BullMQ worker process
    • Load orchestrator config from database (AgentConfig table, key: scheduler)
    • Load all enabled schedules from Schedule table
    • Load all enabled pipelines from Pipeline table
    • Load agent registry from AgentRegistry table (discover available agents)
    • Validate agent endpoints are accessible (health checks)
    • Initialize BullMQ queues for schedule execution
    • Register BullMQ workers for processing schedules
    • Initialize job dependency tracking (for pipelines)
    • Initialize data source parser for ID expansion strings
    • Initialize ID cache (optional, for performance when processing many IDs)
    • Initialize HTTP client pool for agent invocations
    • Start polling loop to check for schedules that need execution
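
    A minimal startup sketch using BullMQ, assuming a local Redis connection and hypothetical helpers (loadSchedulerConfig, executeDueSchedules) that wrap the database access described above:

    import { Queue, Worker } from 'bullmq';

    const connection = { host: 'localhost', port: 6379 }; // assumption: local Redis

    async function startScheduler() {
      const config = await loadSchedulerConfig(); // hypothetical: reads AgentConfig key 'scheduler'
      const queue = new Queue('scheduler', { connection });

      // One repeatable "tick" job drives the polling loop at checkInterval.
      await queue.add('poll', {}, {
        repeat: { every: config.scheduler.checkInterval }, // e.g., 60000 ms
        removeOnComplete: true,
      });

      // Each tick finds due schedules and executes them.
      new Worker('scheduler', async () => {
        await executeDueSchedules(); // hypothetical: loads and runs due schedules
      }, { connection });
    }

    // Hypothetical helpers, stubbed for the sketch:
    declare function loadSchedulerConfig(): Promise<{ scheduler: { checkInterval: number } }>;
    declare function executeDueSchedules(): Promise<void>;
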
  2. Agent Discovery:

    • Query AgentRegistry table for all enabled agents (agent type metadata)
    • Query AgentVersionDeployment table to determine active production version for each agent
    • Validate that agents referenced in schedules and pipelines exist in registry
    • Load agent HTTP endpoint URLs and configuration from AgentRegistry
    • Load agent input schemas to validate job parameters
    • Cache agent metadata (endpoints, authentication, timeouts) for performance
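
    A sketch of the discovery query, using node-postgres against the AgentRegistry and AgentVersionDeployment tables. The AgentRegistry columns (agentId, enabled, endpoint metadata) are assumptions, since its full schema is defined in the Agent Registry API documentation:

    import { Pool } from 'pg';

    const pool = new Pool({ connectionString: process.env.DATABASE_URL });

    // Join each enabled agent type with its active production deployment,
    // so the scheduler knows which version and endpoint to invoke.
    async function discoverAgents() {
      const { rows } = await pool.query(
        `SELECT r.*, d.version AS "activeVersion"
           FROM "AgentRegistry" r
           JOIN "AgentVersionDeployment" d
             ON d."agentId" = r."agentId"
            AND d.environment = 'production'
          WHERE r.enabled = true`
      );
      return new Map(rows.map((a) => [a.agentId, a])); // in-memory cache keyed by agent type
    }
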
  3. Schedule Polling & Execution:

    • Worker picks up schedule from queue
    • Load schedule configuration from database
    • Execute schedule (see sections 4 and 5)
    • For one-time schedules: After execution, mark schedule as disabled or delete it (configurable)
    • For repeating schedules: Schedule next execution based on cron/interval
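
    A sketch of next-run calculation for repeating schedules. Interval schedules are plain arithmetic; cron schedules would typically delegate to a parsing library, shown here with the cron-parser package (v4-style parseExpression, an assumption about the chosen dependency):

    import parser from 'cron-parser';

    // Compute the next execution time for a repeating schedule.
    // The schedule argument mirrors the relevant Schedule table fields.
    function nextExecution(
      schedule: { cronExpression?: string; interval?: number; timezone?: string },
      lastRun: Date
    ): Date {
      if (schedule.cronExpression) {
        const it = parser.parseExpression(schedule.cronExpression, {
          currentDate: lastRun,
          tz: schedule.timezone ?? 'America/New_York', // schedule timezone, default per config
        });
        return it.next().toDate();
      }
      // Interval schedules: add the interval (ms) to the last run time.
      return new Date(lastRun.getTime() + (schedule.interval ?? 0));
    }
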
  4. Data Source Expansion (for any agent parameter):

    The orchestrator expands parameters based on data source strings in params. Any parameter value that matches the data source string format is expanded by extracting values from the specified database column (not limited to ID fields); a parsing sketch follows the examples at the end of this step.

    4a. Data Source String Format:

    Format: <source>:<table>:<filter>:<field>[?<options>]

    Components:

    • <source>: Data source identifier (currently db for database, extensible for future sources like api, file, cache)
    • <table>: Database table name (e.g., ticker, user)
    • <filter>: all to get all rows, or specific ID value (e.g., AAPL)
    • <field>: Column name to extract (e.g., id, tickerId)
    • <options>: Optional query string parameters:
      • Simple filters: enabled=true, status=active (multiple conditions: enabled=true&status=active)
      • Batch options: batchSize=10, staggerDelay=2000
      • Complex filters: filter={"enabled":true,"status":"active"} (JSON object for complex conditions)

    Security: All database queries use parameterized queries to prevent SQL injection. Filter values are validated and sanitized.

    4b. ID Expansion Process:

    • For each parameter in params:
      • Check if value is a data source string (matches format <source>:<table>:<filter>:<field>)
      • If yes, parse the string:
        • Extract source, table, filter, field, and query parameters
        • If filter = 'all':
          • Build parameterized SQL query with WHERE conditions from query string
          • Query the specified table from database (e.g., Ticker, User)
          • Apply filters from query parameters (e.g., enabled=true)
          • Get all matching IDs from the specified field
        • If filter is a specific ID value:
          • Use that single ID (still applies WHERE filters if provided)
      • If not a data source string:
        • Use the value as-is (no expansion)
    • If multiple IDs are found (from 'all' expansion):
      • Group IDs into batches based on batchSize from query params or default (10)
      • Create multiple job inputs, one per ID
      • Apply stagger delays between batches based on staggerDelay from query params or default (1000ms)
    • If single ID or no expansion:
      • Create single job input

    4c. Example: All Tickers with Filter:

    • Schedule has params: { tickerId: "db:ticker:all:id?enabled=true&batchSize=10&staggerDelay=1000" }
    • Scheduler parses the string and queries Ticker table with WHERE enabled = true
    • Finds 100 enabled tickers
    • Creates 100 job inputs (one per ticker)
    • Processes in batches of 10 with 1 second delay between batches
    • Each job gets params: { tickerId: '<specific-ticker-id>', ...otherParams }

    4d. Example: Specific ID:

    • Schedule has params: { tickerId: "db:ticker:AAPL:id" }
    • Creates single job input with params: { tickerId: 'AAPL', ...otherParams }

    4e. Example: Complex Filter:

    • Schedule has params: { tickerId: "db:ticker:all:id?filter={\"enabled\":true,\"status\":\"active\",\"createdAt\":{\"$gte\":\"2024-01-01\"}}" }
    • Scheduler parses JSON filter and builds parameterized query
    • Queries tickers matching all conditions
    • Creates job inputs for each matching ticker

    4f. Example: Multiple Filters:

    • Schedule has params: { tickerId: "db:ticker:all:id?enabled=true&active=1&batchSize=5" }
    • Scheduler queries with WHERE enabled = true AND active = 1
    • Processes in batches of 5 (overrides default batchSize)
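
    A parsing sketch for the data source string format above. Only the string parsing is shown; the expansion step would build a parameterized SELECT of <field> FROM <table> from the parsed filters, with table and column names validated against an allowlist (an assumption about how injection via identifiers is prevented):

    // Parse "<source>:<table>:<filter>:<field>[?<options>]", e.g.
    // "db:ticker:all:id?enabled=true&batchSize=10&staggerDelay=2000"
    interface DataSourceRef {
      source: string;                  // e.g., 'db'
      table: string;                   // e.g., 'ticker'
      filter: string;                  // 'all' or a specific value such as 'AAPL'
      field: string;                   // column to extract, e.g., 'id'
      where: Record<string, unknown>;  // simple and complex filters merged
      batchSize?: number;
      staggerDelay?: number;
    }

    const DATA_SOURCE_RE = /^([a-z]+):([A-Za-z_]+):([^:?]+):([A-Za-z_]+)(\?.*)?$/;

    function parseDataSource(value: string): DataSourceRef | null {
      const m = DATA_SOURCE_RE.exec(value);
      if (!m) return null; // not a data source string: the value is used as-is
      const [, source, table, filter, field, qs] = m;
      const where: Record<string, unknown> = {};
      let batchSize: number | undefined;
      let staggerDelay: number | undefined;
      for (const [key, val] of new URLSearchParams(qs ? qs.slice(1) : '')) {
        if (key === 'batchSize') batchSize = Number(val);
        else if (key === 'staggerDelay') staggerDelay = Number(val);
        else if (key === 'filter') Object.assign(where, JSON.parse(val)); // complex JSON filter
        else where[key] = val;                                            // simple key=value filter
      }
      return { source, table, filter, field, where, batchSize, staggerDelay };
    }
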
  5. Job Creation:

    For each schedule execution:

    5a. Load Schedule Configuration:

    • Load schedule from Schedule table
    • Determine target: agentId (single agent) or pipelineId (pipeline of agents)
    • Validate that target exists in AgentRegistry or Pipeline table

    5b. ID Expansion:

    • For each parameter in params, check if value is a data source string
    • For each data source string found, expand IDs (see section 4)
    • If multiple IDs found, create multiple job inputs (one per ID)
    • Replace data source strings with actual ID values in job parameters

    5c. Batch Processing:

    • If multiple job inputs created (from data source expansion):
      • Group job inputs into batches based on batchSize from query params or default
      • Process batches sequentially with staggerDelay between batches (from query params or default)
      • Within each batch, process jobs in parallel (up to batch size)
    • If single job input:
      • Process immediately

    5d. Job Distribution:

    • For each job input:
      • Generate unique jobId and executionId
      • Create execution record in AgentJobExecution table with status: 'pending'
      • Build execution request with merged parameters (schedule params + expanded values)
      • Validate parameters against agent's inputSchema
      • Get agent HTTP endpoint URL from AgentRegistry table
      • Invoke agent HTTP endpoint (POST to agent endpoint URL)
        • Include authentication headers if configured
        • Set timeout from schedule config or agent default
        • Include X-Job-Id and X-Execution-Id headers
      • Handle agent response:
        • Agent accepts the job and starts execution
        • Agent updates AgentJobExecution table as it runs
        • If agent fails to start: Mark execution as failed, log error
      • Apply stagger delays between batches (if configured for data source expansion)
    • Track job creation in ScheduleExecution table
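
    A sketch of a single agent invocation (step 5d) using the global fetch API (Node 18+). Header names and the request flow follow the description above; the request body shape and the bearer-token scheme are assumptions about the Agent Specification:

    // Invoke one agent job endpoint. The HTTP response only confirms that the agent
    // accepted the job; actual progress is tracked in the AgentJobExecution table.
    async function invokeAgent(opts: {
      endpointUrl: string;               // from AgentRegistry
      authToken?: string;                // assumption: bearer-token auth if configured
      jobId: string;
      executionId: string;
      params: Record<string, unknown>;   // merged schedule params + expanded values
      timeoutMs: number;                 // from schedule config or agent default
    }): Promise<void> {
      const res = await fetch(opts.endpointUrl, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          'X-Job-Id': opts.jobId,
          'X-Execution-Id': opts.executionId,
          ...(opts.authToken ? { Authorization: `Bearer ${opts.authToken}` } : {}),
        },
        body: JSON.stringify(opts.params),
        signal: AbortSignal.timeout(opts.timeoutMs), // abort if the agent does not accept in time
      });
      if (!res.ok) {
        // Agent failed to accept the job: the caller marks the execution as 'failed'.
        throw new Error(`Agent rejected job ${opts.jobId}: HTTP ${res.status}`);
      }
    }
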
  6. Pipeline Orchestration:

    For pipeline execution (from Pipeline table or schedule with targetType: 'pipeline'):

    6a. Load Pipeline:

    • Load pipeline definition from Pipeline table
    • Validate all referenced agentId values exist in AgentRegistry

    6b. Data Source Expansion (if data source strings in schedule params):

    • Parse data source strings from schedule params (same as single agent expansion)
    • Expand values for each parameter that contains a data source string
    • If multiple values found, create multiple pipeline execution contexts (one per value)

    6c. Build Dependency Graph:

    • Parse pipeline steps to build dependency graph
    • Identify steps that can run in parallel (parallel: true)
    • Identify steps that must run sequentially (via dependsOn)

    6d. Execute Pipeline Steps:

    • For each expanded parameter set (if any):
      • Create execution records for all pipeline steps in AgentJobExecution table
      • Build dependency graph based on dependsOn relationships
      • For steps without dependencies: Invoke agent endpoints immediately
      • For steps with dependencies: Wait for dependency jobs to complete (check AgentJobExecution.status)
      • Merge step-specific params with expanded values and schedule params
      • Invoke agent endpoints with proper sequencing
    • Track pipeline execution in database
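
    A sketch of dependency-aware step sequencing (steps 6c-6d). Completion is detected by polling AgentJobExecution status as described above; jobStatus and runStep are hypothetical helpers wrapping the database access and the invocation logic:

    type Step = { stepId: string; agentId: string; dependsOn?: string[] };

    // Hypothetical helpers:
    declare function jobStatus(stepId: string): Promise<'pending' | 'running' | 'completed' | 'failed'>;
    declare function runStep(step: Step): Promise<void>;

    // Start each step once all of its dependencies have completed.
    async function executePipeline(steps: Step[], pollMs = 5000): Promise<void> {
      const remaining = new Map(steps.map((s) => [s.stepId, s]));
      const done = new Set<string>();

      while (remaining.size > 0) {
        // Steps whose dependencies are satisfied can start now (possibly in parallel).
        const ready = [...remaining.values()].filter((s) => (s.dependsOn ?? []).every((d) => done.has(d)));
        if (ready.length === 0) break; // remaining steps are blocked by failed dependencies

        await Promise.all(ready.map((s) => runStep(s))); // invoke agent endpoints

        // Poll the database until each started step reports a terminal status.
        for (const s of ready) {
          let status = await jobStatus(s.stepId);
          while (status === 'pending' || status === 'running') {
            await new Promise((r) => setTimeout(r, pollMs));
            status = await jobStatus(s.stepId);
          }
          if (status === 'completed') done.add(s.stepId);
          remaining.delete(s.stepId); // failed steps are dropped; the pipeline continues with partial results
        }
      }
    }
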
  7. Agent Execution:

    • Orchestrator invokes agent HTTP endpoints asynchronously
    • Agent accepts the job and starts execution
    • Agent executes work and updates AgentJobExecution table with status updates
    • Agent reads from and writes to database independently
    • No direct agent-to-agent communication
    • All coordination through database state
    • When agent completes, it updates AgentJobExecution.status to 'completed' or 'failed'
    • Agent shuts down after completing its job
    • Orchestrator monitors agent execution by querying AgentJobExecution table
  8. Job Dependencies & Status Tracking:

    • Track job status in AgentJobExecution table (pending, running, completed, failed)
    • For pipeline dependencies: Poll AgentJobExecution.status to wait for upstream jobs
    • Monitor job progress by querying AgentJobExecution table periodically
    • Handle failed jobs with retry logic:
      • Check AgentJobExecution.error.retryable flag
      • If retryable and retry count < max:
        • Re-invoke agent HTTP endpoint after backoff delay (from schedule retryConfig)
      • If not retryable: Mark as permanently failed, notify admins
    • Use exponential backoff from schedule config for retries
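
    A sketch of the retry delay implied by retryConfig, where attempt is the 1-based retry count. The doubling factor for exponential backoff is an assumption:

    type RetryConfig = { maxRetries: number; backoff: 'exponential' | 'linear' | 'fixed'; delay: number };

    // Delay (ms) before retry number `attempt` (1-based), or null once retries are exhausted.
    function retryDelay(config: RetryConfig, attempt: number): number | null {
      if (attempt > config.maxRetries) return null;
      if (config.backoff === 'exponential') return config.delay * 2 ** (attempt - 1); // 5s, 10s, 20s, ...
      if (config.backoff === 'linear') return config.delay * attempt;                 // 5s, 10s, 15s, ...
      return config.delay;                                                            // 'fixed': 5s, 5s, 5s, ...
    }
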
  9. Batch Processing & Parallelization:

    • Data Source Expansion with Batching:

      • When data source strings create multiple job inputs (e.g., 100 tickers):
        • Jobs are grouped into batches based on batchSize from query params or default (10)
        • Within each batch, jobs are processed in parallel (up to batch size)
        • Between batches, staggerDelay is applied (from query params or default: 1000ms) to avoid overloading agents
        • Example: 100 tickers with batchSize=10 in query string = 10 batches, each batch processes 10 jobs in parallel with 1 second delay between batches
      • Agents execute work asynchronously after accepting jobs
    • Single Value Jobs (no expansion or specific value):

      • Single job processes with one parameter value
      • Can run concurrently with other agent jobs
      • Invoked via HTTP endpoint when needed
    • Pipeline Steps:

      • Steps with parallel: true can run concurrently
      • Steps with dependsOn run sequentially after dependencies complete
      • If data source expansion creates multiple contexts (e.g., 100 tickers), each context runs the pipeline independently
      • Example: Pipeline with 100 ticker expansions = 100 parallel pipeline executions
      • Each execution invokes agent HTTP endpoints as needed
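
    A sketch of the batching behavior: jobs within a batch start in parallel and batches are separated by the stagger delay. invoke is a hypothetical per-job callback (for example, the invokeAgent sketch above):

    // Process items in parallel batches with a pause between batches.
    async function processInBatches<T>(
      items: T[],
      invoke: (item: T) => Promise<void>,
      batchSize = 10,        // config default, overridable via batchSize in the data source string
      staggerDelayMs = 1000  // config default, overridable via staggerDelay in the data source string
    ): Promise<void> {
      for (let i = 0; i < items.length; i += batchSize) {
        const batch = items.slice(i, i + batchSize);
        // allSettled: a failure for one item does not block the rest of the batch
        await Promise.allSettled(batch.map(invoke));
        if (i + batchSize < items.length) {
          await new Promise((resolve) => setTimeout(resolve, staggerDelayMs));
        }
      }
    }

    // Example: 100 ticker IDs with batchSize=10 → 10 batches of 10 parallel jobs, ~1 second apart.
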
  10. Scalability Considerations:

  • Large ID Sets: When processing hundreds of IDs (e.g., all tickers):

    • Batch processing prevents overwhelming agent endpoints
    • Stagger delays prevent API rate limits and agent overload
    • Multiple agent invocations happen concurrently (up to batchSize)
    • Failed jobs for one ID don't block others
    • Partial failures are handled gracefully (continue with remaining items)
    • Agents can scale independently to handle their own load
  • Dynamic Management:

    • New IDs are automatically included in next scheduled run when filter: 'all' is used
    • Filters (e.g., enabled=true) automatically exclude disabled/removed IDs from processing
    • Priority can be set per schedule to influence job priority
  • Resource Management:

    • batchSize in query params (or default) controls concurrent job execution per batch
    • staggerDelay in query params (or default) controls rate of batch processing
    • Queue concurrency settings limit total parallel workers
    • Memory and CPU usage scales with batch size and concurrency
  11. Error Handling:
  • Agent Invocation Errors (network, timeout, agent unavailable):

    • Retry with exponential backoff (configured in schedule retryConfig)
    • Log errors to database for monitoring (ScheduleExecution table)
    • If agent endpoint is unreachable: Mark execution as failed, log error
  • Agent Execution Errors (agent fails during execution):

    • Agent updates AgentJobExecution.status: 'failed' with error details
    • Orchestrator checks error.retryable flag
    • If retryable: Re-invoke agent HTTP endpoint after backoff delay (from schedule retryConfig)
    • If not retryable: Mark as permanently failed, notify admins
  • Pipeline Dependencies:

    • Job dependencies can time out if upstream jobs fail repeatedly
    • Pipeline continues with partial results if some jobs fail (non-blocking)
    • Each pipeline step can have its own retry configuration
  • General:

    • Notify on critical failures (via configured notification channels)
    • Failed jobs for one schedule don't block others
    • Agents can implement their own retry logic internally
    • Retry configuration from schedule is applied to all job attempts
  12. Metrics Collection:
  • Agents write metrics to database upon completion
  • Orchestrator queries database for job and pipeline status
  • Track schedule execution history in ScheduleExecution table
  • Learning Agent reads metrics from database independently (runs on its own schedule)

Admin Interface Usage

Admins can create and manage schedules and pipelines through the admin interface. The orchestrator dynamically loads and executes these configurations without any code changes.

Creating a Schedule

  1. Navigate to Schedules:

    • Go to /admin/scheduler/schedules
    • Click "Create Schedule"
  2. Configure Schedule:

    • Basic Info: Name, description
    • Target: Choose targetType:
      • Agent: Select agentId from available agents
      • Pipeline: Select pipelineId from available pipelines
    • Timing:
      • Repeat: Choose 'once' for one-time execution or 'repeating' for recurring
      • For Repeating:
        • Cron: Provide cron expression (e.g., "0 2 * * 0" for weekly Sunday 2 AM)
        • Interval: Provide interval in milliseconds (e.g., 3600000 for hourly)
      • Timezone: Default is 'America/New_York'
      • Start At: Optional start time (for one-time or first run of repeating)
    • Parameters: Set agent parameters (JSON object)
      • For data source expansion, use data source strings in parameter values:
        • Format: <source>:<table>:<filter>:<field>[?<options>]
        • Examples:
          • "db:ticker:all:id" - All IDs from ticker table
          • "db:ticker:AAPL:id" - Specific ticker ID
          • "db:ticker:all:id?enabled=true" - All enabled tickers
          • "db:ticker:all:id?enabled=true&batchSize=10&staggerDelay=2000" - With batch options
          • "db:ticker:all:id?filter={\"enabled\":true,\"status\":\"active\"}" - Complex filter
        • Simple filters: Use query string format key=value&key2=value2
        • Complex filters: Use JSON in filter parameter
        • Batch options: batchSize and staggerDelay in query string override defaults
    • Retry Config: Configure retry behavior (max retries, backoff, delay)
    • Timeout: Job timeout in milliseconds
    • Priority: Job priority (default: 0)
    • Enabled: Toggle to enable/disable schedule
  3. Save the Schedule

Creating a Pipeline

  1. Navigate to Pipelines:

    • Go to /admin/scheduler/pipelines
    • Click "Create Pipeline"
  2. Configure Pipeline:

    • Basic Info: Name, description
    • Steps: Add pipeline steps:
      • Agent: Select agentId for each step
      • Parameters: Step-specific parameters (JSON object)
      • Dependencies: Define dependsOn (step IDs this step depends on)
      • Parallel: Toggle if step can run in parallel with others
      • Retry Config: Step-specific retry configuration
      • Timeout: Step-specific timeout
    • Enabled: Toggle to enable/disable pipeline
  3. Save the Pipeline

  4. Create a Schedule for the Pipeline:

    • Create a schedule with targetType: 'pipeline' and select the pipeline
    • Add data source strings to parameters if needed (applies to all pipeline steps)

Example: Query Strategy Agent Schedule

Schedule:

  • name: "Weekly Entity Graph Refresh"
  • repeat: 'repeating'
  • cronExpression: "0 2 * * 0" (Sunday 2 AM)
  • targetType: 'agent'
  • agentId: "query-strategy"
  • params: { "tickerId": "db:ticker:all:id?enabled=true&batchSize=10&staggerDelay=1000", "refreshEntityGraph": true }

Result: Every Sunday at 2 AM, the scheduler:

  1. Parses tickerId data source string: db:ticker:all:id?enabled=true&batchSize=10&staggerDelay=1000
  2. Queries Ticker table with WHERE enabled = true and finds 100 enabled tickers
  3. Creates 100 job inputs (one per ticker)
  4. Processes in batches of 10 with 1 second delay between batches
  5. Each job gets params: { tickerId: '<specific-ticker-id>', refreshEntityGraph: true }

Example: One-time Agent Execution

Schedule:

  • name: "One-time Data Collection for AAPL"
  • repeat: 'once'
  • startAt: "2024-01-15T10:00:00Z"
  • targetType: 'agent'
  • agentId: "data-collection"
  • params: { "tickerId": "db:ticker:AAPL:id", "source": "news" }

Result: On January 15, 2024 at 10:00 AM, the scheduler:

  1. Parses tickerId data source string: db:ticker:AAPL:id
  2. Creates a single job with params: { tickerId: "AAPL", source: "news" }

Example: Pipeline Schedule with Data Source Expansion

Pipeline:

  • name: "Newsletter Generation Pipeline"
  • steps: Array of step objects:
    • { stepId: "collect", agentId: "data-collection", parallel: false }
    • { stepId: "analyze", agentId: "analysis", dependsOn: ["collect"], parallel: false }
    • { stepId: "generate", agentId: "content-generation", dependsOn: ["analyze"], parallel: false }

Schedule:

  • name: "Daily Newsletter for All Tickers"
  • repeat: 'repeating'
  • cronExpression: "0 8 * * *" (Daily at 8 AM)
  • targetType: 'pipeline'
  • pipelineId: "newsletter-generation-pipeline"
  • params: { "tickerId": "db:ticker:all:id?enabled=true&batchSize=5&staggerDelay=2000" }

Result: Every day at 8 AM, the scheduler:

  1. Parses tickerId data source string: db:ticker:all:id?enabled=true&batchSize=5&staggerDelay=2000
  2. Queries Ticker table with WHERE enabled = true and finds 50 enabled tickers
  3. Creates 50 pipeline execution contexts (one per ticker)
  4. Processes in batches of 5 with 2 second delay between batches
  5. Each context runs the pipeline: collect → analyze → generate (sequential)

Sequence Diagram