# Agent Deployment Architecture

## Overview
Agents in MediaPulse are language-agnostic and can run anywhere. This document describes how agents are deployed and integrated into the system.
## Core Principles
- Database as Central Hub: All agents read from and write to the shared PostgreSQL database
- HTTP-Based Communication: Agents expose HTTP endpoints that the scheduler invokes
- Self-Registration: Agents register themselves in the `AgentRegistry` table on startup
- Independent Deployment: Agents can be deployed separately from the core system
- Language Independence: Agents can be written in any language (TypeScript, Python, Rust, Go, etc.) or even as n8n workflows
## Agent Deployment Options

### 1. Container-Based Deployment (Docker/Kubernetes)
Best for: Production deployments, scalability, resource isolation
```dockerfile
# Example: Python agent
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .

# Agent connects to the database and registers itself on startup
ENV DATABASE_URL=postgresql://...
ENV AGENT_ID=data-collection
ENV AGENT_URL=https://data-collection.example.com
CMD ["python", "agent.py"]
```

Kubernetes Deployment:
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: data-collection-agent
spec:
  replicas: 3
  selector:
    matchLabels:
      app: data-collection-agent
  template:
    metadata:
      labels:
        app: data-collection-agent
    spec:
      containers:
        - name: agent
          image: mediapulse/data-collection-agent:1.0.0
          env:
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: db-credentials
                  key: url
            - name: AGENT_URL
              value: "https://data-collection.example.com"
```

### 2. Serverless Functions (AWS Lambda, Google Cloud Functions, Azure Functions)
Best for: Cost efficiency, automatic scaling, event-driven workloads
Example: AWS Lambda (Python):

```python
import json
import os

import psycopg2

def lambda_handler(event, context):
    # Agent execution endpoint
    job_id = event['jobId']
    execution_id = event['executionId']
    params = event['params']

    # Connect to the database
    conn = psycopg2.connect(os.environ['DATABASE_URL'])

    # Create execution record
    # ... execute agent logic ...

    return {
        'statusCode': 200,
        'body': json.dumps({
            'jobId': job_id,
            'executionId': execution_id,
            'status': 'accepted'
        })
    }
```

Registration: The Lambda function registers itself via an API Gateway endpoint or a direct database connection.
### 3. Cloud Run / Cloud Functions (GCP)
Best for: Serverless with container support, auto-scaling
```yaml
# cloud-run-service.yaml
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: data-collection-agent
spec:
  template:
    spec:
      containers:
        - image: gcr.io/project/data-collection-agent
          env:
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: db-secret
                  key: url
```

### 4. n8n Workflows
Best for: Non-developers, visual workflow design, rapid prototyping
Setup:
- Create an n8n workflow with a webhook trigger
- Add nodes to:
  - Receive the execution request
  - Connect to the database
  - Perform the agent logic
  - Update the execution status
  - Write results to the database
- Register the webhook URL in `AgentRegistry.endpoint.url`
Example n8n Workflow:
- Webhook Trigger: Receives POST request from scheduler
- HTTP Request Node: Create execution record in database
- Function Node: Perform agent logic (or call external API)
- HTTP Request Node: Update execution status
- HTTP Request Node: Write results to database
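Concretely, the webhook trigger receives the same execution request every agent gets, and the workflow's reply should be the immediate-acceptance shape used throughout this document. A sketch with made-up IDs and parameters (the authoritative request schema is in the Agent Specification):

```python
import json

# Hypothetical execution request the scheduler POSTs to the webhook URL
# registered in AgentRegistry.endpoint.url. The IDs and params here are
# invented for illustration.
execution_request = {
    "jobId": "job-123",
    "executionId": "exec-456",
    "params": {"source": "rss", "maxItems": 100},
}

# The workflow's webhook response node should echo the IDs back with an
# immediate acceptance, matching the other agent examples in this document.
accepted_response = {
    "jobId": execution_request["jobId"],
    "executionId": execution_request["executionId"],
    "status": "accepted",
}
print(json.dumps(accepted_response))
```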
### 5. Edge Functions (Vercel, Cloudflare Workers, Deno Deploy)
Best for: Low latency, global distribution, lightweight agents
Example: Vercel Serverless Function:

```typescript
// api/execute.ts
import { PrismaClient } from '@prisma/client';

const prisma = new PrismaClient();

export default async function handler(req, res) {
  if (req.method !== 'POST') {
    return res.status(405).json({ error: 'Method not allowed' });
  }

  const { jobId, executionId, params } = req.body;

  // Accept the job immediately
  res.json({
    jobId,
    executionId,
    status: 'accepted',
    estimatedDuration: 60000,
  });

  // Execute asynchronously
  executeJob(jobId, executionId, params).catch(console.error);
}
```

### 6. Traditional Server Deployment
Best for: On-premise deployments, existing infrastructure
Example: systemd Service:

```ini
[Unit]
Description=Data Collection Agent
After=network.target postgresql.service

[Service]
Type=simple
User=mediapulse
WorkingDirectory=/opt/mediapulse/agents/data-collection
Environment="DATABASE_URL=postgresql://..."
Environment="AGENT_URL=http://localhost:5000"
ExecStart=/usr/bin/python3 agent.py
Restart=always

[Install]
WantedBy=multi-user.target
```

## Agent Registration
For complete registration details, see the Agent Specification document.
All agents must register themselves in the `AgentRegistry` table. The registration method depends on your deployment model:
- Self-Registration: Long-running services (containers, servers) register on startup
- Deployment-Time Registration: Serverless functions register during CI/CD deployment
- Manual Registration: External agents (n8n workflows) registered via admin interface
- Hybrid Registration: Serverless functions that register on cold start
### Deployment-Specific Registration
Container-Based: Register in the startup script or entrypoint:

```python
# Called on container startup
register_agent()
start_server()
```

Serverless Functions: Register during deployment (Terraform, CloudFormation, or a CI/CD script):

```bash
# Run during deployment pipeline
python register_agent.py
```

n8n Workflows: Register the webhook URL via the admin interface after the workflow is created
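All three paths reduce to the same database write. A sketch of what the registration might build, using psycopg2-style placeholders; the `AgentRegistry` column names (`id`, `endpoint`, `lastHeartbeat`) are assumptions pieced together from fields mentioned in this document, and the authoritative schema lives in the Agent Specification:

```python
import json

def build_registration(agent_id: str, agent_url: str):
    """Build an upsert for this agent's AgentRegistry row.

    Column names here are assumptions; check the Agent Specification
    for the real schema before using this.
    """
    sql = (
        'INSERT INTO "AgentRegistry" (id, endpoint, "lastHeartbeat") '
        "VALUES (%s, %s, NOW()) "
        "ON CONFLICT (id) DO UPDATE SET "
        'endpoint = EXCLUDED.endpoint, "lastHeartbeat" = NOW()'
    )
    # endpoint is stored as JSON so the scheduler can read endpoint.url
    endpoint = json.dumps({"url": agent_url.rstrip("/") + "/execute"})
    return sql, (agent_id, endpoint)
```

A startup script or deployment step would pass the returned SQL and parameters to `cursor.execute()` on a connection built from `DATABASE_URL`.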
## Agent Discovery

The scheduler discovers agents by querying the `AgentRegistry` table:

- On Startup: Scheduler loads all enabled agents from `AgentRegistry`
- Periodic Refresh: Scheduler refreshes the agent list periodically (e.g., every 60 seconds)
- Health Checks: Scheduler verifies agent endpoints are accessible
- Heartbeat Monitoring: Scheduler monitors each agent's `lastHeartbeat` to detect unavailable agents
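The heartbeat check reduces to a timestamp comparison. A scheduler-side sketch; the 180-second threshold is an assumption (three missed 60-second heartbeats), not a documented value:

```python
from datetime import datetime, timedelta, timezone

# Assumption: an agent counts as unavailable after three missed
# 60-second heartbeats; the real threshold is a scheduler setting.
HEARTBEAT_TIMEOUT = timedelta(seconds=180)

def is_available(last_heartbeat, now=None):
    """True if the agent's lastHeartbeat is recent enough to count it as up."""
    now = now or datetime.now(timezone.utc)
    return last_heartbeat is not None and now - last_heartbeat <= HEARTBEAT_TIMEOUT

def filter_available(agents, now=None):
    """agents: enabled AgentRegistry rows loaded as dicts with 'lastHeartbeat'."""
    return [a for a in agents if is_available(a["lastHeartbeat"], now)]
```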
## Agent Execution Flow

- Scheduler Invocation: Scheduler calls the agent's HTTP endpoint with an execution request
- Agent Acceptance: Agent returns an immediate acceptance (`status: 'accepted'`)
- Asynchronous Execution: Agent performs the work asynchronously
- Status Updates: Agent updates the `AgentJobExecution` table during execution
- Result Storage: Agent writes results to the appropriate database tables
- Status Monitoring: Scheduler queries the `AgentJobExecution` table to track progress
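The accept-then-execute contract above can be sketched independently of any web framework. In this minimal sketch, `update_status` and `run_job` are injected stand-ins for the `AgentJobExecution` writes and the real work:

```python
import threading

def handle_execute(request_body, update_status, run_job):
    """Accept-then-execute pattern from the flow above.

    update_status(execution_id, status) and run_job(params) are injected so
    the sketch stays database-agnostic; a real agent would update
    AgentJobExecution and do the actual work in their place.
    """
    job_id = request_body["jobId"]
    execution_id = request_body["executionId"]

    def work():
        update_status(execution_id, "running")
        try:
            run_job(request_body.get("params", {}))
            update_status(execution_id, "completed")
        except Exception:
            update_status(execution_id, "failed")

    # Start the work in the background, then answer immediately.
    worker = threading.Thread(target=work)
    worker.start()
    return {"jobId": job_id, "executionId": execution_id, "status": "accepted"}, worker
```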
## Multi-Language Agent Examples
### TypeScript/Node.js Agent

```typescript
import express from 'express';
import { PrismaClient } from '@prisma/client';

const app = express();
const prisma = new PrismaClient();

app.use(express.json());

// Register on startup
await registerAgent();

app.post('/execute', async (req, res) => {
  const { jobId, executionId, params } = req.body;
  res.json({ jobId, executionId, status: 'accepted' });
  executeJob(jobId, executionId, params).catch(console.error);
});

app.listen(3000);
```

### Python Agent
```python
from flask import Flask, request, jsonify
import psycopg2

app = Flask(__name__)

register_agent()

@app.route('/execute', methods=['POST'])
def execute():
    data = request.json
    return jsonify({
        'jobId': data['jobId'],
        'executionId': data['executionId'],
        'status': 'accepted'
    })

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
```

### Rust Agent
```rust
use actix_web::{web, App, HttpServer, HttpResponse};
use sqlx::PgPool;

async fn execute(req: web::Json<ExecutionRequest>) -> HttpResponse {
    let req = req.into_inner();
    let response = ExecutionResponse {
        job_id: req.job_id.clone(),
        execution_id: req.execution_id.clone(),
        status: "accepted".to_string(),
    };

    // Execute asynchronously
    tokio::spawn(async move {
        execute_job(req.job_id, req.execution_id, req.params).await;
    });

    // Accept immediately
    HttpResponse::Ok().json(response)
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    register_agent().await;
    HttpServer::new(|| App::new().route("/execute", web::post().to(execute)))
        .bind("0.0.0.0:8080")?
        .run()
        .await
}
```

### Go Agent
```go
package main

import (
	"encoding/json"
	"net/http"

	_ "github.com/lib/pq"
)

func executeHandler(w http.ResponseWriter, r *http.Request) {
	var req ExecutionRequest
	json.NewDecoder(r.Body).Decode(&req)

	// Accept immediately
	response := ExecutionResponse{
		JobID:       req.JobID,
		ExecutionID: req.ExecutionID,
		Status:      "accepted",
	}
	json.NewEncoder(w).Encode(response)

	// Execute asynchronously
	go executeJob(req.JobID, req.ExecutionID, req.Params)
}

func main() {
	registerAgent()
	http.HandleFunc("/execute", executeHandler)
	http.ListenAndServe(":8080", nil)
}
```

## Deployment Best Practices
- Health Checks: Implement a `/health` endpoint for monitoring
- Heartbeats: Update `lastHeartbeat` in `AgentRegistry` periodically
- Error Handling: Set `AgentJobExecution.status` to `'failed'` on errors
- Configuration: Load config from the `AgentConfig` table and support hot-reload
- Logging: Log to the database or a centralized logging system
- Monitoring: Expose metrics for monitoring (execution time, success rate, etc.)
- Security: Use authentication tokens for agent endpoints
- Scaling: Agents can scale independently based on their own load
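The heartbeat and health-check items can be sketched together. A minimal Python sketch; the payload shape and the 60-second default are assumptions chosen to match the scheduler refresh interval mentioned under Agent Discovery:

```python
import threading

def heartbeat_loop(send_heartbeat, interval_seconds=60, stop_event=None):
    """Call send_heartbeat every interval until stop_event is set.

    send_heartbeat is injected; in a real agent it would UPDATE this agent's
    lastHeartbeat in AgentRegistry. The 60-second default mirrors the
    scheduler's refresh interval described above (an assumption).
    """
    stop_event = stop_event or threading.Event()
    while not stop_event.is_set():
        send_heartbeat()
        stop_event.wait(interval_seconds)

def health_payload(agent_id, started_at, now):
    """Body a /health endpoint might return (the shape is an assumption)."""
    return {"agentId": agent_id, "status": "ok", "uptimeSeconds": int(now - started_at)}
```

In a long-running agent, `heartbeat_loop` would run on a background thread started right after registration.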
## Migration from Current Architecture
For existing agents in `apps/worker/agents/`:

- Extract Agent Logic: Move agent code to a standalone service
- Add HTTP Endpoint: Expose an `/execute` endpoint
- Add Registration: Implement self-registration on startup
- Add Status Updates: Update the `AgentJobExecution` table during execution
- Deploy Separately: Deploy the agent as an independent service
- Update Scheduler: The scheduler will automatically discover and invoke the agent via HTTP

The database schema and configuration system remain unchanged; only the execution mechanism changes from BullMQ to HTTP.
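On the scheduler side, the old BullMQ enqueue becomes a plain HTTP POST to the agent's registered endpoint. A sketch using only the standard library; the URL is a placeholder and the payload fields follow the `/execute` examples above:

```python
import json
import urllib.request

def build_invocation(endpoint_url: str, job_id: str, execution_id: str, params: dict):
    """Build the HTTP request that replaces the BullMQ enqueue.

    endpoint_url comes from AgentRegistry.endpoint.url; the payload shape
    follows the /execute examples in this document.
    """
    body = json.dumps(
        {"jobId": job_id, "executionId": execution_id, "params": params}
    ).encode()
    return urllib.request.Request(
        endpoint_url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

def invoke_agent(req: urllib.request.Request) -> dict:
    """Send the request and parse the agent's immediate acceptance."""
    with urllib.request.urlopen(req, timeout=10) as resp:
        return json.loads(resp.read())
```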