Agent Specification

Overview

This document defines the Agent Specification: the contract that every agent must follow to integrate with the MediaPulse system. Agents can be implemented in any language (JavaScript, TypeScript, Rust, Python, Go, etc.) or as n8n workflows, as long as they adhere to this specification.

The specification ensures:

  • Language Independence: Agents can be written in any programming language
  • Deployment Flexibility: Agents can run anywhere (cloud functions, containers, edge functions, n8n, etc.)
  • Standardized Communication: All agents communicate via HTTP and the database
  • Self-Registration: Agents register themselves in the database
  • Status Reporting: Agents report their execution status and results

Core Principles

  1. Database as Central Hub: All agents read from and write to the shared database
  2. HTTP-Based Invocation: Agents expose HTTP endpoints for execution
  3. Self-Registration: Agent types are registered in the AgentRegistry table (agent type metadata); instance-specific information lives in the AgentInstance table, whose records are created by the orchestrator and kept up to date by the instances
  4. Asynchronous Execution: Agents execute asynchronously and report status via database
  5. Specification Compliance: Agents must implement the required endpoints and data structures
  6. Versioning: Only one version per agent type can be active in production at a time
  7. Multi-Instance Support: Multiple instances of the same agent version can run in parallel for horizontal scaling

Agent Registration

Registration Models

Agents do not need to be long-running. The AgentRegistry table must contain the agent's endpoint URL and metadata, but how it gets there depends on the deployment model:

1. Self-Registration (Long-Running Services)

For: Containers, traditional servers, Kubernetes deployments

  • Agent registers itself in AgentRegistry on startup
  • Updates lastHeartbeat periodically to indicate it's alive
  • Handles multiple execution requests over time
  • Best for: Production deployments, high-throughput agents

Example: Container-based agent that runs continuously

2. Deployment-Time Registration (Serverless Functions)

For: AWS Lambda, Google Cloud Functions, Azure Functions, Vercel Serverless

  • Registration happens during deployment (via CI/CD pipeline or deployment script)
  • Agent function doesn't need to register itself on each invocation
  • Registration script runs once per deployment
  • Best for: Cost-efficient, auto-scaling agents

Example: Lambda function registered via Terraform/CloudFormation during infrastructure deployment

3. Manual Registration (External Agents)

For: n8n workflows, third-party services, one-off integrations

  • Admin registers agent via admin interface
  • Agent metadata and endpoint URL entered manually
  • No code changes needed
  • Best for: Non-code agents, external integrations, rapid prototyping

Example: n8n workflow webhook URL registered by admin

4. Hybrid Registration (Serverless with Cold-Start Registration)

For: Serverless functions that want to update their status

  • Registration happens during cold start (first invocation)
  • Agent function checks/updates registry on first call
  • Subsequent invocations skip registration
  • Best for: Serverless functions that want to report their availability

Example: Lambda function that registers itself on cold start
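
The cold-start pattern can be sketched as a module-level flag that survives between invocations of a warm container. This is a minimal sketch, not a definitive implementation: ensure_registered, handler, and the register callback are hypothetical names, and the callback stands in for whatever upsert into AgentRegistry the agent performs.

```python
from typing import Callable

# Module-level state survives across invocations of a warm serverless
# container, so registration runs only on the first (cold-start) call.
_registered = False


def ensure_registered(register: Callable[[], None]) -> bool:
    """Run `register` once per container; return True if it ran just now."""
    global _registered
    if _registered:
        return False
    register()          # hypothetical callback, e.g. an upsert into AgentRegistry
    _registered = True  # set only after success, so a failed attempt is retried
    return True


def handler(event: dict, register: Callable[[], None]) -> dict:
    """Hypothetical function entry point: register if needed, then accept."""
    ensure_registered(register)
    return {"jobId": event.get("jobId"), "status": "accepted"}
```

Setting the flag only after the callback returns means a registration that raises is attempted again on the next invocation.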

Registration Requirements

Important: AgentRegistry stores agent type metadata, not version-specific information. The active version is determined by the AgentVersionDeployment table.

Regardless of registration method, the AgentRegistry table must contain:

interface AgentRegistry {
  id: string;                    // Unique agent ID (e.g., 'query-strategy', 'data-collection')
  name: string;                  // Human-readable name
  description: string;           // What the agent does
  version: string;               // Current/latest version (informational only - active version determined by AgentVersionDeployment)
  
  // Agent endpoint configuration
  endpoint: {
    type: 'http' | 'webhook' | 'n8n' | 'cloud-function';
    url: string;                 // Full URL to agent endpoint
    method: 'POST' | 'PUT';      // HTTP method
    authentication?: {
      type: 'bearer' | 'api-key' | 'oauth';
      token?: string;             // Token for scheduler to authenticate
    };
    timeout: number;              // Request timeout in milliseconds
    retryConfig?: {
      maxRetries: number;
      backoff: 'exponential' | 'linear' | 'fixed';
      delay: number;
    };
  };
  
  // Agent capabilities
  inputSchema: object;           // JSON Schema or Zod schema (JSON representation)
  outputSchema: object;           // JSON Schema or Zod schema (JSON representation)
  parameterTypes: {
    ticker?: {
      required: boolean;
      expandable: boolean;        // Can scheduler expand this parameter?
      scope: 'all' | 'active' | 'custom';
    };
    userId?: {
      required: boolean;
      expandable: boolean;
    };
    // ... other parameter types
  };
  
  // Agent metadata
  enabled: boolean;
  healthCheck?: {
    endpoint: string;            // Health check endpoint (optional for serverless)
    interval: number;            // Health check interval (seconds)
  };
  metadata?: {
    language?: string;           // Implementation language (e.g., 'typescript', 'python', 'rust')
    runtime?: string;            // Runtime environment (e.g., 'node', 'python3.11', 'wasmtime')
    deployment?: string;         // Deployment type (e.g., 'container', 'lambda', 'n8n')
    repository?: string;        // Source code repository URL
    maintainer?: string;         // Team or person responsible
  };
  createdAt: Date;
  updatedAt: Date;
  lastHeartbeat?: Date;          // Last health check timestamp (optional for serverless)
}

Registration Methods

Agents can register via:

  1. Direct Database Access: Agent or deployment script connects to database and inserts/updates record
  2. Registration API: Agent calls a registration API endpoint (if provided by the system)
  3. Admin Interface: Manual registration by admin via web interface
  4. Infrastructure as Code: Registration via Terraform, CloudFormation, or similar tools

Example: Self-Registration (Long-Running Service)

import json
import os
from datetime import datetime

import psycopg2

def register_agent():
    # Connection string is read from the DATABASE_URL environment variable
    conn = psycopg2.connect(os.getenv('DATABASE_URL'))
    cur = conn.cursor()
    
    agent_data = {
        'id': 'data-collection',
        'name': 'Data Collection Agent',
        'description': 'Collects raw media data from search sources',
        'version': '1.0.0',
        'endpoint': {
            'type': 'http',
            'url': 'https://my-agent.example.com/execute',
            'method': 'POST',
            'authentication': {
                'type': 'bearer',
                'token': os.getenv('AGENT_API_TOKEN')
            },
            'timeout': 300000
        },
        'inputSchema': {...},  # JSON Schema
        'outputSchema': {...},  # JSON Schema
        'enabled': True
    }
    
    cur.execute("""
        INSERT INTO "AgentRegistry" (id, name, description, version, endpoint, "inputSchema", "outputSchema", enabled, "createdAt", "updatedAt")
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (id) DO UPDATE SET
            name = EXCLUDED.name,
            endpoint = EXCLUDED.endpoint,
            "updatedAt" = EXCLUDED."updatedAt"
    """, (
        agent_data['id'],
        agent_data['name'],
        agent_data['description'],
        agent_data['version'],
        json.dumps(agent_data['endpoint']),
        json.dumps(agent_data['inputSchema']),
        json.dumps(agent_data['outputSchema']),
        agent_data['enabled'],
        datetime.now(),
        datetime.now()
    ))
    
    conn.commit()
    cur.close()
    conn.close()

# Call on startup for long-running services
if __name__ == '__main__':
    register_agent()
    # Start HTTP server...

Example: Deployment-Time Registration (Serverless)

# registration_script.py (runs during CI/CD deployment)
import psycopg2
import json
import os

def register_lambda_agent():
    """Run this script during Lambda deployment"""
    conn = psycopg2.connect(os.getenv('DATABASE_URL'))
    cur = conn.cursor()
    
    lambda_url = os.getenv('LAMBDA_FUNCTION_URL')  # Set by deployment
    
    agent_data = {
        'id': 'data-collection',
        'name': 'Data Collection Agent',
        'description': 'Collects raw media data from search sources',
        'version': os.getenv('VERSION', '1.0.0'),
        'endpoint': {
            'type': 'cloud-function',
            'url': lambda_url,
            'method': 'POST',
            'timeout': 300000
        },
        'inputSchema': {...},
        'outputSchema': {...},
        'enabled': True,
        'metadata': {
            'deployment': 'lambda',
            'runtime': 'python3.11'
        }
    }
    
    # Insert or update registration
    cur.execute("""
        INSERT INTO "AgentRegistry" (id, name, description, version, endpoint, "inputSchema", "outputSchema", enabled, metadata, "createdAt", "updatedAt")
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
        ON CONFLICT (id) DO UPDATE SET
            endpoint = EXCLUDED.endpoint,
            version = EXCLUDED.version,
            "updatedAt" = NOW()
    """, (
        agent_data['id'],
        agent_data['name'],
        agent_data['description'],
        agent_data['version'],
        json.dumps(agent_data['endpoint']),
        json.dumps(agent_data['inputSchema']),
        json.dumps(agent_data['outputSchema']),
        agent_data['enabled'],
        json.dumps(agent_data['metadata']),
    ))
    
    conn.commit()
    cur.close()
    conn.close()

# Lambda function itself doesn't need to register

Example: Manual Registration (n8n Workflow)

  1. Admin navigates to /admin/agents/register
  2. Enters agent details:
    • ID: data-collection-n8n
    • Name: Data Collection (n8n)
    • Endpoint URL: https://n8n.example.com/webhook/data-collection
    • Input/Output schemas (JSON)
  3. System creates record in AgentRegistry
  4. Orchestrator discovers and invokes agent

Note: For serverless and event-driven agents, lastHeartbeat is optional since the agent may not be continuously running. The orchestrator will attempt to invoke the agent endpoint and handle failures accordingly.

Agent Versioning

Version Management

  • Multiple versions exist per agent type (stored in AgentVersion table)
  • Only one version can be active in production per agent type (determined by AgentVersionDeployment table)
  • When a new version is promoted to production, it automatically replaces the previous production version
  • Agents must read their active version from AgentVersionDeployment during initialization
  • All agent outputs must include an agentVersion field for traceability
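
The "one active version per agent type" rule can be illustrated with a small in-memory model. This is a sketch under stated assumptions: the dictionary stands in for the AgentVersionDeployment table, and promote and active_version are hypothetical helper names, not part of the system.

```python
from typing import Dict, Optional, Tuple

# In-memory stand-in for the AgentVersionDeployment table:
# (agentId, environment) -> active version.
Deployments = Dict[Tuple[str, str], str]


def promote(deployments: Deployments, agent_id: str, version: str,
            environment: str = "production") -> Optional[str]:
    """Activate `version` and return the version it replaced, if any."""
    key = (agent_id, environment)
    previous = deployments.get(key)
    deployments[key] = version  # one key per agent type, so only one is active
    return previous


def active_version(deployments: Deployments, agent_id: str,
                   environment: str = "production") -> str:
    """Return the active version, or raise if none has been promoted."""
    try:
        return deployments[(agent_id, environment)]
    except KeyError:
        raise LookupError(
            f"No {environment} version found for agent {agent_id!r}"
        ) from None
```

Keying on (agentId, environment) makes replacement on promotion automatic, which mirrors the behaviour described in the list above.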

Determining Active Version

Agents must query the AgentVersionDeployment table on startup to determine which version they should run:

// Pseudo-code for agent initialization
const activeVersion = await db.agentVersionDeployment.findFirst({
  where: {
    agentId: 'data-collection',
    environment: 'production'
  }
});

if (!activeVersion) {
  throw new Error('No production version found for agent');
}

const agentVersion = activeVersion.version; // e.g., "1.2.3"

Version in Outputs

All agent outputs must include the agentVersion field:

interface AgentOutput {
  agentId: string;
  agentVersion: string;  // Must match the version from AgentVersionDeployment
  executionId: string;
  timestamp: Date;
  // ... rest of output
}

Agent Instance Management

Automatic Instance Management by Orchestrator

Important: Agent instances are automatically spawned and managed by the orchestrator. Admins do not need to manually start instances. The orchestrator:

  • Automatically spawns instances when needed (based on job queue and load)
  • Manages instance lifecycle (spawn → active → idle → terminate)
  • Scales instances up/down based on demand
  • Replaces failed instances automatically
  • Monitors instance health and capacity

Instance Lifecycle

Instances are created and managed by the orchestrator:

  1. Spawned by Orchestrator:

    • Orchestrator spawns new instance when needed (job queue, high load, etc.)
    • Orchestrator creates record in AgentInstance table with:
      • Unique instance ID (generated by orchestrator, e.g., data-collection-instance-1)
      • Agent type (agentId)
      • Active version (from AgentVersionDeployment)
      • Instance-specific endpoint URL (assigned by orchestrator)
      • Capacity (max concurrent jobs, from config)
      • Initial status: 'spawning'
  2. Instance Startup:

    • Instance receives startup signal from orchestrator (or detects it's been spawned)
    • Instance reads its configuration from database
    • Instance determines active version from AgentVersionDeployment table
    • Instance starts HTTP server and becomes ready
    • Instance updates status to 'active' in AgentInstance table
  3. During Operation:

    • Instance periodically updates currentLoad and lastHeartbeat in AgentInstance table
    • Instance processes jobs received from orchestrator
    • Instance updates currentLoad when jobs start/complete
  4. Termination:

    • Orchestrator terminates idle instances or when scaling down
    • Orchestrator updates instance status to 'terminating' then removes record
    • Instance gracefully shuts down when terminated

Instance Schema

interface AgentInstance {
  id: string;                    // Unique instance ID (e.g., 'data-collection-instance-1')
  agentId: string;                // Agent type (e.g., 'data-collection')
  agentVersion: string;           // Version this instance runs (e.g., '1.2.3')
  endpoint: {
    url: string;                   // Instance-specific endpoint URL
    method: 'POST';
    timeout: number;
  };
  status: 'spawning' | 'active' | 'inactive' | 'unhealthy' | 'terminating';
  capacity: number;                // Max concurrent jobs this instance can handle
  currentLoad: number;             // Current number of running jobs
  lastHeartbeat: Date;            // Last health check timestamp
  metadata?: {
    region?: string;
    zone?: string;
    deployment?: string;           // 'container', 'lambda', etc.
  };
  createdAt: Date;
  updatedAt: Date;
}

Instance Lifecycle Implementation

Startup (Instance started by orchestrator):

// Instance reads its configuration
const instanceId = process.env.INSTANCE_ID; // Provided by orchestrator
const instanceUrl = process.env.INSTANCE_URL; // Provided by orchestrator

// Instance determines active version
const activeVersion = await db.agentVersionDeployment.findFirst({
  where: {
    agentId: 'data-collection',
    environment: 'production'
  }
});

// Fail fast if no production version is deployed
if (!activeVersion) {
  throw new Error('No production version found for agent');
}

// Instance updates status to active when ready
await db.agentInstance.update({
  where: { id: instanceId },
  data: {
    status: 'active',
    agentVersion: activeVersion.version,
    lastHeartbeat: new Date(),
  }
});

During Operation:

// Update load when job starts
await db.agentInstance.update({
  where: { id: instanceId },
  data: { 
    currentLoad: { increment: 1 },
    lastHeartbeat: new Date()
  }
});

// Update load when job completes
await db.agentInstance.update({
  where: { id: instanceId },
  data: { 
    currentLoad: { decrement: 1 },
    lastHeartbeat: new Date()
  }
});

Shutdown (Instance terminated by orchestrator):

// Orchestrator signals termination (e.g., via SIGTERM or HTTP endpoint)
// Instance gracefully shuts down:
// - Stop accepting new jobs
// - Wait for current jobs to complete (with timeout)
// - Update status and exit

await db.agentInstance.update({
  where: { id: instanceId },
  data: { status: 'terminating' }
});

// Wait for jobs to complete, then orchestrator removes the record
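
The shutdown steps (stop accepting new jobs, wait for running jobs with a timeout) can be sketched in Python as follows. JobTracker and its methods are hypothetical names; the status update to 'terminating' and the process exit are left to the caller.

```python
import threading


class JobTracker:
    """Tracks running jobs so an instance can drain before exiting."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._idle = threading.Condition(self._lock)
        self._running = 0
        self._accepting = True

    def try_start(self) -> bool:
        """Reserve a slot for a new job; refuse once shutdown has begun."""
        with self._lock:
            if not self._accepting:
                return False
            self._running += 1
            return True

    def finish(self) -> None:
        """Mark one job as done and wake a waiting shutdown() if idle."""
        with self._idle:
            self._running -= 1
            if self._running == 0:
                self._idle.notify_all()

    def shutdown(self, timeout: float) -> bool:
        """Stop accepting jobs; return True if all jobs ended in time."""
        with self._idle:
            self._accepting = False
            return self._idle.wait_for(lambda: self._running == 0, timeout=timeout)
```

A SIGTERM handler could call shutdown(timeout) and exit once it returns; a False return means some jobs were still running when the timeout expired.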

Multiple Instances

  • Multiple instances of the same agent version can run simultaneously
  • Instances are automatically spawned by the orchestrator based on demand
  • Each instance has its own unique ID (generated by orchestrator) and endpoint URL (assigned by orchestrator)
  • The orchestrator distributes jobs across instances based on:
    • Instance status ('active')
    • Recent heartbeat (not stale)
    • Available capacity (currentLoad < capacity)
    • Load balancing algorithm (round-robin, least-loaded, etc.)
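
The selection criteria above can be sketched as a filter followed by a least-loaded choice. This is an illustration only: pick_instance is a hypothetical name, the two-minute staleness threshold is taken from the example value in the health monitoring section, and least-loaded is just one of the algorithms the orchestrator might use.

```python
from datetime import datetime, timedelta, timezone
from typing import List, Optional

# Assumed threshold; the specification gives "> 2 minutes" only as an example.
STALE_AFTER = timedelta(minutes=2)


def pick_instance(instances: List[dict], now: datetime) -> Optional[dict]:
    """Return the eligible instance with the lowest load ratio, or None."""
    eligible = [
        inst for inst in instances
        if inst["status"] == "active"                      # instance status
        and now - inst["lastHeartbeat"] <= STALE_AFTER     # recent heartbeat
        and inst["currentLoad"] < inst["capacity"]         # available capacity
    ]
    if not eligible:
        return None
    # Least-loaded strategy: compare the fraction of capacity in use.
    return min(eligible, key=lambda inst: inst["currentLoad"] / inst["capacity"])
```

Each dictionary uses the field names of the AgentInstance schema, so rows read from that table could be passed in directly.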

Instance Health Monitoring

  • Instances must update lastHeartbeat periodically (e.g., every 30 seconds)
  • Instances with stale heartbeats (e.g., > 2 minutes old) are considered unavailable
  • The orchestrator automatically spawns replacement instances for failed/unhealthy instances
  • The orchestrator will not route jobs to unhealthy instances
  • Instances can self-report status: 'unhealthy' if they detect issues (orchestrator will replace them)

Agent Execution Endpoint

Required Endpoint

All agents must expose an HTTP endpoint that accepts execution requests from the orchestrator. The endpoint signature is:

Endpoint: POST /execute (or custom path as specified in registration)

Request Headers:

Content-Type: application/json
Authorization: Bearer [token] (if authentication configured)
X-Job-Id: [job-id] (unique job identifier)
X-Execution-Id: [execution-id] (unique execution identifier)

Note: The placeholders [token], [job-id], and [execution-id] should be replaced with actual values.

Request Body:

interface AgentExecutionRequest {
  jobId: string;                 // Unique job ID from orchestrator
  executionId: string;            // Unique execution ID
  agentId: string;               // Agent ID (should match registered ID)
  agentVersion: string;          // Expected agent version (must match instance's version)
  instanceId?: string;            // Instance ID that received this job (for load tracking)
  params: Record<string, any>;   // Agent-specific parameters (validated against inputSchema)
  config?: Record<string, any>;  // Optional runtime config overrides
  metadata?: {
    scheduleId?: string;         // If triggered by schedule
    pipelineId?: string;         // If part of pipeline
    pipelineStepId?: string;     // If part of pipeline step
    priority?: number;           // Job priority
    retryCount?: number;         // Current retry attempt
    parentJobId?: string;        // If this is a sub-job from job division
  };
}

Response (Synchronous - Immediate Acknowledgment):

interface AgentExecutionResponse {
  jobId: string;
  executionId: string;
  status: 'accepted' | 'rejected';
  message?: string;              // Error message if rejected
  estimatedDuration?: number;    // Estimated execution time (ms)
}

Important: The execution endpoint should return immediately (within 1-2 seconds) with an acceptance response. The actual work should be performed asynchronously. The agent reports progress and results via the database (see Status Reporting below). The orchestrator does not wait for agent completion but tracks job status through the database.
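
The accept-then-work-asynchronously contract can be sketched independently of any web framework. In this sketch handle_execute, run_job, and the rejection messages are hypothetical; the field names come from AgentExecutionRequest and AgentExecutionResponse.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict

REQUIRED_FIELDS = ("jobId", "executionId", "agentId", "agentVersion", "params")


def _rejected(body: Dict[str, Any], message: str) -> Dict[str, Any]:
    return {
        "jobId": body.get("jobId"),
        "executionId": body.get("executionId"),
        "status": "rejected",
        "message": message,
    }


def handle_execute(body: Dict[str, Any], agent_id: str, agent_version: str,
                   run_job: Callable[[Dict[str, Any]], None],
                   pool: ThreadPoolExecutor) -> Dict[str, Any]:
    """Validate the request, queue the work, and answer immediately."""
    missing = [name for name in REQUIRED_FIELDS if name not in body]
    if missing:
        return _rejected(body, f"Missing fields: {', '.join(missing)}")
    if body["agentId"] != agent_id:
        return _rejected(body, "agentId does not match this agent")
    if body["agentVersion"] != agent_version:
        return _rejected(body, "agentVersion does not match this instance")
    # The work runs on a worker thread; the response does not wait for it.
    pool.submit(run_job, body)
    return {"jobId": body["jobId"], "executionId": body["executionId"],
            "status": "accepted"}
```

An HTTP handler would call handle_execute with the parsed JSON body and return the resulting dictionary as the response, while run_job reports progress through AgentJobExecution.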

Status Reporting

Database Status Updates

Agents must update job status in the database during execution. The orchestrator and monitoring systems query the database to track agent progress.

Database Table: AgentJobExecution

interface AgentJobExecution {
  id: string;                    // Unique execution ID
  jobId: string;                  // Job ID from orchestrator
  agentId: string;                // Agent ID
  agentVersion: string;           // Agent version that executed
  status: 'pending' | 'running' | 'completed' | 'failed' | 'cancelled';
  params: Record<string, any>;    // Execution parameters
  result?: Record<string, any>;   // Agent output (validated against outputSchema)
  error?: {
    message: string;
    code?: string;
    stack?: string;
    timestamp: Date;
    retryable?: boolean;          // Whether the job can be retried (see Error Handling)
  };
  metrics?: {
    executionTime: number;        // Milliseconds
    memoryUsage?: number;         // Bytes
    cpuUsage?: number;            // Percentage
    itemsProcessed?: number;      // Agent-specific metrics
  };
  startedAt?: Date;
  completedAt?: Date;
  createdAt: Date;
  updatedAt: Date;
}

Status Update Flow

  1. Job Accepted: Agent creates record with status: 'pending'
  2. Execution Started: Agent updates to status: 'running', sets startedAt
  3. Progress Updates: Agent can update metrics periodically (optional)
  4. Execution Complete: Agent updates to status: 'completed', sets result and completedAt
  5. Execution Failed: Agent updates to status: 'failed', sets error and completedAt
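
The flow above implies a small state machine. The following sketch computes which columns to write for a given transition; status_update is a hypothetical helper, and the set of allowed transitions (for example pending to failed) is an assumption, since the specification lists the states but not every permitted move.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Optional

# Assumed transition table; terminal states allow no further changes.
ALLOWED = {
    "pending": {"running", "failed", "cancelled"},
    "running": {"completed", "failed", "cancelled"},
    "completed": set(),
    "failed": set(),
    "cancelled": set(),
}


def status_update(current: str, new: str, now: datetime,
                  result: Optional[dict] = None,
                  error: Optional[str] = None) -> Dict[str, Any]:
    """Return the AgentJobExecution fields to write for this transition."""
    if new not in ALLOWED.get(current, set()):
        raise ValueError(f"Illegal status transition: {current} -> {new}")
    fields: Dict[str, Any] = {"status": new, "updatedAt": now}
    if new == "running":
        fields["startedAt"] = now
    elif new == "completed":
        fields["result"] = result
        fields["completedAt"] = now
    elif new == "failed":
        fields["error"] = {"message": error, "timestamp": now}
        fields["completedAt"] = now
    return fields
```

Rejecting illegal transitions in one place keeps a late or duplicated update from overwriting a terminal status.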

Example Status Updates (Rust):

use sqlx::PgPool;

/// Updates the status of an execution row and stores its result or error.
async fn update_job_status(
    pool: &PgPool,
    execution_id: &str,
    status: &str,
    result: Option<serde_json::Value>,
    error: Option<&str>,
) -> Result<(), sqlx::Error> {
    let now = chrono::Utc::now();
    
    // The explicit casts on $3 and $4 give PostgreSQL the parameter types
    // where they cannot be inferred from a column (inside jsonb_build_object).
    sqlx::query!(
        r#"
        UPDATE "AgentJobExecution"
        SET status = $1,
            result = $2,
            error = CASE WHEN $3::text IS NOT NULL THEN jsonb_build_object('message', $3::text, 'timestamp', $4::timestamptz) ELSE NULL END,
            "updatedAt" = $4,
            "completedAt" = CASE WHEN $1 IN ('completed', 'failed') THEN $4 ELSE "completedAt" END
        WHERE id = $5
        "#,
        status,
        result, // already an Option<serde_json::Value>; no conversion needed
        error,
        now,
        execution_id
    )
    .execute(pool)
    .await?;
    
    Ok(())
}

Agent Output Specification

Output Structure

All agents must return outputs that conform to their registered outputSchema. The output must include:

interface AgentOutput {
  agentId: string;               // Agent ID
  agentVersion: string;          // Semantic version (e.g., "1.2.3")
  executionId: string;           // Execution ID
  timestamp: Date;               // When output was generated
  executionTime: number;          // Execution time in milliseconds
  data: Record<string, any>;      // Agent-specific output data
  metadata?: {
    itemsProcessed?: number;
    errors?: Array<{
      message: string;
      timestamp: Date;
    }>;
    // ... agent-specific metadata
  };
}
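
As an illustration, an agent could assemble and check this envelope with two small helpers. build_agent_output and missing_output_fields are hypothetical names; the field names follow the AgentOutput interface above, and validation against the registered outputSchema is not shown.

```python
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

REQUIRED_OUTPUT_FIELDS = (
    "agentId", "agentVersion", "executionId", "timestamp", "executionTime", "data",
)


def build_agent_output(agent_id: str, agent_version: str, execution_id: str,
                       data: Dict[str, Any], execution_time_ms: int,
                       items_processed: Optional[int] = None) -> Dict[str, Any]:
    """Assemble the common output envelope around agent-specific data."""
    output: Dict[str, Any] = {
        "agentId": agent_id,
        "agentVersion": agent_version,
        "executionId": execution_id,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "executionTime": execution_time_ms,
        "data": data,
    }
    if items_processed is not None:
        output["metadata"] = {"itemsProcessed": items_processed}
    return output


def missing_output_fields(output: Dict[str, Any]) -> List[str]:
    """List required envelope fields that are absent from `output`."""
    return [name for name in REQUIRED_OUTPUT_FIELDS if name not in output]
```

Checking for missing fields before writing the result gives an early, local failure instead of a schema rejection later.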

Output Storage

Agents write their outputs to the database. The exact table depends on the agent type:

  • Query Strategy Agent: Writes to Query table
  • Data Collection Agent: Writes to DataSource table
  • Analysis Agent: Writes to AnalysisResult table
  • Content Generation Agent: Writes to Newsletter table
  • etc.

The output structure is validated against the outputSchema registered in AgentRegistry.

Health Checks

Health Check Endpoint

Agents should expose a health check endpoint (optional but recommended):

Endpoint: GET /health (or as specified in AgentRegistry.healthCheck.endpoint)

Response:

interface HealthCheckResponse {
  status: 'healthy' | 'degraded' | 'unhealthy';
  version: string;
  uptime?: number;               // Seconds since agent started
  checks?: {
    database?: 'ok' | 'error';
    externalApis?: 'ok' | 'error';
    // ... other health checks
  };
  timestamp: Date;
}
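
One possible way to derive the overall status from the individual checks is sketched below. The aggregation rule (no errors is healthy, some errors is degraded, all errors is unhealthy) is an assumption made for illustration, and build_health_response is a hypothetical name.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict


def build_health_response(version: str, started_at: datetime,
                          checks: Dict[str, str], now: datetime) -> Dict[str, Any]:
    """Build a HealthCheckResponse-shaped dict from per-dependency checks."""
    errors = sum(1 for state in checks.values() if state == "error")
    if errors == 0:
        status = "healthy"
    elif errors < len(checks):
        status = "degraded"      # assumed rule: some dependencies failing
    else:
        status = "unhealthy"     # assumed rule: every dependency failing
    return {
        "status": status,
        "version": version,
        "uptime": int((now - started_at).total_seconds()),
        "checks": checks,
        "timestamp": now.isoformat(),
    }
```

A GET /health handler would run its checks, pass the results in, and serialize the returned dictionary as JSON.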

Heartbeat Updates

Agent instances should periodically update their lastHeartbeat timestamp in the AgentInstance table to indicate they are alive:

UPDATE "AgentInstance"
SET "lastHeartbeat" = NOW(), "updatedAt" = NOW()
WHERE id = 'instance-id';
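
A long-running instance might issue this update from a background thread. In the sketch below, start_heartbeat is a hypothetical helper and beat is a caller-supplied function that runs the UPDATE statement above; the interval is whatever the deployment chooses (the health monitoring section suggests 30 seconds as an example).

```python
import threading
from typing import Callable


def start_heartbeat(beat: Callable[[], None], interval_seconds: float,
                    stop: threading.Event) -> threading.Thread:
    """Call `beat` every `interval_seconds` until `stop` is set."""

    def loop() -> None:
        while not stop.is_set():
            try:
                beat()  # e.g. run the UPDATE "AgentInstance" statement
            except Exception as exc:
                # A failed heartbeat should not kill the loop; report and retry.
                print(f"heartbeat failed: {exc}")
            # Event.wait doubles as an interruptible sleep.
            stop.wait(interval_seconds)

    thread = threading.Thread(target=loop, daemon=True)
    thread.start()
    return thread
```

Setting the stop event during graceful shutdown ends the loop without waiting for the full interval.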

Error Handling

Error Response Format

If an agent rejects a job or encounters an error, it should:

  1. Immediate Rejection: Return status: 'rejected' in execution response with error message
  2. Runtime Error: Update AgentJobExecution with status: 'failed' and error details

Error Structure:

interface AgentError {
  message: string;               // Human-readable error message
  code?: string;                 // Error code (e.g., 'VALIDATION_ERROR', 'TIMEOUT')
  stack?: string;                // Stack trace (for debugging)
  timestamp: Date;
  retryable: boolean;            // Whether the job can be retried
}

Retry Behavior

  • Agents should set retryable: true for transient errors (network issues, rate limits)
  • Agents should set retryable: false for permanent errors (validation errors, invalid config)
  • Scheduler uses retryable flag to determine if job should be retried
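
How the retryable flag might combine with the registered retryConfig is sketched below. The function name and the exact backoff formulas are assumptions; the specification names the strategies ('exponential', 'linear', 'fixed') but does not define them, and the result is in the same unit as retryConfig.delay.

```python
from typing import Optional


def next_retry_delay(error: dict, retry_config: dict, retry_count: int) -> Optional[int]:
    """Return the delay before the next attempt, or None if no retry is due.

    `retry_count` is the number of retries already made (0 for the first failure).
    """
    if not error.get("retryable", False):
        return None                      # permanent error: do not retry
    if retry_count >= retry_config["maxRetries"]:
        return None                      # retry budget exhausted
    base = retry_config["delay"]
    backoff = retry_config["backoff"]
    if backoff == "exponential":
        return base * 2 ** retry_count   # assumed: base, 2x base, 4x base, ...
    if backoff == "linear":
        return base * (retry_count + 1)  # assumed: base, 2x base, 3x base, ...
    return base                          # 'fixed'
```

Treating a missing retryable flag as non-retryable is the conservative choice here, since retrying a permanent error only repeats the failure.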

Configuration Management

Configuration Loading

Agents load their configuration from the database:

  1. Agent Config: Read from AgentConfig table (key: agentId)
  2. Active Version: Determine active version from AgentVersionDeployment table
  3. Version Config: If version-specific config exists, merge with agent config
  4. Runtime Overrides: Apply any runtime config overrides from execution request

Configuration Hierarchy (highest precedence first; same as existing system):

  1. Runtime overrides (from execution request)
  2. Agent-specific config from AgentConfig
  3. System-wide defaults from SystemConfig
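
The precedence order can be expressed as a merge in which later sources overwrite earlier ones. This sketch assumes flat configuration dictionaries and a shallow merge; resolve_config is a hypothetical name, and nested settings would need a deep merge instead.

```python
from typing import Any, Dict, Optional


def resolve_config(system_defaults: Dict[str, Any],
                   agent_config: Dict[str, Any],
                   runtime_overrides: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Merge configuration sources, lowest precedence first."""
    merged: Dict[str, Any] = {}
    merged.update(system_defaults)          # 3. SystemConfig defaults
    merged.update(agent_config)             # 2. AgentConfig values
    merged.update(runtime_overrides or {})  # 1. execution request overrides
    return merged
```

For example, a timeout set in SystemConfig is replaced by the agent's own value, which is in turn replaced by a value in the execution request's config field.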

Configuration Hot-Reload

Agents should support configuration hot-reload:

  • Poll AgentConfig table periodically (e.g., every 60 seconds)
  • Or subscribe to configuration change events (if event system exists)
  • Re-initialize agent when config changes detected
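
The polling variant can be sketched as a watcher that fingerprints the loaded configuration and calls back only when it changes. ConfigWatcher, load, and on_change are hypothetical names; load stands in for a read of the AgentConfig table, and the caller decides how often to invoke poll.

```python
import hashlib
import json
from typing import Any, Callable, Dict


def config_fingerprint(config: Dict[str, Any]) -> str:
    """Stable hash of a config dict, independent of key order."""
    payload = json.dumps(config, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


class ConfigWatcher:
    """Detects configuration changes between successive polls."""

    def __init__(self, load: Callable[[], Dict[str, Any]],
                 on_change: Callable[[Dict[str, Any]], None]) -> None:
        self._load = load
        self._on_change = on_change
        self._fingerprint = config_fingerprint(load())

    def poll(self) -> bool:
        """Reload the config; invoke on_change and return True if it changed."""
        config = self._load()
        fingerprint = config_fingerprint(config)
        if fingerprint == self._fingerprint:
            return False
        self._fingerprint = fingerprint
        self._on_change(config)  # e.g. re-initialize the agent
        return True
```

Comparing fingerprints rather than timestamps avoids re-initializing the agent when a row is rewritten with identical values.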

Implementation Examples

Example 1: TypeScript/Node.js Agent

import express from 'express';
import { PrismaClient } from '@prisma/client';

const app = express();
// Parse JSON request bodies; without this, req.body is undefined
app.use(express.json());
const prisma = new PrismaClient();

// Register agent on startup
async function registerAgent() {
  await prisma.agentRegistry.upsert({
    where: { id: 'data-collection' },
    update: {
      endpoint: {
        type: 'http',
        url: process.env.AGENT_URL,
        method: 'POST',
        timeout: 300000,
      },
      lastHeartbeat: new Date(),
    },
    create: {
      id: 'data-collection',
      name: 'Data Collection Agent',
      description: 'Collects raw media data',
      version: '1.0.0',
      endpoint: {
        type: 'http',
        url: process.env.AGENT_URL,
        method: 'POST',
        timeout: 300000,
      },
      inputSchema: {...},
      outputSchema: {...},
      enabled: true,
    },
  });
}

// Execution endpoint
app.post('/execute', async (req, res) => {
  const { jobId, executionId, params } = req.body;
  
  // Accept job immediately
  res.json({
    jobId,
    executionId,
    status: 'accepted',
    estimatedDuration: 60000,
  });
  
  // Execute asynchronously
  executeJob(jobId, executionId, params).catch(console.error);
});

async function executeJob(jobId: string, executionId: string, params: any) {
  // Create execution record
  await prisma.agentJobExecution.create({
    data: {
      id: executionId,
      jobId,
      agentId: 'data-collection',
      agentVersion: '1.0.0',
      status: 'pending',
      params,
    },
  });
  
  try {
    // Update to running
    await prisma.agentJobExecution.update({
      where: { id: executionId },
      data: { status: 'running', startedAt: new Date() },
    });
    
    // Do actual work
    const result = await collectData(params);
    
    // Write to agent-specific table before marking the job completed,
    // so a failed write is reported as a failed execution
    await prisma.dataSource.createMany({
      data: result.items,
    });
    
    // Update to completed
    await prisma.agentJobExecution.update({
      where: { id: executionId },
      data: {
        status: 'completed',
        result,
        completedAt: new Date(),
      },
    });
  } catch (error) {
    // The caught value is not guaranteed to be an Error instance
    const message = error instanceof Error ? error.message : String(error);
    await prisma.agentJobExecution.update({
      where: { id: executionId },
      data: {
        status: 'failed',
        error: {
          message,
          timestamp: new Date(),
        },
        completedAt: new Date(),
      },
    });
  }
}
}

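app.listen(3000, () => {
  // Registration is async; log failures instead of leaving the promise unhandled
  registerAgent().catch(console.error);
  console.log('Agent running on port 3000');
});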

Example 2: Python Agent

from flask import Flask, request, jsonify
import psycopg2
import psycopg2.extras
from datetime import datetime
import json
import os
import threading

app = Flask(__name__)

def get_db_connection():
    return psycopg2.connect(
        host=os.getenv('DB_HOST'),
        database=os.getenv('DB_NAME'),
        user=os.getenv('DB_USER'),
        password=os.getenv('DB_PASSWORD')
    )

def register_agent():
    conn = get_db_connection()
    cur = conn.cursor()
    
    cur.execute("""
        INSERT INTO "AgentRegistry" (id, name, description, version, endpoint, "inputSchema", "outputSchema", enabled, "createdAt", "updatedAt")
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (id) DO UPDATE SET
            endpoint = EXCLUDED.endpoint,
            "updatedAt" = EXCLUDED."updatedAt"
    """, (
        'analysis',
        'Analysis Agent',
        'Analyzes collected data',
        '1.0.0',
        json.dumps({
            'type': 'http',
            'url': os.getenv('AGENT_URL'),
            'method': 'POST',
            'timeout': 300000
        }),
        json.dumps({...}),  # inputSchema
        json.dumps({...}),  # outputSchema
        True,
        datetime.now(),
        datetime.now()
    ))
    
    conn.commit()
    cur.close()
    conn.close()

@app.route('/execute', methods=['POST'])
def execute():
    data = request.json
    job_id = data['jobId']
    execution_id = data['executionId']
    params = data['params']
    
    # Accept immediately
    response = {
        'jobId': job_id,
        'executionId': execution_id,
        'status': 'accepted',
        'estimatedDuration': 120000
    }
    
    # Execute asynchronously
    thread = threading.Thread(target=execute_job, args=(job_id, execution_id, params))
    thread.start()
    
    return jsonify(response)

def execute_job(job_id, execution_id, params):
    conn = get_db_connection()
    cur = conn.cursor()
    
    try:
        # Create execution record and commit so the status is visible to the orchestrator
        cur.execute("""
            INSERT INTO "AgentJobExecution" (id, "jobId", "agentId", "agentVersion", status, params, "createdAt", "updatedAt")
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
        """, (execution_id, job_id, 'analysis', '1.0.0', 'pending', json.dumps(params), datetime.now(), datetime.now()))
        conn.commit()
        
        # Update to running
        cur.execute("""
            UPDATE "AgentJobExecution"
            SET status = 'running', "startedAt" = %s, "updatedAt" = %s
            WHERE id = %s
        """, (datetime.now(), datetime.now(), execution_id))
        conn.commit()
        
        # Do actual work
        result = perform_analysis(params)
        
        # Update to completed
        cur.execute("""
            UPDATE "AgentJobExecution"
            SET status = 'completed', result = %s, "completedAt" = %s, "updatedAt" = %s
            WHERE id = %s
        """, (json.dumps(result), datetime.now(), datetime.now(), execution_id))
        
        conn.commit()
    except Exception as e:
        # Discard any aborted transaction before recording the failure
        conn.rollback()
        cur.execute("""
            UPDATE "AgentJobExecution"
            SET status = 'failed', error = %s, "completedAt" = %s, "updatedAt" = %s
            WHERE id = %s
        """, (json.dumps({'message': str(e), 'timestamp': datetime.now().isoformat()}), datetime.now(), datetime.now(), execution_id))
        conn.commit()
    finally:
        cur.close()
        conn.close()

if __name__ == '__main__':
    register_agent()
    app.run(host='0.0.0.0', port=5000)

Example 3: n8n Workflow Agent

For n8n workflows, the agent is implemented as a webhook workflow:

  1. Create n8n Workflow with webhook trigger
  2. Register Webhook URL in AgentRegistry.endpoint.url
  3. Workflow Steps:
    • Receive execution request
    • Create execution record in database (via HTTP Request node)
    • Perform agent work
    • Update execution status in database
    • Write results to database

n8n Workflow Structure:

  • Webhook Trigger: Receives execution request
  • HTTP Request Node: Create execution record in database
  • HTTP Request Node: Update execution status to 'running'
  • Function Node: Perform agent logic (or call external service)
  • HTTP Request Node: Write results to database
  • HTTP Request Node: Update execution status to 'completed'

Testing & Validation

Agent Validation Checklist

Before deploying an agent, ensure:

  • Agent type is registered in AgentRegistry (done by admin or deployment)
  • Agent instance is spawned by orchestrator (not manually started)
  • Agent instance updates status to 'active' when ready (instance-specific info in AgentInstance table)
  • Agent reads active version from AgentVersionDeployment table
  • Agent exposes /execute endpoint (or configured path)
  • Agent returns immediate acceptance response (< 2 seconds)
  • Agent creates execution record in AgentJobExecution table
  • Agent updates instance currentLoad when job starts/completes
  • Agent updates execution status during execution
  • Agent writes results to appropriate database tables
  • Agent output conforms to registered outputSchema and includes agentVersion
  • Agent handles errors gracefully and updates status
  • Agent supports configuration hot-reload
  • Agent exposes health check endpoint (optional but recommended)
  • Agent updates heartbeat in AgentInstance table periodically
  • Agent instance gracefully shuts down when terminated by orchestrator

Testing Tools

The system should provide:

  • Agent Test Harness: Test agent execution locally
  • Schema Validator: Validate agent input/output schemas
  • Mock Scheduler: Test agent registration and execution
  • Database Fixtures: Test data for agent testing

Migration from Current Architecture

For existing agents in apps/worker/agents/, migration involves:

  1. Extract Agent Logic: Move agent logic to standalone service
  2. Add HTTP Endpoint: Expose /execute endpoint
  3. Add Registration: Implement self-registration on startup
  4. Add Status Updates: Update AgentJobExecution table during execution
  5. Deploy Separately: Deploy agent as independent service
  6. Update Orchestrator: Orchestrator calls agent via HTTP instead of BullMQ

The database schema and configuration system remain the same - only the execution mechanism changes.