
WhatsApp Group Monitor with AI Summaries & Scheduled Email Reports

Infrastructure & DevOps · Advanced · 38 min read · #07

Build a complete WhatsApp monitoring platform using Evolution API, FastAPI, PostgreSQL, Claude AI, and Resend -- no official WhatsApp Business API required.


Table of Contents

  1. Why Monitor WhatsApp Groups?
  2. Architecture Overview
  3. Prerequisites
  4. Evolution API: Docker Setup
  5. PostgreSQL Schema with Table Partitioning
  6. FastAPI Webhook Receiver
  7. Message Processing Pipeline
  8. AI-Powered Daily Summaries
  9. Email Reports with Resend
  10. Dashboard UI (PHP)
  11. Prometheus Metrics Endpoint
  12. Cron Jobs & Automation
  13. Connection Watchdog & Auto-Reconnect
  14. Production Tips & Lessons Learned
  15. Complete File Reference

1. Why Monitor WhatsApp Groups?

WhatsApp is the de facto communication channel for teams across Europe, Latin America, Africa, and Asia. For businesses that run distributed operations -- call centers, field teams, multi-office setups -- critical information flows through WhatsApp groups every day.

The problem: WhatsApp has no built-in search across groups, no analytics, no export, no way for management to see what happened yesterday without scrolling through hundreds of messages on a phone.

What this system solves:

Problem → Solution

"What did the team discuss yesterday?" → AI-generated daily summaries, delivered by email
"Which groups went silent?" → Activity alerts flag zero-message groups
"Who are the most active participants?" → Per-group stats with top senders
"Find that message from last month" → Full-text search across all groups
"I need a weekly report for the board" → Scheduled digest emails with stats and highlights
"We need to keep records for compliance" → All messages stored in PostgreSQL with timestamps

Cost: Essentially free. Evolution API is open-source. PostgreSQL is free. Resend gives you 3,000 emails/month on the free tier. The only paid component is the Claude API for summaries, which costs roughly $0.02-0.05 per group per day using Haiku.
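To make that concrete, a quick back-of-envelope calculation (the group count of 15 and the 30-day month are illustrative assumptions, not figures from the project):

```python
# Monthly Claude cost at the quoted $0.02-0.05 per group per day.
groups, days = 15, 30           # assumed fleet size and month length
low = 0.02 * groups * days
high = 0.05 * groups * days
print(f"${low:.2f} to ${high:.2f} per month")  # → $9.00 to $22.50 per month
```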


2. Architecture Overview

                                    ┌─────────────────────────┐
                                    │      Your Phone         │
                                    │    (WhatsApp linked)    │
                                    └────────┬────────────────┘
                                             │ Baileys Protocol
                                             │ (WebSocket)
┌─────────────────────────────────────────────┼──────────────────────────────┐
│  Docker Network (monitoring)                │                              │
│                                             │                              │
│  ┌──────────────────────┐                   │                              │
│  │   Redis 7 Alpine     │◄──cache──┐        │                              │
│  │   (session store)    │          │        │                              │
│  └──────────────────────┘          │        │                              │
│                                    │        ▼                              │
│  ┌──────────────────────┐    ┌─────┴────────────────┐                      │
│  │  PostgreSQL 16       │◄───│   Evolution API      │                      │
│  │  (evolution DB)      │    │   v2.x (Baileys)     │                      │
│  │  (whatsapp_monitor)  │    │   Port 8080          │                      │
│  └──────────┬───────────┘    └─────────┬────────────┘                      │
│             │                          │ Webhook POST                      │
│             │                          │ /webhook/evolution                │
│             │                          ▼                                   │
│             │              ┌───────────────────────┐                       │
│             │              │   FastAPI Service      │                       │
│             │◄─── asyncpg ─│   (wa-monitor)        │──► Claude API         │
│             │              │   Port 8086            │    (Haiku/Sonnet)     │
│             │              │   systemd managed      │                       │
│             │              └───────────┬────────────┘                       │
│             │                          │                                   │
│             │                  ┌───────┼────────┐                          │
│             │                  │       │        │                          │
│             │                  ▼       ▼        ▼                          │
│             │            /metrics  /health  /reports                       │
│             │                  │                 │                          │
│  ┌──────────┴───┐              │                 │                          │
│  │  Prometheus   │◄── scrape ──┘                 │                          │
│  └──────┬───────┘                                │                          │
│         │                                        ▼                          │
│  ┌──────┴───────┐                    ┌──────────────────┐                  │
│  │   Grafana     │                    │  Resend API      │                  │
│  │   Dashboards  │                    │  (email delivery)│                  │
│  └──────────────┘                    └──────────────────┘                  │
│                                                                            │
│  ┌──────────────────────────────────────────────────────┐                  │
│  │   PHP Dashboard (Apache, port 8082)                   │                  │
│  │   whatsapp.php ── Group monitor, messages, AI chat    │                  │
│  │   email-reports.php ── Report CRUD, SMTP config       │                  │
│  │   whatsapp-data.php ── AJAX proxy to FastAPI          │                  │
│  │   whatsapp-ask.php ── AI analysis proxy               │                  │
│  └──────────────────────────────────────────────────────┘                  │
└────────────────────────────────────────────────────────────────────────────┘

Data flow summary:

  1. Your phone stays connected to WhatsApp via Evolution API's Baileys integration (open-source reimplementation of the WhatsApp Web protocol).
  2. Every group message triggers a webhook POST to the FastAPI service.
  3. FastAPI parses the message, stores it in a partitioned PostgreSQL table, and optionally downloads media.
  4. A nightly cron job generates AI summaries for each group using Claude Haiku.
  5. Every 15 minutes, the report scheduler checks for due email reports and sends them via Resend.
  6. The PHP dashboard provides a web UI for browsing groups, searching messages, and managing reports.

3. Prerequisites

Server requirements: a Linux server (VPS or bare metal) that can run Docker, with Python 3.12 on the host for the FastAPI service.

Accounts needed: an Anthropic API key for the Claude summaries, and a Resend account for email delivery (the free tier covers 3,000 emails/month).

Install system dependencies:

# Docker
curl -fsSL https://get.docker.com | sh

# Python 3.12 + venv
apt update && apt install -y python3.12 python3.12-venv python3-pip ffmpeg

# Optional: for image thumbnail generation
pip3 install Pillow

4. Evolution API: Docker Setup

Evolution API is the bridge between WhatsApp and your application. It uses the Baileys library (a reverse-engineered WhatsApp Web client) to maintain a persistent connection -- no official API approval, no Facebook Business verification, no monthly fees.

Docker Compose Configuration

Create your docker-compose.yml. The key services are Evolution API, Redis (for session caching), and PostgreSQL (shared with your other services):

version: "3.8"

services:
  # PostgreSQL — shared database for Evolution API + your monitor
  postgres:
    image: postgres:16-alpine
    container_name: postgres
    restart: unless-stopped
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
    volumes:
      - postgres_data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres"]
      interval: 10s
      timeout: 5s
      retries: 5
    networks:
      - monitoring

  # Evolution API — WhatsApp connection via Baileys
  evolution-api:
    image: evoapicloud/evolution-api:v2.3.7
    container_name: evolution-api
    restart: unless-stopped
    ports:
      - "8080:8080"
    environment:
      # Server
      - SERVER_URL=http://YOUR_SERVER_IP:8080
      - AUTHENTICATION_API_KEY=${EVOLUTION_API_KEY}
      # Database persistence
      - DATABASE_ENABLED=true
      - DATABASE_PROVIDER=postgresql
      - DATABASE_CONNECTION_URI=postgresql://postgres:${POSTGRES_PASSWORD}@postgres:5432/evolution
      - DATABASE_SAVE_DATA_INSTANCE=true
      - DATABASE_SAVE_DATA_NEW_MESSAGE=true
      - DATABASE_SAVE_DATA_CONTACTS=true
      - DATABASE_SAVE_DATA_CHATS=true
      # Redis caching
      - CACHE_REDIS_ENABLED=true
      - CACHE_REDIS_URI=redis://redis:6379/0
      # Logging
      - LOG_LEVEL=WARN
      # Global webhook — sends all events to your FastAPI service
      - WEBHOOK_GLOBAL_ENABLED=true
      - WEBHOOK_GLOBAL_URL=http://172.18.0.1:8086/webhook/evolution
      - WEBHOOK_GLOBAL_WEBHOOK_BY_EVENTS=false
      - WEBHOOK_EVENTS_MESSAGES_UPSERT=true
      - WEBHOOK_EVENTS_GROUPS_UPSERT=true
      - WEBHOOK_EVENTS_GROUP_PARTICIPANTS_UPDATE=true
      - WEBHOOK_EVENTS_CONNECTION_UPDATE=true
      - WEBHOOK_EVENTS_QRCODE_UPDATED=true
      # Disable unused integrations
      - TYPEBOT_ENABLED=false
      - CHATWOOT_ENABLED=false
      - OPENAI_ENABLED=false
      - DIFY_ENABLED=false
      - S3_ENABLED=false
    volumes:
      - evolution_instances:/evolution/instances
    depends_on:
      postgres:
        condition: service_healthy
      redis:
        condition: service_healthy
    networks:
      - monitoring

  # Redis — session and cache store for Evolution API
  redis:
    image: redis:7-alpine
    container_name: redis
    restart: unless-stopped
    command: redis-server --maxmemory 256mb --maxmemory-policy allkeys-lru
    volumes:
      - redis_data:/data
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 3s
      retries: 5
    networks:
      - monitoring

volumes:
  postgres_data:
  evolution_instances:
  redis_data:

networks:
  monitoring:
    driver: bridge

Environment File

Create a .env file alongside your compose file:

POSTGRES_PASSWORD=your_secure_pg_password_here
EVOLUTION_API_KEY=your_evo_api_key_here

Starting the Stack

docker compose up -d
docker compose logs -f evolution-api  # Watch for startup

Creating a WhatsApp Instance and Linking Your Phone

Once Evolution API is running, create an instance and scan the QR code:

# Create instance
curl -s -X POST http://localhost:8080/instance/create \
  -H "apikey: YOUR_EVOLUTION_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "instanceName": "wa-monitor",
    "integration": "WHATSAPP-BAILEYS",
    "qrcode": true,
    "webhook": {
      "url": "http://172.18.0.1:8086/webhook/evolution",
      "byEvents": false,
      "base64": true,
      "events": [
        "MESSAGES_UPSERT",
        "GROUPS_UPSERT",
        "GROUP_PARTICIPANTS_UPDATE",
        "CONNECTION_UPDATE",
        "QRCODE_UPDATED"
      ]
    }
  }'

# Get QR code (returns base64-encoded image)
curl -s http://localhost:8080/instance/connect/wa-monitor \
  -H "apikey: YOUR_EVOLUTION_API_KEY" | jq -r '.base64'

Open WhatsApp on your phone, go to Linked Devices, and scan the QR code. The connection persists across restarts because Evolution API stores the session in PostgreSQL.

Important: The webhook URL uses 172.18.0.1 because Evolution API runs inside Docker but needs to reach your FastAPI service running on the host. This is the Docker bridge gateway IP. Adjust if your Docker network uses a different subnet.

Verifying the Connection

curl -s http://localhost:8080/instance/connectionState/wa-monitor \
  -H "apikey: YOUR_EVOLUTION_API_KEY"

# Expected: {"instance":{"instanceName":"wa-monitor","state":"open"}}
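If you script the setup, you can poll this endpoint until the state flips to open. A stdlib-only sketch (the response shape matches the example above; the base URL and key in the commented call are placeholders):

```python
import json
import time
import urllib.request

def is_open(payload: dict) -> bool:
    """True when connectionState reports {"instance": {"state": "open"}}."""
    return payload.get("instance", {}).get("state") == "open"

def wait_for_open(base_url: str, instance: str, apikey: str,
                  timeout: float = 120.0, interval: float = 5.0) -> bool:
    """Poll Evolution API until the instance is connected or we give up."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        req = urllib.request.Request(
            f"{base_url}/instance/connectionState/{instance}",
            headers={"apikey": apikey})
        try:
            with urllib.request.urlopen(req, timeout=10) as resp:
                if is_open(json.load(resp)):
                    return True
        except OSError:
            pass  # API not up yet, or connection refused; retry
        time.sleep(interval)
    return False

# wait_for_open("http://localhost:8080", "wa-monitor", "YOUR_EVOLUTION_API_KEY")
```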

5. PostgreSQL Schema with Table Partitioning

The message volume from active WhatsApp groups can grow quickly. A single group of 50 people can produce 500+ messages per day. Multiply by 10-20 groups, add media, and you are looking at millions of rows within months.
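The arithmetic behind that claim (15 groups at 500 messages per day is an assumption in the middle of the quoted range):

```python
# Row-count estimate for the wa_messages table.
groups = 15            # assumed number of monitored groups
msgs_per_day = 500     # per group, per the figure above
per_month = groups * msgs_per_day * 30
per_year = per_month * 12
print(f"{per_month:,} rows/month, {per_year:,} rows/year")
# → 225,000 rows/month, 2,700,000 rows/year
```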

Table partitioning solves this by splitting wa_messages into monthly chunks. Each month gets its own physical table, so time-bounded queries touch only the partitions they need, indexes stay small and partition-local, and old data can be dropped instantly by detaching a partition.

Schema Design

-- Run inside your PostgreSQL container
CREATE DATABASE whatsapp_monitor;
\c whatsapp_monitor

-- ── Groups ──
CREATE TABLE wa_groups (
    jid TEXT PRIMARY KEY,           -- WhatsApp group JID (ends in @g.us)
    name TEXT NOT NULL DEFAULT '',
    description TEXT DEFAULT '',
    participant_count INTEGER DEFAULT 0,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW(),
    is_monitored BOOLEAN DEFAULT true
);

-- ── Participants ──
CREATE TABLE wa_participants (
    id SERIAL PRIMARY KEY,
    group_jid TEXT NOT NULL REFERENCES wa_groups(jid),
    phone TEXT NOT NULL,
    push_name TEXT DEFAULT '',
    is_admin BOOLEAN DEFAULT false,
    first_seen TIMESTAMPTZ DEFAULT NOW(),
    last_seen TIMESTAMPTZ DEFAULT NOW(),
    UNIQUE(group_jid, phone)
);

-- ── Messages (partitioned by month) ──
CREATE TABLE wa_messages (
    id TEXT NOT NULL,                -- WhatsApp message ID
    group_jid TEXT NOT NULL,         -- Group JID
    sender_phone TEXT NOT NULL,      -- Sender phone number
    sender_name TEXT DEFAULT '',     -- Push name (display name)
    message_type TEXT NOT NULL DEFAULT 'conversation',
    content TEXT DEFAULT '',
    -- Full-text search vector, auto-generated from content
    content_search TSVECTOR GENERATED ALWAYS AS (
        to_tsvector('simple', coalesce(content, ''))
    ) STORED,
    media_type TEXT,                 -- image, audio, video, document
    media_path TEXT,                 -- Local file path after download
    media_thumb_path TEXT,           -- Thumbnail path
    media_mime TEXT,                 -- MIME type
    media_size INTEGER,             -- File size in bytes
    media_duration INTEGER,         -- Audio/video duration in seconds
    reply_to_id TEXT,               -- Quoted message ID
    timestamp TIMESTAMPTZ NOT NULL, -- Original WhatsApp timestamp
    raw_payload JSONB,              -- Full webhook payload for debugging
    created_at TIMESTAMPTZ DEFAULT NOW(),
    PRIMARY KEY (id, timestamp)     -- Composite PK required for partitioning
) PARTITION BY RANGE (timestamp);

-- Create partitions for current + next 3 months
-- (Replace 2026 with your current year)
CREATE TABLE wa_messages_2026_03 PARTITION OF wa_messages
    FOR VALUES FROM ('2026-03-01') TO ('2026-04-01');
CREATE TABLE wa_messages_2026_04 PARTITION OF wa_messages
    FOR VALUES FROM ('2026-04-01') TO ('2026-05-01');
CREATE TABLE wa_messages_2026_05 PARTITION OF wa_messages
    FOR VALUES FROM ('2026-05-01') TO ('2026-06-01');
CREATE TABLE wa_messages_2026_06 PARTITION OF wa_messages
    FOR VALUES FROM ('2026-06-01') TO ('2026-07-01');

-- Performance indexes
CREATE INDEX idx_wa_msg_group_ts ON wa_messages (group_jid, timestamp DESC);
CREATE INDEX idx_wa_msg_sender ON wa_messages (sender_phone, timestamp DESC);
CREATE INDEX idx_wa_msg_search ON wa_messages USING GIN (content_search);

-- ── AI Summary Cache ──
CREATE TABLE wa_daily_summaries (
    id SERIAL PRIMARY KEY,
    group_jid TEXT NOT NULL REFERENCES wa_groups(jid),
    summary_date DATE NOT NULL,
    message_count INTEGER DEFAULT 0,
    summary TEXT NOT NULL,
    model TEXT DEFAULT 'haiku',
    created_at TIMESTAMPTZ DEFAULT NOW(),
    UNIQUE(group_jid, summary_date)
);

-- ── Email Configuration (singleton row) ──
CREATE TABLE wa_email_config (
    id INTEGER PRIMARY KEY DEFAULT 1,
    smtp_host TEXT DEFAULT '',      -- 'resend' for Resend API mode
    smtp_port INTEGER DEFAULT 587,
    smtp_user TEXT DEFAULT '',
    smtp_pass TEXT DEFAULT '',      -- API key for Resend mode
    smtp_tls BOOLEAN DEFAULT true,
    from_email TEXT DEFAULT '',
    from_name TEXT DEFAULT 'WhatsApp Monitor',
    updated_at TIMESTAMPTZ DEFAULT NOW()
);

-- ── Scheduled Reports ──
CREATE TABLE wa_scheduled_reports (
    id SERIAL PRIMARY KEY,
    name TEXT NOT NULL,
    report_type TEXT NOT NULL DEFAULT 'daily_summary',
    schedule TEXT NOT NULL DEFAULT 'daily',      -- daily, weekly, monthly
    schedule_time TEXT DEFAULT '08:00',           -- HH:MM (UTC)
    schedule_day INTEGER DEFAULT 1,              -- ISO weekday (1=Mon) or day of month
    recipients TEXT NOT NULL DEFAULT '',          -- Comma-separated emails
    group_filter TEXT DEFAULT '',                 -- Comma-separated JIDs, empty = all
    include_stats BOOLEAN DEFAULT true,
    is_active BOOLEAN DEFAULT true,
    last_sent_at TIMESTAMPTZ,
    last_status TEXT DEFAULT '',
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW()
);

-- ── Webhook Debug Log ──
CREATE TABLE wa_webhook_log (
    id BIGSERIAL PRIMARY KEY,
    event_type TEXT NOT NULL,
    instance TEXT,
    payload JSONB NOT NULL,
    processed BOOLEAN DEFAULT false,
    created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_wa_webhook_unprocessed
    ON wa_webhook_log (processed) WHERE NOT processed;
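The schedule columns in wa_scheduled_reports map onto a simple due-check. Here is a sketch of the test the 15-minute scheduler tick might run -- the window width and exact semantics are assumptions, not the project's verbatim logic:

```python
from datetime import datetime

def report_is_due(schedule: str, schedule_time: str,
                  schedule_day: int, now: datetime) -> bool:
    """True when `now` (UTC) falls inside the report's send window.

    schedule_time is HH:MM; schedule_day is the ISO weekday (1=Mon)
    for weekly reports, or the day of month for monthly ones.
    """
    hh, mm = map(int, schedule_time.split(":"))
    # 15-minute window to match the scheduler's tick interval
    in_window = now.hour == hh and 0 <= now.minute - mm < 15
    if schedule == "daily":
        return in_window
    if schedule == "weekly":
        return in_window and now.isoweekday() == schedule_day
    if schedule == "monthly":
        return in_window and now.day == schedule_day
    return False
```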

Why This Partitioning Strategy Works

Range partitioning by month is the right choice here because:

  1. Queries are almost always time-bounded. "Show me yesterday's messages" or "summarize last week" only touch 1-2 partitions.
  2. Data retention is simple. To drop messages older than 12 months: ALTER TABLE wa_messages DETACH PARTITION wa_messages_2025_01; DROP TABLE wa_messages_2025_01;
  3. The generated TSVECTOR column (content_search) is inherited by each partition, giving you full-text search with GIN indexes that are partition-local (smaller, faster).
  4. The composite primary key (id, timestamp) is required by PostgreSQL -- the partition key must be part of any unique constraint on a partitioned table. This is fine in practice because WhatsApp message IDs are effectively unique within any realistic time window.
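Point 2 can be automated. A sketch that emits the retention statements for partitions older than a cutoff, following the wa_messages_YYYY_MM naming convention (the helper itself is illustrative, not part of the service):

```python
from datetime import date

def retention_statements(partitions: list[str], keep_months: int,
                         today: date) -> list[str]:
    """SQL to detach and drop partitions older than `keep_months`."""
    cutoff = today.year * 12 + (today.month - 1) - keep_months
    stmts = []
    for name in partitions:
        # wa_messages_2025_01 -> ("2025", "01")
        year, month = name.rsplit("_", 2)[-2:]
        if int(year) * 12 + (int(month) - 1) < cutoff:
            stmts.append(f"ALTER TABLE wa_messages DETACH PARTITION {name};")
            stmts.append(f"DROP TABLE {name};")
    return stmts
```

Detaching before dropping keeps locks on the parent table short; on PostgreSQL 14+ you can also use DETACH PARTITION ... CONCURRENTLY to avoid blocking writers.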

Auto-Creating Partitions

The service creates partitions on startup and via cron. Here is the logic:

async def _ensure_partitions(conn):
    """Create partitions for current month + next 3 months."""
    now = datetime.utcnow()
    for i in range(4):
        m = now.month + i
        y = now.year + (m - 1) // 12
        m = ((m - 1) % 12) + 1
        nm = m + 1
        ny = y + (nm - 1) // 12
        nm = ((nm - 1) % 12) + 1
        part_name = f"wa_messages_{y}_{m:02d}"
        start = f"{y}-{m:02d}-01"
        end = f"{ny}-{nm:02d}-01"
        await conn.execute(f"""
            CREATE TABLE IF NOT EXISTS {part_name}
            PARTITION OF wa_messages
            FOR VALUES FROM ('{start}') TO ('{end}')
        """)

This runs on every service startup and on the 25th of each month via cron, ensuring the next month's partition always exists before it is needed.
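The month-rollover arithmetic above is easy to get wrong, so it is worth checking against a year boundary. The same computation, factored into a pure function you can eyeball:

```python
from datetime import datetime

def partition_ranges(now: datetime, count: int = 4) -> list[tuple[str, str, str]]:
    """Return (partition_name, start, end) tuples using the same
    month-rollover arithmetic as _ensure_partitions."""
    out = []
    for i in range(count):
        m = now.month + i
        y = now.year + (m - 1) // 12
        m = ((m - 1) % 12) + 1
        nm = m + 1
        ny = y + (nm - 1) // 12
        nm = ((nm - 1) % 12) + 1
        out.append((f"wa_messages_{y}_{m:02d}",
                    f"{y}-{m:02d}-01", f"{ny}-{nm:02d}-01"))
    return out

# Starting in November 2026, the four partitions must cross into 2027:
for row in partition_ranges(datetime(2026, 11, 5)):
    print(row)
```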


6. FastAPI Webhook Receiver

The webhook receiver is the core of the system. It receives POST requests from Evolution API for every event (messages, group updates, participant changes, connection state) and processes them asynchronously.

Service Structure

#!/usr/bin/env python3
"""
WhatsApp Group Monitor -- FastAPI Service
Port 8086. Receives Evolution API webhooks, stores to PostgreSQL.
"""

import os
import re
import json
import time
import logging
import asyncio
from datetime import datetime, date, timedelta
from pathlib import Path
from typing import Optional

import asyncpg
import httpx
from fastapi import FastAPI, HTTPException, Query, BackgroundTasks, Request
from fastapi.responses import JSONResponse, FileResponse

# ── Config ──
PG_DSN = "postgresql://postgres:YOUR_PG_PASSWORD@POSTGRES_HOST:5432/whatsapp_monitor"
EVOLUTION_URL = "http://localhost:8080"
EVOLUTION_API_KEY = os.environ.get("EVOLUTION_API_KEY", "YOUR_EVO_KEY")
ANTHROPIC_KEY_FILE = "/opt/wa-monitor/.api_key"
MEDIA_DIR = Path("/opt/wa-monitor/media")
EVOLUTION_INSTANCE = "wa-monitor"
MEDIA_DIR.mkdir(parents=True, exist_ok=True)

logging.basicConfig(level=logging.INFO,
                    format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger("wa-monitor")

app = FastAPI(title="WhatsApp Monitor", version="1.0")
pool: Optional[asyncpg.Pool] = None
_startup_time = time.time()

Connection Pool and Startup

Using asyncpg for async PostgreSQL access with a connection pool:

@app.on_event("startup")
async def startup():
    global pool
    pool = await asyncpg.create_pool(PG_DSN, min_size=2, max_size=10)
    log.info("PostgreSQL pool created")
    await _ensure_schema()
    # Sync groups from Evolution API on startup
    asyncio.create_task(_sync_groups_from_evolution())
    # Start background watchdog
    asyncio.create_task(_watchdog_loop())


@app.on_event("shutdown")
async def shutdown():
    if pool:
        await pool.close()

The Webhook Endpoint

The critical design decision: accept fast, process later. The webhook endpoint logs the raw payload and processes the message in a background task. This ensures Evolution API never times out waiting for your response:

@app.post("/webhook/evolution")
async def webhook_evolution(request: Request,
                            background_tasks: BackgroundTasks):
    """Receive Evolution API webhooks. Store raw, process in background."""
    try:
        body = await request.json()
    except Exception:
        raise HTTPException(400, "Invalid JSON")

    event = body.get("event", "unknown")
    instance = body.get("instance", "")

    # Log raw webhook for debugging
    async with pool.acquire() as conn:
        await conn.execute(
            """INSERT INTO wa_webhook_log (event_type, instance, payload)
               VALUES ($1, $2, $3)""",
            event, instance, json.dumps(body)
        )

    # Process in background -- return 200 immediately
    background_tasks.add_task(_process_webhook, event, instance, body)

    return {"status": "ok"}
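To exercise the endpoint without a live WhatsApp connection, you can POST a synthetic messages.upsert payload. The shape below mirrors what the message parser in the next section expects; all identifiers are made up:

```python
import json
import urllib.request

def make_text_payload(group_jid: str, sender_phone: str,
                      text: str, ts_epoch: int) -> dict:
    """Build a minimal Evolution-style messages.upsert body (test data only)."""
    return {
        "event": "messages.upsert",
        "instance": "wa-monitor",
        "data": {
            "key": {
                "id": "TESTMSG001",
                "remoteJid": group_jid,
                "participant": f"{sender_phone}@s.whatsapp.net",
            },
            "pushName": "Test Sender",
            "message": {"conversation": text},
            "messageTimestamp": ts_epoch,
        },
    }

payload = make_text_payload("120363000000000000@g.us", "4915112345678",
                            "hello from the smoke test", 1700000000)

# Fire it at the running service (assumes port 8086 on localhost):
# req = urllib.request.Request(
#     "http://localhost:8086/webhook/evolution",
#     data=json.dumps(payload).encode(),
#     headers={"Content-Type": "application/json"})
# urllib.request.urlopen(req)
```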

Webhook Routing

Different events get different handlers:

async def _process_webhook(event: str, instance: str, body: dict):
    """Route webhook events to appropriate handlers."""
    try:
        data = body.get("data", {})

        if event == "messages.upsert":
            await _process_message(instance, data)
        elif event == "groups.upsert":
            await _process_group_upsert(data)
        elif event == "group-participants.update":
            await _process_participant_update(data)
        elif event == "connection.update":
            state = data.get("state", "")
            log.info(f"Connection update: {instance} -> {state}")
        elif event == "qrcode.updated":
            log.info(f"QR code updated for {instance}")

    except Exception as e:
        log.error(f"Webhook processing error ({event}): {e}",
                  exc_info=True)

7. Message Processing Pipeline

Parsing WhatsApp Message Types

WhatsApp messages come in many flavors. The Evolution API webhook payload nests the content differently for each type. Here is how to handle them all:

async def _process_message(instance: str, data: dict):
    """Parse an incoming message and store it."""
    key = data.get("key", {})
    msg_id = key.get("id", "")
    remote_jid = key.get("remoteJid", "")

    # Only process group messages (JIDs ending in @g.us)
    if not remote_jid.endswith("@g.us"):
        return

    # Extract sender
    participant = (key.get("participant", "")
                   or data.get("participant", ""))
    sender_phone = participant.split("@")[0] if participant else ""
    sender_name = data.get("pushName", "")

    # Extract message content based on type
    message = data.get("message", {})
    msg_type = "conversation"
    content = ""
    media_type = None
    media_mime = None

    if "conversation" in message:
        content = message["conversation"]
        msg_type = "conversation"
    elif "extendedTextMessage" in message:
        content = message["extendedTextMessage"].get("text", "")
        msg_type = "extendedText"
    elif "imageMessage" in message:
        msg_type = "image"
        media_type = "image"
        content = message["imageMessage"].get("caption", "")
        media_mime = message["imageMessage"].get("mimetype", "image/jpeg")
    elif "audioMessage" in message:
        msg_type = "audio"
        media_type = "audio"
        media_mime = message["audioMessage"].get("mimetype", "audio/ogg")
    elif "videoMessage" in message:
        msg_type = "video"
        media_type = "video"
        content = message["videoMessage"].get("caption", "")
        media_mime = message["videoMessage"].get("mimetype", "video/mp4")
    elif "documentMessage" in message:
        msg_type = "document"
        media_type = "document"
        fname = message["documentMessage"].get("fileName", "")
        content = f"[Document: {fname}]" if fname else "[Document]"
        media_mime = message["documentMessage"].get("mimetype", "")
    elif "stickerMessage" in message:
        msg_type = "sticker"
        content = "[Sticker]"
    elif "reactionMessage" in message:
        msg_type = "reaction"
        content = message["reactionMessage"].get("text", "")
    elif "contactMessage" in message:
        msg_type = "contact"
        display = message["contactMessage"].get("displayName", "")
        content = f"[Contact: {display}]"
    elif "locationMessage" in message:
        msg_type = "location"
        lat = message["locationMessage"].get("degreesLatitude", "")
        lon = message["locationMessage"].get("degreesLongitude", "")
        content = f"[Location: {lat}, {lon}]"
    else:
        msg_type = list(message.keys())[0] if message else "unknown"

    # Parse timestamp
    ts_epoch = data.get("messageTimestamp", 0)
    if isinstance(ts_epoch, str):
        ts_epoch = int(ts_epoch)
    ts = (datetime.utcfromtimestamp(ts_epoch)
          if ts_epoch else datetime.utcnow())

    # Extract reply context
    reply_to = None
    for msg_key in ("extendedTextMessage", "imageMessage", "videoMessage"):
        ctx = message.get(msg_key, {}).get("contextInfo", {})
        if ctx:
            reply_to = ctx.get("stanzaId")
            break

    # Ensure group exists in our database
    await _ensure_group(remote_jid)

    # Insert message (idempotent via ON CONFLICT)
    async with pool.acquire() as conn:
        await conn.execute("""
            INSERT INTO wa_messages
                (id, group_jid, sender_phone, sender_name, message_type,
                 content, media_type, media_mime, reply_to_id,
                 timestamp, raw_payload)
            VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)
            ON CONFLICT (id, timestamp) DO NOTHING
        """, msg_id, remote_jid, sender_phone, sender_name, msg_type,
             content, media_type, media_mime, reply_to, ts,
             json.dumps(data))

    # Update participant tracking
    if sender_phone:
        await _upsert_participant(remote_jid, sender_phone, sender_name)

    # Download media in background
    if media_type in ("image", "audio", "video", "document"):
        asyncio.create_task(_download_and_process_media(
            instance, data, msg_id, remote_jid,
            media_type, media_mime, ts
        ))

    log.info(f"Stored: {msg_id} in {remote_jid} "
             f"from {sender_phone} ({msg_type})")

Group and Participant Tracking

async def _ensure_group(jid: str, name: str = ""):
    """Insert group if not exists."""
    async with pool.acquire() as conn:
        await conn.execute("""
            INSERT INTO wa_groups (jid, name)
            VALUES ($1, $2)
            ON CONFLICT (jid) DO NOTHING
        """, jid, name)


async def _upsert_participant(group_jid: str, phone: str,
                               push_name: str = ""):
    """Track participant activity."""
    async with pool.acquire() as conn:
        await conn.execute("""
            INSERT INTO wa_participants
                (group_jid, phone, push_name, last_seen)
            VALUES ($1, $2, $3, NOW())
            ON CONFLICT (group_jid, phone) DO UPDATE SET
                push_name = CASE
                    WHEN EXCLUDED.push_name != ''
                    THEN EXCLUDED.push_name
                    ELSE wa_participants.push_name
                END,
                last_seen = NOW()
        """, group_jid, phone, push_name)

Media Download and Processing

Media files are downloaded from Evolution API, stored locally in date-based directories, and optionally processed (thumbnails for images, duration extraction for audio/video):

async def _download_and_process_media(instance, data, msg_id,
                                       group_jid, media_type,
                                       media_mime, ts):
    """Download media via Evolution API, generate thumbnails."""
    try:
        key = data.get("key", {})
        message = data.get("message", {})

        # Date-based directory structure
        date_dir = MEDIA_DIR / group_jid / ts.strftime("%Y-%m-%d")
        date_dir.mkdir(parents=True, exist_ok=True)

        # Map MIME types to file extensions
        ext_map = {
            "image/jpeg": ".jpg", "image/png": ".png",
            "image/webp": ".webp", "audio/ogg": ".ogg",
            "audio/ogg; codecs=opus": ".ogg", "audio/mp4": ".m4a",
            "video/mp4": ".mp4", "application/pdf": ".pdf",
        }
        ext = ext_map.get(media_mime, ".bin")
        media_path = date_dir / f"{msg_id}{ext}"

        # Download via Evolution API's base64 endpoint
        async with httpx.AsyncClient(timeout=60) as client:
            resp = await client.post(
                f"{EVOLUTION_URL}/chat/getBase64FromMediaMessage"
                f"/{instance}",
                json={"message": {"key": key, "message": message}},
                headers={"apikey": EVOLUTION_API_KEY}
            )
            if resp.status_code != 200:
                log.warning(f"Media download failed: {msg_id}")
                return

            import base64
            b64_data = resp.json().get("base64", "")
            if not b64_data:
                return
            raw = base64.b64decode(b64_data)
            media_path.write_bytes(raw)

        # Update database with file path and size
        async with pool.acquire() as conn:
            await conn.execute("""
                UPDATE wa_messages
                SET media_path = $1, media_size = $2
                WHERE id = $3 AND timestamp = $4
            """, str(media_path), len(raw), msg_id, ts)

        log.info(f"Media saved: {msg_id} ({len(raw)} bytes)")

    except Exception as e:
        log.error(f"Media processing failed: {msg_id}: {e}")

8. AI-Powered Daily Summaries

This is where the system becomes genuinely valuable. Instead of reading 300 messages from yesterday's team chat, you get a 4-paragraph summary highlighting decisions, action items, and key discussions.

Claude API Integration

def _get_api_key() -> str:
    """Read Anthropic API key from file."""
    key_file = Path(ANTHROPIC_KEY_FILE)
    if key_file.exists():
        key = key_file.read_text().strip()
        if key:
            return key
    raise RuntimeError("Anthropic API key not configured")


def _call_claude(messages: list, model: str = "claude-haiku-4-5-20251001",
                 max_tokens: int = 2000, system: str = "") -> str:
    """Call Claude API and return the response text."""
    import requests
    api_key = _get_api_key()
    body = {
        "model": model,
        "max_tokens": max_tokens,
        "messages": messages,
    }
    if system:
        body["system"] = system

    resp = requests.post(
        "https://api.anthropic.com/v1/messages",
        headers={
            "x-api-key": api_key,
            "content-type": "application/json",
            "anthropic-version": "2023-06-01",
        },
        json=body,
        timeout=120,
    )
    if resp.status_code != 200:
        err = resp.json().get("error", {}).get("message",
                                                resp.text[:300])
        raise RuntimeError(f"API error ({resp.status_code}): {err}")
    return resp.json()["content"][0]["text"].strip()
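
The Anthropic API occasionally returns transient overload errors (HTTP 429/529), which the helper above surfaces as a RuntimeError. For unattended nightly batch jobs, a thin retry wrapper is worth adding; this is a sketch, not part of the original service, and `call_with_retry` is a name introduced here:

```python
import time

def call_with_retry(fn, attempts=3, base_delay=2.0):
    """Retry a zero-arg callable (e.g. lambda: _call_claude(msgs))
    with exponential backoff on transient RuntimeError failures."""
    for attempt in range(attempts):
        try:
            return fn()
        except RuntimeError:
            if attempt == attempts - 1:
                raise  # out of retries -- propagate the last error
            time.sleep(base_delay * (2 ** attempt))  # 2s, 4s, 8s, ...
```

In a cron context, wrap each group's summary call: `call_with_retry(lambda: _call_claude(messages), base_delay=5)`.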

The Summary Generation Endpoint

@app.post("/summarize/{jid}")
async def summarize_group(jid: str, target_date: str = ""):
    """Generate or retrieve a daily summary for a group."""
    if not target_date:
        target_date = (date.today() - timedelta(days=1)).isoformat()

    summary_date = date.fromisoformat(target_date)

    # Check cache first
    async with pool.acquire() as conn:
        existing = await conn.fetchrow(
            """SELECT summary, message_count, model
               FROM wa_daily_summaries
               WHERE group_jid = $1 AND summary_date = $2""",
            jid, summary_date
        )
        if existing:
            return {
                "summary": existing["summary"],
                "message_count": existing["message_count"],
                "cached": True,
            }

        # Fetch messages for the target date
        messages = await conn.fetch("""
            SELECT sender_name, sender_phone, content,
                   message_type, timestamp
            FROM wa_messages
            WHERE group_jid = $1 AND timestamp::date = $2
            ORDER BY timestamp ASC
        """, jid, summary_date)

        if not messages:
            return {"summary": "No messages on this date.",
                    "message_count": 0, "cached": False}

        group = await conn.fetchrow(
            "SELECT name FROM wa_groups WHERE jid = $1", jid)
        group_name = group["name"] if group else jid

    # Build the prompt
    msg_text = []
    for m in messages:
        name = m["sender_name"] or m["sender_phone"]
        ts = m["timestamp"].strftime("%H:%M")
        content = m["content"] or f"[{m['message_type']}]"
        msg_text.append(f"[{ts}] {name}: {content}")

    prompt = f"""Summarize the following WhatsApp group messages \
from "{group_name}" on {target_date}.

Group by topic/thread. Highlight any decisions, action items, \
or important announcements.
Be concise (2-4 paragraphs max). If messages are in a \
non-English language, summarize in English.

Messages:
{chr(10).join(msg_text)}"""

    summary = _call_claude(
        [{"role": "user", "content": prompt}],
        model="claude-haiku-4-5-20251001",
        max_tokens=1000
    )

    # Cache the summary
    async with pool.acquire() as conn:
        await conn.execute("""
            INSERT INTO wa_daily_summaries
                (group_jid, summary_date, message_count, summary, model)
            VALUES ($1, $2, $3, $4, 'haiku')
            ON CONFLICT (group_jid, summary_date) DO UPDATE SET
                summary = EXCLUDED.summary,
                message_count = EXCLUDED.message_count
        """, jid, summary_date, len(messages), summary)

    return {"summary": summary, "message_count": len(messages),
            "cached": False}
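
The transcript-line formatting above is repeated verbatim in the `/ask` endpoint later in this section; factoring it into a helper keeps the two in sync. A sketch (`format_line` is a name introduced here, not from the service):

```python
from datetime import datetime

def format_line(sender_name, sender_phone, content, message_type,
                ts: datetime, fmt: str = "%H:%M") -> str:
    """One transcript line: '[HH:MM] Name: text', falling back to the
    phone number for unnamed senders and a type tag for media."""
    name = sender_name or sender_phone
    body = content or f"[{message_type}]"
    return f"[{ts.strftime(fmt)}] {name}: {body}"
```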

Interactive AI Analysis

Beyond daily summaries, the system supports freeform questions about group conversations. This uses Claude Sonnet for deeper analysis:

WA_SYSTEM_PROMPT = """You are a WhatsApp group analysis assistant. \
You have access to WhatsApp group messages from staff groups.

Your job: Summarize discussions, identify key topics, flag important \
items (decisions, deadlines, complaints, requests), and answer \
questions about group conversations.

When analyzing messages:
- Group by topic/thread when summarizing
- Highlight action items and decisions
- Note who said what when relevant
- Flag urgent or important messages
- Be concise but thorough
- Use sender names when available, phone numbers when not
- Present information in a clear, structured format
- If messages are in a non-English language, provide analysis in English

Format your response in markdown."""


@app.post("/ask")
async def ask_ai(payload: dict):
    """AI analysis endpoint -- answers questions about messages."""
    question = payload.get("question", "").strip()
    group_jid = payload.get("group_jid", "")

    # Gather context: recent messages from relevant group(s)
    context_parts = []
    async with pool.acquire() as conn:
        if group_jid:
            messages = await conn.fetch("""
                SELECT sender_name, sender_phone, content,
                       message_type, timestamp
                FROM wa_messages
                WHERE group_jid = $1
                  AND timestamp >= CURRENT_DATE - INTERVAL '7 days'
                ORDER BY timestamp ASC LIMIT 500
            """, group_jid)
            for m in messages:
                name = m["sender_name"] or m["sender_phone"]
                ts = m["timestamp"].strftime("%Y-%m-%d %H:%M")
                content = m["content"] or f"[{m['message_type']}]"
                context_parts.append(f"[{ts}] {name}: {content}")

    context = "\n".join(context_parts)
    user_msg = (f"Here is the WhatsApp message data:\n\n{context}"
                f"\n\n---\n\nQuestion: {question}")

    answer = _call_claude(
        [{"role": "user", "content": user_msg}],
        model="claude-sonnet-4-6",
        max_tokens=3000,
        system=WA_SYSTEM_PROMPT
    )
    return {"answer": answer, "model": "claude-sonnet-4-6"}

AI Prompt Engineering Tips

Getting good summaries requires careful prompting. Here is what works:

  1. Always include timestamps. Without them, the AI cannot distinguish morning discussions from afternoon follow-ups.
  2. Include sender names. "Marco said we need to hire 3 more agents" is far more useful than "Someone said we need to hire."
  3. Request topic grouping. Without this instruction, the AI tends to produce a chronological retelling rather than a thematic summary.
  4. Specify the output language. If your groups use Italian, Albanian, or Spanish, explicitly ask for English summaries.
  5. Cap the token budget. Haiku with max_tokens=1000 produces tight summaries. Going higher tends to add filler.
  6. Use Haiku for bulk summaries, Sonnet for interactive analysis. Haiku is 10-20x cheaper and fast enough for nightly cron jobs. Sonnet's deeper reasoning is worth the cost for real-time "what happened?" questions.
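
Tip 5 caps the output; the input deserves a cap too, since a very busy group can exceed the model's context window. A rough character-budget clamp (~4 characters per token is a common heuristic) that keeps the most recent messages might look like this sketch, which is not part of the original service:

```python
def clamp_transcript(lines: list[str], max_chars: int = 60_000) -> list[str]:
    """Drop the oldest transcript lines until the total fits the budget."""
    kept, total = [], 0
    for line in reversed(lines):      # walk newest-first
        total += len(line) + 1        # +1 for the joining newline
        if total > max_chars:
            break
        kept.append(line)
    return list(reversed(kept))       # restore chronological order
```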

9. Email Reports with Resend

Why Resend?

| Feature | Resend | Brevo | Mailjet |
|---|---|---|---|
| Free tier | 3,000/month | 300/day | 200/day |
| Setup time | 2 minutes | 10 minutes | 10 minutes |
| API complexity | 1 endpoint | Medium | Medium |
| KYC/verification | Email only | Email + phone | Email |
| Custom domain needed? | No (test domain available) | Yes for production | Yes for production |

Resend's free tier is perfect for monitoring reports. The API is a single POST endpoint. No SDK needed -- plain urllib works.

Email Sending Implementation

The service supports both Resend's REST API and traditional SMTP, selected by setting smtp_host to "resend":

def _send_email_sync(config: dict, to_list: list,
                     subject: str, html_body: str) -> str:
    """Send email. Returns empty string on success, error on failure."""
    import json as _json
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError

    # ── Resend API mode ──
    if config.get("smtp_host", "").lower() == "resend":
        api_key = config.get("smtp_pass", "")
        if not api_key:
            return "Resend API key not configured"

        from_str = (f"{config.get('from_name', 'WhatsApp Monitor')} "
                    f"<{config.get('from_email', '[email protected]')}>")

        # Generate plain-text fallback by stripping HTML
        plain = re.sub(r'<[^>]+>', '', html_body)
        plain = re.sub(r'\s+', ' ', plain).strip()

        payload = _json.dumps({
            "from": from_str,
            "to": to_list,
            "subject": subject,
            "html": html_body,
            "text": plain,
        }).encode()

        req = Request("https://api.resend.com/emails",
                      data=payload, method="POST")
        req.add_header("Authorization", f"Bearer {api_key}")
        req.add_header("Content-Type", "application/json")

        try:
            resp = urlopen(req, timeout=30)
            data = _json.loads(resp.read())
            return "" if data.get("id") else f"Resend error: {data}"
        except HTTPError as e:
            body = e.read().decode()
            return f"Resend HTTP {e.code}: {body}"
        except Exception as e:
            return f"Resend error: {e}"

    # ── Traditional SMTP mode ──
    import smtplib
    from email.mime.multipart import MIMEMultipart
    from email.mime.text import MIMEText

    if not config.get("smtp_host"):
        return "Email not configured"

    msg = MIMEMultipart("alternative")
    msg["Subject"] = subject
    msg["From"] = (f"{config.get('from_name')} "
                   f"<{config.get('from_email')}>")
    msg["To"] = ", ".join(to_list)

    plain = re.sub(r'<[^>]+>', '', html_body)
    msg.attach(MIMEText(plain, "plain", "utf-8"))
    msg.attach(MIMEText(html_body, "html", "utf-8"))

    try:
        if config.get("smtp_tls", True):
            server = smtplib.SMTP(config["smtp_host"],
                                  config.get("smtp_port", 587),
                                  timeout=30)
            server.starttls()
        else:
            server = smtplib.SMTP(config["smtp_host"],
                                  config.get("smtp_port", 25),
                                  timeout=30)

        if config.get("smtp_user") and config.get("smtp_pass"):
            server.login(config["smtp_user"], config["smtp_pass"])

        # Envelope sender must be a bare address, not "Name <addr>"
        server.sendmail(config.get("from_email", ""), to_list,
                        msg.as_string())
        server.quit()
        return ""
    except Exception as e:
        return str(e)

HTML Email Template

Professional dark-themed email template that renders well across clients:

def _build_email_html(title: str, sections: list) -> str:
    """Build HTML email. sections = [(heading, content_html), ...]"""
    sections_html = ""
    for heading, content in sections:
        sections_html += f'''
        <tr><td style="padding:0 24px 20px;">
            <h2 style="margin:0 0 8px;font-size:14px;color:#25D366;
                       font-family:'Courier New',monospace;
                       border-bottom:1px solid #2a2a2e;
                       padding-bottom:6px;">{heading}</h2>
            <div style="font-size:13px;line-height:1.7;
                        color:#d4d4d8;">{content}</div>
        </td></tr>'''

    return f'''<!DOCTYPE html><html><head>
    <meta charset="utf-8"></head>
    <body style="margin:0;padding:0;background:#0a0a0b;
                 font-family:-apple-system,sans-serif;">
    <table width="100%" cellpadding="0" cellspacing="0"
           style="background:#0a0a0b;padding:20px 0;">
    <tr><td align="center">
    <table width="600" cellpadding="0" cellspacing="0"
           style="background:#111113;border:1px solid #27272a;
                  border-radius:8px;overflow:hidden;">
        <tr><td style="background:#18181b;padding:16px 24px;
                       border-bottom:1px solid #27272a;">
            <span style="font-family:'Courier New',monospace;
                         font-size:15px;font-weight:bold;
                         color:#e4e4e7;">
                <span style="color:#25D366;">&#9679;</span> {title}
            </span>
        </td></tr>
        {sections_html}
        <tr><td style="padding:16px 24px;
                       border-top:1px solid #27272a;
                       text-align:center;">
            <span style="font-size:10px;color:#52525b;
                         font-family:'Courier New',monospace;">
                WhatsApp Monitor -- Automated Report
            </span>
        </td></tr>
    </table>
    </td></tr></table>
    </body></html>'''
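
One caveat with the template: `{title}`, `{heading}`, and `{content}` are interpolated raw, so a group name or message containing `<` or `&` can break the markup. Escaping user-supplied strings first is cheap (a sketch; apply it before the summary's newlines are converted to `<br>` tags so those tags survive):

```python
import html

def esc(s: str) -> str:
    """HTML-escape user-supplied text (group names, message content)
    before interpolating it into the email template."""
    return html.escape(s, quote=True)
```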

Report Types

The system supports three built-in report types:

1. Daily Summary (daily_summary) -- yesterday's AI summary for each monitored group, one email section per group.

2. Weekly Digest (weekly_digest) -- seven-day message stats and highlights across all monitored groups.

3. Activity Alert (activity_alert) -- flags monitored groups that logged zero messages.

Report Generation Engine

async def _send_single_report(report: dict) -> dict:
    """Generate and send a single report."""
    config = await _get_email_config()
    if not config.get("smtp_host"):
        return {"status": "error", "detail": "SMTP not configured"}

    recipients = [r.strip()
                  for r in report["recipients"].split(",")
                  if r.strip()]
    report_type = report["report_type"]

    sections = []
    async with pool.acquire() as conn:
        groups = await conn.fetch(
            "SELECT jid, name FROM wa_groups "
            "WHERE is_monitored = true")

        if report_type == "daily_summary":
            yesterday = (date.today() - timedelta(days=1)).isoformat()
            for g in groups:
                summary = await conn.fetchrow(
                    """SELECT summary, message_count
                       FROM wa_daily_summaries
                       WHERE group_jid = $1
                         AND summary_date = $2""",
                    g["jid"], date.fromisoformat(yesterday))

                if summary and summary["message_count"] > 0:
                    content = summary["summary"].replace("\n", "<br>")
                    content = (f'<p style="color:#a1a1aa;">'
                               f'{summary["message_count"]} messages'
                               f'</p>' + content)
                    sections.append((g["name"] or g["jid"], content))

            subject = f"WhatsApp Daily Summary -- {yesterday}"
            title = f"Daily Summary -- {yesterday}"

        # ... similar logic for weekly_digest and activity_alert

    html = _build_email_html(title, sections)

    err = await asyncio.get_event_loop().run_in_executor(
        None, _send_email_sync, config, recipients, subject, html)

    # Track send status
    async with pool.acquire() as conn:
        status = "sent" if not err else f"error: {err}"
        await conn.execute(
            """UPDATE wa_scheduled_reports
               SET last_sent_at = NOW(), last_status = $2
               WHERE id = $1""",
            report["id"], status)

    if err:
        return {"status": "error", "detail": err}
    return {"status": "ok", "sections": len(sections)}

Report CRUD API

Full REST API for managing reports from the dashboard:

GET    /reports                 -- List all scheduled reports
POST   /reports                 -- Create a new report
PUT    /reports/{id}            -- Update a report
DELETE /reports/{id}            -- Delete a report
POST   /reports/{id}/send       -- Send a report immediately
POST   /cron/send-reports       -- Check and send all due reports

10. Dashboard UI (PHP)

The dashboard provides four PHP pages that work together:

whatsapp.php -- Group Monitor

The main interface for viewing groups and messages.

email-reports.php -- Report Management

A standalone CRUD interface for email reports.

whatsapp-data.php -- AJAX Proxy

A thin PHP proxy that forwards browser requests to the FastAPI service. This solves two problems:

  1. CORS -- the browser cannot directly call localhost:8086 from a page served on port 8082.
  2. Authentication -- the proxy enforces admin-only access using the existing PHP auth system.
// Simplified flow
$endpoint = $_GET['endpoint'];  // e.g., "/groups" or "/reports/5"
$ch = curl_init("http://127.0.0.1:8086{$endpoint}");
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
echo curl_exec($ch);            // Forward the request, return the response

The proxy whitelists specific endpoint prefixes to prevent unauthorized access to internal APIs.

whatsapp-ask.php -- AI Analysis Proxy

Dedicated proxy for the /ask endpoint, forwarding user questions to Claude via the FastAPI service. Supports multi-turn conversations by passing the full conversation history.


11. Prometheus Metrics Endpoint

The /metrics endpoint exposes Prometheus-compatible metrics for Grafana monitoring:

@app.get("/metrics")
async def prometheus_metrics():
    """Prometheus-compatible metrics for Grafana."""
    lines = []

    lines.append("# HELP wa_monitor_up Service status")
    lines.append("# TYPE wa_monitor_up gauge")
    wa_up = 1 if _watchdog_stats.get("last_state") == "open" else 0
    lines.append(f"wa_monitor_up {wa_up}")

    lines.append("# HELP wa_monitor_uptime_seconds Uptime")
    lines.append("# TYPE wa_monitor_uptime_seconds gauge")
    lines.append(f"wa_monitor_uptime_seconds "
                 f"{int(time.time() - _startup_time)}")

    lines.append("# HELP wa_monitor_reconnects_total Reconnects")
    lines.append("# TYPE wa_monitor_reconnects_total counter")
    lines.append(f'wa_monitor_reconnects_total '
                 f'{_watchdog_stats["reconnects"]}')

    async with pool.acquire() as conn:
        today = await conn.fetchval(
            "SELECT COUNT(*) FROM wa_messages "
            "WHERE timestamp >= CURRENT_DATE")
        total = await conn.fetchval(
            "SELECT COUNT(*) FROM wa_messages")
        groups = await conn.fetchval(
            "SELECT COUNT(*) FROM wa_groups "
            "WHERE is_monitored = true")

    lines.append(f"wa_monitor_messages_today {today}")
    lines.append(f"wa_monitor_messages_total {total}")
    lines.append(f"wa_monitor_groups {groups}")

    from fastapi.responses import PlainTextResponse
    return PlainTextResponse(
        "\n".join(lines) + "\n",
        media_type="text/plain; version=0.0.4")

Prometheus scrape config:

# In prometheus.yml
scrape_configs:
  - job_name: 'wa-monitor'
    scrape_interval: 30s
    static_configs:
      - targets: ['172.18.0.1:8086']

Useful Grafana panels:

  • wa_monitor_up as a stat panel, with an alert when it reads 0
  • wa_monitor_messages_today as a time series to spot unusual volume
  • rate(wa_monitor_reconnects_total[1h]) to catch connection flapping


12. Cron Jobs & Automation

Four cron jobs keep the system running without human intervention:

# ── Daily AI Summaries (2 AM) ──
# Generates summaries for all groups for yesterday
0 2 * * * curl -s -X POST http://localhost:8086/cron/daily-summaries \
    > /dev/null 2>&1

# ── Partition Creation (25th of each month, 3 AM) ──
# Creates PostgreSQL partitions for the next 3+ months
0 3 25 * * curl -s -X POST http://localhost:8086/cron/create-partitions \
    > /dev/null 2>&1

# ── Media Cleanup (Sundays at 4 AM) ──
# Deletes downloaded media files older than 90 days
0 4 * * 0 find /opt/wa-monitor/media -type f -mtime +90 -delete \
    2>/dev/null; \
    find /opt/wa-monitor/media -type d -empty -delete 2>/dev/null

# ── Report Scheduler (every 15 minutes) ──
# Checks for due reports and sends them
*/15 * * * * curl -s -X POST http://localhost:8086/cron/send-reports \
    > /dev/null 2>&1

How the Report Scheduler Works

The /cron/send-reports endpoint runs every 15 minutes. For each active report:

  1. Parse the schedule_time (e.g., "08:00") and check if the current time is within a 15-minute window after the scheduled time.
  2. Check the schedule type:
    • daily -- send every day
    • weekly -- send only on the matching schedule_day (1=Monday through 7=Sunday)
    • monthly -- send only when the current day of month matches schedule_day
  3. Check last_sent_at -- skip if already sent today (prevents duplicates from overlapping cron runs).
  4. Generate the report content, send the email, and update last_sent_at and last_status.
@app.post("/cron/send-reports")
async def cron_send_reports():
    """Check and send any due reports."""
    now = datetime.utcnow()
    results = []

    async with pool.acquire() as conn:
        reports = await conn.fetch(
            "SELECT * FROM wa_scheduled_reports "
            "WHERE is_active = true")

    for r in reports:
        report = dict(r)
        last_sent = report.get("last_sent_at")

        # Already sent today? Skip.
        if last_sent and last_sent.date() == now.date():
            continue

        # Parse schedule time
        sched_h, sched_m = map(int,
            report.get("schedule_time", "08:00").split(":"))

        # Check 15-minute window
        now_min = now.hour * 60 + now.minute
        sched_min = sched_h * 60 + sched_m
        if now_min < sched_min or now_min > sched_min + 15:
            continue

        # Check schedule type
        should_send = False
        if report["schedule"] == "daily":
            should_send = True
        elif report["schedule"] == "weekly":
            should_send = (now.isoweekday() == report["schedule_day"])
        elif report["schedule"] == "monthly":
            should_send = (now.day == report["schedule_day"])

        if should_send:
            result = await _send_single_report(report)
            results.append({"report": report["name"], **result})

    return {"sent": results}
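
The scheduling decision inside that loop is easy to get wrong around the edges (window boundaries, duplicate sends), so it helps to extract it into a pure function you can unit-test. A sketch; `is_due` is a hypothetical refactor, not part of the service:

```python
from datetime import datetime
from typing import Optional

def is_due(now: datetime, schedule: str, schedule_day: int,
           schedule_time: str, last_sent: Optional[datetime],
           window_min: int = 15) -> bool:
    """Mirror of the cron loop's decision: not already sent today,
    inside the send window, and matching the daily/weekly/monthly rule."""
    if last_sent and last_sent.date() == now.date():
        return False                      # already sent today
    h, m = map(int, schedule_time.split(":"))
    now_min, sched_min = now.hour * 60 + now.minute, h * 60 + m
    if not (sched_min <= now_min <= sched_min + window_min):
        return False                      # outside the send window
    if schedule == "daily":
        return True
    if schedule == "weekly":
        return now.isoweekday() == schedule_day   # 1=Monday .. 7=Sunday
    if schedule == "monthly":
        return now.day == schedule_day
    return False
```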

13. Connection Watchdog & Auto-Reconnect

WhatsApp connections via Baileys are not permanent. Phone battery dies, Wi-Fi drops, WhatsApp pushes protocol updates. The watchdog runs every 60 seconds and handles all of this automatically:

_watchdog_stats = {
    "checks": 0,
    "reconnects": 0,
    "last_check": None,
    "last_state": "unknown"
}


async def _watchdog_loop():
    """Background watchdog: auto-reconnect, sync groups, cleanup."""
    await asyncio.sleep(10)  # Let startup finish
    log.info("Watchdog started")

    while True:
        try:
            _watchdog_stats["checks"] += 1
            _watchdog_stats["last_check"] = (
                datetime.utcnow().isoformat())

            # 1. Check WhatsApp connection
            async with httpx.AsyncClient(timeout=10) as client:
                resp = await client.get(
                    f"{EVOLUTION_URL}/instance/connectionState"
                    f"/{EVOLUTION_INSTANCE}",
                    headers={"apikey": EVOLUTION_API_KEY})

                if resp.status_code == 200:
                    state = (resp.json().get("instance", {})
                             .get("state", "unknown"))
                    _watchdog_stats["last_state"] = state

                    if state in ("close", "connecting"):
                        # Disconnected -- attempt reconnect
                        log.warning(f"WhatsApp disconnected "
                                    f"(state={state}), reconnecting...")
                        await client.get(
                            f"{EVOLUTION_URL}/instance/connect"
                            f"/{EVOLUTION_INSTANCE}",
                            headers={"apikey": EVOLUTION_API_KEY})
                        _watchdog_stats["reconnects"] += 1

                elif resp.status_code == 404:
                    # Instance was deleted -- recreate it
                    log.warning("Instance missing, recreating...")
                    await client.post(
                        f"{EVOLUTION_URL}/instance/create",
                        headers={
                            "apikey": EVOLUTION_API_KEY,
                            "Content-Type": "application/json"},
                        json={
                            "instanceName": EVOLUTION_INSTANCE,
                            "integration": "WHATSAPP-BAILEYS",
                            "qrcode": True,
                            "webhook": {
                                "url": ("http://172.18.0.1:8086"
                                        "/webhook/evolution"),
                                "byEvents": False,
                                "base64": True,
                                "events": [
                                    "MESSAGES_UPSERT",
                                    "GROUPS_UPSERT",
                                    "GROUP_PARTICIPANTS_UPDATE",
                                    "CONNECTION_UPDATE",
                                    "QRCODE_UPDATED"
                                ]
                            }
                        })
                    _watchdog_stats["reconnects"] += 1

            # 2. Periodic group sync (every 6 hours)
            # Refreshes group names and participant counts
            # from Evolution API

            # 3. Webhook log cleanup (30-day retention)
            async with pool.acquire() as conn:
                await conn.execute(
                    "DELETE FROM wa_webhook_log "
                    "WHERE created_at < NOW() - INTERVAL '30 days'")

        except asyncio.CancelledError:
            return
        except Exception as e:
            log.error(f"Watchdog error: {e}")

        await asyncio.sleep(60)

14. Production Tips & Lessons Learned

Handling Webhook Floods

When you first connect to a WhatsApp account that has been accumulating messages offline, Evolution API may dump hundreds of historical messages as webhooks in rapid succession. This can overwhelm your database if you are not careful.

Mitigations:

  1. Background task processing. Never process webhooks in the request handler. Return 200 OK immediately and process asynchronously.
  2. ON CONFLICT DO NOTHING. Every INSERT uses this pattern, making message ingestion fully idempotent. Duplicate webhooks are silently ignored.
  3. Connection pool sizing. min_size=2, max_size=10 handles bursts without exhausting PostgreSQL connections.
  4. Rate-limit media downloads. Media download is the slowest part. Use asyncio.create_task() so it runs in the background and does not block message processing.
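
Mitigation 1 in miniature: ack first, work later. A queue-based sketch of the pattern (stand-ins only -- the real service uses FastAPI background tasks and asyncpg, and `sink` here stands in for the idempotent INSERT):

```python
import asyncio

async def webhook_handler(payload: dict, queue: asyncio.Queue) -> dict:
    """Enqueue the payload and return immediately -- the HTTP 200
    goes out before any database work happens."""
    await queue.put(payload)
    return {"status": "ok"}

async def consumer(queue: asyncio.Queue, sink: list) -> None:
    """Drain the queue at the database's pace, one payload at a time."""
    while True:
        item = await queue.get()
        sink.append(item)       # stand-in for the idempotent DB insert
        queue.task_done()
```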

Partition Maintenance

-- Remove January 2025 data
ALTER TABLE wa_messages DETACH PARTITION wa_messages_2025_01;
DROP TABLE wa_messages_2025_01;
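
The /cron/create-partitions counterpart (wired up in the section-12 cron jobs) boils down to generating monthly DDL. A sketch, assuming RANGE partitioning on timestamp as set up in the schema section and the wa_messages_YYYY_MM naming used above:

```python
from datetime import date

def partition_ddl(year: int, month: int, parent: str = "wa_messages") -> str:
    """DDL for one monthly partition of a RANGE (timestamp) table."""
    start = date(year, month, 1)
    end = date(year + (month == 12), month % 12 + 1, 1)  # first of next month
    name = f"{parent}_{year}_{month:02d}"
    return (f"CREATE TABLE IF NOT EXISTS {name} PARTITION OF {parent} "
            f"FOR VALUES FROM ('{start}') TO ('{end}')")
```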

Media Storage

Resend Free Tier Limits

Security Considerations

  1. Store the Anthropic API key in a file (/opt/wa-monitor/.api_key), not in environment variables. Files are less likely to leak into Docker logs or process listings.
  2. The webhook endpoint has no authentication. This is intentional because Evolution API does not support webhook signing. Mitigate by binding port 8086 to localhost (--host 127.0.0.1) if Evolution API is on the same machine.
  3. The PHP proxy enforces admin auth. All dashboard requests to the FastAPI service go through whatsapp-data.php, which checks admin permissions.
  4. SMTP passwords are masked in the API. The GET /settings/email endpoint returns "********" instead of the real password. Updates preserve the existing password if the masked value is sent back.
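
Item 4's round-trip behavior reduces to a one-line rule, sketched here (`resolve_password` is a hypothetical helper name, not from the service):

```python
MASK = "********"

def resolve_password(submitted: str, stored: str) -> str:
    """If the dashboard round-trips the masked value, keep the real
    secret; otherwise accept the new password."""
    return stored if submitted == MASK else submitted
```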

Evolution API Stability

Performance at Scale

For reference, this architecture comfortably handles:

| Metric | Capacity |
|---|---|
| Groups monitored | 50+ |
| Messages per day | 10,000+ |
| Concurrent webhook processing | Limited by asyncpg pool (10 connections) |
| Full-text search speed | <100ms for most queries (GIN index) |
| AI summary generation | ~5 seconds per group per day (Haiku) |
| Email report generation | <10 seconds including AI fallback |

15. Complete File Reference

Service Files

| File | Purpose |
|---|---|
| /opt/wa-monitor/service.py | Main FastAPI application (webhook receiver, API endpoints, AI, email, watchdog) |
| /opt/wa-monitor/schema.sql | PostgreSQL schema (run once for initial setup) |
| /opt/wa-monitor/.api_key | Anthropic API key (plain text, chmod 600) |
| /opt/wa-monitor/media/ | Downloaded media files (date-organized) |
| /opt/wa-monitor/venv/ | Python virtual environment |
| /opt/wa-monitor/service.log | Application logs (stdout + stderr from systemd) |

Dashboard Files

| File | Purpose |
|---|---|
| whatsapp.php | Group monitor UI -- browse groups, messages, AI chat |
| whatsapp-data.php | AJAX proxy -- forwards browser requests to FastAPI |
| whatsapp-ask.php | AI analysis proxy -- forwards questions to Claude |
| email-reports.php | Report management UI -- SMTP config, report CRUD |

Configuration Files

| File | Purpose |
|---|---|
| docker-compose.yml | Docker stack (PostgreSQL, Evolution API, Redis) |
| .env | Environment variables (passwords, API keys) |
| /etc/systemd/system/wa-monitor.service | Systemd unit for FastAPI service |

Systemd Unit

[Unit]
Description=WhatsApp Monitor Service (FastAPI)
After=network.target docker.service

[Service]
Type=simple
User=root
WorkingDirectory=/opt/wa-monitor
ExecStart=/opt/wa-monitor/venv/bin/python3 -m uvicorn \
    service:app --host 0.0.0.0 --port 8086 --workers 1
Restart=always
RestartSec=5
StandardOutput=append:/opt/wa-monitor/service.log
StandardError=append:/opt/wa-monitor/service.log
Environment=PYTHONUNBUFFERED=1

[Install]
WantedBy=multi-user.target

Python Dependencies

# requirements.txt
fastapi>=0.104.0
uvicorn>=0.24.0
asyncpg>=0.29.0
httpx>=0.25.0
requests>=2.31.0
Pillow>=10.0.0           # Image thumbnails
faster-whisper>=0.9.0    # Audio transcription (optional)

API Endpoints Reference

| Method | Endpoint | Purpose |
|---|---|---|
| POST | /webhook/evolution | Receive Evolution API webhooks |
| GET | /health | Health check with DB, WhatsApp, and watchdog status |
| GET | /metrics | Prometheus metrics |
| GET | /groups | List all monitored groups with stats |
| GET | /groups/{jid}/messages | Paginated messages with filters |
| GET | /groups/{jid}/stats | Top senders, hourly/daily activity |
| POST | /groups/sync | Force group sync from Evolution API |
| GET | /search?q=term | Full-text search across all groups |
| GET | /media/{path} | Serve downloaded media files |
| POST | /ask | AI-powered question answering |
| POST | /summarize/{jid} | Generate/retrieve daily summary |
| GET | /settings/email | Get email configuration |
| POST | /settings/email | Save email configuration |
| POST | /settings/email/test | Send test email |
| GET | /reports | List scheduled reports |
| POST | /reports | Create a report |
| PUT | /reports/{id} | Update a report |
| DELETE | /reports/{id} | Delete a report |
| POST | /reports/{id}/send | Send a report immediately |
| GET | /connection/status | WhatsApp connection state |
| GET | /connection/qrcode | Get QR code for linking |
| POST | /connection/create | Create/reconnect instance |
| POST | /cron/daily-summaries | Generate all daily summaries |
| POST | /cron/create-partitions | Create upcoming partitions |
| POST | /cron/send-reports | Check and send due reports |

Quick Start Checklist

  1. Install Docker, Python 3.11+, ffmpeg
  2. Create docker-compose.yml with PostgreSQL, Evolution API, Redis
  3. Create .env with POSTGRES_PASSWORD and EVOLUTION_API_KEY
  4. docker compose up -d
  5. Create the whatsapp_monitor database and run schema.sql
  6. Set up the Python venv and install dependencies
  7. Save your Anthropic API key to /opt/wa-monitor/.api_key
  8. Deploy service.py and install the systemd unit
  9. systemctl enable --now wa-monitor
  10. Create the Evolution API instance and scan the QR code
  11. Configure Resend API key in the dashboard (SMTP host = resend)
  12. Create your first scheduled report
  13. Add cron jobs for daily summaries, partition creation, media cleanup, report sending
  14. Add Prometheus scrape target for /metrics
  15. Wait for messages to flow in and enjoy your morning summary email

Built with Evolution API, FastAPI, PostgreSQL, Claude AI, and Resend. No official WhatsApp Business API required.

Need expert help with your setup?

VoIP infrastructure consulting, AI voice agent integration, monitoring stacks, scaling — I've done it all in production.

Get a Free Consultation