Building an AI-Powered VoIP Call Quality Analysis Service

AI & Voice Agents Advanced 40 min read #02

FastAPI + NISQA Neural Model + Silero VAD + Claude AI

A production-grade service that replaces manual call quality reviews with automated, neural-network-powered audio analysis and AI-generated reports.


Table of Contents

  1. The Problem
  2. What We Are Building
  3. Architecture Overview
  4. Prerequisites
  5. Project Setup
  6. Core Audio Processing: SoX and FFmpeg
  7. NISQA Neural MOS Scoring
  8. Silero VAD: Voice Activity Detection
  9. Recording Retrieval from Remote Servers
  10. The Analysis Pipeline
  11. API Endpoint Design
  12. AI-Powered Analysis with Claude
  13. The Admin AI Assistant
  14. SQLite Caching Layer
  15. Systemd Service Deployment
  16. Production Tips and Optimization
  17. Troubleshooting
  18. Complete File Reference

1. The Problem

Call centers generate thousands of recordings per day. When a customer complains about audio quality, or a manager suspects one-way audio on a trunk, someone has to manually listen to recordings and guess at the problem. This process is:

What we need is a service that can take any call recording, score its quality objectively using a neural model, detect silence gaps and one-way audio automatically, and produce a human-readable summary of what happened -- all in under 10 seconds, accessible via a simple HTTP API.


2. What We Are Building

A FastAPI service that exposes four endpoints:

Endpoint     | Method | Purpose
/analyze     | GET    | Full technical audio analysis (SoX stats, FFmpeg silence detection, Silero VAD speech activity, NISQA MOS score)
/ai-analyze  | GET    | Everything from /analyze plus a Claude AI natural-language assessment
/investigate | POST   | Deep investigation of a specific call using SIP traces and call metadata
/ask         | POST   | Multi-turn AI admin assistant that can query live databases and metrics

The service uses three ML/AI layers:

  1. NISQA (Non-Intrusive Speech Quality Assessment) -- a neural network trained on thousands of rated audio samples that predicts Mean Opinion Score (MOS) on a 1-5 scale, plus sub-dimensions: noisiness, discontinuity, coloration, and loudness.

  2. Silero VAD (Voice Activity Detection) -- a compact neural model that identifies exactly when speech occurs in an audio file, enabling detection of one-way audio, dead air, and conversation flow patterns.

  3. Claude AI (Anthropic) -- takes the structured analysis data and produces expert-level, human-readable assessments. Haiku for fast per-call analysis, Sonnet/Opus for the admin assistant that queries databases.


3. Architecture Overview

                         +-------------------+
                         |   Your Dashboard  |
                         |  (Grafana, Web)   |
                         +--------+----------+
                                  |
                             HTTP API
                                  |
                    +-------------v--------------+
                    |     FastAPI Service         |
                    |     (port 8084)             |
                    |                             |
                    |  +-------+  +-----------+   |
                    |  | /analyze| | /ai-analyze|  |
                    |  +---+---+  +-----+-----+   |
                    |      |            |          |
                    |  +---v------------v------+   |
                    |  |   Analysis Pipeline    |  |
                    |  |                        |  |
                    |  | 1. Fetch recording     |  |
                    |  |    (HTTP/SCP)          |  |
                    |  | 2. SoX stats           |  |
                    |  |    (RMS, peak, dur)    |  |
                    |  | 3. FFmpeg silence      |  |
                    |  |    detect              |  |
                    |  | 4. Silero VAD          |  |
                    |  |    (speech segments)   |  |
                    |  | 5. NISQA MOS           |  |
                    |  |    (neural scoring)    |  |
                    |  | 6. One-way detection   |  |
                    |  +---+--------------------+  |
                    |      |                       |
                    |  +---v---+   +-----------+   |
                    |  |SQLite |   | Claude AI |   |
                    |  |Cache  |   | (Haiku/   |   |
                    |  |       |   |  Sonnet)  |   |
                    |  +-------+   +-----------+   |
                    +-------|----------|-----------+
                            |          |
              +-------------+     +----+----------+
              |                   |               |
     +--------v-------+  +-------v-----+  +------v------+
     | Recording       |  | Anthropic   |  | ViciDial    |
     | Server (HTTP)   |  | Messages    |  | Databases   |
     | /RECORDINGS/    |  | API         |  | (MySQL)     |
     +----------------+  +-------------+  +-------------+

Data flow for /analyze:

  1. Client sends GET /analyze?server=uk&file=recording-20260301-1422.wav
  2. Service checks SQLite cache -- returns immediately if cached
  3. Downloads recording from the production server (tries separate in/out legs first, falls back to mixed)
  4. Runs SoX for RMS amplitude, peak level, and duration
  5. Runs FFmpeg silencedetect for silence gaps
  6. Runs Silero VAD for speech segment timestamps and speech percentage
  7. Runs NISQA neural model for MOS and quality sub-dimensions
  8. Detects one-way audio by comparing caller vs. agent speech percentages
  9. Caches result in SQLite, returns JSON

Data flow for /ai-analyze: steps 1-9 from above, then:

  10. Formats all metrics into a structured prompt
  11. Sends to Claude Haiku for natural-language assessment
  12. Caches AI response separately, returns combined result


4. Prerequisites

System packages:

sudo apt update
sudo apt install -y python3.12 python3.12-venv python3.12-dev \
    sox libsox-fmt-all ffmpeg git
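
Once the packages are installed, it can help to confirm that the CLI tools are actually on the PATH before continuing. The following is a minimal sketch using only the standard library; `missing_tools` and `REQUIRED_TOOLS` are hypothetical names for illustration and are not part of the service:

```python
import shutil
from typing import Callable, Iterable, List, Optional

# Tools installed by the apt command above (assumed list for this check)
REQUIRED_TOOLS = ("sox", "ffmpeg", "git")


def missing_tools(
    tools: Iterable[str] = REQUIRED_TOOLS,
    which: Callable[[str], Optional[str]] = shutil.which,
) -> List[str]:
    """Return the tools that cannot be found on PATH."""
    return [t for t in tools if which(t) is None]


# Usage: an empty list means everything was found
print("missing:", missing_tools() or "none")
```

The `which` parameter exists only so the lookup can be swapped out in tests; in normal use the default `shutil.which` is enough.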

Hardware:


5. Project Setup

Directory Structure

mkdir -p /opt/audio-analysis/{tmp,NISQA}
cd /opt/audio-analysis

Your final directory structure will look like this:

/opt/audio-analysis/
    service.py              # Main FastAPI application
    .api_key                # Anthropic API key (chmod 600)
    cache.db                # SQLite cache (auto-created)
    tmp/                    # Temporary recording downloads
    venv/                   # Python virtual environment
    NISQA/                  # NISQA model repo
        weights/
            nisqa.tar       # Pre-trained model weights
        nisqa/
            NISQA_model.py  # Model class
            NISQA_lib.py    # Support library

Python Virtual Environment

python3.12 -m venv /opt/audio-analysis/venv
source /opt/audio-analysis/venv/bin/activate

pip install --upgrade pip
pip install \
    fastapi \
    "uvicorn[standard]" \
    torch \
    torchaudio \
    numpy \
    pandas \
    soundfile \
    requests \
    PyYAML \
    tqdm \
    anthropic

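Note on PyTorch: If you are running CPU-only (no GPU), you can install the lighter CPU build. Run this before the main pip install above; otherwise pip has already installed the default build and will treat the requirement as satisfied: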

pip install torch torchaudio --index-url https://download.pytorch.org/whl/cpu

This saves about 1.5 GB of disk space.

Clone NISQA

cd /opt/audio-analysis
git clone https://github.com/gabrielmittag/NISQA.git

The pre-trained weights are included in the repository under NISQA/weights/. The file we use is nisqa.tar (the full model with all quality dimensions).

Anthropic API Key

echo "YOUR_ANTHROPIC_API_KEY" > /opt/audio-analysis/.api_key
chmod 600 /opt/audio-analysis/.api_key
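
On the Python side, the service needs to read that key file at startup. A minimal sketch of one way to do it, assuming the key is stored as a single line of text; `read_api_key` is a hypothetical helper name, and the permission check is an extra safeguard added here, not something the original service is known to do:

```python
import os
import stat


def read_api_key(path: str) -> str:
    """Read the key file, refusing files accessible by group or others."""
    mode = stat.S_IMODE(os.stat(path).st_mode)
    if mode & (stat.S_IRWXG | stat.S_IRWXO):
        raise PermissionError(
            f"{path} has mode {oct(mode)}; expected 0o600")
    with open(path, "r", encoding="utf-8") as f:
        key = f.read().strip()  # drop the trailing newline left by echo
    if not key:
        raise ValueError(f"{path} is empty")
    return key
```

Failing loudly on a world-readable key file catches the case where the chmod step was skipped.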

6. Core Audio Processing: SoX and FFmpeg

Before the neural models even run, we extract fundamental audio metrics using two battle-tested CLI tools.

SoX Statistics

SoX (Sound eXchange) gives us amplitude statistics in a single pass:

import subprocess
import numpy as np

def sox_stats(wav_path: str) -> dict:
    """Run SoX stat on a WAV file, return RMS/peak/duration."""
    try:
        result = subprocess.run(
            ['sox', wav_path, '-n', 'stat'],
            capture_output=True, text=True, timeout=30
        )
        # SoX writes stats to stderr (not stdout)
        text = result.stderr
        stats = {}
        for line in text.splitlines():
            if 'RMS     amplitude' in line:
                val = line.split(':')[-1].strip()
                try:
                    amp = float(val)
                    stats['rms_db'] = round(20 * np.log10(amp + 1e-10), 1)
                except ValueError:
                    stats['rms_db'] = -99.0
            elif 'Maximum amplitude' in line:
                val = line.split(':')[-1].strip()
                try:
                    amp = float(val)
                    stats['peak_db'] = round(20 * np.log10(amp + 1e-10), 1)
                except ValueError:
                    stats['peak_db'] = -99.0
            elif 'Length (seconds)' in line:
                val = line.split(':')[-1].strip()
                try:
                    stats['duration'] = round(float(val), 2)
                except ValueError:
                    pass
        return stats
    except Exception as e:
        return {'error': str(e)}

Why these metrics matter:

Gotcha: SoX writes its output to stderr, not stdout. This catches many people off guard.
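
The dB values in sox_stats come from the standard amplitude-to-decibel conversion, 20 * log10(amplitude), where SoX reports amplitude on a 0.0-1.0 scale. As a small worked example (the function name `amplitude_to_dbfs` is introduced here for illustration; the 1e-10 offset mirrors the code above and avoids log10(0) on silent files):

```python
import math


def amplitude_to_dbfs(amp: float) -> float:
    """Convert a linear amplitude (0.0-1.0) to dB relative to full scale."""
    return round(20 * math.log10(amp + 1e-10), 1)


print(amplitude_to_dbfs(1.0))   # full scale
print(amplitude_to_dbfs(0.5))   # half amplitude, about 6 dB down
print(amplitude_to_dbfs(0.05))  # a quiet RMS level
print(amplitude_to_dbfs(0.0))   # digital silence hits the floor value
```

Halving the amplitude costs about 6 dB, so a leg whose RMS is 12 dB lower than the other is roughly a quarter as loud in amplitude terms.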

FFmpeg Silence Detection

FFmpeg's silencedetect filter finds gaps in audio below a noise threshold:

import re

def ffmpeg_silence(wav_path: str, noise_db: str = '-35dB',
                   min_dur: str = '0.5') -> list:
    """Run FFmpeg silencedetect, return list of {start, end, duration}."""
    try:
        result = subprocess.run(
            ['ffmpeg', '-i', wav_path, '-af',
             f'silencedetect=noise={noise_db}:d={min_dur}',
             '-f', 'null', '-'],
            capture_output=True, text=True, timeout=60
        )
        text = result.stderr
        silences = []
        starts = []
        for line in text.splitlines():
            if 'silence_start:' in line:
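                # FFmpeg can report a slightly negative start at the
                # very beginning of a file, so allow a leading minus.
                m = re.search(r'silence_start:\s*(-?[\d.]+)', line)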
                if m:
                    starts.append(float(m.group(1)))
            elif 'silence_end:' in line:
                m = re.search(
                    r'silence_end:\s*([\d.]+).*duration:\s*([\d.]+)', line)
                if m:
                    end = float(m.group(1))
                    dur = float(m.group(2))
                    start = starts.pop(0) if starts else end - dur
                    silences.append({
                        'start': round(start, 2),
                        'end': round(end, 2),
                        'duration': round(dur, 2)
                    })
        # Handle trailing silence (silence_start with no matching end)
        if starts:
            silences.append({
                'start': round(starts[0], 2),
                'end': -1,       # -1 means "to end of file"
                'duration': -1
            })
        return silences
    except Exception as e:
        return []
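
The function above marks a trailing silence with 'end': -1 and 'duration': -1. If a consumer needs real numbers, the sentinel can be resolved once the file duration is known (for example from sox_stats). A minimal sketch, with `close_trailing_silence` as a hypothetical helper name:

```python
from typing import Dict, List


def close_trailing_silence(silences: List[Dict[str, float]],
                           total_duration: float) -> List[Dict[str, float]]:
    """Replace the -1 sentinel on an open-ended silence with real values."""
    resolved = []
    for s in silences:
        if s["end"] == -1 and total_duration > s["start"]:
            # The silence runs to the end of the file
            s = {
                "start": s["start"],
                "end": round(total_duration, 2),
                "duration": round(total_duration - s["start"], 2),
            }
        resolved.append(s)
    return resolved
```

The input list is not modified; entries that already have an end time pass through unchanged.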

Tuning the parameters:

Channel Splitting

VoIP recordings often have two channels: one for the caller (inbound), one for the agent (outbound). Analyzing them separately is critical for detecting one-way audio:

def split_channels(wav_path: str) -> tuple:
    """Split stereo WAV into two mono files.
    Returns (ch1_path, ch2_path) or (original, None) if mono."""
    try:
        result = subprocess.run(
            ['sox', '--i', '-c', wav_path],
            capture_output=True, text=True, timeout=10
        )
        channels = int(result.stdout.strip())
        if channels < 2:
            return wav_path, None

        base = wav_path.rsplit('.', 1)[0]
        ch1 = base + '_ch1.wav'
        ch2 = base + '_ch2.wav'
        subprocess.run(
            ['sox', wav_path, ch1, 'remix', '1'],
            timeout=30, check=True, capture_output=True)
        subprocess.run(
            ['sox', wav_path, ch2, 'remix', '2'],
            timeout=30, check=True, capture_output=True)
        return ch1, ch2
    except Exception as e:
        return wav_path, None

Audio Format Conversion

Recordings may arrive as MP3, WAV, or other formats. We normalize everything to WAV before analysis:

def convert_to_wav(input_path: str) -> str:
    """Convert MP3 or other format to WAV using SoX."""
    if input_path.lower().endswith('.wav'):
        return input_path
    wav_path = input_path.rsplit('.', 1)[0] + '.wav'
    try:
        subprocess.run(
            ['sox', input_path, wav_path],
            timeout=60, check=True, capture_output=True
        )
        return wav_path
    except Exception:
        return input_path  # Return original if conversion fails

7. NISQA Neural MOS Scoring

What is NISQA?

NISQA (Non-Intrusive Speech Quality Assessment) is a deep learning model developed at TU Berlin. Unlike traditional methods like PESQ or POLQA that require a reference signal, NISQA is non-intrusive -- it predicts quality from the degraded signal alone. This is critical for real-world VoIP analysis where you never have the original clean signal.

NISQA predicts five dimensions:

Dimension                | Scale | What It Measures
MOS (Mean Opinion Score) | 1-5   | Overall perceived quality. 4+ is good, 3-4 is acceptable, below 3 is poor.
Noisiness                | 1-5   | Background noise level. Higher is cleaner. Below 2.5 indicates real noise problems.
Discontinuity            | 1-5   | Smoothness of audio. Higher means smoother. Below 2.5 suggests packet loss or jitter.
Coloration               | 1-5   | Spectral distortion. Important: 8 kHz narrowband telephony naturally scores 2.0-3.0 due to limited bandwidth. This is normal, not a defect.
Loudness                 | 1-5   | Perceived volume adequacy.

Setting Up NISQA

The model loads from a .tar checkpoint file:

import sys
import torch

# Add NISQA to Python path
sys.path.insert(0, '/opt/audio-analysis/NISQA')
from nisqa.NISQA_model import nisqaModel

Running Predictions

def nisqa_predict(wav_path: str) -> dict:
    """Run NISQA prediction on a WAV file.
    Returns MOS + quality dimensions."""
    try:
        args = {
            'mode': 'predict_file',
            'pretrained_model': '/opt/audio-analysis/NISQA/weights/nisqa.tar',
            'deg': wav_path,
            'num_workers': 0,
            'bs': 1,
            'ms_channel': None,
            'output_dir': None,
            'tr_bs_val': 1,
            'tr_num_workers': 0,
            'ms_max_segments': 50000,
        }
        model = nisqaModel(args)
        df = model.predict()

        # DataFrame columns: deg, mos_pred, noi_pred, dis_pred,
        #                     col_pred, loud_pred
        row = df.iloc[0]
        return {
            'mos': round(float(row.get('mos_pred', -1)), 2),
            'noisiness': round(float(row.get('noi_pred', -1)), 2),
            'discontinuity': round(float(row.get('dis_pred', -1)), 2),
            'coloration': round(float(row.get('col_pred', -1)), 2),
            'loudness': round(float(row.get('loud_pred', -1)), 2),
        }
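    except Exception as e:
        # Return every dimension as -1 so callers that index
        # 'noisiness', 'loudness', etc. do not hit a KeyError.
        return {'mos': -1, 'noisiness': -1, 'discontinuity': -1,
                'coloration': -1, 'loudness': -1, 'error': str(e)}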

Key parameters:

Interpreting NISQA Scores for Telephony

This is where domain knowledge matters. NISQA was trained on a mix of narrowband and wideband audio. VoIP telephony using G.711 μ-law at 8 kHz is narrowband by definition, so some scores have different baselines:

MOS Score Interpretation (8 kHz telephony):
  4.0 - 5.0  Excellent (rare for narrowband)
  3.5 - 4.0  Good -- typical for clean narrowband calls
  3.0 - 3.5  Acceptable -- minor issues
  2.5 - 3.0  Poor -- noticeable degradation
  1.0 - 2.5  Bad -- severe quality issues

Coloration (narrowband-specific):
  2.0 - 3.0  NORMAL for 8kHz -- the limited bandwidth itself
              causes coloration. Do NOT flag this as a problem.
  < 1.5      Actual coloration issue (codec artifacts, echo)

Discontinuity:
  > 3.5      Smooth audio, no dropouts
  2.5 - 3.5  Minor discontinuities (occasional packet loss)
  < 2.5      Significant packet loss or jitter
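
The bands above translate directly into a small lookup. This is a sketch only: the function names are hypothetical, and because the listed ranges share their endpoints, assigning each boundary value to the higher band is an assumption made here.

```python
def classify_mos(mos: float) -> str:
    """Map a NISQA MOS to the 8 kHz telephony bands listed above."""
    if not 1.0 <= mos <= 5.0:
        return "Unknown"  # e.g. the -1 returned when prediction fails
    if mos >= 4.0:
        return "Excellent"
    if mos >= 3.5:
        return "Good"
    if mos >= 3.0:
        return "Acceptable"
    if mos >= 2.5:
        return "Poor"
    return "Bad"


def classify_discontinuity(dis: float) -> str:
    """Map a NISQA discontinuity score to the bands listed above."""
    if not 1.0 <= dis <= 5.0:
        return "Unknown"
    if dis > 3.5:
        return "Smooth"
    if dis >= 2.5:
        return "Minor discontinuities"
    return "Significant packet loss or jitter"


print(classify_mos(3.71), "/", classify_discontinuity(4.12))
```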

When Both Legs Are Available

When you have separate caller and agent recordings, run NISQA on each independently and report both scores. The averaged MOS gives an overall quality indicator, but the per-leg scores reveal asymmetric problems:

if has_separate_legs:
    nisqa_in = nisqa_predict(inbound_path)
    nisqa_out = nisqa_predict(outbound_path)

    # Average for overall score
    combined = {
        'mos': round((nisqa_in['mos'] + nisqa_out['mos']) / 2, 2),
        'mos_in': nisqa_in['mos'],    # Caller leg quality
        'mos_out': nisqa_out['mos'],  # Agent leg quality
        # ... same for other dimensions
    }

A large gap between mos_in and mos_out (e.g., 3.8 vs 2.1) is a strong signal of a trunk or codec issue affecting only one direction.
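
That comparison can be made explicit. The sketch below is illustrative only: `leg_asymmetry` is a hypothetical helper, and the 1.0-point default threshold is an assumption chosen for the example, not a value from NISQA or from the service.

```python
from typing import Optional


def leg_asymmetry(mos_in: float, mos_out: float,
                  gap_threshold: float = 1.0) -> Optional[str]:
    """Return which leg is worse when the MOS gap reaches the threshold."""
    if mos_in < 0 or mos_out < 0:
        return None  # a failed prediction (-1) cannot be compared
    if abs(mos_in - mos_out) < gap_threshold:
        return None  # legs are roughly symmetric
    return "inbound_worse" if mos_in < mos_out else "outbound_worse"


print(leg_asymmetry(3.8, 2.1))    # the example gap from the text
print(leg_asymmetry(3.64, 3.78))  # a healthy, symmetric call
```

The threshold is worth tuning against your own recordings, since the normal spread between legs depends on the trunks and codecs in use.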


8. Silero VAD: Voice Activity Detection

Why VAD?

Silence detection (FFmpeg) tells you when audio is below a threshold. VAD tells you when speech is happening. The difference matters:

Loading Silero VAD

Silero VAD is loaded from PyTorch Hub at startup:

import torch
import torchaudio
import soundfile as sf

# Load model once at startup (not per-request)
vad_model, vad_utils = torch.hub.load(
    repo_or_dir='snakers4/silero-vad',
    model='silero_vad',
    trust_repo=True
)
(get_speech_timestamps, _, read_audio, _, _) = vad_utils

Audio Loading Helper

Silero VAD expects audio at a specific sample rate (8 kHz or 16 kHz). We use soundfile for reliable loading and torchaudio for resampling:

def load_audio_wav(wav_path: str, target_sr: int = 8000) -> torch.Tensor:
    """Load audio file to torch tensor at target sample rate."""
    data, sr = sf.read(wav_path, dtype='float32')
    if len(data.shape) > 1:
        data = data[:, 0]  # Take first channel if stereo
    tensor = torch.from_numpy(data)
    if sr != target_sr:
        tensor = torchaudio.functional.resample(tensor, sr, target_sr)
    return tensor

Running VAD Analysis

def silero_vad_analysis(wav_path: str) -> dict:
    """Run Silero VAD on a mono WAV.
    Returns speech ratio, segment count, and timestamps."""
    try:
        wav = load_audio_wav(wav_path, target_sr=8000)
        total_samples = len(wav)
        total_duration = total_samples / 8000.0

        speech_timestamps = get_speech_timestamps(
            wav, vad_model,
            sampling_rate=8000,
            threshold=0.5,
            min_speech_duration_ms=250,
            min_silence_duration_ms=300
        )

        speech_samples = sum(
            ts['end'] - ts['start'] for ts in speech_timestamps
        )
        speech_ratio = speech_samples / total_samples \
            if total_samples > 0 else 0

        segments = []
        for ts in speech_timestamps:
            segments.append({
                'start': round(ts['start'] / 8000.0, 2),
                'end': round(ts['end'] / 8000.0, 2)
            })

        return {
            'speech_ratio': round(speech_ratio, 3),
            'speech_pct': round(speech_ratio * 100, 1),
            'total_duration': round(total_duration, 2),
            'speech_segments': segments,
            'segment_count': len(segments)
        }
    except Exception as e:
        return {'speech_ratio': -1, 'speech_pct': -1, 'error': str(e)}
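
With speech segments for both legs, dead air (stretches where neither party speaks) can be derived by merging the two timelines. A minimal sketch in plain Python: `dead_air_gaps` is a hypothetical helper, and the 3-second default is an assumed threshold rather than one taken from the service.

```python
from typing import Dict, List

Segment = Dict[str, float]


def dead_air_gaps(caller: List[Segment], agent: List[Segment],
                  total_duration: float,
                  min_gap_sec: float = 3.0) -> List[Segment]:
    """Find stretches where neither party is speaking."""
    merged = sorted(caller + agent, key=lambda s: s["start"])
    gaps = []
    cursor = 0.0  # end time of the latest speech seen so far
    for seg in merged:
        if seg["start"] - cursor >= min_gap_sec:
            gaps.append({"start": round(cursor, 2),
                         "end": round(seg["start"], 2)})
        cursor = max(cursor, seg["end"])
    # Silence between the last speech and the end of the recording
    if total_duration - cursor >= min_gap_sec:
        gaps.append({"start": round(cursor, 2),
                     "end": round(total_duration, 2)})
    return gaps
```

The inputs are the 'speech_segments' lists returned by silero_vad_analysis for each leg. Tracking `cursor` as a running maximum handles overlapping speech, where both parties talk at once.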

Parameter tuning:

One-Way Audio Detection

The key insight: if one channel has < 5% speech but the other has > 20%, you have one-way audio. This is a serious VoIP problem that usually indicates a NAT, codec, or RTP routing issue:

def detect_one_way_audio(inbound_vad: dict, outbound_vad: dict) -> dict:
    """Compare speech activity between caller and agent legs."""
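    in_speech = inbound_vad.get('speech_pct', -1)
    out_speech = outbound_vad.get('speech_pct', -1)

    one_way = False
    direction = None

    # A negative speech_pct means VAD failed on that leg (see
    # silero_vad_analysis); skip the comparison instead of treating
    # the failure as a silent channel.
    if in_speech < 0 or out_speech < 0:
        pass
    elif in_speech < 5 and out_speech > 20:
        one_way = True
        direction = 'inbound_silent'  # Caller's audio not reaching agent
    elif out_speech < 5 and in_speech > 20:
        one_way = True
        direction = 'outbound_silent'  # Agent's audio not reaching caller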

    return {
        'one_way_audio': one_way,
        'one_way_direction': direction,
        'caller_speech_pct': in_speech,
        'agent_speech_pct': out_speech,
    }

9. Recording Retrieval from Remote Servers

VoIP platforms typically store recordings on the telephony server itself. Our analysis service runs on a separate server, so we need to fetch recordings on demand.

ViciDial Recording Layout

ViciDial stores recordings in a specific directory structure:

/var/spool/asterisk/monitorDONE/
    RECORDINGS/
        {filename}-all.wav        # Mixed (both parties)
        MP3/
            {filename}-all.mp3    # Compressed mixed version
        ORIG/
            {filename}-in.wav     # Caller leg (inbound)
            {filename}-out.wav    # Agent leg (outbound)

These directories are typically served by Apache over HTTP, making HTTP-based retrieval the simplest approach.

Multi-Strategy Fetch

The fetcher tries multiple strategies in order of preference:

import os
import re
import requests as http_requests
import logging

log = logging.getLogger('audio-analysis')

TEMP_DIR = '/opt/audio-analysis/tmp'

# Map logical server names to IPs
REC_SERVERS = {
    'uk':      'YOUR_UK_SERVER_IP',
    'romania': 'YOUR_RO_SERVER_IP',
    'france':  'YOUR_FR_SERVER_IP',
    'italy':   'YOUR_IT_SERVER_IP',
}


def _download_file(url: str, local_path: str) -> bool:
    """Download a URL to local path. Returns True on success."""
    try:
        resp = http_requests.head(url, timeout=5)
        if resp.status_code != 200:
            return False
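        r = http_requests.get(url, timeout=120, stream=True)
        # The HEAD check can pass while the GET still fails; do not
        # write an error page to disk as if it were audio.
        if r.status_code != 200:
            return False
        with open(local_path, 'wb') as f:
            for chunk in r.iter_content(8192):
                f.write(chunk)
        return True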
    except Exception:
        return False


def fetch_recording(server: str, filename: str) -> dict:
    """Download recording from production server.

    Strategy:
      1. Try separate in/out WAV legs from ORIG/ (best quality)
      2. Fall back to mixed -all.wav
      3. Fall back to mixed -all.mp3
    """
    host = REC_SERVERS.get(server)
    if not host:
        return {'in_path': None, 'out_path': None,
                'mix_path': None, 'has_legs': False, 'files': []}

    # Strip extension and -all/-in/-out suffix to get base name
    base = re.sub(r'-(all|in|out)\.(wav|mp3)$', '', filename, flags=re.I)
    base = re.sub(r'\.(wav|mp3)$', '', base, flags=re.I)

    result = {'in_path': None, 'out_path': None,
              'mix_path': None, 'has_legs': False, 'files': []}

    # Strategy 1: Separate legs (preferred for per-channel analysis)
    in_local = os.path.join(TEMP_DIR, f"{base}-in.wav")
    out_local = os.path.join(TEMP_DIR, f"{base}-out.wav")

    in_ok = _download_file(
        f"http://{host}/RECORDINGS/ORIG/{base}-in.wav", in_local)
    out_ok = _download_file(
        f"http://{host}/RECORDINGS/ORIG/{base}-out.wav", out_local)

    if in_ok and out_ok:
        # Verify files are not empty stubs
        # (ViciDial creates 44-byte empty WAVs for failed recordings)
        in_size = os.path.getsize(in_local)
        out_size = os.path.getsize(out_local)
        if in_size > 100 and out_size > 100:
            result['in_path'] = in_local
            result['out_path'] = out_local
            result['has_legs'] = True
            result['files'] = [in_local, out_local]
            return result
        # Clean up stubs
        for p in [in_local, out_local]:
            if os.path.exists(p):
                os.remove(p)
    else:
        for p in [in_local, out_local]:
            if os.path.exists(p):
                os.remove(p)

    # Strategy 2: Mixed audio
    mix_candidates = [
        (f"http://{host}/RECORDINGS/{base}-all.wav",
         f"{base}-all.wav"),
        (f"http://{host}/RECORDINGS/MP3/{base}-all.mp3",
         f"{base}-all.mp3"),
        (f"http://{host}/RECORDINGS/{filename}", filename),
    ]
    seen = set()
    for url, local_name in mix_candidates:
        if url in seen:
            continue
        seen.add(url)
        local_path = os.path.join(TEMP_DIR, local_name)
        if _download_file(url, local_path):
            result['mix_path'] = local_path
            result['files'] = [local_path]
            return result

    return result
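
The size check in fetch_recording relies on the stub being a bare 44-byte WAV header. A complementary check is to ask how many audio frames the file actually contains. The sketch below uses the standard-library `wave` module, which reads only PCM WAV files; `is_stub_wav` is a hypothetical helper, and falling back to the size check for files `wave` cannot parse is a design choice made here:

```python
import os
import wave


def is_stub_wav(path: str, min_bytes: int = 100) -> bool:
    """True if the file is too small or is a WAV with zero audio frames."""
    if os.path.getsize(path) <= min_bytes:
        return True
    try:
        with wave.open(path, "rb") as w:
            return w.getnframes() == 0
    except (wave.Error, EOFError):
        # Not a PCM WAV that the wave module can parse. The size check
        # already passed, so do not discard the file on that basis.
        return False
```

This catches a header-only file even if it carries extra metadata chunks that push it past the byte threshold.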

Design decisions:


10. The Analysis Pipeline

This is where everything comes together. The pipeline orchestrates all the analysis tools and builds the final result:

import time
import json
import hashlib

def _silence_summary(silence_list, total_dur):
    """Summarize silence detection results."""
    if not silence_list or total_dur <= 0:
        return {'count': 0, 'total_sec': 0, 'ratio_pct': 0,
                'longest_ms': 0}
    total_silence = sum(
        s['duration'] for s in silence_list if s['duration'] > 0)
    longest = max(
        (s['duration'] for s in silence_list if s['duration'] > 0),
        default=0)
    return {
        'count': len(silence_list),
        'total_sec': round(total_silence, 2),
        'ratio_pct': round(total_silence / total_dur * 100, 1),
        'longest_ms': round(longest * 1000)
    }


def run_full_analysis(server: str, filename: str) -> dict:
    """Complete analysis pipeline for one recording."""
    t0 = time.time()

    rec = fetch_recording(server, filename)
    if not rec['files']:
        raise FileNotFoundError("Recording not found on server")

    temp_files = list(rec['files'])

    try:
        ch_in = {}
        ch_out = {}
        has_legs = rec['has_legs']
        duration = 0

        if has_legs:
            # --- Separate leg analysis (best case) ---
            in_path = rec['in_path']
            out_path = rec['out_path']

            # Audio stats
            ch_in['stats'] = sox_stats(in_path)
            ch_out['stats'] = sox_stats(out_path)
            duration = max(
                ch_in['stats'].get('duration', 0),
                ch_out['stats'].get('duration', 0))

            # Silence detection
            ch_in['silence'] = ffmpeg_silence(in_path)
            ch_out['silence'] = ffmpeg_silence(out_path)

            # Voice activity detection
            ch_in['vad'] = silero_vad_analysis(in_path)
            ch_out['vad'] = silero_vad_analysis(out_path)

            # Neural MOS scoring (per leg)
            nisqa_in = nisqa_predict(in_path)
            nisqa_out = nisqa_predict(out_path)
            nisqa_result = {
                'mos': round(
                    (nisqa_in['mos'] + nisqa_out['mos']) / 2, 2),
                'mos_in': nisqa_in['mos'],
                'mos_out': nisqa_out['mos'],
                'noisiness': round(
                    (nisqa_in['noisiness'] +
                     nisqa_out['noisiness']) / 2, 2),
                'discontinuity': round(
                    (nisqa_in['discontinuity'] +
                     nisqa_out['discontinuity']) / 2, 2),
                'coloration': round(
                    (nisqa_in['coloration'] +
                     nisqa_out['coloration']) / 2, 2),
                'loudness': round(
                    (nisqa_in['loudness'] +
                     nisqa_out['loudness']) / 2, 2),
            }
        else:
            # --- Mixed audio analysis ---
            mix_path = rec['mix_path']
            wav_path = convert_to_wav(mix_path)
            if wav_path != mix_path:
                temp_files.append(wav_path)

            overall_stats = sox_stats(wav_path)
            duration = overall_stats.get('duration', 0)

            # Try splitting stereo into channels
            ch1_path, ch2_path = split_channels(wav_path)
            if ch2_path:
                temp_files.extend([ch1_path, ch2_path])
                ch_in['stats'] = sox_stats(ch1_path)
                ch_out['stats'] = sox_stats(ch2_path)
                ch_in['silence'] = ffmpeg_silence(ch1_path)
                ch_out['silence'] = ffmpeg_silence(ch2_path)
                ch_in['vad'] = silero_vad_analysis(ch1_path)
                ch_out['vad'] = silero_vad_analysis(ch2_path)
            else:
                ch_in['stats'] = overall_stats
                ch_in['silence'] = ffmpeg_silence(wav_path)
                ch_in['vad'] = silero_vad_analysis(wav_path)

            nisqa_result = nisqa_predict(wav_path)

        # --- One-way audio detection ---
        one_way = False
        one_way_direction = None
        if ch_out.get('vad'):
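            in_speech = ch_in.get('vad', {}).get('speech_pct', -1)
            out_speech = ch_out.get('vad', {}).get('speech_pct', -1)
            # speech_pct is -1 when VAD failed on a leg; only compare
            # valid values so a failure is not reported as one-way audio.
            if in_speech < 0 or out_speech < 0:
                pass
            elif in_speech < 5 and out_speech > 20:
                one_way = True
                one_way_direction = 'inbound_silent'
            elif out_speech < 5 and in_speech > 20:
                one_way = True
                one_way_direction = 'outbound_silent'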

        # --- Build result ---
        elapsed = round(time.time() - t0, 2)
        has_outbound = bool(ch_out.get('vad'))

        result = {
            'server': server,
            'file': filename,
            'duration': duration,
            'has_legs': has_legs,
            'nisqa': nisqa_result,
            'inbound': {
                'rms_db': ch_in.get('stats', {}).get('rms_db'),
                'peak_db': ch_in.get('stats', {}).get('peak_db'),
                'speech_pct': ch_in.get('vad', {}).get(
                    'speech_pct', -1),
                'silence': _silence_summary(
                    ch_in.get('silence', []), duration),
                'speech_segments': ch_in.get('vad', {}).get(
                    'speech_segments', []),
            },
            'outbound': {
                'rms_db': ch_out.get('stats', {}).get('rms_db'),
                'peak_db': ch_out.get('stats', {}).get('peak_db'),
                'speech_pct': ch_out.get('vad', {}).get(
                    'speech_pct', -1),
                'silence': _silence_summary(
                    ch_out.get('silence', []), duration),
                'speech_segments': ch_out.get('vad', {}).get(
                    'speech_segments', []),
            } if has_outbound else None,
            'one_way_audio': one_way,
            'one_way_direction': one_way_direction,
            'analysis_time_sec': elapsed,
        }

        return result

    finally:
        # Always clean up temp files
        for p in temp_files:
            if p and os.path.exists(p) and p.startswith(TEMP_DIR):
                try:
                    os.remove(p)
                except OSError:
                    pass
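
One detail worth noting in the per-leg averaging above: nisqa_predict reports -1 when prediction fails, and averaging a -1 with a valid score produces a misleading number. A possible refinement, sketched here with hypothetical names (`average_legs`, `DIMENSIONS`), is to average only the valid values:

```python
from typing import Dict

DIMENSIONS = ("mos", "noisiness", "discontinuity", "coloration", "loudness")


def average_legs(nisqa_in: Dict, nisqa_out: Dict) -> Dict[str, float]:
    """Average NISQA scores across legs, ignoring failed (-1) values."""
    combined = {}
    for key in DIMENSIONS:
        values = [d.get(key, -1) for d in (nisqa_in, nisqa_out)]
        valid = [v for v in values if v >= 0]
        combined[key] = round(sum(valid) / len(valid), 2) if valid else -1
    # Keep the per-leg MOS so asymmetric problems stay visible
    combined["mos_in"] = nisqa_in.get("mos", -1)
    combined["mos_out"] = nisqa_out.get("mos", -1)
    return combined
```

If one leg fails, the combined score falls back to the other leg's value; if both fail, the result stays at -1.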

11. API Endpoint Design

FastAPI Application Setup

from fastapi import FastAPI, HTTPException, Query
from fastapi.responses import JSONResponse

app = FastAPI(title="Audio Quality Analysis")

@app.get("/health")
async def health():
    """Health check endpoint. Confirms models are loaded."""
    return {"status": "ok", "models": ["silero_vad", "nisqa"]}

GET /analyze -- Full Technical Analysis

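@app.get("/analyze")
# Plain `def` rather than `async def`: FastAPI runs it in a thread pool,
# so the blocking, multi-second analysis does not stall the event loop.
def analyze(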
    server: str = Query(...,
        description="Server key: uk, romania, france, italy"),
    file: str = Query(...,
        description="Recording filename")
):
    """Full audio quality analysis for a recording."""
    if server not in REC_SERVERS:
        raise HTTPException(400, f"Unknown server: {server}")
    if not file or not re.match(r'^[a-zA-Z0-9_\-\.]+$', file):
        raise HTTPException(400, "Invalid filename")

    # Check cache first
    result = get_cached_or_analyze(server, file)
    return JSONResponse(result)

Example request:

curl "http://YOUR_SERVER_IP:8084/analyze?server=uk&file=20260301-142215_1001_5551234567-all.wav"

Example response:

{
  "server": "uk",
  "file": "20260301-142215_1001_5551234567-all.wav",
  "duration": 187.52,
  "has_legs": true,
  "nisqa": {
    "mos": 3.71,
    "noisiness": 3.85,
    "discontinuity": 4.12,
    "coloration": 2.45,
    "loudness": 3.58,
    "mos_in": 3.64,
    "mos_out": 3.78
  },
  "inbound": {
    "rms_db": -24.3,
    "peak_db": -6.1,
    "speech_pct": 42.3,
    "silence": {
      "count": 8,
      "total_sec": 15.7,
      "ratio_pct": 8.4,
      "longest_ms": 4200
    },
    "speech_segments": [
      {"start": 1.2, "end": 8.5},
      {"start": 12.3, "end": 25.1},
      {"start": 28.8, "end": 45.6}
    ]
  },
  "outbound": {
    "rms_db": -22.8,
    "peak_db": -4.9,
    "speech_pct": 51.7,
    "silence": {
      "count": 6,
      "total_sec": 11.2,
      "ratio_pct": 6.0,
      "longest_ms": 3100
    },
    "speech_segments": [
      {"start": 9.1, "end": 11.8},
      {"start": 25.5, "end": 28.2},
      {"start": 46.1, "end": 62.4}
    ]
  },
  "one_way_audio": false,
  "one_way_direction": null,
  "analysis_time_sec": 6.42
}
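The speech_segments arrays are what make turn-taking visible: a quiet stretch on one leg is only suspicious if the other leg was quiet too. The sketch below assumes segments shaped like the response above and a 5-second gap threshold; both function names are hypothetical and not part of the service.

```python
def quiet_windows(segments, min_gap=5.0):
    """Gaps of at least min_gap seconds between consecutive speech segments."""
    gaps = []
    for prev, nxt in zip(segments, segments[1:]):
        if nxt["start"] - prev["end"] >= min_gap:
            gaps.append((prev["end"], nxt["start"]))
    return gaps


def overlap_seconds(window, segments):
    """Seconds of speech in `segments` that fall inside `window` (start, end)."""
    w_start, w_end = window
    total = 0.0
    for seg in segments:
        start = max(w_start, seg["start"])
        end = min(w_end, seg["end"])
        if end > start:
            total += end - start
    return total


# Segment data copied from the example response
inbound = [{"start": 1.2, "end": 8.5}, {"start": 12.3, "end": 25.1},
           {"start": 28.8, "end": 45.6}]
outbound = [{"start": 9.1, "end": 11.8}, {"start": 25.5, "end": 28.2},
            {"start": 46.1, "end": 62.4}]

if __name__ == "__main__":
    for gap in quiet_windows(outbound):
        heard = overlap_seconds(gap, inbound)
        print(f"agent quiet {gap[0]}s-{gap[1]}s, caller spoke {heard:.1f}s of it")
```

In the example data both agent gaps are almost entirely covered by caller speech, which is the pattern of an agent listening rather than a dropout.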

DELETE /cache -- Cache Management

@app.delete("/cache")
async def clear_cache(server: str = "", file: str = ""):
    """Clear cache for a specific recording or all."""
    # A half-specified target would otherwise fall through to "delete all"
    if bool(server) != bool(file):
        raise HTTPException(
            400, "Provide both server and file, or neither")
    db = get_db()
    if server and file:
        db.execute(
            "DELETE FROM analysis_cache WHERE cache_key=?",
            (f"{server}:{file}",))
    else:
        db.execute("DELETE FROM analysis_cache")
    db.commit()
    cnt = db.total_changes
    db.close()
    return {"deleted": cnt}
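A targeted clear removes only the server:file entry; the matching ai:server:file entry written by /ai-analyze stays in the table. A minimal sketch of clearing both keys for one recording, assuming the analysis_cache schema used by the service (the helper name clear_recording is hypothetical):

```python
import sqlite3


def clear_recording(db: sqlite3.Connection, server: str, file: str) -> int:
    """Delete the technical and AI cache entries for one recording."""
    keys = (f"{server}:{file}", f"ai:{server}:{file}")
    cur = db.execute(
        "DELETE FROM analysis_cache WHERE cache_key IN (?, ?)", keys)
    db.commit()
    return cur.rowcount  # rows removed by this statement


if __name__ == "__main__":
    # In-memory database standing in for cache.db
    db = sqlite3.connect(":memory:")
    db.execute("""CREATE TABLE analysis_cache (
        cache_key TEXT PRIMARY KEY,
        result_json TEXT NOT NULL,
        created_at REAL NOT NULL)""")
    db.executemany(
        "INSERT INTO analysis_cache VALUES (?, ?, ?)",
        [("uk:a.wav", "{}", 0.0), ("ai:uk:a.wav", "{}", 0.0),
         ("uk:b.wav", "{}", 0.0)])
    db.commit()
    print(clear_recording(db, "uk", "a.wav"))
```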

12. AI-Powered Analysis with Claude

The AI Analysis Layer

The /ai-analyze endpoint takes the raw metrics from /analyze and sends them to Claude Haiku for expert interpretation. This is the difference between "MOS 3.2, noisiness 2.8" and "This call had moderate background noise that may have made it hard for the agent to hear the caller clearly."

Building the Data Prompt

The key to good AI analysis is structuring the data clearly for the model. We format all metrics into a readable text block:

def build_call_data_prompt(analysis: dict) -> str:
    """Format analysis results as structured text for Claude."""
    n = analysis.get('nisqa', {})
    inb = analysis.get('inbound', {})
    outb = analysis.get('outbound', {})
    dur = analysis.get('duration', 0)
    m, s = int(dur // 60), int(dur % 60)
    has_legs = analysis.get('has_legs', False)

    lines = []
    lines.append(f"Duration: {m}m {s}s")
    lines.append(f"Analysis type: "
                 f"{'Separate caller/agent legs' if has_legs "
                 f"else 'Mixed single audio'}")
    lines.append(f"Codec: 8kHz ulaw (narrowband telephony)")
    lines.append("")
    lines.append("NISQA scores (1-5, higher=better):")
    lines.append(f"  Overall MOS: {n.get('mos', '?')}")
    lines.append(f"  Noisiness: {n.get('noisiness', '?')}")
    lines.append(f"  Discontinuity: {n.get('discontinuity', '?')}")
    lines.append(f"  Coloration: {n.get('coloration', '?')}")
    lines.append(f"  Loudness: {n.get('loudness', '?')}")
    if n.get('mos_in') is not None:
        lines.append(f"  Caller leg MOS: {n['mos_in']}")
        lines.append(f"  Agent leg MOS: {n.get('mos_out', '?')}")

    lines.append("")
    lines.append("Caller (inbound):")
    lines.append(f"  Speech: {inb.get('speech_pct', '?')}% of call")
    lines.append(f"  Volume: RMS {inb.get('rms_db', '?')}dB, "
                 f"Peak {inb.get('peak_db', '?')}dB")
    sil = inb.get('silence', {})
    lines.append(f"  Silence gaps: {sil.get('count', 0)}, "
                 f"longest {sil.get('longest_ms', 0)}ms")

    # Include speech segment timestamps
    in_segs = inb.get('speech_segments', [])
    if in_segs:
        lines.append(f"  Speech segments ({len(in_segs)}):")
        for seg in in_segs:
            lines.append(f"    {seg['start']:.1f}s - {seg['end']:.1f}s")

    if outb:
        lines.append("")
        lines.append("Agent (outbound):")
        lines.append(f"  Speech: {outb.get('speech_pct', '?')}% of call")
        lines.append(f"  Volume: RMS {outb.get('rms_db', '?')}dB, "
                     f"Peak {outb.get('peak_db', '?')}dB")

        out_segs = outb.get('speech_segments', [])
        if out_segs:
            lines.append(f"  Speech segments ({len(out_segs)}):")
            for seg in out_segs:
                lines.append(
                    f"    {seg['start']:.1f}s - {seg['end']:.1f}s")

    lines.append("")
    lines.append(f"One-way audio detected: "
                 f"{'Yes' if analysis.get('one_way_audio') else 'No'}")

    return "\n".join(lines)

The System Prompt: Encoding Domain Knowledge

This is the most important part of the AI layer. The system prompt encodes VoIP telephony domain expertise that prevents the model from making common mistakes:

ANALYSIS_PROMPT = """You are a senior VoIP call quality analyst at a call \
center. You analyze phone calls based on detailed metrics and speech \
activity data.

DOMAIN KNOWLEDGE you must apply:
- These are 8kHz narrowband ulaw telephony calls. NISQA coloration scores \
  of 2.0-3.0 are NORMAL for narrowband -- this is NOT a defect. Only flag \
  coloration below 1.5.
- When one party is silent but the other is actively speaking (check the \
  speech segment timestamps), the silent party is LISTENING -- not on \
  hold, not disconnected. This is normal conversation.
- Silence gaps in the 0.5-3s range are normal conversational pauses. Only \
  flag gaps >5s as notable.
- NISQA noisiness 3.0-3.5 is borderline -- call it "minor" not \
  "significant". Only flag below 2.5 as a real problem.
- NISQA discontinuity >3.5 means SMOOTH audio with NO dropouts. Only \
  flag below 2.5 as packet loss.
- RMS around -25dB is normal volume. Below -35dB is too quiet. \
  Above -10dB is too loud.

YOUR JOB: Give an honest, simple, human-readable assessment. Write like \
you are explaining to a supervisor who is not technical. No jargon. No \
speculation about causes unless the data clearly supports it.

IMPORTANT: Cross-reference the speech segment timestamps. If the agent has \
a 20s silence from 170s-190s but the caller has multiple speech segments \
in that same window, the agent was simply listening to the caller -- say \
that, do not call it a problem.

Write your assessment in this format:
- One sentence overall verdict
- 2-3 short bullet points with specific findings (reference actual numbers)
- Keep it under 80 words total
- Be honest -- if the call quality is actually fine for narrowband \
  telephony, say so"""

Why the domain knowledge matters: Without these instructions, the model tends to flag 8 kHz calls as having "coloration issues" (narrowband audio does sound colored compared to wideband) and to describe normal listening pauses as "dead air" or "possible disconnection." The domain constraints prevent these false positives.
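The thresholds in the prompt can also be applied deterministically, which gives a cheap cross-check on whatever the model writes. This is a sketch under the assumption that the metric names match the /analyze response; flag_metrics is a hypothetical helper, and the limits are copied from the prompt text.

```python
def _below(value, limit):
    """True when value is a number strictly below limit."""
    return isinstance(value, (int, float)) and value < limit


def flag_metrics(nisqa: dict, leg: dict) -> list:
    """Return the names of metrics that cross the prompt's thresholds."""
    flags = []
    if _below(nisqa.get("coloration"), 1.5):
        flags.append("coloration")
    if _below(nisqa.get("noisiness"), 2.5):
        flags.append("noisiness")
    if _below(nisqa.get("discontinuity"), 2.5):
        flags.append("discontinuity")

    rms = leg.get("rms_db")
    if isinstance(rms, (int, float)):
        if rms < -35:
            flags.append("too_quiet")
        elif rms > -10:
            flags.append("too_loud")

    longest_ms = (leg.get("silence") or {}).get("longest_ms", 0)
    if longest_ms > 5000:  # "only flag gaps >5s"
        flags.append("long_silence")
    return flags


if __name__ == "__main__":
    nisqa = {"noisiness": 3.85, "discontinuity": 4.12, "coloration": 2.45}
    inbound = {"rms_db": -24.3, "silence": {"longest_ms": 4200}}
    print(flag_metrics(nisqa, inbound))
```

If the model reports a problem that this function does not flag, or the reverse, that disagreement is worth logging.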

Calling the Anthropic API

ANTHROPIC_KEY_FILE = '/opt/audio-analysis/.api_key'

def call_claude_api(prompt: str, max_tokens: int = 300) -> str:
    """Call Claude Haiku via the Anthropic Messages API."""
    api_key = ''
    if os.path.exists(ANTHROPIC_KEY_FILE):
        # Context manager closes the file handle promptly
        with open(ANTHROPIC_KEY_FILE) as fh:
            api_key = fh.read().strip()
    if not api_key:
        raise RuntimeError("Anthropic API key not configured")

    resp = http_requests.post(
        "https://api.anthropic.com/v1/messages",
        headers={
            "x-api-key": api_key,
            "content-type": "application/json",
            "anthropic-version": "2023-06-01",
        },
        json={
            "model": "claude-haiku-4-5-20251001",
            "max_tokens": max_tokens,
            "messages": [{"role": "user", "content": prompt}],
        },
        timeout=60,
    )
    if resp.status_code != 200:
        err = resp.json().get('error', {}).get('message',
                                                resp.text[:200])
        raise RuntimeError(f"API error ({resp.status_code}): {err}")
    return resp.json()['content'][0]['text'].strip()
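The function above sends everything as a single user message, so ANALYSIS_PROMPT travels in the user turn. The Messages API also accepts a top-level system field, which keeps the instructions separate from the call data. A minimal sketch of building such a request body follows; build_messages_payload is a hypothetical helper and nothing here contacts the API.

```python
def build_messages_payload(system_prompt: str, call_data: str,
                           model: str, max_tokens: int = 300) -> dict:
    """Build a Messages API request body with a separate system prompt."""
    return {
        "model": model,
        "max_tokens": max_tokens,
        "system": system_prompt,  # instructions and domain rules
        "messages": [
            {"role": "user", "content": call_data},  # metrics only
        ],
    }


if __name__ == "__main__":
    body = build_messages_payload(
        "You are a senior VoIP call quality analyst.",
        "Duration: 3m 7s\nOverall MOS: 3.71",
        "claude-haiku-4-5-20251001")
    print(sorted(body))
```

The resulting dictionary could be passed as the json argument of the existing POST call in place of the inline payload.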

The /ai-analyze Endpoint

@app.get("/ai-analyze")
async def ai_analyze(
    server: str = Query(...),
    file: str = Query(...),
):
    """AI-powered call quality assessment."""
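    if server not in REC_SERVERS:
        raise HTTPException(400, f"Unknown server: {server}")
    # Same filename allow-list as /analyze: this value reaches the
    # recording fetch, so it must be validated here as well
    if not file or not re.match(r'^[a-zA-Z0-9_\-\.]+$', file):
        raise HTTPException(400, "Invalid filename")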

    # Check AI-specific cache
    cache_key = f"ai:{server}:{file}"
    cached = check_cache(cache_key)
    if cached:
        return JSONResponse(cached)

    # Run standard analysis first (also cached)
    analysis_data = get_cached_or_analyze(server, file)

    t0 = time.time()
    call_data = build_call_data_prompt(analysis_data)

    prompt = ANALYSIS_PROMPT + "\n\n" + call_data
    ai_text = call_claude_api(prompt)

    elapsed = round(time.time() - t0, 2)
    result = {
        "ai_analysis": ai_text,
        "model": "claude-haiku",
        "has_legs": analysis_data.get('has_legs', False),
        "analysis_time_sec": elapsed,
    }

    save_to_cache(cache_key, result)
    return JSONResponse(result)

Example response:

{
  "ai_analysis": "This call had good overall audio quality for a standard telephone call. The MOS score of 3.71 is solid for narrowband telephony.\n\n- Both caller (42.3%) and agent (51.7%) had healthy speech activity, with normal turn-taking conversation throughout.\n- Audio volume was good on both sides (RMS -24.3dB caller, -22.8dB agent) with no clipping.\n- No one-way audio, no significant silence gaps. The longest pause was 4.2 seconds, which is within normal range.",
  "model": "claude-haiku",
  "has_legs": true,
  "analysis_time_sec": 1.83
}

POST /investigate -- Deep Call Investigation

The /investigate endpoint handles a different use case: given SIP traces and call metadata, explain what happened to a specific call. This is used when a call was dropped, had poor quality, or ended unexpectedly:

@app.post("/investigate")
async def investigate(payload: dict):
    """Analyze SIP trace + call context with Claude to explain
    what happened."""
    sip_trace = payload.get('sip_trace', '')
    call_info = payload.get('call_info', '')
    status = payload.get('status', '')

    if not sip_trace and not call_info:
        raise HTTPException(400, "No call data provided")

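    # Cache by hash of every input that shapes the prompt, including
    # status, so the same trace under a different status is not reused
    cache_key = (
        "inv:" + hashlib.md5(
            (status + sip_trace + call_info).encode()).hexdigest()
    )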
    cached = check_cache(cache_key)
    if cached:
        return JSONResponse(cached)

    t0 = time.time()

    prompt = f"""You are a senior VoIP/SIP engineer at a call center. \
A call ended with status "{status}" and the supervisor wants to know \
what happened in simple, non-technical terms.

REFERENCE -- SIP CODES:
100=Trying, 180=Ringing, 183=Progress, 200=OK, 408=Timeout, \
480=Unavailable, 486=Busy, 487=Cancelled, 503=Service Unavailable, \
BYE=Hangup

REFERENCE -- CARRIER HANGUP CAUSES:
1=Unallocated number, 16=Normal clearing, 17=User busy, \
18=No response, 19=No answer, 21=Call rejected, 27=Dest out of order, \
28=Invalid number, 31=Normal unspecified, 34=No circuit, \
38=Network out of order

=== CALL DATA ===
{call_info}

=== SIP TRACE ===
{sip_trace if sip_trace else '(No SIP trace available)'}

Explain what happened in 2-4 short sentences for a non-technical \
supervisor. Be specific -- reference actual codes and durations."""

    ai_text = call_claude_api(prompt, max_tokens=600)
    elapsed = round(time.time() - t0, 2)

    result = {"explanation": ai_text, "analysis_time_sec": elapsed}
    save_to_cache(cache_key, result)
    return JSONResponse(result)
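Hashing a plain concatenation of fields lets different requests share a key: "ab" + "c" and "a" + "bc" produce the same bytes. One way to avoid that, sketched here as an assumption rather than the service's actual scheme, is to serialize the fields as a JSON list before hashing. The helper name investigation_cache_key is hypothetical.

```python
import hashlib
import json


def investigation_cache_key(status: str, call_info: str,
                            sip_trace: str) -> str:
    """Derive a cache key that keeps field boundaries unambiguous."""
    # JSON encoding quotes and separates each field, so moving text
    # from one field to another always changes the hashed bytes.
    blob = json.dumps([status, call_info, sip_trace],
                      ensure_ascii=False).encode("utf-8")
    return "inv:" + hashlib.sha256(blob).hexdigest()


if __name__ == "__main__":
    print(investigation_cache_key("NANQUE", "hangup_cause: 16", ""))
```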

Example request:

curl -X POST http://YOUR_SERVER_IP:8084/investigate \
  -H "Content-Type: application/json" \
  -d '{
    "status": "NANQUE",
    "call_info": "CARRIER LOG:\n  dialstatus: ANSWER\n  hangup_cause: 16\n  sip_hangup_cause: 200\n\nQUEUE DATA:\n  queue_seconds: 45\n  queue_position: 3\n\nDID ROUTING:\n  did_pattern: 442012345678\n  did_description: London Main",
    "sip_trace": ""
  }'

13. The Admin AI Assistant

The /ask endpoint is the most sophisticated part of the service. It creates a multi-turn AI assistant that can query live databases and Prometheus metrics to answer questions about call center operations.

How It Works

  1. The user sends a natural-language question (e.g., "How many calls did the UK team handle today?")
  2. Claude receives the question along with a comprehensive system prompt describing the database schema, table structures, and available data sources
  3. Claude responds with one or more <data_request> blocks containing SQL queries, Prometheus queries, or ViciDial report requests
  4. The service executes those queries and sends the results back to Claude
  5. Claude analyzes the data and produces a formatted answer
  6. This loop can repeat up to 3 times for complex questions requiring follow-up queries
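Step 3 depends on pulling a JSON array out of free-form model text, which can fail in several ways. The following sketch isolates that parsing step. It assumes the request objects carry a type of "mysql" or "prometheus", as the executors in this section expect; extract_data_requests is a hypothetical helper.

```python
import json
import re

DATA_REQUEST = re.compile(
    r'<data_request>\s*(\[.*?\])\s*</data_request>', re.DOTALL)


def extract_data_requests(response_text: str) -> list:
    """Return the well-formed query objects from the first request block."""
    match = DATA_REQUEST.search(response_text)
    if not match:
        return []
    try:
        queries = json.loads(match.group(1))
    except json.JSONDecodeError:
        return []  # malformed block: treat as "no data requested"
    # Keep only dict entries with a type the service knows how to run
    return [q for q in queries
            if isinstance(q, dict)
            and q.get("type") in ("mysql", "prometheus")]


if __name__ == "__main__":
    # Illustrative model output; the SQL is a placeholder
    text = ('I need today\'s totals.\n<data_request>\n'
            '[{"type": "mysql", "server": "uk", "sql": "SELECT 1"}]\n'
            '</data_request>')
    print(extract_data_requests(text))
```

Returning an empty list on malformed input lets the caller fall through to returning the model's text instead of raising a 500 error.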

Data Source Executors

from decimal import Decimal

import pymysql

MYSQL_SERVERS = {
    'uk':      {'host': 'YOUR_UK_SERVER_IP',   'label': 'Alpha'},
    'romania': {'host': 'YOUR_RO_SERVER_IP',   'label': 'Charlie'},
    'france':  {'host': 'YOUR_FR_SERVER_IP',   'label': 'Delta'},
    'italy':   {'host': 'YOUR_IT_SERVER_IP',   'label': 'Echo'},
}
MYSQL_USER = 'YOUR_DB_USER'
MYSQL_PASS = 'YOUR_DB_PASSWORD'
MYSQL_DB   = 'asterisk'


def query_mysql(server_key: str, sql: str, limit: int = 50) -> list:
    """Run a read-only query against a ViciDial server."""
    srv = MYSQL_SERVERS.get(server_key)
    if not srv:
        return [{"error": f"Unknown server: {server_key}"}]

    # Safety: only allow SELECT/SHOW. Checked before connecting so a
    # rejected query never opens (or leaks) a connection.
    stripped = sql.strip().upper()
    if not stripped.startswith(('SELECT', 'SHOW')):
        return [{"error": "Only SELECT/SHOW queries allowed"}]

    conn = None
    try:
        conn = pymysql.connect(
            host=srv['host'], port=3306,
            user=MYSQL_USER, password=MYSQL_PASS,
            database=MYSQL_DB, connect_timeout=5,
            read_timeout=10, charset='utf8')

        with conn.cursor(pymysql.cursors.DictCursor) as cur:
            cur.execute(sql)
            rows = cur.fetchmany(limit)

        # Convert non-serializable types for JSON
        result = []
        for row in rows:
            clean = {}
            for k, v in row.items():
                if hasattr(v, 'isoformat'):
                    clean[k] = v.isoformat()
                elif isinstance(v, (bytes, bytearray)):
                    clean[k] = v.decode('utf-8', errors='replace')
                elif isinstance(v, Decimal):
                    clean[k] = float(v)
                else:
                    clean[k] = v
            result.append(clean)
        return result

    except Exception as e:
        return [{"error": f"MySQL error: {str(e)[:200]}"}]
    finally:
        # Close the connection on every path, including errors
        if conn is not None:
            conn.close()


def query_prometheus(query: str) -> dict:
    """Run a PromQL instant query."""
    try:
        resp = http_requests.get(
            "http://localhost:9090/api/v1/query",
            params={"query": query},
            timeout=10)
        data = resp.json()
        if data.get('status') != 'success':
            return {"error": data.get('error', 'query failed')}
        results = []
        for r in data.get('data', {}).get('result', [])[:30]:
            metric = {k: v for k, v in r.get('metric', {}).items()
                      if k != '__name__'}
            val = r.get('value', [None, None])
            results.append({
                "labels": metric,
                "value": val[1] if len(val) > 1 else None})
        return {"results": results}
    except Exception as e:
        return {"error": str(e)[:200]}

The /ask Endpoint with Multi-Turn Loop

@app.post("/ask")
async def admin_ask(payload: dict):
    """Multi-turn AI assistant that can query live databases."""
    question = payload.get('question', '').strip()
    conversation = payload.get('conversation', [])

    if not question and not conversation:
        raise HTTPException(400, "No question provided")

    t0 = time.time()

    # Build messages
    if conversation:
        messages = conversation
    else:
        messages = [{"role": "user", "content": question}]

    # Call Claude (Sonnet or Opus for complex reasoning)
    response_text = call_claude_admin(messages, max_tokens=2000)

    # Data request loop: up to 3 rounds
    total_queries = 0
    for round_num in range(3):
        if '<data_request>' not in response_text:
            break

        match = re.search(
            r'<data_request>\s*(\[.*?\])\s*</data_request>',
            response_text, re.DOTALL)
        if not match:
            break

        try:
            queries = json.loads(match.group(1))
        except json.JSONDecodeError:
            break  # Malformed request block: return the text as-is

        # Execute all queries (max 10 per round)
        query_results = []
        for q in queries[:10]:
            qtype = q.get('type', '')
            if qtype == 'mysql':
                result = query_mysql(
                    q.get('server', ''), q.get('sql', ''))
                query_results.append({
                    "type": "mysql",
                    "server": q.get('server', ''),
                    "rows": result,
                    "count": len(result),
                })
            elif qtype == 'prometheus':
                result = query_prometheus(q.get('query', ''))
                query_results.append({
                    "type": "prometheus",
                    "data": result,
                })

        total_queries += len(query_results)

        # Send results back to Claude for analysis
        messages.append({
            "role": "assistant", "content": response_text})
        messages.append({
            "role": "user",
            "content": (
                "Here are the query results:\n\n```json\n"
                + json.dumps(query_results, indent=2, default=str)
                + "\n```\n\nAnalyze this data and give me a clear "
                "answer. Use markdown formatting.")
        })

        response_text = call_claude_admin(messages, max_tokens=3000)

    # Return final answer
    elapsed = round(time.time() - t0, 2)
    messages.append({"role": "assistant", "content": response_text})

    return JSONResponse({
        "answer": response_text,
        "queries_executed": total_queries,
        "elapsed_sec": elapsed,
        "conversation": messages,  # For multi-turn continuations
    })

Example request:

curl -X POST http://YOUR_SERVER_IP:8084/ask \
  -H "Content-Type: application/json" \
  -d '{"question": "How many inbound calls did UK handle today, and what was the average wait time?"}'

Example response:

{
  "answer": "## UK Inbound Performance Today\n\n| Metric | Value |\n|---|---|\n| Total inbound calls | 247 |\n| Answered | 231 (93.5%) |\n| Dropped (NANQUE) | 8 (3.2%) |\n| Average wait time | 18.4 seconds |\n| Max wait time | 142 seconds |\n\nPerformance is healthy today...",
  "queries_executed": 2,
  "elapsed_sec": 4.71,
  "conversation": [...]
}

Multi-Turn Conversations

The /ask endpoint returns the full conversation array in its response. To ask follow-up questions, send the conversation back:

curl -X POST http://YOUR_SERVER_IP:8084/ask \
  -H "Content-Type: application/json" \
  -d '{
    "conversation": [
      {"role": "user", "content": "How many inbound calls did UK handle today?"},
      {"role": "assistant", "content": "...previous answer..."},
      {"role": "user", "content": "Which agent had the most calls?"}
    ]
  }'
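From a Python client, the follow-up is a matter of appending one user message to the conversation array returned by the previous call. A minimal sketch, assuming the response shape shown above; follow_up_payload is a hypothetical helper and the HTTP call itself is left out.

```python
def follow_up_payload(previous_response: dict, question: str) -> dict:
    """Build the next /ask request body from the previous response."""
    # Copy so the caller's stored response is not modified
    conversation = list(previous_response.get("conversation", []))
    conversation.append({"role": "user", "content": question})
    return {"conversation": conversation}


if __name__ == "__main__":
    previous = {
        "answer": "...previous answer...",
        "conversation": [
            {"role": "user",
             "content": "How many inbound calls did UK handle today?"},
            {"role": "assistant", "content": "...previous answer..."},
        ],
    }
    payload = follow_up_payload(previous, "Which agent had the most calls?")
    print(len(payload["conversation"]))
```

The payload can then be posted to /ask as JSON, for example with requests.post(url, json=payload).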

Security Considerations

The admin assistant executes SQL written by a language model, so several safeguards limit what a query can do:

  1. Statement allow-list -- only statements that begin with SELECT or SHOW are executed. Anything starting with INSERT, UPDATE, DELETE, DROP, etc. is rejected. This is a prefix check, not a SQL parser, so treat it as a first line of defense only.
  2. Row limits -- query results are capped at 50 rows per query.
  3. Query count limits -- maximum 10 queries per round, 3 rounds per request.
  4. Timeouts -- MySQL connections use a 5-second connect timeout and a 10-second read timeout, which bounds runaway queries from the client side.
  5. Read-only database user -- the MySQL user should have only SELECT privileges. This is the safeguard that actually enforces read-only access.
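The prefix check in query_mysql accepts any statement that merely starts with SELECT, which still admits things like SELECT ... INTO OUTFILE. A somewhat stricter heuristic is sketched below. It is an assumption-laden illustration, not a SQL parser: is_read_only_sql and the blocked-keyword list are hypothetical, and it will reject some legitimate queries (for example a semicolon inside a string literal).

```python
import re

# Constructs that are technically SELECTs but write files, take locks,
# or exist mainly to burn server time
FORBIDDEN = re.compile(
    r'\b(INTO\s+(OUTFILE|DUMPFILE)|FOR\s+UPDATE|SLEEP|BENCHMARK|LOAD_FILE)\b',
    re.IGNORECASE)


def is_read_only_sql(sql: str) -> bool:
    """Heuristic check that sql is a single SELECT or SHOW statement."""
    stmt = sql.strip().rstrip(';').strip()
    if not stmt or ';' in stmt:
        return False  # empty, or more than one statement
    if not re.match(r'(SELECT|SHOW)\b', stmt, re.IGNORECASE):
        return False
    return FORBIDDEN.search(stmt) is None


if __name__ == "__main__":
    for q in ("SELECT COUNT(*) FROM vicidial_log",
              "SELECT 1; DROP TABLE vicidial_log",
              "SELECT * FROM t INTO OUTFILE '/tmp/x'"):
        print(is_read_only_sql(q), q)
```

A check like this complements, and does not replace, a database user that holds only SELECT privileges.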

14. SQLite Caching Layer

Every analysis is expensive (3-8 seconds of CPU time, plus network for downloading recordings). We cache results in SQLite:

import sqlite3

CACHE_DB = '/opt/audio-analysis/cache.db'

def get_db():
    """Get a SQLite connection with row factory."""
    db = sqlite3.connect(CACHE_DB)
    db.row_factory = sqlite3.Row
    db.execute("""CREATE TABLE IF NOT EXISTS analysis_cache (
        cache_key TEXT PRIMARY KEY,
        result_json TEXT NOT NULL,
        created_at REAL NOT NULL
    )""")
    db.commit()
    return db


def get_cached_or_analyze(server: str, filename: str) -> dict:
    """Return cached result or run fresh analysis."""
    cache_key = f"{server}:{filename}"

    # Check cache
    try:
        db = get_db()
        row = db.execute(
            "SELECT result_json FROM analysis_cache WHERE cache_key=?",
            (cache_key,)).fetchone()
        if row:
            db.close()
            return json.loads(row['result_json'])
        db.close()
    except Exception:
        pass

    # Run analysis
    result = run_full_analysis(server, filename)

    # Cache result
    try:
        db = get_db()
        db.execute(
            "INSERT OR REPLACE INTO analysis_cache "
            "(cache_key, result_json, created_at) VALUES (?,?,?)",
            (cache_key, json.dumps(result), time.time()))
        db.commit()
        db.close()
    except Exception:
        pass

    return result
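The AI endpoints call check_cache and save_to_cache, which are not shown in this section. A minimal sketch of what such helpers could look like, assuming they share the analysis_cache table; the bodies here are illustrative, and the demo path stands in for CACHE_DB so the sketch runs anywhere.

```python
import json
import os
import sqlite3
import tempfile
import time

# Throwaway location for the sketch; the service would use CACHE_DB
DEMO_DB = os.path.join(tempfile.mkdtemp(), "cache.db")


def _connect(path: str = DEMO_DB) -> sqlite3.Connection:
    """Open the cache database, creating the table if needed."""
    db = sqlite3.connect(path)
    db.execute("""CREATE TABLE IF NOT EXISTS analysis_cache (
        cache_key TEXT PRIMARY KEY,
        result_json TEXT NOT NULL,
        created_at REAL NOT NULL)""")
    return db


def check_cache(cache_key: str, path: str = DEMO_DB):
    """Return the cached dict for cache_key, or None on a miss."""
    db = _connect(path)
    try:
        row = db.execute(
            "SELECT result_json FROM analysis_cache WHERE cache_key=?",
            (cache_key,)).fetchone()
    finally:
        db.close()
    return json.loads(row[0]) if row else None


def save_to_cache(cache_key: str, result: dict, path: str = DEMO_DB) -> None:
    """Insert or overwrite the cached result for cache_key."""
    db = _connect(path)
    try:
        db.execute(
            "INSERT OR REPLACE INTO analysis_cache "
            "(cache_key, result_json, created_at) VALUES (?,?,?)",
            (cache_key, json.dumps(result), time.time()))
        db.commit()
    finally:
        db.close()
```

A miss returns None, so the "if cached:" test in the endpoints works unchanged for any non-empty result.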

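Cache key strategy: /analyze results are stored under server:file, /ai-analyze results under ai:server:file, and /investigate results under inv: followed by an MD5 hash of the request data. The prefixes keep the three result types from colliding in the single analysis_cache table.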

Why SQLite and not Redis?

Cache Invalidation

The /cache DELETE endpoint allows clearing specific entries or the entire cache:

# Clear a specific recording's cache
curl -X DELETE "http://YOUR_SERVER_IP:8084/cache?server=uk&file=recording.wav"

# Clear everything
curl -X DELETE "http://YOUR_SERVER_IP:8084/cache"
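The created_at column is written on every insert but never read, so entries live until someone clears them. If age-based expiry is wanted, a purge along these lines could run from a nightly job. This is a sketch assuming the same table; purge_older_than is a hypothetical helper.

```python
import sqlite3
import time


def purge_older_than(db: sqlite3.Connection, max_age_sec: float,
                     now: float = None) -> int:
    """Delete cache rows created more than max_age_sec ago."""
    now = time.time() if now is None else now
    cur = db.execute(
        "DELETE FROM analysis_cache WHERE created_at < ?",
        (now - max_age_sec,))
    db.commit()
    return cur.rowcount  # number of rows removed


if __name__ == "__main__":
    db = sqlite3.connect(":memory:")  # stand-in for cache.db
    db.execute("""CREATE TABLE analysis_cache (
        cache_key TEXT PRIMARY KEY,
        result_json TEXT NOT NULL,
        created_at REAL NOT NULL)""")
    db.execute("INSERT INTO analysis_cache VALUES ('uk:old.wav', '{}', 0)")
    db.execute("INSERT INTO analysis_cache VALUES ('uk:new.wav', '{}', ?)",
               (time.time(),))
    print(purge_older_than(db, 30 * 24 * 3600))  # drop entries over 30 days
```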

15. Systemd Service Deployment

Service File

Create /etc/systemd/system/audio-analysis.service:

[Unit]
Description=Audio Quality Analysis Service (FastAPI)
After=network.target

[Service]
Type=simple
User=root
WorkingDirectory=/opt/audio-analysis
ExecStart=/opt/audio-analysis/venv/bin/python3 -m uvicorn \
    service:app --host 0.0.0.0 --port 8084 --workers 1
Restart=always
RestartSec=5
StandardOutput=append:/opt/audio-analysis/service.log
StandardError=append:/opt/audio-analysis/service.log

[Install]
WantedBy=multi-user.target

Why --workers 1? The ML models (NISQA, Silero VAD) load into memory once at startup. With multiple workers, each worker loads its own copy, consuming 500 MB per worker. For a service that processes requests sequentially (audio analysis is CPU-bound), one worker is optimal.

Enable and Start

sudo systemctl daemon-reload
sudo systemctl enable audio-analysis
sudo systemctl start audio-analysis

# Check status
sudo systemctl status audio-analysis

# View logs
tail -f /opt/audio-analysis/service.log

Log Rotation

Add a logrotate config at /etc/logrotate.d/audio-analysis:

/opt/audio-analysis/service.log {
    daily
    rotate 14
    compress
    delaycompress
    missingok
    notifempty
    postrotate
        systemctl restart audio-analysis
    endscript
}

Firewall

Only expose port 8084 to trusted networks:

# Allow from your monitoring server only
sudo ufw allow from YOUR_MONITORING_IP to any port 8084

# Or restrict via iptables
iptables -A INPUT -p tcp --dport 8084 -s YOUR_MONITORING_IP -j ACCEPT
iptables -A INPUT -p tcp --dport 8084 -j DROP

16. Production Tips and Optimization

Model Loading at Startup

Both Silero VAD and NISQA take several seconds to load. Load them once at module level, not per request:

# At module level (runs once when uvicorn imports the module)
log.info("Loading Silero VAD model...")
vad_model, vad_utils = torch.hub.load(
    repo_or_dir='snakers4/silero-vad',
    model='silero_vad',
    trust_repo=True
)
(get_speech_timestamps, _, read_audio, _, _) = vad_utils
log.info("Silero VAD loaded.")

log.info("Loading NISQA model...")
from nisqa.NISQA_model import nisqaModel
# Importing here pays the module import cost once at startup. Each
# prediction still builds its own nisqaModel instance per file.
log.info("NISQA loaded.")

Temporary File Cleanup

Always use a try/finally block to clean up downloaded recordings, even if analysis fails:

try:
    # ... run analysis ...
    return result
finally:
    for p in temp_files:
        if p and os.path.exists(p) and p.startswith(TEMP_DIR):
            try:
                os.remove(p)
            except OSError:
                pass
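The same guarantee can be packaged as a context manager so each download site does not need its own try/finally. A minimal sketch using only the standard library; temp_recording is a hypothetical helper, and the directory argument would be TEMP_DIR in the service.

```python
import os
import tempfile
from contextlib import contextmanager


@contextmanager
def temp_recording(suffix: str = ".wav", directory: str = None):
    """Yield a temp file path and remove the file on exit, even on error."""
    fd, path = tempfile.mkstemp(suffix=suffix, dir=directory)
    os.close(fd)  # the caller reopens the path as needed
    try:
        yield path
    finally:
        try:
            os.remove(path)
        except OSError:
            pass  # already gone, nothing to clean up


if __name__ == "__main__":
    with temp_recording() as path:
        with open(path, "wb") as fh:
            fh.write(b"RIFF")  # stand-in for downloaded audio
        print(os.path.exists(path))
    print(os.path.exists(path))
```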

Also consider a periodic cleanup cron for any orphaned files:

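# Clean temp files older than 1 hour.
# Append to the existing crontab: piping only the new line into
# "crontab -" would replace every entry the user already has.
(crontab -l 2>/dev/null; \
 echo "0 * * * * find /opt/audio-analysis/tmp -type f -mmin +60 -delete") \
    | crontab -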

Batch Processing

For analyzing many recordings at once (e.g., nightly quality audits), you can build a batch endpoint or a CLI script:

#!/usr/bin/env python3
"""Batch analyze recordings from a list file."""
import requests
import sys
import json
import time

API_BASE = "http://localhost:8084"

def batch_analyze(server: str, filenames: list):
    results = []
    for i, fname in enumerate(filenames):
        print(f"[{i+1}/{len(filenames)}] {fname}...", end=" ")
        t0 = time.time()
        resp = requests.get(
            f"{API_BASE}/analyze",
            params={"server": server, "file": fname},
            timeout=120)
        elapsed = time.time() - t0

        if resp.status_code == 200:
            data = resp.json()
            mos = data.get('nisqa', {}).get('mos', -1)
            print(f"MOS={mos} ({elapsed:.1f}s)")
            results.append(data)
        else:
            print(f"FAILED: {resp.status_code}")
            results.append({"file": fname, "error": resp.text})

    return results

if __name__ == '__main__':
    server = sys.argv[1]
    with open(sys.argv[2]) as f:
        files = [line.strip() for line in f if line.strip()]
    results = batch_analyze(server, files)

    # Write summary
    with open('batch_results.json', 'w') as f:
        json.dump(results, f, indent=2)

    # Print quality distribution
    scores = [r['nisqa']['mos'] for r in results
              if isinstance(r.get('nisqa'), dict)
              and r['nisqa'].get('mos', -1) > 0]
    if scores:
        import statistics
        print(f"\nResults: {len(scores)} analyzed")
        print(f"  Mean MOS: {statistics.mean(scores):.2f}")
        print(f"  Median MOS: {statistics.median(scores):.2f}")
        print(f"  Min MOS: {min(scores):.2f}")
        print(f"  Below 3.0: {sum(1 for s in scores if s < 3.0)}")

Usage:

# Create a list of recordings to analyze
echo "20260301-142215_1001_5551234567-all.wav" > recordings.txt
echo "20260301-143022_1002_5559876543-all.wav" >> recordings.txt

# Run batch analysis
python3 batch_analyze.py uk recordings.txt

Performance Benchmarks

Typical analysis times on a 4-core CPU server (no GPU):

| Step | Time |
|---|---|
| Recording download (LAN) | 0.3 - 1.0 s |
| SoX stats | 0.1 - 0.3 s |
| FFmpeg silence detection | 0.5 - 1.5 s |
| Silero VAD | 0.5 - 2.0 s |
| NISQA prediction | 1.5 - 4.0 s |
| Claude Haiku API call | 1.0 - 3.0 s |
| Total (/analyze) | 3 - 8 s |
| Total (/ai-analyze) | 4 - 11 s |
| Cached response | < 10 ms |
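To reproduce a breakdown like this on your own hardware, each pipeline step can be wrapped in a small timer. A sketch using a context manager; timed and the step names are hypothetical, and the sleep stands in for real work.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(step: str, timings: dict):
    """Record the wall-clock duration of the wrapped block in timings."""
    t0 = time.perf_counter()
    try:
        yield
    finally:
        timings[step] = round(time.perf_counter() - t0, 3)


timings = {}
with timed("sox_stats", timings):
    time.sleep(0.02)  # stand-in for the SoX call
with timed("silero_vad", timings):
    time.sleep(0.02)  # stand-in for VAD inference

if __name__ == "__main__":
    for step, seconds in timings.items():
        print(f"{step}: {seconds}s")
```

Adding the timings dict to the analysis result would make slow steps visible per recording.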

Cost Estimation

Claude API costs for the AI analysis layer:

| Endpoint | Model | Input Tokens | Output Tokens | Cost per Call |
|---|---|---|---|---|
| /ai-analyze | Claude Haiku | ~800 | ~100 | ~$0.0002 |
| /investigate | Claude Haiku | ~1500 | ~300 | ~$0.0005 |
| /ask | Claude Sonnet/Opus | ~3000 | ~1000 | ~$0.01-0.05 |

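At 100 AI analyses per day (about 3,000 per month), a per-call cost of ~$0.0002 works out to roughly $0.60/month. The admin assistant (/ask) is more expensive per query but used far less frequently. Per-token prices differ between model versions, so recompute these figures from the current published rates for the model you deploy.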
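The arithmetic behind these estimates is simple enough to keep in a helper, so the numbers can be rerun whenever token counts or prices change. In this sketch the function names are hypothetical and the per-million-token rates in the second call are placeholders, not quoted prices.

```python
def cost_per_call(input_tokens: int, output_tokens: int,
                  usd_per_mtok_in: float, usd_per_mtok_out: float) -> float:
    """Cost of one request given per-million-token prices in USD."""
    return (input_tokens * usd_per_mtok_in
            + output_tokens * usd_per_mtok_out) / 1_000_000


def monthly_cost(per_call_usd: float, calls_per_day: int,
                 days: int = 30) -> float:
    """Monthly spend for a steady daily call volume."""
    return per_call_usd * calls_per_day * days


if __name__ == "__main__":
    # The figure from the table: ~$0.0002 per call at 100 calls/day
    print(f"${monthly_cost(0.0002, 100):.2f}/month")
    # Placeholder rates; substitute the published prices for your model
    per_call = cost_per_call(800, 100, usd_per_mtok_in=1.0,
                             usd_per_mtok_out=5.0)
    print(f"${per_call:.4f}/call")
```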


17. Troubleshooting

Model Loading Failures

Problem: ModuleNotFoundError: No module named 'nisqa'

# Ensure NISQA is cloned and the path is correct
ls /opt/audio-analysis/NISQA/nisqa/NISQA_model.py
# Ensure the path insert is before the import

Problem: RuntimeError: Error(s) in loading state_dict

This usually means the wrong model weights file. Ensure you are using nisqa.tar (the full model), not nisqa_mos_only.tar or nisqa_tts.tar.

Problem: Silero VAD fails with urllib.error.URLError

On first run, Silero downloads its model from GitHub. If your server has no internet access:

# On a machine with internet
python3 -c "import torch; torch.hub.load('snakers4/silero-vad', 'silero_vad', trust_repo=True)"

# Copy the cached model
scp -r ~/.cache/torch/hub/snakers4_silero-vad_master \
    YOUR_SERVER_IP:/root/.cache/torch/hub/

SoX / FFmpeg Issues

Problem: FileNotFoundError: sox: No such file or directory

sudo apt install sox libsox-fmt-all
# Verify
sox --version
ffmpeg -version

Problem: SoX cannot read MP3 files

sudo apt install libsox-fmt-mp3

API Key Issues

Problem: RuntimeError: Anthropic API key not configured

# Check the key file exists and is readable
# (print only the prefix so the full secret never reaches the terminal)
head -c 7 /opt/audio-analysis/.api_key; echo
# Should print: sk-ant-

# Check permissions
ls -la /opt/audio-analysis/.api_key
# Should be: -rw------- root root
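The permission check can also be enforced when the key is loaded, so a key file that is readable by other users fails loudly instead of silently working. A sketch for POSIX systems; load_api_key is a hypothetical helper, and the demo file stands in for /opt/audio-analysis/.api_key.

```python
import os
import stat
import tempfile


def load_api_key(path: str) -> str:
    """Read the API key, refusing files accessible to group or others."""
    mode = os.stat(path).st_mode
    if mode & (stat.S_IRWXG | stat.S_IRWXO):
        raise PermissionError(
            f"{path} is accessible to group/others; run chmod 600")
    with open(path) as fh:
        key = fh.read().strip()
    if not key:
        raise RuntimeError("Anthropic API key not configured")
    return key


# Demo with a throwaway file and a fake key
demo_path = os.path.join(tempfile.mkdtemp(), ".api_key")
with open(demo_path, "w") as fh:
    fh.write("sk-ant-example\n")
os.chmod(demo_path, 0o600)

if __name__ == "__main__":
    print(load_api_key(demo_path)[:7])
```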

Recording Fetch Failures

Problem: HTTPException: 404 Recording not found on server

  1. Check that the recording server is accessible from the analysis server:
    curl -I http://YOUR_RECORDING_SERVER/RECORDINGS/
    
  2. Check the filename format. ViciDial filenames follow a specific pattern.
  3. Check that Apache on the recording server allows directory listing or direct file access.

Memory Issues

Problem: Service uses too much RAM or gets OOM-killed.

PyTorch models consume memory. On a constrained server:

# Check current usage
ps aux | grep uvicorn

# Set memory limit in systemd
# Add to [Service] section of the unit file:
MemoryMax=2G

Service Will Not Start

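# Check systemd's view of the unit (start failures, restart loops)
journalctl -u audio-analysis -n 50

# Application output is redirected to the log file by the unit file
tail -n 50 /opt/audio-analysis/service.log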

# Try running manually to see errors
cd /opt/audio-analysis
source venv/bin/activate
python3 -m uvicorn service:app --host 0.0.0.0 --port 8084

Common issues:

NISQA Returns MOS of -1

This means the prediction failed, usually because:

Check the service log for the specific error message.


18. Complete File Reference

Project Structure

/opt/audio-analysis/
    service.py                    # Main FastAPI application (~1400 lines)
    .api_key                      # Anthropic API key
    cache.db                      # SQLite cache (auto-created)
    service.log                   # Application log
    tmp/                          # Temporary recording downloads
    venv/                         # Python virtual environment
    NISQA/                        # NISQA repository (git clone)
        weights/
            nisqa.tar             # Pre-trained model weights (~80MB)
        nisqa/
            NISQA_model.py        # Model loading and prediction
            NISQA_lib.py          # Neural network architecture

/etc/systemd/system/
    audio-analysis.service        # Systemd unit file

Python Dependencies

fastapi
uvicorn[standard]
torch
torchaudio
numpy
pandas
soundfile
requests
PyYAML
tqdm
anthropic
pymysql          # Only needed for /ask endpoint

System Dependencies

python3.12
python3.12-venv
python3.12-dev
sox
libsox-fmt-all
libsox-fmt-mp3
ffmpeg
git

Summary

This service combines three layers of audio intelligence:

  1. Signal-level analysis (SoX + FFmpeg) -- amplitude, silence gaps, duration. Fast and deterministic: the same file always yields the same numbers.

  2. Neural perception modeling (NISQA + Silero VAD) -- MOS scores, speech activity, one-way audio detection. Objective, consistent, replaces subjective human judgment.

  3. AI reasoning (Claude) -- translates metrics into actionable insights, investigates call failures using SIP traces, answers ad-hoc questions by querying live databases.

The SQLite cache ensures that repeated queries for the same recording are instant. The systemd service file ensures the service restarts automatically on failure. The FastAPI framework provides automatic OpenAPI documentation at /docs.

The result is a service that turns "can you listen to this call and tell me if the audio was okay?" -- a 5-minute manual task -- into a 6-second API call that returns objective scores and a plain-English assessment.


Built with FastAPI, PyTorch, NISQA, Silero VAD, SoX, FFmpeg, and Claude AI.
