MediaPulse
Agents/Agent Types

Data Collection Agent

ID: data-collection

Purpose

Collect raw media data by querying admin-configured search sources (e.g., Serper.dev) with queries generated by the Query Strategy Agent, fetching the resulting web pages, processing them efficiently, and storing them for downstream agents.

What the Agent Does

  1. Query-Based Data Collection:

    • Retrieves search queries from the Query Strategy Agent (stored in database)
    • Queries admin-configured search sources (e.g., Serper.dev, Google Search API, etc.) with these queries
    • Fetches the actual web pages from search result URLs
    • Processes and optimizes the fetched content for efficiency
  2. Search Source Management:

    • Supports multiple search source types (REST APIs, GraphQL, etc.)
    • Handles authentication and API keys per source
    • Manages rate limiting per source
    • Performs health checks on search sources
  3. Web Page Fetching:

    • Downloads HTML content from search result URLs
    • Extracts main content (removes ads, navigation, etc.)
    • Handles JavaScript-rendered pages when necessary
    • Respects robots.txt and rate limits
  4. Efficient Processing:

    • Deduplicates content across sources using fuzzy matching
    • Filters low-quality or irrelevant content early
    • Extracts and normalizes metadata (title, author, date, etc.)
    • Optimizes content storage (compression, normalization)
  5. Data Storage:

    • Stores processed content with full metadata for traceability
    • Links content to source queries and search sources
    • Maintains audit trail of collection process
    • Stores raw HTML for potential reprocessing

Inputs

interface DataCollectionInput {
  tickerId: number; // Required: Foreign key to the Ticker table
  timeWindow?: {
    start: Date;
    end: Date;
  }; // Optional: Time window for collection
  priority: "high" | "medium" | "low"; // Collection urgency (affects retry behavior)
  jobId?: string; // Optional: Job ID for tracking this run
}

Configurations (stored in AgentConfig table, key: data-collection):

{
  // Search Sources Configuration
  // Admin can define multiple search sources (e.g., Serper.dev, Google Search API, etc.)
  searchSources: {
    [sourceId: string]: {              // e.g., "serper-dev", "google-search-api"
      enabled: boolean,
      type: 'rest' | 'graphql',        // API type
      baseUrl: string,                 // API base URL (e.g., "https://google.serper.dev")
      authentication: {
        type: 'api-key' | 'oauth' | 'bearer' | 'none',
        apiKey?: string,               // From env or secure storage
        headerName?: string,            // e.g., "X-API-KEY"
        // OAuth config if needed
      },
      rateLimit: {
        requests: number,              // e.g., 100
        perSeconds: number             // e.g., 60
      },
      requestConfig: {
        method: 'GET' | 'POST',
        endpoint: string,              // e.g., "/search" or "/v1/search"
        queryParamName?: string,       // e.g., "q" for query parameter
        bodyTemplate?: object,         // For POST requests
        headers?: Record<string, string>
      },
      responseMapping: {
        resultsPath: string,           // JSON path to results array, e.g., "organic"
        urlPath: string,               // JSON path to URL, e.g., "link"
        titlePath: string,             // JSON path to title, e.g., "title"
        snippetPath?: string,          // JSON path to snippet, e.g., "snippet"
        datePath?: string              // JSON path to date, e.g., "date"
      },
      healthCheck: {
        enabled: boolean,
        endpoint?: string,             // Health check endpoint
        interval: number               // Seconds between health checks
      }
    }
  },

  // Web Page Fetching Configuration
  webFetching: {
    enabled: boolean,
    timeout: number,                   // Milliseconds
    retries: number,                   // Max retry attempts
    retryDelay: number,                // Milliseconds between retries
    userAgent: string,                 // Custom user agent
    maxContentLength: number,          // Max HTML size in bytes
    javascriptRendering: {
      enabled: boolean,                // Use headless browser for JS pages
      headless: boolean,
      waitForSelector?: string,        // Wait for specific selector
      waitTime: number                  // Milliseconds to wait after load
    },
    contentExtraction: {
      enabled: boolean,
      removeAds: boolean,
      removeNavigation: boolean,
      extractMainContent: boolean,      // Use algorithms like Readability
      preserveImages: boolean,
      preserveLinks: boolean
    },
    robotsTxt: {
      enabled: boolean,                // Respect robots.txt
      userAgent: string                // User agent for robots.txt check
    },
    proxyRotation: {
      enabled: boolean,
      proxies: string[],               // Proxy list
      rotationStrategy: 'round-robin' | 'random'
    }
  },

  // Processing Configuration
  processing: {
    deduplication: {
      enabled: boolean,
      similarityThreshold: number,     // 0-1, similarity score threshold
      methods: ('title' | 'content' | 'url')[], // Deduplication methods
      aiDeduplication: boolean         // Use AI for fuzzy matching
    },
    filtering: {
      enabled: boolean,
      minContentLength: number,        // Minimum characters
      maxContentLength: number,        // Maximum characters
      blocklistDomains: string[],      // Domains to skip
      allowlistDomains?: string[]      // Optional: only allow these domains
    },
    optimization: {
      enabled: boolean,
      compressHtml: boolean,           // Compress stored HTML
      normalizeWhitespace: boolean,
      extractMetadata: boolean,        // Extract title, author, date, etc.
      languageDetection: boolean
    }
  },

  // Query Retrieval Configuration
  queryRetrieval: {
    enabled: boolean,
    source: 'database',                // Where queries are stored (from Query Strategy Agent)
    queryTypes: ('news' | 'socialMedia' | 'web')[], // Which query types to use
    maxQueriesPerSource: number,       // Limit queries per search source
    priorityOrder: 'relevance' | 'chronological' | 'random'
  },

  // Error Handling
  errorHandling: {
    continueOnError: boolean,          // Continue if one source fails
    maxErrorsPerSource: number,        // Max errors before disabling source
    errorNotification: {
      enabled: boolean,
      channels: string[]               // ['email', 'slack', 'webhook']
    }
  }
}

Outputs

{
  agentId: 'data-collection',
  agentVersion: string,                // Semantic version (e.g., "1.2.3") of the agent that generated this output
  tickerId: number,
  jobId: string,
  timestamp: Date,
  executionTime: number,               // Milliseconds
  data: {
    items: Array<{
      id: string,                      // Unique identifier
      url: string,                      // Original URL
      title: string,                    // Extracted title
      content: string,                  // Processed main content
      rawHtml?: string,                 // Optional: raw HTML for reprocessing
      excerpt?: string,                 // First paragraph or snippet
      searchSource: string,             // Which search source found this (e.g., "serper-dev")
      queryId: string,                  // ID of the query that found this (from Query Strategy Agent)
      query: string,                    // The actual query used
      publishedAt?: Date,               // Extracted publish date if available
      author?: string,                  // Extracted author if available
      domain: string,                   // Extracted domain from URL
      metadata: {
        wordCount: number,
        language?: string,              // Detected language
        contentLength: number,          // Bytes
        fetchTime: number,              // Milliseconds to fetch
        processingTime: number,         // Milliseconds to process
        extractedAt: Date               // When this was collected
      }
    }>
  },
  metadata: {
    queriesProcessed: number,         // Number of queries from Query Strategy Agent
    searchSourcesUsed: string[],        // Which search sources were queried
    searchResultsFound: number,        // Total search results before filtering
    webPagesFetched: number,           // Successfully fetched web pages
    itemsStored: number,                // Items stored after processing
    itemsFiltered: number,              // Filtered out (duplicates, low quality, etc.)
    duplicatesRemoved: number,
    errors: Array<{
      source: string,                   // Search source or "web-fetch"
      queryId?: string,                 // Query ID if applicable
      url?: string,                     // URL if web fetch error
      error: string,
      timestamp: Date,
      retryCount: number
    }>,
    performance: {
      queryRetrievalTime: number,      // Milliseconds
      searchQueryTime: number,          // Total time querying search sources
      webFetchTime: number,             // Total time fetching web pages
      processingTime: number            // Total processing time
    }
  }
}

Process (Step-by-Step)

  1. Initialization Phase:

    • Load agent configuration from database (AgentConfig table)
    • Validate inputs (identifier format, time window)
    • Initialize rate limiters for each search source
    • Check search source availability (health checks)
    • Initialize browser instances (if JavaScript rendering enabled)
  2. Configuration Resolution:

    • Merge system defaults, agent config, and runtime overrides
    • Resolve API keys from environment or secure storage
    • Load search source configurations
    • Validate search source configurations
  3. Query Retrieval Phase:

    • Retrieve queries from database (generated by Query Strategy Agent)
    • Filter queries by identifier and time window
    • Filter by query types (news, socialMedia, web) if configured
    • Prioritize queries based on configuration
    • Limit queries per search source if configured
  4. Search Query Phase (parallel across search sources):

    For each enabled search source and each query:

    • Check rate limits (skip if exceeded)
    • Construct API request based on source configuration
    • Authenticate request (API key, OAuth, etc.)
    • Execute search query
    • Parse response using configured response mapping
    • Extract URLs, titles, snippets from search results
    • Collect all search result URLs for web fetching
  5. Web Page Fetching Phase (parallel):

    For each unique URL from search results:

    • Check robots.txt if enabled
    • Check if URL is in blocklist/allowlist
    • Fetch HTML content (HTTP request or headless browser if JS needed)
    • Extract main content (remove ads, navigation, etc.)
    • Extract metadata (title, author, date, etc.)
    • Handle errors and retries
    • Store raw HTML if configured
  6. Processing Phase:

    6a. Filtering:

    • Filter by content length (min/max)
    • Filter by domain (blocklist/allowlist)
    • Filter invalid or empty content

    6b. Deduplication:

    • Compare by URL (exact match)
    • Compare by title similarity (fuzzy matching)
    • Compare by content similarity (if enabled)
    • Use AI for fuzzy content matching if enabled
    • Mark duplicates and keep highest quality version

    6c. Optimization:

    • Normalize whitespace
    • Compress HTML if configured
    • Extract and normalize metadata
    • Detect language
    • Calculate word count and content length
  7. Storage Phase:

    • Save all processed items to DataSource table
    • Store raw HTML if configured
    • Link to identifier, query ID, and collection timestamp
    • Update search source health status
    • Store query performance metrics (for Query Strategy Agent feedback)
  8. Error Handling:

    • Log all errors with context (source, query, URL)
    • Retry failed requests (up to max retries)
    • Disable search sources that exceed max errors
    • Send notifications for critical failures
    • Continue processing other items on error
  9. Return Phase:

    • Aggregate all collected data
    • Calculate metadata (counts, timing, errors)
    • Return structured output to scheduler

Sequence Diagram