Building an AI-Powered VoIP Call Quality Analysis Service
FastAPI + NISQA Neural Model + Silero VAD + Claude AI
A production-grade service that replaces manual call quality reviews with automated, neural-network-powered audio analysis and AI-generated reports.
Table of Contents
- The Problem
- What We Are Building
- Architecture Overview
- Prerequisites
- Project Setup
- Core Audio Processing: SoX and FFmpeg
- NISQA Neural MOS Scoring
- Silero VAD: Voice Activity Detection
- Recording Retrieval from Remote Servers
- The Analysis Pipeline
- API Endpoint Design
- AI-Powered Analysis with Claude
- The Admin AI Assistant
- SQLite Caching Layer
- Systemd Service Deployment
- Production Tips and Optimization
- Troubleshooting
- Complete File Reference
1. The Problem
Call centers generate thousands of recordings per day. When a customer complains about audio quality, or a manager suspects one-way audio on a trunk, someone has to manually listen to recordings and guess at the problem. This process is:
- Slow. A 3-minute call takes 3 minutes to listen to, plus time to write notes.
- Subjective. One engineer says "sounds fine," another says "too much noise." There is no consistent scoring.
- Reactive. Problems are found only after complaints. Systematic quality degradation goes unnoticed for days.
- Incomplete. Nobody checks whether the agent side had audio at all. One-way audio bugs can persist for entire shifts before anyone notices.
What we need is a service that can take any call recording, score its quality objectively using a neural model, detect silence gaps and one-way audio automatically, and produce a human-readable summary of what happened -- all in under 10 seconds, accessible via a simple HTTP API.
2. What We Are Building
A FastAPI service that exposes four endpoints:
| Endpoint | Method | Purpose |
|---|---|---|
| /analyze | GET | Full technical audio analysis (SoX stats, FFmpeg silence detection, Silero VAD speech activity, NISQA MOS score) |
| /ai-analyze | GET | Everything from /analyze plus a Claude AI natural-language assessment |
| /investigate | POST | Deep investigation of a specific call using SIP traces and call metadata |
| /ask | POST | Multi-turn AI admin assistant that can query live databases and metrics |
The service uses three ML/AI layers:
NISQA (Non-Intrusive Speech Quality Assessment) -- a neural network trained on thousands of rated audio samples that predicts Mean Opinion Score (MOS) on a 1-5 scale, plus sub-dimensions: noisiness, discontinuity, coloration, and loudness.
Silero VAD (Voice Activity Detection) -- a compact neural model that identifies exactly when speech occurs in an audio file, enabling detection of one-way audio, dead air, and conversation flow patterns.
Claude AI (Anthropic) -- takes the structured analysis data and produces expert-level, human-readable assessments. Haiku handles fast per-call analysis; Sonnet or Opus powers the admin assistant that queries databases.
3. Architecture Overview
                 +-------------------+
                 |  Your Dashboard   |
                 |  (Grafana, Web)   |
                 +---------+---------+
                           |
                       HTTP API
                           |
           +---------------v--------------+
           |       FastAPI Service        |
           |         (port 8084)          |
           |                              |
           | +----------+ +-------------+ |
           | | /analyze | | /ai-analyze | |
           | +----+-----+ +------+------+ |
           |      |              |        |
           | +----v--------------v------+ |
           | |    Analysis Pipeline     | |
           | |                          | |
           | | 1. Fetch recording       | |
           | |    (HTTP/SCP)            | |
           | | 2. SoX stats             | |
           | |    (RMS, peak, dur)      | |
           | | 3. FFmpeg silence        | |
           | |    detect                | |
           | | 4. Silero VAD            | |
           | |    (speech segments)     | |
           | | 5. NISQA MOS             | |
           | |    (neural scoring)      | |
           | | 6. One-way detection     | |
           | +----+---------------------+ |
           |      |                       |
           | +----v---+   +-----------+   |
           | | SQLite |   | Claude AI |   |
           | | Cache  |   | (Haiku/   |   |
           | |        |   |  Sonnet)  |   |
           | +--------+   +-----------+   |
           +------|--------------|--------+
                  |              |
         +--------+        +-----+----------+
         |                 |                |
+--------v-------+  +------v------+  +------v------+
|   Recording    |  |  Anthropic  |  |  ViciDial   |
| Server (HTTP)  |  |  Messages   |  |  Databases  |
|  /RECORDINGS/  |  |    API      |  |   (MySQL)   |
+----------------+  +-------------+  +-------------+
Data flow for /analyze:
1. Client sends GET /analyze?server=uk&file=recording-20260301-1422.wav
2. Service checks SQLite cache -- returns immediately if cached
3. Downloads recording from the production server (tries separate in/out legs first, falls back to mixed)
4. Runs SoX for RMS amplitude, peak level, and duration
5. Runs FFmpeg silencedetect for silence gaps
6. Runs Silero VAD for speech segment timestamps and speech percentage
7. Runs NISQA neural model for MOS and quality sub-dimensions
8. Detects one-way audio by comparing caller vs. agent speech percentages
9. Caches result in SQLite, returns JSON
Data flow for /ai-analyze:
Steps 1-9 from above, then:
10. Formats all metrics into a structured prompt
11. Sends to Claude Haiku for natural-language assessment
12. Caches AI response separately, returns combined result
4. Prerequisites
System packages:
sudo apt update
sudo apt install -y python3.12 python3.12-venv python3.12-dev \
sox libsox-fmt-all ffmpeg git
- Python 3.10+ (3.12 recommended)
- SoX -- the Swiss Army knife of audio processing. Provides RMS/peak/duration stats.
- FFmpeg -- used for silence detection via the silencedetect audio filter.
- Git -- to clone the NISQA repository.
Hardware:
- CPU-only is fine. NISQA and Silero VAD both run well on CPU. A single analysis takes 3-8 seconds on a 4-core server.
- RAM: 2 GB minimum (models load ~500 MB total at startup).
- Disk: 500 MB for code + models + venv, plus temporary space for downloaded recordings.
5. Project Setup
Directory Structure
mkdir -p /opt/audio-analysis/{tmp,NISQA}
cd /opt/audio-analysis
Your final directory structure will look like this:
/opt/audio-analysis/
  service.py              # Main FastAPI application
  .api_key                # Anthropic API key (chmod 600)
  cache.db                # SQLite cache (auto-created)
  tmp/                    # Temporary recording downloads
  venv/                   # Python virtual environment
  NISQA/                  # NISQA model repo
    weights/
      nisqa.tar           # Pre-trained model weights
    nisqa/
      NISQA_model.py      # Model class
      NISQA_lib.py        # Support library
Python Virtual Environment
python3.12 -m venv /opt/audio-analysis/venv
source /opt/audio-analysis/venv/bin/activate
pip install --upgrade pip
pip install \
fastapi \
uvicorn[standard] \
torch \
torchaudio \
numpy \
pandas \
soundfile \
requests \
PyYAML \
tqdm \
anthropic
Note on PyTorch: If you are running CPU-only (no GPU), you can install the lighter CPU build:
pip install torch torchaudio --index-url https://download.pytorch.org/whl/cpu
This saves about 1.5 GB of disk space.
Clone NISQA
cd /opt/audio-analysis
git clone https://github.com/gabrielmittag/NISQA.git
The pre-trained weights are included in the repository under NISQA/weights/. The file we use is nisqa.tar (the full model with all quality dimensions).
Anthropic API Key
echo "YOUR_ANTHROPIC_API_KEY" > /opt/audio-analysis/.api_key
chmod 600 /opt/audio-analysis/.api_key
6. Core Audio Processing: SoX and FFmpeg
Before the neural models even run, we extract fundamental audio metrics using two battle-tested CLI tools.
SoX Statistics
SoX (Sound eXchange) gives us amplitude statistics in a single pass:
import re
import subprocess
import numpy as np

def sox_stats(wav_path: str) -> dict:
    """Run SoX stat on a WAV file, return RMS/peak/duration."""
    try:
        result = subprocess.run(
            ['sox', wav_path, '-n', 'stat'],
            capture_output=True, text=True, timeout=30
        )
        # SoX writes stats to stderr (not stdout)
        text = result.stderr
        stats = {}
        for line in text.splitlines():
            # SoX pads its labels with several spaces ("RMS     amplitude:"),
            # so match on flexible whitespace instead of a single space
            if re.match(r'\s*RMS\s+amplitude', line):
                val = line.split(':')[-1].strip()
                try:
                    amp = float(val)
                    stats['rms_db'] = round(20 * np.log10(amp + 1e-10), 1)
                except ValueError:
                    stats['rms_db'] = -99.0
            elif re.match(r'\s*Maximum\s+amplitude', line):
                val = line.split(':')[-1].strip()
                try:
                    amp = float(val)
                    stats['peak_db'] = round(20 * np.log10(amp + 1e-10), 1)
                except ValueError:
                    stats['peak_db'] = -99.0
            elif 'Length (seconds)' in line:
                val = line.split(':')[-1].strip()
                try:
                    stats['duration'] = round(float(val), 2)
                except ValueError:
                    pass
        return stats
    except Exception as e:
        return {'error': str(e)}
Why these metrics matter:
- RMS dB -- the average loudness. Normal telephony is around -25 dB. Below -35 dB means the caller is barely audible. Above -10 dB means possible clipping.
- Peak dB -- the loudest single sample. If peak is near 0 dB, the audio may clip.
- Duration -- sanity check. A "call" that is 0.5 seconds long is not a real call.
Gotcha: SoX writes its output to stderr, not stdout. This catches many people off guard.
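To make the dB figures concrete, here is a minimal sketch of the conversion that sox_stats applies to SoX's linear amplitudes (where 1.0 is full scale). The helper name amplitude_to_db is illustrative and not part of the service, and it uses math.log10 rather than NumPy so it runs on the standard library alone.

```python
import math

def amplitude_to_db(amplitude: float, floor: float = 1e-10) -> float:
    """Convert a linear amplitude (1.0 = full scale) to decibels."""
    # The small floor avoids log10(0) on digitally silent files
    return round(20 * math.log10(abs(amplitude) + floor), 1)

for amp in (1.0, 0.0562, 0.0178, 0.0):
    print(f"amplitude {amp:<7} -> {amplitude_to_db(amp)} dB")
```

An RMS amplitude of roughly 0.056 corresponds to the -25 dB level described above as normal telephony, while digital silence bottoms out at the floor rather than raising a math error.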
FFmpeg Silence Detection
FFmpeg's silencedetect filter finds gaps in audio below a noise threshold:
import re

def ffmpeg_silence(wav_path: str, noise_db: str = '-35dB',
                   min_dur: str = '0.5') -> list:
    """Run FFmpeg silencedetect, return list of {start, end, duration}."""
    try:
        result = subprocess.run(
            ['ffmpeg', '-i', wav_path, '-af',
             f'silencedetect=noise={noise_db}:d={min_dur}',
             '-f', 'null', '-'],
            capture_output=True, text=True, timeout=60
        )
        # FFmpeg logs filter output to stderr
        text = result.stderr
        silences = []
        starts = []
        for line in text.splitlines():
            if 'silence_start:' in line:
                # Allow a leading minus: FFmpeg can report a slightly
                # negative start when a file begins in silence
                m = re.search(r'silence_start:\s*(-?[\d.]+)', line)
                if m:
                    starts.append(float(m.group(1)))
            elif 'silence_end:' in line:
                m = re.search(
                    r'silence_end:\s*(-?[\d.]+).*duration:\s*([\d.]+)', line)
                if m:
                    end = float(m.group(1))
                    dur = float(m.group(2))
                    start = starts.pop(0) if starts else end - dur
                    silences.append({
                        'start': round(start, 2),
                        'end': round(end, 2),
                        'duration': round(dur, 2)
                    })
        # Handle trailing silence (silence_start with no matching end)
        if starts:
            silences.append({
                'start': round(starts[0], 2),
                'end': -1,  # -1 means "to end of file"
                'duration': -1
            })
        return silences
    except Exception:
        return []
Tuning the parameters:
- noise=-35dB works well for telephony. Lower values (e.g., -45dB) are stricter: only near-silent passages count, so gaps filled with line or codec noise may go undetected. Higher values (e.g., -25dB) catch more gaps but may flag quiet speech as silence.
- d=0.5 means only gaps longer than 500ms count. Normal conversational pauses are 0.3-0.8s, so 0.5s is a reasonable floor. For detecting hold music gaps, you might raise this to 2.0s.
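As a rough illustration of what these thresholds mean, the sketch below converts a dB threshold into the linear amplitude it represents and builds the filter string passed to FFmpeg. Both helper names (db_to_amplitude, silencedetect_filter) are hypothetical and exist only for this example.

```python
def db_to_amplitude(db: float) -> float:
    """Linear amplitude (1.0 = full scale) for a level given in dB."""
    return 10 ** (db / 20)

def silencedetect_filter(noise_db: float, min_dur: float) -> str:
    """Build the -af argument for FFmpeg's silencedetect filter."""
    return f"silencedetect=noise={noise_db:g}dB:d={min_dur:g}"

# Compare the default threshold with a stricter one
for db in (-35, -45):
    print(f"{db} dB -> amplitude {db_to_amplitude(db):.4f}")

print(silencedetect_filter(-35, 0.5))
```

At -35 dB anything quieter than about 1.8% of full scale counts as silence; at -45 dB the bar drops to roughly 0.6%, which is why residual line noise can keep a real gap from registering.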
Channel Splitting
VoIP recordings often have two channels: one for the caller (inbound), one for the agent (outbound). Analyzing them separately is critical for detecting one-way audio:
def split_channels(wav_path: str) -> tuple:
    """Split stereo WAV into two mono files.
    Returns (ch1_path, ch2_path) or (original, None) if mono."""
    try:
        # 'sox --i -c' prints only the channel count
        result = subprocess.run(
            ['sox', '--i', '-c', wav_path],
            capture_output=True, text=True, timeout=10
        )
        channels = int(result.stdout.strip())
        if channels < 2:
            return wav_path, None
        base = wav_path.rsplit('.', 1)[0]
        ch1 = base + '_ch1.wav'
        ch2 = base + '_ch2.wav'
        # 'remix N' keeps only channel N in the output file
        subprocess.run(
            ['sox', wav_path, ch1, 'remix', '1'],
            timeout=30, check=True, capture_output=True)
        subprocess.run(
            ['sox', wav_path, ch2, 'remix', '2'],
            timeout=30, check=True, capture_output=True)
        return ch1, ch2
    except Exception:
        return wav_path, None
Audio Format Conversion
Recordings may arrive as MP3, WAV, or other formats. We normalize everything to WAV before analysis:
def convert_to_wav(input_path: str) -> str:
    """Convert MP3 or other format to WAV using SoX."""
    if input_path.lower().endswith('.wav'):
        return input_path
    wav_path = input_path.rsplit('.', 1)[0] + '.wav'
    try:
        subprocess.run(
            ['sox', input_path, wav_path],
            timeout=60, check=True, capture_output=True
        )
        return wav_path
    except Exception:
        return input_path  # Return original if conversion fails
7. NISQA Neural MOS Scoring
What is NISQA?
NISQA (Non-Intrusive Speech Quality Assessment) is a deep learning model developed at TU Berlin. Unlike traditional methods like PESQ or POLQA that require a reference signal, NISQA is non-intrusive -- it predicts quality from the degraded signal alone. This is critical for real-world VoIP analysis where you never have the original clean signal.
NISQA predicts five dimensions:
| Dimension | Scale | What It Measures |
|---|---|---|
| MOS (Mean Opinion Score) | 1-5 | Overall perceived quality. 4+ is good, 3-4 is acceptable, below 3 is poor. |
| Noisiness | 1-5 | Background noise level. Higher is cleaner. Below 2.5 indicates real noise problems. |
| Discontinuity | 1-5 | Smoothness of audio. Higher means smoother. Below 2.5 suggests packet loss or jitter. |
| Coloration | 1-5 | Spectral distortion. Important: 8 kHz narrowband telephony naturally scores 2.0-3.0 due to limited bandwidth. This is normal, not a defect. |
| Loudness | 1-5 | Perceived volume adequacy. |
Setting Up NISQA
The model loads from a .tar checkpoint file:
import sys
import torch
# Add NISQA to Python path
sys.path.insert(0, '/opt/audio-analysis/NISQA')
from nisqa.NISQA_model import nisqaModel
Running Predictions
def nisqa_predict(wav_path: str) -> dict:
    """Run NISQA prediction on a WAV file.
    Returns MOS + quality dimensions."""
    try:
        args = {
            'mode': 'predict_file',
            'pretrained_model': '/opt/audio-analysis/NISQA/weights/nisqa.tar',
            'deg': wav_path,
            'num_workers': 0,
            'bs': 1,
            'ms_channel': None,
            'output_dir': None,
            'tr_bs_val': 1,
            'tr_num_workers': 0,
            'ms_max_segments': 50000,
        }
        model = nisqaModel(args)
        df = model.predict()
        # DataFrame columns: deg, mos_pred, noi_pred, dis_pred,
        # col_pred, loud_pred
        row = df.iloc[0]
        return {
            'mos': round(float(row.get('mos_pred', -1)), 2),
            'noisiness': round(float(row.get('noi_pred', -1)), 2),
            'discontinuity': round(float(row.get('dis_pred', -1)), 2),
            'coloration': round(float(row.get('col_pred', -1)), 2),
            'loudness': round(float(row.get('loud_pred', -1)), 2),
        }
    except Exception as e:
        # Report every dimension as -1 so callers that read per-dimension
        # scores (e.g. to average two legs) do not hit a KeyError
        return {'mos': -1, 'noisiness': -1, 'discontinuity': -1,
                'coloration': -1, 'loudness': -1, 'error': str(e)}
Key parameters:
- mode='predict_file' -- single-file prediction mode.
- deg -- path to the "degraded" audio file (the recording to score).
- num_workers=0 -- disable multiprocessing for data loading. This avoids fork issues inside a web server.
- ms_max_segments=50000 -- maximum number of 320ms segments to process. For a 30-minute call at 8 kHz, you need about 5,625 segments. 50,000 covers calls up to ~4.4 hours.
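The segment budget can be checked with a few lines of arithmetic. This sketch takes the 320 ms segment length quoted above at face value (it is an assumption carried over from the text, not read from the NISQA source), and the function names are illustrative only.

```python
SEGMENT_MS = 320  # segment length as stated in the text (assumption)

def segments_needed(duration_sec: int) -> int:
    """Number of segments needed to cover a recording of this length."""
    duration_ms = duration_sec * 1000
    return -(-duration_ms // SEGMENT_MS)  # integer ceiling division

def max_duration_hours(max_segments: int) -> float:
    """Longest recording that fits within a given segment budget."""
    return max_segments * SEGMENT_MS / 1000 / 3600

print(segments_needed(30 * 60))              # segments for a 30-minute call
print(round(max_duration_hours(50000), 1))   # hours covered by 50,000 segments
```

If your recordings never exceed an hour, a much smaller ms_max_segments would do; the generous value simply avoids failures on unusually long calls.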
Interpreting NISQA Scores for Telephony
This is where domain knowledge matters. NISQA was trained on a mix of narrowband and wideband audio. VoIP telephony using G.711 ulaw at 8 kHz is narrowband by definition, so some scores have different baselines:
MOS Score Interpretation (8 kHz telephony):
4.0 - 5.0 Excellent (rare for narrowband)
3.5 - 4.0 Good -- typical for clean narrowband calls
3.0 - 3.5 Acceptable -- minor issues
2.5 - 3.0 Poor -- noticeable degradation
1.0 - 2.5 Bad -- severe quality issues
Coloration (narrowband-specific):
2.0 - 3.0 NORMAL for 8kHz -- the limited bandwidth itself
causes coloration. Do NOT flag this as a problem.
< 1.5 Actual coloration issue (codec artifacts, echo)
Discontinuity:
> 3.5 Smooth audio, no dropouts
2.5 - 3.5 Minor discontinuities (occasional packet loss)
< 2.5 Significant packet loss or jitter
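The bands above translate directly into a small lookup. The following is a sketch under the thresholds listed in this section; the function names and band labels are illustrative, and the negative-score check assumes the -1 sentinel that nisqa_predict returns on failure.

```python
def mos_band(mos: float) -> str:
    """Map a NISQA MOS to the narrowband telephony bands listed above."""
    if mos < 0:
        return 'unknown'  # -1 is the failure sentinel
    if mos >= 4.0:
        return 'excellent'
    if mos >= 3.5:
        return 'good'
    if mos >= 3.0:
        return 'acceptable'
    if mos >= 2.5:
        return 'poor'
    return 'bad'

def discontinuity_band(score: float) -> str:
    """Classify the discontinuity dimension (higher is smoother)."""
    if score < 0:
        return 'unknown'
    if score > 3.5:
        return 'smooth'
    if score >= 2.5:
        return 'minor'
    return 'significant'

def coloration_is_problem(score: float) -> bool:
    """Only flag coloration below 1.5; 2.0-3.0 is normal at 8 kHz."""
    return 0 <= score < 1.5

print(mos_band(3.71), discontinuity_band(4.12), coloration_is_problem(2.45))
```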
When Both Legs Are Available
When you have separate caller and agent recordings, run NISQA on each independently and report both scores. The averaged MOS gives an overall quality indicator, but the per-leg scores reveal asymmetric problems:
if has_separate_legs:
    nisqa_in = nisqa_predict(inbound_path)
    nisqa_out = nisqa_predict(outbound_path)
    # Average for overall score
    combined = {
        'mos': round((nisqa_in['mos'] + nisqa_out['mos']) / 2, 2),
        'mos_in': nisqa_in['mos'],    # Caller leg quality
        'mos_out': nisqa_out['mos'],  # Agent leg quality
        # ... same for other dimensions
    }
A large gap between mos_in and mos_out (e.g., 3.8 vs 2.1) is a strong signal of a trunk or codec issue affecting only one direction.
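A check for this kind of asymmetry could look like the sketch below. The 1.0-point max_gap threshold and the function name are assumptions chosen for illustration, not values from the service; tune the threshold against your own traffic.

```python
def leg_asymmetry(mos_in: float, mos_out: float, max_gap: float = 1.0) -> dict:
    """Flag calls where one direction scores much worse than the other."""
    if mos_in < 0 or mos_out < 0:
        # A failed prediction (-1) must not be read as a quality gap
        return {'gap': None, 'asymmetric': False, 'worse_leg': None}
    gap = round(abs(mos_in - mos_out), 2)
    asymmetric = gap > max_gap
    worse = None
    if asymmetric:
        worse = 'caller' if mos_in < mos_out else 'agent'
    return {'gap': gap, 'asymmetric': asymmetric, 'worse_leg': worse}

print(leg_asymmetry(3.8, 2.1))    # large gap, agent leg is worse
print(leg_asymmetry(3.64, 3.78))  # legs are comparable
```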
8. Silero VAD: Voice Activity Detection
Why VAD?
Silence detection (FFmpeg) tells you when audio is below a threshold. VAD tells you when speech is happening. The difference matters:
- Background noise at -30 dB sits above the -35 dB threshold, so silence detection treats it as "not silent", but VAD correctly identifies it as non-speech.
- A caller breathing into the phone is not silence, but it is also not speech.
- VAD gives you a speech ratio -- what percentage of the call actually contained speech. A normal call is 40-70% speech. A call at 5% speech on one leg is one-way audio.
Loading Silero VAD
Silero VAD is loaded from PyTorch Hub at startup:
import torch
import torchaudio
import soundfile as sf
# Load model once at startup (not per-request)
vad_model, vad_utils = torch.hub.load(
    repo_or_dir='snakers4/silero-vad',
    model='silero_vad',
    trust_repo=True
)
(get_speech_timestamps, _, read_audio, _, _) = vad_utils
Audio Loading Helper
Silero VAD expects audio at a specific sample rate (8 kHz or 16 kHz). We use soundfile for reliable loading and torchaudio for resampling:
def load_audio_wav(wav_path: str, target_sr: int = 8000) -> torch.Tensor:
    """Load audio file to torch tensor at target sample rate."""
    data, sr = sf.read(wav_path, dtype='float32')
    if len(data.shape) > 1:
        data = data[:, 0]  # Take first channel if stereo
    tensor = torch.from_numpy(data)
    if sr != target_sr:
        tensor = torchaudio.functional.resample(tensor, sr, target_sr)
    return tensor
Running VAD Analysis
def silero_vad_analysis(wav_path: str) -> dict:
    """Run Silero VAD on a mono WAV.
    Returns speech ratio, segment count, and timestamps."""
    try:
        wav = load_audio_wav(wav_path, target_sr=8000)
        total_samples = len(wav)
        total_duration = total_samples / 8000.0
        # Timestamps are returned as sample indices, not seconds
        speech_timestamps = get_speech_timestamps(
            wav, vad_model,
            sampling_rate=8000,
            threshold=0.5,
            min_speech_duration_ms=250,
            min_silence_duration_ms=300
        )
        speech_samples = sum(
            ts['end'] - ts['start'] for ts in speech_timestamps
        )
        speech_ratio = speech_samples / total_samples \
            if total_samples > 0 else 0
        segments = []
        for ts in speech_timestamps:
            segments.append({
                'start': round(ts['start'] / 8000.0, 2),
                'end': round(ts['end'] / 8000.0, 2)
            })
        return {
            'speech_ratio': round(speech_ratio, 3),
            'speech_pct': round(speech_ratio * 100, 1),
            'total_duration': round(total_duration, 2),
            'speech_segments': segments,
            'segment_count': len(segments)
        }
    except Exception as e:
        return {'speech_ratio': -1, 'speech_pct': -1, 'error': str(e)}
Parameter tuning:
- threshold=0.5 -- the confidence threshold for classifying a frame as speech. Higher values (0.7) reduce false positives but may miss quiet speech. 0.5 is a good default for telephony.
- min_speech_duration_ms=250 -- ignore "speech" shorter than 250ms (likely a click or noise burst).
- min_silence_duration_ms=300 -- do not split speech segments on pauses shorter than 300ms (natural micro-pauses within words).
One-Way Audio Detection
The key insight: if one channel has < 5% speech but the other has > 20%, you have one-way audio. This is a serious VoIP problem that usually indicates a NAT, codec, or RTP routing issue:
def detect_one_way_audio(inbound_vad: dict, outbound_vad: dict) -> dict:
    """Compare speech activity between caller and agent legs."""
    in_speech = inbound_vad.get('speech_pct', -1)
    out_speech = outbound_vad.get('speech_pct', -1)
    one_way = False
    direction = None
    # speech_pct is -1 when VAD failed; a failed leg must not be
    # mistaken for a silent one, so only compare valid percentages
    if in_speech >= 0 and out_speech >= 0:
        if in_speech < 5 and out_speech > 20:
            one_way = True
            direction = 'inbound_silent'   # Caller's audio not reaching agent
        elif out_speech < 5 and in_speech > 20:
            one_way = True
            direction = 'outbound_silent'  # Agent's audio not reaching caller
    return {
        'one_way_audio': one_way,
        'one_way_direction': direction,
        'caller_speech_pct': in_speech,
        'agent_speech_pct': out_speech,
    }
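Percentages alone cannot tell a broken leg from a party who is simply listening. One way to separate the two is to check whether the other leg was speaking during a long gap. The sketch below works on the speech_segments lists produced by the VAD step; the function names and the 30% min_ratio threshold are assumptions made for this example.

```python
def speech_overlap_sec(segments: list, window_start: float,
                       window_end: float) -> float:
    """Seconds of speech in `segments` that fall inside the window."""
    total = 0.0
    for seg in segments:
        overlap = min(seg['end'], window_end) - max(seg['start'], window_start)
        if overlap > 0:
            total += overlap
    return round(total, 2)

def was_listening(other_leg_segments: list, gap_start: float,
                  gap_end: float, min_ratio: float = 0.3) -> bool:
    """True if the other party spoke for a meaningful share of the gap."""
    gap = gap_end - gap_start
    if gap <= 0:
        return False
    spoken = speech_overlap_sec(other_leg_segments, gap_start, gap_end)
    return spoken / gap >= min_ratio

caller_segments = [{'start': 168.0, 'end': 176.5},
                   {'start': 178.0, 'end': 189.0}]
# Agent is silent from 170s to 190s: was the caller talking?
print(speech_overlap_sec(caller_segments, 170.0, 190.0))
print(was_listening(caller_segments, 170.0, 190.0))
```

A gap that the other party fills with speech is ordinary turn-taking; a gap where neither leg has speech is dead air worth a closer look.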
9. Recording Retrieval from Remote Servers
VoIP platforms typically store recordings on the telephony server itself. Our analysis service runs on a separate server, so we need to fetch recordings on demand.
ViciDial Recording Layout
ViciDial stores recordings in a specific directory structure:
/var/spool/asterisk/monitorDONE/
  RECORDINGS/
    {filename}-all.wav      # Mixed (both parties)
    MP3/
      {filename}-all.mp3    # Compressed mixed version
    ORIG/
      {filename}-in.wav     # Caller leg (inbound)
      {filename}-out.wav    # Agent leg (outbound)
These directories are typically served by Apache over HTTP, making HTTP-based retrieval the simplest approach.
Multi-Strategy Fetch
The fetcher tries multiple strategies in order of preference:
import os
import re
import requests as http_requests
import logging
log = logging.getLogger('audio-analysis')
TEMP_DIR = '/opt/audio-analysis/tmp'
# Map logical server names to IPs
REC_SERVERS = {
    'uk': 'YOUR_UK_SERVER_IP',
    'romania': 'YOUR_RO_SERVER_IP',
    'france': 'YOUR_FR_SERVER_IP',
    'italy': 'YOUR_IT_SERVER_IP',
}
def _download_file(url: str, local_path: str) -> bool:
    """Download a URL to local path. Returns True on success."""
    try:
        # Cheap existence check before committing to the full download
        resp = http_requests.head(url, timeout=5)
        if resp.status_code != 200:
            return False
        with http_requests.get(url, timeout=120, stream=True) as r:
            # The GET can still fail after a successful HEAD
            if r.status_code != 200:
                return False
            with open(local_path, 'wb') as f:
                for chunk in r.iter_content(8192):
                    f.write(chunk)
        return True
    except Exception:
        return False
def fetch_recording(server: str, filename: str) -> dict:
    """Download recording from production server.

    Strategy:
      1. Try separate in/out WAV legs from ORIG/ (best quality)
      2. Fall back to mixed -all.wav
      3. Fall back to mixed -all.mp3
    """
    result = {'in_path': None, 'out_path': None,
              'mix_path': None, 'has_legs': False, 'files': []}
    host = REC_SERVERS.get(server)
    if not host:
        return result

    # Strip extension and -all/-in/-out suffix to get base name
    base = re.sub(r'-(all|in|out)\.(wav|mp3)$', '', filename, flags=re.I)
    base = re.sub(r'\.(wav|mp3)$', '', base, flags=re.I)

    # Strategy 1: Separate legs (preferred for per-channel analysis)
    in_local = os.path.join(TEMP_DIR, f"{base}-in.wav")
    out_local = os.path.join(TEMP_DIR, f"{base}-out.wav")
    in_ok = _download_file(
        f"http://{host}/RECORDINGS/ORIG/{base}-in.wav", in_local)
    out_ok = _download_file(
        f"http://{host}/RECORDINGS/ORIG/{base}-out.wav", out_local)
    if in_ok and out_ok:
        # Verify files are not empty stubs
        # (ViciDial creates 44-byte empty WAVs for failed recordings)
        if (os.path.getsize(in_local) > 100
                and os.path.getsize(out_local) > 100):
            result['in_path'] = in_local
            result['out_path'] = out_local
            result['has_legs'] = True
            result['files'] = [in_local, out_local]
            return result
    # Clean up stubs and partial downloads before falling back
    for p in [in_local, out_local]:
        if os.path.exists(p):
            os.remove(p)

    # Strategy 2: Mixed audio
    mix_candidates = [
        (f"http://{host}/RECORDINGS/{base}-all.wav",
         f"{base}-all.wav"),
        (f"http://{host}/RECORDINGS/MP3/{base}-all.mp3",
         f"{base}-all.mp3"),
        (f"http://{host}/RECORDINGS/{filename}", filename),
    ]
    seen = set()
    for url, local_name in mix_candidates:
        if url in seen:
            continue
        seen.add(url)
        local_path = os.path.join(TEMP_DIR, local_name)
        if _download_file(url, local_path):
            result['mix_path'] = local_path
            result['files'] = [local_path]
            return result
    return result
Design decisions:
- HEAD request before GET -- avoids downloading 404 error pages as if they were audio files.
- 44-byte stub detection -- ViciDial creates empty WAV files (just the WAV header) for recordings that failed to start. We check file size > 100 bytes.
- Separate legs preferred -- when available, separate in/out WAVs give far better analysis since we can compare caller and agent quality independently.
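The name handling and stub check can be tried in isolation, without a recording server. The sketch below mirrors the regexes used by the fetcher and builds a header-only WAV with Python's wave module to stand in for a failed recording. The helper names, the host rec.example.internal, and the sample filename are placeholders for illustration.

```python
import os
import re
import tempfile
import wave

def recording_base_name(filename: str) -> str:
    """Strip the -all/-in/-out suffix and extension from a recording name."""
    base = re.sub(r'-(all|in|out)\.(wav|mp3)$', '', filename, flags=re.I)
    return re.sub(r'\.(wav|mp3)$', '', base, flags=re.I)

def candidate_urls(host: str, filename: str) -> list:
    """URLs in order of preference: separate legs first, then mixed."""
    base = recording_base_name(filename)
    root = f"http://{host}/RECORDINGS"
    return [
        f"{root}/ORIG/{base}-in.wav",
        f"{root}/ORIG/{base}-out.wav",
        f"{root}/{base}-all.wav",
        f"{root}/MP3/{base}-all.mp3",
    ]

def is_stub(path: str, min_bytes: int = 100) -> bool:
    """Header-only WAVs are far smaller than any real recording."""
    return os.path.getsize(path) <= min_bytes

for url in candidate_urls('rec.example.internal', 'call_1001-all.wav'):
    print(url)

# Simulate a failed recording: a WAV with a header but no audio frames
with tempfile.TemporaryDirectory() as tmp:
    empty = os.path.join(tmp, 'empty-in.wav')
    with wave.open(empty, 'wb') as w:
        w.setnchannels(1)
        w.setsampwidth(2)
        w.setframerate(8000)
    stub_detected = is_stub(empty)
    print(os.path.getsize(empty), stub_detected)
```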
10. The Analysis Pipeline
This is where everything comes together. The pipeline orchestrates all the analysis tools and builds the final result:
import time
import json
import hashlib
def _silence_summary(silence_list, total_dur):
    """Summarize silence detection results."""
    if not silence_list or total_dur <= 0:
        return {'count': 0, 'total_sec': 0, 'ratio_pct': 0,
                'longest_ms': 0}
    # Open-ended trailing silences carry duration -1 and are skipped
    durations = [s['duration'] for s in silence_list if s['duration'] > 0]
    total_silence = sum(durations)
    return {
        'count': len(silence_list),
        'total_sec': round(total_silence, 2),
        'ratio_pct': round(total_silence / total_dur * 100, 1),
        'longest_ms': round(max(durations, default=0) * 1000)
    }

def _avg_leg_scores(a, b):
    """Average two per-leg scores; -1 if either prediction failed."""
    if a is None or b is None or a < 0 or b < 0:
        return -1
    return round((a + b) / 2, 2)

def run_full_analysis(server: str, filename: str) -> dict:
    """Complete analysis pipeline for one recording."""
    t0 = time.time()
    rec = fetch_recording(server, filename)
    if not rec['files']:
        raise FileNotFoundError("Recording not found on server")
    temp_files = list(rec['files'])
    try:
        ch_in = {}
        ch_out = {}
        has_legs = rec['has_legs']
        duration = 0
        if has_legs:
            # --- Separate leg analysis (best case) ---
            in_path = rec['in_path']
            out_path = rec['out_path']
            # Audio stats
            ch_in['stats'] = sox_stats(in_path)
            ch_out['stats'] = sox_stats(out_path)
            duration = max(
                ch_in['stats'].get('duration', 0),
                ch_out['stats'].get('duration', 0))
            # Silence detection
            ch_in['silence'] = ffmpeg_silence(in_path)
            ch_out['silence'] = ffmpeg_silence(out_path)
            # Voice activity detection
            ch_in['vad'] = silero_vad_analysis(in_path)
            ch_out['vad'] = silero_vad_analysis(out_path)
            # Neural MOS scoring (per leg); a failed leg yields -1
            # for the averaged value instead of a skewed number
            nisqa_in = nisqa_predict(in_path)
            nisqa_out = nisqa_predict(out_path)
            nisqa_result = {
                dim: _avg_leg_scores(nisqa_in.get(dim), nisqa_out.get(dim))
                for dim in ('mos', 'noisiness', 'discontinuity',
                            'coloration', 'loudness')
            }
            nisqa_result['mos_in'] = nisqa_in.get('mos', -1)
            nisqa_result['mos_out'] = nisqa_out.get('mos', -1)
        else:
            # --- Mixed audio analysis ---
            mix_path = rec['mix_path']
            wav_path = convert_to_wav(mix_path)
            if wav_path != mix_path:
                temp_files.append(wav_path)
            overall_stats = sox_stats(wav_path)
            duration = overall_stats.get('duration', 0)
            # Try splitting stereo into channels
            ch1_path, ch2_path = split_channels(wav_path)
            if ch2_path:
                temp_files.extend([ch1_path, ch2_path])
                ch_in['stats'] = sox_stats(ch1_path)
                ch_out['stats'] = sox_stats(ch2_path)
                ch_in['silence'] = ffmpeg_silence(ch1_path)
                ch_out['silence'] = ffmpeg_silence(ch2_path)
                ch_in['vad'] = silero_vad_analysis(ch1_path)
                ch_out['vad'] = silero_vad_analysis(ch2_path)
            else:
                ch_in['stats'] = overall_stats
                ch_in['silence'] = ffmpeg_silence(wav_path)
                ch_in['vad'] = silero_vad_analysis(wav_path)
            nisqa_result = nisqa_predict(wav_path)

        # --- One-way audio detection ---
        one_way = False
        one_way_direction = None
        if ch_out.get('vad'):
            in_speech = ch_in.get('vad', {}).get('speech_pct', -1)
            out_speech = ch_out.get('vad', {}).get('speech_pct', -1)
            # speech_pct is -1 when VAD failed; skip detection then
            if in_speech >= 0 and out_speech >= 0:
                if in_speech < 5 and out_speech > 20:
                    one_way = True
                    one_way_direction = 'inbound_silent'
                elif out_speech < 5 and in_speech > 20:
                    one_way = True
                    one_way_direction = 'outbound_silent'

        # --- Build result ---
        elapsed = round(time.time() - t0, 2)
        has_outbound = bool(ch_out.get('vad'))
        result = {
            'server': server,
            'file': filename,
            'duration': duration,
            'has_legs': has_legs,
            'nisqa': nisqa_result,
            'inbound': {
                'rms_db': ch_in.get('stats', {}).get('rms_db'),
                'peak_db': ch_in.get('stats', {}).get('peak_db'),
                'speech_pct': ch_in.get('vad', {}).get(
                    'speech_pct', -1),
                'silence': _silence_summary(
                    ch_in.get('silence', []), duration),
                'speech_segments': ch_in.get('vad', {}).get(
                    'speech_segments', []),
            },
            'outbound': {
                'rms_db': ch_out.get('stats', {}).get('rms_db'),
                'peak_db': ch_out.get('stats', {}).get('peak_db'),
                'speech_pct': ch_out.get('vad', {}).get(
                    'speech_pct', -1),
                'silence': _silence_summary(
                    ch_out.get('silence', []), duration),
                'speech_segments': ch_out.get('vad', {}).get(
                    'speech_segments', []),
            } if has_outbound else None,
            'one_way_audio': one_way,
            'one_way_direction': one_way_direction,
            'analysis_time_sec': elapsed,
        }
        return result
    finally:
        # Always clean up temp files
        for p in temp_files:
            if p and os.path.exists(p) and p.startswith(TEMP_DIR):
                try:
                    os.remove(p)
                except OSError:
                    pass
11. API Endpoint Design
FastAPI Application Setup
from fastapi import FastAPI, HTTPException, Query
from fastapi.responses import JSONResponse
app = FastAPI(title="Audio Quality Analysis")
@app.get("/health")
async def health():
    """Health check endpoint. Confirms models are loaded."""
    return {"status": "ok", "models": ["silero_vad", "nisqa"]}
GET /analyze -- Full Technical Analysis
@app.get("/analyze")
async def analyze(
    server: str = Query(...,
        description="Server key: uk, romania, france, italy"),
    file: str = Query(...,
        description="Recording filename")
):
    """Full audio quality analysis for a recording."""
    if server not in REC_SERVERS:
        raise HTTPException(400, f"Unknown server: {server}")
    # Allow only plain filenames: no slashes, spaces, or shell characters
    if not file or not re.match(r'^[a-zA-Z0-9_\-\.]+$', file):
        raise HTTPException(400, "Invalid filename")
    # Check cache first
    result = get_cached_or_analyze(server, file)
    return JSONResponse(result)
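The filename check is the only thing standing between a query parameter and a URL or file path, so it is worth exercising on its own. The sketch below reuses the endpoint's character whitelist; the explicit rejection of '..' and the use of fullmatch are extra precautions added for this example and are not part of the endpoint as written.

```python
import re

FILENAME_RE = re.compile(r'[a-zA-Z0-9_\-\.]+')

def is_valid_filename(name: str) -> bool:
    """Accept plain recording names; reject anything path-like."""
    if not name or '..' in name:
        return False
    # fullmatch requires the whole string to fit the whitelist
    return FILENAME_RE.fullmatch(name) is not None

for candidate in ('20260301-142215_1001_5551234567-all.wav',
                  '../etc/passwd', 'a b.wav', '..', ''):
    print(repr(candidate), is_valid_filename(candidate))
```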
Example request:
curl "http://YOUR_SERVER_IP:8084/analyze?server=uk&file=20260301-142215_1001_5551234567-all.wav"
Example response:
{
  "server": "uk",
  "file": "20260301-142215_1001_5551234567-all.wav",
  "duration": 187.52,
  "has_legs": true,
  "nisqa": {
    "mos": 3.71,
    "noisiness": 3.85,
    "discontinuity": 4.12,
    "coloration": 2.45,
    "loudness": 3.58,
    "mos_in": 3.64,
    "mos_out": 3.78
  },
  "inbound": {
    "rms_db": -24.3,
    "peak_db": -6.1,
    "speech_pct": 42.3,
    "silence": {
      "count": 8,
      "total_sec": 15.7,
      "ratio_pct": 8.4,
      "longest_ms": 4200
    },
    "speech_segments": [
      {"start": 1.2, "end": 8.5},
      {"start": 12.3, "end": 25.1},
      {"start": 28.8, "end": 45.6}
    ]
  },
  "outbound": {
    "rms_db": -22.8,
    "peak_db": -4.9,
    "speech_pct": 51.7,
    "silence": {
      "count": 6,
      "total_sec": 11.2,
      "ratio_pct": 6.0,
      "longest_ms": 3100
    },
    "speech_segments": [
      {"start": 9.1, "end": 11.8},
      {"start": 25.5, "end": 28.2},
      {"start": 46.1, "end": 62.4}
    ]
  },
  "one_way_audio": false,
  "one_way_direction": null,
  "analysis_time_sec": 6.42
}
DELETE /cache -- Cache Management
@app.delete("/cache")
async def clear_cache(server: str = "", file: str = ""):
    """Clear cache for a specific recording or all."""
    db = get_db()
    if server and file:
        db.execute(
            "DELETE FROM analysis_cache WHERE cache_key=?",
            (f"{server}:{file}",))
    else:
        # No specific recording given: wipe the whole cache
        db.execute("DELETE FROM analysis_cache")
    db.commit()
    cnt = db.total_changes
    db.close()
    return {"deleted": cnt}
12. AI-Powered Analysis with Claude
The AI Analysis Layer
The /ai-analyze endpoint takes the raw metrics from /analyze and sends them to Claude Haiku for expert interpretation. This is the difference between "MOS 3.2, noisiness 2.8" and "This call had moderate background noise that may have made it hard for the agent to hear the caller clearly."
Building the Data Prompt
The key to good AI analysis is structuring the data clearly for the model. We format all metrics into a readable text block:
def build_call_data_prompt(analysis: dict) -> str:
    """Format analysis results as structured text for Claude."""
    n = analysis.get('nisqa', {})
    inb = analysis.get('inbound', {})
    outb = analysis.get('outbound', {})
    dur = analysis.get('duration', 0)
    m, s = int(dur // 60), int(dur % 60)
    has_legs = analysis.get('has_legs', False)
    # Computed up front: a conditional expression cannot be split
    # across two adjacent f-string literals
    analysis_type = ('Separate caller/agent legs' if has_legs
                     else 'Mixed single audio')
    lines = []
    lines.append(f"Duration: {m}m {s}s")
    lines.append(f"Analysis type: {analysis_type}")
    lines.append("Codec: 8kHz ulaw (narrowband telephony)")
    lines.append("")
    lines.append("NISQA scores (1-5, higher=better):")
    lines.append(f"  Overall MOS: {n.get('mos', '?')}")
    lines.append(f"  Noisiness: {n.get('noisiness', '?')}")
    lines.append(f"  Discontinuity: {n.get('discontinuity', '?')}")
    lines.append(f"  Coloration: {n.get('coloration', '?')}")
    lines.append(f"  Loudness: {n.get('loudness', '?')}")
    if n.get('mos_in') is not None:
        lines.append(f"  Caller leg MOS: {n['mos_in']}")
        lines.append(f"  Agent leg MOS: {n['mos_out']}")
    lines.append("")
    lines.append("Caller (inbound):")
    lines.append(f"  Speech: {inb.get('speech_pct', '?')}% of call")
    lines.append(f"  Volume: RMS {inb.get('rms_db', '?')}dB, "
                 f"Peak {inb.get('peak_db', '?')}dB")
    sil = inb.get('silence', {})
    lines.append(f"  Silence gaps: {sil.get('count', 0)}, "
                 f"longest {sil.get('longest_ms', 0)}ms")
    # Include speech segment timestamps
    in_segs = inb.get('speech_segments', [])
    if in_segs:
        lines.append(f"  Speech segments ({len(in_segs)}):")
        for seg in in_segs:
            lines.append(f"    {seg['start']:.1f}s - {seg['end']:.1f}s")
    # 'outbound' is None when only a single mixed channel was analyzed
    if outb:
        lines.append("")
        lines.append("Agent (outbound):")
        lines.append(f"  Speech: {outb.get('speech_pct', '?')}% of call")
        lines.append(f"  Volume: RMS {outb.get('rms_db', '?')}dB, "
                     f"Peak {outb.get('peak_db', '?')}dB")
        out_segs = outb.get('speech_segments', [])
        if out_segs:
            lines.append(f"  Speech segments ({len(out_segs)}):")
            for seg in out_segs:
                lines.append(
                    f"    {seg['start']:.1f}s - {seg['end']:.1f}s")
    lines.append("")
    lines.append(f"One-way audio detected: "
                 f"{'Yes' if analysis.get('one_way_audio') else 'No'}")
    return "\n".join(lines)
The System Prompt: Encoding Domain Knowledge
This is the most important part of the AI layer. The system prompt encodes VoIP telephony domain expertise that prevents the model from making common mistakes:
ANALYSIS_PROMPT = """You are a senior VoIP call quality analyst at a call \
center. You analyze phone calls based on detailed metrics and speech \
activity data.
DOMAIN KNOWLEDGE you must apply:
- These are 8kHz narrowband ulaw telephony calls. NISQA coloration scores \
of 2.0-3.0 are NORMAL for narrowband -- this is NOT a defect. Only flag \
coloration below 1.5.
- When one party is silent but the other is actively speaking (check the \
speech segment timestamps), the silent party is LISTENING -- not on \
hold, not disconnected. This is normal conversation.
- Silence gaps in the 0.5-3s range are normal conversational pauses. Only \
flag gaps >5s as notable.
- NISQA noisiness 3.0-3.5 is borderline -- call it "minor" not \
"significant". Only flag below 2.5 as a real problem.
- NISQA discontinuity >3.5 means SMOOTH audio with NO dropouts. Only \
flag below 2.5 as packet loss.
- RMS around -25dB is normal volume. Below -35dB is too quiet. \
Above -10dB is too loud.
YOUR JOB: Give an honest, simple, human-readable assessment. Write like \
you are explaining to a supervisor who is not technical. No jargon. No \
speculation about causes unless the data clearly supports it.
IMPORTANT: Cross-reference the speech segment timestamps. If the agent has \
a 20s silence from 170s-190s but the caller has multiple speech segments \
in that same window, the agent was simply listening to the caller -- say \
that, do not call it a problem.
Write your assessment in this format:
- One sentence overall verdict
- 2-3 short bullet points with specific findings (reference actual numbers)
- Keep it under 80 words total
- Be honest -- if the call quality is actually fine for narrowband \
telephony, say so"""
Why the domain knowledge matters: Without these instructions, Claude will flag every single 8 kHz call as having "coloration issues" (because narrowband audio does sound colored compared to wideband). It will also flag normal listening pauses as "dead air" or "possible disconnection." The domain constraints prevent these false positives.
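The timestamp cross-referencing that the prompt asks for can also be done deterministically, as a sanity check on the model's verdict. The sketch below is illustrative only: `speech_overlap`, `classify_gap`, and the 0.3 coverage ratio are assumptions, not part of the service. It reuses the `start`/`end` segment keys that `build_call_data_prompt` reads.

```python
def speech_overlap(gap_start: float, gap_end: float, other_segments: list) -> float:
    """Seconds of the other party's speech that fall inside the gap."""
    total = 0.0
    for seg in other_segments:
        # Length of the intersection of [gap_start, gap_end] and the segment
        overlap = min(gap_end, seg["end"]) - max(gap_start, seg["start"])
        if overlap > 0:
            total += overlap
    return total


def classify_gap(gap_start: float, gap_end: float, other_segments: list,
                 min_ratio: float = 0.3) -> str:
    """Label a silence gap as 'listening' or 'dead_air'.

    min_ratio is a hypothetical threshold: the fraction of the gap that the
    other party must fill with speech for the gap to count as listening.
    """
    duration = gap_end - gap_start
    if duration <= 0:
        return "invalid"
    covered = speech_overlap(gap_start, gap_end, other_segments)
    return "listening" if covered / duration >= min_ratio else "dead_air"
```

With the example from the prompt (agent silent from 170s to 190s while the caller has several segments in that window), `classify_gap` returns "listening", so a model answer that calls the same window a dropout would deserve a second look.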
Calling the Anthropic API
ANTHROPIC_KEY_FILE = '/opt/audio-analysis/.api_key'
def call_claude_api(prompt: str, max_tokens: int = 300) -> str:
    """Call Claude Haiku via the Anthropic Messages API."""
    api_key = ''
    if os.path.exists(ANTHROPIC_KEY_FILE):
        # Context manager so the file handle is always closed
        with open(ANTHROPIC_KEY_FILE) as f:
            api_key = f.read().strip()
    if not api_key:
        raise RuntimeError("Anthropic API key not configured")
    resp = http_requests.post(
        "https://api.anthropic.com/v1/messages",
        headers={
            "x-api-key": api_key,
            "content-type": "application/json",
            "anthropic-version": "2023-06-01",
        },
        json={
            "model": "claude-haiku-4-5-20251001",
            "max_tokens": max_tokens,
            "messages": [{"role": "user", "content": prompt}],
        },
        timeout=60,
    )
    if resp.status_code != 200:
        # The error body is normally JSON, but a proxy or gateway
        # in between may return plain text or HTML
        try:
            err = resp.json().get('error', {}).get('message',
                                                   resp.text[:200])
        except ValueError:
            err = resp.text[:200]
        raise RuntimeError(f"API error ({resp.status_code}): {err}")
    return resp.json()['content'][0]['text'].strip()
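The Messages API also accepts a top-level `system` field, which lets the domain instructions travel separately from the per-call data instead of being concatenated into one user message. The following is a minimal sketch of such a request body, not the service's actual code: `build_messages_payload` is a hypothetical helper, and the model string is simply copied from the function above.

```python
def build_messages_payload(system_prompt: str, call_data: str,
                           model: str = "claude-haiku-4-5-20251001",
                           max_tokens: int = 300) -> dict:
    """Build a Messages API request body with a separate system prompt."""
    return {
        "model": model,
        "max_tokens": max_tokens,
        # Domain knowledge goes here rather than into the user turn
        "system": system_prompt,
        # Only the per-call metrics are sent as the user message
        "messages": [{"role": "user", "content": call_data}],
    }
```

The returned dict could be passed as the `json=` argument of the POST shown above. Keeping the two parts separate also makes it easier to reuse one system prompt across many calls.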
The /ai-analyze Endpoint
@app.get("/ai-analyze")
async def ai_analyze(
    server: str = Query(...),
    file: str = Query(...),
):
    """AI-powered call quality assessment."""
    if server not in REC_SERVERS:
        raise HTTPException(400, f"Unknown server: {server}")
    # Check AI-specific cache
    cache_key = f"ai:{server}:{file}"
    cached = check_cache(cache_key)
    if cached:
        return JSONResponse(cached)
    # Run standard analysis first (also cached)
    analysis_data = get_cached_or_analyze(server, file)
    t0 = time.time()
    call_data = build_call_data_prompt(analysis_data)
    prompt = ANALYSIS_PROMPT + "\n\n" + call_data
    ai_text = call_claude_api(prompt)
    elapsed = round(time.time() - t0, 2)
    result = {
        "ai_analysis": ai_text,
        "model": "claude-haiku",
        "has_legs": analysis_data.get('has_legs', False),
        "analysis_time_sec": elapsed,
    }
    save_to_cache(cache_key, result)
    return JSONResponse(result)
Example response:
{
"ai_analysis": "This call had good overall audio quality for a standard telephone call. The MOS score of 3.71 is solid for narrowband telephony.\n\n- Both caller (42.3%) and agent (51.7%) had healthy speech activity, with normal turn-taking conversation throughout.\n- Audio volume was good on both sides (RMS -24.3dB caller, -22.8dB agent) with no clipping.\n- No one-way audio, no significant silence gaps. The longest pause was 4.2 seconds, which is within normal range.",
"model": "claude-haiku",
"has_legs": true,
"analysis_time_sec": 1.83
}
POST /investigate -- Deep Call Investigation
The /investigate endpoint handles a different use case: given SIP traces and call metadata, explain what happened to a specific call. This is used when a call was dropped, had poor quality, or ended unexpectedly:
@app.post("/investigate")
async def investigate(payload: dict):
    """Analyze SIP trace + call context with Claude to explain
    what happened."""
    sip_trace = payload.get('sip_trace', '')
    call_info = payload.get('call_info', '')
    status = payload.get('status', '')
    if not sip_trace and not call_info:
        raise HTTPException(400, "No call data provided")
    # Cache by hash of input. The status is part of the prompt, so it
    # must be part of the key as well.
    key_material = "\n".join([status, sip_trace, call_info])
    cache_key = f"inv:{hashlib.md5(key_material.encode()).hexdigest()}"
    cached = check_cache(cache_key)
    if cached:
        return JSONResponse(cached)
    t0 = time.time()
    prompt = f"""You are a senior VoIP/SIP engineer at a call center. \
A call ended with status "{status}" and the supervisor wants to know \
what happened in simple, non-technical terms.
REFERENCE -- SIP CODES:
100=Trying, 180=Ringing, 183=Progress, 200=OK, 408=Timeout, \
480=Unavailable, 486=Busy, 487=Cancelled, 503=Overloaded, \
BYE=Hangup
REFERENCE -- CARRIER HANGUP CAUSES:
1=Unallocated number, 16=Normal clearing, 17=User busy, \
18=No response, 19=No answer, 21=Call rejected, 27=Dest out of order, \
28=Invalid number, 31=Normal unspecified, 34=No circuit, \
38=Network out of order
=== CALL DATA ===
{call_info}
=== SIP TRACE ===
{sip_trace if sip_trace else '(No SIP trace available)'}
Explain what happened in 2-4 short sentences for a non-technical \
supervisor. Be specific -- reference actual codes and durations."""
    ai_text = call_claude_api(prompt, max_tokens=600)
    elapsed = round(time.time() - t0, 2)
    result = {"explanation": ai_text, "analysis_time_sec": elapsed}
    save_to_cache(cache_key, result)
    return JSONResponse(result)
Example request:
curl -X POST http://YOUR_SERVER_IP:8084/investigate \
-H "Content-Type: application/json" \
-d '{
"status": "NANQUE",
"call_info": "CARRIER LOG:\n dialstatus: ANSWER\n hangup_cause: 16\n sip_hangup_cause: 200\n\nQUEUE DATA:\n queue_seconds: 45\n queue_position: 3\n\nDID ROUTING:\n did_pattern: 442012345678\n did_description: London Main",
"sip_trace": ""
}'
13. The Admin AI Assistant
The /ask endpoint is the most sophisticated part of the service. It creates a multi-turn AI assistant that can query live databases and Prometheus metrics to answer questions about call center operations.
How It Works
- The user sends a natural-language question (e.g., "How many calls did the UK team handle today?")
- Claude receives the question along with a comprehensive system prompt describing the database schema, table structures, and available data sources
- Claude responds with one or more <data_request> blocks containing SQL queries, Prometheus queries, or ViciDial report requests
- The service executes those queries and sends the results back to Claude
- Claude analyzes the data and produces a formatted answer
- This loop can repeat up to 3 times for complex questions requiring follow-up queries
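The parsing step in the loop above can be sketched in isolation. This is a minimal example that assumes the request block holds a JSON array of objects, as the endpoint code later in this section expects; `extract_data_requests` is a hypothetical helper name, not a function of the service.

```python
import json
import re

# Same pattern the /ask endpoint uses: a JSON array inside the tags
DATA_REQUEST_RE = re.compile(
    r"<data_request>\s*(\[.*?\])\s*</data_request>", re.DOTALL)


def extract_data_requests(response_text: str, max_queries: int = 10) -> list:
    """Return the query objects found in a model response, or [] if none."""
    match = DATA_REQUEST_RE.search(response_text)
    if not match:
        return []
    try:
        queries = json.loads(match.group(1))
    except json.JSONDecodeError:
        # The model produced a malformed block: treat it as "no request"
        return []
    # Keep only dict entries and enforce the per-round cap
    return [q for q in queries if isinstance(q, dict)][:max_queries]
```

Returning an empty list on malformed JSON means the caller can fall through to returning the model's text instead of failing the whole request.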
Data Source Executors
import pymysql
MYSQL_SERVERS = {
'uk': {'host': 'YOUR_UK_SERVER_IP', 'label': 'Alpha'},
'romania': {'host': 'YOUR_RO_SERVER_IP', 'label': 'Charlie'},
'france': {'host': 'YOUR_FR_SERVER_IP', 'label': 'Delta'},
'italy': {'host': 'YOUR_IT_SERVER_IP', 'label': 'Echo'},
}
MYSQL_USER = 'YOUR_DB_USER'
MYSQL_PASS = 'YOUR_DB_PASSWORD'
MYSQL_DB = 'asterisk'
from decimal import Decimal

def query_mysql(server_key: str, sql: str, limit: int = 50) -> list:
    """Run a read-only query against a ViciDial server."""
    srv = MYSQL_SERVERS.get(server_key)
    if not srv:
        return [{"error": f"Unknown server: {server_key}"}]
    # Safety: only allow SELECT/SHOW. Check before opening a connection.
    stripped = sql.strip().upper()
    if not stripped.startswith(('SELECT', 'SHOW')):
        return [{"error": "Only SELECT/SHOW queries allowed"}]
    conn = None
    try:
        conn = pymysql.connect(
            host=srv['host'], port=3306,
            user=MYSQL_USER, password=MYSQL_PASS,
            database=MYSQL_DB, connect_timeout=5,
            read_timeout=10, charset='utf8')
        with conn.cursor(pymysql.cursors.DictCursor) as cur:
            cur.execute(sql)
            rows = cur.fetchmany(limit)
        # Convert non-serializable types for JSON
        result = []
        for row in rows:
            clean = {}
            for k, v in row.items():
                if hasattr(v, 'isoformat'):
                    clean[k] = v.isoformat()
                elif isinstance(v, (bytes, bytearray)):
                    clean[k] = v.decode('utf-8', errors='replace')
                elif isinstance(v, Decimal):
                    clean[k] = float(v)
                else:
                    clean[k] = v
            result.append(clean)
        return result
    except Exception as e:
        return [{"error": f"MySQL error: {str(e)[:200]}"}]
    finally:
        # Close the connection on every path, including errors
        if conn is not None:
            conn.close()
def query_prometheus(query: str) -> dict:
    """Run a PromQL instant query."""
    try:
        resp = http_requests.get(
            "http://localhost:9090/api/v1/query",
            params={"query": query},
            timeout=10)
        data = resp.json()
        if data.get('status') != 'success':
            return {"error": data.get('error', 'query failed')}
        results = []
        for r in data.get('data', {}).get('result', [])[:30]:
            metric = {k: v for k, v in r.get('metric', {}).items()
                      if k != '__name__'}
            val = r.get('value', [None, None])
            results.append({
                "labels": metric,
                "value": val[1] if len(val) > 1 else None})
        return {"results": results}
    except Exception as e:
        return {"error": str(e)[:200]}
The /ask Endpoint with Multi-Turn Loop
@app.post("/ask")
async def admin_ask(payload: dict):
    """Multi-turn AI assistant that can query live databases."""
    question = payload.get('question', '').strip()
    conversation = payload.get('conversation', [])
    if not question and not conversation:
        raise HTTPException(400, "No question provided")
    t0 = time.time()
    # Build messages (copy so the request payload is not mutated)
    if conversation:
        messages = list(conversation)
    else:
        messages = [{"role": "user", "content": question}]
    # Call Claude (Sonnet or Opus for complex reasoning)
    response_text = call_claude_admin(messages, max_tokens=2000)
    # Data request loop: up to 3 rounds
    total_queries = 0
    for round_num in range(3):
        if '<data_request>' not in response_text:
            break
        match = re.search(
            r'<data_request>\s*(\[.*?\])\s*</data_request>',
            response_text, re.DOTALL)
        if not match:
            break
        try:
            queries = json.loads(match.group(1))
        except json.JSONDecodeError:
            # Malformed request block: stop and return the text as-is
            break
        # Execute all queries (max 10 per round)
        query_results = []
        for q in queries[:10]:
            qtype = q.get('type', '')
            if qtype == 'mysql':
                result = query_mysql(
                    q.get('server', ''), q.get('sql', ''))
                query_results.append({
                    "type": "mysql",
                    "server": q.get('server', ''),
                    "rows": result,
                    "count": len(result),
                })
            elif qtype == 'prometheus':
                result = query_prometheus(q.get('query', ''))
                query_results.append({
                    "type": "prometheus",
                    "data": result,
                })
        total_queries += len(query_results)
        # Send results back to Claude for analysis
        messages.append({
            "role": "assistant", "content": response_text})
        messages.append({
            "role": "user",
            "content": (
                "Here are the query results:\n\n```json\n"
                + json.dumps(query_results, indent=2, default=str)
                + "\n```\n\nAnalyze this data and give me a clear "
                "answer. Use markdown formatting.")
        })
        response_text = call_claude_admin(messages, max_tokens=3000)
    # Return final answer
    elapsed = round(time.time() - t0, 2)
    messages.append({"role": "assistant", "content": response_text})
    return JSONResponse({
        "answer": response_text,
        "queries_executed": total_queries,
        "elapsed_sec": elapsed,
        "conversation": messages,  # For multi-turn continuations
    })
Example request:
curl -X POST http://YOUR_SERVER_IP:8084/ask \
-H "Content-Type: application/json" \
-d '{"question": "How many inbound calls did UK handle today, and what was the average wait time?"}'
Example response:
{
"answer": "## UK Inbound Performance Today\n\n| Metric | Value |\n|---|---|\n| Total inbound calls | 247 |\n| Answered | 231 (93.5%) |\n| Dropped (NANQUE) | 8 (3.2%) |\n| Average wait time | 18.4 seconds |\n| Max wait time | 142 seconds |\n\nPerformance is healthy today...",
"queries_executed": 2,
"elapsed_sec": 4.71,
"conversation": [...]
}
Multi-Turn Conversations
The /ask endpoint returns the full conversation array in its response. To ask follow-up questions, send the conversation back:
curl -X POST http://YOUR_SERVER_IP:8084/ask \
-H "Content-Type: application/json" \
-d '{
"conversation": [
{"role": "user", "content": "How many inbound calls did UK handle today?"},
{"role": "assistant", "content": "...previous answer..."},
{"role": "user", "content": "Which agent had the most calls?"}
]
}'
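On the client side, continuing a conversation is just a matter of appending one user turn to the array returned by the previous call. A minimal sketch follows; `add_followup` is a hypothetical helper, and the check that the history ends with an assistant turn is an assumption made so that the new question always follows the previous answer.

```python
def add_followup(conversation: list, question: str) -> dict:
    """Build the /ask request body for a follow-up question."""
    if not conversation or conversation[-1].get("role") != "assistant":
        raise ValueError("conversation must end with an assistant turn")
    # Build a new list so the stored history is left untouched
    return {
        "conversation": conversation + [
            {"role": "user", "content": question}
        ]
    }
```

The returned dict can be sent as the JSON body of the POST request shown above.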
Security Considerations
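The admin assistant executes SQL written by the model, so it relies on several layered safeguards:
- Statement filtering -- only queries that start with SELECT or SHOW are executed. Any query starting with INSERT, UPDATE, DELETE, DROP, etc. is rejected. This is a prefix check, not a SQL parser, so treat it as a first line of defense rather than complete SQL injection prevention.
- Row limits -- query results are capped at 50 rows per query.
- Query count limits -- maximum 10 queries per round, 3 rounds per request.
- Timeouts -- the MySQL client uses a 5-second connect timeout and a 10-second read timeout, so a slow query cannot stall a request indefinitely.
- Read-only database user -- the MySQL user should have only SELECT privileges. This is the safeguard that actually enforces read-only access at the database level.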
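The statement filter can be made somewhat stricter than a prefix check without pulling in a SQL parser. The sketch below is an assumption about how one might harden it, not the service's implementation: `is_read_only_sql` and the `FORBIDDEN` pattern are hypothetical, and the check is deliberately conservative (it rejects any statement containing a semicolon, even inside a string literal).

```python
import re

# Clauses that start with SELECT but still write files or take locks
FORBIDDEN = re.compile(
    r"\b(INTO\s+OUTFILE|INTO\s+DUMPFILE|FOR\s+UPDATE)\b", re.IGNORECASE)


def is_read_only_sql(sql: str) -> bool:
    """Conservative check that a statement is a single SELECT or SHOW."""
    stmt = sql.strip().rstrip(";").strip()
    if not stmt:
        return False
    # A remaining semicolon suggests stacked statements
    if ";" in stmt:
        return False
    # Compare the whole first keyword, not just a prefix
    first_word = stmt.split(None, 1)[0].upper()
    if first_word not in ("SELECT", "SHOW"):
        return False
    return FORBIDDEN.search(stmt) is None
```

Even with a filter like this, the read-only MySQL user remains the control that matters. The filter only reduces the number of bad queries that reach the database.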
14. SQLite Caching Layer
Every analysis is expensive (3-8 seconds of CPU time, plus network for downloading recordings). We cache results in SQLite:
import sqlite3
CACHE_DB = '/opt/audio-analysis/cache.db'
def get_db():
    """Get a SQLite connection with row factory."""
    db = sqlite3.connect(CACHE_DB)
    db.row_factory = sqlite3.Row
    db.execute("""CREATE TABLE IF NOT EXISTS analysis_cache (
        cache_key TEXT PRIMARY KEY,
        result_json TEXT NOT NULL,
        created_at REAL NOT NULL
    )""")
    db.commit()
    return db

def get_cached_or_analyze(server: str, filename: str) -> dict:
    """Return cached result or run fresh analysis."""
    cache_key = f"{server}:{filename}"
    # Check cache
    try:
        db = get_db()
        row = db.execute(
            "SELECT result_json FROM analysis_cache WHERE cache_key=?",
            (cache_key,)).fetchone()
        if row:
            db.close()
            return json.loads(row['result_json'])
        db.close()
    except Exception:
        pass
    # Run analysis
    result = run_full_analysis(server, filename)
    # Cache result
    try:
        db = get_db()
        db.execute(
            "INSERT OR REPLACE INTO analysis_cache "
            "(cache_key, result_json, created_at) VALUES (?,?,?)",
            (cache_key, json.dumps(result), time.time()))
        db.commit()
        db.close()
    except Exception:
        pass
    return result
Cache key strategy:
- /analyze results: "uk:recording-filename.wav"
- /ai-analyze results: "ai:uk:recording-filename.wav" (separate from raw analysis)
- /investigate results: "inv:{md5hash}" (hash of the input data)
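The `check_cache` and `save_to_cache` helpers used by the AI endpoints are not shown in this section. The following is a sketch of how such helpers could look on top of the same table. Note the assumptions: these versions take an explicit connection argument (the calls in the endpoints do not), and `open_cache` is a hypothetical stand-in for `get_db` that defaults to an in-memory database so the example runs anywhere.

```python
import json
import sqlite3
import time


def open_cache(path: str = ":memory:") -> sqlite3.Connection:
    """Open the cache database and make sure the table exists."""
    db = sqlite3.connect(path)
    db.execute("""CREATE TABLE IF NOT EXISTS analysis_cache (
        cache_key TEXT PRIMARY KEY,
        result_json TEXT NOT NULL,
        created_at REAL NOT NULL
    )""")
    return db


def check_cache(db: sqlite3.Connection, cache_key: str):
    """Return the cached dict for cache_key, or None on a miss."""
    row = db.execute(
        "SELECT result_json FROM analysis_cache WHERE cache_key=?",
        (cache_key,)).fetchone()
    return json.loads(row[0]) if row else None


def save_to_cache(db: sqlite3.Connection, cache_key: str, result: dict) -> None:
    """Insert or overwrite the cached result for cache_key."""
    db.execute(
        "INSERT OR REPLACE INTO analysis_cache "
        "(cache_key, result_json, created_at) VALUES (?,?,?)",
        (cache_key, json.dumps(result), time.time()))
    db.commit()
```

Because the key prefix is part of the primary key, the raw analysis and the AI assessment for the same recording live in separate rows and can be refreshed independently.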
Why SQLite and not Redis?
- Zero configuration. No extra service to manage.
- Persistent across restarts. Analysis results are still valid after a reboot.
- Perfectly adequate for this workload (hundreds of cached results, not millions).
- Atomic writes. No corruption risk from concurrent requests at this scale.
Cache Invalidation
The /cache DELETE endpoint allows clearing specific entries or the entire cache:
# Clear a specific recording's cache
curl -X DELETE "http://YOUR_SERVER_IP:8084/cache?server=uk&file=recording.wav"
# Clear everything
curl -X DELETE "http://YOUR_SERVER_IP:8084/cache"
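The handler behind this endpoint is not shown here. As a rough sketch of the deletion logic, assuming the `analysis_cache` table and key scheme described above: `clear_cache` is a hypothetical function, and the choice to remove the "ai:" entry together with the raw one is an assumption about the intended behavior.

```python
import sqlite3


def clear_cache(db: sqlite3.Connection, server: str = None,
                file: str = None) -> None:
    """Delete one recording's cache entries, or everything."""
    if server and file:
        base_key = f"{server}:{file}"
        # Remove the raw analysis and its AI assessment together,
        # so a re-analysis never pairs with a stale AI summary
        db.execute(
            "DELETE FROM analysis_cache WHERE cache_key IN (?, ?)",
            (base_key, f"ai:{base_key}"))
    else:
        db.execute("DELETE FROM analysis_cache")
    db.commit()
```

Entries cached by /investigate use hashed keys, so they are only removed by the clear-everything branch in this sketch.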
15. Systemd Service Deployment
Service File
Create /etc/systemd/system/audio-analysis.service:
[Unit]
Description=Audio Quality Analysis Service (FastAPI)
After=network.target
[Service]
Type=simple
User=root
WorkingDirectory=/opt/audio-analysis
ExecStart=/opt/audio-analysis/venv/bin/python3 -m uvicorn \
service:app --host 0.0.0.0 --port 8084 --workers 1
Restart=always
RestartSec=5
StandardOutput=append:/opt/audio-analysis/service.log
StandardError=append:/opt/audio-analysis/service.log
[Install]
WantedBy=multi-user.target
Why --workers 1? The ML models (NISQA, Silero VAD) load into memory once at startup. With multiple workers, each worker loads its own copy, consuming 500 MB per worker. For a service that processes requests sequentially (audio analysis is CPU-bound), one worker is optimal.
Enable and Start
sudo systemctl daemon-reload
sudo systemctl enable audio-analysis
sudo systemctl start audio-analysis
# Check status
sudo systemctl status audio-analysis
# View logs
tail -f /opt/audio-analysis/service.log
Log Rotation
Add a logrotate config at /etc/logrotate.d/audio-analysis:
/opt/audio-analysis/service.log {
daily
rotate 14
compress
delaycompress
missingok
notifempty
postrotate
systemctl restart audio-analysis
endscript
}
Firewall
Only expose port 8084 to trusted networks:
# Allow from your monitoring server only
sudo ufw allow from YOUR_MONITORING_IP to any port 8084
# Or restrict via iptables
iptables -A INPUT -p tcp --dport 8084 -s YOUR_MONITORING_IP -j ACCEPT
iptables -A INPUT -p tcp --dport 8084 -j DROP
16. Production Tips and Optimization
Model Loading at Startup
Both Silero VAD and NISQA take several seconds to load. Load them once at module level, not per request:
# At module level (runs once when uvicorn imports the module)
log.info("Loading Silero VAD model...")
vad_model, vad_utils = torch.hub.load(
repo_or_dir='snakers4/silero-vad',
model='silero_vad',
trust_repo=True
)
(get_speech_timestamps, _, read_audio, _, _) = vad_utils
log.info("Silero VAD loaded.")
log.info("Loading NISQA model...")
from nisqa.NISQA_model import nisqaModel
# Pre-load the model class -- actual prediction still creates
# instances per file, but the weights are cached by PyTorch
log.info("NISQA loaded.")
Temporary File Cleanup
Always use a try/finally block to clean up downloaded recordings, even if analysis fails:
try:
    # ... run analysis ...
    return result
finally:
    for p in temp_files:
        if p and os.path.exists(p) and p.startswith(TEMP_DIR):
            try:
                os.remove(p)
            except OSError:
                pass
Also consider a periodic cleanup cron for any orphaned files:
# Clean temp files older than 1 hour
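# Append to the existing crontab; piping only the echo into
# "crontab -" would replace every existing entry
(crontab -l 2>/dev/null; \
echo "0 * * * * find /opt/audio-analysis/tmp -type f -mmin +60 -delete") \
| crontab -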
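The try/finally cleanup shown earlier in this subsection can also be packaged as a context manager, so every code path that downloads recordings gets the same guarantee. This is a sketch under assumptions: `temp_recordings` is a hypothetical helper, and it compares resolved paths rather than raw string prefixes to decide whether a file lives inside the temp directory.

```python
import contextlib
import os


@contextlib.contextmanager
def temp_recordings(temp_dir: str):
    """Yield a list; every path appended to it is deleted on exit."""
    paths = []
    try:
        yield paths
    finally:
        # Resolve symlinks and add a trailing separator so that
        # "/tmp/audio-evil" does not match "/tmp/audio"
        root = os.path.realpath(temp_dir) + os.sep
        for p in paths:
            if p and os.path.realpath(p).startswith(root):
                # Ignore files that are already gone or not removable
                with contextlib.suppress(OSError):
                    os.remove(p)
```

Inside a `with temp_recordings(TEMP_DIR) as temp_files:` block, the analysis code appends each downloaded path to `temp_files`, and cleanup runs whether the block returns normally or raises.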
Batch Processing
For analyzing many recordings at once (e.g., nightly quality audits), you can build a batch endpoint or a CLI script:
#!/usr/bin/env python3
"""Batch analyze recordings from a list file."""
import json
import statistics
import sys
import time

import requests

API_BASE = "http://localhost:8084"

def batch_analyze(server: str, filenames: list):
    results = []
    for i, fname in enumerate(filenames):
        print(f"[{i+1}/{len(filenames)}] {fname}...", end=" ")
        t0 = time.time()
        resp = requests.get(
            f"{API_BASE}/analyze",
            params={"server": server, "file": fname},
            timeout=120)
        elapsed = time.time() - t0
        if resp.status_code == 200:
            data = resp.json()
            mos = data.get('nisqa', {}).get('mos', -1)
            print(f"MOS={mos} ({elapsed:.1f}s)")
            results.append(data)
        else:
            print(f"FAILED: {resp.status_code}")
            results.append({"file": fname, "error": resp.text})
    return results

if __name__ == '__main__':
    server = sys.argv[1]
    with open(sys.argv[2]) as f:
        files = [line.strip() for line in f if line.strip()]
    results = batch_analyze(server, files)
    # Write summary
    with open('batch_results.json', 'w') as f:
        json.dump(results, f, indent=2)
    # Print quality distribution
    scores = [r['nisqa']['mos'] for r in results
              if isinstance(r.get('nisqa'), dict)
              and r['nisqa'].get('mos', -1) > 0]
    if scores:
        print(f"\nResults: {len(scores)} analyzed")
        print(f"  Mean MOS: {statistics.mean(scores):.2f}")
        print(f"  Median MOS: {statistics.median(scores):.2f}")
        print(f"  Min MOS: {min(scores):.2f}")
        print(f"  Below 3.0: {sum(1 for s in scores if s < 3.0)}")
Usage:
# Create a list of recordings to analyze
echo "20260301-142215_1001_5551234567-all.wav" > recordings.txt
echo "20260301-143022_1002_5559876543-all.wav" >> recordings.txt
# Run batch analysis
python3 batch_analyze.py uk recordings.txt
Performance Benchmarks
Typical analysis times on a 4-core CPU server (no GPU):
| Step | Time |
|---|---|
| Recording download (LAN) | 0.3 - 1.0 s |
| SoX stats | 0.1 - 0.3 s |
| FFmpeg silence detection | 0.5 - 1.5 s |
| Silero VAD | 0.5 - 2.0 s |
| NISQA prediction | 1.5 - 4.0 s |
| Claude Haiku API call | 1.0 - 3.0 s |
| Total (/analyze) | 3 - 8 s |
| Total (/ai-analyze) | 4 - 11 s |
| Cached response | < 10 ms |
Cost Estimation
Claude API costs for the AI analysis layer:
| Endpoint | Model | Input Tokens | Output Tokens | Cost per Call |
|---|---|---|---|---|
| /ai-analyze | Claude Haiku | ~800 | ~100 | ~$0.0002 |
| /investigate | Claude Haiku | ~1500 | ~300 | ~$0.0005 |
| /ask | Claude Sonnet/Opus | ~3000 | ~1000 | ~$0.01-0.05 |
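At 100 AI analyses per day and ~$0.0002 per call, the Haiku cost is roughly $0.60/month (100 x 30 x $0.0002). The per-call figures scale directly with the model's per-token prices, so recompute them against current pricing for the model you deploy. The admin assistant (/ask) is more expensive per query but used far less frequently.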
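The arithmetic behind these estimates is simple enough to keep as a small script and rerun when prices or volumes change. The function names below are hypothetical, and the per-token prices are parameters you supply; no actual price list is assumed here.

```python
def cost_per_call(input_tokens: int, output_tokens: int,
                  usd_per_mtok_in: float, usd_per_mtok_out: float) -> float:
    """Cost of one API call given prices in USD per million tokens."""
    return (input_tokens * usd_per_mtok_in
            + output_tokens * usd_per_mtok_out) / 1_000_000


def monthly_cost(calls_per_day: int, usd_per_call: float,
                 days: int = 30) -> float:
    """Monthly spend for a steady daily call volume."""
    return calls_per_day * usd_per_call * days
```

For example, `monthly_cost(100, 0.0002)` reproduces the monthly figure quoted above from the table's per-call estimate.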
17. Troubleshooting
Model Loading Failures
Problem: ModuleNotFoundError: No module named 'nisqa'
# Ensure NISQA is cloned and the path is correct
ls /opt/audio-analysis/NISQA/nisqa/NISQA_model.py
# Ensure the path insert is before the import
Problem: RuntimeError: Error(s) in loading state_dict
This usually means the wrong model weights file. Ensure you are using nisqa.tar (the full model), not nisqa_mos_only.tar or nisqa_tts.tar.
Problem: Silero VAD fails with urllib.error.URLError
On first run, Silero downloads its model from GitHub. If your server has no internet access:
# On a machine with internet
python3 -c "import torch; torch.hub.load('snakers4/silero-vad', 'silero_vad', trust_repo=True)"
# Copy the cached model
scp -r ~/.cache/torch/hub/snakers4_silero-vad_master \
YOUR_SERVER_IP:/root/.cache/torch/hub/
SoX / FFmpeg Issues
Problem: FileNotFoundError: sox: No such file or directory
sudo apt install sox libsox-fmt-all
# Verify
sox --version
ffmpeg -version
Problem: SoX cannot read MP3 files
sudo apt install libsox-fmt-mp3
API Key Issues
Problem: RuntimeError: Anthropic API key not configured
# Check the key file exists and is readable
cat /opt/audio-analysis/.api_key
# Should print your key (starts with "sk-ant-")
# Check permissions
ls -la /opt/audio-analysis/.api_key
# Should be: -rw------- root root
Recording Fetch Failures
Problem: HTTPException: 404 Recording not found on server
- Check that the recording server is accessible from the analysis server: curl -I http://YOUR_RECORDING_SERVER/RECORDINGS/
- Check the filename format. ViciDial filenames follow a specific pattern.
- Check that Apache on the recording server allows directory listing or direct file access.
Memory Issues
Problem: Service uses too much RAM or gets OOM-killed.
PyTorch models consume memory. On a constrained server:
# Check current usage
ps aux | grep uvicorn
# Set memory limit in systemd
# Add to [Service] section of the unit file:
MemoryMax=2G
Service Will Not Start
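# Check systemd's record of start/stop events
journalctl -u audio-analysis -n 50
# Application output is redirected to the log file by the unit file
tail -n 50 /opt/audio-analysis/service.log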
# Try running manually to see errors
cd /opt/audio-analysis
source venv/bin/activate
python3 -m uvicorn service:app --host 0.0.0.0 --port 8084
Common issues:
- Port 8084 already in use: lsof -i :8084
- Virtual environment broken: recreate with python3.12 -m venv venv --clear
- Missing Python package: pip install <package> in the venv
NISQA Returns MOS of -1
This means the prediction failed, usually because:
- The audio file is too short (< 1 second)
- The audio file is empty (44-byte WAV header only)
- The sample rate conversion failed
- The file is corrupt
Check the service log for the specific error message.
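Most of these causes can be caught before the file ever reaches NISQA. The sketch below uses only the standard library and is an assumption about how a pre-flight check might look, not part of the service. `preflight_wav` is a hypothetical name. One caveat: Python's `wave` module reads PCM WAV only, so a ulaw-encoded WAV is reported as unreadable by this check even though SoX and FFmpeg can decode it.

```python
import os
import wave


def preflight_wav(path: str, min_seconds: float = 1.0):
    """Return None if the file looks analyzable, else a short reason."""
    if not os.path.exists(path):
        return "file not found"
    # A canonical WAV header is 44 bytes; nothing beyond it means no audio
    if os.path.getsize(path) <= 44:
        return "empty (header only)"
    try:
        with wave.open(path, "rb") as w:
            rate = w.getframerate()
            frames = w.getnframes()
    except (wave.Error, EOFError) as e:
        # Corrupt header, or a non-PCM encoding the wave module cannot read
        return f"unreadable by the wave module: {e}"
    if rate <= 0:
        return "invalid sample rate"
    duration = frames / rate
    if duration < min_seconds:
        return f"too short ({duration:.2f}s)"
    return None
```

Logging the returned reason next to the MOS of -1 makes it clear whether the problem is the recording itself or the prediction step.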
18. Complete File Reference
Project Structure
/opt/audio-analysis/
service.py # Main FastAPI application (~1400 lines)
.api_key # Anthropic API key
cache.db # SQLite cache (auto-created)
service.log # Application log
tmp/ # Temporary recording downloads
venv/ # Python virtual environment
NISQA/ # NISQA repository (git clone)
weights/
nisqa.tar # Pre-trained model weights (~80MB)
nisqa/
NISQA_model.py # Model loading and prediction
NISQA_lib.py # Neural network architecture
/etc/systemd/system/
audio-analysis.service # Systemd unit file
Python Dependencies
fastapi
uvicorn[standard]
torch
torchaudio
numpy
pandas
soundfile
requests
PyYAML
tqdm
anthropic
pymysql # Only needed for /ask endpoint
System Dependencies
python3.12
python3.12-venv
python3.12-dev
sox
libsox-fmt-all
libsox-fmt-mp3
ffmpeg
git
Summary
This service combines three layers of audio intelligence:
Signal-level analysis (SoX + FFmpeg) -- amplitude, silence gaps, duration. Fast and deterministic: the same file always produces the same numbers.
Neural perception modeling (NISQA + Silero VAD) -- MOS scores, speech activity, one-way audio detection. Objective, consistent, replaces subjective human judgment.
AI reasoning (Claude) -- translates metrics into actionable insights, investigates call failures using SIP traces, answers ad-hoc questions by querying live databases.
The SQLite cache ensures that repeated queries for the same recording are instant. The systemd service file ensures the service restarts automatically on failure. The FastAPI framework provides automatic OpenAPI documentation at /docs.
The result is a service that turns "can you listen to this call and tell me if the audio was okay?" -- a 5-minute manual task -- into a 6-second API call that returns objective scores and a plain-English assessment.
Built with FastAPI, PyTorch, NISQA, Silero VAD, SoX, FFmpeg, and Claude AI.