Tutorial 35: QA Pipeline — Call Transcription + AI Quality Scoring
Build an automated quality assurance pipeline that transcribes every inbound call using Faster-Whisper and scores agent performance with AI — running entirely on your existing ViciDial server with zero impact on live calls.
Table of Contents
- Introduction — Why Automated QA Matters
- Architecture Overview
- Prerequisites
- Storage Setup — Dedicated SSD
- Whisper Model Setup
- LLM Provider Selection
- Database Schema
- Queue Manager Script
- Transcription Engine
- QA Scoring Engine
- API Service
- Production Deployment
- Production Safety
- Dashboard Integration
- Troubleshooting
Technologies: Python, FastAPI, Faster-Whisper, Gemini API, Groq, SQLite, ViciDial, systemd | Difficulty: Advanced | Reading time: ~65 minutes
1. Introduction — Why Automated QA Matters
The Scale Problem
A typical ViciDial call center with 30-50 agents handles 500-1,000 inbound calls per day. Each call averages 60-120 seconds. A human QA reviewer can evaluate perhaps 30-40 calls per day with reasonable attention to detail. That means a manual review process covers roughly 4-8% of total call volume.
The calls that slip through unreviewed are where the problems hide: agents giving incorrect information, skipping required disclosures, being rude to customers, or failing to close properly. By the time a pattern surfaces in customer complaints, the damage is done.
What Automated QA Solves
An automated pipeline reverses this equation entirely:
- 100% coverage — every single call is transcribed and scored, not a random sample
- Consistency — the same scoring criteria are applied uniformly to every call, eliminating reviewer bias
- Speed — results are available the next morning, not days or weeks later
- Flagging — calls scoring below threshold are automatically flagged for human review, so supervisors focus attention where it matters
- Trend analysis — track agent improvement over time with hard data, not subjective impressions
- Training material — low-scoring calls become training examples; high-scoring calls become templates
The Cost Reality
This is the remarkable part: with modern open-source speech recognition and free-tier LLM APIs, you can build a production QA pipeline that costs between $0 and $18 per month to operate. The processing runs overnight on your existing server hardware, using CPU cores that would otherwise sit idle. The only real cost is a few hours of setup time.
Compare that to commercial QA platforms (CallMiner, NICE, Observe.AI) that charge $50-150 per agent per month. For a 40-agent center, that is $2,000-6,000/month versus $0-18/month.
What This Tutorial Builds
By the end of this tutorial, you will have:
- A dedicated storage volume isolated from production I/O
- A queue manager that tracks every new recording as it arrives
- A batch transcription engine using Faster-Whisper's large-v3-turbo model
- An AI scoring engine that evaluates each call against an 8-category weighted scorecard
- A FastAPI service exposing QA data for dashboards and reports
- systemd services and cron jobs for fully automated overnight processing
- Resource controls that guarantee zero impact on live call handling
The entire pipeline is designed around one principle: production safety first. ViciDial handles live calls. The QA pipeline must never, under any circumstances, degrade call quality or agent responsiveness. Every design decision in this tutorial reflects that priority.
2. Architecture Overview
System Layout
ViciDial Server (e.g., Alpha — Ryzen 5 3600, 12 cores, 62GB RAM)
│
├── Production SSD (nvme0n1) ← Asterisk, MySQL, ViciDial
│ └── /var/spool/asterisk/monitorDONE/ ← completed recordings (read-only source)
│
├── QA SSD (nvme1n1) ← dedicated, isolated I/O
│ └── /mnt/qa/
│ ├── db/
│ │ └── qa.db ← SQLite database (queue, transcripts, scores)
│ ├── transcripts/
│ │ └── YYYY-MM-DD/ ← plain text transcripts organized by date
│ ├── models/
│ │ └── large-v3-turbo/ ← Whisper model cache (~3GB)
│ └── logs/
│ └── qa-pipeline.log ← processing logs with rotation
│
├── /root/qa-pipeline/ ← all scripts
│ ├── qa_queue.py ← finds new recordings, adds to queue
│ ├── qa_transcribe.py ← batch transcription with resource limits
│ ├── qa_score.py ← AI scoring via Gemini/Groq
│ ├── qa_service.py ← FastAPI REST API (port 8085)
│ ├── qa_config.py ← shared configuration
│ └── requirements.txt ← Python dependencies
│
└── systemd services
├── qa-pipeline.service ← nightly batch processor
└── qa-api.service ← FastAPI web service
Data Flow
┌──────────────────────────────────────────────────────┐
│ DAYTIME (08:00-22:00) │
│ │
│ Call ends → recording saved to monitorDONE/ │
│ → qa_queue.py detects new file │
│ → inserts row in qa_queue table │
│ → enriches with call metadata from │
│ vicidial_closer_log │
│ │
│ (NO processing — just queuing) │
└──────────────────────────────────────────────────────┘
┌──────────────────────────────────────────────────────┐
│ NIGHTTIME (22:00-07:00) │
│ │
│ Cron triggers batch at 22:00 → │
│ 1. qa_transcribe.py processes queue │
│ - reads .wav from monitorDONE/ │
│ - transcribes with Faster-Whisper │
│ - saves .txt to /mnt/qa/transcripts/ │
│ - updates qa_transcripts table │
│ - resource limited: nice, ionice, taskset │
│ │
│ 2. qa_score.py processes transcripts │
│ - reads transcript text │
│ - sends to Gemini API (or Groq fallback) │
│ - parses 8-category JSON scorecard │
│ - stores in qa_scores table │
│ - flags calls scoring below threshold │
│ │
│ Results ready by ~04:00-06:00 │
└──────────────────────────────────────────────────────┘
┌──────────────────────────────────────────────────────┐
│ ALWAYS RUNNING │
│ │
│ qa_service.py (FastAPI :8085) │
│ → /api/qa/daily-summary │
│ → /api/qa/agent/{user_id} │
│ → /api/qa/flagged │
│ → /api/qa/call/{uniqueid} │
│ → /api/qa/trends │
│ │
│ Consumed by: Grafana, admin PHP pages, email reports│
└──────────────────────────────────────────────────────┘
Why This Architecture
Separate SSD: The single most important design decision. Whisper transcription is I/O intensive — it reads large audio files and writes model inference data. If this happens on the same disk as MySQL and Asterisk, you risk I/O contention that degrades call quality. A dedicated SSD eliminates this entirely.
Queue-then-process: Queuing during the day is a lightweight database INSERT. Processing at night uses CPU cores that are otherwise 98% idle. This hybrid approach means zero production impact during business hours.
SQLite over PostgreSQL: For this workload (sequential batch inserts, simple queries, single-writer), SQLite is simpler, faster, and requires zero additional infrastructure. The QA database lives on its own SSD, so there is no contention with the production MySQL database.
Gemini with Groq fallback: The free Gemini tier covers typical call volume. If it goes down or quality is insufficient, the system automatically falls back to Groq's paid API. No manual intervention needed.
3. Prerequisites
Hardware Requirements
| Component | Minimum | Recommended | Notes |
|---|---|---|---|
| CPU | 4 cores (x86_64) | 6+ cores | Whisper uses 4 cores for transcription; leave remaining for ViciDial |
| RAM | 8 GB free | 16+ GB free | large-v3-turbo loads ~3GB into RAM during processing |
| Storage | 50 GB free | Dedicated SSD | Transcripts are small (~2KB/call) but model cache needs 3GB |
| GPU | Not required | Not required | CPU transcription at 3-4x real-time is sufficient for overnight batch |
Software Requirements
| Software | Version | Purpose |
|---|---|---|
| Python | 3.9+ (3.11 recommended) | Runtime for all pipeline scripts |
| faster-whisper | 1.1.0+ | Speech-to-text engine (CTranslate2 backend) |
| FastAPI | 0.100+ | REST API framework |
| uvicorn | 0.20+ | ASGI server for FastAPI |
| ViciDial | Any version | Source of recordings and call metadata |
| MariaDB/MySQL | Any version | ViciDial database (read-only access) |
| systemd | Any | Service management |
| sox or ffmpeg | Any | Audio format conversion (if needed) |
API Keys (at least one required)
| Provider | How to Get | Free Tier |
|---|---|---|
| Google Gemini | aistudio.google.com | 1,000 requests/day |
| Groq | console.groq.com | Limited free, then ~$18/mo at volume |
ViciDial Recording Format
ViciDial stores recordings in /var/spool/asterisk/monitorDONE/ as WAV files (or MP3 if configured). The filename format is:
YYYYMMDD-HHMMSS_XXXXXXXXXX-all.wav
Where XXXXXXXXXX is the lead phone number. The recording_log table maps these files to vicidial_id (the ViciDial call ID) and lead_id.
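For scripting against the queue, the call start time and phone number can be recovered from the filename itself. A minimal parsing sketch; the regex assumes exactly the `-all.wav` / `-all.mp3` naming shown above, so other ViciDial recording variants would need their own patterns:

```python
import re
from datetime import datetime

def parse_recording_filename(name: str):
    """Extract the call start time and phone number from a ViciDial
    recording filename like 20250114-093215_07700900123-all.wav."""
    m = re.match(r"^(\d{8})-(\d{6})_(\w+)-all\.(wav|mp3)$", name)
    if not m:
        return None  # not a recording in the expected format
    date_part, time_part, phone, ext = m.groups()
    started = datetime.strptime(date_part + time_part, "%Y%m%d%H%M%S")
    return {"started": started, "phone": phone, "format": ext}

info = parse_recording_filename("20250114-093215_07700900123-all.wav")
# info["started"] → datetime(2025, 1, 14, 9, 32, 15)
```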
Verify your recording path and format:
# Check where recordings are stored
ls /var/spool/asterisk/monitorDONE/ | head -5
# Check recording format
file /var/spool/asterisk/monitorDONE/$(ls /var/spool/asterisk/monitorDONE/ | head -1)
# Check recording_log table structure
mysql -u root -e "DESCRIBE recording_log" asterisk
Install Python Dependencies
If your server does not have Python 3.11, install it first:
# For openSUSE (ViciBox)
zypper install python311 python311-pip python311-devel
# For CentOS 7
yum install -y epel-release
yum install -y python3 python3-pip python3-devel
# For Debian/Ubuntu
apt install -y python3.11 python3.11-venv python3-pip
Create a virtual environment for the QA pipeline:
mkdir -p /root/qa-pipeline
cd /root/qa-pipeline
python3.11 -m venv /root/qa-pipeline/venv
source /root/qa-pipeline/venv/bin/activate
# Create requirements.txt
cat > requirements.txt << 'EOF'
faster-whisper>=1.1.0
fastapi>=0.100.0
uvicorn[standard]>=0.20.0
httpx>=0.24.0
mysql-connector-python>=8.0.0
pydantic>=2.0.0
python-dateutil>=2.8.0
python-dotenv>=1.0.0
EOF
pip install -r requirements.txt
Note: faster-whisper will automatically install ctranslate2 and download model files on first use. The large-v3-turbo model is approximately 3GB.
4. Storage Setup — Dedicated SSD
Why a Separate Disk Matters
Whisper transcription reads multi-megabyte audio files, and loading the ~3 GB model produces substantial disk I/O of its own. If this runs on the same SSD as your MySQL database and Asterisk recordings, you create I/O contention that can cause:
- MySQL query latency spikes (agents see "loading..." in their browsers)
- Asterisk recording write delays (brief audio gaps in recordings)
- General system sluggishness that affects call quality
A dedicated SSD completely eliminates this risk. The QA pipeline's I/O is physically isolated from production.
Identify Your Available Disk
# List all block devices
lsblk
# Look for an unused disk (no partitions, no mount point)
# Example output:
# nvme0n1 259:0 0 477.0G 0 disk ← production (in use)
# ├─nvme0n1p1 259:1 0 512M 0 part /boot
# ├─nvme0n1p2 259:2 0 15.3G 0 part [SWAP]
# └─nvme0n1p3 259:3 0 461.3G 0 part /
# nvme1n1 259:4 0 476.9G 0 disk ← empty, available!
Partition, Format, and Mount
# IMPORTANT: Replace nvme1n1 with YOUR actual device name
# Double-check with lsblk — formatting the wrong disk destroys data
DEVICE=/dev/nvme1n1
# Create a single partition using the full disk
parted ${DEVICE} --script mklabel gpt
parted ${DEVICE} --script mkpart primary ext4 0% 100%
# Format with ext4 (best for this workload)
mkfs.ext4 -L qa-storage ${DEVICE}p1
# Create mount point
mkdir -p /mnt/qa
# Mount the partition
mount ${DEVICE}p1 /mnt/qa
# Verify it mounted correctly
df -h /mnt/qa
# Should show ~470GB available
Add to fstab for Persistence
# Get the UUID (more reliable than device names)
blkid ${DEVICE}p1
# Example output: /dev/nvme1n1p1: LABEL="qa-storage" UUID="a1b2c3d4-e5f6-..." TYPE="ext4"
# Add to fstab (replace UUID with your actual UUID)
echo 'UUID=a1b2c3d4-e5f6-7890-abcd-ef1234567890 /mnt/qa ext4 defaults,noatime 0 2' >> /etc/fstab
# Test fstab entry (unmount and remount from fstab)
umount /mnt/qa
mount -a
# Verify
df -h /mnt/qa
The noatime mount option disables access time updates on every file read, reducing unnecessary write I/O.
Create Directory Structure
# Create all required directories
mkdir -p /mnt/qa/{db,transcripts,models,logs}
# Set permissions (the pipeline runs as root for simplicity; create a dedicated user if you prefer)
chmod 755 /mnt/qa
chmod 700 /mnt/qa/db # database files — restricted access
# Verify structure
tree /mnt/qa/ 2>/dev/null || find /mnt/qa -type d
# /mnt/qa/
# ├── db/ ← SQLite database
# ├── transcripts/ ← text files organized by date
# ├── models/ ← Whisper model cache
# └── logs/ ← processing logs
If You Do Not Have a Spare SSD
If a dedicated SSD is not available, you can still run the pipeline on your existing disk. Create the directory structure directly:
mkdir -p /mnt/qa/{db,transcripts,models,logs}
The pipeline will still work, but you should be more conservative with resource limits (see Section 13) and consider running at a lower concurrency to minimize I/O contention with production.
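If you must share a disk, the transcription process can also deprioritize itself from inside Python, in the same spirit as the nice/ionice/taskset wrappers used later in this tutorial. A Linux-only sketch; the core numbers are illustrative, not values from this tutorial:

```python
import os

def limit_own_resources(cores=(8, 9, 10, 11), niceness=19):
    """Lower this process's CPU priority and pin it to spare cores so
    transcription cannot starve Asterisk/MySQL. Linux only.
    The core list is illustrative -- pick cores your PBX leaves idle."""
    os.nice(niceness)                 # drop to lowest CPU priority
    os.sched_setaffinity(0, cores)    # restrict this process to the listed cores

# Call before loading the Whisper model:
# limit_own_resources()
```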
5. Whisper Model Setup
Understanding Faster-Whisper
Faster-Whisper is a reimplementation of OpenAI's Whisper using CTranslate2 for inference. It is 4x faster than the original Whisper with equivalent accuracy, and uses significantly less memory. It runs entirely on CPU — no GPU required.
For call center QA, the key consideration is the trade-off between transcription speed and accuracy. You need the transcription to be good enough that the AI scoring engine can reliably evaluate agent performance. Minor word errors are acceptable; missing entire sentences is not.
Model Comparison
| Model | Size | WER (English) | Speed (CPU, 4 cores) | RAM Usage | Best For |
|---|---|---|---|---|---|
| tiny | 75 MB | ~10% | 15-20x real-time | ~1 GB | Testing only |
| base | 145 MB | ~7% | 10-15x real-time | ~1 GB | Quick prototyping |
| small | 488 MB | ~5.5% | 6-8x real-time | ~2 GB | Budget deployments |
| medium | 1.5 GB | ~4.5% | 2-3x real-time | ~3 GB | Good balance |
| large-v3 | 3.1 GB | ~3.5% | 1-1.5x real-time | ~4 GB | Best accuracy, too slow for batch |
| large-v3-turbo | 3.1 GB | ~4% | 3-4x real-time | ~3 GB | Best for production batch |
The large-v3-turbo model is the clear winner for this use case. It achieves near-large-v3 accuracy at 3-4x the speed. For a 90-second call, transcription takes approximately 25-30 seconds on a modern 6-core CPU. Over 700 calls (17.5 hours of audio), that is roughly 5-6 hours of processing — well within the overnight window.
WER (Word Error Rate) measures the percentage of words incorrectly transcribed. At ~4% WER, a 100-word segment will have approximately 4 incorrect words. This is more than sufficient for QA scoring, where the AI evaluator considers the overall conversation flow rather than individual word precision.
Download and Test the Model
# Activate the virtual environment
source /root/qa-pipeline/venv/bin/activate
# Test model download and basic transcription
python3 << 'PYEOF'
from faster_whisper import WhisperModel
import os
import time
# Set model cache directory to dedicated SSD
os.environ["HF_HOME"] = "/mnt/qa/models"
os.environ["HUGGINGFACE_HUB_CACHE"] = "/mnt/qa/models"
print("Downloading large-v3-turbo model (~3GB, first run only)...")
print("This may take 5-15 minutes depending on internet speed.")
model = WhisperModel(
"large-v3-turbo",
device="cpu",
compute_type="int8", # Quantized for CPU — faster, less RAM
cpu_threads=4, # Use 4 cores for transcription
download_root="/mnt/qa/models"
)
print("Model loaded successfully!")
print(f"Model cache location: /mnt/qa/models/")
print(f"Cache size: {sum(f.stat().st_size for f in __import__('pathlib').Path('/mnt/qa/models').rglob('*') if f.is_file()) / 1024**3:.1f} GB")
# Test with a sample recording if available
test_dir = "/var/spool/asterisk/monitorDONE/"
test_files = [f for f in os.listdir(test_dir) if f.endswith('.wav')][:1]
if test_files:
test_file = os.path.join(test_dir, test_files[0])
print(f"\nTest transcription: {test_file}")
start = time.time()
segments, info = model.transcribe(
test_file,
beam_size=5,
language="en", # Set to your primary language
vad_filter=True, # Skip silence — speeds up processing
vad_parameters=dict(
min_silence_duration_ms=500,
speech_pad_ms=200
)
)
full_text = ""
for segment in segments:
full_text += segment.text + " "
elapsed = time.time() - start
print(f"Language: {info.language} (probability: {info.language_probability:.2f})")
print(f"Duration: {info.duration:.1f}s audio, transcribed in {elapsed:.1f}s")
print(f"Speed: {info.duration/elapsed:.1f}x real-time")
print(f"\nTranscript:\n{full_text.strip()}")
else:
print("\nNo test recordings found in monitorDONE/")
print("The model is ready — it will be used when the pipeline runs.")
PYEOF
Compute Type Selection
The compute_type parameter controls numerical precision during inference:
| Compute Type | Speed | RAM | Accuracy | Best For |
|---|---|---|---|---|
| float32 | Slowest | Highest | Reference | Not recommended for CPU |
| float16 | N/A | N/A | N/A | GPU only |
| int8 | Fastest | Lowest | Minimal loss | CPU production |
| int8_float16 | Fast | Low | Minimal loss | GPU production |
Always use int8 for CPU-based transcription. The accuracy difference versus float32 is negligible (< 0.1% WER increase) while speed improves by 2-3x and RAM usage drops by 40%.
Language Configuration
For a UK-based call center handling English calls:
# Force English — skip language detection (saves ~1 second per call)
segments, info = model.transcribe(audio_path, language="en")
For multilingual environments (e.g., Italian inbound groups):
# Auto-detect language
segments, info = model.transcribe(audio_path)
# info.language returns detected language code ("en", "it", "es", etc.)
If you handle calls in multiple known languages, consider setting the language explicitly per inbound group to avoid detection errors.
VAD (Voice Activity Detection)
The vad_filter=True parameter enables Silero VAD, which detects speech segments and skips silence. This is critical for call center recordings because:
- Calls often have 10-30 seconds of hold/silence/ringing at the start
- There may be gaps between transfers
- The transcription engine wastes time processing silence
With VAD enabled, a 90-second recording with 20 seconds of silence effectively becomes a 70-second transcription job — a 22% speed improvement.
6. LLM Provider Selection
The Scoring Challenge
Transcription alone is not enough. A text transcript tells you what was said, but not whether it was said well. You need an LLM to evaluate the transcript against your QA criteria and produce a structured score.
The LLM must:
- Accept a conversation transcript (typically 200-800 words for a 60-120 second call)
- Evaluate it against 8 specific categories
- Return a structured JSON response with scores and justifications
- Process 700+ calls per night reliably
- Not store or train on your customer data
Provider Comparison (700 calls/day, ~21,000 calls/month)
| Provider | Model | Input Cost | Output Cost | Monthly Cost | Quality | JSON Reliability |
|---|---|---|---|---|---|---|
| Google Gemini | 2.5 Flash-Lite | Free | Free | $0 | Good | Good |
| Groq | Llama 3.3 70B | $0.59/M tok | $0.79/M tok | ~$18 | Good | Good |
| OpenAI | GPT-4o-mini | $0.15/M tok | $0.60/M tok | ~$15 | Excellent | Excellent |
| Anthropic | Claude Haiku 3.5 | $0.80/M tok | $4.00/M tok | ~$58 | Excellent | Excellent |
| Groq | Llama 3.1 8B | $0.05/M tok | $0.08/M tok | ~$2 | Fair | Fair |
| DeepSeek | DeepSeek-V3 | $0.27/M tok | $1.10/M tok | ~$12 | Good | Good |
Cost calculation basis: Average call transcript = ~400 tokens input. Scoring prompt template = ~600 tokens. Total input per call = ~1,000 tokens. Output (JSON scorecard) = ~300 tokens.
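With those per-call token figures, the monthly estimates in the table are easy to reproduce. A quick check of the Groq Llama 3.3 70B row:

```python
CALLS_PER_MONTH = 21_000
INPUT_TOKENS_PER_CALL = 1_000    # ~400-token transcript + ~600-token prompt
OUTPUT_TOKENS_PER_CALL = 300     # JSON scorecard

def monthly_cost(input_price_per_m: float, output_price_per_m: float) -> float:
    """Monthly LLM spend in USD for the assumed token volumes,
    given per-million-token prices."""
    input_tokens = CALLS_PER_MONTH * INPUT_TOKENS_PER_CALL
    output_tokens = CALLS_PER_MONTH * OUTPUT_TOKENS_PER_CALL
    return (input_tokens * input_price_per_m
            + output_tokens * output_price_per_m) / 1_000_000

groq_llama_70b = monthly_cost(0.59, 0.79)   # ≈ $17.37/month, i.e. the ~$18 above
```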
Why Gemini 2.5 Flash-Lite (Recommended)
Google's Gemini Flash-Lite offers a generous free tier:
- 1,000 requests per day — covers 700 calls with 300 requests of headroom
- No credit card required — genuinely free, not a trial
- Good instruction following for structured JSON output
- Fast response times (typically < 2 seconds per request)
- Data is not used for training on the paid API tier
The only risk is Google changing or removing the free tier. This is mitigated by the automatic Groq fallback.
Why Groq as Fallback
Groq runs open-source models (Llama 3.3 70B) on custom LPU hardware with extremely fast inference. At $18/month for the expected volume, it is the cheapest paid option with acceptable quality. The 70B parameter model is large enough for nuanced QA evaluation — the 8B model is not (it struggles with multi-category scoring and tends to give everything 4/5).
Why NOT DeepSeek
DeepSeek offers competitive pricing and good model quality. However, DeepSeek is a Chinese company, and all API data is processed and stored on servers in China. For a call center handling customer PII (names, phone numbers, account details that appear in transcripts), this creates compliance and data sovereignty concerns. Unless your legal team has reviewed and approved it, avoid sending customer call transcripts to Chinese servers.
Why NOT Local LLMs
Running an LLM locally (e.g., Llama 3.1 8B via llama.cpp) seems ideal — zero cost, full privacy. The reality on a CPU-only server:
| Model | Inference Time per Call | Calls per Night (9 hours) | Verdict |
|---|---|---|---|
| Llama 3.1 8B (Q4) | ~45 seconds | ~720 | Barely enough, poor quality |
| Llama 3.3 70B (Q4) | ~8 minutes | ~67 | Completely insufficient |
| Phi-3 mini (3.8B) | ~20 seconds | ~1,620 | Fast but scoring quality too low |
The 8B model could theoretically handle the volume, but its scoring quality is unreliable — it frequently gives uniform scores (all 3s or all 4s) and misses nuances in agent performance. The 70B model produces good scores but is far too slow on CPU. You would need a GPU (at minimum an RTX 3060 12GB, ~$300 used) to make local LLMs viable.
Bottom line: Use Gemini free tier. If it breaks, fall back to Groq at $18/month. If both fail, the queue accumulates and retries the next night — no data is lost.
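The fallback behaviour described above can be sketched in a few lines. The function and parameter names here are illustrative, not the actual qa_score.py interface; the provider callables are injected so the sketch stays independent of any API client code:

```python
import logging

def score_transcript(transcript: str, score_with_gemini, score_with_groq) -> dict:
    """Try the primary provider first; on any failure (quota, network,
    malformed JSON) fall back to the secondary provider."""
    try:
        return score_with_gemini(transcript)
    except Exception as exc:
        logging.warning("Gemini scoring failed (%s); falling back to Groq", exc)
        return score_with_groq(transcript)
```

If both providers raise, the exception propagates, the call stays in the queue, and the next nightly run retries it, matching the "no data is lost" behaviour above.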
API Key Setup
# Create config directory
mkdir -p /root/qa-pipeline
# Store API keys in environment file (NOT in code)
cat > /root/qa-pipeline/.env << 'EOF'
# Gemini API (get from https://aistudio.google.com/app/apikey)
GEMINI_API_KEY=YOUR_GEMINI_API_KEY_HERE
# Groq API (get from https://console.groq.com/keys)
GROQ_API_KEY=YOUR_GROQ_API_KEY_HERE
# ViciDial database connection
VICIDIAL_DB_HOST=127.0.0.1
VICIDIAL_DB_USER=cron
VICIDIAL_DB_PASS=YOUR_DB_PASSWORD
VICIDIAL_DB_NAME=asterisk
EOF
# Restrict permissions — API keys must not be world-readable
chmod 600 /root/qa-pipeline/.env
7. Database Schema
Why SQLite
The QA pipeline has a simple access pattern: one writer (the batch processor) runs at night, and one reader (the API service) runs during the day. There is no concurrent write contention. SQLite handles this perfectly and offers several advantages over PostgreSQL or MySQL for this use case:
- Zero infrastructure — no database server to install, configure, or maintain
- Single file — the entire database is one file on the dedicated SSD, easy to backup
- Fast sequential writes — batch inserts during nightly processing are extremely fast
- No network overhead — the API service reads the database file directly
- WAL mode — allows concurrent reads while writing (readers do not block the writer)
At 700 calls/day, the database will grow approximately 50-80 MB per month. After a year, it will be under 1 GB — trivial for any SSD.
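Both the nightly writer and the API reader can open the database the same way. A minimal connection helper, assuming the paths above; the helper name is ours, not from the pipeline scripts:

```python
import sqlite3

QA_DB_PATH = "/mnt/qa/db/qa.db"  # path from the storage layout above

def qa_connect(db_path: str = QA_DB_PATH) -> sqlite3.Connection:
    """Open the QA database with WAL mode and a busy timeout so the
    API's readers never error out while the nightly batch is writing."""
    conn = sqlite3.connect(db_path, timeout=30)
    conn.row_factory = sqlite3.Row            # rows accessible by column name
    conn.execute("PRAGMA journal_mode=WAL")   # persists, but cheap to re-issue
    conn.execute("PRAGMA synchronous=NORMAL")
    return conn
```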
Schema Definition
Create the complete database schema:
cat > /root/qa-pipeline/schema.sql << 'SQLEOF'
-- QA Pipeline Database Schema
-- SQLite with WAL mode for concurrent read/write
-- Enable WAL mode (set once, persists)
PRAGMA journal_mode=WAL;
PRAGMA synchronous=NORMAL;
PRAGMA cache_size=-64000; -- 64MB cache
PRAGMA temp_store=MEMORY;
-- ============================================================
-- Table: qa_queue
-- Purpose: Tracks recordings waiting to be processed
-- Populated by: qa_queue.py (runs every 5 minutes via cron)
-- Consumed by: qa_transcribe.py (nightly batch)
-- ============================================================
CREATE TABLE IF NOT EXISTS qa_queue (
id INTEGER PRIMARY KEY AUTOINCREMENT,
-- Recording identification
recording_id TEXT NOT NULL, -- from recording_log.recording_id
filename TEXT NOT NULL, -- full path to .wav file
uniqueid TEXT, -- Asterisk uniqueid (links to closer_log)
vicidial_id TEXT, -- ViciDial call ID
lead_id INTEGER, -- ViciDial lead ID
-- Call metadata (from vicidial_closer_log)
call_date TEXT, -- YYYY-MM-DD HH:MM:SS
phone_number TEXT, -- caller's number (for reference, not displayed)
agent_user TEXT, -- ViciDial user ID
agent_name TEXT, -- agent full name
campaign_id TEXT, -- inbound group / campaign
call_length INTEGER, -- duration in seconds
call_status TEXT, -- ViciDial disposition (SALE, NI, DROP, etc.)
-- Processing state
status TEXT NOT NULL DEFAULT 'pending',
-- pending: waiting for transcription
-- transcribing: currently being transcribed
-- transcribed: transcript ready, waiting for scoring
-- scoring: currently being scored
-- complete: fully processed
-- error: processing failed
-- skipped: too short or invalid
error_message TEXT, -- error details if status='error'
retry_count INTEGER DEFAULT 0, -- number of retry attempts
-- Timestamps
queued_at TEXT NOT NULL DEFAULT (datetime('now')),
started_at TEXT, -- when processing began
completed_at TEXT, -- when fully complete
-- Constraints
UNIQUE(recording_id) -- prevent duplicate queue entries
);
-- Index for efficient batch processing queries
CREATE INDEX IF NOT EXISTS idx_queue_status ON qa_queue(status);
CREATE INDEX IF NOT EXISTS idx_queue_agent ON qa_queue(agent_user);
CREATE INDEX IF NOT EXISTS idx_queue_date ON qa_queue(call_date);
CREATE INDEX IF NOT EXISTS idx_queue_campaign ON qa_queue(campaign_id);
-- ============================================================
-- Table: qa_transcripts
-- Purpose: Stores transcription results
-- Populated by: qa_transcribe.py
-- ============================================================
CREATE TABLE IF NOT EXISTS qa_transcripts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
queue_id INTEGER NOT NULL, -- FK to qa_queue
recording_id TEXT NOT NULL, -- from qa_queue
-- Transcription results
transcript TEXT NOT NULL, -- full transcript text
language TEXT, -- detected language code (en, it, es)
language_prob REAL, -- detection confidence 0.0-1.0
duration_audio REAL, -- audio duration in seconds
duration_proc REAL, -- processing time in seconds
word_count INTEGER, -- total words in transcript
-- Whisper metadata
model_name TEXT DEFAULT 'large-v3-turbo',
compute_type TEXT DEFAULT 'int8',
-- File reference
transcript_file TEXT, -- path to .txt file on disk
-- Timestamps
created_at TEXT NOT NULL DEFAULT (datetime('now')),
FOREIGN KEY (queue_id) REFERENCES qa_queue(id)
);
CREATE INDEX IF NOT EXISTS idx_transcript_recording ON qa_transcripts(recording_id);
CREATE INDEX IF NOT EXISTS idx_transcript_queue ON qa_transcripts(queue_id);
-- ============================================================
-- Table: qa_scores
-- Purpose: Stores AI quality scores for each call
-- Populated by: qa_score.py
-- ============================================================
CREATE TABLE IF NOT EXISTS qa_scores (
id INTEGER PRIMARY KEY AUTOINCREMENT,
queue_id INTEGER NOT NULL, -- FK to qa_queue
transcript_id INTEGER NOT NULL, -- FK to qa_transcripts
recording_id TEXT NOT NULL,
-- Agent info (denormalized for fast queries)
agent_user TEXT NOT NULL,
agent_name TEXT,
campaign_id TEXT,
call_date TEXT,
-- Individual category scores (1-5 scale)
score_greeting INTEGER CHECK(score_greeting BETWEEN 1 AND 5),
score_customer_id INTEGER CHECK(score_customer_id BETWEEN 1 AND 5),
score_needs INTEGER CHECK(score_needs BETWEEN 1 AND 5),
score_product INTEGER CHECK(score_product BETWEEN 1 AND 5),
score_communication INTEGER CHECK(score_communication BETWEEN 1 AND 5),
score_resolution INTEGER CHECK(score_resolution BETWEEN 1 AND 5),
score_closing INTEGER CHECK(score_closing BETWEEN 1 AND 5),
score_compliance INTEGER CHECK(score_compliance BETWEEN 1 AND 5),
-- Weighted total (calculated: 0-100)
total_score REAL NOT NULL,
-- AI feedback
summary TEXT, -- 2-3 sentence overall assessment
strengths TEXT, -- JSON array of strength points
improvements TEXT, -- JSON array of improvement areas
flagged INTEGER DEFAULT 0, -- 1 if score below threshold
flag_reason TEXT, -- why it was flagged
-- Scoring metadata
llm_provider TEXT, -- 'gemini' or 'groq'
llm_model TEXT, -- specific model used
scoring_time REAL, -- API call duration in seconds
raw_response TEXT, -- full LLM JSON response (for debugging)
-- Timestamps
created_at TEXT NOT NULL DEFAULT (datetime('now')),
FOREIGN KEY (queue_id) REFERENCES qa_queue(id),
FOREIGN KEY (transcript_id) REFERENCES qa_transcripts(id)
);
CREATE INDEX IF NOT EXISTS idx_scores_agent ON qa_scores(agent_user);
CREATE INDEX IF NOT EXISTS idx_scores_date ON qa_scores(call_date);
CREATE INDEX IF NOT EXISTS idx_scores_campaign ON qa_scores(campaign_id);
CREATE INDEX IF NOT EXISTS idx_scores_flagged ON qa_scores(flagged) WHERE flagged = 1;
CREATE INDEX IF NOT EXISTS idx_scores_total ON qa_scores(total_score);
-- ============================================================
-- Table: qa_daily_stats
-- Purpose: Pre-aggregated daily statistics per agent
-- Populated by: qa_score.py (at end of nightly batch)
-- ============================================================
CREATE TABLE IF NOT EXISTS qa_daily_stats (
id INTEGER PRIMARY KEY AUTOINCREMENT,
-- Dimensions
stat_date TEXT NOT NULL, -- YYYY-MM-DD
agent_user TEXT NOT NULL,
agent_name TEXT,
campaign_id TEXT,
-- Aggregate scores
calls_scored INTEGER DEFAULT 0,
avg_total_score REAL,
min_total_score REAL,
max_total_score REAL,
-- Category averages
avg_greeting REAL,
avg_customer_id REAL,
avg_needs REAL,
avg_product REAL,
avg_communication REAL,
avg_resolution REAL,
avg_closing REAL,
avg_compliance REAL,
-- Flags
flagged_calls INTEGER DEFAULT 0, -- count of calls below threshold
-- Processing stats
calls_queued INTEGER DEFAULT 0,
calls_skipped INTEGER DEFAULT 0, -- too short, errors
calls_failed INTEGER DEFAULT 0, -- processing errors
-- Timestamps
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT,
UNIQUE(stat_date, agent_user, campaign_id)
);
CREATE INDEX IF NOT EXISTS idx_daily_date ON qa_daily_stats(stat_date);
CREATE INDEX IF NOT EXISTS idx_daily_agent ON qa_daily_stats(agent_user);
-- ============================================================
-- Table: qa_config
-- Purpose: Runtime configuration (thresholds, API keys reference)
-- ============================================================
CREATE TABLE IF NOT EXISTS qa_config (
key TEXT PRIMARY KEY,
value TEXT NOT NULL,
description TEXT,
updated_at TEXT DEFAULT (datetime('now'))
);
-- Default configuration values
INSERT OR IGNORE INTO qa_config (key, value, description) VALUES
('flag_threshold', '60', 'Calls scoring below this are flagged for review'),
('min_call_duration', '30', 'Minimum call duration in seconds to process'),
('max_retries', '3', 'Maximum retry attempts for failed processing'),
('batch_size', '100', 'Number of calls to process per batch cycle'),
('llm_primary', 'gemini', 'Primary LLM provider (gemini or groq)'),
('llm_fallback', 'groq', 'Fallback LLM provider'),
('whisper_model', 'large-v3-turbo', 'Faster-Whisper model name'),
('whisper_threads', '4', 'CPU threads for Whisper transcription'),
('whisper_compute_type', 'int8', 'Whisper compute type'),
('load_avg_max', '4.0', 'Pause processing if load average exceeds this'),
('recording_path', '/var/spool/asterisk/monitorDONE', 'Path to ViciDial recordings'),
('transcript_path', '/mnt/qa/transcripts', 'Path to store transcript files');
SQLEOF
Initialize the Database
# Create the database with the schema
sqlite3 /mnt/qa/db/qa.db < /root/qa-pipeline/schema.sql
# Verify tables were created
sqlite3 /mnt/qa/db/qa.db ".tables"
# Expected: qa_config qa_daily_stats qa_queue qa_scores qa_transcripts
# Verify WAL mode is active
sqlite3 /mnt/qa/db/qa.db "PRAGMA journal_mode;"
# Expected: wal
# Check default config values
sqlite3 /mnt/qa/db/qa.db "SELECT key, value FROM qa_config;"
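Pipeline scripts can then read runtime settings from the qa_config table instead of hard-coding them, so thresholds can be tuned without editing code. A minimal accessor sketch; the helper name is illustrative:

```python
import sqlite3

def get_config(conn, key, default=None):
    """Read one runtime setting from the qa_config table,
    falling back to a default if the key is absent."""
    row = conn.execute(
        "SELECT value FROM qa_config WHERE key = ?", (key,)
    ).fetchone()
    return row[0] if row else default
```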
Schema Diagram
┌─────────────┐ ┌──────────────────┐ ┌────────────────┐
│ qa_queue │────→│ qa_transcripts │────→│ qa_scores │
│ │ │ │ │ │
│ recording_id│ │ queue_id (FK) │ │ queue_id (FK) │
│ filename │ │ transcript │ │ transcript_id │
│ agent_user │ │ language │ │ score_greeting │
│ campaign_id │ │ word_count │ │ score_needs │
│ call_length │ │ duration_proc │ │ total_score │
│ status │ │ │ │ flagged │
│ call_date │ │ │ │ summary │
└─────────────┘ └──────────────────┘ └────────────────┘
│
▼
┌────────────────┐
│ qa_daily_stats │
│ │
│ stat_date │
│ agent_user │
│ avg_total_score│
│ flagged_calls │
└────────────────┘
8. Queue Manager Script
Purpose
The queue manager runs every 5 minutes during business hours. It scans ViciDial's recording_log table for new recordings that have not yet been queued, enriches them with call metadata from vicidial_closer_log, and inserts them into the qa_queue table. It does not perform any CPU-intensive work — just lightweight database queries and inserts.
Configuration Module
Create the shared configuration used by all pipeline scripts:
cat > /root/qa-pipeline/qa_config.py << 'PYEOF'
"""
QA Pipeline — Shared Configuration
All pipeline scripts import settings from here.
"""
import os
import sqlite3
from pathlib import Path
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv(Path(__file__).parent / ".env")
# ── Paths ────────────────────────────────────────────────────
QA_BASE_DIR = Path("/mnt/qa")
QA_DB_PATH = QA_BASE_DIR / "db" / "qa.db"
QA_TRANSCRIPT_DIR = QA_BASE_DIR / "transcripts"
QA_MODEL_DIR = QA_BASE_DIR / "models"
QA_LOG_DIR = QA_BASE_DIR / "logs"
RECORDING_PATH = Path("/var/spool/asterisk/monitorDONE")
SCRIPT_DIR = Path("/root/qa-pipeline")
# ── ViciDial Database ────────────────────────────────────────
VICIDIAL_DB = {
"host": os.getenv("VICIDIAL_DB_HOST", "127.0.0.1"),
"user": os.getenv("VICIDIAL_DB_USER", "cron"),
"password": os.getenv("VICIDIAL_DB_PASS", ""),
"database": os.getenv("VICIDIAL_DB_NAME", "asterisk"),
"charset": "utf8mb4",
"connection_timeout": 10,
}
# ── Whisper Settings ─────────────────────────────────────────
WHISPER_MODEL = "large-v3-turbo"
WHISPER_COMPUTE_TYPE = "int8"
WHISPER_THREADS = 4
WHISPER_BEAM_SIZE = 5
WHISPER_LANGUAGE = "en" # Set to None for auto-detect
# ── LLM API Settings ────────────────────────────────────────
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY", "")
GROQ_API_KEY = os.getenv("GROQ_API_KEY", "")
LLM_PRIMARY = "gemini"
LLM_FALLBACK = "groq"
# ── Processing Limits ────────────────────────────────────────
MIN_CALL_DURATION = 30 # seconds — skip calls shorter than this
FLAG_THRESHOLD = 60 # score below this = flagged for review
MAX_RETRIES = 3 # retry failed items up to this many times
BATCH_SIZE = 100 # process this many per cycle
LOAD_AVG_MAX = 4.0 # pause if system load exceeds this
# ── Logging ──────────────────────────────────────────────────
LOG_FILE = QA_LOG_DIR / "qa-pipeline.log"
LOG_FORMAT = "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
def get_qa_db() -> sqlite3.Connection:
"""Get a connection to the QA SQLite database."""
conn = sqlite3.connect(str(QA_DB_PATH), timeout=30)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA synchronous=NORMAL")
conn.execute("PRAGMA cache_size=-64000")
return conn
def get_config_value(key: str, default: str = None) -> str:
"""Read a config value from the qa_config table."""
try:
conn = get_qa_db()
row = conn.execute(
"SELECT value FROM qa_config WHERE key = ?", (key,)
).fetchone()
conn.close()
return row["value"] if row else default
except Exception:
return default
PYEOF
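The PRAGMA settings in get_qa_db() are what let the nightly writers and the dashboard readers share one database file. A quick standalone check that WAL actually engages on a file-backed database (an in-memory database would report `memory` instead); the temporary path here is purely for demonstration:

```python
import os
import sqlite3
import tempfile

# Verify the WAL journal mode used by get_qa_db() sticks on a
# file-backed database.
fd, path = tempfile.mkstemp(suffix=".db")
os.close(fd)

conn = sqlite3.connect(path, timeout=30)
mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]
print(mode)  # wal
conn.close()
os.unlink(path)
```

WAL mode allows one writer to proceed concurrently with many readers, which is why the API service can serve queries while a batch job is mid-write.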
Queue Manager Script
cat > /root/qa-pipeline/qa_queue.py << 'PYEOF'
#!/usr/bin/env python3
"""
QA Pipeline — Queue Manager
Scans for new ViciDial recordings and adds them to the processing queue.
Run: every 5 minutes via cron during business hours
*/5 8-22 * * * /root/qa-pipeline/venv/bin/python /root/qa-pipeline/qa_queue.py
Or run manually:
/root/qa-pipeline/venv/bin/python /root/qa-pipeline/qa_queue.py
"""
import sys
import logging
import mysql.connector
from datetime import datetime, timedelta
from pathlib import Path
# Add script directory to path
sys.path.insert(0, str(Path(__file__).parent))
from qa_config import (
get_qa_db, VICIDIAL_DB, RECORDING_PATH,
MIN_CALL_DURATION, LOG_FILE, LOG_FORMAT, LOG_DATE_FORMAT
)
# ── Logging Setup ────────────────────────────────────────────
logging.basicConfig(
level=logging.INFO,
format=LOG_FORMAT,
datefmt=LOG_DATE_FORMAT,
handlers=[
logging.FileHandler(str(LOG_FILE)),
logging.StreamHandler()
]
)
logger = logging.getLogger("qa_queue")
def get_vicidial_db():
"""Connect to the ViciDial MySQL/MariaDB database (read-only queries)."""
return mysql.connector.connect(**VICIDIAL_DB)
def get_last_queued_time(qa_conn) -> str:
"""Get the timestamp of the most recently queued recording."""
row = qa_conn.execute(
"SELECT MAX(call_date) as last_date FROM qa_queue"
).fetchone()
if row and row["last_date"]:
return row["last_date"]
# Default: look back 24 hours on first run
return (datetime.now() - timedelta(hours=24)).strftime("%Y-%m-%d %H:%M:%S")
def get_queued_recording_ids(qa_conn) -> set:
"""Get all recording_ids already in the queue (for dedup)."""
rows = qa_conn.execute("SELECT recording_id FROM qa_queue").fetchall()
return {str(row["recording_id"]) for row in rows}
def fetch_new_recordings(vici_conn, since: str, existing_ids: set) -> list:
"""
Query ViciDial for recordings created since the given timestamp.
Joins recording_log with vicidial_closer_log to get call metadata.
Only includes inbound calls (closer_log) with duration >= MIN_CALL_DURATION.
"""
cursor = vici_conn.cursor(dictionary=True)
query = """
SELECT
r.recording_id,
r.filename,
r.location AS filepath,
r.start_time,
r.length_in_sec,
r.vicidial_id,
r.lead_id,
cl.uniqueid,
cl.phone_number,
cl.user AS agent_user,
cl.campaign_id,
cl.status AS call_status,
cl.length_in_sec AS call_length,
cl.call_date,
vu.full_name AS agent_name
FROM recording_log r
LEFT JOIN vicidial_closer_log cl
ON r.vicidial_id = cl.closecallid
LEFT JOIN vicidial_users vu
ON cl.user = vu.user
WHERE r.start_time >= %s
AND r.length_in_sec >= %s
ORDER BY r.start_time ASC
LIMIT 2000
"""
cursor.execute(query, (since, MIN_CALL_DURATION))
recordings = cursor.fetchall()
cursor.close()
# Filter out already-queued recordings
new_recordings = []
for rec in recordings:
rec_id = str(rec["recording_id"])
if rec_id not in existing_ids:
new_recordings.append(rec)
return new_recordings
def verify_recording_file(rec: dict) -> str:
"""
Verify that the recording file exists on disk.
Returns the full file path if found, None otherwise.
ViciDial stores recordings in various formats. Check common patterns.
"""
# Try the filepath from recording_log first
if rec.get("filepath"):
filepath = Path(rec["filepath"])
if filepath.exists():
return str(filepath)
# Try constructing path from filename
if rec.get("filename"):
for ext in [".wav", ".mp3", ".gsm"]:
candidate = RECORDING_PATH / f"{rec['filename']}{ext}"
if candidate.exists():
return str(candidate)
# Try without adding extension (filename may already include it)
candidate = RECORDING_PATH / rec["filename"]
if candidate.exists():
return str(candidate)
return None
def queue_recordings(qa_conn, recordings: list) -> tuple:
"""Insert new recordings into the qa_queue table. Returns (queued, skipped)."""
queued = 0
skipped = 0
for rec in recordings:
# Verify file exists on disk
filepath = verify_recording_file(rec)
if not filepath:
logger.debug(
f"Recording {rec['recording_id']} file not found, skipping"
)
skipped += 1
continue
# Determine initial status
call_length = rec.get("call_length") or rec.get("length_in_sec") or 0
if call_length < MIN_CALL_DURATION:
status = "skipped"
else:
status = "pending"
try:
qa_conn.execute(
"""
INSERT OR IGNORE INTO qa_queue (
recording_id, filename, uniqueid, vicidial_id, lead_id,
call_date, phone_number, agent_user, agent_name,
campaign_id, call_length, call_status, status
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
str(rec["recording_id"]),
filepath,
rec.get("uniqueid"),
rec.get("vicidial_id"),
rec.get("lead_id"),
str(rec.get("call_date", "")),
rec.get("phone_number"),
rec.get("agent_user"),
rec.get("agent_name"),
rec.get("campaign_id"),
call_length,
rec.get("call_status"),
status,
),
)
if status == "pending":
queued += 1
else:
skipped += 1
except Exception as e:
logger.error(f"Failed to queue recording {rec['recording_id']}: {e}")
skipped += 1
qa_conn.commit()
return queued, skipped
def run():
"""Main entry point — scan for new recordings and queue them."""
logger.info("Queue manager starting...")
# Connect to databases
qa_conn = get_qa_db()
try:
vici_conn = get_vicidial_db()
except Exception as e:
logger.error(f"Cannot connect to ViciDial database: {e}")
qa_conn.close()
return
try:
# Get the starting point for the scan
since = get_last_queued_time(qa_conn)
existing_ids = get_queued_recording_ids(qa_conn)
logger.info(
f"Scanning for recordings since {since} "
f"({len(existing_ids)} already queued)"
)
# Fetch new recordings from ViciDial
new_recordings = fetch_new_recordings(vici_conn, since, existing_ids)
if not new_recordings:
logger.info("No new recordings found")
return
logger.info(f"Found {len(new_recordings)} new recordings")
# Queue them
queued, skipped = queue_recordings(qa_conn, new_recordings)
logger.info(f"Queued: {queued}, Skipped: {skipped}")
# Log queue statistics
stats = qa_conn.execute(
"""
SELECT status, COUNT(*) as cnt
FROM qa_queue
GROUP BY status
"""
).fetchall()
for row in stats:
logger.info(f" Queue status '{row['status']}': {row['cnt']}")
except Exception as e:
logger.error(f"Queue manager error: {e}", exc_info=True)
finally:
vici_conn.close()
qa_conn.close()
logger.info("Queue manager finished")
if __name__ == "__main__":
run()
PYEOF
chmod +x /root/qa-pipeline/qa_queue.py
Cron Entry for Queue Manager
# Add to crontab — runs every 5 minutes during business hours
# The queue manager is lightweight (just DB queries), safe to run frequently
crontab -l 2>/dev/null | grep -v qa_queue > /tmp/cron_tmp
echo '*/5 8-22 * * * /root/qa-pipeline/venv/bin/python /root/qa-pipeline/qa_queue.py >> /mnt/qa/logs/qa-queue-cron.log 2>&1' >> /tmp/cron_tmp
crontab /tmp/cron_tmp
rm /tmp/cron_tmp
Testing the Queue Manager
# Run manually to verify it works
source /root/qa-pipeline/venv/bin/activate
python /root/qa-pipeline/qa_queue.py
# Check what was queued
sqlite3 /mnt/qa/db/qa.db "SELECT COUNT(*) as total, status FROM qa_queue GROUP BY status;"
# View the most recent entries
sqlite3 /mnt/qa/db/qa.db \
"SELECT recording_id, agent_user, campaign_id, call_length, status
FROM qa_queue ORDER BY queued_at DESC LIMIT 10;"
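Items that exhaust MAX_RETRIES stay in the queue with status 'error'. After fixing the underlying cause (missing file, bad permissions), you can reset them for another pass. A hedged sketch (`requeue_errors` is an illustrative helper, not part of the scripts above; column names follow the qa_queue schema):

```python
import sqlite3

def requeue_errors(db_path: str = "/mnt/qa/db/qa.db") -> int:
    """Reset errored queue items so the next batch run retries them.
    Returns the number of rows reset."""
    with sqlite3.connect(db_path) as conn:
        cur = conn.execute(
            """
            UPDATE qa_queue
            SET status = 'pending',
                retry_count = 0,
                error_message = NULL
            WHERE status = 'error'
            """
        )
        return cur.rowcount

# print(requeue_errors())  # run against the live DB when ready
```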
9. Transcription Engine
Purpose
The transcription engine is the CPU-intensive core of the pipeline. It reads audio files from ViciDial's recording directory, transcribes them with Faster-Whisper, and stores the text output. It is designed to run only during off-hours (22:00-07:00) under strict resource limits, so any residual live-call handling is unaffected.
Resource Control Strategy
The transcription engine uses four layers of resource isolation:
| Control | Command | Effect |
|---|---|---|
| CPU priority | nice -n 19 | Lowest scheduling priority — yields to any other process |
| I/O priority | ionice -c3 | Idle I/O class — only uses disk when nothing else needs it |
| Core pinning | taskset -c 6-11 | Pins to last 6 cores, leaving cores 0-5 for Asterisk/MySQL |
| Load monitor | Custom check | Pauses if system load average exceeds threshold |
These are applied both at the process level (when launching the script) and internally (the script checks load average before each batch).
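You can also confirm from inside Python that the external controls took effect. A Linux-only sketch (os.sched_getaffinity is not available on macOS or Windows):

```python
import os

# Inspect the resource controls applied to the current process.
niceness = os.nice(0)                          # os.nice(0) reads niceness without changing it
allowed_cores = sorted(os.sched_getaffinity(0))  # cores this process may run on
load_1min = os.getloadavg()[0]                 # same value the load monitor checks

print(f"niceness={niceness} cores={allowed_cores} load={load_1min:.2f}")
# Under `nice -n 19 taskset -c 6-11`, expect niceness=19 and cores=[6, 7, 8, 9, 10, 11]
```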
Transcription Script
cat > /root/qa-pipeline/qa_transcribe.py << 'PYEOF'
#!/usr/bin/env python3
"""
QA Pipeline — Transcription Engine
Batch-transcribes queued recordings using Faster-Whisper.
Run: nightly via cron at 22:00 (or manually for testing)
nice -n 19 ionice -c3 taskset -c 6-11 \
/root/qa-pipeline/venv/bin/python /root/qa-pipeline/qa_transcribe.py
Resource controls:
- nice -n 19: lowest CPU priority
- ionice -c3: idle I/O class
- taskset -c 6-11: pin to cores 6-11 (leave 0-5 for production)
"""
import os
import sys
import time
import signal
import logging
from datetime import datetime
from pathlib import Path
# Set model cache before importing faster_whisper
os.environ["HF_HOME"] = "/mnt/qa/models"
os.environ["HUGGINGFACE_HUB_CACHE"] = "/mnt/qa/models"
sys.path.insert(0, str(Path(__file__).parent))
from qa_config import (
get_qa_db, QA_TRANSCRIPT_DIR,
WHISPER_MODEL, WHISPER_COMPUTE_TYPE, WHISPER_THREADS,
WHISPER_BEAM_SIZE, WHISPER_LANGUAGE,
LOAD_AVG_MAX, BATCH_SIZE, MAX_RETRIES,
LOG_FILE, LOG_FORMAT, LOG_DATE_FORMAT
)
# ── Logging ──────────────────────────────────────────────────
logging.basicConfig(
level=logging.INFO,
format=LOG_FORMAT,
datefmt=LOG_DATE_FORMAT,
handlers=[
logging.FileHandler(str(LOG_FILE)),
logging.StreamHandler()
]
)
logger = logging.getLogger("qa_transcribe")
# ── Graceful Shutdown ────────────────────────────────────────
shutdown_requested = False
def handle_signal(signum, frame):
global shutdown_requested
logger.info(f"Received signal {signum}, finishing current file then stopping...")
shutdown_requested = True
signal.signal(signal.SIGTERM, handle_signal)
signal.signal(signal.SIGINT, handle_signal)
def check_system_load() -> bool:
"""
Check if system load average is below threshold.
Returns True if safe to continue processing, False if overloaded.
"""
load_1min = os.getloadavg()[0]
if load_1min > LOAD_AVG_MAX:
logger.warning(
f"System load {load_1min:.1f} exceeds threshold {LOAD_AVG_MAX}. "
f"Pausing for 60 seconds..."
)
return False
return True
def load_whisper_model():
"""Load the Whisper model. This takes 10-30 seconds on first call."""
from faster_whisper import WhisperModel
logger.info(
f"Loading Whisper model: {WHISPER_MODEL} "
f"(compute_type={WHISPER_COMPUTE_TYPE}, threads={WHISPER_THREADS})"
)
start = time.time()
model = WhisperModel(
WHISPER_MODEL,
device="cpu",
compute_type=WHISPER_COMPUTE_TYPE,
cpu_threads=WHISPER_THREADS,
download_root="/mnt/qa/models"
)
elapsed = time.time() - start
logger.info(f"Model loaded in {elapsed:.1f}s")
return model
def transcribe_file(model, audio_path: str) -> dict:
"""
Transcribe a single audio file.
Returns dict with:
transcript: str — full text
language: str — detected language code
language_prob: float — detection confidence
duration_audio: float — audio length in seconds
duration_proc: float — processing time in seconds
word_count: int — total words
"""
start = time.time()
transcribe_kwargs = {
"beam_size": WHISPER_BEAM_SIZE,
"vad_filter": True,
"vad_parameters": {
"min_silence_duration_ms": 500,
"speech_pad_ms": 200,
},
"word_timestamps": False, # Not needed for QA, saves time
"condition_on_previous_text": True,
}
# Set language if configured (skip auto-detection)
if WHISPER_LANGUAGE:
transcribe_kwargs["language"] = WHISPER_LANGUAGE
segments, info = model.transcribe(audio_path, **transcribe_kwargs)
# Collect all segments into full transcript
full_text_parts = []
for segment in segments:
text = segment.text.strip()
if text:
full_text_parts.append(text)
full_text = " ".join(full_text_parts)
elapsed = time.time() - start
return {
"transcript": full_text,
"language": info.language,
"language_prob": round(info.language_probability, 3),
"duration_audio": round(info.duration, 2),
"duration_proc": round(elapsed, 2),
"word_count": len(full_text.split()) if full_text else 0,
}
def save_transcript_file(recording_id: str, call_date: str, text: str) -> str:
"""
Save transcript text to a dated directory.
Returns the file path.
"""
    # Extract date for directory organization (fall back to today if missing)
    # Note: string slicing never raises, so guard on length instead of except
    date_str = (call_date or "")[:10]  # YYYY-MM-DD
    if len(date_str) != 10:
        date_str = datetime.now().strftime("%Y-%m-%d")
date_dir = QA_TRANSCRIPT_DIR / date_str
date_dir.mkdir(parents=True, exist_ok=True)
filepath = date_dir / f"{recording_id}.txt"
filepath.write_text(text, encoding="utf-8")
return str(filepath)
def process_queue(model):
"""Process all pending items in the queue."""
qa_conn = get_qa_db()
# Get pending items (oldest first)
pending = qa_conn.execute(
"""
SELECT id, recording_id, filename, call_date, agent_user,
campaign_id, retry_count
FROM qa_queue
WHERE status IN ('pending', 'error')
AND retry_count < ?
ORDER BY call_date ASC
LIMIT ?
""",
(MAX_RETRIES, BATCH_SIZE)
).fetchall()
if not pending:
logger.info("No pending items in queue")
qa_conn.close()
return 0
logger.info(f"Processing {len(pending)} queued recordings")
processed = 0
errors = 0
for item in pending:
# Check for shutdown signal
if shutdown_requested:
logger.info("Shutdown requested, stopping after current batch")
break
# Check system load before each file
while not check_system_load():
if shutdown_requested:
break
time.sleep(60)
if shutdown_requested:
break
queue_id = item["id"]
recording_id = item["recording_id"]
audio_path = item["filename"]
logger.info(
f"Transcribing [{processed+1}/{len(pending)}]: "
f"{recording_id} ({item['agent_user']}/{item['campaign_id']})"
)
# Mark as transcribing
qa_conn.execute(
"UPDATE qa_queue SET status = 'transcribing', started_at = datetime('now') WHERE id = ?",
(queue_id,)
)
qa_conn.commit()
try:
# Verify file still exists
if not Path(audio_path).exists():
raise FileNotFoundError(f"Recording file not found: {audio_path}")
# Transcribe
result = transcribe_file(model, audio_path)
if not result["transcript"] or result["word_count"] < 3:
# Empty or near-empty transcript — likely silence or noise
qa_conn.execute(
"""UPDATE qa_queue
SET status = 'skipped',
error_message = 'Empty or near-empty transcript',
completed_at = datetime('now')
WHERE id = ?""",
(queue_id,)
)
qa_conn.commit()
logger.info(f" Skipped {recording_id}: empty transcript")
continue
# Save transcript file to disk
transcript_file = save_transcript_file(
recording_id, item["call_date"], result["transcript"]
)
# Store in database
qa_conn.execute(
"""
INSERT INTO qa_transcripts (
queue_id, recording_id, transcript, language, language_prob,
duration_audio, duration_proc, word_count,
model_name, compute_type, transcript_file
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
queue_id,
recording_id,
result["transcript"],
result["language"],
result["language_prob"],
result["duration_audio"],
result["duration_proc"],
result["word_count"],
WHISPER_MODEL,
WHISPER_COMPUTE_TYPE,
transcript_file,
)
)
# Update queue status
qa_conn.execute(
"""UPDATE qa_queue
SET status = 'transcribed', completed_at = datetime('now')
WHERE id = ?""",
(queue_id,)
)
qa_conn.commit()
speed = result["duration_audio"] / result["duration_proc"] if result["duration_proc"] > 0 else 0
logger.info(
f" Done: {result['word_count']} words, "
f"{result['duration_audio']:.0f}s audio in {result['duration_proc']:.1f}s "
f"({speed:.1f}x real-time), lang={result['language']}"
)
processed += 1
except Exception as e:
logger.error(f" Error transcribing {recording_id}: {e}", exc_info=True)
qa_conn.execute(
"""UPDATE qa_queue
SET status = 'error',
error_message = ?,
retry_count = retry_count + 1
WHERE id = ?""",
(str(e)[:500], queue_id)
)
qa_conn.commit()
errors += 1
qa_conn.close()
logger.info(f"Transcription complete: {processed} processed, {errors} errors")
return processed
def run():
"""Main entry point — load model and process the queue."""
logger.info("=" * 60)
logger.info("Transcription engine starting")
logger.info("=" * 60)
# Initial load check
if not check_system_load():
logger.warning("System overloaded at startup. Waiting 5 minutes...")
time.sleep(300)
if not check_system_load():
logger.error("System still overloaded. Aborting.")
return
# Load model (this is the slow part — ~10-30 seconds)
model = load_whisper_model()
# Process all pending items
total_processed = 0
batch_num = 0
while not shutdown_requested:
batch_num += 1
logger.info(f"--- Batch {batch_num} ---")
processed = process_queue(model)
total_processed += processed
if processed == 0:
# No more items to process
break
# Brief pause between batches
time.sleep(2)
logger.info(f"Total transcribed: {total_processed} recordings")
logger.info("Transcription engine finished")
if __name__ == "__main__":
run()
PYEOF
chmod +x /root/qa-pipeline/qa_transcribe.py
Running the Transcription Engine
# Manual test run (with resource limits)
nice -n 19 ionice -c3 taskset -c 6-11 \
/root/qa-pipeline/venv/bin/python /root/qa-pipeline/qa_transcribe.py
# Check results
sqlite3 /mnt/qa/db/qa.db \
"SELECT COUNT(*) as total,
ROUND(AVG(duration_proc), 1) as avg_proc_time,
ROUND(AVG(word_count), 0) as avg_words,
ROUND(AVG(duration_audio/duration_proc), 1) as avg_speed
FROM qa_transcripts;"
# View a sample transcript
sqlite3 /mnt/qa/db/qa.db \
"SELECT recording_id, word_count, language,
SUBSTR(transcript, 1, 200) as preview
FROM qa_transcripts
ORDER BY created_at DESC LIMIT 3;"
# Check transcript files on disk
ls -la /mnt/qa/transcripts/$(date +%Y-%m-%d)/ | head -10
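Those throughput numbers translate directly into nightly capacity. A back-of-envelope sketch; the 5x real-time speed and 90-second average call length are illustrative assumptions, so substitute the avg_speed and call-length figures your own query reports:

```python
# Estimate how many recordings one night's processing window can absorb.
WINDOW_HOURS = 9      # 22:00-07:00 processing window
SPEED_FACTOR = 5.0    # transcription speed, multiples of real time (assumed)
AVG_CALL_SEC = 90     # average recording length in seconds (assumed)

audio_seconds_per_night = WINDOW_HOURS * 3600 * SPEED_FACTOR
calls_per_night = int(audio_seconds_per_night / AVG_CALL_SEC)
print(calls_per_night)  # 1800
```

At these assumed figures the window comfortably covers the 500-1,000 daily calls cited in the introduction, with headroom for backlog catch-up.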
10. QA Scoring Engine
Purpose
The scoring engine reads completed transcripts, sends them to an LLM (Gemini or Groq) with a detailed scoring prompt, and parses the structured JSON response into the 8-category scorecard. It runs immediately after the transcription engine completes.
The QA Scorecard
Each call is evaluated across 8 categories with the following weights:
| # | Category | Weight | Score Range | What the AI Evaluates |
|---|---|---|---|---|
| 1 | Greeting / Opening | 10% | 1-5 | Did the agent identify themselves and the company? Professional, warm tone? |
| 2 | Customer Identification | 10% | 1-5 | Did the agent verify the caller's identity or account? Asked for reference number? |
| 3 | Needs Assessment | 15% | 1-5 | Did the agent ask appropriate questions? Understood the issue before offering solutions? |
| 4 | Product Knowledge | 15% | 1-5 | Did the agent provide accurate information? Confident answers? No misinformation? |
| 5 | Communication | 15% | 1-5 | Clear speech, appropriate pace, no excessive jargon, active listening signals? |
| 6 | Resolution / Outcome | 20% | 1-5 | Was the issue resolved? If not, were proper next steps given? Did the agent take ownership? |
| 7 | Closing | 10% | 1-5 | Summarized actions taken, confirmed customer satisfaction, professional goodbye? |
| 8 | Compliance | 5% | 1-5 | No prohibited language, followed required scripts/disclosures if applicable? |
Weighted total formula:
Total = (greeting * 0.10 + customer_id * 0.10 + needs * 0.15 + product * 0.15
+ communication * 0.15 + resolution * 0.20 + closing * 0.10
+ compliance * 0.05) * 20
This maps the 1-5 scale to 0-100. A perfect score (all 5s) = 100. A score of all 3s = 60.
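The mapping can be sanity-checked in a few lines (the weights mirror the CATEGORY_WEIGHTS dictionary in qa_score.py; `weighted_total` is an illustrative helper):

```python
# Verify the weighted-total formula: weights must sum to 1.0 for the
# 1-5 -> 0-100 mapping to hold.
WEIGHTS = {
    "greeting": 0.10, "customer_id": 0.10, "needs": 0.15, "product": 0.15,
    "communication": 0.15, "resolution": 0.20, "closing": 0.10, "compliance": 0.05,
}
assert abs(sum(WEIGHTS.values()) - 1.0) < 1e-9

def weighted_total(scores: dict) -> float:
    return sum(scores[cat] * w for cat, w in WEIGHTS.items()) * 20

print(round(weighted_total({c: 5 for c in WEIGHTS}), 1))  # 100.0
print(round(weighted_total({c: 3 for c in WEIGHTS}), 1))  # 60.0
```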
Scoring Script
cat > /root/qa-pipeline/qa_score.py << 'PYEOF'
#!/usr/bin/env python3
"""
QA Pipeline — AI Scoring Engine
Sends transcripts to Gemini (or Groq fallback) for quality scoring.
Run: after qa_transcribe.py completes (chained in nightly batch)
/root/qa-pipeline/venv/bin/python /root/qa-pipeline/qa_score.py
"""
import sys
import json
import time
import logging
import httpx
from datetime import datetime
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from qa_config import (
get_qa_db, GEMINI_API_KEY, GROQ_API_KEY,
LLM_PRIMARY, LLM_FALLBACK,
FLAG_THRESHOLD, BATCH_SIZE, MAX_RETRIES,
LOG_FILE, LOG_FORMAT, LOG_DATE_FORMAT
)
# ── Logging ──────────────────────────────────────────────────
logging.basicConfig(
level=logging.INFO,
format=LOG_FORMAT,
datefmt=LOG_DATE_FORMAT,
handlers=[
logging.FileHandler(str(LOG_FILE)),
logging.StreamHandler()
]
)
logger = logging.getLogger("qa_score")
# ── Category Weights ─────────────────────────────────────────
CATEGORY_WEIGHTS = {
"greeting": 0.10,
"customer_id": 0.10,
"needs": 0.15,
"product": 0.15,
"communication": 0.15,
"resolution": 0.20,
"closing": 0.10,
"compliance": 0.05,
}
# ── Scoring Prompt ───────────────────────────────────────────
SCORING_PROMPT = """You are an expert call center quality assurance evaluator. Analyze the following call transcript and score the agent's performance.
## Scoring Categories
Score each category from 1 to 5:
- 1 = Very Poor (completely failed this aspect)
- 2 = Poor (significant issues)
- 3 = Acceptable (meets minimum standards)
- 4 = Good (above average, minor issues only)
- 5 = Excellent (exemplary performance)
### Categories:
1. **greeting** (10%): Did the agent introduce themselves and the company clearly? Was the tone professional and welcoming?
2. **customer_id** (10%): Did the agent verify the caller's identity, account number, or reference? If not applicable (e.g., new inquiry), did they collect necessary details?
3. **needs** (15%): Did the agent ask appropriate discovery questions? Did they listen and understand the customer's issue before jumping to solutions?
4. **product** (15%): Did the agent demonstrate accurate knowledge? Were answers confident and correct? Any misinformation given?
5. **communication** (15%): Was the agent clear and easy to understand? Appropriate pace? Used plain language (not excessive jargon)? Active listening signals (acknowledgments, paraphrasing)?
6. **resolution** (20%): Was the customer's issue resolved? If not fully resolved, were proper next steps clearly communicated? Did the agent take ownership rather than deflecting?
7. **closing** (10%): Did the agent summarize what was discussed/agreed? Confirm the customer was satisfied? Professional sign-off?
8. **compliance** (5%): No inappropriate language, no unauthorized promises, no policy violations evident in the transcript?
## Call Context
- Agent: {agent_name} ({agent_user})
- Campaign/Group: {campaign_id}
- Call Duration: {call_length} seconds
- Call Disposition: {call_status}
## Transcript
{transcript}
## Response Format
Respond with ONLY a valid JSON object (no markdown, no code fences, no explanation outside the JSON). Use this exact structure:
{{
"scores": {{
"greeting": <1-5>,
"customer_id": <1-5>,
"needs": <1-5>,
"product": <1-5>,
"communication": <1-5>,
"resolution": <1-5>,
"closing": <1-5>,
"compliance": <1-5>
}},
"summary": "<2-3 sentence overall assessment of the call>",
"strengths": ["<strength 1>", "<strength 2>"],
"improvements": ["<area for improvement 1>", "<area for improvement 2>"]
}}
IMPORTANT: Return ONLY the JSON object. No other text."""
def call_gemini(prompt: str) -> dict:
"""
Call Google Gemini API.
Returns parsed JSON response or raises exception.
"""
url = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash-lite:generateContent?key={GEMINI_API_KEY}"
payload = {
"contents": [{"parts": [{"text": prompt}]}],
"generationConfig": {
"temperature": 0.3,
"maxOutputTokens": 1024,
"responseMimeType": "application/json",
},
}
with httpx.Client(timeout=30) as client:
response = client.post(url, json=payload)
response.raise_for_status()
data = response.json()
# Extract text from Gemini response
try:
text = data["candidates"][0]["content"]["parts"][0]["text"]
except (KeyError, IndexError) as e:
raise ValueError(f"Unexpected Gemini response structure: {e}")
# Parse JSON (Gemini with responseMimeType should return clean JSON)
return json.loads(text)
def call_groq(prompt: str) -> dict:
"""
Call Groq API with Llama 3.3 70B.
Returns parsed JSON response or raises exception.
"""
url = "https://api.groq.com/openai/v1/chat/completions"
payload = {
"model": "llama-3.3-70b-versatile",
"messages": [
{
"role": "system",
"content": "You are a call center QA evaluator. Always respond with valid JSON only.",
},
{"role": "user", "content": prompt},
],
"temperature": 0.3,
"max_tokens": 1024,
"response_format": {"type": "json_object"},
}
headers = {
"Authorization": f"Bearer {GROQ_API_KEY}",
"Content-Type": "application/json",
}
with httpx.Client(timeout=30) as client:
response = client.post(url, json=payload, headers=headers)
response.raise_for_status()
data = response.json()
text = data["choices"][0]["message"]["content"]
return json.loads(text)
def score_transcript(transcript: str, call_info: dict) -> dict:
"""
Score a transcript using the primary LLM provider, with fallback.
Returns dict with scores, summary, strengths, improvements, and metadata.
"""
# Build the prompt with call context
prompt = SCORING_PROMPT.format(
agent_name=call_info.get("agent_name", "Unknown"),
agent_user=call_info.get("agent_user", "Unknown"),
campaign_id=call_info.get("campaign_id", "Unknown"),
call_length=call_info.get("call_length", 0),
call_status=call_info.get("call_status", "Unknown"),
transcript=transcript,
)
# Try primary provider
providers = [
(LLM_PRIMARY, call_gemini if LLM_PRIMARY == "gemini" else call_groq),
(LLM_FALLBACK, call_groq if LLM_FALLBACK == "groq" else call_gemini),
]
last_error = None
for provider_name, provider_func in providers:
try:
start = time.time()
result = provider_func(prompt)
elapsed = time.time() - start
# Validate the response has required fields
scores = result.get("scores", {})
required_keys = set(CATEGORY_WEIGHTS.keys())
if not required_keys.issubset(set(scores.keys())):
missing = required_keys - set(scores.keys())
raise ValueError(f"Missing score categories: {missing}")
            # Coerce and clamp score values to the 1-5 range; a non-numeric
            # value (e.g. "N/A") must not crash the provider loop
            for key, val in scores.items():
                if key in CATEGORY_WEIGHTS:
                    try:
                        num = int(round(float(val)))
                    except (TypeError, ValueError):
                        num = 1
                    scores[key] = max(1, min(5, num))
# Calculate weighted total
total = sum(
scores[cat] * weight
for cat, weight in CATEGORY_WEIGHTS.items()
) * 20 # Scale from 1-5 to 0-100
return {
"scores": scores,
"total_score": round(total, 1),
"summary": result.get("summary", ""),
"strengths": result.get("strengths", []),
"improvements": result.get("improvements", []),
"provider": provider_name,
"model": "gemini-2.0-flash-lite" if provider_name == "gemini" else "llama-3.3-70b-versatile",
"scoring_time": round(elapsed, 2),
"raw_response": json.dumps(result),
}
except Exception as e:
logger.warning(f" {provider_name} failed: {e}")
last_error = e
continue
raise RuntimeError(f"All LLM providers failed. Last error: {last_error}")
def process_transcribed_queue():
"""Score all transcribed (but unscored) items."""
qa_conn = get_qa_db()
# Get transcribed items that need scoring
items = qa_conn.execute(
"""
SELECT q.id as queue_id, q.recording_id, q.agent_user, q.agent_name,
q.campaign_id, q.call_length, q.call_status, q.call_date,
t.id as transcript_id, t.transcript, t.word_count
FROM qa_queue q
JOIN qa_transcripts t ON t.queue_id = q.id
WHERE q.status = 'transcribed'
AND q.retry_count < ?
ORDER BY q.call_date ASC
LIMIT ?
""",
(MAX_RETRIES, BATCH_SIZE)
).fetchall()
if not items:
logger.info("No transcripts waiting for scoring")
qa_conn.close()
return 0
logger.info(f"Scoring {len(items)} transcripts")
scored = 0
errors = 0
for item in items:
queue_id = item["queue_id"]
recording_id = item["recording_id"]
logger.info(
f"Scoring [{scored+1}/{len(items)}]: {recording_id} "
f"({item['agent_user']}, {item['word_count']} words)"
)
# Mark as scoring
qa_conn.execute(
"UPDATE qa_queue SET status = 'scoring' WHERE id = ?",
(queue_id,)
)
qa_conn.commit()
try:
call_info = {
"agent_user": item["agent_user"],
"agent_name": item["agent_name"],
"campaign_id": item["campaign_id"],
"call_length": item["call_length"],
"call_status": item["call_status"],
}
result = score_transcript(item["transcript"], call_info)
# Determine if flagged
flagged = 1 if result["total_score"] < FLAG_THRESHOLD else 0
flag_reason = None
if flagged:
low_cats = [
cat for cat, score in result["scores"].items()
if score <= 2
]
flag_reason = f"Score {result['total_score']:.0f} below {FLAG_THRESHOLD}"
if low_cats:
flag_reason += f"; critical: {', '.join(low_cats)}"
# Insert score record
qa_conn.execute(
"""
INSERT INTO qa_scores (
queue_id, transcript_id, recording_id,
agent_user, agent_name, campaign_id, call_date,
score_greeting, score_customer_id, score_needs,
score_product, score_communication, score_resolution,
score_closing, score_compliance,
total_score, summary, strengths, improvements,
flagged, flag_reason,
llm_provider, llm_model, scoring_time, raw_response
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
queue_id, item["transcript_id"], recording_id,
item["agent_user"], item["agent_name"],
item["campaign_id"], item["call_date"],
result["scores"]["greeting"],
result["scores"]["customer_id"],
result["scores"]["needs"],
result["scores"]["product"],
result["scores"]["communication"],
result["scores"]["resolution"],
result["scores"]["closing"],
result["scores"]["compliance"],
result["total_score"],
result["summary"],
json.dumps(result["strengths"]),
json.dumps(result["improvements"]),
flagged, flag_reason,
result["provider"], result["model"],
result["scoring_time"],
result["raw_response"],
)
)
# Mark queue item as complete
qa_conn.execute(
"""UPDATE qa_queue
SET status = 'complete', completed_at = datetime('now')
WHERE id = ?""",
(queue_id,)
)
qa_conn.commit()
flag_marker = " [FLAGGED]" if flagged else ""
logger.info(
f" Score: {result['total_score']:.0f}/100 via {result['provider']} "
f"({result['scoring_time']:.1f}s){flag_marker}"
)
scored += 1
            # Rate limiting — respect API quotas
            # Gemini free tier enforces per-minute and per-day caps; 1.5s
            # between calls (~40 req/min) leaves a comfortable margin.
            # Groq limits vary by plan; increase the sleep if you see 429s.
            time.sleep(1.5)
except Exception as e:
logger.error(f" Error scoring {recording_id}: {e}", exc_info=True)
qa_conn.execute(
"""UPDATE qa_queue
SET status = 'error',
error_message = ?,
retry_count = retry_count + 1
WHERE id = ?""",
(f"Scoring error: {str(e)[:400]}", queue_id)
)
qa_conn.commit()
errors += 1
# Back off on errors (might be rate limited)
time.sleep(5)
qa_conn.close()
logger.info(f"Scoring complete: {scored} scored, {errors} errors")
return scored
def update_daily_stats():
"""
Aggregate daily statistics after scoring is complete.
Recalculates stats for any date that had new scores today.
"""
qa_conn = get_qa_db()
today = datetime.now().strftime("%Y-%m-%d")
# Find dates that had scores created today
dates = qa_conn.execute(
"""
SELECT DISTINCT SUBSTR(call_date, 1, 10) as stat_date
FROM qa_scores
WHERE DATE(created_at) = ?
""",
(today,)
).fetchall()
if not dates:
logger.info("No new scores to aggregate")
qa_conn.close()
return
for date_row in dates:
stat_date = date_row["stat_date"]
if not stat_date:
continue
# Get per-agent aggregates for this date
agents = qa_conn.execute(
"""
SELECT
agent_user,
agent_name,
campaign_id,
COUNT(*) as calls_scored,
ROUND(AVG(total_score), 1) as avg_total_score,
ROUND(MIN(total_score), 1) as min_total_score,
ROUND(MAX(total_score), 1) as max_total_score,
ROUND(AVG(score_greeting), 2) as avg_greeting,
ROUND(AVG(score_customer_id), 2) as avg_customer_id,
ROUND(AVG(score_needs), 2) as avg_needs,
ROUND(AVG(score_product), 2) as avg_product,
ROUND(AVG(score_communication), 2) as avg_communication,
ROUND(AVG(score_resolution), 2) as avg_resolution,
ROUND(AVG(score_closing), 2) as avg_closing,
ROUND(AVG(score_compliance), 2) as avg_compliance,
SUM(flagged) as flagged_calls
FROM qa_scores
WHERE SUBSTR(call_date, 1, 10) = ?
GROUP BY agent_user, campaign_id
""",
(stat_date,)
).fetchall()
for agent in agents:
qa_conn.execute(
"""
INSERT INTO qa_daily_stats (
stat_date, agent_user, agent_name, campaign_id,
calls_scored, avg_total_score, min_total_score, max_total_score,
avg_greeting, avg_customer_id, avg_needs, avg_product,
avg_communication, avg_resolution, avg_closing, avg_compliance,
flagged_calls, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'))
ON CONFLICT(stat_date, agent_user, campaign_id)
DO UPDATE SET
calls_scored = excluded.calls_scored,
avg_total_score = excluded.avg_total_score,
min_total_score = excluded.min_total_score,
max_total_score = excluded.max_total_score,
avg_greeting = excluded.avg_greeting,
avg_customer_id = excluded.avg_customer_id,
avg_needs = excluded.avg_needs,
avg_product = excluded.avg_product,
avg_communication = excluded.avg_communication,
avg_resolution = excluded.avg_resolution,
avg_closing = excluded.avg_closing,
avg_compliance = excluded.avg_compliance,
flagged_calls = excluded.flagged_calls,
updated_at = datetime('now')
""",
(
stat_date, agent["agent_user"], agent["agent_name"],
agent["campaign_id"], agent["calls_scored"],
agent["avg_total_score"], agent["min_total_score"],
agent["max_total_score"],
agent["avg_greeting"], agent["avg_customer_id"],
agent["avg_needs"], agent["avg_product"],
agent["avg_communication"], agent["avg_resolution"],
agent["avg_closing"], agent["avg_compliance"],
agent["flagged_calls"],
)
)
qa_conn.commit()
logger.info(f"Updated daily stats for {stat_date}: {len(agents)} agent records")
# Also update queue-level stats
for date_row in dates:
stat_date = date_row["stat_date"]
if not stat_date:
continue
queue_stats = qa_conn.execute(
"""
SELECT
agent_user,
campaign_id,
SUM(CASE WHEN status = 'complete' THEN 1 ELSE 0 END) as completed,
SUM(CASE WHEN status = 'skipped' THEN 1 ELSE 0 END) as skipped,
SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END) as failed
FROM qa_queue
WHERE SUBSTR(call_date, 1, 10) = ?
GROUP BY agent_user, campaign_id
""",
(stat_date,)
).fetchall()
for qs in queue_stats:
qa_conn.execute(
"""
UPDATE qa_daily_stats
SET calls_queued = ?,
calls_skipped = ?,
calls_failed = ?
WHERE stat_date = ? AND agent_user = ? AND campaign_id = ?
""",
(
qs["completed"] + qs["skipped"] + qs["failed"],
qs["skipped"], qs["failed"],
stat_date, qs["agent_user"], qs["campaign_id"],
)
)
qa_conn.commit()
qa_conn.close()
logger.info("Daily stats aggregation complete")
def run():
"""Main entry point — score all pending transcripts."""
logger.info("=" * 60)
logger.info("QA Scoring engine starting")
logger.info("=" * 60)
total_scored = 0
batch_num = 0
while True:
batch_num += 1
logger.info(f"--- Scoring batch {batch_num} ---")
scored = process_transcribed_queue()
total_scored += scored
if scored == 0:
break
time.sleep(1)
# Update daily aggregates
logger.info("Updating daily statistics...")
update_daily_stats()
logger.info(f"Total scored: {total_scored}")
logger.info("QA Scoring engine finished")
if __name__ == "__main__":
run()
PYEOF
chmod +x /root/qa-pipeline/qa_score.py
Testing the Scoring Engine
# Run manually after transcription has completed
source /root/qa-pipeline/venv/bin/activate
python /root/qa-pipeline/qa_score.py
# View scores
sqlite3 /mnt/qa/db/qa.db \
"SELECT recording_id, agent_user, total_score, flagged,
score_greeting, score_needs, score_resolution, llm_provider
FROM qa_scores ORDER BY created_at DESC LIMIT 10;"
# View flagged calls
sqlite3 /mnt/qa/db/qa.db \
"SELECT recording_id, agent_user, total_score, flag_reason, summary
FROM qa_scores WHERE flagged = 1 ORDER BY total_score ASC LIMIT 10;"
# View daily stats
sqlite3 /mnt/qa/db/qa.db \
"SELECT stat_date, agent_user, calls_scored, avg_total_score, flagged_calls
FROM qa_daily_stats ORDER BY stat_date DESC LIMIT 20;"
11. API Service
Purpose
The API service is a lightweight FastAPI application that exposes QA data through REST endpoints. It runs continuously as a systemd service, providing data to Grafana dashboards, PHP admin pages, and email report generators. It reads from the SQLite database (read-only) and never interferes with the nightly batch processing thanks to SQLite's WAL mode.
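One way to hold up that read-only guarantee at the connection level is SQLite's URI mode. A sketch (the actual get_qa_db() helper lives in qa_config.py earlier in the tutorial and may be implemented differently):

```python
import sqlite3

def get_qa_db_readonly(path: str = "/mnt/qa/db/qa.db") -> sqlite3.Connection:
    """Open the QA database so that writes fail outright.

    mode=ro refuses any write at the SQLite level; combined with the
    batch writer keeping WAL mode enabled, readers never block the
    nightly batch and the batch never blocks readers.
    """
    conn = sqlite3.connect(f"file:{path}?mode=ro", uri=True)
    conn.row_factory = sqlite3.Row  # rows addressable by column name
    return conn
```

With this helper, an accidental `UPDATE` in an API endpoint raises `sqlite3.OperationalError` instead of silently mutating production data.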
API Service Script
cat > /root/qa-pipeline/qa_service.py << 'PYEOF'
#!/usr/bin/env python3
"""
QA Pipeline — REST API Service
Serves QA data via FastAPI on port 8085.
Run: as systemd service (qa-api.service)
/root/qa-pipeline/venv/bin/uvicorn qa_service:app --host 0.0.0.0 --port 8085
"""
import sys
import json
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional
from fastapi import FastAPI, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
sys.path.insert(0, str(Path(__file__).parent))
from qa_config import get_qa_db, FLAG_THRESHOLD
# ── FastAPI App ──────────────────────────────────────────────
app = FastAPI(
title="QA Pipeline API",
description="Call quality scoring data for ViciDial",
version="1.0.0",
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["GET"],
allow_headers=["*"],
)
# ── Response Models ──────────────────────────────────────────
class HealthResponse(BaseModel):
status: str
db_size_mb: float
total_scored: int
total_flagged: int
last_processed: Optional[str]
class DailySummary(BaseModel):
date: str
total_calls: int
scored_calls: int
avg_score: Optional[float]
flagged_calls: int
top_agent: Optional[str]
top_score: Optional[float]
bottom_agent: Optional[str]
bottom_score: Optional[float]
category_averages: dict
class AgentSummary(BaseModel):
agent_user: str
agent_name: Optional[str]
period_start: str
period_end: str
total_calls: int
avg_score: Optional[float]
min_score: Optional[float]
max_score: Optional[float]
flagged_calls: int
category_averages: dict
trend: list # daily scores for charting
class FlaggedCall(BaseModel):
recording_id: str
agent_user: str
agent_name: Optional[str]
campaign_id: Optional[str]
call_date: Optional[str]
call_length: Optional[int]
total_score: float
flag_reason: Optional[str]
summary: Optional[str]
scores: dict
class CallDetail(BaseModel):
recording_id: str
agent_user: str
agent_name: Optional[str]
campaign_id: Optional[str]
call_date: Optional[str]
call_length: Optional[int]
call_status: Optional[str]
transcript: Optional[str]
total_score: Optional[float]
scores: Optional[dict]
summary: Optional[str]
strengths: Optional[list]
improvements: Optional[list]
flagged: bool
llm_provider: Optional[str]
# ── Endpoints ────────────────────────────────────────────────
@app.get("/api/qa/health", response_model=HealthResponse)
def health_check():
"""Service health check with basic statistics."""
conn = get_qa_db()
try:
db_path = Path("/mnt/qa/db/qa.db")
db_size = db_path.stat().st_size / (1024 * 1024) if db_path.exists() else 0
total = conn.execute("SELECT COUNT(*) as c FROM qa_scores").fetchone()["c"]
flagged = conn.execute(
"SELECT COUNT(*) as c FROM qa_scores WHERE flagged = 1"
).fetchone()["c"]
last = conn.execute(
"SELECT MAX(created_at) as t FROM qa_scores"
).fetchone()["t"]
return HealthResponse(
status="ok",
db_size_mb=round(db_size, 2),
total_scored=total,
total_flagged=flagged,
last_processed=last,
)
finally:
conn.close()
@app.get("/api/qa/daily-summary")
def daily_summary(
date: Optional[str] = Query(None, description="Date in YYYY-MM-DD format"),
days: int = Query(7, description="Number of days to return", ge=1, le=90),
):
"""
Get daily QA summary statistics.
If date is provided, returns that specific day. Otherwise returns the last N days.
"""
conn = get_qa_db()
try:
if date:
dates = [date]
else:
dates = []
for i in range(days):
d = (datetime.now() - timedelta(days=i)).strftime("%Y-%m-%d")
dates.append(d)
results = []
for d in dates:
# Aggregate from qa_scores for this date
stats = conn.execute(
"""
SELECT
COUNT(*) as scored_calls,
ROUND(AVG(total_score), 1) as avg_score,
SUM(flagged) as flagged_calls,
ROUND(AVG(score_greeting), 2) as avg_greeting,
ROUND(AVG(score_customer_id), 2) as avg_customer_id,
ROUND(AVG(score_needs), 2) as avg_needs,
ROUND(AVG(score_product), 2) as avg_product,
ROUND(AVG(score_communication), 2) as avg_communication,
ROUND(AVG(score_resolution), 2) as avg_resolution,
ROUND(AVG(score_closing), 2) as avg_closing,
ROUND(AVG(score_compliance), 2) as avg_compliance
FROM qa_scores
WHERE SUBSTR(call_date, 1, 10) = ?
""",
(d,),
).fetchone()
total_queued = conn.execute(
"SELECT COUNT(*) as c FROM qa_queue WHERE SUBSTR(call_date, 1, 10) = ?",
(d,),
).fetchone()["c"]
# Best and worst agent
best = conn.execute(
"""
SELECT agent_user, ROUND(AVG(total_score), 1) as avg_s
FROM qa_scores WHERE SUBSTR(call_date, 1, 10) = ?
GROUP BY agent_user HAVING COUNT(*) >= 3
ORDER BY avg_s DESC LIMIT 1
""",
(d,),
).fetchone()
worst = conn.execute(
"""
SELECT agent_user, ROUND(AVG(total_score), 1) as avg_s
FROM qa_scores WHERE SUBSTR(call_date, 1, 10) = ?
GROUP BY agent_user HAVING COUNT(*) >= 3
ORDER BY avg_s ASC LIMIT 1
""",
(d,),
).fetchone()
results.append({
"date": d,
"total_calls": total_queued,
"scored_calls": stats["scored_calls"] or 0,
"avg_score": stats["avg_score"],
"flagged_calls": stats["flagged_calls"] or 0,
"top_agent": best["agent_user"] if best else None,
"top_score": best["avg_s"] if best else None,
"bottom_agent": worst["agent_user"] if worst else None,
"bottom_score": worst["avg_s"] if worst else None,
"category_averages": {
"greeting": stats["avg_greeting"],
"customer_id": stats["avg_customer_id"],
"needs": stats["avg_needs"],
"product": stats["avg_product"],
"communication": stats["avg_communication"],
"resolution": stats["avg_resolution"],
"closing": stats["avg_closing"],
"compliance": stats["avg_compliance"],
},
})
return results
finally:
conn.close()
@app.get("/api/qa/agent/{agent_user}")
def agent_detail(
agent_user: str,
days: int = Query(30, description="Number of days to include", ge=1, le=365),
):
"""Get detailed QA data for a specific agent."""
conn = get_qa_db()
try:
since = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
# Overall stats
stats = conn.execute(
"""
SELECT
agent_name,
COUNT(*) as total_calls,
ROUND(AVG(total_score), 1) as avg_score,
ROUND(MIN(total_score), 1) as min_score,
ROUND(MAX(total_score), 1) as max_score,
SUM(flagged) as flagged_calls,
ROUND(AVG(score_greeting), 2) as avg_greeting,
ROUND(AVG(score_customer_id), 2) as avg_customer_id,
ROUND(AVG(score_needs), 2) as avg_needs,
ROUND(AVG(score_product), 2) as avg_product,
ROUND(AVG(score_communication), 2) as avg_communication,
ROUND(AVG(score_resolution), 2) as avg_resolution,
ROUND(AVG(score_closing), 2) as avg_closing,
ROUND(AVG(score_compliance), 2) as avg_compliance
FROM qa_scores
WHERE agent_user = ? AND SUBSTR(call_date, 1, 10) >= ?
""",
(agent_user, since),
).fetchone()
if not stats or stats["total_calls"] == 0:
raise HTTPException(
status_code=404,
detail=f"No QA data found for agent '{agent_user}' in last {days} days",
)
# Daily trend for charting
trend = conn.execute(
"""
SELECT
SUBSTR(call_date, 1, 10) as date,
COUNT(*) as calls,
ROUND(AVG(total_score), 1) as avg_score
FROM qa_scores
WHERE agent_user = ? AND SUBSTR(call_date, 1, 10) >= ?
GROUP BY SUBSTR(call_date, 1, 10)
ORDER BY date ASC
""",
(agent_user, since),
).fetchall()
return {
"agent_user": agent_user,
"agent_name": stats["agent_name"],
"period_start": since,
"period_end": datetime.now().strftime("%Y-%m-%d"),
"total_calls": stats["total_calls"],
"avg_score": stats["avg_score"],
"min_score": stats["min_score"],
"max_score": stats["max_score"],
"flagged_calls": stats["flagged_calls"],
"category_averages": {
"greeting": stats["avg_greeting"],
"customer_id": stats["avg_customer_id"],
"needs": stats["avg_needs"],
"product": stats["avg_product"],
"communication": stats["avg_communication"],
"resolution": stats["avg_resolution"],
"closing": stats["avg_closing"],
"compliance": stats["avg_compliance"],
},
"trend": [
{"date": r["date"], "calls": r["calls"], "avg_score": r["avg_score"]}
for r in trend
],
}
finally:
conn.close()
@app.get("/api/qa/flagged")
def flagged_calls(
days: int = Query(7, ge=1, le=90),
limit: int = Query(50, ge=1, le=500),
agent: Optional[str] = Query(None),
campaign: Optional[str] = Query(None),
):
"""Get calls flagged for human review (scored below threshold)."""
conn = get_qa_db()
try:
since = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
query = """
SELECT
s.recording_id, s.agent_user, s.agent_name, s.campaign_id,
s.call_date, q.call_length, s.total_score, s.flag_reason,
s.summary,
s.score_greeting, s.score_customer_id, s.score_needs,
s.score_product, s.score_communication, s.score_resolution,
s.score_closing, s.score_compliance
FROM qa_scores s
JOIN qa_queue q ON q.id = s.queue_id
WHERE s.flagged = 1 AND SUBSTR(s.call_date, 1, 10) >= ?
"""
params = [since]
if agent:
query += " AND s.agent_user = ?"
params.append(agent)
if campaign:
query += " AND s.campaign_id = ?"
params.append(campaign)
query += " ORDER BY s.total_score ASC LIMIT ?"
params.append(limit)
rows = conn.execute(query, params).fetchall()
return [
{
"recording_id": r["recording_id"],
"agent_user": r["agent_user"],
"agent_name": r["agent_name"],
"campaign_id": r["campaign_id"],
"call_date": r["call_date"],
"call_length": r["call_length"],
"total_score": r["total_score"],
"flag_reason": r["flag_reason"],
"summary": r["summary"],
"scores": {
"greeting": r["score_greeting"],
"customer_id": r["score_customer_id"],
"needs": r["score_needs"],
"product": r["score_product"],
"communication": r["score_communication"],
"resolution": r["score_resolution"],
"closing": r["score_closing"],
"compliance": r["score_compliance"],
},
}
for r in rows
]
finally:
conn.close()
@app.get("/api/qa/call/{uniqueid}")
def call_detail(uniqueid: str):
"""Get full QA detail for a specific call by recording_id or uniqueid."""
conn = get_qa_db()
try:
# Try recording_id first, then uniqueid
queue_item = conn.execute(
"""
SELECT * FROM qa_queue
WHERE recording_id = ? OR uniqueid = ?
LIMIT 1
""",
(uniqueid, uniqueid),
).fetchone()
if not queue_item:
raise HTTPException(status_code=404, detail="Call not found")
queue_id = queue_item["id"]
# Get transcript
transcript_row = conn.execute(
"SELECT * FROM qa_transcripts WHERE queue_id = ?",
(queue_id,),
).fetchone()
# Get score
score_row = conn.execute(
"SELECT * FROM qa_scores WHERE queue_id = ?",
(queue_id,),
).fetchone()
result = {
"recording_id": queue_item["recording_id"],
"uniqueid": queue_item["uniqueid"],
"agent_user": queue_item["agent_user"],
"agent_name": queue_item["agent_name"],
"campaign_id": queue_item["campaign_id"],
"call_date": queue_item["call_date"],
"call_length": queue_item["call_length"],
"call_status": queue_item["call_status"],
"processing_status": queue_item["status"],
"transcript": None,
"total_score": None,
"scores": None,
"summary": None,
"strengths": None,
"improvements": None,
"flagged": False,
"flag_reason": None,
"llm_provider": None,
}
if transcript_row:
result["transcript"] = transcript_row["transcript"]
result["language"] = transcript_row["language"]
result["word_count"] = transcript_row["word_count"]
result["transcription_time"] = transcript_row["duration_proc"]
if score_row:
result["total_score"] = score_row["total_score"]
result["scores"] = {
"greeting": score_row["score_greeting"],
"customer_id": score_row["score_customer_id"],
"needs": score_row["score_needs"],
"product": score_row["score_product"],
"communication": score_row["score_communication"],
"resolution": score_row["score_resolution"],
"closing": score_row["score_closing"],
"compliance": score_row["score_compliance"],
}
result["summary"] = score_row["summary"]
result["strengths"] = json.loads(score_row["strengths"]) if score_row["strengths"] else []
result["improvements"] = json.loads(score_row["improvements"]) if score_row["improvements"] else []
result["flagged"] = bool(score_row["flagged"])
result["flag_reason"] = score_row["flag_reason"]
result["llm_provider"] = score_row["llm_provider"]
return result
finally:
conn.close()
@app.get("/api/qa/trends")
def score_trends(
days: int = Query(30, ge=7, le=365),
campaign: Optional[str] = Query(None),
):
"""Get score trends over time (for line charts)."""
conn = get_qa_db()
try:
since = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
query = """
SELECT
SUBSTR(call_date, 1, 10) as date,
COUNT(*) as calls,
ROUND(AVG(total_score), 1) as avg_score,
SUM(flagged) as flagged,
ROUND(AVG(score_greeting), 2) as greeting,
ROUND(AVG(score_customer_id), 2) as customer_id,
ROUND(AVG(score_needs), 2) as needs,
ROUND(AVG(score_product), 2) as product,
ROUND(AVG(score_communication), 2) as communication,
ROUND(AVG(score_resolution), 2) as resolution,
ROUND(AVG(score_closing), 2) as closing,
ROUND(AVG(score_compliance), 2) as compliance
FROM qa_scores
WHERE SUBSTR(call_date, 1, 10) >= ?
"""
params = [since]
if campaign:
query += " AND campaign_id = ?"
params.append(campaign)
query += " GROUP BY SUBSTR(call_date, 1, 10) ORDER BY date ASC"
rows = conn.execute(query, params).fetchall()
return [
{
"date": r["date"],
"calls": r["calls"],
"avg_score": r["avg_score"],
"flagged": r["flagged"],
"categories": {
"greeting": r["greeting"],
"customer_id": r["customer_id"],
"needs": r["needs"],
"product": r["product"],
"communication": r["communication"],
"resolution": r["resolution"],
"closing": r["closing"],
"compliance": r["compliance"],
},
}
for r in rows
]
finally:
conn.close()
@app.get("/api/qa/leaderboard")
def agent_leaderboard(
days: int = Query(7, ge=1, le=90),
min_calls: int = Query(5, ge=1),
):
"""Agent ranking by average QA score."""
conn = get_qa_db()
try:
since = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
rows = conn.execute(
"""
SELECT
agent_user,
agent_name,
COUNT(*) as calls,
ROUND(AVG(total_score), 1) as avg_score,
ROUND(MIN(total_score), 1) as min_score,
ROUND(MAX(total_score), 1) as max_score,
SUM(flagged) as flagged
FROM qa_scores
WHERE SUBSTR(call_date, 1, 10) >= ?
GROUP BY agent_user
HAVING COUNT(*) >= ?
ORDER BY avg_score DESC
""",
(since, min_calls),
).fetchall()
return [
{
"rank": i + 1,
"agent_user": r["agent_user"],
"agent_name": r["agent_name"],
"calls": r["calls"],
"avg_score": r["avg_score"],
"min_score": r["min_score"],
"max_score": r["max_score"],
"flagged": r["flagged"],
}
for i, r in enumerate(rows)
]
finally:
conn.close()
@app.get("/api/qa/queue-status")
def queue_status():
"""Current processing queue statistics."""
conn = get_qa_db()
try:
rows = conn.execute(
"""
SELECT status, COUNT(*) as count
FROM qa_queue
GROUP BY status
ORDER BY count DESC
"""
).fetchall()
today = datetime.now().strftime("%Y-%m-%d")
today_queued = conn.execute(
"SELECT COUNT(*) as c FROM qa_queue WHERE DATE(queued_at) = ?",
(today,),
).fetchone()["c"]
return {
"status_counts": {r["status"]: r["count"] for r in rows},
"today_queued": today_queued,
"total": sum(r["count"] for r in rows),
}
finally:
conn.close()
PYEOF
chmod +x /root/qa-pipeline/qa_service.py
Testing the API
# Start the API server manually for testing
source /root/qa-pipeline/venv/bin/activate
cd /root/qa-pipeline
uvicorn qa_service:app --host 0.0.0.0 --port 8085 &
# Test endpoints
curl -s http://localhost:8085/api/qa/health | python3 -m json.tool
curl -s "http://localhost:8085/api/qa/daily-summary?days=3" | python3 -m json.tool
curl -s "http://localhost:8085/api/qa/flagged?days=7" | python3 -m json.tool
curl -s "http://localhost:8085/api/qa/leaderboard?days=7" | python3 -m json.tool
curl -s http://localhost:8085/api/qa/queue-status | python3 -m json.tool
# Interactive API docs
# Visit: http://YOUR_SERVER_IP:8085/docs
# Stop test server
kill %1
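The same endpoints are easy to consume from Python for ad-hoc reporting or scripted alerts. A stdlib-only sketch (assumes the API is listening on the port configured above):

```python
import json
from urllib.request import urlopen

# Assumption: qa-api is running locally on the port from qa_service.py
API_BASE = "http://127.0.0.1:8085/api/qa"

def qa_get(endpoint: str, timeout: float = 10.0):
    """Fetch one QA API endpoint and decode the JSON body."""
    url = f"{API_BASE}/{endpoint.lstrip('/')}"
    with urlopen(url, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))

if __name__ == "__main__":
    health = qa_get("health")
    print(f"scored={health['total_scored']} flagged={health['total_flagged']}")
```

This is handy for cron-driven email reports that do not want a PHP or Grafana dependency.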
12. Production Deployment
systemd Service — Nightly Batch Processor
This service runs the full pipeline: queue scan, transcription, and scoring. It is triggered by a systemd timer at 22:00 daily.
cat > /etc/systemd/system/qa-pipeline.service << 'EOF'
[Unit]
Description=QA Pipeline — Nightly Batch Processor
After=network.target mysql.service mariadb.service
[Service]
Type=oneshot
User=root
WorkingDirectory=/root/qa-pipeline
# Resource controls — protect production workloads
Nice=19
IOSchedulingClass=idle
CPUAffinity=6 7 8 9 10 11
# Run the full pipeline: queue → transcribe → score
ExecStart=/bin/bash -c '\
source /root/qa-pipeline/venv/bin/activate && \
python /root/qa-pipeline/qa_queue.py && \
python /root/qa-pipeline/qa_transcribe.py && \
python /root/qa-pipeline/qa_score.py'
# Allow up to 10 hours for the batch to complete
TimeoutStartSec=36000
# Note: Restart= is not allowed for Type=oneshot units on most systemd
# versions; a failed run is simply retried at the next timer trigger
# Environment
Environment=PYTHONUNBUFFERED=1
Environment=HF_HOME=/mnt/qa/models
Environment=HUGGINGFACE_HUB_CACHE=/mnt/qa/models
# Logging
StandardOutput=append:/mnt/qa/logs/qa-batch.log
StandardError=append:/mnt/qa/logs/qa-batch.log
[Install]
WantedBy=multi-user.target
EOF
systemd Timer — 22:00 Daily Trigger
cat > /etc/systemd/system/qa-pipeline.timer << 'EOF'
[Unit]
Description=QA Pipeline — Nightly Schedule
[Timer]
# Run at 22:00 every day
OnCalendar=*-*-* 22:00:00
# If the system was off at 22:00, run on next boot
Persistent=true
# Random delay up to 5 minutes to avoid exact-second load spikes
RandomizedDelaySec=300
[Install]
WantedBy=timers.target
EOF
systemd Service — API Server
cat > /etc/systemd/system/qa-api.service << 'EOF'
[Unit]
Description=QA Pipeline — REST API Service
After=network.target
[Service]
Type=simple
User=root
WorkingDirectory=/root/qa-pipeline
ExecStart=/root/qa-pipeline/venv/bin/uvicorn \
qa_service:app \
--host 0.0.0.0 \
--port 8085 \
--workers 2 \
--log-level warning
Restart=always
RestartSec=10
Environment=PYTHONUNBUFFERED=1
StandardOutput=append:/mnt/qa/logs/qa-api.log
StandardError=append:/mnt/qa/logs/qa-api.log
[Install]
WantedBy=multi-user.target
EOF
Enable and Start Services
# Reload systemd to pick up new service files
systemctl daemon-reload
# Enable the timer (nightly batch)
systemctl enable qa-pipeline.timer
systemctl start qa-pipeline.timer
# Enable and start the API service
systemctl enable qa-api.service
systemctl start qa-api.service
# Verify everything is running
systemctl status qa-pipeline.timer
systemctl status qa-api.service
# Check timer schedule
systemctl list-timers qa-pipeline.timer
# View logs
journalctl -u qa-pipeline.service --since today
journalctl -u qa-api.service --since today -f
Queue Manager Cron
The queue manager runs separately via cron (lightweight, every 5 minutes during business hours):
# Add to crontab
(crontab -l 2>/dev/null; echo '*/5 8-22 * * * /root/qa-pipeline/venv/bin/python /root/qa-pipeline/qa_queue.py >> /mnt/qa/logs/qa-queue-cron.log 2>&1') | sort -u | crontab -
Log Rotation
cat > /etc/logrotate.d/qa-pipeline << 'EOF'
/mnt/qa/logs/*.log {
daily
rotate 14
compress
delaycompress
missingok
notifempty
create 0644 root root
postrotate
systemctl restart qa-api.service 2>/dev/null || true
endscript
}
EOF
Manual Pipeline Run
To run the pipeline manually (e.g., for testing or to process a backlog):
# Run with the same resource limits the systemd unit applies
nice -n 19 ionice -c3 taskset -c 6-11 \
/bin/bash -c '
source /root/qa-pipeline/venv/bin/activate
python /root/qa-pipeline/qa_queue.py
python /root/qa-pipeline/qa_transcribe.py
python /root/qa-pipeline/qa_score.py
'
# Or trigger the systemd service directly
systemctl start qa-pipeline.service
# Monitor progress (the systemd unit appends to qa-batch.log)
tail -f /mnt/qa/logs/qa-batch.log
13. Production Safety
The Golden Rule
The QA pipeline must never degrade live call handling. This means:
- No measurable increase in Asterisk response latency
- No measurable increase in MySQL query times
- No audible impact on call recording quality
- No disk I/O contention affecting production reads/writes
Everything in this section exists to enforce that rule.
CPU Priority and Core Pinning
CPU Core Map (12-core example):
Cores 0-5: Reserved for production
- Core 0-1: Asterisk (call processing, SIP, RTP)
- Core 2-3: MySQL/MariaDB (queries, replication)
- Core 4-5: Apache/PHP (agent web interface)
Cores 6-11: Available for QA pipeline
- Used ONLY when processing runs (22:00-~04:00)
- nice -n 19: yields to ANY other process immediately
- ionice -c3: only uses disk when nothing else needs it
The taskset -c 6-11 invocation pins the process to the last six cores: even if the kernel scheduler would otherwise spread load, the QA pipeline physically cannot touch cores 0-5. And because it runs at nice -n 19, any production process that migrates onto cores 6-11 still takes immediate priority.
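To confirm the pinning actually took effect, the batch scripts can log their own CPU mask and niceness at startup. A small sketch using only the standard library (these are Linux-specific calls):

```python
import os

def report_isolation() -> dict:
    """Report which cores this process may run on and its niceness.

    Useful as a startup log line in qa_transcribe.py / qa_score.py
    to verify the taskset/nice wrapper did what you expect.
    """
    return {
        "allowed_cores": sorted(os.sched_getaffinity(0)),  # Linux-only
        "niceness": os.nice(0),  # nice(0) reads the current value
    }

# Under `taskset -c 6-11 nice -n 19 python ...` this would report
# cores 6-11 and niceness 19.
print(report_isolation())
```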
I/O Isolation
The dedicated SSD provides physical I/O isolation. The QA pipeline reads from the production SSD (recording files from monitorDONE/) but writes exclusively to the QA SSD. The read pattern is sequential (one file at a time) and intermittent (read a file, process for 20-30 seconds, read next file), so it does not create sustained read load on the production disk.
If you do not have a dedicated SSD, add explicit I/O throttling:
# cgroup v2 I/O bandwidth limits (adjust the device path to your disk)
# Add to the [Service] section of qa-pipeline.service:
IOReadBandwidthMax=/dev/nvme0n1 50M
IOWriteBandwidthMax=/dev/nvme0n1 20M
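If you would rather not edit the unit file directly, the same two directives can live in a systemd drop-in override, which survives reinstalling the base unit (a sketch; adjust the device path to your production disk):

```shell
mkdir -p /etc/systemd/system/qa-pipeline.service.d
cat > /etc/systemd/system/qa-pipeline.service.d/io-throttle.conf << 'EOF'
[Service]
IOReadBandwidthMax=/dev/nvme0n1 50M
IOWriteBandwidthMax=/dev/nvme0n1 20M
EOF
systemctl daemon-reload
```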
Load Average Kill Switch
The transcription engine checks system load average before processing each file. If the 1-minute load average exceeds the threshold (default: 4.0 on a 12-core system), it pauses for 60 seconds and rechecks. This protects against edge cases where something unexpected drives production load up during the processing window.
# Already built into qa_transcribe.py — the check_system_load() function
# Adjust the threshold in qa_config.py:
LOAD_AVG_MAX = 4.0 # Increase for more cores, decrease for fewer
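A minimal version of that load gate, assuming the same LOAD_AVG_MAX setting (the real check_system_load() in qa_transcribe.py may differ in detail):

```python
import os
import time

LOAD_AVG_MAX = 4.0  # mirror of the qa_config.py setting

def wait_for_low_load(threshold: float = LOAD_AVG_MAX, pause: int = 60) -> None:
    """Block until the 1-minute load average drops below the threshold."""
    while True:
        load_1m = os.getloadavg()[0]  # (1min, 5min, 15min) tuple
        if load_1m < threshold:
            return
        print(f"Load {load_1m:.2f} >= {threshold}, pausing {pause}s")
        time.sleep(pause)
```

Calling this before each file keeps the pipeline self-throttling: a production load spike pauses transcription rather than competing with it.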
Recommended thresholds:
| CPU Cores | LOAD_AVG_MAX | Rationale |
|---|---|---|
| 4 cores | 2.0 | Very conservative — only half the CPU budget |
| 6 cores | 3.0 | 50% headroom for production |
| 8 cores | 4.0 | Comfortable margin |
| 12 cores | 4.0-6.0 | 4.0 is very conservative for 12 cores |
| 16+ cores | 6.0-8.0 | Scale proportionally |
Memory Usage
Whisper large-v3-turbo with int8 quantization uses approximately 3GB of RAM. On a 62GB server, this is negligible. On smaller servers, verify available memory before starting:
# Check available memory
free -g
# If available RAM < 6GB, consider using 'small' model (~2GB)
# Or 'medium' model (~3GB) — still good accuracy
The model is loaded once when the transcription engine starts and remains in memory for the entire batch. It is released when the process exits.
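To automate that pre-flight check, the transcription script could read MemAvailable and downgrade the model instead of aborting. A sketch (the size thresholds are the rough figures quoted above, not exact measurements):

```python
def available_ram_gb() -> float:
    """Read MemAvailable from /proc/meminfo (Linux) and return GiB."""
    with open("/proc/meminfo") as f:
        for line in f:
            if line.startswith("MemAvailable:"):
                kb = int(line.split()[1])  # value is reported in kB
                return kb / (1024 * 1024)
    raise RuntimeError("MemAvailable not found in /proc/meminfo")

def pick_model() -> str:
    """Fall back to a smaller Whisper model when RAM is tight.

    Thresholds are assumptions based on the approximate sizes above.
    """
    gb = available_ram_gb()
    if gb >= 6:
        return "large-v3-turbo"
    if gb >= 4:
        return "medium"
    return "small"
```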
Monitoring the Pipeline
Add these checks to your monitoring routine:
# Quick status check — add to a monitoring script or cron
cat > /root/qa-pipeline/qa_status.sh << 'BASH_EOF'
#!/bin/bash
# QA Pipeline Status Check
echo "=== QA Pipeline Status ==="
# Service status
echo ""
echo "Services:"
systemctl is-active --quiet qa-api.service && echo " API: RUNNING" || echo " API: STOPPED"
systemctl is-active --quiet qa-pipeline.service && echo " Batch: RUNNING" || echo " Batch: idle"
# Next scheduled run
echo ""
echo "Next batch run:"
systemctl list-timers qa-pipeline.timer --no-pager 2>/dev/null | grep qa-pipeline || echo " Timer not active"
# Queue status
echo ""
echo "Queue:"
sqlite3 /mnt/qa/db/qa.db "SELECT status, COUNT(*) FROM qa_queue GROUP BY status;" 2>/dev/null
# Today's scores
echo ""
echo "Today's scores:"
sqlite3 /mnt/qa/db/qa.db \
"SELECT COUNT(*) as scored, ROUND(AVG(total_score),1) as avg_score, SUM(flagged) as flagged
FROM qa_scores WHERE DATE(call_date) = DATE('now');" 2>/dev/null
# Disk usage
echo ""
echo "Disk usage:"
du -sh /mnt/qa/db/ /mnt/qa/transcripts/ /mnt/qa/models/ 2>/dev/null
# Last processing log entry
echo ""
echo "Last log entry:"
tail -1 /mnt/qa/logs/qa-batch.log 2>/dev/null || echo " No logs yet"
BASH_EOF
chmod +x /root/qa-pipeline/qa_status.sh
Backup
The QA database is a single SQLite file. Back it up daily:
# Add to existing backup cron or create new one
# SQLite online backup — safe even while API is reading
(crontab -l 2>/dev/null; echo '0 6 * * * sqlite3 /mnt/qa/db/qa.db ".backup /mnt/qa/db/qa_backup_$(date +\%Y\%m\%d).db" && find /mnt/qa/db/ -name "qa_backup_*.db" -mtime +7 -delete') | sort -u | crontab -
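The same online-backup primitive is exposed by Python's sqlite3 module, in case you prefer folding the backup into the nightly batch rather than keeping a separate cron entry (a sketch; the paths mirror the layout used above):

```python
import sqlite3
from datetime import datetime

def backup_qa_db(src: str = "/mnt/qa/db/qa.db",
                 dest_dir: str = "/mnt/qa/db") -> str:
    """Consistent online backup via SQLite's backup API.

    Safe to run while the API service holds read connections: the
    backup API copies a consistent snapshot page by page.
    """
    dest = f"{dest_dir}/qa_backup_{datetime.now():%Y%m%d}.db"
    src_conn = sqlite3.connect(src)
    dest_conn = sqlite3.connect(dest)
    try:
        src_conn.backup(dest_conn)
    finally:
        src_conn.close()
        dest_conn.close()
    return dest
```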
14. Dashboard Integration
Grafana Panels
If you run Grafana (as covered in earlier tutorials), add the QA API as a JSON data source.
Setup JSON API Data Source
- In Grafana, go to Configuration > Data Sources > Add data source
- Search for JSON API (install the plugin if not present: grafana-cli plugins install marcusolsson-json-datasource)
- Configure:
  - Name: QA Pipeline
  - URL: http://YOUR_SERVER_IP:8085
Dashboard Panel Examples
Daily Average Score (Stat Panel):
Path: /api/qa/daily-summary?days=1
Field: $[0].avg_score
Title: "Today's QA Score"
Thresholds: Red < 60, Yellow < 75, Green >= 75
Score Trend (Time Series):
Path: /api/qa/trends?days=30
Fields: $.date, $.avg_score
Title: "QA Score Trend (30 days)"
Flagged Calls Count (Stat Panel):
Path: /api/qa/daily-summary?days=1
Field: $[0].flagged_calls
Title: "Flagged Calls Today"
Thresholds: Green = 0, Yellow < 5, Red >= 5
Agent Leaderboard (Table Panel):
Path: /api/qa/leaderboard?days=7
Fields: rank, agent_user, agent_name, calls, avg_score, flagged
Title: "Agent QA Leaderboard (7 days)"
Category Radar Chart (for individual agent view):
Path: /api/qa/agent/{agent_user}?days=30
Fields: category_averages.*
Title: "Agent Strengths by Category"
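The thresholds above (and the color classes in the PHP dashboard below) all encode the same banding. As a small helper, that logic looks like this; the band names are illustrative, not part of the pipeline:

```python
# Sketch: the score-to-band mapping used by the Grafana thresholds above.
# Band names are illustrative only.

def score_band(score: float) -> str:
    """Map a 0-100 QA score to the red/yellow/green bands used in the panels."""
    if score >= 75:
        return "green"
    if score >= 60:
        return "yellow"
    return "red"
```

Keeping the cutoffs (60/75) identical everywhere means a call flagged red in Grafana is also red in the PHP page and the email report.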
PHP Admin Page
If you have a PHP admin dashboard, create a simple page that consumes the QA API:
<?php
// qa-dashboard.php — QA Pipeline Overview
// Place in your admin web directory
$api_base = "http://127.0.0.1:8085/api/qa";
function qa_api($endpoint) {
    global $api_base;
    $ch = curl_init("$api_base/$endpoint");
    curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
    curl_setopt($ch, CURLOPT_TIMEOUT, 10);
    $response = curl_exec($ch);
    curl_close($ch);
    return json_decode($response, true);
}
$health = qa_api("health");
$summary = qa_api("daily-summary?days=7");
$flagged = qa_api("flagged?days=7&limit=20");
$leaderboard = qa_api("leaderboard?days=7");
$queue = qa_api("queue-status");
?>
<!DOCTYPE html>
<html>
<head>
<title>QA Dashboard</title>
<style>
body { font-family: Arial, sans-serif; margin: 20px; background: #f5f5f5; }
.card { background: white; border-radius: 8px; padding: 20px; margin: 10px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); display: inline-block; vertical-align: top; }
.stat { font-size: 36px; font-weight: bold; }
.label { color: #666; font-size: 14px; }
table { border-collapse: collapse; width: 100%; }
th, td { text-align: left; padding: 8px 12px; border-bottom: 1px solid #eee; }
th { background: #f8f8f8; font-weight: 600; }
.score-high { color: #2e7d32; }
.score-mid { color: #f57f17; }
.score-low { color: #c62828; }
.flagged { background: #fff3e0; }
</style>
</head>
<body>
<h1>QA Pipeline Dashboard</h1>
<!-- Status Cards -->
<div class="card">
<div class="label">Total Scored</div>
<div class="stat"><?= number_format($health['total_scored'] ?? 0) ?></div>
</div>
<div class="card">
<div class="label">Flagged Calls</div>
<div class="stat score-low"><?= $health['total_flagged'] ?? 0 ?></div>
</div>
<div class="card">
<div class="label">Queue</div>
<div class="stat"><?= $queue['status_counts']['pending'] ?? 0 ?> pending</div>
</div>
<div class="card">
<div class="label">DB Size</div>
<div class="stat"><?= round($health['db_size_mb'] ?? 0, 1) ?> MB</div>
</div>
<!-- 7-Day Summary -->
<h2>Last 7 Days</h2>
<table>
<tr>
<th>Date</th><th>Calls</th><th>Scored</th>
<th>Avg Score</th><th>Flagged</th><th>Top Agent</th>
</tr>
<?php foreach ($summary ?? [] as $day): ?>
<tr>
<td><?= $day['date'] ?></td>
<td><?= $day['total_calls'] ?></td>
<td><?= $day['scored_calls'] ?></td>
<td class="<?= ($day['avg_score'] ?? 0) >= 75 ? 'score-high' : (($day['avg_score'] ?? 0) >= 60 ? 'score-mid' : 'score-low') ?>">
<?= $day['avg_score'] ?? '-' ?>
</td>
<td><?= $day['flagged_calls'] ?></td>
<td><?= $day['top_agent'] ?? '-' ?> (<?= $day['top_score'] ?? '-' ?>)</td>
</tr>
<?php endforeach; ?>
</table>
<!-- Agent Leaderboard -->
<h2>Agent Leaderboard (7 days)</h2>
<table>
<tr>
<th>#</th><th>Agent</th><th>Name</th>
<th>Calls</th><th>Avg Score</th><th>Flagged</th>
</tr>
<?php foreach ($leaderboard ?? [] as $agent): ?>
<tr>
<td><?= $agent['rank'] ?></td>
<td><?= htmlspecialchars($agent['agent_user']) ?></td>
<td><?= htmlspecialchars($agent['agent_name'] ?? '') ?></td>
<td><?= $agent['calls'] ?></td>
<td class="<?= $agent['avg_score'] >= 75 ? 'score-high' : ($agent['avg_score'] >= 60 ? 'score-mid' : 'score-low') ?>">
<?= $agent['avg_score'] ?>
</td>
<td><?= $agent['flagged'] ?></td>
</tr>
<?php endforeach; ?>
</table>
<!-- Flagged Calls -->
<h2>Flagged Calls (Last 7 Days)</h2>
<table>
<tr>
<th>Date</th><th>Agent</th><th>Campaign</th>
<th>Duration</th><th>Score</th><th>Reason</th>
</tr>
<?php foreach ($flagged ?? [] as $call): ?>
<tr class="flagged">
<td><?= substr($call['call_date'] ?? '', 0, 16) ?></td>
<td><?= htmlspecialchars($call['agent_user']) ?></td>
<td><?= htmlspecialchars($call['campaign_id'] ?? '') ?></td>
<td><?= $call['call_length'] ?>s</td>
<td class="score-low"><?= round($call['total_score']) ?></td>
<td><?= htmlspecialchars($call['flag_reason'] ?? '') ?></td>
</tr>
<?php endforeach; ?>
</table>
<p style="color:#999; margin-top:20px;">
Last processed: <?= $health['last_processed'] ?? 'never' ?>
</p>
</body>
</html>
Email Reports via Resend
If you have the Resend email integration (Tutorial 32+), add QA summaries to your daily report:
# Add to your existing email report generator
# Fetches QA summary and formats it for email
import httpx
def get_qa_email_section() -> str:
    """Generate QA section for daily email report."""
    try:
        resp = httpx.get("http://127.0.0.1:8085/api/qa/daily-summary?days=1", timeout=10)
        data = resp.json()
        summary = data[0] if data else None
        if not summary or summary["scored_calls"] == 0:
            return "<p>No QA data available for today.</p>"
        flagged_resp = httpx.get(
            "http://127.0.0.1:8085/api/qa/flagged?days=1&limit=5", timeout=10
        )
        flagged = flagged_resp.json()
        html = f"""
        <h2>QA Scores</h2>
        <table style="border-collapse:collapse;width:100%;">
          <tr>
            <td style="padding:8px;"><strong>Calls Scored:</strong> {summary['scored_calls']}</td>
            <td style="padding:8px;"><strong>Average Score:</strong> {summary['avg_score']}/100</td>
            <td style="padding:8px;"><strong>Flagged:</strong> {summary['flagged_calls']}</td>
          </tr>
        </table>
        """
        if flagged:
            html += "<h3>Flagged Calls</h3><ul>"
            for call in flagged:
                html += (
                    f"<li><strong>{call['agent_user']}</strong> — "
                    f"Score: {call['total_score']:.0f} — "
                    f"{call['summary']}</li>"
                )
            html += "</ul>"
        return html
    except Exception as e:
        return f"<p>QA data unavailable: {e}</p>"
15. Troubleshooting
Common Issues
"No module named 'faster_whisper'"
The virtual environment is not activated, or faster-whisper was not installed.
# Verify the venv exists and has the package
/root/qa-pipeline/venv/bin/pip list | grep faster-whisper
# If missing, reinstall
/root/qa-pipeline/venv/bin/pip install faster-whisper
Model download hangs or fails
The server may not have internet access, or the Hugging Face CDN is blocked.
# Test connectivity
curl -I https://huggingface.co
# Download model manually on a machine with internet, then copy
# On machine with internet:
pip install faster-whisper
python -c "from faster_whisper import WhisperModel; WhisperModel('large-v3-turbo', download_root='/tmp/models')"
# Copy /tmp/models/ to the server's /mnt/qa/models/
scp -r /tmp/models/ root@YOUR_SERVER:/mnt/qa/models/
Transcription produces garbled text
Usually caused by wrong audio format or codec. ViciDial typically records in WAV (PCM) or MP3.
# Check format of a recording
file /var/spool/asterisk/monitorDONE/sample.wav
# Should show: RIFF (little-endian) data, WAVE audio, Microsoft PCM, 16 bit, mono 8000 Hz
# If format is unusual, convert with sox first
sox input.wav -r 16000 -c 1 output.wav
# Or with ffmpeg
ffmpeg -i input.wav -ar 16000 -ac 1 output.wav
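You can also probe a WAV header programmatically with the stdlib `wave` module before deciding whether conversion is needed. A sketch; it only handles PCM WAV (MP3s would need a different probe such as ffprobe), and `wave.Error` on open is itself a signal the file is not a plain RIFF/PCM recording:

```python
# Sketch: read basic WAV header info with the stdlib wave module.
# ViciDial recordings typically show channels=1, sample_rate=8000, width=2;
# anything else is a candidate for sox/ffmpeg conversion first.
import wave

def check_wav(path: str) -> dict:
    """Return basic header info for a PCM WAV file."""
    with wave.open(path, "rb") as w:
        return {
            "channels": w.getnchannels(),
            "sample_rate": w.getframerate(),
            "sample_width_bytes": w.getsampwidth(),
            "duration_s": w.getnframes() / w.getframerate(),
        }
```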
Gemini API returns 429 (rate limited)
You have exceeded the free tier quota (1,000 requests/day).
# Check how many calls were scored today via Gemini
sqlite3 /mnt/qa/db/qa.db \
"SELECT COUNT(*) FROM qa_scores
WHERE llm_provider = 'gemini' AND DATE(created_at) = DATE('now');"
# The scoring engine will automatically fall back to Groq
# If Groq also fails, items stay in 'error' status and retry next night
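The Gemini-to-Groq fallback described above boils down to a simple try-in-order loop. A sketch; the provider functions here are hypothetical stand-ins for the real calls in qa_score.py, each expected to raise on HTTP 429 or any other failure:

```python
# Sketch of the Gemini -> Groq fallback described above. The (name, fn)
# provider pairs are hypothetical stand-ins for the real calls in qa_score.py.

def score_with_fallback(transcript: str, providers: list) -> tuple[str, dict]:
    """Try each (name, fn) provider in order; return the first success."""
    errors = []
    for name, fn in providers:
        try:
            return name, fn(transcript)
        except Exception as e:  # rate limit, timeout, malformed response, ...
            errors.append(f"{name}: {e}")
    # All providers failed: the caller marks the queue item 'error'
    # so it retries on the next nightly run
    raise RuntimeError("; ".join(errors))
```

Recording which provider actually produced each score (the `llm_provider` column) is what makes the rate-limit query above possible.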
Queue shows recordings but nothing gets transcribed
Check that the nightly batch ran and what errors occurred.
# Check if the timer fired
systemctl status qa-pipeline.timer
journalctl -u qa-pipeline.service --since "22:00"
# Check for errors in the log
grep -i error /mnt/qa/logs/qa-pipeline.log | tail -20
# Check queue status
sqlite3 /mnt/qa/db/qa.db \
"SELECT status, COUNT(*), GROUP_CONCAT(DISTINCT error_message)
FROM qa_queue GROUP BY status;"
# Manually trigger processing
systemctl start qa-pipeline.service
journalctl -u qa-pipeline.service -f
Load average kill switch keeps triggering
Something else is using significant CPU during the processing window.
# Check what is using CPU at night
# Append a temporary crontab entry for one night to diagnose.
# Note: piping echo alone into crontab would REPLACE the entire crontab,
# standard cron rejects wrap-around ranges like 22-6, and % must be escaped:
(crontab -l 2>/dev/null; echo '*/10 0-6,22-23 * * * ps aux --sort=-\%cpu | head -10 >> /mnt/qa/logs/cpu-debug.log') | crontab -
# Common causes:
# - MySQL backup or OPTIMIZE TABLE running at the same time
# - Another cron job (ADMIN_archive_log_tables.pl)
# - Recording conversion jobs
# Solution: adjust cron schedules to avoid overlap, or raise LOAD_AVG_MAX
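For reference, a load-average kill switch of the kind the batch processor uses is only a few lines; `os.getloadavg()` reads the same numbers as `/proc/loadavg`, and the `LOAD_AVG_MAX` value here is an illustrative stand-in for the setting mentioned above:

```python
# Sketch of a load-average kill switch. LOAD_AVG_MAX is an illustrative
# stand-in for the config setting referenced above; tune for your core count.
import os

LOAD_AVG_MAX = 6.0

def safe_to_process() -> bool:
    """Only run heavy work when the 1-minute load average is under the cap."""
    one_min, _, _ = os.getloadavg()
    return one_min < LOAD_AVG_MAX
```

Checking this between queue items means a CPU spike pauses the pipeline instead of competing with whatever caused the spike.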
SQLite "database is locked" errors
This happens if the API service and batch processor try to write simultaneously (rare with WAL mode).
# Verify WAL mode is enabled
sqlite3 /mnt/qa/db/qa.db "PRAGMA journal_mode;"
# Should output: wal
# If not WAL, re-enable it:
sqlite3 /mnt/qa/db/qa.db "PRAGMA journal_mode=WAL;"
# Increase the timeout (already set to 30s in qa_config.py)
# If still occurring, the batch processor may be holding a transaction too long
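The connection settings that keep lock errors rare are WAL journaling plus a generous busy timeout. A standalone sketch of that pattern (the tutorial's qa_config.py is described as doing the equivalent):

```python
# Sketch: open a SQLite connection with WAL mode and a 30s busy timeout,
# the combination that makes concurrent API reads + batch writes safe.
import sqlite3

def open_qa_db(path: str) -> sqlite3.Connection:
    conn = sqlite3.connect(path, timeout=30)   # Python-side wait on a locked db
    conn.execute("PRAGMA journal_mode=WAL;")   # readers never block the writer
    conn.execute("PRAGMA busy_timeout=30000;") # same wait, enforced by SQLite
    conn.row_factory = sqlite3.Row
    return conn
```

WAL mode is persistent (stored in the database file), so one connection enabling it is enough; the busy timeout, by contrast, must be set per connection.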
Disk space running low on QA SSD
Transcript files accumulate over time (~2KB per call, ~1.4MB/day at 700 calls).
# Check disk usage
df -h /mnt/qa
du -sh /mnt/qa/transcripts/
du -sh /mnt/qa/db/
# Clean up old transcript files (keep last 90 days)
find /mnt/qa/transcripts/ -type f -name "*.txt" -mtime +90 -delete
find /mnt/qa/transcripts/ -type d -empty -delete
# Add cleanup to cron (monthly); append to the existing crontab rather than replacing it
(crontab -l 2>/dev/null; echo '0 3 1 * * find /mnt/qa/transcripts/ -type f -mtime +90 -delete && find /mnt/qa/transcripts/ -type d -empty -delete') | crontab -
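If you want to preview what retention would delete before committing to the cron job, the same 90-day policy can be expressed in Python with a dry-run mode. A sketch, assuming the transcript layout from this tutorial:

```python
# Sketch: 90-day transcript retention with a dry-run mode, mirroring the
# find commands above. Paths assume this tutorial's /mnt/qa layout.
import time
from pathlib import Path

def prune_transcripts(root: str, keep_days: int = 90, dry_run: bool = True) -> list:
    """Return (and, unless dry_run, delete) .txt files older than keep_days."""
    cutoff = time.time() - keep_days * 86400
    victims = []
    for f in Path(root).rglob("*.txt"):
        if f.stat().st_mtime < cutoff:
            victims.append(str(f))
            if not dry_run:
                f.unlink()
    if not dry_run:
        # Remove directories left empty, deepest first
        for d in sorted(Path(root).rglob("*"), reverse=True):
            if d.is_dir() and not any(d.iterdir()):
                d.rmdir()
    return victims
```

Run it once with `dry_run=True` to inspect the list, then flip the flag (or stick with the cron version once you trust the policy).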
Debugging Tips
Test a Single Call End-to-End
source /root/qa-pipeline/venv/bin/activate
cd /root/qa-pipeline
python3 << 'PYEOF'
from qa_config import get_qa_db

# Pick a specific recording
conn = get_qa_db()
item = conn.execute(
    "SELECT * FROM qa_queue WHERE status = 'pending' LIMIT 1"
).fetchone()
if not item:
    print("No pending items. Try: status = 'complete' to review an existing one")
else:
    print(f"Recording: {item['recording_id']}")
    print(f"Agent: {item['agent_user']} ({item['agent_name']})")
    print(f"File: {item['filename']}")
    print(f"Duration: {item['call_length']}s")
conn.close()
PYEOF
Inspect a Scored Call
sqlite3 /mnt/qa/db/qa.db << 'SQL'
.mode column
.headers on
SELECT
s.recording_id,
s.agent_user,
s.total_score,
s.score_greeting as greet,
s.score_customer_id as cust_id,
s.score_needs as needs,
s.score_product as product,
s.score_communication as comm,
s.score_resolution as resol,
s.score_closing as close,
s.score_compliance as compl,
s.llm_provider,
SUBSTR(s.summary, 1, 100) as summary
FROM qa_scores s
ORDER BY s.created_at DESC
LIMIT 5;
SQL
View Full AI Response for Debugging
# Get the raw LLM response for a specific call
sqlite3 /mnt/qa/db/qa.db \
"SELECT raw_response FROM qa_scores WHERE recording_id = 'YOUR_RECORDING_ID';" \
| python3 -m json.tool
Performance Benchmarking
# Time a batch of 10 transcriptions
source /root/qa-pipeline/venv/bin/activate
python3 << 'PYEOF'
import time, os
os.environ["HF_HOME"] = "/mnt/qa/models"
from faster_whisper import WhisperModel

model = WhisperModel("large-v3-turbo", device="cpu", compute_type="int8", cpu_threads=4)

recordings = []
rec_dir = "/var/spool/asterisk/monitorDONE/"
for f in sorted(os.listdir(rec_dir))[-10:]:
    if f.endswith(('.wav', '.mp3')):
        recordings.append(os.path.join(rec_dir, f))

total_audio = 0
total_proc = 0
for path in recordings:
    start = time.time()
    segments, info = model.transcribe(path, beam_size=5, language="en", vad_filter=True)
    text = " ".join(s.text for s in segments)
    elapsed = time.time() - start
    total_audio += info.duration
    total_proc += elapsed
    print(f"  {os.path.basename(path)}: {info.duration:.0f}s audio -> {elapsed:.1f}s ({info.duration/elapsed:.1f}x)")

speed = total_audio / total_proc  # real-time factor
print(f"\nTotal: {total_audio:.0f}s audio in {total_proc:.1f}s ({speed:.1f}x real-time)")
# 700 calls x 90s = 63,000s of audio; divide by the measured speed factor
print(f"Estimated time for 700 calls ({700*90/3600:.1f}h audio): {700*90/speed/3600:.1f}h")
PYEOF
Summary — Complete File Inventory
| File | Purpose |
|---|---|
| /root/qa-pipeline/qa_config.py | Shared configuration and database connection |
| /root/qa-pipeline/qa_queue.py | Queue manager — scans for new recordings |
| /root/qa-pipeline/qa_transcribe.py | Transcription engine — Faster-Whisper batch |
| /root/qa-pipeline/qa_score.py | Scoring engine — Gemini/Groq API |
| /root/qa-pipeline/qa_service.py | REST API — FastAPI on port 8085 |
| /root/qa-pipeline/qa_status.sh | Quick status check script |
| /root/qa-pipeline/schema.sql | Database schema definition |
| /root/qa-pipeline/requirements.txt | Python dependencies |
| /root/qa-pipeline/.env | API keys and database credentials |
| /mnt/qa/db/qa.db | SQLite database |
| /mnt/qa/transcripts/ | Transcript text files by date |
| /mnt/qa/models/ | Whisper model cache |
| /mnt/qa/logs/ | All pipeline logs |
| /etc/systemd/system/qa-pipeline.service | Nightly batch processor service |
| /etc/systemd/system/qa-pipeline.timer | 22:00 daily trigger |
| /etc/systemd/system/qa-api.service | REST API service |
References
Speech Recognition: Faster-Whisper GitHub | CTranslate2 | OpenAI Whisper | Whisper Model Comparison | Silero VAD
LLM APIs: Google Gemini API | Groq API | Gemini Free Tier Limits
Python / FastAPI: FastAPI Documentation | SQLite WAL Mode | Python httpx | uvicorn
Linux Resource Control: nice / ionice | taskset | systemd Resource Control | cgroups v2
ViciDial: recording_log Table | vicidial_closer_log
Tutorial verified against Python 3.11, faster-whisper 1.1+, FastAPI 0.100+, SQLite 3.40+, and ViciDial SVN trunk. Tested on production servers running 14 to 50+ concurrent agents with a Ryzen 5 3600 (6 cores / 12 threads, 62GB RAM). All resource control settings validated under sustained load with zero measurable impact on live call handling.