MediaPulse

Agent Registry API

Purpose

The Agent Registry API is a separate service (apps/agent-registry-api/) that provides endpoints for agents to register themselves, report their status, and manage their lifecycle. It serves as the central registry for all agent instances in the system, enabling the orchestrator to discover available agents, monitor their health, and distribute jobs effectively.

Key Responsibilities:

  • Provide HTTP endpoints for agent registration and deregistration
  • Accept and process agent heartbeat/status updates
  • Track agent instance capacity and current load in real-time
  • Maintain the AgentInstance table in the database
  • Enable the orchestrator to discover available agent instances
  • Support health monitoring and automatic failover
  • Track agent type metadata in the AgentRegistry table

Authentication

The Agent Registry API authenticates requests using the API keys stored in the AgentAuth table. Each request passes its API key in the Authorization header as a Bearer token.
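
As an illustration of the header format, the sketch below builds the Authorization header on the agent side and extracts the key on the API side. The helper names buildAuthHeaders and parseBearerToken are hypothetical, and the lookup of the key in the AgentAuth table is left out.

```typescript
// Hypothetical client-side helper: headers for a request to the registry.
function buildAuthHeaders(apiKey: string): Record<string, string> {
  if (apiKey.trim() === "") {
    throw new Error("API key must not be empty");
  }
  return {
    Authorization: `Bearer ${apiKey}`,
    "Content-Type": "application/json",
  };
}

// Hypothetical server-side helper: returns the key from an Authorization
// header, or null when the header is missing or is not a Bearer token.
// Checking the key against the AgentAuth table is not shown.
function parseBearerToken(header: string | undefined): string | null {
  if (header === undefined) {
    return null;
  }
  const match = /^Bearer\s+(\S+)$/i.exec(header.trim());
  return match?.[1] ?? null;
}
```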

Architecture

The Agent Registry API is an HTTP server (built with Next.js or another framework) that exposes REST endpoints. It maintains two key database tables:

  • AgentRegistry: Stores agent type metadata (not version-specific)
  • AgentInstance: Tracks all running agent instances and their status

The orchestrator queries these tables (either directly or via the API) to discover agents and distribute jobs. Agents use the API endpoints to register themselves and report status.

Database Schema

AgentRegistry

Stores metadata for each agent type. Records are neither version-specific nor instance-specific; individual instances are tracked in the AgentInstance table.

{
  id: string; // Agent ID (e.g., 'query-strategy', 'data-collection')
  name: string; // Human-readable name
  description: string;
  version: string; // Current/latest version (informational only)
  endpoint: {
    type: 'http' | 'webhook' | 'n8n' | 'cloud-function';
    url: string; // Full URL to agent endpoint (base URL, not instance-specific)
    method: 'POST' | 'PUT';
    authentication?: {
      type: 'bearer' | 'api-key' | 'oauth';
      token?: string;
    };
    timeout: number; // Request timeout in milliseconds
    retryConfig?: {
      maxRetries: number;
      backoff: 'exponential' | 'linear' | 'fixed';
      delay: number;
    };
  };
  inputSchema: object; // JSON Schema or Zod schema (JSON representation)
  outputSchema: object; // JSON Schema or Zod schema (JSON representation)
  parameterTypes: {
    // Defines what parameters this agent accepts and how they can be expanded
    ticker?: {
      required: boolean;
      expandable: boolean; // If true, scheduler can expand this to multiple jobs
      scope: 'all' | 'active' | 'custom'; // Default scope for expansion
    };
    userId?: {
      required: boolean;
      expandable: boolean;
    };
    // ... other parameter types
  };
  enabled: boolean;
  healthCheck?: {
    endpoint: string;
    interval: number;
  };
  metadata?: {
    language?: string; // Implementation language
    runtime?: string; // Runtime environment
    deployment?: string; // Deployment type
  };
  lastHeartbeat?: Date; // Last health check timestamp (for agent type health)
  createdAt: Date;
  updatedAt: Date;
}

AgentInstance

Tracks all running agent instances. Multiple instances of the same agent version can exist for horizontal scaling.

{
  id: string; // Unique instance ID (e.g., 'data-collection-instance-1')
  agentId: string; // Agent type (e.g., 'data-collection')
  agentVersion: string; // Version this instance runs (e.g., '1.2.3')
  endpoint: {
    url: string; // Instance-specific endpoint URL
    method: 'POST';
    timeout: number;
  };
  status: 'active' | 'inactive' | 'unhealthy' | 'spawning';
  capacity: number; // Max concurrent jobs this instance can handle
  currentLoad: number; // Current number of running jobs
  lastHeartbeat: Date; // Last health check timestamp
  metadata?: {
    region?: string;
    zone?: string;
    deployment?: string; // 'container', 'lambda', etc.
  };
  createdAt: Date;
  updatedAt: Date;
}

API Endpoints

POST /api/register

Registers a new agent instance or updates an existing registration.

Request Body:

{
  instanceId: string; // Unique instance ID (generated by orchestrator or agent)
  agentId: string; // Agent type (e.g., 'data-collection')
  agentVersion: string; // Version this instance runs (e.g., '1.2.3')
  endpoint: {
    url: string; // Instance-specific endpoint URL
    method: 'POST';
    timeout: number;
  };
  capacity: number; // Max concurrent jobs this instance can handle
  metadata?: {
    region?: string;
    zone?: string;
    deployment?: string;
  };
}

Response:

{
  success: boolean;
  instanceId: string;
  status: 'registered' | 'updated';
  message?: string;
}

Behavior:

  • If instanceId exists: Updates the existing instance record
  • If instanceId doesn't exist: Creates a new instance record
  • Sets status: 'active' and updates lastHeartbeat
  • Initializes currentLoad: 0
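
The create-or-update behavior above can be sketched with an in-memory Map standing in for the AgentInstance table. This is a minimal sketch under assumptions: the Map, the register function name, and the choice to reset currentLoad to 0 on re-registration as well as on first registration are not specified by this document.

```typescript
interface RegisterRequest {
  instanceId: string;
  agentId: string;
  agentVersion: string;
  endpoint: { url: string; method: "POST"; timeout: number };
  capacity: number;
}

interface InstanceRecord {
  id: string;
  agentId: string;
  agentVersion: string;
  endpoint: { url: string; method: "POST"; timeout: number };
  capacity: number;
  status: "active" | "inactive" | "unhealthy" | "spawning";
  currentLoad: number;
  lastHeartbeat: Date;
}

interface RegisterResponse {
  success: boolean;
  instanceId: string;
  status: "registered" | "updated";
}

// Assumption: an in-memory stand-in for the AgentInstance table.
const instanceTable = new Map<string, InstanceRecord>();

function register(req: RegisterRequest, now: Date = new Date()): RegisterResponse {
  const existed = instanceTable.has(req.instanceId);
  const { instanceId, ...rest } = req;
  // Create or overwrite the record, marking it active with a fresh heartbeat.
  instanceTable.set(instanceId, {
    ...rest,
    id: instanceId,
    status: "active",
    currentLoad: 0,
    lastHeartbeat: now,
  });
  return { success: true, instanceId, status: existed ? "updated" : "registered" };
}
```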

POST /api/heartbeat

Updates instance status, load, and heartbeat timestamp.

Request Body:

{
  instanceId: string;
  status?: 'active' | 'inactive' | 'unhealthy'; // Optional status update
  currentLoad?: number; // Current number of running jobs
  capacity?: number; // Updated capacity (if changed)
}

Response:

{
  success: boolean;
  lastHeartbeat: Date;
}

Behavior:

  • Updates lastHeartbeat timestamp
  • Updates currentLoad if provided
  • Updates status if provided
  • Updates capacity if provided
  • Used by agents to report their current state periodically
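
A minimal sketch of how the optional heartbeat fields might be merged into an existing record. The applyHeartbeat function and the InstanceState shape are illustrative assumptions. The nullish-coalescing operator is used so that a reported currentLoad of 0 is applied rather than treated as "not provided".

```typescript
type ReportedStatus = "active" | "inactive" | "unhealthy";

interface HeartbeatRequest {
  instanceId: string;
  status?: ReportedStatus;
  currentLoad?: number;
  capacity?: number;
}

interface InstanceState {
  status: ReportedStatus | "spawning";
  capacity: number;
  currentLoad: number;
  lastHeartbeat: Date;
}

// Returns the new state: provided fields replace the stored ones,
// omitted fields are kept, and lastHeartbeat is always refreshed.
function applyHeartbeat(
  current: InstanceState,
  req: HeartbeatRequest,
  now: Date,
): InstanceState {
  return {
    status: req.status ?? current.status,
    capacity: req.capacity ?? current.capacity,
    currentLoad: req.currentLoad ?? current.currentLoad,
    lastHeartbeat: now,
  };
}
```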

POST /api/deregister

Deregisters an agent instance (called when instance shuts down).

Request Body:

{
  instanceId: string;
}

Response:

{
  success: boolean;
  message?: string;
}

Behavior:

  • Sets instance status: 'inactive'
  • Optionally removes the instance record immediately, or leaves it in place to be removed by a later cleanup
  • Called when agent instance is shutting down gracefully
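
The two deregistration options (mark inactive, or remove the record) could look like the following sketch. The removeRecord flag, the Map used as a table stand-in, and the success: false result for an unknown instanceId are assumptions made for illustration.

```typescript
interface TrackedInstance {
  id: string;
  status: "active" | "inactive" | "unhealthy" | "spawning";
}

interface DeregisterResponse {
  success: boolean;
  message?: string;
}

function deregister(
  table: Map<string, TrackedInstance>,
  instanceId: string,
  removeRecord: boolean = false,
): DeregisterResponse {
  const instance = table.get(instanceId);
  if (instance === undefined) {
    // Assumption: an unknown instance is reported as a failure.
    return { success: false, message: `Unknown instance: ${instanceId}` };
  }
  if (removeRecord) {
    table.delete(instanceId);
    return { success: true, message: "Instance removed" };
  }
  // Default: keep the record but stop treating it as available.
  table.set(instanceId, { ...instance, status: "inactive" });
  return { success: true, message: "Instance marked inactive" };
}
```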

GET /api/instances

Queries available agent instances (used by orchestrator).

Query Parameters:

  • agentId?: string - Filter by agent type
  • agentVersion?: string - Filter by version
  • status?: string - Filter by status (default: 'active')
  • minCapacity?: number - Only return instances with at least this much available capacity (capacity - currentLoad)

Response:

{
  instances: Array<{
    id: string;
    agentId: string;
    agentVersion: string;
    endpoint: {
      url: string;
      method: 'POST';
      timeout: number;
    };
    status: 'active' | 'inactive' | 'unhealthy' | 'spawning';
    capacity: number;
    currentLoad: number;
    availableCapacity: number; // capacity - currentLoad
    lastHeartbeat: Date;
    metadata?: object;
  }>;
  total: number;
}
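
A sketch of how the query parameters might be applied to a list of instance records. It assumes minCapacity is a threshold on availableCapacity and that an omitted status defaults to 'active', as listed above; the queryInstances function and the trimmed-down record shape are hypothetical.

```typescript
interface QueryableInstance {
  id: string;
  agentId: string;
  agentVersion: string;
  status: "active" | "inactive" | "unhealthy" | "spawning";
  capacity: number;
  currentLoad: number;
}

interface InstanceQuery {
  agentId?: string;
  agentVersion?: string;
  status?: string;
  minCapacity?: number;
}

function queryInstances(all: QueryableInstance[], query: InstanceQuery = {}) {
  const status = query.status ?? "active";
  const minCapacity = query.minCapacity;
  const instances = all
    .filter((i) => query.agentId === undefined || i.agentId === query.agentId)
    .filter((i) => query.agentVersion === undefined || i.agentVersion === query.agentVersion)
    .filter((i) => i.status === status)
    // Derived field returned alongside the stored ones.
    .map((i) => ({ ...i, availableCapacity: i.capacity - i.currentLoad }))
    .filter((i) => minCapacity === undefined || i.availableCapacity >= minCapacity);
  return { instances, total: instances.length };
}
```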

GET /api/registry

Queries agent type registry (used by orchestrator).

Query Parameters:

  • agentId?: string - Filter by agent ID
  • enabled?: boolean - Filter by enabled status

Response:

{
  agents: Array<{
    id: string;
    name: string;
    description: string;
    version: string;
    endpoint: object;
    inputSchema: object;
    outputSchema: object;
    parameterTypes: object;
    enabled: boolean;
    metadata?: object;
  }>;
  total: number;
}

POST /api/registry/register

Registers or updates agent type metadata (called by agents during initialization).

Request Body:

{
  agentId: string;
  name: string;
  description: string;
  version: string;
  endpoint: {
    type: 'http' | 'webhook' | 'n8n' | 'cloud-function';
    url: string;
    method: 'POST' | 'PUT';
    authentication?: object;
    timeout: number;
    retryConfig?: object;
  };
  inputSchema: object;
  outputSchema: object;
  parameterTypes: object;
  enabled: boolean;
  healthCheck?: {
    endpoint: string;
    interval: number;
  };
  metadata?: object;
}

Response:

{
  success: boolean;
  agentId: string;
  status: 'registered' | 'updated';
  message?: string;
}

Agent Registration Flow

  1. Agent Type Registration (on agent startup):

    • Agent calls POST /api/registry/register to register agent type metadata
    • Creates or updates AgentRegistry record
    • This is typically done once per agent type deployment
  2. Instance Registration (when instance starts):

    • Orchestrator spawns instance or instance starts independently
    • Instance calls POST /api/register with its instance details
    • Creates AgentInstance record with status: 'active'
    • Instance begins periodic heartbeat updates
  3. Heartbeat Updates (periodically):

    • Instance calls POST /api/heartbeat every 30-60 seconds
    • Updates currentLoad, status, and lastHeartbeat
    • Keeps instance marked as available for job distribution
  4. Deregistration (when instance shuts down):

    • Instance calls POST /api/deregister before shutting down
    • Marks instance as status: 'inactive'
    • Orchestrator stops routing jobs to this instance
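
Steps 2 to 4 of the flow can be sketched as the ordered list of requests a single instance would issue over its lifetime. The sketch deliberately leaves out the HTTP transport and the step 1 type registration; the InstanceInfo shape and the function names are hypothetical, while the paths and body fields follow the endpoint descriptions above.

```typescript
interface ApiCall {
  method: "POST";
  path: string;
  body: Record<string, unknown>;
}

interface InstanceInfo {
  instanceId: string;
  agentId: string;
  agentVersion: string;
  url: string;
  timeoutMs: number;
  capacity: number;
}

function registerCall(info: InstanceInfo): ApiCall {
  return {
    method: "POST",
    path: "/api/register",
    body: {
      instanceId: info.instanceId,
      agentId: info.agentId,
      agentVersion: info.agentVersion,
      endpoint: { url: info.url, method: "POST", timeout: info.timeoutMs },
      capacity: info.capacity,
    },
  };
}

function heartbeatCall(info: InstanceInfo, currentLoad: number): ApiCall {
  return {
    method: "POST",
    path: "/api/heartbeat",
    body: { instanceId: info.instanceId, status: "active", currentLoad },
  };
}

function deregisterCall(info: InstanceInfo): ApiCall {
  return { method: "POST", path: "/api/deregister", body: { instanceId: info.instanceId } };
}

// One register, one heartbeat per load sample, then one deregister.
function lifecycleCalls(info: InstanceInfo, loadSamples: number[]): ApiCall[] {
  return [
    registerCall(info),
    ...loadSamples.map((load) => heartbeatCall(info, load)),
    deregisterCall(info),
  ];
}
```

In a running agent the heartbeat calls would be sent on a timer (every 30-60 seconds per the flow above) rather than produced up front.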

Orchestrator Integration

The orchestrator uses the Agent Registry API (or queries the database directly) to discover agents, monitor health, and distribute jobs. For detailed information on how the orchestrator uses this API, see the Scheduler/Orchestrator Documentation.

Key Integration Points:

  • Agent Discovery: Orchestrator queries AgentRegistry and AgentInstance tables to discover available agents and instances
  • Health Monitoring: Orchestrator monitors instance heartbeats and automatically handles failed instances
  • Load Balancing: Orchestrator queries instances with available capacity and distributes jobs accordingly
  • Auto-Scaling: Orchestrator monitors capacity and spawns/terminates instances as needed
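
For the load-balancing point, one possible selection policy is to route each job to the active instance with the most free slots. This document does not specify the orchestrator's actual policy, so the pickInstance function below is only an illustrative assumption.

```typescript
interface Candidate {
  id: string;
  status: string;
  capacity: number;
  currentLoad: number;
}

// Returns the active instance with the largest available capacity,
// or null when no active instance has a free slot.
function pickInstance(candidates: Candidate[]): Candidate | null {
  let best: Candidate | null = null;
  for (const candidate of candidates) {
    if (candidate.status !== "active") {
      continue;
    }
    const free = candidate.capacity - candidate.currentLoad;
    if (free <= 0) {
      continue;
    }
    if (best === null || free > best.capacity - best.currentLoad) {
      best = candidate;
    }
  }
  return best;
}
```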

See the Scheduler/Orchestrator Documentation for complete details on orchestrator behavior.

Health Monitoring

The Agent Registry API provides health monitoring capabilities through heartbeat tracking:

  • Heartbeat Updates: Instances report their status via POST /api/heartbeat endpoint
  • Status Tracking: Instances can report 'unhealthy' status if they detect internal issues
  • Heartbeat Timeout: Instances whose last heartbeat is older than the configured heartbeatTimeout are considered unavailable

The orchestrator monitors instance health by querying the AgentInstance table and automatically handles failed instances. See the Scheduler/Orchestrator Documentation for details on how the orchestrator uses this information.
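
The heartbeat timeout check amounts to comparing the age of lastHeartbeat with heartbeatTimeout. A minimal sketch, with hypothetical function names:

```typescript
// True when the last heartbeat is older than the timeout (in milliseconds).
function isStale(lastHeartbeat: Date, now: Date, heartbeatTimeoutMs: number): boolean {
  return now.getTime() - lastHeartbeat.getTime() > heartbeatTimeoutMs;
}

// Returns the instances that should be considered unavailable.
function findStaleInstances<T extends { id: string; lastHeartbeat: Date }>(
  instances: T[],
  now: Date,
  heartbeatTimeoutMs: number,
): T[] {
  return instances.filter((i) => isStale(i.lastHeartbeat, now, heartbeatTimeoutMs));
}
```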

Error Handling

  • Registration Failures: If registration fails, the agent instance should retry with exponential backoff
  • Heartbeat Failures: If a heartbeat fails, the instance should keep retrying; the orchestrator marks the instance as unhealthy once its heartbeat becomes stale
  • Network Issues: The API should handle network failures gracefully and return appropriate error codes
  • Validation Errors: The API validates all input data and returns clear error messages
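
Exponential backoff for registration retries can be sketched as a delay that doubles per attempt up to a cap. The base delay, the cap, and the function names are assumptions; this document does not prescribe specific values.

```typescript
// Delay before retry number `attempt` (0-based): base * 2^attempt, capped at maxMs.
function backoffDelayMs(attempt: number, baseMs: number = 1000, maxMs: number = 30000): number {
  return Math.min(maxMs, baseMs * Math.pow(2, attempt));
}

// Full list of delays for a given number of retries.
function retrySchedule(maxRetries: number, baseMs: number, maxMs: number): number[] {
  return Array.from({ length: maxRetries }, (_unused, attempt) =>
    backoffDelayMs(attempt, baseMs, maxMs),
  );
}
```

For example, retrySchedule(5, 1000, 10000) yields delays of 1000, 2000, 4000, 8000, and 10000 milliseconds. Adding random jitter to each delay is a common refinement when many instances may retry at once.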

Security

  • Authentication: API endpoints should require authentication, using the API key Bearer token scheme described under Authentication
  • Authorization: Only authorized agents and the orchestrator should be able to register or query instances
  • Rate Limiting: Implement rate limiting to prevent abuse
  • Input Validation: Validate all input data to prevent injection attacks
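
As a sketch of input validation for POST /api/register, the function below checks an untrusted body and returns a list of error messages (empty when the body is acceptable). The specific rules, such as requiring capacity to be a positive integer and the URL to be http(s), are assumptions rather than documented constraints.

```typescript
function validateRegisterRequest(body: unknown): string[] {
  if (typeof body !== "object" || body === null) {
    return ["body must be a JSON object"];
  }
  const fields = body as Record<string, unknown>;
  const errors: string[] = [];

  for (const name of ["instanceId", "agentId", "agentVersion"]) {
    const value = fields[name];
    if (typeof value !== "string" || value.length === 0) {
      errors.push(`${name} must be a non-empty string`);
    }
  }

  const capacity = fields["capacity"];
  if (typeof capacity !== "number" || !Number.isInteger(capacity) || capacity < 1) {
    errors.push("capacity must be a positive integer");
  }

  const endpoint = fields["endpoint"];
  if (typeof endpoint !== "object" || endpoint === null) {
    errors.push("endpoint must be an object");
  } else {
    const ep = endpoint as Record<string, unknown>;
    const url = ep["url"];
    if (typeof url !== "string" || !/^https?:\/\//.test(url)) {
      errors.push("endpoint.url must be an http(s) URL");
    }
    const timeout = ep["timeout"];
    if (typeof timeout !== "number" || timeout <= 0) {
      errors.push("endpoint.timeout must be a positive number");
    }
  }
  return errors;
}
```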

Configuration

The Agent Registry API configuration is stored in the database (SystemConfig or AgentConfig table):

{
  agentRegistry: {
    heartbeatTimeout: 120000, // Consider instance dead if no heartbeat for 2 minutes
    cleanupInterval: 300000, // Clean up stale instances every 5 minutes
    maxInstancesPerAgent: 100, // Maximum instances per agent type
    healthCheckInterval: 30000, // How often to check instance health
  }
}
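
A sketch of how these settings might be typed and applied. Treating the values shown above as fallbacks for missing stored values is an assumption, as are the resolveConfig and canAcceptInstance helpers; reading from the SystemConfig or AgentConfig table is not shown.

```typescript
interface AgentRegistryConfig {
  heartbeatTimeout: number; // milliseconds
  cleanupInterval: number; // milliseconds
  maxInstancesPerAgent: number;
  healthCheckInterval: number; // milliseconds
}

// Assumption: the documented values double as defaults.
const DEFAULT_CONFIG: AgentRegistryConfig = {
  heartbeatTimeout: 120000,
  cleanupInterval: 300000,
  maxInstancesPerAgent: 100,
  healthCheckInterval: 30000,
};

// Fills in any setting missing from the stored configuration.
function resolveConfig(stored: Partial<AgentRegistryConfig>): AgentRegistryConfig {
  return {
    heartbeatTimeout: stored.heartbeatTimeout ?? DEFAULT_CONFIG.heartbeatTimeout,
    cleanupInterval: stored.cleanupInterval ?? DEFAULT_CONFIG.cleanupInterval,
    maxInstancesPerAgent: stored.maxInstancesPerAgent ?? DEFAULT_CONFIG.maxInstancesPerAgent,
    healthCheckInterval: stored.healthCheckInterval ?? DEFAULT_CONFIG.healthCheckInterval,
  };
}

// True when one more instance of agentId fits under maxInstancesPerAgent.
// registeredAgentIds holds the agentId of every currently registered instance.
function canAcceptInstance(
  agentId: string,
  registeredAgentIds: string[],
  config: AgentRegistryConfig,
): boolean {
  const count = registeredAgentIds.filter((id) => id === agentId).length;
  return count < config.maxInstancesPerAgent;
}
```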