Agent Deployment Architecture

Overview

Agents in MediaPulse are language-agnostic and can run anywhere. This document describes how agents are deployed and integrated into the system.

Core Principles

  1. Database as Central Hub: All agents read from and write to the shared PostgreSQL database
  2. HTTP-Based Communication: Agents expose HTTP endpoints that the scheduler invokes
  3. Self-Registration: Agents register themselves in the AgentRegistry table on startup
  4. Independent Deployment: Agents can be deployed separately from the core system
  5. Language Independence: Agents can be written in any language (TypeScript, Python, Rust, Go, etc.) or even as n8n workflows
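To make principles 1-3 concrete, the sketch below assembles the record an agent might write to AgentRegistry on startup. The field names (`endpoint.url`, `lastHeartbeat`) are taken from references elsewhere in this document; the exact schema and the helper itself are illustrative, not the canonical table definition.

```python
from datetime import datetime, timezone

def build_registration(agent_id: str, agent_url: str, capabilities: list[str]) -> dict:
    """Assemble the payload an agent writes to AgentRegistry on startup.

    Field names mirror those referenced in this document; treat the exact
    shape as an assumption.
    """
    return {
        "agentId": agent_id,
        "endpoint": {"url": agent_url},  # scheduler invokes POST {url}/execute
        "capabilities": capabilities,
        "enabled": True,
        "lastHeartbeat": datetime.now(timezone.utc).isoformat(),
    }

registration = build_registration(
    "data-collection", "https://data-collection.example.com", ["collect"]
)
```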

Agent Deployment Options

1. Container-Based Deployment (Docker/Kubernetes)

Best for: Production deployments, scalability, resource isolation

# Example: Python agent
FROM python:3.11-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt

COPY . .

# Agent connects to database and registers itself
ENV DATABASE_URL=postgresql://...
ENV AGENT_ID=data-collection
ENV AGENT_URL=https://data-collection.example.com

CMD ["python", "agent.py"]

Kubernetes Deployment:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: data-collection-agent
spec:
  replicas: 3
  template:
    spec:
      containers:
      - name: agent
        image: mediapulse/data-collection-agent:1.0.0
        env:
        - name: DATABASE_URL
          valueFrom:
            secretKeyRef:
              name: db-credentials
              key: url
        - name: AGENT_URL
          value: "https://data-collection.example.com"

2. Serverless Functions (AWS Lambda, Google Cloud Functions, Azure Functions)

Best for: Cost efficiency, automatic scaling, event-driven workloads

Example: AWS Lambda (Python):

import json
import os
import psycopg2

def lambda_handler(event, context):
    # Agent execution endpoint
    job_id = event['jobId']
    execution_id = event['executionId']
    params = event['params']
    
    # Connect to database
    conn = psycopg2.connect(os.environ['DATABASE_URL'])
    
    # Create execution record
    # ... execute agent logic ...
    
    return {
        'statusCode': 200,
        'body': json.dumps({
            'jobId': job_id,
            'executionId': execution_id,
            'status': 'accepted'
        })
    }

Registration: The Lambda function registers itself via an API Gateway endpoint or a direct database connection.

3. Cloud Run / Cloud Functions (GCP)

Best for: Serverless with container support, auto-scaling

# cloud-run-service.yaml
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: data-collection-agent
spec:
  template:
    spec:
      containers:
      - image: gcr.io/project/data-collection-agent
        env:
        - name: DATABASE_URL
          valueFrom:
            secretKeyRef:
              name: db-secret
              key: url

4. n8n Workflows

Best for: Non-developers, visual workflow design, rapid prototyping

Setup:

  1. Create n8n workflow with webhook trigger
  2. Add nodes to:
    • Receive execution request
    • Connect to database
    • Perform agent logic
    • Update execution status
    • Write results to database
  3. Register webhook URL in AgentRegistry.endpoint.url

Example n8n Workflow:

  • Webhook Trigger: Receives POST request from scheduler
  • HTTP Request Node: Create execution record in database
  • Function Node: Perform agent logic (or call external API)
  • HTTP Request Node: Update execution status
  • HTTP Request Node: Write results to database

5. Edge Functions (Vercel, Cloudflare Workers, Deno Deploy)

Best for: Low latency, global distribution, lightweight agents

Example: Vercel Serverless Function:

// api/execute.ts
import type { VercelRequest, VercelResponse } from '@vercel/node';
import { PrismaClient } from '@prisma/client';

const prisma = new PrismaClient();

export default async function handler(req: VercelRequest, res: VercelResponse) {
  if (req.method !== 'POST') {
    return res.status(405).json({ error: 'Method not allowed' });
  }
  
  const { jobId, executionId, params } = req.body;
  
  // Accept job immediately
  res.json({
    jobId,
    executionId,
    status: 'accepted',
    estimatedDuration: 60000,
  });
  
  // Execute asynchronously (note: some serverless runtimes may suspend the
  // function once the response is sent; verify background work completes)
  executeJob(jobId, executionId, params).catch(console.error);
}

6. Traditional Server Deployment

Best for: On-premise deployments, existing infrastructure

Example: Systemd Service:

[Unit]
Description=Data Collection Agent
After=network.target postgresql.service

[Service]
Type=simple
User=mediapulse
WorkingDirectory=/opt/mediapulse/agents/data-collection
Environment="DATABASE_URL=postgresql://..."
Environment="AGENT_URL=http://localhost:5000"
ExecStart=/usr/bin/python3 agent.py
Restart=always

[Install]
WantedBy=multi-user.target

Agent Registration

For complete registration details, see the Agent Specification document.

All agents must register themselves in the AgentRegistry table. The registration method depends on your deployment model:

  • Self-Registration: Long-running services (containers, servers) register on startup
  • Deployment-Time Registration: Serverless functions register during CI/CD deployment
  • Manual Registration: External agents (n8n workflows) registered via admin interface
  • Hybrid Registration: Serverless functions that register on cold start

Deployment-Specific Registration

Container-Based: Register in startup script or entrypoint

# Called on container startup
register_agent()
start_server()
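For a container or server deployment, `register_agent` can be an upsert keyed on the agent id, so a restart simply refreshes the endpoint. The sketch below uses Python's built-in sqlite3 in memory purely as a stand-in for the shared PostgreSQL database (with psycopg2 the placeholders would be `%s`), and a simplified two-column table rather than the real AgentRegistry schema:

```python
import sqlite3

def register_agent(conn, agent_id: str, agent_url: str) -> None:
    """Upsert this agent's row so a restart refreshes its endpoint URL."""
    conn.execute(
        """
        INSERT INTO AgentRegistry (agentId, endpointUrl)
        VALUES (?, ?)
        ON CONFLICT(agentId) DO UPDATE SET endpointUrl = excluded.endpointUrl
        """,
        (agent_id, agent_url),
    )
    conn.commit()

# Demo against an in-memory stand-in for the shared database
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE AgentRegistry (agentId TEXT PRIMARY KEY, endpointUrl TEXT)")
register_agent(conn, "data-collection", "http://localhost:5000")
register_agent(conn, "data-collection", "http://localhost:5001")  # restart, new URL
```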

Serverless Functions: Register during deployment (Terraform, CloudFormation, or CI/CD script)

# Run during deployment pipeline
python register_agent.py

n8n Workflows: Register webhook URL via admin interface after workflow is created

Agent Discovery

The scheduler discovers agents by querying the AgentRegistry table:

  1. On Startup: Scheduler loads all enabled agents from AgentRegistry
  2. Periodic Refresh: Scheduler refreshes agent list periodically (e.g., every 60 seconds)
  3. Health Checks: Scheduler verifies agent endpoints are accessible
  4. Heartbeat Monitoring: Scheduler monitors agent lastHeartbeat to detect unavailable agents
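Steps 3-4 reduce to filtering registry rows by the enabled flag and the age of lastHeartbeat. A minimal sketch, in which the row shape and the timeout value are assumptions:

```python
from datetime import datetime, timedelta

def available_agents(rows: list[dict], now: datetime, timeout: timedelta) -> list[str]:
    """Return ids of enabled agents whose lastHeartbeat is recent enough."""
    return [
        row["agentId"]
        for row in rows
        if row["enabled"] and now - row["lastHeartbeat"] <= timeout
    ]

# Example registry snapshot (illustrative values)
now = datetime(2024, 1, 1, 12, 0, 0)
rows = [
    {"agentId": "fresh", "enabled": True,  "lastHeartbeat": now - timedelta(seconds=30)},
    {"agentId": "stale", "enabled": True,  "lastHeartbeat": now - timedelta(minutes=10)},
    {"agentId": "off",   "enabled": False, "lastHeartbeat": now},
]
```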

Agent Execution Flow

  1. Scheduler Invocation: Scheduler calls agent HTTP endpoint with execution request
  2. Agent Acceptance: Agent returns immediate acceptance (status: 'accepted')
  3. Asynchronous Execution: Agent performs work asynchronously
  4. Status Updates: Agent updates AgentJobExecution table during execution
  5. Result Storage: Agent writes results to appropriate database tables
  6. Status Monitoring: Scheduler queries AgentJobExecution table to track progress
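Concretely, the request and acceptance bodies in steps 1-2 follow the shape used by the code examples throughout this document (`jobId`, `executionId`, `params`, `status: 'accepted'`); the specific values, and the intermediate "running"/"completed" statuses, are illustrative assumptions:

```python
# Step 1: body the scheduler POSTs to the agent's /execute endpoint
request_body = {
    "jobId": "job-123",          # illustrative identifiers
    "executionId": "exec-456",
    "params": {"source": "rss"},
}

# Step 2: the immediate acceptance the agent returns before doing any work
acceptance = {
    "jobId": request_body["jobId"],
    "executionId": request_body["executionId"],
    "status": "accepted",
}

# Steps 4-6: statuses an AgentJobExecution row might move through
STATUS_FLOW = ["accepted", "running", "completed"]  # or "failed" on error
```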

Multi-Language Agent Examples

TypeScript/Node.js Agent

import express from 'express';
import { PrismaClient } from '@prisma/client';

const app = express();
const prisma = new PrismaClient();

// Register on startup (top-level await requires an ES module)
await registerAgent();

app.post('/execute', async (req, res) => {
  const { jobId, executionId, params } = req.body;
  res.json({ jobId, executionId, status: 'accepted' });
  executeJob(jobId, executionId, params).catch(console.error);
});

app.listen(3000);

Python Agent

from flask import Flask, request, jsonify
import threading
import psycopg2

app = Flask(__name__)

register_agent()

@app.route('/execute', methods=['POST'])
def execute():
    data = request.json
    # Execute asynchronously after accepting
    threading.Thread(
        target=execute_job,
        args=(data['jobId'], data['executionId'], data.get('params')),
    ).start()
    return jsonify({
        'jobId': data['jobId'],
        'executionId': data['executionId'],
        'status': 'accepted'
    })

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

Rust Agent

use actix_web::{web, App, HttpServer, HttpResponse};
use sqlx::PgPool;

async fn execute(req: web::Json<ExecutionRequest>) -> HttpResponse {
    let req = req.into_inner();
    let response = ExecutionResponse {
        job_id: req.job_id.clone(),
        execution_id: req.execution_id.clone(),
        status: "accepted".to_string(),
    };

    // Execute asynchronously
    tokio::spawn(async move {
        execute_job(req.job_id, req.execution_id, req.params).await;
    });

    // Accept immediately
    HttpResponse::Ok().json(response)
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    register_agent().await;
    HttpServer::new(|| App::new().route("/execute", web::post().to(execute)))
        .bind("0.0.0.0:8080")?
        .run()
        .await
}

Go Agent

package main

import (
    "encoding/json"
    "net/http"
    "database/sql"
    _ "github.com/lib/pq"
)

func executeHandler(w http.ResponseWriter, r *http.Request) {
    var req ExecutionRequest
    if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
        http.Error(w, "invalid request body", http.StatusBadRequest)
        return
    }
    
    // Accept immediately
    response := ExecutionResponse{
        JobID: req.JobID,
        ExecutionID: req.ExecutionID,
        Status: "accepted",
    }
    json.NewEncoder(w).Encode(response)
    
    // Execute asynchronously
    go executeJob(req.JobID, req.ExecutionID, req.Params)
}

func main() {
    registerAgent()
    http.HandleFunc("/execute", executeHandler)
    http.ListenAndServe(":8080", nil)
}

Deployment Best Practices

  1. Health Checks: Implement /health endpoint for monitoring
  2. Heartbeats: Update lastHeartbeat in AgentRegistry periodically
  3. Error Handling: Update AgentJobExecution.status: 'failed' on errors
  4. Configuration: Load config from AgentConfig table, support hot-reload
  5. Logging: Log to database or centralized logging system
  6. Monitoring: Expose metrics for monitoring (execution time, success rate, etc.)
  7. Security: Use authentication tokens for agent endpoints
  8. Scaling: Agents can scale independently based on their own load
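Practices 1-2 can be isolated into small helpers that are easy to test. In this sketch, the heartbeat interval, the `/health` payload shape, and both function names are assumptions:

```python
from datetime import datetime, timedelta

HEARTBEAT_INTERVAL = timedelta(seconds=30)  # assumed interval

def heartbeat_due(last_sent: datetime, now: datetime) -> bool:
    """Practice 2: decide whether to refresh AgentRegistry.lastHeartbeat."""
    return now - last_sent >= HEARTBEAT_INTERVAL

def health_payload(agent_id: str, version: str) -> dict:
    """Practice 1: body a /health endpoint might return (shape is illustrative)."""
    return {"agentId": agent_id, "version": version, "status": "ok"}
```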

Migration from Current Architecture

For existing agents in apps/worker/agents/:

  1. Extract Agent Logic: Move agent code to standalone service
  2. Add HTTP Endpoint: Expose /execute endpoint
  3. Add Registration: Implement self-registration on startup
  4. Add Status Updates: Update AgentJobExecution table during execution
  5. Deploy Separately: Deploy agent as independent service
  6. Update Scheduler: Scheduler will automatically discover and invoke agent via HTTP
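Steps 2-3 of the migration amount to wrapping the existing in-process agent function behind an accept-then-work handler. The sketch below is framework-agnostic; `make_execute_endpoint`, `legacy_agent`, and the in-memory `results` dict are hypothetical stand-ins for the real agent code and database writes:

```python
import threading

def make_execute_endpoint(run_agent):
    """Wrap an existing agent function behind an HTTP-style /execute handler
    that accepts immediately and performs the work on a background thread."""
    workers = []

    def execute(body: dict) -> dict:
        worker = threading.Thread(
            target=run_agent,
            args=(body["jobId"], body["executionId"], body.get("params", {})),
        )
        workers.append(worker)
        worker.start()
        return {
            "jobId": body["jobId"],
            "executionId": body["executionId"],
            "status": "accepted",
        }

    execute.workers = workers  # exposed so callers can await completion in tests
    return execute

# Example: an existing worker-style job processor, reused unchanged
results = {}
def legacy_agent(job_id, execution_id, params):
    results[execution_id] = f"processed {job_id}"

execute = make_execute_endpoint(legacy_agent)
```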

The database schema and configuration system remain unchanged; only the execution mechanism changes, from BullMQ queues to HTTP invocation.