WhatsApp Group Monitor with AI Summaries & Scheduled Email Reports
Build a complete WhatsApp monitoring platform using Evolution API, FastAPI, PostgreSQL, Claude AI, and Resend -- no official WhatsApp Business API required.
Table of Contents
- Why Monitor WhatsApp Groups?
- Architecture Overview
- Prerequisites
- Evolution API: Docker Setup
- PostgreSQL Schema with Table Partitioning
- FastAPI Webhook Receiver
- Message Processing Pipeline
- AI-Powered Daily Summaries
- Email Reports with Resend
- Dashboard UI (PHP)
- Prometheus Metrics Endpoint
- Cron Jobs & Automation
- Connection Watchdog & Auto-Reconnect
- Production Tips & Lessons Learned
- Complete File Reference
1. Why Monitor WhatsApp Groups?
WhatsApp is the de facto communication channel for teams across Europe, Latin America, Africa, and Asia. For businesses that run distributed operations -- call centers, field teams, multi-office setups -- critical information flows through WhatsApp groups every day:
- Operational decisions get made in group chats, not email.
- Shift handoffs happen via voice notes and quick messages.
- Customer escalations arrive as forwarded screenshots.
- Staffing problems surface first in team groups, hours before they hit the ticketing system.
The problem: WhatsApp has no built-in search across groups, no analytics, no export, no way for management to see what happened yesterday without scrolling through hundreds of messages on a phone.
What this system solves:
| Problem | Solution |
|---|---|
| "What did the team discuss yesterday?" | AI-generated daily summaries, delivered by email |
| "Which groups went silent?" | Activity alerts flag zero-message groups |
| "Who are the most active participants?" | Per-group stats with top senders |
| "Find that message from last month" | Full-text search across all groups |
| "I need a weekly report for the board" | Scheduled digest emails with stats and highlights |
| "We need to keep records for compliance" | All messages stored in PostgreSQL with timestamps |
Cost: Essentially free. Evolution API is open-source. PostgreSQL is free. Resend gives you 3,000 emails/month on the free tier. The only paid component is the Claude API for summaries, which costs roughly $0.02-0.05 per group per day using Haiku.
2. Architecture Overview
┌─────────────────────────┐
│ Your Phone │
│ (WhatsApp linked) │
└────────┬────────────────┘
│ Baileys Protocol
│ (WebSocket)
┌─────────────────────────────────────────────┼──────────────────────────────┐
│ Docker Network (monitoring) │ │
│ │ │
│ ┌──────────────────────┐ │ │
│ │ Redis 7 Alpine │◄──cache──┐ │ │
│ │ (session store) │ │ │ │
│ └──────────────────────┘ │ │ │
│ │ ▼ │
│ ┌──────────────────────┐ ┌─────┴────────────────┐ │
│ │ PostgreSQL 16 │◄───│ Evolution API │ │
│ │ (evolution DB) │ │ v2.x (Baileys) │ │
│ │ (whatsapp_monitor) │ │ Port 8080 │ │
│ └──────────┬───────────┘ └─────────┬────────────┘ │
│ │ │ Webhook POST │
│ │ │ /webhook/evolution │
│ │ ▼ │
│ │ ┌───────────────────────┐ │
│ │ │ FastAPI Service │ │
│ │◄─── asyncpg ─│ (wa-monitor) │──► Claude API │
│ │ │ Port 8086 │ (Haiku/Sonnet) │
│ │ │ systemd managed │ │
│ │ └───────────┬────────────┘ │
│ │ │ │
│ │ ┌───────┼────────┐ │
│ │ │ │ │ │
│ │ ▼ ▼ ▼ │
│ │ /metrics /health /reports │
│ │ │ │ │
│ ┌──────────┴───┐ │ │ │
│ │ Prometheus │◄── scrape ──┘ │ │
│ └──────┬───────┘ │ │
│ │ ▼ │
│ ┌──────┴───────┐ ┌──────────────────┐ │
│ │ Grafana │ │ Resend API │ │
│ │ Dashboards │ │ (email delivery)│ │
│ └──────────────┘ └──────────────────┘ │
│ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ PHP Dashboard (Apache, port 8082) │ │
│ │ whatsapp.php ── Group monitor, messages, AI chat │ │
│ │ email-reports.php ── Report CRUD, SMTP config │ │
│ │ whatsapp-data.php ── AJAX proxy to FastAPI │ │
│ │ whatsapp-ask.php ── AI analysis proxy │ │
│ └──────────────────────────────────────────────────────┘ │
└────────────────────────────────────────────────────────────────────────────┘
Data flow summary:
- Your phone stays connected to WhatsApp via Evolution API's Baileys integration (open-source reimplementation of the WhatsApp Web protocol).
- Every group message triggers a webhook POST to the FastAPI service.
- FastAPI parses the message, stores it in a partitioned PostgreSQL table, and optionally downloads media.
- A nightly cron job generates AI summaries for each group using Claude Haiku.
- Every 15 minutes, the report scheduler checks for due email reports and sends them via Resend.
- The PHP dashboard provides a web UI for browsing groups, searching messages, and managing reports.
3. Prerequisites
Server requirements:
- Linux server (Ubuntu 22.04/24.04 recommended), 2+ CPU cores, 4+ GB RAM
- Docker and Docker Compose installed
- Python 3.11+ with pip and venv
- A phone number dedicated to WhatsApp monitoring (this phone stays "connected")
Accounts needed:
- Anthropic API key -- for Claude AI summaries (console.anthropic.com)
- Resend account -- for email delivery (resend.com), free tier: 3,000 emails/month
Install system dependencies:
# Docker
curl -fsSL https://get.docker.com | sh
# Python 3.12 + venv
apt update && apt install -y python3.12 python3.12-venv python3-pip ffmpeg
# Optional: for image thumbnail generation
pip3 install Pillow
4. Evolution API: Docker Setup
Evolution API is the bridge between WhatsApp and your application. It uses the Baileys library (a reverse-engineered WhatsApp Web client) to maintain a persistent connection -- no official API approval, no Facebook Business verification, no monthly fees.
Docker Compose Configuration
Create your docker-compose.yml. The key services are Evolution API, Redis (for session caching), and PostgreSQL (shared with your other services):
version: "3.8"
services:
# PostgreSQL — shared database for Evolution API + your monitor
postgres:
image: postgres:16-alpine
container_name: postgres
restart: unless-stopped
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
volumes:
- postgres_data:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready -U postgres"]
interval: 10s
timeout: 5s
retries: 5
networks:
- monitoring
# Evolution API — WhatsApp connection via Baileys
evolution-api:
image: evoapicloud/evolution-api:v2.3.7
container_name: evolution-api
restart: unless-stopped
ports:
- "8080:8080"
environment:
# Server
- SERVER_URL=http://YOUR_SERVER_IP:8080
- AUTHENTICATION_API_KEY=${EVOLUTION_API_KEY}
# Database persistence
- DATABASE_ENABLED=true
- DATABASE_PROVIDER=postgresql
- DATABASE_CONNECTION_URI=postgresql://postgres:${POSTGRES_PASSWORD}@postgres:5432/evolution
- DATABASE_SAVE_DATA_INSTANCE=true
- DATABASE_SAVE_DATA_NEW_MESSAGE=true
- DATABASE_SAVE_DATA_CONTACTS=true
- DATABASE_SAVE_DATA_CHATS=true
# Redis caching
- CACHE_REDIS_ENABLED=true
- CACHE_REDIS_URI=redis://redis:6379/0
# Logging
- LOG_LEVEL=WARN
# Global webhook — sends all events to your FastAPI service
- WEBHOOK_GLOBAL_ENABLED=true
- WEBHOOK_GLOBAL_URL=http://172.18.0.1:8086/webhook/evolution
- WEBHOOK_GLOBAL_WEBHOOK_BY_EVENTS=false
- WEBHOOK_EVENTS_MESSAGES_UPSERT=true
- WEBHOOK_EVENTS_GROUPS_UPSERT=true
- WEBHOOK_EVENTS_GROUP_PARTICIPANTS_UPDATE=true
- WEBHOOK_EVENTS_CONNECTION_UPDATE=true
- WEBHOOK_EVENTS_QRCODE_UPDATED=true
# Disable unused integrations
- TYPEBOT_ENABLED=false
- CHATWOOT_ENABLED=false
- OPENAI_ENABLED=false
- DIFY_ENABLED=false
- S3_ENABLED=false
volumes:
- evolution_instances:/evolution/instances
depends_on:
postgres:
condition: service_healthy
redis:
condition: service_healthy
networks:
- monitoring
# Redis — session and cache store for Evolution API
redis:
image: redis:7-alpine
container_name: redis
restart: unless-stopped
command: redis-server --maxmemory 256mb --maxmemory-policy allkeys-lru
volumes:
- redis_data:/data
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 10s
timeout: 3s
retries: 5
networks:
- monitoring
volumes:
postgres_data:
evolution_instances:
redis_data:
networks:
monitoring:
driver: bridge
Environment File
Create a .env file alongside your compose file:
POSTGRES_PASSWORD=your_secure_pg_password_here
EVOLUTION_API_KEY=your_evo_api_key_here
Starting the Stack
docker compose up -d
docker compose logs -f evolution-api # Watch for startup
Creating a WhatsApp Instance and Linking Your Phone
Once Evolution API is running, create an instance and scan the QR code:
# Create instance
curl -s -X POST http://localhost:8080/instance/create \
-H "apikey: YOUR_EVOLUTION_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"instanceName": "wa-monitor",
"integration": "WHATSAPP-BAILEYS",
"qrcode": true,
"webhook": {
"url": "http://172.18.0.1:8086/webhook/evolution",
"byEvents": false,
"base64": true,
"events": [
"MESSAGES_UPSERT",
"GROUPS_UPSERT",
"GROUP_PARTICIPANTS_UPDATE",
"CONNECTION_UPDATE",
"QRCODE_UPDATED"
]
}
}'
# Get QR code (returns base64-encoded image)
curl -s http://localhost:8080/instance/connect/wa-monitor \
-H "apikey: YOUR_EVOLUTION_API_KEY" | jq -r '.base64'
Open WhatsApp on your phone, go to Linked Devices, and scan the QR code. The connection persists across restarts because Evolution API stores the session in PostgreSQL.
Important: The webhook URL uses `172.18.0.1` because Evolution API runs inside Docker but needs to reach your FastAPI service running on the host. This is the Docker bridge gateway IP; you can confirm yours with `docker network inspect monitoring`. Adjust if your Docker network uses a different subnet.
Verifying the Connection
curl -s http://localhost:8080/instance/connectionState/wa-monitor \
-H "apikey: YOUR_EVOLUTION_API_KEY"
# Expected: {"instance":{"instanceName":"wa-monitor","state":"open"}}
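The connection watchdog described later only needs to parse this response and decide whether a reconnect is required. A minimal sketch using the stdlib (the URL and API key placeholders follow the setup above):

```python
import json
from urllib.request import Request, urlopen

EVOLUTION_URL = "http://localhost:8080"        # as configured in docker-compose
EVOLUTION_API_KEY = "YOUR_EVOLUTION_API_KEY"   # placeholder

def connection_is_open(state_response: dict) -> bool:
    """True when Evolution reports the Baileys session as 'open'."""
    return state_response.get("instance", {}).get("state", "") == "open"

def check_instance(instance: str = "wa-monitor") -> bool:
    """Fetch connectionState and report whether the session is healthy."""
    req = Request(f"{EVOLUTION_URL}/instance/connectionState/{instance}",
                  headers={"apikey": EVOLUTION_API_KEY})
    with urlopen(req, timeout=10) as resp:
        return connection_is_open(json.load(resp))
```

Any state other than `"open"` (e.g. `"close"`, `"connecting"`) is treated as unhealthy, which is the trigger condition for the auto-reconnect logic.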
5. PostgreSQL Schema with Table Partitioning
The message volume from active WhatsApp groups can grow quickly. A single group of 50 people can produce 500+ messages per day. Multiply by 10-20 groups, add media, and you are looking at millions of rows within months.
Table partitioning solves this by splitting wa_messages into monthly chunks. Each month gets its own physical table, which means:
- Queries that filter by date only scan the relevant partition
- Old data can be dropped by simply detaching a partition (instant, zero-downtime)
- VACUUM and index maintenance operate on smaller tables
- Backups can target specific months
Schema Design
-- Run inside your PostgreSQL container
CREATE DATABASE whatsapp_monitor;
\c whatsapp_monitor
-- ── Groups ──
CREATE TABLE wa_groups (
jid TEXT PRIMARY KEY, -- WhatsApp group JID, ends in @g.us
name TEXT NOT NULL DEFAULT '',
description TEXT DEFAULT '',
participant_count INTEGER DEFAULT 0,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW(),
is_monitored BOOLEAN DEFAULT true
);
-- ── Participants ──
CREATE TABLE wa_participants (
id SERIAL PRIMARY KEY,
group_jid TEXT NOT NULL REFERENCES wa_groups(jid),
phone TEXT NOT NULL,
push_name TEXT DEFAULT '',
is_admin BOOLEAN DEFAULT false,
first_seen TIMESTAMPTZ DEFAULT NOW(),
last_seen TIMESTAMPTZ DEFAULT NOW(),
UNIQUE(group_jid, phone)
);
-- ── Messages (partitioned by month) ──
CREATE TABLE wa_messages (
id TEXT NOT NULL, -- WhatsApp message ID
group_jid TEXT NOT NULL, -- Group JID
sender_phone TEXT NOT NULL, -- Sender phone number
sender_name TEXT DEFAULT '', -- Push name (display name)
message_type TEXT NOT NULL DEFAULT 'conversation',
content TEXT DEFAULT '',
-- Full-text search vector, auto-generated from content
content_search TSVECTOR GENERATED ALWAYS AS (
to_tsvector('simple', coalesce(content, ''))
) STORED,
media_type TEXT, -- image, audio, video, document
media_path TEXT, -- Local file path after download
media_thumb_path TEXT, -- Thumbnail path
media_mime TEXT, -- MIME type
media_size INTEGER, -- File size in bytes
media_duration INTEGER, -- Audio/video duration in seconds
reply_to_id TEXT, -- Quoted message ID
timestamp TIMESTAMPTZ NOT NULL, -- Original WhatsApp timestamp
raw_payload JSONB, -- Full webhook payload for debugging
created_at TIMESTAMPTZ DEFAULT NOW(),
PRIMARY KEY (id, timestamp) -- Composite PK required for partitioning
) PARTITION BY RANGE (timestamp);
-- Create partitions for current + next 3 months
-- (Replace 2026 with your current year)
CREATE TABLE wa_messages_2026_03 PARTITION OF wa_messages
FOR VALUES FROM ('2026-03-01') TO ('2026-04-01');
CREATE TABLE wa_messages_2026_04 PARTITION OF wa_messages
FOR VALUES FROM ('2026-04-01') TO ('2026-05-01');
CREATE TABLE wa_messages_2026_05 PARTITION OF wa_messages
FOR VALUES FROM ('2026-05-01') TO ('2026-06-01');
CREATE TABLE wa_messages_2026_06 PARTITION OF wa_messages
FOR VALUES FROM ('2026-06-01') TO ('2026-07-01');
-- Performance indexes
CREATE INDEX idx_wa_msg_group_ts ON wa_messages (group_jid, timestamp DESC);
CREATE INDEX idx_wa_msg_sender ON wa_messages (sender_phone, timestamp DESC);
CREATE INDEX idx_wa_msg_search ON wa_messages USING GIN (content_search);
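The GIN index on `content_search` is what backs the dashboard's full-text search. A query sketch (the `/search` endpoint itself is an assumption here, not shown in the article): the tsquery must use the same `'simple'` configuration as the generated tsvector, and the timestamp filter lets PostgreSQL prune untouched partitions.

```python
# Full-text search against the generated content_search column.
# plainto_tsquery('simple', ...) matches the 'simple' config used in
# the tsvector definition, so tokens match without language stemming.
SEARCH_SQL = """
    SELECT group_jid, sender_name, content, timestamp
    FROM wa_messages
    WHERE content_search @@ plainto_tsquery('simple', $1)
      AND timestamp >= $2          -- time bound enables partition pruning
    ORDER BY timestamp DESC
    LIMIT $3
"""

async def search_messages(pool, query: str, since, limit: int = 50):
    """Run the search via an asyncpg pool, as used elsewhere in the service."""
    async with pool.acquire() as conn:
        return await conn.fetch(SEARCH_SQL, query, since, limit)
```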
-- ── AI Summary Cache ──
CREATE TABLE wa_daily_summaries (
id SERIAL PRIMARY KEY,
group_jid TEXT NOT NULL REFERENCES wa_groups(jid),
summary_date DATE NOT NULL,
message_count INTEGER DEFAULT 0,
summary TEXT NOT NULL,
model TEXT DEFAULT 'haiku',
created_at TIMESTAMPTZ DEFAULT NOW(),
UNIQUE(group_jid, summary_date)
);
-- ── Email Configuration (singleton row) ──
CREATE TABLE wa_email_config (
id INTEGER PRIMARY KEY DEFAULT 1,
smtp_host TEXT DEFAULT '', -- 'resend' for Resend API mode
smtp_port INTEGER DEFAULT 587,
smtp_user TEXT DEFAULT '',
smtp_pass TEXT DEFAULT '', -- API key for Resend mode
smtp_tls BOOLEAN DEFAULT true,
from_email TEXT DEFAULT '',
from_name TEXT DEFAULT 'WhatsApp Monitor',
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- ── Scheduled Reports ──
CREATE TABLE wa_scheduled_reports (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
report_type TEXT NOT NULL DEFAULT 'daily_summary',
schedule TEXT NOT NULL DEFAULT 'daily', -- daily, weekly, monthly
schedule_time TEXT DEFAULT '08:00', -- HH:MM (UTC)
schedule_day INTEGER DEFAULT 1, -- Day of week (1=Mon) or month
recipients TEXT NOT NULL DEFAULT '', -- Comma-separated emails
group_filter TEXT DEFAULT '', -- Comma-separated JIDs, empty = all
include_stats BOOLEAN DEFAULT true,
is_active BOOLEAN DEFAULT true,
last_sent_at TIMESTAMPTZ,
last_status TEXT DEFAULT '',
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- ── Webhook Debug Log ──
CREATE TABLE wa_webhook_log (
id BIGSERIAL PRIMARY KEY,
event_type TEXT NOT NULL,
instance TEXT,
payload JSONB NOT NULL,
processed BOOLEAN DEFAULT false,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_wa_webhook_unprocessed
ON wa_webhook_log (processed) WHERE NOT processed;
Why This Partitioning Strategy Works
Range partitioning by month is the right choice here because:
- Queries are almost always time-bounded. "Show me yesterday's messages" or "summarize last week" only touch 1-2 partitions.
- Data retention is simple. To drop messages older than 12 months:
  ALTER TABLE wa_messages DETACH PARTITION wa_messages_2025_01;
  DROP TABLE wa_messages_2025_01;
- The generated TSVECTOR column (`content_search`) is inherited by each partition, giving you full-text search with GIN indexes that are partition-local (smaller, faster).
- The composite primary key `(id, timestamp)` is required by PostgreSQL -- the partition key must be part of any unique constraint. This is fine because WhatsApp message IDs are globally unique within a reasonable time window.
Auto-Creating Partitions
The service creates partitions on startup and via cron. Here is the logic:
async def _ensure_partitions(conn):
"""Create partitions for current month + next 3 months."""
now = datetime.utcnow()
for i in range(4):
m = now.month + i
y = now.year + (m - 1) // 12
m = ((m - 1) % 12) + 1
nm = m + 1
ny = y + (nm - 1) // 12
nm = ((nm - 1) % 12) + 1
part_name = f"wa_messages_{y}_{m:02d}"
start = f"{y}-{m:02d}-01"
end = f"{ny}-{nm:02d}-01"
await conn.execute(f"""
CREATE TABLE IF NOT EXISTS {part_name}
PARTITION OF wa_messages
FOR VALUES FROM ('{start}') TO ('{end}')
""")
This runs on every service startup and on the 25th of each month via cron, ensuring the next month's partition always exists before it is needed.
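The month-rollover arithmetic above is easy to get wrong at year boundaries. Factoring it into a pure helper (an illustrative refactor, not part of the service as shown) makes the partition bounds unit-testable:

```python
def partition_bounds(year: int, month: int, offset: int = 0):
    """Return (partition_name, start_date, end_date) for the month
    `offset` months after (year, month). Same math as _ensure_partitions."""
    m = month + offset
    y = year + (m - 1) // 12       # carry into next year(s)
    m = (m - 1) % 12 + 1           # normalize back to 1..12
    nm, ny = m + 1, y              # exclusive upper bound = first of next month
    if nm == 13:
        nm, ny = 1, y + 1
    return (f"wa_messages_{y}_{m:02d}",
            f"{y}-{m:02d}-01",
            f"{ny}-{nm:02d}-01")
```

The December case is the one worth testing: the partition for December must end on January 1st of the following year.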
6. FastAPI Webhook Receiver
The webhook receiver is the core of the system. It receives POST requests from Evolution API for every event (messages, group updates, participant changes, connection state) and processes them asynchronously.
Service Structure
#!/usr/bin/env python3
"""
WhatsApp Group Monitor -- FastAPI Service
Port 8086. Receives Evolution API webhooks, stores to PostgreSQL.
"""
import os
import re
import json
import time
import logging
import asyncio
from datetime import datetime, date, timedelta
from pathlib import Path
from typing import Optional
import asyncpg
import httpx
from fastapi import FastAPI, HTTPException, Query, BackgroundTasks, Request
from fastapi.responses import JSONResponse, FileResponse
# ── Config ──
PG_DSN = "postgresql://postgres:YOUR_PG_PASSWORD@POSTGRES_HOST:5432/whatsapp_monitor"
EVOLUTION_URL = "http://localhost:8080"
EVOLUTION_API_KEY = os.environ.get("EVOLUTION_API_KEY", "YOUR_EVO_KEY")
ANTHROPIC_KEY_FILE = "/opt/wa-monitor/.api_key"
MEDIA_DIR = Path("/opt/wa-monitor/media")
EVOLUTION_INSTANCE = "wa-monitor"
MEDIA_DIR.mkdir(parents=True, exist_ok=True)
logging.basicConfig(level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger("wa-monitor")
app = FastAPI(title="WhatsApp Monitor", version="1.0")
pool: Optional[asyncpg.Pool] = None
_startup_time = time.time()
Connection Pool and Startup
Using asyncpg for async PostgreSQL access with a connection pool:
@app.on_event("startup")
async def startup():
global pool
pool = await asyncpg.create_pool(PG_DSN, min_size=2, max_size=10)
log.info("PostgreSQL pool created")
await _ensure_schema()
# Sync groups from Evolution API on startup
asyncio.create_task(_sync_groups_from_evolution())
# Start background watchdog
asyncio.create_task(_watchdog_loop())
@app.on_event("shutdown")
async def shutdown():
if pool:
await pool.close()
The Webhook Endpoint
The critical design decision: accept fast, process later. The webhook endpoint logs the raw payload and processes the message in a background task. This ensures Evolution API never times out waiting for your response:
@app.post("/webhook/evolution")
async def webhook_evolution(request: Request,
background_tasks: BackgroundTasks):
"""Receive Evolution API webhooks. Store raw, process in background."""
try:
body = await request.json()
except Exception:
raise HTTPException(400, "Invalid JSON")
event = body.get("event", "unknown")
instance = body.get("instance", "")
# Log raw webhook for debugging
async with pool.acquire() as conn:
await conn.execute(
"""INSERT INTO wa_webhook_log (event_type, instance, payload)
VALUES ($1, $2, $3)""",
event, instance, json.dumps(body)
)
# Process in background -- return 200 immediately
background_tasks.add_task(_process_webhook, event, instance, body)
return {"status": "ok"}
Webhook Routing
Different events get different handlers:
async def _process_webhook(event: str, instance: str, body: dict):
"""Route webhook events to appropriate handlers."""
try:
data = body.get("data", {})
if event == "messages.upsert":
await _process_message(instance, data)
elif event == "groups.upsert":
await _process_group_upsert(data)
elif event == "group-participants.update":
await _process_participant_update(data)
elif event == "connection.update":
state = data.get("state", "")
log.info(f"Connection update: {instance} -> {state}")
elif event == "qrcode.updated":
log.info(f"QR code updated for {instance}")
except Exception as e:
log.error(f"Webhook processing error ({event}): {e}",
exc_info=True)
7. Message Processing Pipeline
Parsing WhatsApp Message Types
WhatsApp messages come in many flavors. The Evolution API webhook payload nests the content differently for each type. Here is how to handle them all:
async def _process_message(instance: str, data: dict):
"""Parse an incoming message and store it."""
key = data.get("key", {})
msg_id = key.get("id", "")
remote_jid = key.get("remoteJid", "")
# Only process group messages (JIDs ending in @g.us)
if not remote_jid.endswith("@g.us"):
return
# Extract sender
participant = (key.get("participant", "")
or data.get("participant", ""))
sender_phone = participant.split("@")[0] if participant else ""
sender_name = data.get("pushName", "")
# Extract message content based on type
message = data.get("message", {})
msg_type = "conversation"
content = ""
media_type = None
media_mime = None
if "conversation" in message:
content = message["conversation"]
msg_type = "conversation"
elif "extendedTextMessage" in message:
content = message["extendedTextMessage"].get("text", "")
msg_type = "extendedText"
elif "imageMessage" in message:
msg_type = "image"
media_type = "image"
content = message["imageMessage"].get("caption", "")
media_mime = message["imageMessage"].get("mimetype", "image/jpeg")
elif "audioMessage" in message:
msg_type = "audio"
media_type = "audio"
media_mime = message["audioMessage"].get("mimetype", "audio/ogg")
elif "videoMessage" in message:
msg_type = "video"
media_type = "video"
content = message["videoMessage"].get("caption", "")
media_mime = message["videoMessage"].get("mimetype", "video/mp4")
elif "documentMessage" in message:
msg_type = "document"
media_type = "document"
fname = message["documentMessage"].get("fileName", "")
content = f"[Document: {fname}]" if fname else "[Document]"
media_mime = message["documentMessage"].get("mimetype", "")
elif "stickerMessage" in message:
msg_type = "sticker"
content = "[Sticker]"
elif "reactionMessage" in message:
msg_type = "reaction"
content = message["reactionMessage"].get("text", "")
elif "contactMessage" in message:
msg_type = "contact"
display = message["contactMessage"].get("displayName", "")
content = f"[Contact: {display}]"
elif "locationMessage" in message:
msg_type = "location"
lat = message["locationMessage"].get("degreesLatitude", "")
lon = message["locationMessage"].get("degreesLongitude", "")
content = f"[Location: {lat}, {lon}]"
else:
msg_type = list(message.keys())[0] if message else "unknown"
# Parse timestamp
ts_epoch = data.get("messageTimestamp", 0)
if isinstance(ts_epoch, str):
ts_epoch = int(ts_epoch)
ts = (datetime.utcfromtimestamp(ts_epoch)
if ts_epoch else datetime.utcnow())
# Extract reply context
reply_to = None
for msg_key in ("extendedTextMessage", "imageMessage", "videoMessage"):
ctx = message.get(msg_key, {}).get("contextInfo", {})
if ctx:
reply_to = ctx.get("stanzaId")
break
# Ensure group exists in our database
await _ensure_group(remote_jid)
# Insert message (idempotent via ON CONFLICT)
async with pool.acquire() as conn:
await conn.execute("""
INSERT INTO wa_messages
(id, group_jid, sender_phone, sender_name, message_type,
content, media_type, media_mime, reply_to_id,
timestamp, raw_payload)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)
ON CONFLICT (id, timestamp) DO NOTHING
""", msg_id, remote_jid, sender_phone, sender_name, msg_type,
content, media_type, media_mime, reply_to, ts,
json.dumps(data))
# Update participant tracking
if sender_phone:
await _upsert_participant(remote_jid, sender_phone, sender_name)
# Download media in background
if media_type in ("image", "audio", "video", "document"):
asyncio.create_task(_download_and_process_media(
instance, data, msg_id, remote_jid,
media_type, media_mime, ts
))
log.info(f"Stored: {msg_id} in {remote_jid} "
f"from {sender_phone} ({msg_type})")
Group and Participant Tracking
async def _ensure_group(jid: str, name: str = ""):
"""Insert group if not exists."""
async with pool.acquire() as conn:
await conn.execute("""
INSERT INTO wa_groups (jid, name)
VALUES ($1, $2)
ON CONFLICT (jid) DO NOTHING
""", jid, name)
async def _upsert_participant(group_jid: str, phone: str,
push_name: str = ""):
"""Track participant activity."""
async with pool.acquire() as conn:
await conn.execute("""
INSERT INTO wa_participants
(group_jid, phone, push_name, last_seen)
VALUES ($1, $2, $3, NOW())
ON CONFLICT (group_jid, phone) DO UPDATE SET
push_name = CASE
WHEN EXCLUDED.push_name != ''
THEN EXCLUDED.push_name
ELSE wa_participants.push_name
END,
last_seen = NOW()
""", group_jid, phone, push_name)
Media Download and Processing
Media files are downloaded from Evolution API, stored locally in date-based directories, and optionally processed (thumbnails for images, duration extraction for audio/video):
async def _download_and_process_media(instance, data, msg_id,
group_jid, media_type,
media_mime, ts):
"""Download media via Evolution API, generate thumbnails."""
try:
key = data.get("key", {})
message = data.get("message", {})
# Date-based directory structure
date_dir = MEDIA_DIR / group_jid / ts.strftime("%Y-%m-%d")
date_dir.mkdir(parents=True, exist_ok=True)
# Map MIME types to file extensions
ext_map = {
"image/jpeg": ".jpg", "image/png": ".png",
"image/webp": ".webp", "audio/ogg": ".ogg",
"audio/ogg; codecs=opus": ".ogg", "audio/mp4": ".m4a",
"video/mp4": ".mp4", "application/pdf": ".pdf",
}
ext = ext_map.get(media_mime, ".bin")
media_path = date_dir / f"{msg_id}{ext}"
# Download via Evolution API's base64 endpoint
async with httpx.AsyncClient(timeout=60) as client:
resp = await client.post(
f"{EVOLUTION_URL}/chat/getBase64FromMediaMessage"
f"/{instance}",
json={"message": {"key": key, "message": message}},
headers={"apikey": EVOLUTION_API_KEY}
)
if resp.status_code != 200:
log.warning(f"Media download failed: {msg_id}")
return
import base64
b64_data = resp.json().get("base64", "")
if not b64_data:
return
raw = base64.b64decode(b64_data)
media_path.write_bytes(raw)
# Update database with file path and size
async with pool.acquire() as conn:
await conn.execute("""
UPDATE wa_messages
SET media_path = $1, media_size = $2
WHERE id = $3 AND timestamp = $4
""", str(media_path), len(raw), msg_id, ts)
log.info(f"Media saved: {msg_id} ({len(raw)} bytes)")
except Exception as e:
log.error(f"Media processing failed: {msg_id}: {e}")
8. AI-Powered Daily Summaries
This is where the system becomes genuinely valuable. Instead of reading 300 messages from yesterday's team chat, you get a 4-paragraph summary highlighting decisions, action items, and key discussions.
Claude API Integration
def _get_api_key() -> str:
"""Read Anthropic API key from file."""
key_file = Path(ANTHROPIC_KEY_FILE)
if key_file.exists():
key = key_file.read_text().strip()
if key:
return key
raise RuntimeError("Anthropic API key not configured")
def _call_claude(messages: list, model: str = "claude-haiku-4-5-20251001",
max_tokens: int = 2000, system: str = "") -> str:
"""Call Claude API and return the response text."""
import requests
api_key = _get_api_key()
body = {
"model": model,
"max_tokens": max_tokens,
"messages": messages,
}
if system:
body["system"] = system
resp = requests.post(
"https://api.anthropic.com/v1/messages",
headers={
"x-api-key": api_key,
"content-type": "application/json",
"anthropic-version": "2023-06-01",
},
json=body,
timeout=120,
)
if resp.status_code != 200:
err = resp.json().get("error", {}).get("message",
resp.text[:300])
raise RuntimeError(f"API error ({resp.status_code}): {err}")
return resp.json()["content"][0]["text"].strip()
The Summary Generation Endpoint
@app.post("/summarize/{jid}")
async def summarize_group(jid: str, target_date: str = ""):
"""Generate or retrieve a daily summary for a group."""
if not target_date:
target_date = (date.today() - timedelta(days=1)).isoformat()
summary_date = date.fromisoformat(target_date)
# Check cache first
async with pool.acquire() as conn:
existing = await conn.fetchrow(
"""SELECT summary, message_count, model
FROM wa_daily_summaries
WHERE group_jid = $1 AND summary_date = $2""",
jid, summary_date
)
if existing:
return {
"summary": existing["summary"],
"message_count": existing["message_count"],
"cached": True,
}
# Fetch messages for the target date
messages = await conn.fetch("""
SELECT sender_name, sender_phone, content,
message_type, timestamp
FROM wa_messages
WHERE group_jid = $1 AND timestamp::date = $2
ORDER BY timestamp ASC
""", jid, summary_date)
if not messages:
return {"summary": "No messages on this date.",
"message_count": 0, "cached": False}
group = await conn.fetchrow(
"SELECT name FROM wa_groups WHERE jid = $1", jid)
group_name = group["name"] if group else jid
# Build the prompt
msg_text = []
for m in messages:
name = m["sender_name"] or m["sender_phone"]
ts = m["timestamp"].strftime("%H:%M")
content = m["content"] or f"[{m['message_type']}]"
msg_text.append(f"[{ts}] {name}: {content}")
prompt = f"""Summarize the following WhatsApp group messages \
from "{group_name}" on {target_date}.
Group by topic/thread. Highlight any decisions, action items, \
or important announcements.
Be concise (2-4 paragraphs max). If messages are in a \
non-English language, summarize in English.
Messages:
{chr(10).join(msg_text)}"""
summary = _call_claude(
[{"role": "user", "content": prompt}],
model="claude-haiku-4-5-20251001",
max_tokens=1000
)
# Cache the summary
async with pool.acquire() as conn:
await conn.execute("""
INSERT INTO wa_daily_summaries
(group_jid, summary_date, message_count, summary, model)
VALUES ($1, $2, $3, $4, 'haiku')
ON CONFLICT (group_jid, summary_date) DO UPDATE SET
summary = EXCLUDED.summary,
message_count = EXCLUDED.message_count
""", jid, summary_date, len(messages), summary)
return {"summary": summary, "message_count": len(messages),
"cached": False}
Interactive AI Analysis
Beyond daily summaries, the system supports freeform questions about group conversations. This uses Claude Sonnet for deeper analysis:
WA_SYSTEM_PROMPT = """You are a WhatsApp group analysis assistant. \
You have access to WhatsApp group messages from staff groups.
Your job: Summarize discussions, identify key topics, flag important \
items (decisions, deadlines, complaints, requests), and answer \
questions about group conversations.
When analyzing messages:
- Group by topic/thread when summarizing
- Highlight action items and decisions
- Note who said what when relevant
- Flag urgent or important messages
- Be concise but thorough
- Use sender names when available, phone numbers when not
- Present information in a clear, structured format
- If messages are in a non-English language, provide analysis in English
Format your response in markdown."""
@app.post("/ask")
async def ask_ai(payload: dict):
"""AI analysis endpoint -- answers questions about messages."""
question = payload.get("question", "").strip()
group_jid = payload.get("group_jid", "")
# Gather context: recent messages from relevant group(s)
context_parts = []
async with pool.acquire() as conn:
if group_jid:
messages = await conn.fetch("""
SELECT sender_name, sender_phone, content,
message_type, timestamp
FROM wa_messages
WHERE group_jid = $1
AND timestamp >= CURRENT_DATE - INTERVAL '7 days'
ORDER BY timestamp ASC LIMIT 500
""", group_jid)
for m in messages:
name = m["sender_name"] or m["sender_phone"]
ts = m["timestamp"].strftime("%Y-%m-%d %H:%M")
content = m["content"] or f"[{m['message_type']}]"
context_parts.append(f"[{ts}] {name}: {content}")
context = "\n".join(context_parts)
user_msg = (f"Here is the WhatsApp message data:\n\n{context}"
f"\n\n---\n\nQuestion: {question}")
answer = _call_claude(
[{"role": "user", "content": user_msg}],
model="claude-sonnet-4-6",
max_tokens=3000,
system=WA_SYSTEM_PROMPT
)
return {"answer": answer, "model": "claude-sonnet-4.6"}
AI Prompt Engineering Tips
Getting good summaries requires careful prompting. Here is what works:
- Always include timestamps. Without them, the AI cannot distinguish morning discussions from afternoon follow-ups.
- Include sender names. "Marco said we need to hire 3 more agents" is far more useful than "Someone said we need to hire."
- Request topic grouping. Without this instruction, the AI tends to produce a chronological retelling rather than a thematic summary.
- Specify the output language. If your groups use Italian, Albanian, or Spanish, explicitly ask for English summaries.
- Cap the token budget. Haiku with `max_tokens=1000` produces tight summaries. Going higher tends to add filler.
- Use Haiku for bulk summaries, Sonnet for interactive analysis. Haiku is 10-20x cheaper and fast enough for nightly cron jobs. Sonnet's deeper reasoning is worth the cost for real-time "what happened?" questions.
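The first two tips (timestamps plus sender names) define a transcript format that both the summary endpoint and `/ask` build inline. Factoring it into one helper (an illustrative refactor) keeps the two prompts consistent:

```python
from datetime import datetime

def format_transcript(rows, ts_fmt: str = "%H:%M") -> str:
    """Render message rows as '[HH:MM] Name: content' lines.
    Rows need sender_name, sender_phone, content, message_type and
    timestamp keys -- the same columns the endpoints above fetch."""
    lines = []
    for m in rows:
        name = m["sender_name"] or m["sender_phone"]          # fall back to phone
        content = m["content"] or f"[{m['message_type']}]"    # placeholder for media
        lines.append(f"[{m['timestamp'].strftime(ts_fmt)}] {name}: {content}")
    return "\n".join(lines)
```

Pass `ts_fmt="%Y-%m-%d %H:%M"` for multi-day windows such as the 7-day context in `/ask`.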
9. Email Reports with Resend
Why Resend?
| Feature | Resend | Brevo | Mailjet |
|---|---|---|---|
| Free tier | 3,000/month | 300/day | 200/day |
| Setup time | 2 minutes | 10 minutes | 10 minutes |
| API complexity | 1 endpoint | Medium | Medium |
| KYC/verification | Email only | Email + phone | |
| Custom domain needed? | No (test domain available) | Yes for production | Yes for production |
Resend's free tier is perfect for monitoring reports. The API is a single POST endpoint. No SDK needed -- plain urllib works.
Email Sending Implementation
The service supports both Resend's REST API and traditional SMTP, selected by setting `smtp_host` to `"resend"`:
def _send_email_sync(config: dict, to_list: list,
subject: str, html_body: str) -> str:
"""Send email. Returns empty string on success, error on failure."""
import json as _json
from urllib.request import Request, urlopen
from urllib.error import HTTPError
# ── Resend API mode ──
if config.get("smtp_host", "").lower() == "resend":
api_key = config.get("smtp_pass", "")
if not api_key:
return "Resend API key not configured"
from_str = (f"{config.get('from_name', 'WhatsApp Monitor')} "
        f"<{config.get('from_email', 'onboarding@resend.dev')}>")
# Generate plain-text fallback by stripping HTML
plain = re.sub(r'<[^>]+>', '', html_body)
plain = re.sub(r'\s+', ' ', plain).strip()
payload = _json.dumps({
"from": from_str,
"to": to_list,
"subject": subject,
"html": html_body,
"text": plain,
}).encode()
req = Request("https://api.resend.com/emails",
data=payload, method="POST")
req.add_header("Authorization", f"Bearer {api_key}")
req.add_header("Content-Type", "application/json")
try:
resp = urlopen(req, timeout=30)
data = _json.loads(resp.read())
return "" if data.get("id") else f"Resend error: {data}"
except HTTPError as e:
body = e.read().decode()
return f"Resend HTTP {e.code}: {body}"
except Exception as e:
return f"Resend error: {e}"
# ── Traditional SMTP mode ──
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
if not config.get("smtp_host"):
return "Email not configured"
msg = MIMEMultipart("alternative")
msg["Subject"] = subject
msg["From"] = (f"{config.get('from_name')} "
f"<{config.get('from_email')}>")
msg["To"] = ", ".join(to_list)
plain = re.sub(r'<[^>]+>', '', html_body)
msg.attach(MIMEText(plain, "plain", "utf-8"))
msg.attach(MIMEText(html_body, "html", "utf-8"))
try:
if config.get("smtp_tls", True):
server = smtplib.SMTP(config["smtp_host"],
config.get("smtp_port", 587),
timeout=30)
server.starttls()
else:
server = smtplib.SMTP(config["smtp_host"],
config.get("smtp_port", 25),
timeout=30)
if config.get("smtp_user") and config.get("smtp_pass"):
server.login(config["smtp_user"], config["smtp_pass"])
server.sendmail(msg["From"], to_list, msg.as_string())
server.quit()
return ""
except Exception as e:
return str(e)
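For reference, a Resend-mode config dict looks like this. The values below are placeholders; only the keys are defined by `_send_email_sync` above:

```python
# Hypothetical values -- only the keys come from the service code above.
resend_config = {
    "smtp_host": "resend",        # magic value selecting the Resend API path
    "smtp_pass": "re_xxxxxxxx",   # the Resend API key travels in the password field
    "from_name": "WhatsApp Monitor",
    "from_email": "reports@yourdomain.com",
}
# err = _send_email_sync(resend_config, ["ops@example.com"], "Test", "<p>Hi</p>")
```

An empty return value from `_send_email_sync` means the send succeeded.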
HTML Email Template
Professional dark-themed email template that renders well across clients:
def _build_email_html(title: str, sections: list) -> str:
"""Build HTML email. sections = [(heading, content_html), ...]"""
sections_html = ""
for heading, content in sections:
sections_html += f'''
<tr><td style="padding:0 24px 20px;">
<h2 style="margin:0 0 8px;font-size:14px;color:#25D366;
font-family:'Courier New',monospace;
border-bottom:1px solid #2a2a2e;
padding-bottom:6px;">{heading}</h2>
<div style="font-size:13px;line-height:1.7;
color:#d4d4d8;">{content}</div>
</td></tr>'''
return f'''<!DOCTYPE html><html><head>
<meta charset="utf-8"></head>
<body style="margin:0;padding:0;background:#0a0a0b;
font-family:-apple-system,sans-serif;">
<table width="100%" cellpadding="0" cellspacing="0"
style="background:#0a0a0b;padding:20px 0;">
<tr><td align="center">
<table width="600" cellpadding="0" cellspacing="0"
style="background:#111113;border:1px solid #27272a;
border-radius:8px;overflow:hidden;">
<tr><td style="background:#18181b;padding:16px 24px;
border-bottom:1px solid #27272a;">
<span style="font-family:'Courier New',monospace;
font-size:15px;font-weight:bold;
color:#e4e4e7;">
<span style="color:#25D366;">●</span> {title}
</span>
</td></tr>
{sections_html}
<tr><td style="padding:16px 24px;
border-top:1px solid #27272a;
text-align:center;">
<span style="font-size:10px;color:#52525b;
font-family:'Courier New',monospace;">
WhatsApp Monitor -- Automated Report
</span>
</td></tr>
</table>
</td></tr></table>
</body></html>'''
Report Types
The system supports three built-in report types:
1. Daily Summary (`daily_summary`)
- Fetches AI-generated summaries (from `wa_daily_summaries`) for each group
- Falls back to raw message previews if no summary exists
- Shows message count per group
2. Weekly Digest (`weekly_digest`)
- Aggregates 7 days of data: total messages, unique participants, top senders
- Includes highlights from daily summaries
- Good for management reporting
3. Activity Alert (`activity_alert`)
- Lists active vs. inactive groups
- Flags groups with zero messages today
- Useful for community managers who need to spot disengaged groups
Report Generation Engine
async def _send_single_report(report: dict) -> dict:
"""Generate and send a single report."""
config = await _get_email_config()
if not config.get("smtp_host"):
return {"status": "error", "detail": "SMTP not configured"}
recipients = [r.strip()
for r in report["recipients"].split(",")
if r.strip()]
report_type = report["report_type"]
sections = []
async with pool.acquire() as conn:
groups = await conn.fetch(
"SELECT jid, name FROM wa_groups "
"WHERE is_monitored = true")
if report_type == "daily_summary":
yesterday = (date.today() - timedelta(days=1)).isoformat()
for g in groups:
summary = await conn.fetchrow(
"""SELECT summary, message_count
FROM wa_daily_summaries
WHERE group_jid = $1
AND summary_date = $2""",
g["jid"], date.fromisoformat(yesterday))
if summary and summary["message_count"] > 0:
content = summary["summary"].replace("\n", "<br>")
content = (f'<p style="color:#a1a1aa;">'
f'{summary["message_count"]} messages'
f'</p>' + content)
sections.append((g["name"] or g["jid"], content))
subject = f"WhatsApp Daily Summary -- {yesterday}"
title = f"Daily Summary -- {yesterday}"
# ... similar logic for weekly_digest and activity_alert
html = _build_email_html(title, sections)
err = await asyncio.get_event_loop().run_in_executor(
None, _send_email_sync, config, recipients, subject, html)
# Track send status
async with pool.acquire() as conn:
status = "sent" if not err else f"error: {err}"
await conn.execute(
"""UPDATE wa_scheduled_reports
SET last_sent_at = NOW(), last_status = $2
WHERE id = $1""",
report["id"], status)
if err:
return {"status": "error", "detail": err}
return {"status": "ok", "sections": len(sections)}
Report CRUD API
Full REST API for managing reports from the dashboard:
GET /reports -- List all scheduled reports
POST /reports -- Create a new report
PUT /reports/{id} -- Update a report
DELETE /reports/{id} -- Delete a report
POST /reports/{id}/send -- Send a report immediately
POST /cron/send-reports -- Check and send all due reports
10. Dashboard UI (PHP)
The dashboard provides four PHP pages that work together:
whatsapp.php -- Group Monitor
The main interface for viewing groups and messages. Features:
- Group list with live stats (messages today, this week, last message time)
- Message viewer with pagination, sender filtering, date range selection
- Full-text search across all groups (powered by PostgreSQL `tsvector`)
- AI chat panel -- ask questions about any group's conversations
- Connection status -- shows whether WhatsApp is connected, with QR code display for reconnection
email-reports.php -- Report Management
A standalone CRUD interface for email reports. Features:
- SMTP/Resend configuration panel -- collapsible form for email provider settings
- Report cards -- each report shows schedule, recipients, last send status, and toggle switch
- Modal form -- create/edit reports with type, schedule, time, day, recipients, group filter
- Send Now button -- manually trigger any report
- Test Email -- verify SMTP configuration
whatsapp-data.php -- AJAX Proxy
A thin PHP proxy that forwards browser requests to the FastAPI service. This solves two problems:
- CORS -- the browser cannot directly call `localhost:8086` from a page served on port 8082.
- Authentication -- the proxy enforces admin-only access using the existing PHP auth system.
// Simplified flow
$endpoint = $_GET['endpoint'];  // e.g., "/groups" or "/reports/5"
$ch = curl_init("http://127.0.0.1:8086{$endpoint}");
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
$response = curl_exec($ch);     // Forward request
curl_close($ch);
echo $response;                 // Return response
The proxy whitelists specific endpoint prefixes to prevent unauthorized access to internal APIs.
whatsapp-ask.php -- AI Analysis Proxy
Dedicated proxy for the /ask endpoint, forwarding user questions to Claude via the FastAPI service. Supports multi-turn conversations by passing the full conversation history.
11. Prometheus Metrics Endpoint
The /metrics endpoint exposes Prometheus-compatible metrics for Grafana monitoring:
@app.get("/metrics")
async def prometheus_metrics():
"""Prometheus-compatible metrics for Grafana."""
lines = []
lines.append("# HELP wa_monitor_up Service status")
lines.append("# TYPE wa_monitor_up gauge")
wa_up = 1 if _watchdog_stats.get("last_state") == "open" else 0
lines.append(f"wa_monitor_up {wa_up}")
lines.append("# HELP wa_monitor_uptime_seconds Uptime")
lines.append("# TYPE wa_monitor_uptime_seconds gauge")
lines.append(f"wa_monitor_uptime_seconds "
f"{int(time.time() - _startup_time)}")
lines.append("# HELP wa_monitor_reconnects_total Reconnects")
lines.append("# TYPE wa_monitor_reconnects_total counter")
lines.append(f'wa_monitor_reconnects_total '
f'{_watchdog_stats["reconnects"]}')
async with pool.acquire() as conn:
today = await conn.fetchval(
"SELECT COUNT(*) FROM wa_messages "
"WHERE timestamp >= CURRENT_DATE")
total = await conn.fetchval(
"SELECT COUNT(*) FROM wa_messages")
groups = await conn.fetchval(
"SELECT COUNT(*) FROM wa_groups "
"WHERE is_monitored = true")
lines.append(f"wa_monitor_messages_today {today}")
lines.append(f"wa_monitor_messages_total {total}")
lines.append(f"wa_monitor_groups {groups}")
from fastapi.responses import PlainTextResponse
return PlainTextResponse(
"\n".join(lines) + "\n",
media_type="text/plain; version=0.0.4")
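A scrape of the endpoint returns output like this (the counts are illustrative):

```
# HELP wa_monitor_up Service status
# TYPE wa_monitor_up gauge
wa_monitor_up 1
# HELP wa_monitor_uptime_seconds Uptime
# TYPE wa_monitor_uptime_seconds gauge
wa_monitor_uptime_seconds 86400
# HELP wa_monitor_reconnects_total Reconnects
# TYPE wa_monitor_reconnects_total counter
wa_monitor_reconnects_total 2
wa_monitor_messages_today 412
wa_monitor_messages_total 183204
wa_monitor_groups 23
```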
Prometheus scrape config:
# In prometheus.yml
scrape_configs:
- job_name: 'wa-monitor'
scrape_interval: 30s
static_configs:
- targets: ['172.18.0.1:8086']
Useful Grafana panels:
- `wa_monitor_up` -- alert if WhatsApp disconnects
- `rate(wa_monitor_messages_total[5m])` -- message velocity
- `wa_monitor_reconnects_total` -- connection stability
- `wa_monitor_groups` -- group count over time
12. Cron Jobs & Automation
Four cron jobs keep the system running without human intervention:
# ── Daily AI Summaries (2 AM) ──
# Generates summaries for all groups for yesterday
0 2 * * * curl -s -X POST http://localhost:8086/cron/daily-summaries \
> /dev/null 2>&1
# ── Partition Creation (25th of each month, 3 AM) ──
# Creates PostgreSQL partitions for the next 3+ months
0 3 25 * * curl -s -X POST http://localhost:8086/cron/create-partitions \
> /dev/null 2>&1
# ── Media Cleanup (Sundays at 4 AM) ──
# Deletes downloaded media files older than 90 days
0 4 * * 0 find /opt/wa-monitor/media -type f -mtime +90 -delete \
2>/dev/null; \
find /opt/wa-monitor/media -type d -empty -delete 2>/dev/null
# ── Report Scheduler (every 15 minutes) ──
# Checks for due reports and sends them
*/15 * * * * curl -s -X POST http://localhost:8086/cron/send-reports \
> /dev/null 2>&1
How the Report Scheduler Works
The /cron/send-reports endpoint runs every 15 minutes. For each active report:
- Parse the `schedule_time` (e.g., "08:00") and check whether the current time is within a 15-minute window after the scheduled time.
- Check the `schedule` type: `daily` -- send every day; `weekly` -- send only on the matching `schedule_day` (1=Monday through 7=Sunday); `monthly` -- send only when the current day of month matches `schedule_day`.
- Check `last_sent_at` -- skip if already sent today (prevents duplicates from overlapping cron runs).
- Generate the report content, send the email, and update `last_sent_at` and `last_status`.
@app.post("/cron/send-reports")
async def cron_send_reports():
"""Check and send any due reports."""
now = datetime.utcnow()
results = []
async with pool.acquire() as conn:
reports = await conn.fetch(
"SELECT * FROM wa_scheduled_reports "
"WHERE is_active = true")
for r in reports:
report = dict(r)
last_sent = report.get("last_sent_at")
# Already sent today? Skip.
if last_sent and last_sent.date() == now.date():
continue
# Parse schedule time
sched_h, sched_m = map(int,
report.get("schedule_time", "08:00").split(":"))
# Check 15-minute window
now_min = now.hour * 60 + now.minute
sched_min = sched_h * 60 + sched_m
if now_min < sched_min or now_min > sched_min + 15:
continue
# Check schedule type
should_send = False
if report["schedule"] == "daily":
should_send = True
elif report["schedule"] == "weekly":
should_send = (now.isoweekday() == report["schedule_day"])
elif report["schedule"] == "monthly":
should_send = (now.day == report["schedule_day"])
if should_send:
result = await _send_single_report(report)
results.append({"report": report["name"], **result})
return {"sent": results}
13. Connection Watchdog & Auto-Reconnect
WhatsApp connections via Baileys are not permanent. Phone battery dies, Wi-Fi drops, WhatsApp pushes protocol updates. The watchdog runs every 60 seconds and handles all of this automatically:
_watchdog_stats = {
"checks": 0,
"reconnects": 0,
"last_check": None,
"last_state": "unknown"
}
async def _watchdog_loop():
"""Background watchdog: auto-reconnect, sync groups, cleanup."""
await asyncio.sleep(10) # Let startup finish
log.info("Watchdog started")
while True:
try:
_watchdog_stats["checks"] += 1
_watchdog_stats["last_check"] = (
datetime.utcnow().isoformat())
# 1. Check WhatsApp connection
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.get(
f"{EVOLUTION_URL}/instance/connectionState"
f"/{EVOLUTION_INSTANCE}",
headers={"apikey": EVOLUTION_API_KEY})
if resp.status_code == 200:
state = (resp.json().get("instance", {})
.get("state", "unknown"))
_watchdog_stats["last_state"] = state
if state in ("close", "connecting"):
# Disconnected -- attempt reconnect
log.warning(f"WhatsApp disconnected "
f"(state={state}), reconnecting...")
await client.get(
f"{EVOLUTION_URL}/instance/connect"
f"/{EVOLUTION_INSTANCE}",
headers={"apikey": EVOLUTION_API_KEY})
_watchdog_stats["reconnects"] += 1
elif resp.status_code == 404:
# Instance was deleted -- recreate it
log.warning("Instance missing, recreating...")
await client.post(
f"{EVOLUTION_URL}/instance/create",
headers={
"apikey": EVOLUTION_API_KEY,
"Content-Type": "application/json"},
json={
"instanceName": EVOLUTION_INSTANCE,
"integration": "WHATSAPP-BAILEYS",
"qrcode": True,
"webhook": {
"url": ("http://172.18.0.1:8086"
"/webhook/evolution"),
"byEvents": False,
"base64": True,
"events": [
"MESSAGES_UPSERT",
"GROUPS_UPSERT",
"GROUP_PARTICIPANTS_UPDATE",
"CONNECTION_UPDATE",
"QRCODE_UPDATED"
]
}
})
_watchdog_stats["reconnects"] += 1
# 2. Periodic group sync (every 6 hours)
# Refreshes group names and participant counts
# from Evolution API
# 3. Webhook log cleanup (30-day retention)
async with pool.acquire() as conn:
await conn.execute(
"DELETE FROM wa_webhook_log "
"WHERE created_at < NOW() - INTERVAL '30 days'")
except asyncio.CancelledError:
return
except Exception as e:
log.error(f"Watchdog error: {e}")
await asyncio.sleep(60)
14. Production Tips & Lessons Learned
Handling Webhook Floods
When you first connect to a WhatsApp account that has been accumulating messages offline, Evolution API may dump hundreds of historical messages as webhooks in rapid succession. This can overwhelm your database if you are not careful.
Mitigations:
- Background task processing. Never process webhooks in the request handler. Return `200 OK` immediately and process asynchronously.
- `ON CONFLICT DO NOTHING`. Every INSERT uses this pattern, making message ingestion fully idempotent. Duplicate webhooks are silently ignored.
- Connection pool sizing. `min_size=2, max_size=10` handles bursts without exhausting PostgreSQL connections.
- Rate-limit media downloads. Media download is the slowest part. Use `asyncio.create_task()` so it runs in the background and does not block message processing.
Partition Maintenance
- Always create partitions ahead of time. If a message arrives for a month without a partition, PostgreSQL raises an error and the message is lost. The startup code creates 4 months ahead.
- Run partition creation on the 25th, not the 1st. This gives you 5 days of buffer in case the cron fails.
- To drop old data, detach the partition first, then drop:
-- Remove January 2025 data
ALTER TABLE wa_messages DETACH PARTITION wa_messages_2025_01;
DROP TABLE wa_messages_2025_01;
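The "create ahead" step boils down to generating DDL like the following. A sketch assuming the `wa_messages_YYYY_MM` naming shown above; `partition_ddl` is a hypothetical helper, not the service's actual function:

```python
from datetime import date

def partition_ddl(month_start: date) -> str:
    """Build the CREATE TABLE ... PARTITION OF statement for one month."""
    # First day of the following month is the exclusive upper bound
    nxt = date(month_start.year + (month_start.month == 12),
               month_start.month % 12 + 1, 1)
    name = f"wa_messages_{month_start:%Y_%m}"
    return (f"CREATE TABLE IF NOT EXISTS {name} PARTITION OF wa_messages "
            f"FOR VALUES FROM ('{month_start}') TO ('{nxt}')")

# The cron endpoint would execute this for the current month plus 3-4 ahead.
```

`IF NOT EXISTS` makes the cron job safe to re-run.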
Media Storage
- Use date-based directories (`/media/{group_jid}/2026-03-12/`) to keep the filesystem manageable.
- Set a retention policy. The cron job deletes media older than 90 days. Text messages stay forever in PostgreSQL; only the binary files are cleaned up.
- Monitor disk usage. A single busy group sending photos can consume 500MB+ per month. Images average 100-300KB each; voice notes 50-200KB; videos 2-10MB.
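Those averages make capacity planning a one-liner. A back-of-envelope check (the 100-photos-per-day volume is an assumption for a busy group; the 200KB average comes from the range above):

```python
photos_per_day, avg_kb = 100, 200   # assumed volume x average photo size
mb_per_month = photos_per_day * avg_kb * 30 / 1024
print(f"~{mb_per_month:.0f} MB/month")  # ~586 MB/month for photos alone
```

Add voice notes and the occasional video and the 500MB+ figure is easy to hit.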
Resend Free Tier Limits
- 3,000 emails per month, 100 per day.
- The `onboarding@resend.dev` test sender works without domain verification but may land in spam for some providers.
- For production: add your own domain in Resend (DNS verification takes 5 minutes), then update the `from_email` in the dashboard.
- Each report email counts as 1 send per recipient. A daily report to 3 people = 3 sends = 90/month.
Security Considerations
- Store the Anthropic API key in a file (`/opt/wa-monitor/.api_key`), not in environment variables. Files are less likely to leak into Docker logs or process listings.
- The webhook endpoint has no authentication. This is intentional because Evolution API does not support webhook signing. Mitigate by binding port 8086 to localhost (`--host 127.0.0.1`) if Evolution API is on the same machine.
- The PHP proxy enforces admin auth. All dashboard requests to the FastAPI service go through `whatsapp-data.php`, which checks admin permissions.
- SMTP passwords are masked in the API. The `GET /settings/email` endpoint returns `"********"` instead of the real password. Updates preserve the existing password if the masked value is sent back.
Evolution API Stability
- Pin the version in docker-compose (`v2.3.7`, not `latest`). Baileys is a reverse-engineered protocol; updates can break things.
- Expect disconnections. WhatsApp occasionally forces re-authentication. The watchdog handles this, but you may need to re-scan a QR code every few weeks.
- Do not use the monitoring number for personal messages. Evolution API exposes all messages to webhooks, including private chats. Use a dedicated phone number.
- A Redis `maxmemory` policy of `allkeys-lru` prevents memory exhaustion. Set `maxmemory` to 256MB for a typical deployment.
Performance at Scale
For reference, this architecture comfortably handles:
| Metric | Capacity |
|---|---|
| Groups monitored | 50+ |
| Messages per day | 10,000+ |
| Concurrent webhook processing | Limited by asyncpg pool (10 connections) |
| Full-text search speed | <100ms for most queries (GIN index) |
| AI summary generation | ~5 seconds per group per day (Haiku) |
| Email report generation | <10 seconds including AI fallback |
15. Complete File Reference
Service Files
| File | Purpose |
|---|---|
| `/opt/wa-monitor/service.py` | Main FastAPI application (webhook receiver, API endpoints, AI, email, watchdog) |
| `/opt/wa-monitor/schema.sql` | PostgreSQL schema (run once for initial setup) |
| `/opt/wa-monitor/.api_key` | Anthropic API key (plain text, chmod 600) |
| `/opt/wa-monitor/media/` | Downloaded media files (date-organized) |
| `/opt/wa-monitor/venv/` | Python virtual environment |
| `/opt/wa-monitor/service.log` | Application logs (stdout + stderr from systemd) |
Dashboard Files
| File | Purpose |
|---|---|
| `whatsapp.php` | Group monitor UI -- browse groups, messages, AI chat |
| `whatsapp-data.php` | AJAX proxy -- forwards browser requests to FastAPI |
| `whatsapp-ask.php` | AI analysis proxy -- forwards questions to Claude |
| `email-reports.php` | Report management UI -- SMTP config, report CRUD |
Configuration Files
| File | Purpose |
|---|---|
| `docker-compose.yml` | Docker stack (PostgreSQL, Evolution API, Redis) |
| `.env` | Environment variables (passwords, API keys) |
| `/etc/systemd/system/wa-monitor.service` | Systemd unit for FastAPI service |
Systemd Unit
[Unit]
Description=WhatsApp Monitor Service (FastAPI)
After=network.target docker.service
[Service]
Type=simple
User=root
WorkingDirectory=/opt/wa-monitor
ExecStart=/opt/wa-monitor/venv/bin/python3 -m uvicorn \
service:app --host 0.0.0.0 --port 8086 --workers 1
Restart=always
RestartSec=5
StandardOutput=append:/opt/wa-monitor/service.log
StandardError=append:/opt/wa-monitor/service.log
Environment=PYTHONUNBUFFERED=1
[Install]
WantedBy=multi-user.target
Python Dependencies
# requirements.txt
fastapi>=0.104.0
uvicorn>=0.24.0
asyncpg>=0.29.0
httpx>=0.25.0
requests>=2.31.0
Pillow>=10.0.0 # Image thumbnails
faster-whisper>=0.9.0 # Audio transcription (optional)
API Endpoints Reference
| Method | Endpoint | Purpose |
|---|---|---|
| POST | `/webhook/evolution` | Receive Evolution API webhooks |
| GET | `/health` | Health check with DB, WhatsApp, and watchdog status |
| GET | `/metrics` | Prometheus metrics |
| GET | `/groups` | List all monitored groups with stats |
| GET | `/groups/{jid}/messages` | Paginated messages with filters |
| GET | `/groups/{jid}/stats` | Top senders, hourly/daily activity |
| POST | `/groups/sync` | Force group sync from Evolution API |
| GET | `/search?q=term` | Full-text search across all groups |
| GET | `/media/{path}` | Serve downloaded media files |
| POST | `/ask` | AI-powered question answering |
| POST | `/summarize/{jid}` | Generate/retrieve daily summary |
| GET | `/settings/email` | Get email configuration |
| POST | `/settings/email` | Save email configuration |
| POST | `/settings/email/test` | Send test email |
| GET | `/reports` | List scheduled reports |
| POST | `/reports` | Create a report |
| PUT | `/reports/{id}` | Update a report |
| DELETE | `/reports/{id}` | Delete a report |
| POST | `/reports/{id}/send` | Send a report immediately |
| GET | `/connection/status` | WhatsApp connection state |
| GET | `/connection/qrcode` | Get QR code for linking |
| POST | `/connection/create` | Create/reconnect instance |
| POST | `/cron/daily-summaries` | Generate all daily summaries |
| POST | `/cron/create-partitions` | Create upcoming partitions |
| POST | `/cron/send-reports` | Check and send due reports |
Quick Start Checklist
- Install Docker, Python 3.11+, ffmpeg
- Create `docker-compose.yml` with PostgreSQL, Evolution API, Redis
- Create `.env` with `POSTGRES_PASSWORD` and `EVOLUTION_API_KEY`
- `docker compose up -d`
- Create the `whatsapp_monitor` database and run `schema.sql`
- Set up the Python venv and install dependencies
- Save your Anthropic API key to `/opt/wa-monitor/.api_key`
- Deploy `service.py` and install the systemd unit
- `systemctl enable --now wa-monitor`
- Create the Evolution API instance and scan the QR code
- Configure the Resend API key in the dashboard (SMTP host = `resend`)
- Create your first scheduled report
- Add cron jobs for daily summaries, partition creation, media cleanup, report sending
- Add a Prometheus scrape target for `/metrics`
- Wait for messages to flow in and enjoy your morning summary email
Built with Evolution API, FastAPI, PostgreSQL, Claude AI, and Resend. No official WhatsApp Business API required.