Agent Specification
Overview
This document defines the Agent Specification - a contract that all agents must follow to integrate with the MediaPulse system. Agents can be implemented in any language (JavaScript, TypeScript, Rust, Python, Go, etc.) or even as n8n workflows, as long as they adhere to this specification.
The specification ensures:
- Language Independence: Agents can be written in any programming language
- Deployment Flexibility: Agents can run anywhere (cloud functions, containers, edge functions, n8n, etc.)
- Standardized Communication: All agents communicate via HTTP and the database
- Self-Registration: Agents register themselves in the database
- Status Reporting: Agents report their execution status and results
Core Principles
- Database as Central Hub: All agents read from and write to the shared database
- HTTP-Based Invocation: Agents expose HTTP endpoints for execution
- Self-Registration: Agents register themselves in the AgentRegistry table (agent type metadata) and the AgentInstance table (instance-specific information)
- Asynchronous Execution: Agents execute asynchronously and report status via the database
- Specification Compliance: Agents must implement the required endpoints and data structures
- Versioning: Only one version per agent type can be active in production at a time
- Multi-Instance Support: Multiple instances of the same agent version can run in parallel for horizontal scaling
Agent Registration
Registration Models
Agents do not need to be long-running. The AgentRegistry table must contain the agent's endpoint URL and metadata, but how it gets there depends on the deployment model:
1. Self-Registration (Long-Running Services)
For: Containers, traditional servers, Kubernetes deployments
- Agent registers itself in AgentRegistry on startup
- Updates lastHeartbeat periodically to indicate it's alive
- Handles multiple execution requests over time
- Best for: Production deployments, high-throughput agents
Example: Container-based agent that runs continuously
2. Deployment-Time Registration (Serverless Functions)
For: AWS Lambda, Google Cloud Functions, Azure Functions, Vercel Serverless
- Registration happens during deployment (via CI/CD pipeline or deployment script)
- Agent function doesn't need to register itself on each invocation
- Registration script runs once per deployment
- Best for: Cost-efficient, auto-scaling agents
Example: Lambda function registered via Terraform/CloudFormation during infrastructure deployment
3. Manual Registration (External Agents)
For: n8n workflows, third-party services, one-off integrations
- Admin registers agent via admin interface
- Agent metadata and endpoint URL entered manually
- No code changes needed
- Best for: Non-code agents, external integrations, rapid prototyping
Example: n8n workflow webhook URL registered by admin
4. Hybrid Registration (Serverless with Cold-Start Registration)
For: Serverless functions that want to update their status
- Registration happens during cold start (first invocation)
- Agent function checks/updates registry on first call
- Subsequent invocations skip registration
- Best for: Serverless functions that want to report their availability
Example: Lambda function that registers itself on cold start
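A minimal sketch of the cold-start guard, assuming a Node.js Lambda handler with a function URL and the same Prisma models used in the implementation examples later in this document; the helper name and environment variables are illustrative:

```typescript
import { PrismaClient } from '@prisma/client';

const prisma = new PrismaClient();
let registered = false; // module scope survives warm invocations

// Illustrative helper: upsert the agent type record once per cold start
async function registerIfColdStart() {
  if (registered) return; // warm invocation: skip registration
  await prisma.agentRegistry.upsert({
    where: { id: 'data-collection' },
    update: { lastHeartbeat: new Date(), updatedAt: new Date() },
    create: {
      id: 'data-collection',
      name: 'Data Collection Agent',
      description: 'Collects raw media data from search sources',
      version: process.env.VERSION ?? '1.0.0',
      endpoint: {
        type: 'cloud-function',
        url: process.env.LAMBDA_FUNCTION_URL!, // assumed to be set at deployment
        method: 'POST',
        timeout: 300000,
      },
      inputSchema: {},  // real JSON Schema here
      outputSchema: {}, // real JSON Schema here
      enabled: true,
    },
  });
  registered = true;
}

// Lambda handler: register on first call, then acknowledge the execution request
export async function handler(event: { body: string }) {
  await registerIfColdStart();
  const request = JSON.parse(event.body);
  // ... accept the job and start asynchronous work ...
  return {
    statusCode: 200,
    body: JSON.stringify({
      jobId: request.jobId,
      executionId: request.executionId,
      status: 'accepted',
    }),
  };
}
```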
Registration Requirements
Important: AgentRegistry stores agent type metadata, not version-specific information. The active version is determined by the AgentVersionDeployment table.
Regardless of registration method, the AgentRegistry table must contain:
interface AgentRegistry {
id: string; // Unique agent ID (e.g., 'query-strategy', 'data-collection')
name: string; // Human-readable name
description: string; // What the agent does
version: string; // Current/latest version (informational only - active version determined by AgentVersionDeployment)
// Agent endpoint configuration
endpoint: {
type: 'http' | 'webhook' | 'n8n' | 'cloud-function';
url: string; // Full URL to agent endpoint
method: 'POST' | 'PUT'; // HTTP method
authentication?: {
type: 'bearer' | 'api-key' | 'oauth';
token?: string; // Token for scheduler to authenticate
};
timeout: number; // Request timeout in milliseconds
retryConfig?: {
maxRetries: number;
backoff: 'exponential' | 'linear' | 'fixed';
delay: number;
};
};
// Agent capabilities
inputSchema: object; // JSON Schema or Zod schema (JSON representation)
outputSchema: object; // JSON Schema or Zod schema (JSON representation)
parameterTypes: {
ticker?: {
required: boolean;
expandable: boolean; // Can scheduler expand this parameter?
scope: 'all' | 'active' | 'custom';
};
userId?: {
required: boolean;
expandable: boolean;
};
// ... other parameter types
};
// Agent metadata
enabled: boolean;
healthCheck?: {
endpoint: string; // Health check endpoint (optional for serverless)
interval: number; // Health check interval (seconds)
};
metadata?: {
language?: string; // Implementation language (e.g., 'typescript', 'python', 'rust')
runtime?: string; // Runtime environment (e.g., 'node', 'python3.11', 'wasmtime')
deployment?: string; // Deployment type (e.g., 'container', 'lambda', 'n8n')
repository?: string; // Source code repository URL
maintainer?: string; // Team or person responsible
};
createdAt: Date;
updatedAt: Date;
lastHeartbeat?: Date; // Last health check timestamp (optional for serverless)
}
Registration Methods
Agents can register via:
- Direct Database Access: Agent or deployment script connects to database and inserts/updates record
- Registration API: Agent calls a registration API endpoint (if provided by the system)
- Admin Interface: Manual registration by admin via web interface
- Infrastructure as Code: Registration via Terraform, CloudFormation, or similar tools
Example: Self-Registration (Long-Running Service)
import psycopg2
import json
from datetime import datetime
import os
def register_agent():
conn = psycopg2.connect(os.getenv('DATABASE_URL'))
cur = conn.cursor()
agent_data = {
'id': 'data-collection',
'name': 'Data Collection Agent',
'description': 'Collects raw media data from search sources',
'version': '1.0.0',
'endpoint': {
'type': 'http',
'url': 'https://my-agent.example.com/execute',
'method': 'POST',
'authentication': {
'type': 'bearer',
'token': os.getenv('AGENT_API_TOKEN')
},
'timeout': 300000
},
'inputSchema': {...}, # JSON Schema
'outputSchema': {...}, # JSON Schema
'enabled': True
}
cur.execute("""
INSERT INTO "AgentRegistry" (id, name, description, version, endpoint, "inputSchema", "outputSchema", enabled, "createdAt", "updatedAt")
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (id) DO UPDATE SET
name = EXCLUDED.name,
endpoint = EXCLUDED.endpoint,
"updatedAt" = EXCLUDED."updatedAt"
""", (
agent_data['id'],
agent_data['name'],
agent_data['description'],
agent_data['version'],
json.dumps(agent_data['endpoint']),
json.dumps(agent_data['inputSchema']),
json.dumps(agent_data['outputSchema']),
agent_data['enabled'],
datetime.now(),
datetime.now()
))
conn.commit()
cur.close()
conn.close()
# Call on startup for long-running services
if __name__ == '__main__':
register_agent()
# Start HTTP server...
Example: Deployment-Time Registration (Serverless)
# registration_script.py (runs during CI/CD deployment)
import psycopg2
import json
import os
def register_lambda_agent():
"""Run this script during Lambda deployment"""
conn = psycopg2.connect(os.getenv('DATABASE_URL'))
cur = conn.cursor()
lambda_url = os.getenv('LAMBDA_FUNCTION_URL') # Set by deployment
agent_data = {
'id': 'data-collection',
'name': 'Data Collection Agent',
'description': 'Collects raw media data from search sources',
'version': os.getenv('VERSION', '1.0.0'),
'endpoint': {
'type': 'cloud-function',
'url': lambda_url,
'method': 'POST',
'timeout': 300000
},
'inputSchema': {...},
'outputSchema': {...},
'enabled': True,
'metadata': {
'deployment': 'lambda',
'runtime': 'python3.11'
}
}
# Insert or update registration
cur.execute("""
INSERT INTO "AgentRegistry" (id, name, description, version, endpoint, "inputSchema", "outputSchema", enabled, metadata, "createdAt", "updatedAt")
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
ON CONFLICT (id) DO UPDATE SET
endpoint = EXCLUDED.endpoint,
version = EXCLUDED.version,
"updatedAt" = NOW()
""", (
agent_data['id'],
agent_data['name'],
agent_data['description'],
agent_data['version'],
json.dumps(agent_data['endpoint']),
json.dumps(agent_data['inputSchema']),
json.dumps(agent_data['outputSchema']),
agent_data['enabled'],
json.dumps(agent_data['metadata']),
))
conn.commit()
cur.close()
conn.close()
# Lambda function itself doesn't need to register
Example: Manual Registration (n8n Workflow)
- Admin navigates to /admin/agents/register
- Enters agent details:
  - ID: data-collection-n8n
  - Name: Data Collection (n8n)
  - Endpoint URL: https://n8n.example.com/webhook/data-collection
  - Input/Output schemas (JSON)
- System creates record in AgentRegistry
- Orchestrator discovers and invokes agent
Note: For serverless and event-driven agents, lastHeartbeat is optional since the agent may not be continuously running. The orchestrator will attempt to invoke the agent endpoint and handle failures accordingly.
Agent Versioning
Version Management
- Multiple versions exist per agent type (stored in the AgentVersion table)
- Only one version can be active in production per agent type (determined by the AgentVersionDeployment table)
- When a new version is promoted to production, it automatically replaces the previous production version
- Agents must read their active version from AgentVersionDeployment during initialization
- All agent outputs must include an agentVersion field for traceability
Determining Active Version
Agents must query the AgentVersionDeployment table on startup to determine which version they should run:
// Pseudo-code for agent initialization
const activeVersion = await db.agentVersionDeployment.findFirst({
where: {
agentId: 'data-collection',
environment: 'production'
}
});
if (!activeVersion) {
throw new Error('No production version found for agent');
}
const agentVersion = activeVersion.version; // e.g., "1.2.3"
Version in Outputs
All agent outputs must include the agentVersion field:
interface AgentOutput {
agentId: string;
agentVersion: string; // Must match the version from AgentVersionDeployment
executionId: string;
timestamp: Date;
// ... rest of output
}
Agent Instance Management
Automatic Instance Management by Orchestrator
Important: Agent instances are automatically spawned and managed by the orchestrator. Admins do not need to manually start instances. The orchestrator:
- Automatically spawns instances when needed (based on job queue and load)
- Manages instance lifecycle (spawn → active → idle → terminate)
- Scales instances up/down based on demand
- Replaces failed instances automatically
- Monitors instance health and capacity
Instance Lifecycle
Instances are created and managed by the orchestrator:
1. Spawned by Orchestrator:
   - Orchestrator spawns a new instance when needed (job queue, high load, etc.)
   - Orchestrator creates a record in the AgentInstance table with:
     - Unique instance ID (generated by orchestrator, e.g., data-collection-instance-1)
     - Agent type (agentId)
     - Active version (from AgentVersionDeployment)
     - Instance-specific endpoint URL (assigned by orchestrator)
     - Capacity (max concurrent jobs, from config)
     - Initial status: 'spawning'
2. Instance Startup:
   - Instance receives startup signal from orchestrator (or detects it has been spawned)
   - Instance reads its configuration from the database
   - Instance determines active version from the AgentVersionDeployment table
   - Instance starts HTTP server and becomes ready
   - Instance updates status to 'active' in the AgentInstance table
3. During Operation:
   - Instance periodically updates currentLoad and lastHeartbeat in the AgentInstance table
   - Instance processes jobs received from orchestrator
   - Instance updates currentLoad when jobs start/complete
4. Termination:
   - Orchestrator terminates idle instances or when scaling down
   - Orchestrator updates instance status to 'terminating' then removes the record
   - Instance gracefully shuts down when terminated
Instance Schema
interface AgentInstance {
id: string; // Unique instance ID (e.g., 'data-collection-instance-1')
agentId: string; // Agent type (e.g., 'data-collection')
agentVersion: string; // Version this instance runs (e.g., '1.2.3')
endpoint: {
url: string; // Instance-specific endpoint URL
method: 'POST';
timeout: number;
};
status: 'spawning' | 'active' | 'inactive' | 'unhealthy' | 'terminating';
capacity: number; // Max concurrent jobs this instance can handle
currentLoad: number; // Current number of running jobs
lastHeartbeat: Date; // Last health check timestamp
metadata?: {
region?: string;
zone?: string;
deployment?: string; // 'container', 'lambda', etc.
};
createdAt: Date;
updatedAt: Date;
}
Instance Lifecycle Examples
Startup (Instance started by orchestrator):
// Instance reads its configuration
const instanceId = process.env.INSTANCE_ID; // Provided by orchestrator
const instanceUrl = process.env.INSTANCE_URL; // Provided by orchestrator
// Instance determines active version
const activeVersion = await db.agentVersionDeployment.findFirst({
where: {
agentId: 'data-collection',
environment: 'production'
}
});
// Instance updates status to active when ready
await db.agentInstance.update({
where: { id: instanceId },
data: {
status: 'active',
agentVersion: activeVersion.version,
lastHeartbeat: new Date(),
}
});
During Operation:
// Update load when job starts
await db.agentInstance.update({
where: { id: instanceId },
data: {
currentLoad: { increment: 1 },
lastHeartbeat: new Date()
}
});
// Update load when job completes
await db.agentInstance.update({
where: { id: instanceId },
data: {
currentLoad: { decrement: 1 },
lastHeartbeat: new Date()
}
});
Shutdown (Instance terminated by orchestrator):
// Orchestrator signals termination (e.g., via SIGTERM or HTTP endpoint)
// Instance gracefully shuts down:
// - Stop accepting new jobs
// - Wait for current jobs to complete (with timeout)
// - Update status and exit
await db.agentInstance.update({
where: { id: instanceId },
data: { status: 'terminating' }
});
// Wait for jobs to complete, then orchestrator removes the record
Multiple Instances
- Multiple instances of the same agent version can run simultaneously
- Instances are automatically spawned by the orchestrator based on demand
- Each instance has its own unique ID (generated by orchestrator) and endpoint URL (assigned by orchestrator)
- The orchestrator distributes jobs across instances based on (see the sketch below):
  - Instance status ('active')
  - Recent heartbeat (not stale)
  - Available capacity (currentLoad < capacity)
  - Load balancing algorithm (round-robin, least-loaded, etc.)
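A minimal sketch of least-loaded instance selection on the orchestrator side, assuming Prisma models matching the schemas in this document; the two-minute staleness window follows the health-monitoring guidance below:

```typescript
import { PrismaClient } from '@prisma/client';

const prisma = new PrismaClient();

// Illustrative: pick the least-loaded healthy instance of an agent type
async function pickInstance(agentId: string) {
  const staleCutoff = new Date(Date.now() - 2 * 60 * 1000); // heartbeat older than 2 minutes = stale
  const candidates = await prisma.agentInstance.findMany({
    where: {
      agentId,
      status: 'active',
      lastHeartbeat: { gte: staleCutoff },
    },
  });
  // Prisma cannot compare two columns in `where`, so filter capacity in memory
  const available = candidates.filter((i) => i.currentLoad < i.capacity);
  if (available.length === 0) return null; // orchestrator may spawn a new instance here
  // Least-loaded strategy; round-robin or others could be substituted
  available.sort((a, b) => a.currentLoad - b.currentLoad);
  return available[0];
}
```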
Instance Health Monitoring
- Instances must update lastHeartbeat periodically (e.g., every 30 seconds); see the sketch after this list
- Instances with stale heartbeats (e.g., > 2 minutes old) are considered unavailable
- The orchestrator automatically spawns replacement instances for failed/unhealthy instances
- The orchestrator will not route jobs to unhealthy instances
- Instances can self-report status: 'unhealthy' if they detect issues (orchestrator will replace them)
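A minimal heartbeat loop an instance might run, assuming the AgentInstance model above; the 30-second interval and the dependency self-check are illustrative:

```typescript
import { PrismaClient } from '@prisma/client';

const prisma = new PrismaClient();
const instanceId = process.env.INSTANCE_ID!; // provided by the orchestrator

async function checkDependencies(): Promise<boolean> {
  try {
    await prisma.$queryRaw`SELECT 1`; // database reachability check
    return true;
  } catch {
    return false;
  }
}

// Illustrative: refresh the heartbeat every 30 seconds and self-report problems
setInterval(async () => {
  try {
    const healthy = await checkDependencies();
    await prisma.agentInstance.update({
      where: { id: instanceId },
      data: {
        lastHeartbeat: new Date(),
        status: healthy ? 'active' : 'unhealthy', // orchestrator replaces unhealthy instances
      },
    });
  } catch (err) {
    console.error('Heartbeat update failed', err);
  }
}, 30_000);
```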
Agent Execution Endpoint
Required Endpoint
All agents must expose an HTTP endpoint that accepts execution requests from the orchestrator. The endpoint signature is:
Endpoint: POST /execute (or custom path as specified in registration)
Request Headers:
Content-Type: application/json
Authorization: Bearer [token] (if authentication configured)
X-Job-Id: [job-id] (unique job identifier)
X-Execution-Id: [execution-id] (unique execution identifier)
Note: The placeholders [token], [job-id], and [execution-id] should be replaced with actual values.
Request Body:
interface AgentExecutionRequest {
jobId: string; // Unique job ID from orchestrator
executionId: string; // Unique execution ID
agentId: string; // Agent ID (should match registered ID)
agentVersion: string; // Expected agent version (must match instance's version)
instanceId?: string; // Instance ID that received this job (for load tracking)
params: Record<string, any>; // Agent-specific parameters (validated against inputSchema)
config?: Record<string, any>; // Optional runtime config overrides
metadata?: {
scheduleId?: string; // If triggered by schedule
pipelineId?: string; // If part of pipeline
pipelineStepId?: string; // If part of pipeline step
priority?: number; // Job priority
retryCount?: number; // Current retry attempt
parentJobId?: string; // If this is a sub-job from job division
};
}
Response (Synchronous - Immediate Acknowledgment):
interface AgentExecutionResponse {
jobId: string;
executionId: string;
status: 'accepted' | 'rejected';
message?: string; // Error message if rejected
estimatedDuration?: number; // Estimated execution time (ms)
}
Important: The execution endpoint should return immediately (within 1-2 seconds) with an acceptance response. The actual work should be performed asynchronously. The agent reports progress and results via the database (see Status Reporting below). The orchestrator does not wait for agent completion but tracks job status through the database.
Status Reporting
Database Status Updates
Agents must update job status in the database during execution. The orchestrator and monitoring systems query the database to track agent progress.
Database Table: AgentJobExecution
interface AgentJobExecution {
id: string; // Unique execution ID
jobId: string; // Job ID from scheduler
agentId: string; // Agent ID
agentVersion: string; // Agent version that executed
status: 'pending' | 'running' | 'completed' | 'failed' | 'cancelled';
params: Record<string, any>; // Execution parameters
result?: Record<string, any>; // Agent output (validated against outputSchema)
error?: {
message: string;
code?: string;
stack?: string;
timestamp: Date;
};
metrics?: {
executionTime: number; // Milliseconds
memoryUsage?: number; // Bytes
cpuUsage?: number; // Percentage
itemsProcessed?: number; // Agent-specific metrics
};
startedAt?: Date;
completedAt?: Date;
createdAt: Date;
updatedAt: Date;
}
Status Update Flow
- Job Accepted: Agent creates record with status: 'pending'
- Execution Started: Agent updates to status: 'running', sets startedAt
- Progress Updates: Agent can update metrics periodically (optional)
- Execution Complete: Agent updates to status: 'completed', sets result and completedAt
- Execution Failed: Agent updates to status: 'failed', sets error and completedAt
Example Status Updates (Rust):
use sqlx::PgPool;
use serde_json::json;
async fn update_job_status(
pool: &PgPool,
execution_id: &str,
status: &str,
result: Option<serde_json::Value>,
error: Option<&str>,
) -> Result<(), sqlx::Error> {
let now = chrono::Utc::now();
sqlx::query!(
r#"
UPDATE "AgentJobExecution"
SET status = $1,
result = $2,
error = CASE WHEN $3 IS NOT NULL THEN jsonb_build_object('message', $3, 'timestamp', $4) ELSE NULL END,
"updatedAt" = $4,
"completedAt" = CASE WHEN $1 IN ('completed', 'failed') THEN $4 ELSE "completedAt" END
WHERE id = $5
"#,
status,
result.as_ref().map(|r| serde_json::to_value(r).unwrap()),
error,
now,
execution_id
)
.execute(pool)
.await?;
Ok(())
}
Agent Output Specification
Output Structure
All agents must return outputs that conform to their registered outputSchema. The output must include:
interface AgentOutput {
agentId: string; // Agent ID
agentVersion: string; // Semantic version (e.g., "1.2.3")
executionId: string; // Execution ID
timestamp: Date; // When output was generated
executionTime: number; // Execution time in milliseconds
data: Record<string, any>; // Agent-specific output data
metadata?: {
itemsProcessed?: number;
errors?: Array<{
message: string;
timestamp: Date;
}>;
// ... agent-specific metadata
};
}
Output Storage
Agents write their outputs to the database. The exact table depends on the agent type:
- Query Strategy Agent: Writes to the Query table
- Data Collection Agent: Writes to the DataSource table
- Analysis Agent: Writes to the AnalysisResult table
- Content Generation Agent: Writes to the Newsletter table
- etc.
The output structure is validated against the outputSchema registered in AgentRegistry.
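A minimal sketch of that validation step, assuming the registered schemas are stored as JSON Schema and that the Ajv library is acceptable for checking them; the helper name is illustrative:

```typescript
import Ajv from 'ajv';
import { PrismaClient } from '@prisma/client';

const prisma = new PrismaClient();
const ajv = new Ajv();

// Illustrative: validate an AgentOutput against the outputSchema in AgentRegistry
async function validateOutput(agentId: string, output: unknown): Promise<void> {
  const registry = await prisma.agentRegistry.findUnique({ where: { id: agentId } });
  if (!registry) throw new Error(`Agent ${agentId} is not registered`);
  const validate = ajv.compile(registry.outputSchema as object);
  if (!validate(output)) {
    // Ajv exposes validation errors on the compiled function
    throw new Error(`Output schema violation: ${ajv.errorsText(validate.errors)}`);
  }
}
```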
Health Checks
Health Check Endpoint
Agents should expose a health check endpoint (optional but recommended):
Endpoint: GET /health (or as specified in AgentRegistry.healthCheck.endpoint)
Response:
interface HealthCheckResponse {
status: 'healthy' | 'degraded' | 'unhealthy';
version: string;
uptime?: number; // Seconds since agent started
checks?: {
database?: 'ok' | 'error';
externalApis?: 'ok' | 'error';
// ... other health checks
};
timestamp: Date;
}
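A minimal Express implementation of this endpoint, assuming the same Prisma client used elsewhere in this document; the version variable and database probe are illustrative:

```typescript
import express from 'express';
import { PrismaClient } from '@prisma/client';

const app = express();
const prisma = new PrismaClient();
const startedAt = Date.now();

// Illustrative health check matching the HealthCheckResponse shape above
app.get('/health', async (_req, res) => {
  let database: 'ok' | 'error' = 'ok';
  try {
    await prisma.$queryRaw`SELECT 1`; // cheap connectivity probe
  } catch {
    database = 'error';
  }
  res.json({
    status: database === 'ok' ? 'healthy' : 'unhealthy',
    version: process.env.AGENT_VERSION ?? '1.0.0',
    uptime: Math.floor((Date.now() - startedAt) / 1000),
    checks: { database },
    timestamp: new Date(),
  });
});
```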
Heartbeat Updates
Agent instances should periodically update their lastHeartbeat timestamp in the AgentInstance table to indicate they are alive:
UPDATE "AgentInstance"
SET "lastHeartbeat" = NOW(), "updatedAt" = NOW()
WHERE id = 'instance-id';
Error Handling
Error Response Format
If an agent rejects a job or encounters an error, it should:
- Immediate Rejection: Return status: 'rejected' in the execution response with an error message
- Runtime Error: Update AgentJobExecution with status: 'failed' and error details
Error Structure:
interface AgentError {
message: string; // Human-readable error message
code?: string; // Error code (e.g., 'VALIDATION_ERROR', 'TIMEOUT')
stack?: string; // Stack trace (for debugging)
timestamp: Date;
retryable: boolean; // Whether the job can be retried
}
Retry Behavior
- Agents should set retryable: true for transient errors (network issues, rate limits)
- Agents should set retryable: false for permanent errors (validation errors, invalid config)
- The scheduler uses the retryable flag to determine if a job should be retried (see the sketch below)
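A minimal sketch of how an agent might classify errors into the AgentError shape above; the specific error codes and heuristics are assumptions, not a prescribed list:

```typescript
// Illustrative classification of errors into the AgentError shape above
function toAgentError(err: unknown): {
  message: string;
  code?: string;
  timestamp: Date;
  retryable: boolean;
} {
  const message = err instanceof Error ? err.message : String(err);
  // Hypothetical heuristics: transient network / rate-limit failures are retryable
  const transient = /ETIMEDOUT|ECONNRESET|rate limit|429|503/i.test(message);
  return {
    message,
    code: transient ? 'TRANSIENT_ERROR' : 'PERMANENT_ERROR',
    timestamp: new Date(),
    retryable: transient,
  };
}
```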
Configuration Management
Configuration Loading
Agents load their configuration from the database:
- Agent Config: Read from the AgentConfig table (key: agentId)
- Active Version: Determine active version from the AgentVersionDeployment table
- Version Config: If version-specific config exists, merge with agent config
- Runtime Overrides: Apply any runtime config overrides from execution request
Configuration Hierarchy (same as existing system):
- Runtime overrides (from execution request)
- Agent-specific config from AgentConfig
- System-wide defaults from SystemConfig
Configuration Hot-Reload
Agents should support configuration hot-reload:
- Poll the AgentConfig table periodically (e.g., every 60 seconds); see the sketch after this list
- Or subscribe to configuration change events (if an event system exists)
- Re-initialize agent when config changes detected
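A minimal polling sketch, assuming an AgentConfig Prisma model keyed by agentId with a JSON config field; the 60-second interval matches the guidance above and reinitialize() is a hypothetical hook:

```typescript
import { PrismaClient } from '@prisma/client';

const prisma = new PrismaClient();
let currentConfig: unknown = null;

// Illustrative: poll AgentConfig and re-initialize when the stored config changes
async function pollConfig(agentId: string) {
  const row = await prisma.agentConfig.findFirst({ where: { agentId } });
  const next = row?.config ?? {};
  if (JSON.stringify(next) !== JSON.stringify(currentConfig)) {
    currentConfig = next;
    await reinitialize(next); // hypothetical hook that re-applies config to the agent
  }
}

async function reinitialize(config: unknown) {
  console.log('Applying new configuration', config);
}

setInterval(() => pollConfig('data-collection').catch(console.error), 60_000);
```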
Implementation Examples
Example 1: TypeScript/Node.js Agent
import express from 'express';
import { PrismaClient } from '@prisma/client';
const app = express();
app.use(express.json());
const prisma = new PrismaClient();
// Register agent on startup
async function registerAgent() {
await prisma.agentRegistry.upsert({
where: { id: 'data-collection' },
update: {
endpoint: {
type: 'http',
url: process.env.AGENT_URL,
method: 'POST',
timeout: 300000,
},
lastHeartbeat: new Date(),
},
create: {
id: 'data-collection',
name: 'Data Collection Agent',
description: 'Collects raw media data',
version: '1.0.0',
endpoint: {
type: 'http',
url: process.env.AGENT_URL,
method: 'POST',
timeout: 300000,
},
inputSchema: {...},
outputSchema: {...},
enabled: true,
},
});
}
// Execution endpoint
app.post('/execute', async (req, res) => {
const { jobId, executionId, params } = req.body;
// Accept job immediately
res.json({
jobId,
executionId,
status: 'accepted',
estimatedDuration: 60000,
});
// Execute asynchronously
executeJob(jobId, executionId, params).catch(console.error);
});
async function executeJob(jobId: string, executionId: string, params: any) {
// Create execution record
await prisma.agentJobExecution.create({
data: {
id: executionId,
jobId,
agentId: 'data-collection',
agentVersion: '1.0.0',
status: 'pending',
params,
},
});
try {
// Update to running
await prisma.agentJobExecution.update({
where: { id: executionId },
data: { status: 'running', startedAt: new Date() },
});
// Do actual work
const result = await collectData(params);
// Update to completed
await prisma.agentJobExecution.update({
where: { id: executionId },
data: {
status: 'completed',
result,
completedAt: new Date(),
},
});
// Write to agent-specific table
await prisma.dataSource.createMany({
data: result.items,
});
} catch (error) {
await prisma.agentJobExecution.update({
where: { id: executionId },
data: {
status: 'failed',
error: {
message: error.message,
timestamp: new Date(),
},
completedAt: new Date(),
},
});
}
}
app.listen(3000, () => {
registerAgent();
console.log('Agent running on port 3000');
});
Example 2: Python Agent
from flask import Flask, request, jsonify
import psycopg2
import psycopg2.extras
import json
from datetime import datetime
import os
import threading
app = Flask(__name__)
def get_db_connection():
return psycopg2.connect(
host=os.getenv('DB_HOST'),
database=os.getenv('DB_NAME'),
user=os.getenv('DB_USER'),
password=os.getenv('DB_PASSWORD')
)
def register_agent():
conn = get_db_connection()
cur = conn.cursor()
cur.execute("""
INSERT INTO "AgentRegistry" (id, name, description, version, endpoint, "inputSchema", "outputSchema", enabled, "createdAt", "updatedAt")
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (id) DO UPDATE SET
endpoint = EXCLUDED.endpoint,
"updatedAt" = EXCLUDED."updatedAt"
""", (
'analysis',
'Analysis Agent',
'Analyzes collected data',
'1.0.0',
json.dumps({
'type': 'http',
'url': os.getenv('AGENT_URL'),
'method': 'POST',
'timeout': 300000
}),
json.dumps({...}), # inputSchema
json.dumps({...}), # outputSchema
True,
datetime.now(),
datetime.now()
))
conn.commit()
cur.close()
conn.close()
@app.route('/execute', methods=['POST'])
def execute():
data = request.json
job_id = data['jobId']
execution_id = data['executionId']
params = data['params']
# Accept immediately
response = {
'jobId': job_id,
'executionId': execution_id,
'status': 'accepted',
'estimatedDuration': 120000
}
# Execute asynchronously
thread = threading.Thread(target=execute_job, args=(job_id, execution_id, params))
thread.start()
return jsonify(response)
def execute_job(job_id, execution_id, params):
conn = get_db_connection()
cur = conn.cursor()
try:
# Create execution record
cur.execute("""
INSERT INTO "AgentJobExecution" (id, "jobId", "agentId", "agentVersion", status, params, "createdAt", "updatedAt")
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
""", (execution_id, job_id, 'analysis', '1.0.0', 'pending', json.dumps(params), datetime.now(), datetime.now()))
# Update to running
cur.execute("""
UPDATE "AgentJobExecution"
SET status = 'running', "startedAt" = %s, "updatedAt" = %s
WHERE id = %s
""", (datetime.now(), datetime.now(), execution_id))
# Do actual work
result = perform_analysis(params)
# Update to completed
cur.execute("""
UPDATE "AgentJobExecution"
SET status = 'completed', result = %s, "completedAt" = %s, "updatedAt" = %s
WHERE id = %s
""", (json.dumps(result), datetime.now(), datetime.now(), execution_id))
conn.commit()
except Exception as e:
cur.execute("""
UPDATE "AgentJobExecution"
SET status = 'failed', error = %s, "completedAt" = %s, "updatedAt" = %s
WHERE id = %s
""", (json.dumps({'message': str(e), 'timestamp': datetime.now().isoformat()}), datetime.now(), datetime.now(), execution_id))
conn.commit()
finally:
cur.close()
conn.close()
if __name__ == '__main__':
register_agent()
app.run(host='0.0.0.0', port=5000)
Example 3: n8n Workflow Agent
For n8n workflows, the agent is implemented as a webhook workflow:
- Create n8n Workflow with webhook trigger
- Register Webhook URL in AgentRegistry.endpoint.url
- Workflow Steps:
- Receive execution request
- Create execution record in database (via HTTP Request node)
- Perform agent work
- Update execution status in database
- Write results to database
n8n Workflow Structure:
- Webhook Trigger: Receives execution request
- HTTP Request Node: Create execution record in database
- HTTP Request Node: Update execution status to 'running'
- Function Node: Perform agent logic (or call external service)
- HTTP Request Node: Write results to database
- HTTP Request Node: Update execution status to 'completed'
Testing & Validation
Agent Validation Checklist
Before deploying an agent, ensure:
- Agent type is registered in AgentRegistry (done by admin or deployment)
- Agent instance is spawned by orchestrator (not manually started)
- Agent instance updates status to 'active' when ready (instance-specific info in the AgentInstance table)
- Agent reads active version from the AgentVersionDeployment table
- Agent exposes /execute endpoint (or configured path)
- Agent returns immediate acceptance response (< 2 seconds)
- Agent creates execution record in the AgentJobExecution table
- Agent updates instance currentLoad when job starts/completes
- Agent updates execution status during execution
- Agent writes results to appropriate database tables
- Agent output conforms to registered outputSchema and includes agentVersion
- Agent handles errors gracefully and updates status
- Agent supports configuration hot-reload
- Agent exposes health check endpoint (optional but recommended)
- Agent updates heartbeat in the AgentInstance table periodically
- Agent instance gracefully shuts down when terminated by orchestrator
Testing Tools
The system should provide:
- Agent Test Harness: Test agent execution locally
- Schema Validator: Validate agent input/output schemas
- Mock Scheduler: Test agent registration and execution
- Database Fixtures: Test data for agent testing
Migration from Current Architecture
For existing agents in apps/worker/agents/, migration involves:
- Extract Agent Logic: Move agent logic to standalone service
- Add HTTP Endpoint: Expose /execute endpoint
- Add Registration: Implement self-registration on startup
- Add Status Updates: Update the AgentJobExecution table during execution
- Deploy Separately: Deploy agent as independent service
- Update Orchestrator: Orchestrator calls agent via HTTP instead of BullMQ
The database schema and configuration system remain the same - only the execution mechanism changes.
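A minimal sketch of the new dispatch path on the orchestrator side, replacing a BullMQ enqueue with an HTTP call; the headers and body follow the execution contract above, while the function name and token handling are illustrative (assumes a runtime with global fetch, e.g., Node 18+):

```typescript
// Illustrative: orchestrator dispatches a job to an agent instance over HTTP
async function dispatchJob(
  instanceUrl: string,
  token: string | undefined,
  request: {
    jobId: string;
    executionId: string;
    agentId: string;
    agentVersion: string;
    params: Record<string, unknown>;
  },
) {
  const response = await fetch(instanceUrl, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      ...(token ? { Authorization: `Bearer ${token}` } : {}),
      'X-Job-Id': request.jobId,
      'X-Execution-Id': request.executionId,
    },
    body: JSON.stringify(request),
  });
  const ack = await response.json(); // AgentExecutionResponse: 'accepted' | 'rejected'
  if (!response.ok || ack.status !== 'accepted') {
    throw new Error(`Agent rejected job ${request.jobId}: ${ack.message ?? response.status}`);
  }
  // Completion is tracked via the AgentJobExecution table, not this HTTP response
  return ack;
}
```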