Data Collection Agent
ID: data-collection
Purpose
Collect raw media data by querying admin-configured search sources (e.g., Serper.dev) with queries generated by the Query Strategy Agent, fetching the resulting web pages, processing them efficiently, and storing them for downstream agents.
What the Agent Does
- Query-Based Data Collection:
  - Retrieves search queries from the Query Strategy Agent (stored in the database)
  - Queries admin-configured search sources (e.g., Serper.dev, Google Search API) with these queries
  - Fetches the actual web pages from search result URLs
  - Processes and optimizes the fetched content for efficiency
- Search Source Management:
  - Supports multiple search source types (REST APIs, GraphQL, etc.)
  - Handles authentication and API keys per source
  - Manages rate limiting per source
  - Performs health checks on search sources
- Web Page Fetching:
  - Downloads HTML content from search result URLs
  - Extracts main content (removes ads, navigation, etc.)
  - Handles JavaScript-rendered pages when necessary
  - Respects robots.txt and rate limits
- Efficient Processing:
  - Deduplicates content across sources using fuzzy matching
  - Filters low-quality or irrelevant content early
  - Extracts and normalizes metadata (title, author, date, etc.)
  - Optimizes content storage (compression, normalization)
- Data Storage:
  - Stores processed content with full metadata for traceability
  - Links content to source queries and search sources
  - Maintains an audit trail of the collection process
  - Stores raw HTML for potential reprocessing
Inputs
interface DataCollectionInput {
tickerId: number; // Required: Foreign key to the Ticker table
timeWindow?: {
start: Date;
end: Date;
}; // Optional: Time window for collection
priority: "high" | "medium" | "low"; // Collection urgency (affects retry behavior)
jobId?: string; // Optional: Job ID for tracking this run
}

Configurations (stored in AgentConfig table, key: data-collection):
{
// Search Sources Configuration
// Admin can define multiple search sources (e.g., Serper.dev, Google Search API, etc.)
searchSources: {
[sourceId: string]: { // e.g., "serper-dev", "google-search-api"
enabled: boolean,
type: 'rest' | 'graphql', // API type
baseUrl: string, // API base URL (e.g., "https://google.serper.dev")
authentication: {
type: 'api-key' | 'oauth' | 'bearer' | 'none',
apiKey?: string, // From env or secure storage
headerName?: string, // e.g., "X-API-KEY"
// OAuth config if needed
},
rateLimit: {
requests: number, // e.g., 100
perSeconds: number // e.g., 60
},
requestConfig: {
method: 'GET' | 'POST',
endpoint: string, // e.g., "/search" or "/v1/search"
queryParamName?: string, // e.g., "q" for query parameter
bodyTemplate?: object, // For POST requests
headers?: Record<string, string>
},
responseMapping: {
resultsPath: string, // JSON path to results array, e.g., "organic"
urlPath: string, // JSON path to URL, e.g., "link"
titlePath: string, // JSON path to title, e.g., "title"
snippetPath?: string, // JSON path to snippet, e.g., "snippet"
datePath?: string // JSON path to date, e.g., "date"
},
healthCheck: {
enabled: boolean,
endpoint?: string, // Health check endpoint
interval: number // Seconds between health checks
}
}
},
// Web Page Fetching Configuration
webFetching: {
enabled: boolean,
timeout: number, // Milliseconds
retries: number, // Max retry attempts
retryDelay: number, // Milliseconds between retries
userAgent: string, // Custom user agent
maxContentLength: number, // Max HTML size in bytes
javascriptRendering: {
enabled: boolean, // Use headless browser for JS pages
headless: boolean,
waitForSelector?: string, // Wait for specific selector
waitTime: number // Milliseconds to wait after load
},
contentExtraction: {
enabled: boolean,
removeAds: boolean,
removeNavigation: boolean,
extractMainContent: boolean, // Use algorithms like Readability
preserveImages: boolean,
preserveLinks: boolean
},
robotsTxt: {
enabled: boolean, // Respect robots.txt
userAgent: string // User agent for robots.txt check
},
proxyRotation: {
enabled: boolean,
proxies: string[], // Proxy list
rotationStrategy: 'round-robin' | 'random'
}
},
// Processing Configuration
processing: {
deduplication: {
enabled: boolean,
similarityThreshold: number, // 0-1, similarity score threshold
methods: ('title' | 'content' | 'url')[], // Deduplication methods
aiDeduplication: boolean // Use AI for fuzzy matching
},
filtering: {
enabled: boolean,
minContentLength: number, // Minimum characters
maxContentLength: number, // Maximum characters
blocklistDomains: string[], // Domains to skip
allowlistDomains?: string[] // Optional: only allow these domains
},
optimization: {
enabled: boolean,
compressHtml: boolean, // Compress stored HTML
normalizeWhitespace: boolean,
extractMetadata: boolean, // Extract title, author, date, etc.
languageDetection: boolean
}
},
// Query Retrieval Configuration
queryRetrieval: {
enabled: boolean,
source: 'database', // Where queries are stored (from Query Strategy Agent)
queryTypes: ('news' | 'socialMedia' | 'web')[], // Which query types to use
maxQueriesPerSource: number, // Limit queries per search source
priorityOrder: 'relevance' | 'chronological' | 'random'
},
// Error Handling
errorHandling: {
continueOnError: boolean, // Continue if one source fails
maxErrorsPerSource: number, // Max errors before disabling source
errorNotification: {
enabled: boolean,
channels: string[] // ['email', 'slack', 'webhook']
}
}
}

Outputs
{
agentId: 'data-collection',
agentVersion: string, // Semantic version (e.g., "1.2.3") of the agent that generated this output
tickerId: number,
jobId: string,
timestamp: Date,
executionTime: number, // Milliseconds
data: {
items: Array<{
id: string, // Unique identifier
url: string, // Original URL
title: string, // Extracted title
content: string, // Processed main content
rawHtml?: string, // Optional: raw HTML for reprocessing
excerpt?: string, // First paragraph or snippet
searchSource: string, // Which search source found this (e.g., "serper-dev")
queryId: string, // ID of the query that found this (from Query Strategy Agent)
query: string, // The actual query used
publishedAt?: Date, // Extracted publish date if available
author?: string, // Extracted author if available
domain: string, // Extracted domain from URL
metadata: {
wordCount: number,
language?: string, // Detected language
contentLength: number, // Bytes
fetchTime: number, // Milliseconds to fetch
processingTime: number, // Milliseconds to process
extractedAt: Date // When this was collected
}
}>
},
metadata: {
queriesProcessed: number, // Number of queries from Query Strategy Agent
searchSourcesUsed: string[], // Which search sources were queried
searchResultsFound: number, // Total search results before filtering
webPagesFetched: number, // Successfully fetched web pages
itemsStored: number, // Items stored after processing
itemsFiltered: number, // Filtered out (duplicates, low quality, etc.)
duplicatesRemoved: number,
errors: Array<{
source: string, // Search source or "web-fetch"
queryId?: string, // Query ID if applicable
url?: string, // URL if web fetch error
error: string,
timestamp: Date,
retryCount: number
}>,
performance: {
queryRetrievalTime: number, // Milliseconds
searchQueryTime: number, // Total time querying search sources
webFetchTime: number, // Total time fetching web pages
processingTime: number // Total processing time
}
}
}

Process (Step-by-Step)
1. Initialization Phase:
   - Load agent configuration from database (AgentConfig table)
   - Validate inputs (identifier format, time window)
   - Initialize rate limiters for each search source
   - Check search source availability (health checks)
   - Initialize browser instances (if JavaScript rendering enabled)
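The per-source rate limiters initialized in this phase can be sketched as sliding-window limiters driven by the `rateLimit: { requests, perSeconds }` config shown above. The class below is an illustrative sketch, not the agent's actual implementation:

```typescript
// One limiter instance per search source, parameterized by the
// rateLimit { requests, perSeconds } fields from searchSources config.
class RateLimiter {
  private timestamps: number[] = [];

  constructor(private requests: number, private perSeconds: number) {}

  // Returns true if a request may proceed now, recording it if so.
  // `now` is injectable for testing; defaults to wall-clock time.
  tryAcquire(now: number = Date.now()): boolean {
    const windowStart = now - this.perSeconds * 1000;
    this.timestamps = this.timestamps.filter((t) => t > windowStart);
    if (this.timestamps.length >= this.requests) return false;
    this.timestamps.push(now);
    return true;
  }
}
```

During the Search Query Phase, a request whose `tryAcquire()` returns false would be skipped or re-queued, matching the "check rate limits (skip if exceeded)" step.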
2. Configuration Resolution:
   - Merge system defaults, agent config, and runtime overrides
   - Resolve API keys from environment or secure storage
   - Load search source configurations
   - Validate search source configurations
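The merge of system defaults, agent config, and runtime overrides can be sketched as a layered deep merge where later layers win key by key. This is a minimal sketch assuming plain-object config sections (arrays are replaced wholesale, not merged):

```typescript
type Config = Record<string, unknown>;

// Recursively merge one layer over a base; nested objects merge
// key-by-key, while primitives and arrays in the layer replace the base.
function deepMerge(base: Config, layer: Config): Config {
  const out: Config = { ...base };
  for (const [k, v] of Object.entries(layer)) {
    const prev = out[k];
    out[k] =
      v && typeof v === "object" && !Array.isArray(v) &&
      prev && typeof prev === "object" && !Array.isArray(prev)
        ? deepMerge(prev as Config, v as Config)
        : v;
  }
  return out;
}

// Precedence: system defaults < AgentConfig row < runtime overrides.
function resolveConfig(defaults: Config, agentConfig: Config, overrides: Config): Config {
  return deepMerge(deepMerge(defaults, agentConfig), overrides);
}
```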
3. Query Retrieval Phase:
   - Retrieve queries from database (generated by Query Strategy Agent)
   - Filter queries by identifier and time window
   - Filter by query types (news, socialMedia, web) if configured
   - Prioritize queries based on configuration
   - Limit queries per search source if configured
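The type filtering, prioritization, and per-source limit above can be sketched for a single search source as follows. The `StoredQuery` shape, including the `relevance` score, is an assumption about how the Query Strategy Agent stores its output; only the `queryTypes` and `maxQueriesPerSource` knobs come from the config above:

```typescript
// Hypothetical row shape for queries stored by the Query Strategy Agent.
interface StoredQuery {
  id: string;
  text: string;
  type: "news" | "socialMedia" | "web";
  relevance: number; // assumed 0-1 score used for priorityOrder: "relevance"
}

// Filter by allowed types, order by relevance, and cap per source.
function selectQueries(
  queries: StoredQuery[],
  queryTypes: StoredQuery["type"][],
  maxQueriesPerSource: number
): StoredQuery[] {
  return queries
    .filter((q) => queryTypes.includes(q.type))
    .sort((a, b) => b.relevance - a.relevance)
    .slice(0, maxQueriesPerSource);
}
```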
4. Search Query Phase (parallel across search sources):
   For each enabled search source and each query:
   - Check rate limits (skip if exceeded)
   - Construct API request based on source configuration
   - Authenticate request (API key, OAuth, etc.)
   - Execute search query
   - Parse response using configured response mapping
   - Collect all search result URLs for web fetching
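Parsing a response with the `responseMapping` config can be sketched with a simple dot-path resolver. The sample payload shape (`organic`, `link`, `title`) follows the Serper.dev example paths given in the configuration; the resolver here supports only plain dot paths, not full JSONPath:

```typescript
// Subset of the responseMapping config from searchSources.
interface ResponseMapping {
  resultsPath: string;
  urlPath: string;
  titlePath: string;
  snippetPath?: string;
}

interface SearchResult { url: string; title: string; snippet?: string; }

// Resolve a simple dot path like "a.b.c" against an object.
function getPath(obj: unknown, path: string): unknown {
  return path.split(".").reduce<any>((o, k) => (o == null ? undefined : o[k]), obj);
}

// Apply a source's responseMapping to its raw API payload.
function mapResponse(payload: unknown, m: ResponseMapping): SearchResult[] {
  const results = (getPath(payload, m.resultsPath) as any[]) ?? [];
  return results.map((r) => ({
    url: String(getPath(r, m.urlPath)),
    title: String(getPath(r, m.titlePath)),
    snippet: m.snippetPath ? (getPath(r, m.snippetPath) as string | undefined) : undefined,
  }));
}
```

Because each source declares its own mapping, adding a new search provider is a config change rather than a code change.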
5. Web Page Fetching Phase (parallel):
   For each unique URL from search results:
   - Check robots.txt if enabled
   - Check if URL is in blocklist/allowlist
   - Fetch HTML content (HTTP request, or headless browser if JS rendering is needed)
   - Extract main content (remove ads, navigation, etc.)
   - Extract metadata (title, author, date, etc.)
   - Handle errors and retries
   - Store raw HTML if configured
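The retry behavior corresponds to the `webFetching` config's `retries` and `retryDelay` fields. A minimal sketch, with the fetch function injected so the policy is testable without network access (a real implementation would wrap `fetch` plus an `AbortController` for the `timeout` field):

```typescript
// Retry an async operation up to `retries` additional times,
// waiting `retryDelay` milliseconds between attempts.
async function fetchWithRetry<T>(
  doFetch: () => Promise<T>,
  retries: number,
  retryDelay: number
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt <= retries; attempt++) {
    try {
      return await doFetch();
    } catch (err) {
      lastError = err;
      if (attempt < retries) await new Promise((r) => setTimeout(r, retryDelay));
    }
  }
  throw lastError; // all attempts failed; caller logs this in the errors array
}
```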
6. Processing Phase:
   6a. Filtering:
   - Filter by content length (min/max)
   - Filter by domain (blocklist/allowlist)
   - Filter invalid or empty content
   6b. Deduplication:
   - Compare by URL (exact match)
   - Compare by title similarity (fuzzy matching)
   - Compare by content similarity (if enabled)
   - Use AI for fuzzy content matching if enabled
   - Mark duplicates and keep the highest-quality version
   6c. Optimization:
   - Normalize whitespace
   - Compress HTML if configured
   - Extract and normalize metadata
   - Detect language
   - Calculate word count and content length
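Title-level fuzzy matching (step 6b) can be sketched with word-set Jaccard similarity, where `threshold` plays the role of the config's `similarityThreshold`. This is one illustrative similarity measure; the agent could equally use edit distance or, with `aiDeduplication` enabled, an embedding-based comparison:

```typescript
// Jaccard similarity over lowercased word sets: |A ∩ B| / |A ∪ B|.
function titleSimilarity(a: string, b: string): number {
  const wa = new Set(a.toLowerCase().split(/\W+/).filter(Boolean));
  const wb = new Set(b.toLowerCase().split(/\W+/).filter(Boolean));
  if (wa.size === 0 && wb.size === 0) return 1;
  let inter = 0;
  for (const w of wa) if (wb.has(w)) inter++;
  return inter / (wa.size + wb.size - inter);
}

// Keep the first item of each near-duplicate title group.
function dedupeByTitle<T extends { title: string }>(items: T[], threshold: number): T[] {
  const kept: T[] = [];
  for (const item of items) {
    if (!kept.some((k) => titleSimilarity(k.title, item.title) >= threshold)) kept.push(item);
  }
  return kept;
}
```

A production version would additionally rank duplicates so the highest-quality copy is the one kept, per step 6b.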
7. Storage Phase:
   - Save all processed items to the DataSource table
   - Store raw HTML if configured
   - Link to identifier, query ID, and collection timestamp
   - Update search source health status
   - Store query performance metrics (for Query Strategy Agent feedback)
8. Error Handling:
   - Log all errors with context (source, query, URL)
   - Retry failed requests (up to max retries)
   - Disable search sources that exceed max errors
   - Send notifications for critical failures
   - Continue processing other items on error
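The `maxErrorsPerSource` policy can be sketched as a per-source error counter that trips a disable flag. Whether the count is total or consecutive is a design choice the config leaves open; the sketch below counts total errors for the run:

```typescript
// Tracks error counts per search source and disables a source once
// it reaches the configured maxErrorsPerSource limit.
class SourceErrorTracker {
  private counts = new Map<string, number>();

  constructor(private maxErrorsPerSource: number) {}

  recordError(sourceId: string): void {
    this.counts.set(sourceId, (this.counts.get(sourceId) ?? 0) + 1);
  }

  isDisabled(sourceId: string): boolean {
    return (this.counts.get(sourceId) ?? 0) >= this.maxErrorsPerSource;
  }
}
```

The Search Query Phase would consult `isDisabled()` before dispatching to a source, and `continueOnError` governs whether the run proceeds with the remaining sources.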
9. Return Phase:
   - Aggregate all collected data
   - Calculate metadata (counts, timing, errors)
   - Return structured output to scheduler
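The count aggregation in the return phase can be sketched as below. The field names mirror the output metadata; the per-item status tags are an assumption about how intermediate results might be labeled during processing (per the metadata comments above, `itemsFiltered` covers both quality filtering and duplicates):

```typescript
// Hypothetical status assigned to each candidate item during processing.
type ItemStatus = "stored" | "filtered" | "duplicate";

// Aggregate the output metadata counters from per-item statuses.
function aggregateCounts(statuses: ItemStatus[]) {
  const count = (s: ItemStatus) => statuses.filter((x) => x === s).length;
  return {
    itemsStored: count("stored"),
    duplicatesRemoved: count("duplicate"),
    itemsFiltered: count("filtered") + count("duplicate"),
  };
}
```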