Tutorial 39: Real-Time Call Transcription & Sentiment Dashboard
Build a Live Monitoring System That Transcribes Active Calls and Displays Sentiment Analysis in Real-Time
Build a live transcription and sentiment analysis system that processes active calls in real-time, displaying agent conversations, customer sentiment, and keyword alerts on a web dashboard — enabling supervisors to monitor call quality as it happens.
Table of Contents
- Introduction
- Architecture Overview
- Prerequisites
- Audio Capture Methods
- Streaming Transcription Engine
- Sentiment Analysis
- Message Broker — Redis Pub/Sub
- WebSocket Server
- Dashboard Frontend
- Alert System
- Production Deployment
- Integration with ViciDial
- Troubleshooting
Difficulty: Advanced | Reading Time: ~75 minutes | Technologies: Python, FastAPI, Faster-Whisper, Deepgram, VADER, Redis, WebSocket, Chart.js, Asterisk, AudioSocket
1. Introduction
Why Real-Time Transcription Matters
If you followed Tutorial 35 in this series, you already have a batch transcription and QA scoring pipeline — one that processes completed recordings overnight and delivers next-morning scorecards. That system is invaluable for trend analysis, coaching, and compliance review. But it has a fundamental limitation: by the time a supervisor reads a QA scorecard, the call is long over. The angry customer has already hung up. The compliance violation has already occurred. The coaching moment has passed.
Real-time transcription changes the game entirely. Instead of reviewing what happened yesterday, supervisors see what is happening right now. They watch the words appear on screen as the agent speaks them. They see sentiment indicators shift from green to yellow to red as a customer grows frustrated. They receive instant alerts when trigger words are detected — "cancel my account," "speak to your supervisor," "I will report you," or worse. And they can intervene before a recoverable situation becomes an irreversible one.
Supervisor Use Cases
Live Monitoring and Intervention
The most immediate use case is supervisor monitoring. In a traditional call center, supervisors listen to live calls by picking up a phone and silently joining the audio bridge. This is time-consuming, requires dedicated hardware, and only allows monitoring one call at a time. With a transcription dashboard, a single supervisor can visually monitor 10, 20, or even 50 simultaneous calls — scanning for red sentiment indicators and keyword alerts without listening to a single second of audio. When something requires attention, they can click into that specific call to read the full transcript and decide whether to intervene.
Real-Time Coaching
New agents benefit enormously from real-time feedback. The dashboard can be configured to display coaching prompts when specific patterns are detected — an agent who says "I don't know" too frequently, one who forgets to mention a required disclosure, or one who interrupts the caller repeatedly. These prompts appear on the supervisor's screen, allowing them to send a chat message or whisper to the agent before the behavior becomes a pattern.
Compliance Monitoring
Regulated industries — finance, healthcare, insurance, debt collection — have strict requirements about what must and must not be said on calls. Real-time transcription enables instant detection of missing disclosures ("This call may be recorded for quality purposes"), prohibited language, or compliance-critical phrases. Instead of discovering a TCPA violation during a quarterly audit, the system flags it while the call is still active.
Escalation Detection
When a customer says "I want to speak to a manager" or "I'm going to file a complaint," the traditional workflow depends on the agent pressing a button or physically walking to a supervisor's desk. With keyword detection, the supervisor is alerted automatically — often before the agent even has a chance to react.
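Mechanically, this kind of escalation trigger is just a case-insensitive phrase scan over each transcript segment. A minimal sketch (the trigger list here is illustrative; the tutorial's configurable keyword list appears later in config.py):

```python
# Illustrative escalation triggers; the tutorial's configurable list
# lives in config.py (SentimentConfig.alert_keywords).
ESCALATION_TRIGGERS = [
    "speak to a manager",
    "file a complaint",
    "cancel my account",
]

def detect_triggers(text: str, triggers=ESCALATION_TRIGGERS) -> list:
    """Return every trigger phrase found in a transcript segment."""
    lowered = text.lower()
    return [t for t in triggers if t in lowered]

print(detect_triggers("I want to speak to a MANAGER right now"))
```

Each matched phrase becomes an alert event; later sections wire this idea into the sentiment service and Redis.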
Batch vs. Real-Time — When to Use Which
| Aspect | Batch (Tutorial 35) | Real-Time (This Tutorial) |
|---|---|---|
| Latency | Hours (overnight processing) | 1-5 seconds |
| CPU/GPU Cost | Low (off-peak processing) | High (continuous inference) |
| Accuracy | Higher (full-recording context) | Lower (streaming chunks) |
| Use Case | QA scoring, trend analysis | Live monitoring, alerts |
| Scaling | Easy (queue-based) | Harder (per-call resources) |
| Infrastructure | Simple (cron + whisper) | Complex (streaming pipeline) |
| Intervention | Impossible (call is over) | Possible (call is live) |
The ideal setup uses both. Real-time transcription handles the urgent, supervisory needs during business hours. Batch processing runs overnight for the thorough QA scoring that requires full-call context, better models, and careful analysis. This tutorial focuses exclusively on the real-time side.
2. Architecture Overview
System Diagram
┌─────────────────────────────────────────────────────────────────────┐
│ ASTERISK PBX │
│ │
│ Active Call ──► AudioSocket/EAGI ──► Raw PCM Audio Stream │
│ │ │
│ Active Call ──► MixMonitor ──► WAV file (near-real-time alt.) │
│ │ │
└────────────────────────────────────────┼────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────┐
│ TRANSCRIPTION SERVICE │
│ │
│ Audio Buffer ──► Chunker (5s segments) ──► Faster-Whisper │
│ or Deepgram Live API │
│ │ │
│ ▼ │
│ Raw Transcript │
│ {speaker, text, ts} │
└────────────────────────────────────────┼────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────┐
│ SENTIMENT SERVICE │
│ │
│ Transcript Segment ──► VADER / TextBlob / Transformer │
│ │ │
│ ├──► Sentiment Score (-1.0 to +1.0) │
│ ├──► Keyword Detection (cancel, complaint) │
│ └──► Alert Trigger (threshold check) │
│ │ │
└────────────────────────────────────────┼────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────┐
│ REDIS PUB/SUB │
│ │
│ Channel: calls:{call_id} │
│ Channel: calls:alerts │
│ Channel: calls:active_list │
│ │
│ Message: {call_id, timestamp, speaker, text, sentiment, keywords} │
└────────────────────────────────────────┼────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────┐
│ WEBSOCKET SERVER (FastAPI) │
│ │
│ /ws/calls ──► All calls feed │
│ /ws/calls/{call_id} ──► Single call feed │
│ /api/calls ──► Active call list │
│ /api/calls/{id}/history ──► Transcript history │
│ │ │
└────────────────────────────────────────┼────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────┐
│ BROWSER DASHBOARD │
│ │
│ ┌──────────────┐ ┌───────────────────────────────────────────┐ │
│ │ Active Calls │ │ Live Transcript │ │
│ │ │ │ │ │
│ │ ● Call #1 │ │ [Agent] Hello, how can I help? ✅ │ │
│ │ 😐 0.12 │ │ [Caller] I want to cancel... ⚠️ │ │
│ │ ● Call #2 │ │ [Agent] I understand, let me... ✅ │ │
│ │ ✅ 0.65 │ │ [Caller] This is unacceptable! 🔴 │ │
│ │ ● Call #3 │ │ │ │
│ │ 🔴 -0.45 │ │ ┌─────────────────────────────────────┐ │ │
│ │ │ │ │ Sentiment Timeline 📈 │ │ │
│ └──────────────┘ │ │ (Chart.js line graph) │ │ │
│ │ └─────────────────────────────────────┘ │ │
│ ┌──────────────────────────────────────────────────────────┐ │ │
│ │ ⚠️ ALERT: "cancel" detected on Call #1 — Agent: Smith │ │ │
│ └──────────────────────────────────────────────────────────┘ │ │
└─────────────────────────────────────────────────────────────────────┘
Data Flow Summary
- Audio Capture: Asterisk streams raw PCM audio from active calls via AudioSocket, EAGI, or writes recording files that are watched with inotifywait.
- Transcription: Audio chunks are fed to Faster-Whisper (local, free) or Deepgram (cloud, fast) for speech-to-text conversion.
- Analysis: Each transcript segment passes through sentiment analysis (VADER/TextBlob/transformer) and keyword detection.
- Distribution: Analyzed segments are published to Redis channels for real-time distribution.
- Delivery: A FastAPI WebSocket server subscribes to Redis and pushes updates to connected browser clients.
- Display: The browser dashboard renders live transcripts, sentiment charts, and alert banners.
Why This Architecture
Each component is decoupled. The transcription service does not know or care about the dashboard. The sentiment service does not know how audio was captured. Redis acts as the central nervous system, allowing any number of consumers to subscribe to call data. This means you can add a Slack alert bot, an email notifier, a database logger, or a third-party integration without modifying any existing component.
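Because everything flows through Redis pub/sub, a new consumer is a few dozen lines that never touch the existing services. A sketch of a standalone alert logger, assuming the calls:alerts channel name and the message fields shown in the diagram above:

```python
import json

def parse_segment(raw: str) -> dict:
    """Decode one pub/sub payload into the segment schema from the
    architecture diagram (call_id, timestamp, speaker, text, ...)."""
    msg = json.loads(raw)
    # Fail fast if a producer publishes a malformed segment
    for key in ("call_id", "timestamp", "speaker", "text"):
        if key not in msg:
            raise ValueError(f"segment missing field: {key}")
    return msg

def run_alert_logger(redis_host: str = "127.0.0.1") -> None:
    """Blocking subscriber that prints every alert segment.
    Requires a running Redis and `pip install redis`."""
    import redis  # third-party client, imported lazily
    r = redis.Redis(host=redis_host, port=6379, db=0)
    pubsub = r.pubsub()
    pubsub.subscribe("calls:alerts")  # channel name from the diagram
    for message in pubsub.listen():
        if message["type"] == "message":
            seg = parse_segment(message["data"].decode())
            print(f"[{seg['speaker']}] {seg['text']}")
```

The same pattern, pointed at a different channel, becomes a Slack bot, email notifier, or database logger.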
3. Prerequisites
Hardware Requirements
Real-time transcription is CPU-intensive. Each concurrent call being transcribed consumes approximately:
| Model | CPU Cores | RAM | Latency per 5s Chunk |
|---|---|---|---|
| Faster-Whisper tiny | 0.5 cores | 400 MB | ~0.5s |
| Faster-Whisper base | 1 core | 600 MB | ~1.0s |
| Faster-Whisper small | 2 cores | 1.2 GB | ~2.0s |
| Faster-Whisper medium | 4 cores | 3.0 GB | ~4.5s |
| Deepgram (cloud) | 0 cores | 50 MB | ~0.3s |
For a 20-agent call center processing all calls simultaneously with Faster-Whisper small:
- CPU: 40 cores (2 per call)
- RAM: ~24 GB (roughly 1.2 GB of inference working memory per concurrent call, even with shared model weights)
- This is impractical on a single server
Recommendation: Use Faster-Whisper tiny or base for real-time (accuracy is acceptable for sentiment and keyword detection), or use Deepgram for true streaming with minimal server resources.
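The table's per-call figures make sizing a simple multiplication. A quick back-of-envelope helper (numbers copied from the table above; treat them as rough planning values, not benchmarks):

```python
# Per-call cost figures from the table above: (CPU cores, RAM in GB)
MODEL_COST = {
    "tiny":   (0.5, 0.4),
    "base":   (1.0, 0.6),
    "small":  (2.0, 1.2),
    "medium": (4.0, 3.0),
}

def size_for(model: str, concurrent_calls: int) -> tuple:
    """Back-of-envelope (cpu_cores, ram_gb) to transcribe N calls at once."""
    cores, ram = MODEL_COST[model]
    return cores * concurrent_calls, ram * concurrent_calls

print(size_for("small", 20))  # the 20-agent sizing example from the text
```

size_for("small", 20) reproduces the 40-core / 24 GB estimate above, which is why tiny/base (or Deepgram) is the practical choice for real-time work.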
Software Requirements
# Operating System
# Ubuntu 22.04+ or Debian 12+ recommended
# Asterisk (already installed if following this series)
asterisk -V
# Asterisk 18+ recommended, 16.8+ minimum for AudioSocket
# Python 3.11+
python3 --version
# Node.js 18+ (optional, for dashboard dev server)
node --version
# Redis
redis-server --version
Install Dependencies
# System packages
apt update
apt install -y python3-pip python3-venv redis-server ffmpeg inotify-tools
# Start and enable Redis
systemctl enable --now redis-server
# Verify Redis
redis-cli ping
# Should return: PONG
# Create project directory
mkdir -p /opt/call-monitor
cd /opt/call-monitor
# Python virtual environment
python3 -m venv venv
source venv/bin/activate
# Core Python dependencies
pip install \
faster-whisper==1.1.1 \
fastapi==0.115.6 \
uvicorn[standard]==0.34.0 \
websockets==14.1 \
redis==5.2.1 \
httpx==0.28.1 \
vaderSentiment==3.3.2 \
textblob==0.18.0 \
pydantic==2.10.3 \
numpy==2.2.1 \
python-multipart==0.0.20
# Optional: Deepgram SDK (if using cloud transcription)
pip install deepgram-sdk==3.9.0
# Optional: Transformers for advanced sentiment (requires more RAM)
# pip install transformers==4.47.1 torch==2.5.1
# Download TextBlob corpora
python3 -m textblob.download_corpora
API Keys (If Using Cloud Services)
# Deepgram (optional — only if not using local Faster-Whisper)
# Sign up at https://console.deepgram.com/
# Free tier: 45,000 minutes/year ($200 credit)
export DEEPGRAM_API_KEY="YOUR_DEEPGRAM_API_KEY"
# No API key needed for Faster-Whisper (runs locally)
Directory Structure
/opt/call-monitor/
├── venv/ # Python virtual environment
├── config.py # Configuration
├── audio_capture.py # Asterisk audio capture
├── transcription_service.py # Faster-Whisper / Deepgram
├── sentiment_service.py # VADER / TextBlob / Transformer
├── alert_manager.py # Alert rules and notifications
├── websocket_server.py # FastAPI WebSocket server
├── redis_broker.py # Redis pub/sub wrapper
├── vicidial_integration.py # ViciDial data enrichment
├── static/
│ └── dashboard.html # Browser dashboard (single file)
├── requirements.txt # Python dependencies
└── logs/ # Application logs
4. Audio Capture Methods
Getting live audio out of Asterisk is the most critical — and most complex — part of this system. There are four viable approaches, each with different trade-offs.
Method 1: AudioSocket (Recommended)
AudioSocket is a TCP-based protocol introduced in Asterisk 16.8 that streams raw audio over a standard network socket. It is the cleanest approach for real-time audio capture.
How it works: The AudioSocket dialplan application connects the call's audio channel to a TCP server. Your Python service listens on that TCP port and receives raw 16-bit signed linear PCM audio at 8000 Hz (slin16 at 16000 Hz if configured).
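Decoding the wire format takes only a few lines of struct handling. A minimal sketch of the 3-byte framing; the type constants here follow the upstream AudioSocket reference implementation, in which hangup/terminate is 0x00 and 0x02 marks silence:

```python
import struct

# Frame kinds per the upstream AudioSocket reference implementation
# (0x00 terminate, 0x01 UUID, 0x02 silence, 0x10 audio, 0xFF error)
KIND_HANGUP, KIND_UUID, KIND_AUDIO = 0x00, 0x01, 0x10

def decode_frame(buf: bytes) -> tuple:
    """Split one AudioSocket frame off the front of `buf`.
    Header is 1 type byte plus a big-endian uint16 payload length.
    Returns (kind, payload, remaining_bytes)."""
    kind = buf[0]
    (length,) = struct.unpack("!H", buf[1:3])
    return kind, buf[3:3 + length], buf[3 + length:]

# Build and decode a 4-byte audio frame
frame = bytes([KIND_AUDIO]) + struct.pack("!H", 4) + b"\x01\x02\x03\x04"
kind, payload, rest = decode_frame(frame)
```

The full async receiver below applies exactly this framing, reading the header and payload directly off the TCP stream.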
Asterisk Dialplan Configuration:
; /etc/asterisk/extensions_custom.conf
; Stream call audio to the transcription server via AudioSocket
[transcription-intercept]
; NOTE: AudioSocket() occupies the channel while it streams, so run
; this context on a dedicated monitoring leg (e.g., the far end of a
; ChanSpy Local channel), not inline in the agent's call path
exten => s,1,NoOp(Starting real-time transcription for ${CHANNEL(uniqueid)})
same => n,Set(CALL_ID=${CHANNEL(uniqueid)})
same => n,Set(MONITOR_UUID=${SHELL(uuidgen | tr -d '\n')})
; Start MixMonitor as backup recording
same => n,MixMonitor(${MIXMON_DIR}${CALL_ID}.wav,b)
; Fork audio to AudioSocket for real-time processing
; The UUID identifies this session to the transcription server
same => n,AudioSocket(${MONITOR_UUID},YOUR_SERVER_IP:9093)
same => n,Hangup()
; Alternative: tap an existing call without modifying its dialplan.
; ChanSpy has no option to send audio to a TCP port; pair it with an
; AudioSocket() leg on a Local channel, as shown in Method 4 below.
[transcription-spy]
exten => _*88.,1,NoOp(Transcription spy on channel ${EXTEN:3})
 same => n,Set(CALL_ID=${EXTEN:3})
 same => n,ChanSpy(${CALL_ID},qE)
 same => n,Hangup()
AudioSocket Python Receiver:
#!/usr/bin/env python3
"""
audio_capture.py — AudioSocket receiver for Asterisk real-time audio.
Listens on a TCP port for AudioSocket connections from Asterisk.
Each connection represents one active call. Audio is buffered and
forwarded to the transcription service.
AudioSocket Protocol:
- 3-byte header: 1 byte type + 2 bytes length (network byte order)
- Type 0x01: UUID (16 bytes, sent once at connection start)
- Type 0x10: Audio data (signed linear 16-bit, 8000 Hz mono)
- Type 0x00: Hangup/terminate notification
- Type 0x02: Silence indication
- Type 0xFF: Error
"""
import asyncio
import struct
import uuid
import logging
import time
from typing import Dict, Callable, Optional
from dataclasses import dataclass, field
logger = logging.getLogger(__name__)
# AudioSocket message types
MSG_UUID = 0x01
MSG_AUDIO = 0x10
MSG_HANGUP = 0x00  # terminate/hangup (0x02 is a silence marker, not hangup)
MSG_ERROR = 0xFF
@dataclass
class ActiveCall:
"""Represents an active call being captured."""
call_id: str
start_time: float
audio_buffer: bytearray = field(default_factory=bytearray)
total_bytes: int = 0
chunk_count: int = 0
class AudioSocketServer:
"""
TCP server that receives AudioSocket connections from Asterisk.
Each Asterisk call that hits the AudioSocket() dialplan app
opens a TCP connection to this server and streams raw PCM audio.
"""
def __init__(
self,
host: str = "0.0.0.0",
port: int = 9093,
sample_rate: int = 8000,
chunk_duration: float = 5.0,
on_audio_chunk: Optional[Callable] = None,
on_call_start: Optional[Callable] = None,
on_call_end: Optional[Callable] = None,
):
self.host = host
self.port = port
self.sample_rate = sample_rate
self.bytes_per_sample = 2 # 16-bit signed linear
self.chunk_size = int(sample_rate * chunk_duration * self.bytes_per_sample)
self.chunk_duration = chunk_duration
# Callbacks
self.on_audio_chunk = on_audio_chunk # Called with (call_id, audio_bytes)
self.on_call_start = on_call_start # Called with (call_id)
self.on_call_end = on_call_end # Called with (call_id)
# Active calls tracking
self.active_calls: Dict[str, ActiveCall] = {}
self.server = None
async def start(self):
"""Start the AudioSocket TCP server."""
self.server = await asyncio.start_server(
self._handle_connection, self.host, self.port
)
addrs = ", ".join(str(s.getsockname()) for s in self.server.sockets)
logger.info(f"AudioSocket server listening on {addrs}")
async with self.server:
await self.server.serve_forever()
async def _handle_connection(
self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
):
"""Handle a single AudioSocket connection (one call)."""
peer = writer.get_extra_info("peername")
call_id = None
try:
logger.info(f"New AudioSocket connection from {peer}")
while True:
# Read 3-byte header: type (1 byte) + length (2 bytes)
header = await reader.readexactly(3)
msg_type = header[0]
msg_len = struct.unpack("!H", header[1:3])[0]
if msg_len > 0:
payload = await reader.readexactly(msg_len)
else:
payload = b""
if msg_type == MSG_UUID:
# First message: UUID identifying the session
call_uuid = uuid.UUID(bytes=payload[:16])
call_id = str(call_uuid)
self.active_calls[call_id] = ActiveCall(
call_id=call_id,
start_time=time.time(),
)
logger.info(f"Call started: {call_id} from {peer}")
if self.on_call_start:
await self._safe_callback(self.on_call_start, call_id)
elif msg_type == MSG_AUDIO and call_id:
# Audio data — append to buffer
call = self.active_calls.get(call_id)
if call:
call.audio_buffer.extend(payload)
call.total_bytes += len(payload)
# When buffer reaches chunk size, emit it
if len(call.audio_buffer) >= self.chunk_size:
chunk = bytes(call.audio_buffer[: self.chunk_size])
call.audio_buffer = call.audio_buffer[self.chunk_size :]
call.chunk_count += 1
if self.on_audio_chunk:
await self._safe_callback(
self.on_audio_chunk, call_id, chunk
)
elif msg_type == MSG_HANGUP:
logger.info(f"Call hangup received: {call_id}")
break
elif msg_type == MSG_ERROR:
logger.error(
f"AudioSocket error for {call_id}: {payload.decode('utf-8', errors='replace')}"
)
break
except asyncio.IncompleteReadError:
logger.info(f"Connection closed (call ended): {call_id}")
except Exception as e:
logger.error(f"Error handling AudioSocket connection: {e}", exc_info=True)
finally:
# Process any remaining audio in buffer
if call_id and call_id in self.active_calls:
call = self.active_calls[call_id]
if call.audio_buffer and self.on_audio_chunk:
await self._safe_callback(
self.on_audio_chunk, call_id, bytes(call.audio_buffer)
)
duration = time.time() - call.start_time
logger.info(
f"Call ended: {call_id}, duration={duration:.1f}s, "
f"chunks={call.chunk_count}, bytes={call.total_bytes}"
)
if self.on_call_end:
await self._safe_callback(self.on_call_end, call_id)
del self.active_calls[call_id]
writer.close()
await writer.wait_closed()
async def _safe_callback(self, callback, *args):
"""Execute callback safely, catching exceptions."""
try:
result = callback(*args)
if asyncio.iscoroutine(result):
await result
except Exception as e:
logger.error(f"Callback error: {e}", exc_info=True)
def get_active_calls(self) -> Dict[str, dict]:
"""Return information about all active calls."""
return {
call_id: {
"call_id": call_id,
"start_time": call.start_time,
"duration": time.time() - call.start_time,
"chunks_processed": call.chunk_count,
"bytes_received": call.total_bytes,
}
for call_id, call in self.active_calls.items()
}
Method 2: EAGI (External AGI)
EAGI extends the standard AGI protocol by providing audio on file descriptor 3. When Asterisk runs an EAGI script, the script receives the call's audio stream on fd 3 while being able to send AGI commands on stdin/stdout.
Dialplan:
; /etc/asterisk/extensions_custom.conf
[transcription-eagi]
exten => s,1,NoOp(Starting EAGI transcription)
same => n,EAGI(/opt/call-monitor/eagi_capture.py)
same => n,Hangup()
EAGI Python Script:
#!/usr/bin/env python3
"""
eagi_capture.py — EAGI script that reads audio from fd 3.
When Asterisk runs this as EAGI, it pipes the call's audio
on file descriptor 3 as signed linear 16-bit 8000 Hz mono.
This script reads audio chunks and sends them to the transcription
service via Redis or direct function call.
"""
import os
import sys
import struct
import time
import json
import redis
# AGI environment variables come on stdin
agi_env = {}
while True:
line = sys.stdin.readline().strip()
if line == "":
break
if ":" in line:
key, value = line.split(":", 1)
agi_env[key.strip()] = value.strip()
call_id = agi_env.get("agi_uniqueid", f"eagi-{int(time.time())}")
channel = agi_env.get("agi_channel", "unknown")
# Open audio file descriptor (fd 3 is EAGI audio)
try:
audio_fd = os.fdopen(3, "rb", buffering=0)
except OSError as e:
sys.stderr.write(f"Cannot open fd 3: {e}\n")
sys.exit(1)
# Connect to Redis
r = redis.Redis(host="127.0.0.1", port=6379, db=0)
# Publish call start event
r.publish(
"audio:new_call",
json.dumps({"call_id": call_id, "channel": channel, "start_time": time.time()}),
)
# Configuration
SAMPLE_RATE = 8000
BYTES_PER_SAMPLE = 2
CHUNK_DURATION = 5 # seconds
CHUNK_SIZE = SAMPLE_RATE * BYTES_PER_SAMPLE * CHUNK_DURATION
# Send AGI command to answer the channel
sys.stdout.write("ANSWER\n")
sys.stdout.flush()
sys.stdin.readline() # Read AGI response
# Read audio in chunks
buffer = bytearray()
try:
while True:
data = audio_fd.read(1600) # 100ms of audio at 8kHz 16-bit
if not data:
break
buffer.extend(data)
if len(buffer) >= CHUNK_SIZE:
chunk = bytes(buffer[:CHUNK_SIZE])
buffer = buffer[CHUNK_SIZE:]
# Publish raw audio chunk to Redis for transcription service
r.publish(
f"audio:chunk:{call_id}",
chunk,
)
except (BrokenPipeError, IOError):
pass
finally:
# Publish remaining buffer
if buffer:
r.publish(f"audio:chunk:{call_id}", bytes(buffer))
# Publish call end event
r.publish(
"audio:call_end",
json.dumps({"call_id": call_id, "end_time": time.time()}),
)
audio_fd.close()
Method 3: MixMonitor + inotifywait (Simplest)
This approach does not require any dialplan changes. It watches the recording directory for new or updated WAV files and processes them in near-real-time. The trade-off is latency — you get transcription 5-10 seconds behind the live call, depending on how frequently MixMonitor flushes to disk.
#!/usr/bin/env python3
"""
file_watcher.py — Watch recording directory for new/updated files.
Uses inotifywait (inotify-tools) to detect when MixMonitor writes
new audio data to recording files. Processes the new audio portion
of each file and sends it for transcription.
This is the simplest approach — no dialplan changes needed.
Trade-off: 5-10 second delay vs true real-time.
"""
import asyncio
import subprocess
import os
import time
import json
import logging
import wave
import struct
from typing import Dict
from dataclasses import dataclass, field
logger = logging.getLogger(__name__)
@dataclass
class WatchedFile:
"""Track the state of a recording file being monitored."""
filepath: str
call_id: str
last_position: int = 0 # Last byte position we read from
last_modified: float = 0
chunk_count: int = 0
class RecordingWatcher:
"""
Watch Asterisk recording directory for new and modified files.
Uses inotifywait to get instant filesystem notifications when
MixMonitor writes to recording files.
"""
def __init__(
self,
watch_dir: str = "/var/spool/asterisk/monitor",
on_audio_chunk=None,
on_call_start=None,
on_call_end=None,
file_pattern: str = "*.wav",
poll_interval: float = 3.0,
):
self.watch_dir = watch_dir
self.on_audio_chunk = on_audio_chunk
self.on_call_start = on_call_start
self.on_call_end = on_call_end
self.file_pattern = file_pattern
self.poll_interval = poll_interval
self.watched_files: Dict[str, WatchedFile] = {}
self._stop = False
async def start(self):
"""Start watching the recording directory."""
logger.info(f"Watching directory: {self.watch_dir}")
# Use inotifywait for efficient filesystem monitoring
cmd = [
"inotifywait",
"-m", # Monitor continuously
"-r", # Recursive
"--format", "%w%f %e",
"-e", "modify,create,close_write",
self.watch_dir,
]
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
        try:
            while not self._stop:
                try:
                    line = await asyncio.wait_for(
                        process.stdout.readline(), timeout=self.poll_interval
                    )
                except asyncio.TimeoutError:
                    # No events this interval; re-check self._stop and keep watching
                    continue
                if not line:
                    break
                parts = line.decode().strip().split(" ", 1)
                if len(parts) < 2:
                    continue
                filepath, events = parts[0], parts[1]
                # Only process WAV files
                if not filepath.endswith(".wav"):
                    continue
                await self._process_file_event(filepath, events)
        except Exception as e:
            logger.error(f"Watcher error: {e}", exc_info=True)
        finally:
            process.terminate()
async def _process_file_event(self, filepath: str, events: str):
"""Process a filesystem event on a recording file."""
basename = os.path.basename(filepath)
call_id = basename.replace(".wav", "").replace("-in", "").replace("-out", "")
if filepath not in self.watched_files:
# New file — new call
self.watched_files[filepath] = WatchedFile(
filepath=filepath,
call_id=call_id,
last_modified=time.time(),
)
logger.info(f"New recording detected: {basename} (call: {call_id})")
if self.on_call_start:
await self.on_call_start(call_id)
watched = self.watched_files[filepath]
if "CLOSE_WRITE" in events:
# File closed — call probably ended
await self._read_new_audio(watched)
if self.on_call_end:
await self.on_call_end(call_id)
del self.watched_files[filepath]
logger.info(f"Recording closed: {basename}")
elif "MODIFY" in events:
# File modified — new audio data written
await self._read_new_audio(watched)
async def _read_new_audio(self, watched: WatchedFile):
"""Read new audio data from a recording file since last read."""
try:
file_size = os.path.getsize(watched.filepath)
if file_size <= watched.last_position:
return
with open(watched.filepath, "rb") as f:
                # Skip the WAV header on first read; MixMonitor writes a
                # canonical 44-byte RIFF header before the PCM samples
                if watched.last_position == 0:
                    watched.last_position = 44
f.seek(watched.last_position)
new_audio = f.read()
if new_audio and self.on_audio_chunk:
watched.chunk_count += 1
await self.on_audio_chunk(watched.call_id, new_audio)
watched.last_position = file_size
watched.last_modified = time.time()
except (OSError, IOError) as e:
logger.error(f"Error reading {watched.filepath}: {e}")
def stop(self):
"""Stop the watcher."""
self._stop = True
Method 4: ChanSpy + Local Channel
ChanSpy allows you to tap into an existing call without modifying the dialplan that handles that call. This is useful when you cannot or do not want to change how calls are routed — you simply "spy" on them from a separate channel.
; /etc/asterisk/extensions_custom.conf
[transcription-chanspy]
; Originate a local channel that spies on a target channel
; and pipes the audio to AudioSocket
;
; Usage from AMI:
; Action: Originate
; Channel: Local/spy-SIP-agent101@transcription-chanspy
; Application: AudioSocket
; Data: <uuid>,YOUR_SERVER_IP:9093
exten => _spy-.,1,NoOp(Spy-based transcription for ${EXTEN:4})
same => n,Set(TARGET=${EXTEN:4})
 ; ChanSpy cannot stream to a TCP port itself; the spied audio leaves
 ; via the Local channel's other leg, which should run AudioSocket()
 same => n,ChanSpy(${TARGET},qE)
same => n,Hangup()
AMI Script to Start Spying on a Call:
#!/usr/bin/env python3
"""
Start ChanSpy-based transcription for a specific channel via AMI.

Originates a Local channel: one leg runs ChanSpy via the dialplan
above, the other leg runs the AudioSocket() application so the spied
audio streams to the transcription server.
"""
import socket
import time
import uuid


def ami_spy_channel(
    ami_host: str,
    ami_port: int,
    ami_user: str,
    ami_secret: str,
    target_channel: str,
    audiosocket_addr: str = "YOUR_SERVER_IP:9093",
):
    """Originate a ChanSpy + AudioSocket pair to tap into a live call."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect((ami_host, ami_port))
    sock.recv(1024)  # Welcome banner

    # Login
    sock.sendall(
        f"Action: Login\r\n"
        f"Username: {ami_user}\r\n"
        f"Secret: {ami_secret}\r\n"
        f"\r\n".encode()
    )
    time.sleep(0.5)
    sock.recv(4096)

    # Originate: the Local channel's dialplan leg runs ChanSpy; this
    # leg runs AudioSocket, which needs a session UUID and server address
    session_uuid = str(uuid.uuid4())
    spy_exten = f"spy-{target_channel}"
    sock.sendall(
        f"Action: Originate\r\n"
        f"Channel: Local/{spy_exten}@transcription-chanspy\r\n"
        f"Application: AudioSocket\r\n"
        f"Data: {session_uuid},{audiosocket_addr}\r\n"
        f"Async: true\r\n"
        f"ActionID: spy-{int(time.time())}\r\n"
        f"\r\n".encode()
    )
    time.sleep(0.5)
    response = sock.recv(4096).decode()

    sock.sendall(b"Action: Logoff\r\n\r\n")
    sock.close()
    return response
Method Comparison
| Feature | AudioSocket | EAGI | MixMonitor+inotify | ChanSpy |
|---|---|---|---|---|
| Latency | ~50ms | ~50ms | 5-10 seconds | ~100ms |
| Complexity | Medium | Medium | Low | Medium |
| Dialplan Changes | Required | Required | None | Minimal |
| Asterisk Version | 16.8+ | Any | Any | Any |
| Audio Quality | Raw PCM stream | Raw PCM fd3 | Completed WAV | Mixed audio |
| Stereo/Split Legs | No (mixed) | No (mixed) | Yes (with flags) | No (mixed) |
| CPU on Asterisk | Low | Low | Low | Medium |
| Scalability | Best | Good | Best | Limited |
| Production Use | Recommended | Good | Good (batch+RT) | Testing |
Recommendation: Use AudioSocket for true real-time deployments. Use MixMonitor + inotifywait if you want simplicity with acceptable delay. Use ChanSpy for testing or when you cannot modify the production dialplan.
5. Streaming Transcription Engine
The transcription engine converts raw audio chunks into text. We provide two complete implementations: Faster-Whisper for local processing (free, private, higher CPU) and Deepgram for cloud processing (paid, lower latency, minimal CPU).
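Whisper-family models expect a mono float32 waveform at 16 kHz, while Asterisk delivers 16-bit PCM at 8 kHz, so each chunk needs a conversion step before inference. A sketch using naive linear resampling (adequate for telephone speech; a proper polyphase resampler such as soxr or ffmpeg is the better production choice):

```python
import numpy as np

def pcm16_to_float32(raw: bytes, src_rate: int = 8000, dst_rate: int = 16000) -> np.ndarray:
    """Convert raw signed 16-bit little-endian PCM into a float32
    waveform in [-1, 1] at 16 kHz. Linear interpolation keeps this
    dependency-free; swap in a real resampler when quality matters."""
    samples = np.frombuffer(raw, dtype="<i2").astype(np.float32) / 32768.0
    if src_rate == dst_rate or samples.size == 0:
        return samples
    dst_len = int(samples.size * dst_rate / src_rate)
    src_pos = np.linspace(0, samples.size - 1, dst_len)
    return np.interp(src_pos, np.arange(samples.size), samples).astype(np.float32)

# Two samples (0 and max positive) upsampled from 8 kHz to 16 kHz
chunk = pcm16_to_float32(b"\x00\x00\xff\x7f")
```

The Faster-Whisper implementation below can feed arrays produced this way straight into the model.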
Configuration Module
#!/usr/bin/env python3
"""
config.py — Central configuration for the call monitoring system.
"""
import os
from dataclasses import dataclass, field
from typing import List
@dataclass
class TranscriptionConfig:
"""Transcription engine configuration."""
# Engine: "whisper" or "deepgram"
engine: str = os.getenv("TRANSCRIPTION_ENGINE", "whisper")
# Faster-Whisper settings
whisper_model: str = os.getenv("WHISPER_MODEL", "base")
whisper_device: str = os.getenv("WHISPER_DEVICE", "cpu")
whisper_compute_type: str = os.getenv("WHISPER_COMPUTE_TYPE", "int8")
whisper_language: str = os.getenv("WHISPER_LANGUAGE", "en")
# Deepgram settings
deepgram_api_key: str = os.getenv("DEEPGRAM_API_KEY", "")
deepgram_model: str = os.getenv("DEEPGRAM_MODEL", "nova-2")
deepgram_language: str = os.getenv("DEEPGRAM_LANGUAGE", "en-GB")
# Audio settings
sample_rate: int = int(os.getenv("SAMPLE_RATE", "8000"))
chunk_duration: float = float(os.getenv("CHUNK_DURATION", "5.0"))
@dataclass
class SentimentConfig:
"""Sentiment analysis configuration."""
# Engine: "vader", "textblob", or "transformer"
engine: str = os.getenv("SENTIMENT_ENGINE", "vader")
# Transformer model (if engine == "transformer")
transformer_model: str = "distilbert-base-uncased-finetuned-sst-2-english"
# Rolling average window (number of utterances)
rolling_window: int = int(os.getenv("SENTIMENT_WINDOW", "5"))
# Alert keywords (comma-separated)
alert_keywords: List[str] = field(default_factory=lambda: [
"cancel", "supervisor", "manager", "complaint", "legal",
"lawyer", "sue", "report", "escalate", "unacceptable",
"disgusting", "terrible", "worst", "refund", "fraud",
"scam", "lie", "liar", "incompetent", "idiot", "stupid",
])
# Profanity words to flag
profanity_words: List[str] = field(default_factory=lambda: [
"fuck", "shit", "damn", "hell", "ass", "bitch", "bastard",
"crap", "piss", "dick", "bloody",
])
# Sentiment thresholds
positive_threshold: float = 0.3
negative_threshold: float = -0.3
alert_threshold: float = -0.5
critical_threshold: float = -0.7
@dataclass
class AlertConfig:
"""Alert system configuration."""
# Number of consecutive negative utterances to trigger alert
consecutive_negative: int = 3
# Enable alert channels
enable_dashboard_alerts: bool = True
enable_email_alerts: bool = False
enable_slack_alerts: bool = False
# Email settings (Resend API)
resend_api_key: str = os.getenv("RESEND_API_KEY", "")
alert_email_from: str = os.getenv("ALERT_EMAIL_FROM", "[email protected]")
alert_email_to: str = os.getenv("ALERT_EMAIL_TO", "[email protected]")
# Slack webhook
slack_webhook_url: str = os.getenv("SLACK_WEBHOOK_URL", "")
@dataclass
class RedisConfig:
"""Redis configuration."""
host: str = os.getenv("REDIS_HOST", "127.0.0.1")
port: int = int(os.getenv("REDIS_PORT", "6379"))
db: int = int(os.getenv("REDIS_DB", "0"))
password: str = os.getenv("REDIS_PASSWORD", "")
@dataclass
class ServerConfig:
"""WebSocket/API server configuration."""
host: str = os.getenv("SERVER_HOST", "0.0.0.0")
port: int = int(os.getenv("SERVER_PORT", "8095"))
auth_token: str = os.getenv("AUTH_TOKEN", "your-secret-token-change-me")
cors_origins: List[str] = field(default_factory=lambda: ["*"])
@dataclass
class AppConfig:
"""Top-level application configuration."""
transcription: TranscriptionConfig = field(default_factory=TranscriptionConfig)
sentiment: SentimentConfig = field(default_factory=SentimentConfig)
alerts: AlertConfig = field(default_factory=AlertConfig)
redis: RedisConfig = field(default_factory=RedisConfig)
server: ServerConfig = field(default_factory=ServerConfig)
# Global config instance
config = AppConfig()
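One subtlety worth noting: every field above reads its default from `os.getenv` when the class body executes, so environment overrides must be in place before the config module is imported. A minimal sketch of the pattern, using a hypothetical `DemoConfig` rather than the real classes:

```python
import os
from dataclasses import dataclass

# Simulate setting the variable before the config module is imported
os.environ["REDIS_PORT"] = "6380"

@dataclass
class DemoConfig:
    # The default is captured when this class body executes,
    # mirroring how RedisConfig.port reads REDIS_PORT
    port: int = int(os.getenv("REDIS_PORT", "6379"))

cfg = DemoConfig()
print(cfg.port)  # 6380 (the env var was set before class definition)
```

If you export the variables from systemd unit files or a `.env` loader, make sure that happens before the first `import config` anywhere in the process.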
Option A: Faster-Whisper Transcription (Local, Free)
#!/usr/bin/env python3
"""
transcription_service.py — Real-time transcription using Faster-Whisper
or Deepgram, with chunked audio processing.
This service receives audio chunks from the audio capture layer,
transcribes them, and publishes results to Redis.
"""
import asyncio
import io
import wave
import time
import json
import logging
import struct
from typing import Optional, Dict, List
from dataclasses import dataclass, field
import numpy as np
import redis.asyncio as aioredis
logger = logging.getLogger(__name__)
@dataclass
class TranscriptionSegment:
"""A single transcription segment with metadata."""
call_id: str
text: str
timestamp: float
start_time: float # Relative to call start
end_time: float
confidence: float
speaker: str = "unknown" # "agent" or "caller" if diarized
language: str = "en"
@dataclass
class CallTranscriptionState:
"""Track transcription state for an active call."""
call_id: str
call_start: float
segments: List[TranscriptionSegment] = field(default_factory=list)
total_audio_seconds: float = 0.0
total_chunks: int = 0
class WhisperTranscriber:
"""
Real-time transcription using Faster-Whisper.
Processes audio chunks (typically 5 seconds each) and returns
transcription segments. Uses the CTranslate2 backend for
efficient CPU inference.
"""
def __init__(
self,
model_size: str = "base",
device: str = "cpu",
compute_type: str = "int8",
language: str = "en",
sample_rate: int = 8000,
):
self.model_size = model_size
self.device = device
self.compute_type = compute_type
self.language = language
self.sample_rate = sample_rate
self.model = None
def load_model(self):
"""Load the Faster-Whisper model. Call once at startup."""
from faster_whisper import WhisperModel
logger.info(
f"Loading Faster-Whisper model: {self.model_size} "
f"(device={self.device}, compute={self.compute_type})"
)
start = time.time()
self.model = WhisperModel(
self.model_size,
device=self.device,
compute_type=self.compute_type,
)
logger.info(f"Model loaded in {time.time() - start:.1f}s")
def transcribe_chunk(
self,
audio_bytes: bytes,
call_id: str,
chunk_offset: float = 0.0,
) -> List[TranscriptionSegment]:
"""
Transcribe a chunk of raw PCM audio.
Args:
audio_bytes: Raw signed 16-bit linear PCM audio
call_id: Unique call identifier
chunk_offset: Time offset of this chunk from call start (seconds)
Returns:
List of TranscriptionSegment objects
"""
if self.model is None:
raise RuntimeError("Model not loaded. Call load_model() first.")
if len(audio_bytes) < 1600: # Less than 100ms of audio
return []
# Convert raw PCM bytes to numpy float32 array
# Asterisk sends signed 16-bit linear (slin) at 8000 Hz
samples = np.frombuffer(audio_bytes, dtype=np.int16).astype(np.float32)
samples /= 32768.0 # Normalize to [-1.0, 1.0]
# If sample rate is not 16000 Hz, resample for Whisper
if self.sample_rate != 16000:
# Simple linear interpolation resampling
ratio = 16000 / self.sample_rate
new_length = int(len(samples) * ratio)
indices = np.linspace(0, len(samples) - 1, new_length)
samples = np.interp(indices, np.arange(len(samples)), samples).astype(
np.float32
)
# Transcribe
start = time.time()
segments_gen, info = self.model.transcribe(
samples,
language=self.language,
beam_size=3,
best_of=3,
vad_filter=True,
vad_parameters=dict(
min_silence_duration_ms=300,
speech_pad_ms=200,
),
word_timestamps=False,
condition_on_previous_text=False,
)
# Collect segments
result = []
for seg in segments_gen:
text = seg.text.strip()
if not text or len(text) < 2:
continue
result.append(
TranscriptionSegment(
call_id=call_id,
text=text,
timestamp=time.time(),
start_time=chunk_offset + seg.start,
end_time=chunk_offset + seg.end,
confidence=seg.avg_logprob,  # avg log-probability (<= 0), not a 0-1 score
language=info.language,
)
)
elapsed = time.time() - start
audio_duration = len(audio_bytes) / (self.sample_rate * 2)
logger.debug(
f"Transcribed {audio_duration:.1f}s audio in {elapsed:.2f}s "
f"(RTF={elapsed/audio_duration:.2f}), segments={len(result)}"
)
return result
class DeepgramTranscriber:
"""
Real-time transcription using Deepgram's live WebSocket API.
Provides true streaming transcription with ~300ms latency.
Requires a Deepgram API key (free tier: 45,000 min/year).
"""
def __init__(
self,
api_key: str,
model: str = "nova-2",
language: str = "en-GB",
sample_rate: int = 8000,
):
self.api_key = api_key
self.model = model
self.language = language
self.sample_rate = sample_rate
self._connections: Dict[str, object] = {}  # call_id -> live Deepgram connection
async def start_stream(self, call_id: str, on_transcript=None):
"""
Start a streaming transcription session for a call.
Args:
call_id: Unique call identifier
on_transcript: Async callback(TranscriptionSegment) for results
"""
from deepgram import (
DeepgramClient,
LiveTranscriptionEvents,
LiveOptions,
)
deepgram = DeepgramClient(self.api_key)
connection = deepgram.listen.asynclive.v("1")
call_start = time.time()
async def on_message(self_conn, result, **kwargs):
"""Handle transcription results from Deepgram."""
transcript = result.channel.alternatives[0].transcript
if not transcript.strip():
return
confidence = result.channel.alternatives[0].confidence
start_offset = result.start
duration = result.duration
segment = TranscriptionSegment(
call_id=call_id,
text=transcript.strip(),
timestamp=time.time(),
start_time=start_offset,
end_time=start_offset + duration,
confidence=confidence,
language=self.language,
)
if on_transcript:
await on_transcript(segment)
async def on_error(self_conn, error, **kwargs):
logger.error(f"Deepgram error for {call_id}: {error}")
connection.on(LiveTranscriptionEvents.Transcript, on_message)
connection.on(LiveTranscriptionEvents.Error, on_error)
options = LiveOptions(
model=self.model,
language=self.language,
encoding="linear16",
sample_rate=self.sample_rate,
channels=1,
smart_format=True,
interim_results=False,
utterance_end_ms=1000,
vad_events=True,
endpointing=300,
)
if await connection.start(options):
self._connections[call_id] = connection
logger.info(f"Deepgram stream started for {call_id}")
else:
logger.error(f"Failed to start Deepgram stream for {call_id}")
async def send_audio(self, call_id: str, audio_bytes: bytes):
"""Send audio data to an active Deepgram stream."""
conn = self._connections.get(call_id)
if conn:
await conn.send(audio_bytes)
async def stop_stream(self, call_id: str):
"""Stop a streaming transcription session."""
conn = self._connections.pop(call_id, None)
if conn:
await conn.finish()
logger.info(f"Deepgram stream stopped for {call_id}")
class TranscriptionManager:
"""
Manages transcription for all active calls.
Routes audio to the appropriate transcription engine,
tracks per-call state, and publishes results to Redis.
"""
def __init__(
self,
engine: str = "whisper",
redis_url: str = "redis://127.0.0.1:6379/0",
**engine_kwargs,
):
self.engine_type = engine
self.redis_url = redis_url
self.engine_kwargs = engine_kwargs
self.call_states: Dict[str, CallTranscriptionState] = {}
self.redis: Optional[aioredis.Redis] = None
# Initialize transcription engine
if engine == "whisper":
self.transcriber = WhisperTranscriber(**engine_kwargs)
elif engine == "deepgram":
self.transcriber = DeepgramTranscriber(**engine_kwargs)
else:
raise ValueError(f"Unknown engine: {engine}")
async def initialize(self):
"""Initialize connections and models."""
self.redis = aioredis.from_url(self.redis_url)
if isinstance(self.transcriber, WhisperTranscriber):
# Load model (blocking, do in thread)
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, self.transcriber.load_model)
logger.info("TranscriptionManager initialized")
async def on_call_start(self, call_id: str):
"""Handle a new call starting."""
self.call_states[call_id] = CallTranscriptionState(
call_id=call_id,
call_start=time.time(),
)
# Publish call start event
await self.redis.publish(
"calls:events",
json.dumps({
"event": "call_start",
"call_id": call_id,
"timestamp": time.time(),
}),
)
# If using Deepgram, start a streaming session
if isinstance(self.transcriber, DeepgramTranscriber):
await self.transcriber.start_stream(
call_id,
on_transcript=self._publish_segment,  # awaited once per segment
)
logger.info(f"Call tracking started: {call_id}")
async def on_audio_chunk(self, call_id: str, audio_bytes: bytes):
"""
Process an audio chunk from an active call.
For Whisper: accumulates and processes chunks synchronously.
For Deepgram: forwards raw audio to the streaming connection.
"""
state = self.call_states.get(call_id)
if not state:
logger.warning(f"Audio for unknown call: {call_id}")
return
state.total_chunks += 1
chunk_duration = len(audio_bytes) / (8000 * 2) # 8kHz 16-bit
state.total_audio_seconds += chunk_duration
if isinstance(self.transcriber, WhisperTranscriber):
# Process chunk with Faster-Whisper (in thread pool)
loop = asyncio.get_running_loop()
segments = await loop.run_in_executor(
None,
self.transcriber.transcribe_chunk,
audio_bytes,
call_id,
state.total_audio_seconds - chunk_duration,
)
for segment in segments:
state.segments.append(segment)
await self._publish_segment(segment)
elif isinstance(self.transcriber, DeepgramTranscriber):
# Forward audio to Deepgram stream
await self.transcriber.send_audio(call_id, audio_bytes)
async def on_call_end(self, call_id: str):
"""Handle a call ending."""
state = self.call_states.pop(call_id, None)
if not state:
return
# Stop Deepgram stream if active
if isinstance(self.transcriber, DeepgramTranscriber):
await self.transcriber.stop_stream(call_id)
duration = time.time() - state.call_start
# Publish call end event
await self.redis.publish(
"calls:events",
json.dumps({
"event": "call_end",
"call_id": call_id,
"timestamp": time.time(),
"duration": duration,
"segments": len(state.segments),
"audio_seconds": state.total_audio_seconds,
}),
)
logger.info(
f"Call ended: {call_id}, duration={duration:.1f}s, "
f"segments={len(state.segments)}"
)
async def _publish_segment(self, segment: TranscriptionSegment):
"""Publish a transcription segment to Redis."""
message = {
"type": "transcript",
"call_id": segment.call_id,
"text": segment.text,
"timestamp": segment.timestamp,
"start_time": segment.start_time,
"end_time": segment.end_time,
"confidence": segment.confidence,
"speaker": segment.speaker,
"language": segment.language,
}
# Publish to call-specific channel and global channel
await self.redis.publish(
f"calls:transcript:{segment.call_id}",
json.dumps(message),
)
await self.redis.publish(
"calls:transcript:all",
json.dumps(message),
)
def get_active_calls(self) -> List[dict]:
"""Return list of calls currently being transcribed."""
now = time.time()
return [
{
"call_id": state.call_id,
"duration": now - state.call_start,
"segments": len(state.segments),
"audio_seconds": state.total_audio_seconds,
"last_text": state.segments[-1].text if state.segments else "",
}
for state in self.call_states.values()
]
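The chunk offset that `on_audio_chunk` passes to `transcribe_chunk` is derived purely from byte counts, so the bookkeeping can be checked without any audio stack. A standalone sketch at 8 kHz, 16-bit mono (the same hard-coded values the manager uses):

```python
SAMPLE_RATE = 8000
BYTES_PER_SAMPLE = 2  # signed 16-bit linear PCM

def chunk_offsets(chunk_sizes):
    """Return (offset_seconds, duration_seconds) per chunk, computed the
    way the manager does: offset = audio seconds accumulated so far."""
    total = 0.0
    out = []
    for size in chunk_sizes:
        duration = size / (SAMPLE_RATE * BYTES_PER_SAMPLE)
        out.append((total, duration))
        total += duration
    return out

# Three 5-second chunks: 5 s * 8000 Hz * 2 bytes = 80,000 bytes each
offsets = chunk_offsets([80_000, 80_000, 80_000])
print(offsets)  # [(0.0, 5.0), (5.0, 5.0), (10.0, 5.0)]
```

These offsets are what make `start_time`/`end_time` in the published segments line up with the call timeline rather than restarting at zero for every chunk.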
Audio Format Handling
Asterisk sends audio in different formats depending on the capture method. Here is a utility module for converting between formats:
#!/usr/bin/env python3
"""
audio_utils.py — Audio format conversion utilities.
Handles conversion between Asterisk's native formats and
what transcription engines expect.
"""
import io
import wave
import struct
import numpy as np
from typing import Tuple
def pcm_to_wav_bytes(
pcm_data: bytes,
sample_rate: int = 8000,
sample_width: int = 2,
channels: int = 1,
) -> bytes:
"""
Wrap raw PCM data in a WAV container.
Args:
pcm_data: Raw signed 16-bit linear PCM bytes
sample_rate: Sample rate in Hz
sample_width: Bytes per sample (2 for 16-bit)
channels: Number of audio channels
Returns:
WAV file contents as bytes
"""
buffer = io.BytesIO()
with wave.open(buffer, "wb") as wf:
wf.setnchannels(channels)
wf.setsampwidth(sample_width)
wf.setframerate(sample_rate)
wf.writeframes(pcm_data)
return buffer.getvalue()
def wav_to_pcm(wav_data: bytes) -> Tuple[bytes, int, int]:
"""
Extract raw PCM from a WAV file.
Returns:
Tuple of (pcm_bytes, sample_rate, channels)
"""
buffer = io.BytesIO(wav_data)
with wave.open(buffer, "rb") as wf:
pcm = wf.readframes(wf.getnframes())
return pcm, wf.getframerate(), wf.getnchannels()
def resample_pcm(
pcm_data: bytes,
from_rate: int,
to_rate: int,
) -> bytes:
"""
Resample PCM audio from one sample rate to another.
Uses linear interpolation (good enough for speech).
"""
samples = np.frombuffer(pcm_data, dtype=np.int16).astype(np.float32)
ratio = to_rate / from_rate
new_length = int(len(samples) * ratio)
indices = np.linspace(0, len(samples) - 1, new_length)
resampled = np.interp(indices, np.arange(len(samples)), samples)
return resampled.astype(np.int16).tobytes()
def split_stereo(pcm_data: bytes) -> Tuple[bytes, bytes]:
"""
Split stereo PCM into two mono channels.
Useful when MixMonitor records with separate legs
(agent on left, caller on right).
Returns:
Tuple of (left_channel_bytes, right_channel_bytes)
"""
samples = np.frombuffer(pcm_data, dtype=np.int16)
left = samples[0::2] # Even samples = left channel
right = samples[1::2] # Odd samples = right channel
return left.tobytes(), right.tobytes()
def detect_silence(
pcm_data: bytes,
threshold: float = 0.01,
sample_rate: int = 8000,
) -> bool:
"""
Detect if an audio chunk is mostly silence.
Returns True if RMS energy is below threshold.
"""
samples = np.frombuffer(pcm_data, dtype=np.int16).astype(np.float32) / 32768.0
rms = np.sqrt(np.mean(samples**2))
return rms < threshold
def normalize_audio(pcm_data: bytes, target_rms: float = 0.1) -> bytes:
"""Normalize audio volume to target RMS level."""
samples = np.frombuffer(pcm_data, dtype=np.int16).astype(np.float32) / 32768.0
current_rms = np.sqrt(np.mean(samples**2))
if current_rms < 0.001: # Near-silence, don't amplify noise
return pcm_data
gain = target_rms / current_rms
gain = min(gain, 10.0) # Cap maximum gain at 20dB
samples = samples * gain
samples = np.clip(samples, -1.0, 1.0) # Prevent clipping
return (samples * 32767).astype(np.int16).tobytes()  # 32767, not 32768: +1.0 would overflow int16
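A quick self-check of the conversions above on a synthetic tone, using only numpy and the standard-library `wave` module (no Asterisk required); the logic below mirrors `pcm_to_wav_bytes`, `wav_to_pcm`, and `resample_pcm`:

```python
import io
import wave
import numpy as np

# 0.5 s, 440 Hz sine at 8 kHz, 16-bit mono, standing in for an Asterisk chunk
sr = 8000
t = np.arange(int(sr * 0.5)) / sr
pcm = (np.sin(2 * np.pi * 440 * t) * 16000).astype(np.int16).tobytes()

# Wrap in a WAV container, then read it back
buf = io.BytesIO()
with wave.open(buf, "wb") as wf:
    wf.setnchannels(1)
    wf.setsampwidth(2)
    wf.setframerate(sr)
    wf.writeframes(pcm)

buf.seek(0)
with wave.open(buf, "rb") as wf:
    assert wf.getframerate() == sr
    restored = wf.readframes(wf.getnframes())

assert restored == pcm  # lossless round trip

# Upsample to 16 kHz by linear interpolation
samples = np.frombuffer(pcm, dtype=np.int16).astype(np.float32)
idx = np.linspace(0, len(samples) - 1, len(samples) * 2)
resampled = np.interp(idx, np.arange(len(samples)), samples).astype(np.int16)
print(len(samples), len(resampled))  # 4000 8000
```

Linear interpolation is crude by hi-fi standards, but for 8 kHz telephony speech fed to an ASR model it is adequate and very cheap.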
Speaker Diarization (Agent vs. Caller)
When using stereo recordings (MixMonitor with separate legs), you can identify who is speaking:
def identify_speaker_from_stereo(
left_pcm: bytes,
right_pcm: bytes,
threshold: float = 0.02,
) -> str:
"""
Identify active speaker from stereo recording legs.
Convention: Left channel = agent, Right channel = caller.
Compares RMS energy to determine who is speaking.
Args:
left_pcm: Agent audio (left channel)
right_pcm: Caller audio (right channel)
threshold: Minimum energy to count as speech
Returns:
"agent", "caller", or "both"
"""
left_samples = np.frombuffer(left_pcm, dtype=np.int16).astype(np.float32) / 32768.0
right_samples = np.frombuffer(right_pcm, dtype=np.int16).astype(np.float32) / 32768.0
left_rms = np.sqrt(np.mean(left_samples**2))
right_rms = np.sqrt(np.mean(right_samples**2))
left_speaking = left_rms > threshold
right_speaking = right_rms > threshold
if left_speaking and right_speaking:
return "both"
elif left_speaking:
return "agent"
elif right_speaking:
return "caller"
else:
return "silence"
6. Sentiment Analysis
Each transcription segment is analyzed for sentiment (positive/negative/neutral), keyword matches, and overall call health. We provide three analysis engines with a unified interface.
Sentiment Engine Comparison
| Feature | VADER | TextBlob | Transformer (DistilBERT) |
|---|---|---|---|
| Speed | ~0.1ms/sentence | ~0.5ms/sentence | ~50ms/sentence (CPU) |
| Accuracy | Good for English | Decent | Best |
| Domain Tuning | Rule-based, customizable | Limited | Fine-tune possible |
| Dependencies | vaderSentiment (tiny) | textblob (small) | transformers+torch (~2GB) |
| Handles Sarcasm | Poorly | Poorly | Moderately |
| RAM Usage | ~10 MB | ~20 MB | ~500 MB |
| Best For | Real-time, high volume | Simple scoring | Accuracy-critical |
Recommendation: Use VADER for real-time processing. It is fast enough to process every utterance without any measurable impact on latency, and its accuracy is sufficient for sentiment trending and alert triggering. Use the transformer model only if you need high accuracy and have spare CPU/GPU resources.
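All three engines are normalized onto the same scheme: a signed score and a three-way label. The +-0.05 compound-score cutoffs recommended in VADER's documentation are easy to sanity-check in isolation:

```python
def label_for(compound: float) -> str:
    """Map a VADER-style compound score to the tutorial's label scheme."""
    if compound >= 0.05:
        return "positive"
    if compound <= -0.05:
        return "negative"
    return "neutral"

labels = [label_for(s) for s in (0.6, -0.6, 0.0, 0.05, -0.05)]
print(labels)
# ['positive', 'negative', 'neutral', 'positive', 'negative']
```

Note the boundary behaviour: exactly +-0.05 is classified as positive/negative, not neutral, matching the `>=`/`<=` comparisons used in the module below.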
Complete Sentiment Module
#!/usr/bin/env python3
"""
sentiment_service.py — Sentiment analysis for call transcription segments.
Analyzes each utterance for:
- Sentiment polarity (-1.0 negative to +1.0 positive)
- Sentiment label (positive, negative, neutral)
- Keyword detection (alert words, profanity)
- Rolling sentiment score per call
Supports three engines: VADER, TextBlob, and Transformer (DistilBERT).
"""
import re
import time
import json
import logging
import asyncio
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, field
from collections import deque
import redis.asyncio as aioredis
logger = logging.getLogger(__name__)
@dataclass
class SentimentResult:
"""Result of sentiment analysis on a single utterance."""
call_id: str
text: str
timestamp: float
start_time: float
end_time: float
speaker: str
# Sentiment scores
sentiment_score: float = 0.0 # -1.0 to 1.0
sentiment_label: str = "neutral" # positive, negative, neutral
confidence: float = 0.0
# Keyword detection
keywords_detected: List[str] = field(default_factory=list)
profanity_detected: List[str] = field(default_factory=list)
# Rolling average for this call
rolling_sentiment: float = 0.0
def to_dict(self) -> dict:
return {
"type": "sentiment",
"call_id": self.call_id,
"text": self.text,
"timestamp": self.timestamp,
"start_time": self.start_time,
"end_time": self.end_time,
"speaker": self.speaker,
"sentiment_score": round(self.sentiment_score, 4),
"sentiment_label": self.sentiment_label,
"confidence": round(self.confidence, 4),
"keywords_detected": self.keywords_detected,
"profanity_detected": self.profanity_detected,
"rolling_sentiment": round(self.rolling_sentiment, 4),
}
class VADERAnalyzer:
"""
Sentiment analysis using VADER (Valence Aware Dictionary and
sEntiment Reasoner).
VADER is specifically designed for social media and short texts.
It works well for call center utterances because:
- Handles capitalization (VERY ANGRY vs very angry)
- Handles punctuation (great!!! vs great)
- Handles negation (not happy)
- Handles degree modifiers (extremely, slightly)
- Fast enough for real-time use (~0.1ms per sentence)
"""
def __init__(self):
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
self.analyzer = SentimentIntensityAnalyzer()
# Add call-center-specific terms to VADER's lexicon
custom_lexicon = {
"cancel": -2.0,
"cancellation": -1.5,
"refund": -1.0,
"escalate": -1.5,
"supervisor": -0.5,
"manager": -0.5,
"complaint": -2.0,
"unacceptable": -2.5,
"resolved": 2.0,
"thank you": 2.0,
"thanks": 1.5,
"appreciate": 2.0,
"helpful": 2.5,
"excellent": 3.0,
"perfect": 3.0,
"brilliant": 2.5,
"sorted": 1.5,
"wonderful": 3.0,
"disgusting": -3.0,
"incompetent": -3.0,
"useless": -2.5,
"ridiculous": -2.0,
"waste of time": -2.5,
}
self.analyzer.lexicon.update(custom_lexicon)
def analyze(self, text: str) -> Tuple[float, str, float]:
"""
Analyze sentiment of text.
Returns:
Tuple of (score, label, confidence)
- score: -1.0 to 1.0 (compound score)
- label: "positive", "negative", or "neutral"
- confidence: 0.0 to 1.0 (abs of compound)
"""
scores = self.analyzer.polarity_scores(text)
compound = scores["compound"]
if compound >= 0.05:
label = "positive"
elif compound <= -0.05:
label = "negative"
else:
label = "neutral"
confidence = abs(compound)
return compound, label, confidence
class TextBlobAnalyzer:
"""
Sentiment analysis using TextBlob.
Simpler than VADER, provides polarity and subjectivity scores.
Polarity: -1.0 (negative) to 1.0 (positive)
Subjectivity: 0.0 (objective) to 1.0 (subjective)
"""
def __init__(self):
from textblob import TextBlob
self.TextBlob = TextBlob
def analyze(self, text: str) -> Tuple[float, str, float]:
"""Analyze sentiment using TextBlob."""
blob = self.TextBlob(text)
polarity = blob.sentiment.polarity
subjectivity = blob.sentiment.subjectivity
if polarity > 0.1:
label = "positive"
elif polarity < -0.1:
label = "negative"
else:
label = "neutral"
# Use subjectivity as a proxy for confidence
# (subjective statements have clearer sentiment)
confidence = min(abs(polarity) + subjectivity * 0.3, 1.0)
return polarity, label, confidence
class TransformerAnalyzer:
"""
Sentiment analysis using a fine-tuned DistilBERT transformer.
Model: distilbert-base-uncased-finetuned-sst-2-english
Much more accurate than rule-based methods but ~500x slower.
Best for batch processing or when you have GPU resources.
"""
def __init__(self, model_name: str = "distilbert-base-uncased-finetuned-sst-2-english"):
from transformers import pipeline
logger.info(f"Loading transformer model: {model_name}")
self.classifier = pipeline(
"sentiment-analysis",
model=model_name,
device=-1, # CPU (use 0 for GPU)
)
logger.info("Transformer model loaded")
def analyze(self, text: str) -> Tuple[float, str, float]:
"""Analyze sentiment using transformer model."""
result = self.classifier(text, truncation=True, max_length=512)[0]
label = result["label"].lower() # "POSITIVE" or "NEGATIVE"
confidence = result["score"]
# Convert to -1 to 1 scale
if label == "positive":
score = confidence
label = "positive"
else:
score = -confidence
label = "negative"
# Mark low-confidence results as neutral
if confidence < 0.6:
label = "neutral"
return score, label, confidence
class KeywordDetector:
"""
Detect alert keywords and profanity in transcription text.
Uses simple string matching with word boundary awareness.
"""
def __init__(
self,
alert_keywords: Optional[List[str]] = None,
profanity_words: Optional[List[str]] = None,
):
self.alert_keywords = alert_keywords or [
"cancel", "supervisor", "manager", "complaint", "legal",
"lawyer", "sue", "report", "escalate", "unacceptable",
"disgusting", "terrible", "worst", "refund", "fraud",
"scam", "lie", "liar", "incompetent",
]
self.profanity_words = profanity_words or [
"fuck", "shit", "damn", "hell", "ass", "bitch",
"bastard", "crap", "piss", "bloody",
]
# Compile regex patterns for word boundary matching
self._alert_pattern = re.compile(
r"\b(" + "|".join(re.escape(w) for w in self.alert_keywords) + r")\b",
re.IGNORECASE,
)
self._profanity_pattern = re.compile(
r"\b(" + "|".join(re.escape(w) for w in self.profanity_words) + r")\b",
re.IGNORECASE,
)
def detect(self, text: str) -> Tuple[List[str], List[str]]:
"""
Detect keywords and profanity in text.
Returns:
Tuple of (alert_keywords_found, profanity_found)
"""
alerts = list(set(m.group().lower() for m in self._alert_pattern.finditer(text)))
profanity = list(set(m.group().lower() for m in self._profanity_pattern.finditer(text)))
return alerts, profanity
@dataclass
class CallSentimentState:
"""Track rolling sentiment for a single call."""
call_id: str
scores: deque = field(default_factory=lambda: deque(maxlen=20))
total_segments: int = 0
total_positive: int = 0
total_negative: int = 0
total_neutral: int = 0
consecutive_negative: int = 0
all_keywords: List[str] = field(default_factory=list)
all_profanity: List[str] = field(default_factory=list)
def add_score(self, score: float, label: str):
"""Add a sentiment score and update counters."""
self.scores.append(score)
self.total_segments += 1
if label == "positive":
self.total_positive += 1
self.consecutive_negative = 0
elif label == "negative":
self.total_negative += 1
self.consecutive_negative += 1
else:
self.total_neutral += 1
self.consecutive_negative = 0
@property
def rolling_average(self) -> float:
"""Calculate rolling average sentiment score."""
if not self.scores:
return 0.0
return sum(self.scores) / len(self.scores)
@property
def overall_label(self) -> str:
"""Get overall sentiment label based on rolling average."""
avg = self.rolling_average
if avg > 0.1:
return "positive"
elif avg < -0.1:
return "negative"
return "neutral"
class SentimentService:
"""
Main sentiment analysis service.
Subscribes to transcription segments from Redis, analyzes them,
and publishes enriched results back to Redis.
"""
def __init__(
self,
engine: str = "vader",
redis_url: str = "redis://127.0.0.1:6379/0",
rolling_window: int = 10,
alert_keywords: Optional[List[str]] = None,
profanity_words: Optional[List[str]] = None,
):
self.redis_url = redis_url
self.rolling_window = rolling_window
self.redis: Optional[aioredis.Redis] = None
self.call_states: Dict[str, CallSentimentState] = {}
# Initialize sentiment analyzer
if engine == "vader":
self.analyzer = VADERAnalyzer()
elif engine == "textblob":
self.analyzer = TextBlobAnalyzer()
elif engine == "transformer":
self.analyzer = TransformerAnalyzer()
else:
raise ValueError(f"Unknown sentiment engine: {engine}")
# Initialize keyword detector
self.keyword_detector = KeywordDetector(alert_keywords, profanity_words)
logger.info(f"SentimentService initialized with engine={engine}")
async def initialize(self):
"""Connect to Redis."""
self.redis = aioredis.from_url(self.redis_url)
async def start(self):
"""Start listening for transcription segments."""
await self.initialize()
pubsub = self.redis.pubsub()
await pubsub.subscribe("calls:transcript:all", "calls:events")
logger.info("SentimentService listening for transcription segments")
async for message in pubsub.listen():
if message["type"] != "message":
continue
channel = message["channel"]
if isinstance(channel, bytes):
channel = channel.decode()
try:
data = json.loads(message["data"])
except (json.JSONDecodeError, TypeError):
continue
if channel == "calls:events":
await self._handle_call_event(data)
elif channel == "calls:transcript:all":
await self._handle_transcript(data)
async def _handle_call_event(self, data: dict):
"""Handle call start/end events."""
event = data.get("event")
call_id = data.get("call_id")
if event == "call_start":
self.call_states[call_id] = CallSentimentState(call_id=call_id)
logger.info(f"Tracking sentiment for call: {call_id}")
elif event == "call_end":
state = self.call_states.pop(call_id, None)
if state:
logger.info(
f"Call {call_id} sentiment summary: "
f"avg={state.rolling_average:.2f}, "
f"pos={state.total_positive}, neg={state.total_negative}, "
f"neutral={state.total_neutral}, "
f"keywords={state.all_keywords}"
)
async def _handle_transcript(self, data: dict):
"""Analyze a transcription segment and publish results."""
call_id = data.get("call_id", "")
text = data.get("text", "").strip()
if not text or len(text) < 3:
return
# Ensure call state exists
if call_id not in self.call_states:
self.call_states[call_id] = CallSentimentState(call_id=call_id)
state = self.call_states[call_id]
# Run sentiment analysis
score, label, confidence = self.analyzer.analyze(text)
# Run keyword detection
keywords, profanity = self.keyword_detector.detect(text)
# Update call state
state.add_score(score, label)
state.all_keywords.extend(keywords)
state.all_profanity.extend(profanity)
# Build result
result = SentimentResult(
call_id=call_id,
text=text,
timestamp=data.get("timestamp", time.time()),
start_time=data.get("start_time", 0),
end_time=data.get("end_time", 0),
speaker=data.get("speaker", "unknown"),
sentiment_score=score,
sentiment_label=label,
confidence=confidence,
keywords_detected=keywords,
profanity_detected=profanity,
rolling_sentiment=state.rolling_average,
)
# Publish analyzed result
result_json = json.dumps(result.to_dict())
await self.redis.publish(f"calls:sentiment:{call_id}", result_json)
await self.redis.publish("calls:sentiment:all", result_json)
# Store in Redis stream for history (stream field values must be
# flat strings/numbers, so serialize the list fields)
await self.redis.xadd(
f"calls:history:{call_id}",
{k: json.dumps(v) if isinstance(v, (list, dict)) else str(v)
for k, v in result.to_dict().items()},
maxlen=500,
)
# Check for alerts (3 mirrors AlertConfig.consecutive_negative's default)
if keywords or profanity or state.consecutive_negative >= 3:
await self._publish_alert(result, state)
if keywords:
logger.info(
f"Keywords detected in {call_id}: {keywords} — "
f"text: '{text[:80]}...'"
)
async def _publish_alert(self, result: SentimentResult, state: CallSentimentState):
"""Publish an alert when thresholds are exceeded."""
alert_level = "warning"
reasons = []
if result.profanity_detected:
alert_level = "urgent"
reasons.append(f"Profanity: {', '.join(result.profanity_detected)}")
if result.keywords_detected:
alert_level = "warning"
reasons.append(f"Keywords: {', '.join(result.keywords_detected)}")
if state.consecutive_negative >= 5:
alert_level = "critical"
reasons.append(
f"Sustained negative sentiment ({state.consecutive_negative} utterances)"
)
elif state.consecutive_negative >= 3:
alert_level = "urgent"
reasons.append(
f"Negative sentiment trend ({state.consecutive_negative} utterances)"
)
if result.sentiment_score < -0.7:
alert_level = "critical"
reasons.append(f"Very negative sentiment: {result.sentiment_score:.2f}")
alert = {
"type": "alert",
"alert_level": alert_level, # warning, urgent, critical
"call_id": result.call_id,
"timestamp": time.time(),
"reasons": reasons,
"text": result.text,
"speaker": result.speaker,
"sentiment_score": result.sentiment_score,
"rolling_sentiment": state.rolling_average,
}
await self.redis.publish("calls:alerts", json.dumps(alert))
logger.warning(f"ALERT [{alert_level}] call={result.call_id}: {reasons}")
def get_call_sentiment(self, call_id: str) -> Optional[dict]:
"""Get current sentiment state for a call."""
state = self.call_states.get(call_id)
if not state:
return None
return {
"call_id": call_id,
"rolling_sentiment": state.rolling_average,
"overall_label": state.overall_label,
"total_segments": state.total_segments,
"positive_count": state.total_positive,
"negative_count": state.total_negative,
"neutral_count": state.total_neutral,
"consecutive_negative": state.consecutive_negative,
"keywords": list(set(state.all_keywords)),
"profanity": list(set(state.all_profanity)),
}
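The rolling-window behaviour of `CallSentimentState` comes entirely from `deque(maxlen=...)`: old scores fall off the front automatically, so a burst of early positivity cannot mask a late-call collapse. A stripped-down sketch with a small window for illustration (the service defaults to 20):

```python
from collections import deque

scores = deque(maxlen=3)  # tiny window so the eviction is visible
for s in (1.0, 1.0, 1.0, -1.0):
    scores.append(s)

# The first 1.0 has been evicted; the window now holds the last three scores
avg = sum(scores) / len(scores)
print(list(scores), round(avg, 3))  # [1.0, 1.0, -1.0] 0.333
```

A plain list with the same appends would average 0.5 over all four scores; the bounded deque is what makes the dashboard gauge react to the most recent utterances.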
7. Message Broker — Redis Pub/Sub
Redis serves as the central nervous system connecting all components. The transcription service publishes segments, the sentiment service subscribes and re-publishes enriched results, and the WebSocket server subscribes to deliver updates to browsers.
Redis Channel Structure
calls:events — Call lifecycle events (start, end)
calls:transcript:all — All transcription segments (global feed)
calls:transcript:{id} — Segments for a specific call
calls:sentiment:all — All analyzed segments (global feed)
calls:sentiment:{id} — Analyzed segments for a specific call
calls:alerts — Alert notifications
calls:history:{id} — Redis Stream: persistent history per call
calls:active — Redis Hash: currently active calls
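Because every channel follows the `calls:<kind>:<call_id>` convention, a subscriber using pattern subscriptions (`psubscribe calls:sentiment:*`) can recover the call ID from the channel name alone. A small hypothetical helper (not part of the modules above):

```python
def parse_channel(channel: str):
    """Split a 'calls:<kind>[:<call_id>]' channel name.

    Returns (kind, call_id); the ':all' feeds and the id-less channels
    ('calls:events', 'calls:alerts') yield call_id = None."""
    parts = channel.split(":", 2)  # maxsplit=2 keeps colons inside call IDs
    if len(parts) < 3 or parts[2] == "all":
        return (parts[1] if len(parts) > 1 else "", None)
    return (parts[1], parts[2])

print(parse_channel("calls:sentiment:CALL-42"))  # ('sentiment', 'CALL-42')
print(parse_channel("calls:transcript:all"))     # ('transcript', None)
print(parse_channel("calls:alerts"))             # ('alerts', None)
```

The `maxsplit=2` is deliberate: ViciDial-style unique IDs can themselves contain separators, and splitting only twice keeps the full ID intact.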
Redis Broker Module
#!/usr/bin/env python3
"""
redis_broker.py — Redis pub/sub wrapper for the call monitoring system.
Provides a clean interface for publishing and subscribing to
call monitoring events. Uses both pub/sub (real-time) and
Streams (persistent history).
"""
import json
import time
import logging
import asyncio
from typing import Optional, Dict, List, Callable, Any
import redis.asyncio as aioredis
logger = logging.getLogger(__name__)
class RedisBroker:
"""
Redis message broker for call monitoring.
Handles pub/sub subscriptions, message publishing,
active call tracking, and history storage.
"""
def __init__(
self,
host: str = "127.0.0.1",
port: int = 6379,
db: int = 0,
password: str = "",
):
url = f"redis://{host}:{port}/{db}"
if password:
url = f"redis://:{password}@{host}:{port}/{db}"
self.redis_url = url
self.redis: Optional[aioredis.Redis] = None
self._pubsub: Optional[aioredis.client.PubSub] = None
self._subscriptions: Dict[str, List[Callable]] = {}
self._running = False
async def connect(self):
"""Establish Redis connection."""
self.redis = aioredis.from_url(
self.redis_url,
decode_responses=False,
max_connections=20,
)
# Test connection
await self.redis.ping()
logger.info(f"Connected to Redis: {self.redis_url}")
async def disconnect(self):
"""Close Redis connection."""
self._running = False
if self._pubsub:
await self._pubsub.unsubscribe()
await self._pubsub.close()
if self.redis:
await self.redis.close()
# --- Publishing ---
async def publish(self, channel: str, data: dict):
"""Publish a message to a channel."""
await self.redis.publish(channel, json.dumps(data))
async def publish_transcript(self, call_id: str, segment: dict):
"""Publish a transcription segment."""
await self.publish(f"calls:transcript:{call_id}", segment)
await self.publish("calls:transcript:all", segment)
async def publish_sentiment(self, call_id: str, result: dict):
"""Publish a sentiment analysis result."""
await self.publish(f"calls:sentiment:{call_id}", result)
await self.publish("calls:sentiment:all", result)
# Also store in stream for history
        await self.redis.xadd(
            f"calls:history:{call_id}",
            # Stream field values must be flat strings; JSON-encode non-strings
            # (str() would produce e.g. "['cancel']", which json.loads cannot
            # parse back in get_call_history)
            {k: v if isinstance(v, str) else json.dumps(v) for k, v in result.items()},
            maxlen=1000,
        )
async def publish_alert(self, alert: dict):
"""Publish an alert."""
await self.publish("calls:alerts", alert)
# Store alerts separately for dashboard retrieval
await self.redis.lpush(
"calls:recent_alerts",
json.dumps(alert),
)
await self.redis.ltrim("calls:recent_alerts", 0, 99) # Keep last 100
async def publish_call_event(self, event: str, call_id: str, **extra):
"""Publish a call lifecycle event."""
data = {
"event": event,
"call_id": call_id,
"timestamp": time.time(),
**extra,
}
await self.publish("calls:events", data)
# --- Active Call Tracking ---
async def register_active_call(self, call_id: str, metadata: dict):
"""Register a call as active."""
await self.redis.hset(
"calls:active",
call_id,
json.dumps({
"call_id": call_id,
"start_time": time.time(),
**metadata,
}),
)
async def remove_active_call(self, call_id: str):
"""Remove a call from active tracking."""
await self.redis.hdel("calls:active", call_id)
async def get_active_calls(self) -> List[dict]:
"""Get all currently active calls."""
calls = await self.redis.hgetall("calls:active")
result = []
for call_id, data in calls.items():
try:
call_data = json.loads(data)
call_data["duration"] = time.time() - call_data.get("start_time", time.time())
result.append(call_data)
except json.JSONDecodeError:
continue
return result
# --- History ---
async def get_call_history(
self,
call_id: str,
count: int = 100,
) -> List[dict]:
"""Get transcript/sentiment history for a call."""
entries = await self.redis.xrange(
f"calls:history:{call_id}",
count=count,
)
result = []
for entry_id, fields in entries:
data = {}
for k, v in fields.items():
key = k.decode() if isinstance(k, bytes) else k
val = v.decode() if isinstance(v, bytes) else v
# Try to parse JSON values
try:
val = json.loads(val)
except (json.JSONDecodeError, TypeError):
pass
data[key] = val
result.append(data)
return result
async def get_recent_alerts(self, count: int = 20) -> List[dict]:
"""Get recent alerts."""
raw = await self.redis.lrange("calls:recent_alerts", 0, count - 1)
alerts = []
for item in raw:
try:
alerts.append(json.loads(item))
except json.JSONDecodeError:
continue
return alerts
# --- Subscribing ---
async def subscribe(
self,
channels: List[str],
callback: Callable,
):
"""
Subscribe to channels and call callback for each message.
Args:
channels: List of channel names or patterns
callback: Async function(channel, data_dict)
"""
self._pubsub = self.redis.pubsub()
# Separate pattern subscriptions from regular ones
patterns = [c for c in channels if "*" in c]
regular = [c for c in channels if "*" not in c]
if regular:
await self._pubsub.subscribe(*regular)
if patterns:
await self._pubsub.psubscribe(*patterns)
self._running = True
logger.info(f"Subscribed to channels: {channels}")
async for message in self._pubsub.listen():
if not self._running:
break
msg_type = message.get("type", "")
if msg_type not in ("message", "pmessage"):
continue
channel = message.get("channel", b"")
if isinstance(channel, bytes):
channel = channel.decode()
try:
data = json.loads(message["data"])
await callback(channel, data)
except json.JSONDecodeError:
continue
except Exception as e:
logger.error(f"Subscriber callback error: {e}", exc_info=True)
# --- Cleanup ---
async def cleanup_stale_calls(self, max_age: int = 7200):
"""Remove calls that have been active for too long (likely stale)."""
calls = await self.redis.hgetall("calls:active")
now = time.time()
removed = 0
for call_id, data in calls.items():
try:
call_data = json.loads(data)
age = now - call_data.get("start_time", now)
if age > max_age:
await self.redis.hdel("calls:active", call_id)
removed += 1
except json.JSONDecodeError:
await self.redis.hdel("calls:active", call_id)
removed += 1
if removed:
logger.info(f"Cleaned up {removed} stale call entries")
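Because the broker connects with `decode_responses=False`, `get_call_history` has to decode bytes keys and values from `XRANGE` itself, then opportunistically JSON-parse each value so numbers and lists survive the round trip. That decoding step can be exercised offline, with no Redis server; the sample fields below mimic what redis-py returns for a stream entry:

```python
import json

def decode_stream_fields(fields: dict) -> dict:
    """Mirror of the field-decoding loop in get_call_history:
    bytes -> str, then opportunistic JSON parsing."""
    record = {}
    for k, v in fields.items():
        key = k.decode() if isinstance(k, bytes) else k
        val = v.decode() if isinstance(v, bytes) else v
        try:
            val = json.loads(val)
        except (json.JSONDecodeError, TypeError):
            pass  # plain text stays a string
        record[key] = val
    return record

# Simulated XRANGE entry fields, as returned with decode_responses=False
raw = {
    b"text": b"I want to cancel",
    b"sentiment_score": b"-0.7",
    b"keywords_detected": b'["cancel"]',
}
print(decode_stream_fields(raw))
# -> {'text': 'I want to cancel', 'sentiment_score': -0.7, 'keywords_detected': ['cancel']}
```

Note that this only works if non-string values were stored as JSON strings in the first place; storing them via `str()` would leave list values as unparseable Python reprs.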
Redis Configuration for Production
# /etc/redis/redis.conf — Key settings for call monitoring
# Memory limit (adjust based on your server)
maxmemory 512mb
maxmemory-policy allkeys-lru
# Persistence (optional — call data is ephemeral)
# Disable RDB if you do not need persistence across restarts
save ""
# Or keep minimal persistence:
# save 300 100
# Pub/Sub client buffer limits
# Increase for high-volume call centers
client-output-buffer-limit pubsub 64mb 32mb 60
# Connection limits
maxclients 256
# Disable unused features for security
rename-command FLUSHDB ""
rename-command FLUSHALL ""
rename-command CONFIG ""
Testing the Broker
#!/usr/bin/env python3
"""
test_broker.py — Quick test to verify Redis pub/sub is working.
Run this in one terminal, then run a publisher in another.
"""
import asyncio
import json
from redis_broker import RedisBroker
async def test_subscriber():
broker = RedisBroker()
await broker.connect()
async def on_message(channel, data):
print(f"[{channel}] {json.dumps(data, indent=2)}")
await broker.subscribe(
["calls:sentiment:all", "calls:alerts", "calls:events"],
on_message,
)
async def test_publisher():
broker = RedisBroker()
await broker.connect()
# Simulate a call start
await broker.publish_call_event("call_start", "test-call-001")
await broker.register_active_call("test-call-001", {
"agent": "John Smith",
"caller": "+44123456789",
"campaign": "UK_Sales",
})
# Simulate sentiment results
for i, (text, score) in enumerate([
("Hello, thank you for calling", 0.6),
("I need help with my account", 0.0),
("This is really frustrating", -0.5),
("I want to cancel my subscription", -0.7),
("I understand, let me help you", 0.4),
]):
await broker.publish_sentiment("test-call-001", {
"type": "sentiment",
"call_id": "test-call-001",
"text": text,
"timestamp": 1700000000 + i * 15,
"start_time": i * 15.0,
"end_time": (i + 1) * 15.0,
"speaker": "caller" if i % 2 else "agent",
"sentiment_score": score,
"sentiment_label": "positive" if score > 0.1 else ("negative" if score < -0.1 else "neutral"),
"keywords_detected": ["cancel"] if "cancel" in text.lower() else [],
"profanity_detected": [],
"rolling_sentiment": score,
})
await asyncio.sleep(0.5)
# Simulate alert
await broker.publish_alert({
"type": "alert",
"alert_level": "urgent",
"call_id": "test-call-001",
"timestamp": 1700000060,
"reasons": ["Keyword: cancel", "Negative sentiment trend"],
"text": "I want to cancel my subscription",
"speaker": "caller",
"sentiment_score": -0.7,
})
# End call
await broker.publish_call_event("call_end", "test-call-001", duration=75.0)
await broker.remove_active_call("test-call-001")
print("Test messages published.")
await broker.disconnect()
if __name__ == "__main__":
import sys
if len(sys.argv) > 1 and sys.argv[1] == "pub":
asyncio.run(test_publisher())
else:
asyncio.run(test_subscriber())
8. WebSocket Server
The WebSocket server bridges Redis pub/sub and browser clients. It is built with FastAPI for performance and simplicity, providing both WebSocket endpoints for real-time streaming and REST endpoints for call metadata.
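Before reading the full implementation, it helps to see the core pattern in miniature: one background reader pulls messages from a feed and fans each one out to every connected client. The sketch below uses an `asyncio.Queue` as a stand-in for Redis and a fake client class as a stand-in for WebSocket connections (all names here are illustrative, not part of the server module):

```python
import asyncio
import json

class FakeClient:
    """Stands in for a WebSocket connection: records what it receives."""
    def __init__(self):
        self.received = []

    async def send_text(self, data: str):
        self.received.append(json.loads(data))

async def fan_out(source: asyncio.Queue, clients: list) -> None:
    """Forward every message from the source to every client,
    mirroring what the Redis listener does for WebSocket subscribers."""
    while True:
        message = await source.get()
        if message is None:  # sentinel: shut down
            return
        data = json.dumps(message)
        for client in clients:
            await client.send_text(data)

async def demo():
    source = asyncio.Queue()
    clients = [FakeClient(), FakeClient()]
    task = asyncio.create_task(fan_out(source, clients))
    await source.put({"type": "sentiment", "call_id": "c1", "sentiment_score": -0.5})
    await source.put(None)
    await task
    return clients

clients = asyncio.run(demo())
# Both fake clients received the same message
```

The real server adds three things on top of this skeleton: per-subscription routing (so a client watching one call does not receive every call), authentication, and cleanup of clients whose sends fail.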
Complete Server Implementation
#!/usr/bin/env python3
"""
websocket_server.py — FastAPI WebSocket server for the call monitoring dashboard.
Provides:
- WebSocket endpoint for real-time call data streaming
- REST endpoints for active calls, history, and alerts
- Token-based authentication
- Graceful client connect/disconnect handling
- Static file serving for the dashboard HTML
"""
import os
import json
import time
import asyncio
import logging
from typing import Dict, Set, Optional, List
from contextlib import asynccontextmanager
import redis.asyncio as aioredis
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Query, HTTPException
from fastapi.responses import HTMLResponse, FileResponse
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
logger = logging.getLogger(__name__)
# Configuration
AUTH_TOKEN = os.getenv("AUTH_TOKEN", "your-secret-token-change-me")
REDIS_URL = os.getenv("REDIS_URL", "redis://127.0.0.1:6379/0")
SERVER_HOST = os.getenv("SERVER_HOST", "0.0.0.0")
SERVER_PORT = int(os.getenv("SERVER_PORT", "8095"))
STATIC_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "static")
class ConnectionManager:
"""
Manages WebSocket connections and message routing.
Clients can subscribe to:
- All calls ("all")
- A specific call ID
- Alerts only ("alerts")
"""
def __init__(self):
# Map of subscription_key -> set of WebSocket connections
self.subscriptions: Dict[str, Set[WebSocket]] = {}
# Map of WebSocket -> set of subscribed keys
self.client_subs: Dict[WebSocket, Set[str]] = {}
# Connection metadata
self.client_info: Dict[WebSocket, dict] = {}
async def connect(self, websocket: WebSocket, subscribe_to: str = "all"):
"""Accept a WebSocket connection and register subscription."""
await websocket.accept()
if subscribe_to not in self.subscriptions:
self.subscriptions[subscribe_to] = set()
self.subscriptions[subscribe_to].add(websocket)
if websocket not in self.client_subs:
self.client_subs[websocket] = set()
self.client_subs[websocket].add(subscribe_to)
self.client_info[websocket] = {
"connected_at": time.time(),
"subscribe_to": subscribe_to,
"messages_sent": 0,
}
logger.info(
f"Client connected (sub={subscribe_to}), "
f"total clients: {self.total_clients}"
)
async def disconnect(self, websocket: WebSocket):
"""Remove a WebSocket connection from all subscriptions."""
subs = self.client_subs.pop(websocket, set())
for sub_key in subs:
if sub_key in self.subscriptions:
self.subscriptions[sub_key].discard(websocket)
if not self.subscriptions[sub_key]:
del self.subscriptions[sub_key]
self.client_info.pop(websocket, None)
logger.info(f"Client disconnected, total clients: {self.total_clients}")
async def broadcast(self, subscription_key: str, message: dict):
"""Send a message to all clients subscribed to a key."""
clients = self.subscriptions.get(subscription_key, set()).copy()
if not clients:
return
data = json.dumps(message)
disconnected = []
for websocket in clients:
try:
await websocket.send_text(data)
info = self.client_info.get(websocket, {})
info["messages_sent"] = info.get("messages_sent", 0) + 1
except Exception:
disconnected.append(websocket)
# Clean up disconnected clients
for ws in disconnected:
await self.disconnect(ws)
async def broadcast_to_all(self, message: dict):
"""Send a message to ALL connected clients regardless of subscription."""
all_clients = set()
for clients in self.subscriptions.values():
all_clients.update(clients)
data = json.dumps(message)
disconnected = []
for websocket in all_clients:
try:
await websocket.send_text(data)
except Exception:
disconnected.append(websocket)
for ws in disconnected:
await self.disconnect(ws)
@property
def total_clients(self) -> int:
"""Total number of connected clients."""
return len(self.client_info)
def get_stats(self) -> dict:
"""Get connection statistics."""
return {
"total_clients": self.total_clients,
"subscriptions": {
k: len(v) for k, v in self.subscriptions.items()
},
}
# Global connection manager
manager = ConnectionManager()
async def redis_listener():
"""
Background task that subscribes to Redis channels and
forwards messages to WebSocket clients.
"""
redis = aioredis.from_url(REDIS_URL)
pubsub = redis.pubsub()
await pubsub.psubscribe(
"calls:sentiment:*",
"calls:events",
"calls:alerts",
)
logger.info("Redis listener started")
async for message in pubsub.listen():
msg_type = message.get("type", "")
if msg_type not in ("message", "pmessage"):
continue
channel = message.get("channel", b"")
if isinstance(channel, bytes):
channel = channel.decode()
try:
data = json.loads(message["data"])
except (json.JSONDecodeError, TypeError):
continue
# Route message to appropriate WebSocket clients
if channel == "calls:events":
# Call events go to all subscribers
await manager.broadcast_to_all(data)
elif channel == "calls:alerts":
# Alerts go to "all" and "alerts" subscribers
await manager.broadcast("all", data)
await manager.broadcast("alerts", data)
        elif channel.startswith("calls:sentiment:"):
            call_id = channel.split(":")[-1]
            if call_id == "all":
                # Global sentiment feed goes to "all" subscribers
                await manager.broadcast("all", data)
            else:
                # Call-specific sentiment goes only to that call's subscribers.
                # "all" subscribers already receive the same payload via the
                # calls:sentiment:all channel, so re-broadcasting it here
                # would deliver every message to them twice.
                await manager.broadcast(call_id, data)
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Start background tasks on app startup."""
task = asyncio.create_task(redis_listener())
logger.info("WebSocket server started")
yield
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
# Create FastAPI app
app = FastAPI(
title="Call Monitor Dashboard",
version="1.0.0",
lifespan=lifespan,
)
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
def verify_token(token: str) -> bool:
"""Verify authentication token."""
return token == AUTH_TOKEN
# --- WebSocket Endpoints ---
@app.websocket("/ws/calls")
async def ws_all_calls(
websocket: WebSocket,
token: str = Query(default=""),
):
"""
WebSocket endpoint for all calls.
Connect: ws://host:port/ws/calls?token=YOUR_TOKEN
Receives: All sentiment results and call events
"""
if not verify_token(token):
await websocket.close(code=4001, reason="Invalid token")
return
await manager.connect(websocket, subscribe_to="all")
try:
while True:
# Keep connection alive, handle client messages
data = await websocket.receive_text()
try:
msg = json.loads(data)
# Handle client commands (e.g., subscribe to specific call)
if msg.get("action") == "subscribe":
call_id = msg.get("call_id")
if call_id:
                            # (the "all" subscription already exists from connect())
                            if call_id not in manager.subscriptions:
                                manager.subscriptions[call_id] = set()
                            manager.subscriptions[call_id].add(websocket)
manager.client_subs[websocket].add(call_id)
elif msg.get("action") == "ping":
await websocket.send_text(json.dumps({"type": "pong"}))
except json.JSONDecodeError:
pass
except WebSocketDisconnect:
await manager.disconnect(websocket)
@app.websocket("/ws/calls/{call_id}")
async def ws_single_call(
websocket: WebSocket,
call_id: str,
token: str = Query(default=""),
):
"""
WebSocket endpoint for a specific call.
Connect: ws://host:port/ws/calls/CALL_ID?token=YOUR_TOKEN
Receives: Sentiment results for the specified call only
"""
if not verify_token(token):
await websocket.close(code=4001, reason="Invalid token")
return
await manager.connect(websocket, subscribe_to=call_id)
try:
while True:
data = await websocket.receive_text()
if data == "ping":
await websocket.send_text(json.dumps({"type": "pong"}))
except WebSocketDisconnect:
await manager.disconnect(websocket)
@app.websocket("/ws/alerts")
async def ws_alerts(
websocket: WebSocket,
token: str = Query(default=""),
):
"""
WebSocket endpoint for alerts only.
Connect: ws://host:port/ws/alerts?token=YOUR_TOKEN
Receives: Only alert notifications
"""
if not verify_token(token):
await websocket.close(code=4001, reason="Invalid token")
return
await manager.connect(websocket, subscribe_to="alerts")
try:
while True:
await websocket.receive_text()
except WebSocketDisconnect:
await manager.disconnect(websocket)
# --- REST Endpoints ---
@app.get("/api/calls")
async def get_active_calls(token: str = Query(default="")):
"""Get list of active calls with metadata."""
if not verify_token(token):
raise HTTPException(status_code=401, detail="Invalid token")
redis = aioredis.from_url(REDIS_URL)
calls_raw = await redis.hgetall("calls:active")
await redis.close()
calls = []
for call_id, data in calls_raw.items():
try:
call_data = json.loads(data)
call_data["duration"] = time.time() - call_data.get("start_time", time.time())
calls.append(call_data)
except json.JSONDecodeError:
continue
return {"calls": calls, "count": len(calls)}
@app.get("/api/calls/{call_id}/history")
async def get_call_history(
call_id: str,
count: int = Query(default=100, le=500),
token: str = Query(default=""),
):
"""Get transcript/sentiment history for a call."""
if not verify_token(token):
raise HTTPException(status_code=401, detail="Invalid token")
redis = aioredis.from_url(REDIS_URL)
entries = await redis.xrange(f"calls:history:{call_id}", count=count)
await redis.close()
history = []
for entry_id, fields in entries:
record = {}
for k, v in fields.items():
key = k.decode() if isinstance(k, bytes) else k
val = v.decode() if isinstance(v, bytes) else v
try:
val = json.loads(val)
except (json.JSONDecodeError, TypeError):
pass
record[key] = val
history.append(record)
return {"call_id": call_id, "history": history, "count": len(history)}
@app.get("/api/alerts")
async def get_recent_alerts(
count: int = Query(default=20, le=100),
token: str = Query(default=""),
):
"""Get recent alerts."""
if not verify_token(token):
raise HTTPException(status_code=401, detail="Invalid token")
redis = aioredis.from_url(REDIS_URL)
raw = await redis.lrange("calls:recent_alerts", 0, count - 1)
await redis.close()
alerts = []
for item in raw:
try:
alerts.append(json.loads(item))
except json.JSONDecodeError:
continue
return {"alerts": alerts, "count": len(alerts)}
@app.get("/api/stats")
async def get_stats(token: str = Query(default="")):
"""Get server statistics."""
if not verify_token(token):
raise HTTPException(status_code=401, detail="Invalid token")
    return {
        "websocket": manager.get_stats(),
        # Current server time in epoch seconds; record a start time at boot
        # and subtract it here if you need true uptime
        "server_time": time.time(),
    }
# --- Dashboard ---
@app.get("/")
async def serve_dashboard():
"""Serve the dashboard HTML page."""
dashboard_path = os.path.join(STATIC_DIR, "dashboard.html")
if os.path.exists(dashboard_path):
return FileResponse(dashboard_path, media_type="text/html")
return HTMLResponse("<h1>Dashboard not found. Place dashboard.html in static/</h1>")
# Mount static files
if os.path.isdir(STATIC_DIR):
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")
if __name__ == "__main__":
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
uvicorn.run(
"websocket_server:app",
host=SERVER_HOST,
port=SERVER_PORT,
reload=False,
log_level="info",
)
9. Dashboard Frontend
The dashboard is a single HTML file with embedded CSS and JavaScript. It connects to the WebSocket server and displays live call data, sentiment charts, and alert notifications. No build tools or frameworks required — just serve the file.
Complete Dashboard (dashboard.html)
Save this file as /opt/call-monitor/static/dashboard.html:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Call Monitor Dashboard</title>
<script src="https://cdn.jsdelivr.net/npm/[email protected]/dist/chart.umd.min.js"></script>
<style>
/* ========== CSS Reset & Variables ========== */
:root {
--bg-primary: #0f1117;
--bg-secondary: #1a1d27;
--bg-tertiary: #252836;
--bg-hover: #2d3042;
--text-primary: #e4e6eb;
--text-secondary: #8b8fa3;
--text-muted: #5d6177;
--accent-blue: #4a9eff;
--accent-green: #2ecc71;
--accent-yellow: #f39c12;
--accent-red: #e74c3c;
--accent-orange: #e67e22;
--accent-purple: #9b59b6;
--border-color: #2d3042;
--shadow: 0 2px 8px rgba(0,0,0,0.3);
--radius: 8px;
--radius-lg: 12px;
--transition: 0.2s ease;
}
* { margin: 0; padding: 0; box-sizing: border-box; }
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto,
'Helvetica Neue', Arial, sans-serif;
background: var(--bg-primary);
color: var(--text-primary);
min-height: 100vh;
overflow-x: hidden;
}
/* ========== Layout ========== */
.header {
background: var(--bg-secondary);
border-bottom: 1px solid var(--border-color);
padding: 12px 24px;
display: flex;
align-items: center;
justify-content: space-between;
position: sticky;
top: 0;
z-index: 100;
}
.header h1 {
font-size: 18px;
font-weight: 600;
color: var(--text-primary);
}
.header h1 span {
color: var(--accent-blue);
}
.header-stats {
display: flex;
gap: 20px;
font-size: 13px;
color: var(--text-secondary);
}
.header-stats .stat-value {
font-weight: 600;
color: var(--text-primary);
margin-left: 4px;
}
.connection-status {
display: flex;
align-items: center;
gap: 6px;
font-size: 12px;
}
.connection-dot {
width: 8px;
height: 8px;
border-radius: 50%;
background: var(--accent-red);
}
.connection-dot.connected {
background: var(--accent-green);
box-shadow: 0 0 6px var(--accent-green);
}
.main-layout {
display: grid;
grid-template-columns: 320px 1fr;
grid-template-rows: auto 1fr;
height: calc(100vh - 53px);
gap: 0;
}
/* ========== Alert Banner ========== */
.alert-banner {
grid-column: 1 / -1;
background: var(--bg-secondary);
border-bottom: 1px solid var(--border-color);
padding: 0;
max-height: 0;
overflow: hidden;
transition: max-height 0.3s ease, padding 0.3s ease;
}
.alert-banner.visible {
max-height: 200px;
padding: 8px 24px;
}
.alert-item {
display: flex;
align-items: center;
gap: 12px;
padding: 8px 16px;
border-radius: var(--radius);
margin-bottom: 4px;
font-size: 13px;
animation: alertPulse 2s ease-in-out;
}
.alert-item.warning { background: rgba(243, 156, 18, 0.15); border-left: 3px solid var(--accent-yellow); }
.alert-item.urgent { background: rgba(230, 126, 34, 0.15); border-left: 3px solid var(--accent-orange); }
.alert-item.critical { background: rgba(231, 76, 60, 0.15); border-left: 3px solid var(--accent-red); }
@keyframes alertPulse {
0%, 100% { opacity: 1; }
50% { opacity: 0.7; }
}
.alert-level-badge {
font-size: 10px;
font-weight: 700;
text-transform: uppercase;
padding: 2px 8px;
border-radius: 4px;
white-space: nowrap;
}
.alert-level-badge.warning { background: var(--accent-yellow); color: #000; }
.alert-level-badge.urgent { background: var(--accent-orange); color: #fff; }
.alert-level-badge.critical { background: var(--accent-red); color: #fff; }
.alert-dismiss {
margin-left: auto;
cursor: pointer;
color: var(--text-muted);
font-size: 16px;
}
.alert-dismiss:hover { color: var(--text-primary); }
/* ========== Sidebar — Active Calls ========== */
.sidebar {
background: var(--bg-secondary);
border-right: 1px solid var(--border-color);
overflow-y: auto;
padding: 16px;
}
.sidebar-header {
font-size: 13px;
font-weight: 600;
color: var(--text-secondary);
text-transform: uppercase;
letter-spacing: 1px;
margin-bottom: 12px;
display: flex;
justify-content: space-between;
align-items: center;
}
.call-count {
background: var(--accent-blue);
color: #fff;
font-size: 11px;
padding: 2px 8px;
border-radius: 10px;
}
.call-card {
background: var(--bg-tertiary);
border-radius: var(--radius);
padding: 12px;
margin-bottom: 8px;
cursor: pointer;
border: 2px solid transparent;
transition: all var(--transition);
}
.call-card:hover { background: var(--bg-hover); }
.call-card.selected {
border-color: var(--accent-blue);
background: rgba(74, 158, 255, 0.08);
}
.call-card-header {
display: flex;
justify-content: space-between;
align-items: center;
margin-bottom: 8px;
}
.call-agent {
font-size: 14px;
font-weight: 600;
white-space: nowrap;
overflow: hidden;
text-overflow: ellipsis;
max-width: 180px;
}
.call-duration {
font-size: 12px;
color: var(--text-muted);
font-family: 'SF Mono', 'Fira Code', monospace;
}
.call-meta {
display: flex;
gap: 8px;
font-size: 12px;
color: var(--text-secondary);
margin-bottom: 8px;
}
.sentiment-indicator {
display: flex;
align-items: center;
gap: 6px;
}
.sentiment-bar {
width: 100%;
height: 4px;
background: var(--bg-primary);
border-radius: 2px;
overflow: hidden;
}
.sentiment-bar-fill {
height: 100%;
border-radius: 2px;
transition: width 0.5s ease, background 0.5s ease;
}
.sentiment-value {
font-size: 12px;
font-weight: 600;
font-family: 'SF Mono', 'Fira Code', monospace;
min-width: 40px;
text-align: right;
}
.no-calls {
text-align: center;
color: var(--text-muted);
padding: 40px 20px;
font-size: 14px;
}
/* ========== Main Content ========== */
.content {
display: flex;
flex-direction: column;
overflow: hidden;
}
.content-header {
padding: 16px 24px;
border-bottom: 1px solid var(--border-color);
display: flex;
justify-content: space-between;
align-items: center;
}
.content-header h2 {
font-size: 16px;
font-weight: 600;
}
.content-tabs {
display: flex;
gap: 4px;
}
.tab-btn {
padding: 6px 14px;
font-size: 12px;
background: none;
border: 1px solid var(--border-color);
border-radius: var(--radius);
color: var(--text-secondary);
cursor: pointer;
transition: all var(--transition);
}
.tab-btn:hover { background: var(--bg-hover); }
.tab-btn.active { background: var(--accent-blue); color: #fff; border-color: var(--accent-blue); }
/* ========== Transcript View ========== */
.transcript-container {
flex: 1;
overflow-y: auto;
padding: 16px 24px;
}
.transcript-entry {
display: flex;
gap: 12px;
padding: 8px 0;
border-bottom: 1px solid rgba(45, 48, 66, 0.5);
animation: fadeIn 0.3s ease;
}
@keyframes fadeIn {
from { opacity: 0; transform: translateY(8px); }
to { opacity: 1; transform: translateY(0); }
}
.transcript-time {
font-size: 11px;
color: var(--text-muted);
font-family: 'SF Mono', 'Fira Code', monospace;
min-width: 50px;
padding-top: 2px;
}
.transcript-speaker {
font-size: 11px;
font-weight: 700;
text-transform: uppercase;
padding: 2px 8px;
border-radius: 4px;
min-width: 60px;
text-align: center;
align-self: flex-start;
}
.transcript-speaker.agent { background: rgba(74, 158, 255, 0.2); color: var(--accent-blue); }
.transcript-speaker.caller { background: rgba(155, 89, 182, 0.2); color: var(--accent-purple); }
.transcript-speaker.unknown { background: rgba(139, 143, 163, 0.2); color: var(--text-secondary); }
.transcript-text {
flex: 1;
font-size: 14px;
line-height: 1.5;
}
.transcript-sentiment {
min-width: 36px;
text-align: center;
font-size: 18px;
align-self: flex-start;
}
.keyword-highlight {
background: rgba(231, 76, 60, 0.25);
color: var(--accent-red);
padding: 1px 4px;
border-radius: 3px;
font-weight: 600;
}
/* ========== Chart View ========== */
.chart-container {
padding: 24px;
height: 280px;
border-bottom: 1px solid var(--border-color);
}
/* ========== Agent Summary ========== */
.agent-summary {
padding: 16px 24px;
overflow-y: auto;
}
.summary-grid {
display: grid;
grid-template-columns: repeat(auto-fill, minmax(240px, 1fr));
gap: 12px;
}
.summary-card {
background: var(--bg-tertiary);
border-radius: var(--radius);
padding: 16px;
}
.summary-card-header {
display: flex;
justify-content: space-between;
align-items: center;
margin-bottom: 12px;
}
.summary-card-name {
font-weight: 600;
font-size: 14px;
}
.summary-card-stat {
display: flex;
justify-content: space-between;
font-size: 12px;
color: var(--text-secondary);
margin-bottom: 4px;
}
.summary-card-stat span:last-child {
font-weight: 600;
color: var(--text-primary);
}
/* ========== Welcome Screen ========== */
.welcome-screen {
display: flex;
flex-direction: column;
align-items: center;
justify-content: center;
height: 100%;
color: var(--text-muted);
}
.welcome-screen h2 {
font-size: 22px;
margin-bottom: 8px;
color: var(--text-secondary);
}
.welcome-screen p {
font-size: 14px;
max-width: 400px;
text-align: center;
line-height: 1.6;
}
/* ========== Scrollbar ========== */
::-webkit-scrollbar { width: 6px; }
::-webkit-scrollbar-track { background: transparent; }
::-webkit-scrollbar-thumb { background: var(--bg-hover); border-radius: 3px; }
::-webkit-scrollbar-thumb:hover { background: var(--text-muted); }
/* ========== Responsive ========== */
@media (max-width: 768px) {
.main-layout {
grid-template-columns: 1fr;
}
.sidebar {
max-height: 200px;
border-right: none;
border-bottom: 1px solid var(--border-color);
}
}
</style>
</head>
<body>
<!-- Header -->
<div class="header">
<h1><span>Call</span> Monitor</h1>
<div class="header-stats">
<div>Active Calls: <span class="stat-value" id="activeCallCount">0</span></div>
<div>Alerts: <span class="stat-value" id="alertCount">0</span></div>
<div>Segments: <span class="stat-value" id="segmentCount">0</span></div>
</div>
<div class="connection-status">
<div class="connection-dot" id="connectionDot"></div>
<span id="connectionText">Disconnected</span>
</div>
</div>
<!-- Alert Banner -->
<div class="alert-banner" id="alertBanner">
<div id="alertList"></div>
</div>
<div class="main-layout">
<!-- Sidebar: Active Calls -->
<div class="sidebar">
<div class="sidebar-header">
Active Calls
<span class="call-count" id="sidebarCallCount">0</span>
</div>
<div id="callList">
<div class="no-calls">No active calls</div>
</div>
</div>
<!-- Main Content -->
<div class="content">
<div id="welcomeScreen" class="welcome-screen">
<h2>Call Monitor Dashboard</h2>
<p>Waiting for active calls. When calls begin, they will appear
in the sidebar. Click a call to view its live transcript
and sentiment analysis.</p>
</div>
<div id="callDetail" style="display: none; height: 100%; flex-direction: column;">
<div class="content-header">
<h2 id="detailTitle">Call Details</h2>
<div class="content-tabs">
<button class="tab-btn active" onclick="showTab('transcript')">Transcript</button>
<button class="tab-btn" onclick="showTab('chart')">Sentiment Chart</button>
<button class="tab-btn" onclick="showTab('summary')">Summary</button>
</div>
</div>
<!-- Sentiment Chart (always visible above transcript) -->
<div class="chart-container" id="chartPanel">
<canvas id="sentimentChart"></canvas>
</div>
<!-- Transcript Tab -->
<div class="transcript-container" id="transcriptPanel">
<!-- Transcript entries populated by JS -->
</div>
<!-- Agent Summary Tab -->
<div class="agent-summary" id="summaryPanel" style="display: none;">
<div class="summary-grid" id="summaryGrid"></div>
</div>
</div>
</div>
</div>
<script>
// ==================== Configuration ====================
// Use wss:// and https:// when the page itself is served over HTTPS
const wsProtocol = window.location.protocol === 'https:' ? 'wss' : 'ws';
const CONFIG = {
    wsUrl: `${wsProtocol}://${window.location.host}/ws/calls`,
    apiUrl: `${window.location.protocol}//${window.location.host}/api`,
    token: new URLSearchParams(window.location.search).get('token') || 'your-secret-token-change-me',
    reconnectInterval: 3000,
    maxTranscriptEntries: 200,
    maxAlerts: 10,
};
// ==================== State ====================
const state = {
    ws: null,
    connected: false,
    selectedCallId: null,
    calls: {}, // call_id -> { agent, caller, duration, sentiment, segments: [] }
    alerts: [],
    totalSegments: 0,
    sentimentChart: null,
    reconnectTimer: null,
    pingInterval: null, // keep-alive timer, set in the onopen handler
};
// ==================== WebSocket ====================
function connectWebSocket() {
if (state.ws && state.ws.readyState <= 1) return;
const url = `${CONFIG.wsUrl}?token=${CONFIG.token}`;
state.ws = new WebSocket(url);
state.ws.onopen = () => {
state.connected = true;
updateConnectionStatus(true);
console.log('WebSocket connected');
// Start ping interval
state.pingInterval = setInterval(() => {
if (state.ws && state.ws.readyState === 1) {
state.ws.send(JSON.stringify({ action: 'ping' }));
}
}, 30000);
// Fetch initial active calls
fetchActiveCalls();
};
state.ws.onmessage = (event) => {
try {
const data = JSON.parse(event.data);
handleMessage(data);
} catch (e) {
console.error('Parse error:', e);
}
};
state.ws.onclose = () => {
state.connected = false;
updateConnectionStatus(false);
clearInterval(state.pingInterval);
console.log('WebSocket disconnected, reconnecting...');
state.reconnectTimer = setTimeout(connectWebSocket, CONFIG.reconnectInterval);
};
state.ws.onerror = (err) => {
console.error('WebSocket error:', err);
};
}
function handleMessage(data) {
switch (data.type || data.event) {
case 'sentiment':
handleSentiment(data);
break;
case 'call_start':
handleCallStart(data);
break;
case 'call_end':
handleCallEnd(data);
break;
case 'alert':
handleAlert(data);
break;
case 'pong':
break;
default:
console.log('Unknown message type:', data);
}
}
// ==================== Message Handlers ====================
function handleSentiment(data) {
const callId = data.call_id;
// Ensure call exists in state
if (!state.calls[callId]) {
state.calls[callId] = createCallState(callId);
}
const call = state.calls[callId];
// Add segment
const segment = {
text: data.text,
speaker: data.speaker || 'unknown',
sentiment_score: data.sentiment_score,
sentiment_label: data.sentiment_label,
keywords: data.keywords_detected || [],
profanity: data.profanity_detected || [],
timestamp: data.timestamp,
start_time: data.start_time,
};
call.segments.push(segment);
call.rolling_sentiment = data.rolling_sentiment;
call.last_text = data.text;
state.totalSegments++;
// Update UI
updateCallCard(callId);
updateHeaderStats();
if (state.selectedCallId === callId) {
addTranscriptEntry(segment);
updateSentimentChart(callId);
}
}
function handleCallStart(data) {
const callId = data.call_id;
if (!state.calls[callId]) {
state.calls[callId] = createCallState(callId, data);
}
updateCallList();
updateHeaderStats();
}
function handleCallEnd(data) {
const callId = data.call_id;
const call = state.calls[callId];
if (call) {
call.ended = true;
call.endDuration = data.duration;
updateCallCard(callId);
// Remove after 60 seconds
setTimeout(() => {
delete state.calls[callId];
updateCallList();
updateHeaderStats();
if (state.selectedCallId === callId) {
state.selectedCallId = null;
showWelcomeScreen();
}
}, 60000);
}
}
function handleAlert(data) {
state.alerts.unshift(data);
if (state.alerts.length > CONFIG.maxAlerts) {
state.alerts.pop();
}
showAlertBanner(data);
updateHeaderStats();
}
// ==================== State Helpers ====================
function createCallState(callId, extra = {}) {
return {
call_id: callId,
agent: extra.agent || 'Unknown Agent',
caller: extra.caller || 'Unknown',
campaign: extra.campaign || '',
start_time: extra.start_time || extra.timestamp || Date.now() / 1000,
segments: [],
rolling_sentiment: 0,
last_text: '',
ended: false,
};
}
// ==================== UI Updates ====================
function updateConnectionStatus(connected) {
const dot = document.getElementById('connectionDot');
const text = document.getElementById('connectionText');
if (connected) {
dot.classList.add('connected');
text.textContent = 'Connected';
} else {
dot.classList.remove('connected');
text.textContent = 'Disconnected';
}
}
function updateHeaderStats() {
const activeCalls = Object.values(state.calls).filter(c => !c.ended).length;
document.getElementById('activeCallCount').textContent = activeCalls;
document.getElementById('alertCount').textContent = state.alerts.length;
document.getElementById('segmentCount').textContent = state.totalSegments;
document.getElementById('sidebarCallCount').textContent = activeCalls;
}
function updateCallList() {
const container = document.getElementById('callList');
const calls = Object.values(state.calls);
if (calls.length === 0) {
container.innerHTML = '<div class="no-calls">No active calls</div>';
return;
}
// Sort by start time (newest first)
calls.sort((a, b) => (b.start_time || 0) - (a.start_time || 0));
container.innerHTML = calls.map(call => buildCallCard(call)).join('');
}
function buildCallCard(call) {
const duration = formatDuration(call);
const sentimentColor = getSentimentColor(call.rolling_sentiment);
const sentimentPercent = ((call.rolling_sentiment + 1) / 2) * 100;
const selected = state.selectedCallId === call.call_id ? 'selected' : '';
const endedClass = call.ended ? 'opacity: 0.5;' : '';
return `
<div class="call-card ${selected}" style="${endedClass}"
onclick="selectCall('${call.call_id}')" id="card-${call.call_id}">
<div class="call-card-header">
<div class="call-agent">${escapeHtml(call.agent)}</div>
<div class="call-duration">${duration}</div>
</div>
<div class="call-meta">
<span>${escapeHtml(call.caller)}</span>
${call.campaign ? `<span>${escapeHtml(call.campaign)}</span>` : ''}
${call.ended ? '<span style="color: var(--accent-red);">ENDED</span>' : ''}
</div>
<div class="sentiment-indicator">
<div class="sentiment-bar">
<div class="sentiment-bar-fill" style="width: ${sentimentPercent}%; background: ${sentimentColor};"></div>
</div>
<div class="sentiment-value" style="color: ${sentimentColor};">
${call.rolling_sentiment >= 0 ? '+' : ''}${call.rolling_sentiment.toFixed(2)}
</div>
</div>
</div>
`;
}
function updateCallCard(callId) {
const call = state.calls[callId];
if (!call) return;
const existing = document.getElementById(`card-${callId}`);
if (existing) {
existing.outerHTML = buildCallCard(call);
} else {
updateCallList();
}
}
function selectCall(callId) {
state.selectedCallId = callId;
// Update sidebar selection
document.querySelectorAll('.call-card').forEach(card => {
card.classList.remove('selected');
});
const card = document.getElementById(`card-${callId}`);
if (card) card.classList.add('selected');
// Show call detail
showCallDetail(callId);
}
function showCallDetail(callId) {
const call = state.calls[callId];
if (!call) return;
document.getElementById('welcomeScreen').style.display = 'none';
const detail = document.getElementById('callDetail');
detail.style.display = 'flex';
document.getElementById('detailTitle').textContent =
`${call.agent} - ${call.caller}`;
// Clear and rebuild transcript
const container = document.getElementById('transcriptPanel');
container.innerHTML = '';
call.segments.forEach(seg => addTranscriptEntry(seg));
// Initialize chart
initSentimentChart(callId);
// Auto-scroll to bottom
container.scrollTop = container.scrollHeight;
}
function showWelcomeScreen() {
document.getElementById('welcomeScreen').style.display = 'flex';
document.getElementById('callDetail').style.display = 'none';
}
function addTranscriptEntry(segment) {
const container = document.getElementById('transcriptPanel');
const timeStr = segment.start_time ? formatSeconds(segment.start_time) : '--:--';
const sentimentIcon = getSentimentIcon(segment.sentiment_score);
const speakerClass = segment.speaker || 'unknown';
// Highlight keywords
let text = escapeHtml(segment.text);
if (segment.keywords && segment.keywords.length > 0) {
segment.keywords.forEach(kw => {
const regex = new RegExp(`\\b(${escapeRegex(kw)})\\b`, 'gi');
text = text.replace(regex, '<span class="keyword-highlight">$1</span>');
});
}
if (segment.profanity && segment.profanity.length > 0) {
segment.profanity.forEach(word => {
const regex = new RegExp(`\\b(${escapeRegex(word)})\\b`, 'gi');
text = text.replace(regex, '<span class="keyword-highlight">$1</span>');
});
}
const entry = document.createElement('div');
entry.className = 'transcript-entry';
entry.innerHTML = `
<div class="transcript-time">${timeStr}</div>
<div class="transcript-speaker ${speakerClass}">${speakerClass}</div>
<div class="transcript-text">${text}</div>
<div class="transcript-sentiment">${sentimentIcon}</div>
`;
container.appendChild(entry);
// Trim old entries
while (container.children.length > CONFIG.maxTranscriptEntries) {
container.removeChild(container.firstChild);
}
// Auto-scroll if near bottom
const isNearBottom = container.scrollHeight - container.scrollTop - container.clientHeight < 100;
if (isNearBottom) {
container.scrollTop = container.scrollHeight;
}
}
// ==================== Sentiment Chart ====================
function initSentimentChart(callId) {
const call = state.calls[callId];
if (!call) return;
const ctx = document.getElementById('sentimentChart').getContext('2d');
if (state.sentimentChart) {
state.sentimentChart.destroy();
}
const labels = call.segments.map((s, i) => formatSeconds(s.start_time || i * 5));
const scores = call.segments.map(s => s.sentiment_score);
state.sentimentChart = new Chart(ctx, {
type: 'line',
data: {
labels: labels,
datasets: [{
label: 'Sentiment Score',
data: scores,
borderColor: function(context) {
const index = context.dataIndex;
const value = context.dataset.data[index];
if (value === undefined) return '#4a9eff';
if (value > 0.1) return '#2ecc71';
if (value < -0.1) return '#e74c3c';
return '#f39c12';
},
segment: {
borderColor: function(ctx) {
const prev = ctx.p0.parsed.y;
const curr = ctx.p1.parsed.y;
const avg = (prev + curr) / 2;
if (avg > 0.1) return '#2ecc71';
if (avg < -0.1) return '#e74c3c';
return '#f39c12';
},
},
backgroundColor: 'rgba(74, 158, 255, 0.1)',
fill: true,
tension: 0.3,
pointRadius: 4,
pointBackgroundColor: function(context) {
const value = context.raw;
if (value > 0.1) return '#2ecc71';
if (value < -0.1) return '#e74c3c';
return '#f39c12';
},
pointBorderWidth: 0,
}]
},
options: {
responsive: true,
maintainAspectRatio: false,
animation: { duration: 300 },
plugins: {
legend: { display: false },
tooltip: {
callbacks: {
label: (ctx) => `Sentiment: ${ctx.raw.toFixed(3)}`,
afterLabel: (ctx) => {
const seg = call.segments[ctx.dataIndex];
return seg ? `"${seg.text.substring(0, 60)}..."` : '';
}
}
}
},
scales: {
y: {
min: -1.0,
max: 1.0,
grid: { color: 'rgba(45, 48, 66, 0.5)' },
ticks: { color: '#8b8fa3', font: { size: 11 } },
},
x: {
grid: { display: false },
ticks: {
color: '#8b8fa3',
font: { size: 10 },
maxRotation: 0,
maxTicksLimit: 15,
},
}
}
}
});
}
function updateSentimentChart(callId) {
const call = state.calls[callId];
if (!call || !state.sentimentChart) return;
const labels = call.segments.map((s, i) => formatSeconds(s.start_time || i * 5));
const scores = call.segments.map(s => s.sentiment_score);
state.sentimentChart.data.labels = labels;
state.sentimentChart.data.datasets[0].data = scores;
state.sentimentChart.update('none');
}
// ==================== Alert Banner ====================
function showAlertBanner(alert) {
const banner = document.getElementById('alertBanner');
const list = document.getElementById('alertList');
const reasons = alert.reasons ? alert.reasons.join(', ') : '';
const callId = alert.call_id || 'unknown';
const level = alert.alert_level || 'warning';
const id = `alert-${Date.now()}`;
const html = `
<div class="alert-item ${level}" id="${id}">
<span class="alert-level-badge ${level}">${level}</span>
<span><strong>Call ${callId.substring(0, 12)}...</strong> ${reasons}</span>
<span style="color: var(--text-muted); font-size: 11px;">${escapeHtml(alert.text || '').substring(0, 50)}</span>
<span class="alert-dismiss" onclick="dismissAlert('${id}')">×</span>
</div>
`;
list.insertAdjacentHTML('afterbegin', html);
banner.classList.add('visible');
// Auto-dismiss after 30 seconds for warnings
if (level === 'warning') {
setTimeout(() => dismissAlert(id), 30000);
}
// Trim old alerts
while (list.children.length > CONFIG.maxAlerts) {
list.removeChild(list.lastChild);
}
}
function dismissAlert(id) {
const el = document.getElementById(id);
if (el) el.remove();
const list = document.getElementById('alertList');
if (list.children.length === 0) {
document.getElementById('alertBanner').classList.remove('visible');
}
}
// ==================== Tabs ====================
function showTab(tab) {
document.querySelectorAll('.tab-btn').forEach(btn => btn.classList.remove('active'));
event.target.classList.add('active');
document.getElementById('transcriptPanel').style.display = tab === 'transcript' ? 'block' : 'none';
document.getElementById('chartPanel').style.display = (tab === 'transcript' || tab === 'chart') ? 'block' : 'none';
document.getElementById('summaryPanel').style.display = tab === 'summary' ? 'block' : 'none';
if (tab === 'summary') {
buildSummary();
}
}
function buildSummary() {
const grid = document.getElementById('summaryGrid');
const agents = {};
// Aggregate stats per agent
Object.values(state.calls).forEach(call => {
const agent = call.agent;
if (!agents[agent]) {
agents[agent] = { calls: 0, segments: 0, totalSentiment: 0, alerts: 0, keywords: [] };
}
agents[agent].calls++;
agents[agent].segments += call.segments.length;
call.segments.forEach(seg => {
agents[agent].totalSentiment += seg.sentiment_score;
if (seg.keywords) agents[agent].keywords.push(...seg.keywords);
});
});
// Count alerts per agent
state.alerts.forEach(alert => {
const call = state.calls[alert.call_id];
if (call && agents[call.agent]) {
agents[call.agent].alerts++;
}
});
grid.innerHTML = Object.entries(agents).map(([name, stats]) => {
const avgSentiment = stats.segments > 0 ? (stats.totalSentiment / stats.segments) : 0;
const color = getSentimentColor(avgSentiment);
const uniqueKeywords = [...new Set(stats.keywords)];
return `
<div class="summary-card">
<div class="summary-card-header">
<span class="summary-card-name">${escapeHtml(name)}</span>
<span style="color: ${color}; font-weight: 600;">
${avgSentiment >= 0 ? '+' : ''}${avgSentiment.toFixed(2)}
</span>
</div>
<div class="summary-card-stat"><span>Active Calls</span><span>${stats.calls}</span></div>
<div class="summary-card-stat"><span>Segments</span><span>${stats.segments}</span></div>
<div class="summary-card-stat"><span>Alerts</span><span style="color: ${stats.alerts > 0 ? 'var(--accent-red)' : 'inherit'}">${stats.alerts}</span></div>
<div class="summary-card-stat"><span>Keywords</span><span>${uniqueKeywords.length > 0 ? uniqueKeywords.join(', ') : 'None'}</span></div>
</div>
`;
}).join('');
if (Object.keys(agents).length === 0) {
grid.innerHTML = '<div class="no-calls">No agent data available</div>';
}
}
// ==================== API Calls ====================
async function fetchActiveCalls() {
try {
const res = await fetch(`${CONFIG.apiUrl}/calls?token=${CONFIG.token}`);
const data = await res.json();
if (data.calls) {
data.calls.forEach(call => {
if (!state.calls[call.call_id]) {
state.calls[call.call_id] = createCallState(call.call_id, call);
}
});
updateCallList();
updateHeaderStats();
}
} catch (e) {
console.error('Failed to fetch active calls:', e);
}
}
// ==================== Utility Functions ====================
function formatDuration(call) {
const elapsed = Date.now() / 1000 - (call.start_time || Date.now() / 1000);
return formatSeconds(call.ended ? (call.endDuration || elapsed) : elapsed);
}
function formatSeconds(totalSeconds) {
const mins = Math.floor(totalSeconds / 60);
const secs = Math.floor(totalSeconds % 60);
return `${mins.toString().padStart(2, '0')}:${secs.toString().padStart(2, '0')}`;
}
function getSentimentColor(score) {
if (score > 0.3) return 'var(--accent-green)';
if (score > 0.1) return '#7bc67b';
if (score > -0.1) return 'var(--accent-yellow)';
if (score > -0.3) return 'var(--accent-orange)';
return 'var(--accent-red)';
}
function getSentimentIcon(score) {
if (score > 0.3) return '<span style="color: var(--accent-green);" title="Positive">▲</span>';
if (score > 0.05) return '<span style="color: #7bc67b;" title="Slightly positive">▲</span>';
if (score > -0.05) return '<span style="color: var(--accent-yellow);" title="Neutral">▶</span>';
if (score > -0.3) return '<span style="color: var(--accent-orange);" title="Slightly negative">▼</span>';
return '<span style="color: var(--accent-red);" title="Negative">▼</span>';
}
function escapeHtml(str) {
const div = document.createElement('div');
div.textContent = str || '';
return div.innerHTML;
}
function escapeRegex(str) {
return str.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
}
// ==================== Duration Updater ====================
setInterval(() => {
    // Update call durations every second. Look each card up by id —
    // the sidebar is sorted newest-first, so positional indexing against
    // Object.values(state.calls) would pair durations with the wrong calls.
    Object.values(state.calls).forEach(call => {
        if (call.ended) return;
        const card = document.getElementById(`card-${call.call_id}`);
        if (!card) return;
        const el = card.querySelector('.call-duration');
        if (el) el.textContent = formatDuration(call);
    });
}, 1000);
// ==================== Initialize ====================
document.addEventListener('DOMContentLoaded', () => {
connectWebSocket();
});
</script>
</body>
</html>
10. Alert System
The alert system monitors sentiment and keyword triggers, then notifies supervisors through multiple channels. It implements escalation levels (warning, urgent, critical) with configurable thresholds and cooldown periods to prevent alert fatigue.
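Before the full implementation, the cooldown mechanic is worth seeing in isolation: an alert of a given type fires at most once per window, tracked per (call, rule) pair. A minimal sketch — the `Cooldown` class and its names are illustrative, not part of the alert manager code:

```python
import time


class Cooldown:
    """Suppress repeat alerts of the same type within a time window."""

    def __init__(self, seconds):
        self.seconds = seconds
        self.last = {}  # (call_id, rule_name) -> last fire timestamp

    def should_fire(self, call_id, rule_name, now=None):
        now = time.time() if now is None else now
        key = (call_id, rule_name)
        if now - self.last.get(key, 0) < self.seconds:
            return False  # still inside the cooldown window
        self.last[key] = now
        return True


cd = Cooldown(seconds=120)
print(cd.should_fire("call-1", "profanity", now=1000))  # True: first fire
print(cd.should_fire("call-1", "profanity", now=1060))  # False: only 60s elapsed
print(cd.should_fire("call-1", "profanity", now=1130))  # True: 130s >= 120s window
```

The real manager below applies the same check per rule, with `cooldown_seconds=0` meaning "always fire" (for rules like legal threats).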
Complete Alert Manager
#!/usr/bin/env python3
"""
alert_manager.py — Configurable alert system for call monitoring.
Monitors sentiment data from Redis and triggers alerts through
multiple channels: dashboard, email (Resend), Slack webhook,
and optionally supervisor phone calls via AMI.
Alert Levels:
- warning: Keyword detected, mild negative trend
- urgent: Profanity, sustained negative sentiment (3+ utterances)
- critical: Very negative sentiment (< -0.7), 5+ consecutive negatives
Includes cooldown to prevent alert fatigue.
"""
import json
import time
import logging
import asyncio
from typing import Dict, List, Optional
from dataclasses import dataclass, field
import httpx
import redis.asyncio as aioredis
logger = logging.getLogger(__name__)
@dataclass
class AlertRule:
"""Defines when to trigger an alert."""
name: str
level: str # warning, urgent, critical
description: str
enabled: bool = True
# Conditions (any True condition triggers the alert)
min_sentiment: Optional[float] = None # Trigger if score below this
consecutive_negative: Optional[int] = None # Trigger after N consecutive negatives
keywords: Optional[List[str]] = None # Trigger on these keywords
profanity: bool = False # Trigger on any profanity
rolling_below: Optional[float] = None # Trigger if rolling avg below this
# Cooldown
cooldown_seconds: int = 120 # Minimum seconds between alerts of same type per call
@dataclass
class CallAlertState:
"""Track alert state for a single call."""
call_id: str
last_alert_time: Dict[str, float] = field(default_factory=dict) # rule_name -> timestamp
    alert_count: int = 0  # consecutive negative utterances, consumed by trend rules
escalated: bool = False
# Default alert rules
DEFAULT_RULES = [
AlertRule(
name="keyword_detected",
level="warning",
description="Alert keyword detected in conversation",
keywords=[
"cancel", "supervisor", "manager", "complaint",
"escalate", "refund", "unacceptable",
],
cooldown_seconds=60,
),
AlertRule(
name="legal_threat",
level="urgent",
description="Legal or regulatory threat detected",
keywords=["legal", "lawyer", "sue", "court", "regulator", "ombudsman", "ofcom"],
cooldown_seconds=0, # Always alert immediately
),
AlertRule(
name="profanity",
level="urgent",
description="Profanity detected in conversation",
profanity=True,
cooldown_seconds=120,
),
AlertRule(
name="negative_trend",
level="warning",
description="Sustained negative sentiment trend",
consecutive_negative=3,
cooldown_seconds=180,
),
AlertRule(
name="severe_negative_trend",
level="urgent",
description="Severe sustained negative sentiment",
consecutive_negative=5,
cooldown_seconds=120,
),
AlertRule(
name="very_negative",
level="critical",
description="Very negative sentiment detected",
min_sentiment=-0.7,
cooldown_seconds=60,
),
AlertRule(
name="declining_call",
level="urgent",
description="Call sentiment consistently declining",
rolling_below=-0.4,
cooldown_seconds=300,
),
]
class AlertNotifier:
"""
Sends alert notifications through configured channels.
"""
def __init__(
self,
enable_dashboard: bool = True,
enable_email: bool = False,
enable_slack: bool = False,
resend_api_key: str = "",
email_from: str = "[email protected]",
email_to: str = "[email protected]",
slack_webhook_url: str = "",
):
self.enable_dashboard = enable_dashboard
self.enable_email = enable_email
self.enable_slack = enable_slack
self.resend_api_key = resend_api_key
self.email_from = email_from
self.email_to = email_to
self.slack_webhook_url = slack_webhook_url
async def send(self, alert: dict, redis_client: aioredis.Redis):
"""Send alert through all enabled channels."""
tasks = []
if self.enable_dashboard:
tasks.append(self._send_dashboard(alert, redis_client))
if self.enable_email and self.resend_api_key:
tasks.append(self._send_email(alert))
if self.enable_slack and self.slack_webhook_url:
tasks.append(self._send_slack(alert))
if tasks:
await asyncio.gather(*tasks, return_exceptions=True)
async def _send_dashboard(self, alert: dict, redis_client: aioredis.Redis):
"""Publish alert to dashboard via Redis."""
await redis_client.publish("calls:alerts", json.dumps(alert))
await redis_client.lpush("calls:recent_alerts", json.dumps(alert))
await redis_client.ltrim("calls:recent_alerts", 0, 99)
async def _send_email(self, alert: dict):
"""Send alert via Resend email API."""
level = alert.get("alert_level", "warning").upper()
call_id = alert.get("call_id", "unknown")
reasons = ", ".join(alert.get("reasons", []))
text = alert.get("text", "")
subject = f"[{level}] Call Alert: {call_id[:16]}"
html_body = f"""
<div style="font-family: Arial, sans-serif; max-width: 600px;">
<div style="background: {'#e74c3c' if level == 'CRITICAL' else '#e67e22' if level == 'URGENT' else '#f39c12'};
color: white; padding: 12px 20px; border-radius: 8px 8px 0 0;">
<h2 style="margin: 0;">{level} Alert</h2>
</div>
<div style="background: #f8f9fa; padding: 20px; border-radius: 0 0 8px 8px;">
<p><strong>Call ID:</strong> {call_id}</p>
<p><strong>Reasons:</strong> {reasons}</p>
<p><strong>Sentiment:</strong> {alert.get('sentiment_score', 'N/A')}</p>
<p><strong>Text:</strong> "{text[:200]}"</p>
<p><strong>Time:</strong> {time.strftime('%Y-%m-%d %H:%M:%S')}</p>
<hr>
<p style="color: #666; font-size: 12px;">
Open the <a href="http://YOUR_SERVER_IP:8095/?token=YOUR_TOKEN">Call Monitor Dashboard</a> to view live details.
</p>
</div>
</div>
"""
try:
async with httpx.AsyncClient() as client:
resp = await client.post(
"https://api.resend.com/emails",
headers={
"Authorization": f"Bearer {self.resend_api_key}",
"Content-Type": "application/json",
},
json={
"from": self.email_from,
"to": [self.email_to],
"subject": subject,
"html": html_body,
},
timeout=10,
)
if resp.status_code == 200:
logger.info(f"Email alert sent: {subject}")
else:
logger.error(f"Email send failed: {resp.status_code} {resp.text}")
except Exception as e:
logger.error(f"Email alert error: {e}")
async def _send_slack(self, alert: dict):
"""Send alert via Slack webhook."""
level = alert.get("alert_level", "warning")
call_id = alert.get("call_id", "unknown")
reasons = ", ".join(alert.get("reasons", []))
text = alert.get("text", "")
emoji = {"warning": ":warning:", "urgent": ":rotating_light:", "critical": ":fire:"}.get(level, ":bell:")
color = {"warning": "#f39c12", "urgent": "#e67e22", "critical": "#e74c3c"}.get(level, "#95a5a6")
payload = {
"attachments": [{
"color": color,
"blocks": [
{
"type": "header",
"text": {
"type": "plain_text",
"text": f"{emoji} {level.upper()} Alert",
}
},
{
"type": "section",
"fields": [
{"type": "mrkdwn", "text": f"*Call:*\n{call_id[:20]}"},
{"type": "mrkdwn", "text": f"*Reasons:*\n{reasons}"},
{"type": "mrkdwn", "text": f"*Sentiment:*\n{alert.get('sentiment_score', 'N/A')}"},
{"type": "mrkdwn", "text": f"*Text:*\n_{text[:100]}_"},
]
}
]
}]
}
try:
async with httpx.AsyncClient() as client:
resp = await client.post(
self.slack_webhook_url,
json=payload,
timeout=10,
)
if resp.status_code == 200:
logger.info(f"Slack alert sent: {level} for {call_id}")
else:
logger.error(f"Slack send failed: {resp.status_code}")
except Exception as e:
logger.error(f"Slack alert error: {e}")
class AlertManager:
"""
Manages alert rules, evaluates conditions, and dispatches notifications.
Subscribes to sentiment results from Redis and checks each one
against configured alert rules.
"""
def __init__(
self,
redis_url: str = "redis://127.0.0.1:6379/0",
rules: Optional[List[AlertRule]] = None,
notifier: Optional[AlertNotifier] = None,
):
self.redis_url = redis_url
self.rules = rules or DEFAULT_RULES
self.notifier = notifier or AlertNotifier()
self.redis: Optional[aioredis.Redis] = None
self.call_states: Dict[str, CallAlertState] = {}
async def start(self):
"""Start the alert manager."""
self.redis = aioredis.from_url(self.redis_url)
pubsub = self.redis.pubsub()
await pubsub.subscribe("calls:sentiment:all", "calls:events")
logger.info(
f"AlertManager started with {len(self.rules)} rules "
f"({sum(1 for r in self.rules if r.enabled)} enabled)"
)
async for message in pubsub.listen():
if message["type"] != "message":
continue
channel = message["channel"]
if isinstance(channel, bytes):
channel = channel.decode()
try:
data = json.loads(message["data"])
except (json.JSONDecodeError, TypeError):
continue
if channel == "calls:events":
event = data.get("event")
call_id = data.get("call_id")
if event == "call_start":
self.call_states[call_id] = CallAlertState(call_id=call_id)
elif event == "call_end":
self.call_states.pop(call_id, None)
elif channel == "calls:sentiment:all":
await self._evaluate_rules(data)
async def _evaluate_rules(self, sentiment_data: dict):
"""Evaluate all alert rules against a sentiment result."""
call_id = sentiment_data.get("call_id", "")
if not call_id:
return
# Ensure call state exists
if call_id not in self.call_states:
self.call_states[call_id] = CallAlertState(call_id=call_id)
call_state = self.call_states[call_id]
        now = time.time()

        # Track consecutive negative utterances once per message — not once
        # per rule, which would double-count whenever more than one
        # consecutive_negative rule is configured (as in DEFAULT_RULES).
        if sentiment_data.get("sentiment_label", "neutral") == "negative":
            call_state.alert_count += 1
        else:
            call_state.alert_count = 0

        triggered_rules = []
        for rule in self.rules:
            if not rule.enabled:
                continue

            # Check cooldown
            last_fired = call_state.last_alert_time.get(rule.name, 0)
            if now - last_fired < rule.cooldown_seconds:
                continue

            # Evaluate conditions
            triggered = False
            reasons = []

            # Check minimum sentiment threshold
            if rule.min_sentiment is not None:
                score = sentiment_data.get("sentiment_score", 0)
                if score < rule.min_sentiment:
                    triggered = True
                    reasons.append(f"Sentiment {score:.2f} below {rule.min_sentiment}")

            # Check consecutive negative count
            if rule.consecutive_negative is not None:
                if call_state.alert_count >= rule.consecutive_negative:
                    triggered = True
                    reasons.append(
                        f"{call_state.alert_count} consecutive negative utterances"
                    )
# Check keywords
            if rule.keywords:
                rule_kw = {rk.lower() for rk in rule.keywords}
                detected = sentiment_data.get("keywords_detected", [])
                matches = [k for k in detected if k.lower() in rule_kw]
if matches:
triggered = True
reasons.append(f"Keywords: {', '.join(matches)}")
# Check profanity
if rule.profanity:
profanity = sentiment_data.get("profanity_detected", [])
if profanity:
triggered = True
reasons.append(f"Profanity: {', '.join(profanity)}")
# Check rolling average
if rule.rolling_below is not None:
rolling = sentiment_data.get("rolling_sentiment", 0)
if rolling < rule.rolling_below:
triggered = True
reasons.append(f"Rolling sentiment {rolling:.2f} below {rule.rolling_below}")
if triggered:
triggered_rules.append((rule, reasons))
call_state.last_alert_time[rule.name] = now
# If any rules triggered, send alert with highest severity
if triggered_rules:
# Sort by severity (critical > urgent > warning)
severity_order = {"critical": 3, "urgent": 2, "warning": 1}
triggered_rules.sort(
key=lambda x: severity_order.get(x[0].level, 0), reverse=True
)
            highest_rule, _ = triggered_rules[0]
# Collect all reasons from all triggered rules
all_reasons = []
for rule, reasons in triggered_rules:
all_reasons.extend(reasons)
alert = {
"type": "alert",
"alert_level": highest_rule.level,
"call_id": call_id,
"timestamp": now,
"reasons": all_reasons,
"text": sentiment_data.get("text", ""),
"speaker": sentiment_data.get("speaker", "unknown"),
"sentiment_score": sentiment_data.get("sentiment_score", 0),
"rolling_sentiment": sentiment_data.get("rolling_sentiment", 0),
"rules_triggered": [r.name for r, _ in triggered_rules],
}
await self.notifier.send(alert, self.redis)
logger.warning(
f"Alert [{highest_rule.level}] for call {call_id}: "
f"{all_reasons}"
)
# Standalone entry point
async def main():
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
import os
notifier = AlertNotifier(
enable_dashboard=True,
enable_email=bool(os.getenv("RESEND_API_KEY")),
enable_slack=bool(os.getenv("SLACK_WEBHOOK_URL")),
resend_api_key=os.getenv("RESEND_API_KEY", ""),
email_from=os.getenv("ALERT_EMAIL_FROM", "[email protected]"),
email_to=os.getenv("ALERT_EMAIL_TO", "[email protected]"),
slack_webhook_url=os.getenv("SLACK_WEBHOOK_URL", ""),
)
manager = AlertManager(notifier=notifier)
await manager.start()
if __name__ == "__main__":
asyncio.run(main())
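With the manager running, you can smoke-test the rule pipeline without a live call by publishing a synthetic payload on the `calls:sentiment:all` channel it subscribes to. The payload below mirrors the fields `_evaluate_rules` reads; "supervisor" should trip the `keyword_detected` rule and the -0.85 score should trip `very_negative`. The actual Redis publish is commented out so the script is harmless to run anywhere:

```python
#!/usr/bin/env python3
"""Build (and optionally publish) a synthetic sentiment payload
to exercise the alert rules without a live call."""
import json
import time

payload = {
    "call_id": "test-call-001",
    "text": "I want to speak to your supervisor right now",
    "speaker": "customer",
    "sentiment_score": -0.85,          # below very_negative's -0.7 threshold
    "sentiment_label": "negative",
    "rolling_sentiment": -0.2,         # above declining_call's -0.4, so it stays quiet
    "keywords_detected": ["supervisor"],
    "profanity_detected": [],
    "timestamp": time.time(),
}

message = json.dumps(payload)
print(message)

# With a local Redis and redis-py installed, uncomment to publish for real:
# import redis
# redis.Redis().publish("calls:sentiment:all", message)
```

If the AlertManager is subscribed, you should see a `critical` alert in its log and on the dashboard banner within a second or so.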
11. Production Deployment
systemd Service Files
Create three systemd services for the core components. Each runs independently and communicates through Redis.
Transcription Service:
# /etc/systemd/system/call-transcription.service
[Unit]
Description=Call Transcription Service
After=network.target redis-server.service
Requires=redis-server.service
[Service]
Type=simple
User=root
WorkingDirectory=/opt/call-monitor
Environment=TRANSCRIPTION_ENGINE=whisper
Environment=WHISPER_MODEL=base
Environment=WHISPER_DEVICE=cpu
Environment=WHISPER_COMPUTE_TYPE=int8
Environment=WHISPER_LANGUAGE=en
Environment=SAMPLE_RATE=8000
Environment=CHUNK_DURATION=5.0
Environment=REDIS_URL=redis://127.0.0.1:6379/0
ExecStart=/opt/call-monitor/venv/bin/python -u main_transcription.py
Restart=always
RestartSec=5
StandardOutput=journal
StandardError=journal
# Resource limits
CPUQuota=200%
MemoryMax=4G
MemoryHigh=3G
[Install]
WantedBy=multi-user.target
Sentiment + Alert Service:
# /etc/systemd/system/call-sentiment.service
[Unit]
Description=Call Sentiment Analysis & Alert Service
After=network.target redis-server.service
Requires=redis-server.service
[Service]
Type=simple
User=root
WorkingDirectory=/opt/call-monitor
Environment=SENTIMENT_ENGINE=vader
Environment=REDIS_URL=redis://127.0.0.1:6379/0
Environment=RESEND_API_KEY=
Environment=SLACK_WEBHOOK_URL=
ExecStart=/opt/call-monitor/venv/bin/python -u main_sentiment.py
Restart=always
RestartSec=5
StandardOutput=journal
StandardError=journal
# Sentiment analysis is lightweight
CPUQuota=50%
MemoryMax=512M
[Install]
WantedBy=multi-user.target
WebSocket Dashboard Service:
# /etc/systemd/system/call-dashboard.service
[Unit]
Description=Call Monitor WebSocket Dashboard
After=network.target redis-server.service
Requires=redis-server.service
[Service]
Type=simple
User=root
WorkingDirectory=/opt/call-monitor
Environment=SERVER_HOST=0.0.0.0
Environment=SERVER_PORT=8095
Environment=AUTH_TOKEN=your-secret-token-change-me
Environment=REDIS_URL=redis://127.0.0.1:6379/0
ExecStart=/opt/call-monitor/venv/bin/python -u websocket_server.py
Restart=always
RestartSec=5
StandardOutput=journal
StandardError=journal
CPUQuota=100%
MemoryMax=512M
[Install]
WantedBy=multi-user.target
Main Entry Points
Transcription entry point:
#!/usr/bin/env python3
"""
main_transcription.py — Entry point for the transcription service.
Starts the AudioSocket server and routes audio to the transcription engine.
"""
import asyncio
import logging
import os
from audio_capture import AudioSocketServer
from transcription_service import TranscriptionManager
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger(__name__)
async def main():
engine = os.getenv("TRANSCRIPTION_ENGINE", "whisper")
redis_url = os.getenv("REDIS_URL", "redis://127.0.0.1:6379/0")
# Build engine kwargs based on engine type
if engine == "whisper":
engine_kwargs = {
"model_size": os.getenv("WHISPER_MODEL", "base"),
"device": os.getenv("WHISPER_DEVICE", "cpu"),
"compute_type": os.getenv("WHISPER_COMPUTE_TYPE", "int8"),
"language": os.getenv("WHISPER_LANGUAGE", "en"),
"sample_rate": int(os.getenv("SAMPLE_RATE", "8000")),
}
elif engine == "deepgram":
engine_kwargs = {
"api_key": os.getenv("DEEPGRAM_API_KEY", ""),
"model": os.getenv("DEEPGRAM_MODEL", "nova-2"),
"language": os.getenv("DEEPGRAM_LANGUAGE", "en-GB"),
"sample_rate": int(os.getenv("SAMPLE_RATE", "8000")),
}
else:
raise ValueError(f"Unknown engine: {engine}")
# Initialize transcription manager
tx_manager = TranscriptionManager(
engine=engine,
redis_url=redis_url,
**engine_kwargs,
)
await tx_manager.initialize()
# Start AudioSocket server with callbacks
audio_server = AudioSocketServer(
host="0.0.0.0",
port=int(os.getenv("AUDIOSOCKET_PORT", "9093")),
sample_rate=int(os.getenv("SAMPLE_RATE", "8000")),
chunk_duration=float(os.getenv("CHUNK_DURATION", "5.0")),
on_audio_chunk=tx_manager.on_audio_chunk,
on_call_start=tx_manager.on_call_start,
on_call_end=tx_manager.on_call_end,
)
logger.info(f"Starting transcription service (engine={engine})")
await audio_server.start()
if __name__ == "__main__":
asyncio.run(main())
Sentiment entry point:
#!/usr/bin/env python3
"""
main_sentiment.py — Entry point for sentiment analysis + alert manager.
Runs both services concurrently.
"""
import asyncio
import logging
import os
from sentiment_service import SentimentService
from alert_manager import AlertManager, AlertNotifier
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger(__name__)
async def main():
redis_url = os.getenv("REDIS_URL", "redis://127.0.0.1:6379/0")
# Start sentiment service
sentiment = SentimentService(
engine=os.getenv("SENTIMENT_ENGINE", "vader"),
redis_url=redis_url,
)
# Start alert manager
notifier = AlertNotifier(
enable_dashboard=True,
enable_email=bool(os.getenv("RESEND_API_KEY")),
enable_slack=bool(os.getenv("SLACK_WEBHOOK_URL")),
resend_api_key=os.getenv("RESEND_API_KEY", ""),
email_from=os.getenv("ALERT_EMAIL_FROM", "[email protected]"),
email_to=os.getenv("ALERT_EMAIL_TO", "[email protected]"),
slack_webhook_url=os.getenv("SLACK_WEBHOOK_URL", ""),
)
alert_mgr = AlertManager(redis_url=redis_url, notifier=notifier)
logger.info("Starting sentiment analysis and alert manager")
# Run both concurrently
await asyncio.gather(
sentiment.start(),
alert_mgr.start(),
)
if __name__ == "__main__":
asyncio.run(main())
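As written, a `systemctl stop` simply kills either entry point mid-chunk, since neither traps SIGTERM. A sketch of a wrapper that turns SIGTERM/SIGINT into an orderly task cancellation (the wrapper name is mine; wrap `main()` in either entry point with it):

```python
import asyncio
import signal

async def run_with_shutdown(service_coro):
    """Run a long-lived service coroutine, cancelling it cleanly on SIGTERM/SIGINT."""
    loop = asyncio.get_running_loop()
    stop = asyncio.Event()
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, stop.set)
    task = asyncio.create_task(service_coro)
    stop_task = asyncio.create_task(stop.wait())
    # Finish when either the service exits on its own or a signal arrives
    await asyncio.wait([task, stop_task], return_when=asyncio.FIRST_COMPLETED)
    stop_task.cancel()
    if not task.done():
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass  # expected during an orderly shutdown
```

In `main_sentiment.py`, for example, `asyncio.run(main())` would become `asyncio.run(run_with_shutdown(main()))`, giving services a chance to flush state before exiting.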
Enable and Start Services
# Reload systemd
systemctl daemon-reload
# Enable all services
systemctl enable call-transcription call-sentiment call-dashboard
# Start services
systemctl start call-transcription
systemctl start call-sentiment
systemctl start call-dashboard
# Check status
systemctl status call-transcription call-sentiment call-dashboard
# View logs
journalctl -u call-transcription -f
journalctl -u call-sentiment -f
journalctl -u call-dashboard -f
Nginx Reverse Proxy
# /etc/nginx/sites-available/call-monitor
server {
listen 80;
server_name call-monitor.yourdomain.com;
# Dashboard and API
location / {
proxy_pass http://127.0.0.1:8095;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
# WebSocket endpoints
location /ws/ {
proxy_pass http://127.0.0.1:8095;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_read_timeout 86400; # 24 hours for long-lived WebSocket
proxy_send_timeout 86400;
}
# Rate limiting for API endpoints
# Requires a zone defined once at http{} level, e.g. in nginx.conf:
#   limit_req_zone $binary_remote_addr zone=api:10m rate=10r/s;
location /api/ {
limit_req zone=api burst=20 nodelay;
proxy_pass http://127.0.0.1:8095;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
}
# Security headers
add_header X-Frame-Options "SAMEORIGIN" always;
add_header X-Content-Type-Options "nosniff" always;
add_header X-XSS-Protection "1; mode=block" always;
access_log /var/log/nginx/call-monitor-access.log;
error_log /var/log/nginx/call-monitor-error.log;
}
# Enable site
ln -sf /etc/nginx/sites-available/call-monitor /etc/nginx/sites-enabled/
nginx -t && systemctl reload nginx
Resource Monitoring
Add a simple health check script:
#!/bin/bash
# /opt/call-monitor/healthcheck.sh
# Verify all services are running
ERRORS=0
for svc in call-transcription call-sentiment call-dashboard redis-server; do
if ! systemctl is-active --quiet "$svc"; then
echo "CRITICAL: $svc is not running"
ERRORS=$((ERRORS + 1))
fi
done
# Check Redis connectivity
if ! redis-cli ping > /dev/null 2>&1; then
echo "CRITICAL: Redis not responding"
ERRORS=$((ERRORS + 1))
fi
# Check WebSocket server responding
HTTP_CODE=$(curl -s -o /dev/null -w '%{http_code}' "http://127.0.0.1:8095/api/stats?token=your-secret-token-change-me")
if [ "$HTTP_CODE" != "200" ]; then
echo "WARNING: Dashboard API returned HTTP $HTTP_CODE"
ERRORS=$((ERRORS + 1))
fi
# Check AudioSocket port
if ! ss -tlnp | grep -q ':9093'; then
echo "WARNING: AudioSocket port 9093 not listening"
ERRORS=$((ERRORS + 1))
fi
if [ $ERRORS -eq 0 ]; then
echo "OK: All services healthy"
fi
exit $ERRORS
chmod +x /opt/call-monitor/healthcheck.sh
12. Integration with ViciDial
Enriching Transcripts with ViciDial Data
When a call comes in via AudioSocket or MixMonitor, you receive a uniqueid but not the agent name, campaign, or inbound group. This module queries ViciDial's database to enrich transcript data with operational context.
#!/usr/bin/env python3
"""
vicidial_integration.py — Enrich call transcripts with ViciDial metadata.
Queries ViciDial's MySQL database to get agent names, campaigns,
inbound groups, and phone numbers for active calls.
Also provides historical storage for completed transcripts.
"""
import time
import json
import logging
import asyncio
from typing import Optional, Dict
from dataclasses import dataclass
import aiomysql
logger = logging.getLogger(__name__)
@dataclass
class ViciDialConfig:
"""ViciDial database connection settings."""
host: str = "127.0.0.1"
port: int = 3306
user: str = "cron"
password: str = "YOUR_DB_PASSWORD"
database: str = "asterisk"
# Use a read replica if available (e.g. a dedicated ViciDial reports/slave DB)
read_replica_host: str = ""
read_replica_port: int = 3306
class ViciDialIntegration:
"""
Query ViciDial database to enrich call transcripts.
Provides agent names, campaign info, call metadata,
and recording file paths.
"""
def __init__(self, config: ViciDialConfig):
self.config = config
self.pool: Optional[aiomysql.Pool] = None
self._cache: Dict[str, dict] = {} # uniqueid -> metadata
self._cache_ttl = 300 # 5 minutes
async def connect(self):
"""Create connection pool."""
self.pool = await aiomysql.create_pool(
host=self.config.read_replica_host or self.config.host,
port=self.config.read_replica_port or self.config.port,
user=self.config.user,
password=self.config.password,
db=self.config.database,
maxsize=5,
minsize=1,
autocommit=True,
connect_timeout=5,
)
logger.info(f"Connected to ViciDial database at {self.config.host}")
async def close(self):
"""Close connection pool."""
if self.pool:
self.pool.close()
await self.pool.wait_closed()
async def get_call_metadata(self, uniqueid: str) -> Optional[dict]:
"""
Get metadata for a call by its Asterisk uniqueid.
Checks both inbound (vicidial_closer_log) and outbound
(vicidial_log) call tables.
Returns dict with: agent, full_name, campaign, phone_number,
ingroup, status, recording_path
"""
# Check cache first
cached = self._cache.get(uniqueid)
if cached and time.time() - cached.get("_cached_at", 0) < self._cache_ttl:
return cached
if not self.pool:
return None
metadata = None
async with self.pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
# Try inbound call log first
await cur.execute(
"""
SELECT
cl.user,
cl.phone_number,
cl.campaign_id,
cl.call_date,
cl.status,
cl.length_in_sec,
cl.queue_seconds,
cl.uniqueid,
cl.closecallid,
cl.group_id AS ingroup,
cl.term_reason,
vu.full_name
FROM vicidial_closer_log cl
LEFT JOIN vicidial_users vu ON cl.user = vu.user
WHERE cl.uniqueid = %s
ORDER BY cl.closecallid DESC
LIMIT 1
""",
(uniqueid,),
)
row = await cur.fetchone()
if row:
metadata = {
"source": "inbound",
"agent": row["user"] or "unknown",
"full_name": row["full_name"] or row["user"] or "Unknown Agent",
"phone_number": row["phone_number"] or "",
"campaign": row["campaign_id"] or "",
"ingroup": row["ingroup"] or "",
"call_date": str(row["call_date"]) if row["call_date"] else "",
"status": row["status"] or "",
"duration": row["length_in_sec"] or 0,
"queue_seconds": row["queue_seconds"] or 0,
"term_reason": row["term_reason"] or "",
}
else:
# Try outbound call log
await cur.execute(
"""
SELECT
vl.user,
vl.phone_number,
vl.campaign_id,
vl.call_date,
vl.status,
vl.length_in_sec,
vl.uniqueid,
vl.term_reason,
vu.full_name
FROM vicidial_log vl
LEFT JOIN vicidial_users vu ON vl.user = vu.user
WHERE vl.uniqueid = %s
ORDER BY vl.call_date DESC
LIMIT 1
""",
(uniqueid,),
)
row = await cur.fetchone()
if row:
metadata = {
"source": "outbound",
"agent": row["user"] or "unknown",
"full_name": row["full_name"] or row["user"] or "Unknown Agent",
"phone_number": row["phone_number"] or "",
"campaign": row["campaign_id"] or "",
"ingroup": "",
"call_date": str(row["call_date"]) if row["call_date"] else "",
"status": row["status"] or "",
"duration": row["length_in_sec"] or 0,
"term_reason": row["term_reason"] or "",
}
if metadata:
metadata["_cached_at"] = time.time()
self._cache[uniqueid] = metadata
return metadata
async def get_live_agents(self) -> list:
"""
Get all currently logged-in agents and their states.
Returns list of dicts with: user, full_name, status, campaign_id,
calls_today, channel
"""
if not self.pool:
return []
async with self.pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(
"""
SELECT
la.user,
la.status,
la.campaign_id,
la.calls_today,
la.channel,
la.uniqueid,
la.callerid,
vu.full_name
FROM vicidial_live_agents la
LEFT JOIN vicidial_users vu ON la.user = vu.user
WHERE la.status IN ('INCALL', 'QUEUE', 'READY', 'PAUSED')
ORDER BY la.status, la.user
"""
)
rows = await cur.fetchall()
return [dict(row) for row in rows]
async def get_recording_path(self, uniqueid: str) -> Optional[str]:
"""Get the recording file path for a call."""
if not self.pool:
return None
async with self.pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(
"""
SELECT filename, location
FROM recording_log
WHERE vicidial_id = %s
ORDER BY recording_id DESC
LIMIT 1
""",
(uniqueid,),
)
row = await cur.fetchone()
if row:
return row.get("location") or row.get("filename")
return None
class TranscriptStorage:
"""
Store completed call transcripts with sentiment data.
Uses SQLite for simplicity. Switch to PostgreSQL for production
with high volume.
"""
def __init__(self, db_path: str = "/opt/call-monitor/data/transcripts.db"):
self.db_path = db_path
self.conn = None
async def initialize(self):
"""Create database and tables."""
import aiosqlite
self.conn = await aiosqlite.connect(self.db_path)
await self.conn.execute("PRAGMA journal_mode=WAL")
await self.conn.execute("PRAGMA synchronous=NORMAL")
await self.conn.executescript("""
CREATE TABLE IF NOT EXISTS call_transcripts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
call_id TEXT NOT NULL,
agent TEXT,
agent_name TEXT,
caller TEXT,
campaign TEXT,
ingroup TEXT,
start_time REAL,
end_time REAL,
duration REAL,
avg_sentiment REAL,
min_sentiment REAL,
max_sentiment REAL,
total_segments INTEGER,
positive_segments INTEGER,
negative_segments INTEGER,
neutral_segments INTEGER,
keywords_detected TEXT,
alerts_count INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS transcript_segments (
id INTEGER PRIMARY KEY AUTOINCREMENT,
call_id TEXT NOT NULL,
speaker TEXT,
text TEXT NOT NULL,
start_time REAL,
end_time REAL,
sentiment_score REAL,
sentiment_label TEXT,
keywords TEXT,
timestamp REAL
);
CREATE INDEX IF NOT EXISTS idx_transcripts_call_id
ON call_transcripts(call_id);
CREATE INDEX IF NOT EXISTS idx_transcripts_agent
ON call_transcripts(agent);
CREATE INDEX IF NOT EXISTS idx_transcripts_start_time
ON call_transcripts(start_time);
CREATE INDEX IF NOT EXISTS idx_segments_call_id
ON transcript_segments(call_id);
""")
await self.conn.commit()
logger.info(f"Transcript storage initialized: {self.db_path}")
async def save_call(
self,
call_id: str,
segments: list,
metadata: Optional[dict] = None,
):
"""Save a completed call's transcript and sentiment data."""
if not self.conn:
return
meta = metadata or {}
scores = [s.get("sentiment_score", 0) for s in segments]
labels = [s.get("sentiment_label", "neutral") for s in segments]
all_keywords = []
for s in segments:
all_keywords.extend(s.get("keywords_detected", []))
await self.conn.execute(
"""
INSERT INTO call_transcripts (
call_id, agent, agent_name, caller, campaign, ingroup,
start_time, end_time, duration,
avg_sentiment, min_sentiment, max_sentiment,
total_segments, positive_segments, negative_segments,
neutral_segments, keywords_detected
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
call_id,
meta.get("agent", ""),
meta.get("full_name", ""),
meta.get("phone_number", ""),
meta.get("campaign", ""),
meta.get("ingroup", ""),
segments[0].get("timestamp", 0) if segments else 0,
segments[-1].get("timestamp", 0) if segments else 0,
meta.get("duration", 0),
sum(scores) / len(scores) if scores else 0,
min(scores) if scores else 0,
max(scores) if scores else 0,
len(segments),
labels.count("positive"),
labels.count("negative"),
labels.count("neutral"),
json.dumps(list(set(all_keywords))),
),
)
# Save individual segments
for seg in segments:
await self.conn.execute(
"""
INSERT INTO transcript_segments (
call_id, speaker, text, start_time, end_time,
sentiment_score, sentiment_label, keywords, timestamp
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
call_id,
seg.get("speaker", "unknown"),
seg.get("text", ""),
seg.get("start_time", 0),
seg.get("end_time", 0),
seg.get("sentiment_score", 0),
seg.get("sentiment_label", "neutral"),
json.dumps(seg.get("keywords_detected", [])),
seg.get("timestamp", 0),
),
)
await self.conn.commit()
logger.info(f"Saved transcript for call {call_id}: {len(segments)} segments")
async def get_agent_stats(self, agent: str, days: int = 7) -> dict:
"""Get aggregate stats for an agent over the last N days."""
if not self.conn:
return {}
cutoff = time.time() - (days * 86400)
async with self.conn.execute(
"""
SELECT
COUNT(*) as total_calls,
AVG(avg_sentiment) as avg_sentiment,
SUM(alerts_count) as total_alerts,
AVG(duration) as avg_duration,
SUM(positive_segments) as total_positive,
SUM(negative_segments) as total_negative,
SUM(total_segments) as total_segments
FROM call_transcripts
WHERE agent = ? AND start_time > ?
""",
(agent, cutoff),
) as cursor:
row = await cursor.fetchone()
if row:
return {
"agent": agent,
"period_days": days,
"total_calls": row[0] or 0,
"avg_sentiment": round(row[1] or 0, 3),
"total_alerts": row[2] or 0,
"avg_duration": round(row[3] or 0, 1),
"total_positive": row[4] or 0,
"total_negative": row[5] or 0,
"total_segments": row[6] or 0,
}
return {}
async def close(self):
if self.conn:
await self.conn.close()
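For reference, the per-call roll-up that `save_call()` stores can be expressed as a standalone helper, which is also handy in unit tests. The segment field names match what the sentiment service publishes; `summarize_segments` itself is illustrative, not part of the module above:

```python
def summarize_segments(segments: list) -> dict:
    """Compute the same per-call roll-up stats that save_call() stores."""
    scores = [s.get("sentiment_score", 0) for s in segments]
    labels = [s.get("sentiment_label", "neutral") for s in segments]
    keywords = []
    for s in segments:
        keywords.extend(s.get("keywords_detected", []))
    return {
        "avg_sentiment": sum(scores) / len(scores) if scores else 0,
        "min_sentiment": min(scores) if scores else 0,
        "max_sentiment": max(scores) if scores else 0,
        "total_segments": len(segments),
        "positive_segments": labels.count("positive"),
        "negative_segments": labels.count("negative"),
        "neutral_segments": labels.count("neutral"),
        "keywords_detected": sorted(set(keywords)),
    }

# Example of the segment shape the sentiment service publishes:
example = [
    {"speaker": "customer", "text": "I want to cancel", "sentiment_score": -0.4,
     "sentiment_label": "negative", "keywords_detected": ["cancel"],
     "timestamp": 1700000000.0},
    {"speaker": "agent", "text": "Happy to help", "sentiment_score": 0.6,
     "sentiment_label": "positive", "keywords_detected": [],
     "timestamp": 1700000005.0},
]
```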
Linking to Recording Files
async def get_recording_url(
uniqueid: str,
server_ip: str = "YOUR_SERVER_IP",
recordings_base: str = "/var/spool/asterisk/monitorDONE",
) -> Optional[str]:
"""
Build a URL to replay a call recording.
ViciDial stores recordings in date-based subdirectories:
/var/spool/asterisk/monitorDONE/YYYYMMDD/filename.wav
The web server at /recordings/ maps to this path.
"""
from datetime import datetime
# Extract date from uniqueid (format: epoch.sequence)
try:
epoch = float(uniqueid.split(".")[0])
date_str = datetime.fromtimestamp(epoch).strftime("%Y%m%d")
except (ValueError, IndexError):
date_str = datetime.now().strftime("%Y%m%d")
# The recording filename typically matches the uniqueid
# or is stored in ViciDial's recording_log table
return f"http://{server_ip}/recordings/{date_str}/{uniqueid}.wav"
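One subtlety in the helper above: `datetime.fromtimestamp()` uses the server's local timezone, which is usually correct because ViciDial names its date directories in server-local time. If your transcription host runs in UTC while the dialler does not, pass an explicit zone. This variant (`uniqueid_date` is an illustrative name) makes the zone a parameter:

```python
from datetime import datetime, timezone

def uniqueid_date(uniqueid: str, tz=timezone.utc) -> str:
    """Derive the YYYYMMDD recording directory from an Asterisk uniqueid."""
    try:
        # uniqueid format is "epoch.sequence", e.g. "1700000000.12345"
        epoch = float(uniqueid.split(".")[0])
        return datetime.fromtimestamp(epoch, tz).strftime("%Y%m%d")
    except (ValueError, IndexError):
        # Malformed uniqueid: fall back to today's date
        return datetime.now(tz).strftime("%Y%m%d")
```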
13. Troubleshooting
Audio Capture Issues
Problem: AudioSocket connection refused
# Verify the transcription service is listening
ss -tlnp | grep 9093
# Check if Asterisk can reach the server
# On the Asterisk server:
nc -zv YOUR_TRANSCRIPTION_SERVER 9093
# Check firewall rules
iptables -L -n | grep 9093
# If using Docker, ensure port is mapped
docker port call-transcription
Problem: EAGI script not executing
# Check script permissions
ls -la /opt/call-monitor/eagi_capture.py
# Must be executable:
chmod +x /opt/call-monitor/eagi_capture.py
# Check Asterisk can read the file
# Asterisk runs as user 'asterisk' on many systems
chown asterisk:asterisk /opt/call-monitor/eagi_capture.py
# Check shebang line points to correct Python
head -1 /opt/call-monitor/eagi_capture.py
# Should be: #!/usr/bin/env python3
# Test manually (will fail on fd3 but verifies imports work)
python3 /opt/call-monitor/eagi_capture.py
# Check Asterisk logs
grep -i "eagi" /var/log/asterisk/messages | tail -20
Problem: No audio received from MixMonitor watcher
# Verify MixMonitor is recording
ls -la /var/spool/asterisk/monitor/
# Should see .wav files being created during calls
# Check inotifywait is running
ps aux | grep inotifywait
# Test inotifywait manually
inotifywait -m -r -e modify,create /var/spool/asterisk/monitor/
# Check disk space (MixMonitor silently fails when disk is full)
df -h /var/spool/asterisk/
Transcription Latency Issues
Problem: Whisper transcription taking too long (> chunk duration)
# Check which model is loaded
journalctl -u call-transcription | grep "Loading.*model"
# If using 'small' or larger, switch to 'base' or 'tiny'
# Edit /etc/systemd/system/call-transcription.service:
# Environment=WHISPER_MODEL=base
# Check CPU usage during transcription
top -p $(pgrep -f main_transcription)
# Monitor real-time factor (RTF)
# RTF < 1.0 means transcription is faster than real-time
journalctl -u call-transcription -f | grep "RTF="
# If RTF > 0.8, consider:
# 1. Using a smaller model (tiny: RTF ~0.1, base: RTF ~0.2)
# 2. Increasing chunk duration from 5s to 8s (more context, fewer chunk boundaries)
# 3. Reducing beam_size from 3 to 1
# 4. Switching to Deepgram (RTF ~0.04)
Problem: Deepgram high latency or timeouts
# Check network latency to Deepgram servers
ping api.deepgram.com
# Should be < 50ms from EU/US servers
# Verify API key is valid
curl -s -H "Authorization: Token YOUR_DEEPGRAM_API_KEY" \
https://api.deepgram.com/v1/projects
# Check for rate limiting
journalctl -u call-transcription | grep -i "rate\|429\|limit"
# Monitor WebSocket connection stability
journalctl -u call-transcription | grep -i "deepgram\|websocket"
Chunk Duration Tuning
| Chunk Duration | Latency | Accuracy | CPU Load |
|---|---|---|---|
| 2 seconds | Low (2-3s) | Poor (too little context) | High (many chunks) |
| 5 seconds | Medium (5-7s) | Good (recommended) | Medium |
| 8 seconds | Higher (8-10s) | Better (more context) | Lower (fewer chunks) |
| 15 seconds | High (15-17s) | Best | Lowest |
The sweet spot for most deployments is 5 seconds. Decrease to 3 seconds if you need faster alerts at the cost of accuracy. Increase to 8 seconds if accuracy is more important than speed.
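The RTF mentioned in the Whisper troubleshooting above is just wall-clock processing time divided by the audio duration of the chunk. A sketch of instrumenting any engine with it (`transcribe_chunk` is a placeholder for your engine's method, not an API from this tutorial):

```python
import time

def measure_rtf(transcribe_chunk, audio_chunk, chunk_duration: float):
    """Time one transcription call and return (result, real-time factor).

    RTF < 1.0 means the engine keeps up with live audio; a sustained
    RTF above ~0.8 leaves little headroom for concurrent calls.
    """
    start = time.monotonic()
    result = transcribe_chunk(audio_chunk)
    elapsed = time.monotonic() - start
    return result, elapsed / chunk_duration
```

Logging the returned RTF per chunk gives you the `RTF=` lines the journalctl grep above looks for.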
WebSocket Issues
Problem: Dashboard shows "Disconnected"
# Check WebSocket server is running
systemctl status call-dashboard
curl -I http://127.0.0.1:8095/
# Test WebSocket connection manually
# Install wscat: npm install -g wscat
wscat -c "ws://127.0.0.1:8095/ws/calls?token=your-secret-token-change-me"
# Check Nginx WebSocket proxy (if using Nginx)
# Ensure these headers are in the Nginx config:
# proxy_http_version 1.1;
# proxy_set_header Upgrade $http_upgrade;
# proxy_set_header Connection "upgrade";
# Check for token mismatch
journalctl -u call-dashboard | grep -i "invalid.*token\|4001"
Problem: WebSocket reconnection loop
# Check for resource exhaustion
# Too many reconnections can exhaust file descriptors
ulimit -n
# Should be at least 65536 for production
# Increase if needed
echo "* soft nofile 65536" >> /etc/security/limits.conf
echo "* hard nofile 65536" >> /etc/security/limits.conf
# Check for Redis connection issues (WebSocket server depends on Redis)
redis-cli ping
# Check system memory
free -h
Sentiment Accuracy Issues
Problem: VADER scoring all utterances as neutral
# Test VADER directly
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
analyzer = SentimentIntensityAnalyzer()
# These should NOT be neutral:
test_phrases = [
"This is absolutely terrible service", # Should be very negative
"Thank you so much, you've been wonderful", # Should be very positive
"I want to cancel my account immediately", # Should be negative
]
for phrase in test_phrases:
scores = analyzer.polarity_scores(phrase)
print(f"{phrase[:50]:50s} compound={scores['compound']:.3f}")
# If results look correct but dashboard shows neutral:
# Check that the sentiment service is actually processing segments
journalctl -u call-sentiment | grep "Keywords\|analyze"
Problem: Too many false-positive keyword alerts
# The keyword detector uses word boundary matching (\b)
# But some words appear in innocent context:
# "cancel" in "I need to cancel my appointment" (valid alert)
# "cancel" in "Can I cancel the hold?" (possibly not an alert)
# Solution: Add context-aware filtering
# In sentiment_service.py, modify _handle_transcript:
IGNORE_CONTEXTS = {
"cancel": ["cancel the hold", "cancel that", "cancel noise"],
"manager": ["account manager", "your manager told me"],
}
def should_alert(keyword: str, full_text: str) -> bool:
"""Check if keyword in context warrants an alert."""
text_lower = full_text.lower()
ignore_list = IGNORE_CONTEXTS.get(keyword.lower(), [])
for ignore_phrase in ignore_list:
if ignore_phrase in text_lower:
return False
return True
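Restating the filter as a self-contained snippet with a few probes (same logic as above, just with example inputs so it can be sanity-checked in isolation):

```python
# Context-aware keyword filtering, restated self-contained for testing.
IGNORE_CONTEXTS = {
    "cancel": ["cancel the hold", "cancel that", "cancel noise"],
    "manager": ["account manager", "your manager told me"],
}

def should_alert(keyword: str, full_text: str) -> bool:
    """Alert unless the keyword appears inside a known-benign phrase."""
    text_lower = full_text.lower()
    for ignore_phrase in IGNORE_CONTEXTS.get(keyword.lower(), []):
        if ignore_phrase in text_lower:
            return False
    return True

print(should_alert("cancel", "Can I cancel the hold?"))       # False: benign context
print(should_alert("cancel", "I want to cancel my account"))  # True: real alert
print(should_alert("refund", "I demand a refund now"))        # True: no ignore list
```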
Memory Leaks
Problem: Transcription service memory growing over time
# Monitor memory usage
watch -n 5 'ps aux | grep main_transcription | grep -v grep'
# Common causes:
# 1. Call states not being cleaned up after call ends
# 2. Audio buffers accumulating for stale calls
# 3. Faster-Whisper model allocations
# Fix: Add periodic cleanup
# In main_transcription.py, add a cleanup task (needs `import time`; start it
# with asyncio.create_task() before `await audio_server.start()`):
async def cleanup_stale_calls(tx_manager, interval=300):
"""Periodically clean up state for calls that ended without proper cleanup."""
while True:
await asyncio.sleep(interval)
now = time.time()
stale = [
call_id
for call_id, state in tx_manager.call_states.items()
if now - state.call_start > 7200 # 2 hours max
]
for call_id in stale:
logger.warning(f"Cleaning up stale call state: {call_id}")
await tx_manager.on_call_end(call_id)
Problem: Redis memory growing
# Check Redis memory usage
redis-cli info memory | grep used_memory_human
# Check stream sizes (--scan iterates without blocking Redis, unlike KEYS)
redis-cli --scan --pattern "calls:history:*" | head -20
# Each stream should be capped at maxlen
# Manual cleanup of old streams
redis-cli --scan --pattern "calls:history:*" | while read key; do
len=$(redis-cli xlen "$key")
echo "$key: $len entries"
if [ "$len" -gt 1000 ]; then
redis-cli xtrim "$key" MAXLEN 500
fi
done
# Set Redis memory limit in redis.conf
# maxmemory 512mb
# maxmemory-policy allkeys-lru
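The same cleanup can be driven from Python with redis-py's `scan_iter()`, which iterates the keyspace without blocking the server. The client is passed in rather than created inside, so the function can be exercised with a stub or called as `trim_history_streams(redis.Redis())`:

```python
def trim_history_streams(r, pattern="calls:history:*", maxlen=500, threshold=1000):
    """Trim per-call history streams that grew past `threshold` entries.

    `r` is a redis-py client (redis.Redis). SCAN is used instead of KEYS
    so a production instance is never stalled by one long command.
    Returns the list of stream keys that were trimmed.
    """
    trimmed = []
    for key in r.scan_iter(match=pattern):
        if r.xlen(key) > threshold:
            # approximate=True lets Redis trim at node boundaries (cheaper)
            r.xtrim(key, maxlen=maxlen, approximate=True)
            trimmed.append(key)
    return trimmed
```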
Dashboard Not Updating
Problem: Connected but no data appearing
# 1. Check Redis pub/sub is working
# Terminal 1 (subscribe):
redis-cli subscribe calls:sentiment:all
# Terminal 2 (publish test message):
redis-cli publish calls:sentiment:all '{"type":"sentiment","call_id":"test","text":"hello","sentiment_score":0.5,"sentiment_label":"positive","speaker":"agent","timestamp":1700000000,"start_time":0,"end_time":5,"keywords_detected":[],"profanity_detected":[],"rolling_sentiment":0.5}'
# If Terminal 1 receives the message but dashboard does not:
# Check the WebSocket server's Redis listener
journalctl -u call-dashboard | grep "Redis listener"
# 2. Check browser console for JavaScript errors
# Open browser DevTools (F12) -> Console tab
# 3. Verify token matches between URL and server
# Dashboard URL should include: ?token=your-secret-token-change-me
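If the test message reaches Redis but the dashboard stays blank, the next suspect is a malformed payload from the sentiment service. A small validator for the fields the dashboard expects (the field list is taken from the test publish above; trim it if your payload differs):

```python
import json

REQUIRED_FIELDS = {
    "type", "call_id", "text", "sentiment_score", "sentiment_label",
    "speaker", "timestamp", "start_time", "end_time",
    "keywords_detected", "profanity_detected", "rolling_sentiment",
}

def validate_sentiment_payload(raw: str):
    """Return (ok, missing_fields) for a calls:sentiment:all message."""
    try:
        msg = json.loads(raw)
    except json.JSONDecodeError:
        return False, ["<not valid JSON>"]
    missing = sorted(REQUIRED_FIELDS - msg.keys())
    return not missing, missing
```

Run it against the output of `redis-cli subscribe calls:sentiment:all` to catch a field the JavaScript silently chokes on.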
Quick Diagnostic Script
#!/bin/bash
# /opt/call-monitor/diagnose.sh
# Run this to diagnose common issues
echo "=== Service Status ==="
for svc in redis-server call-transcription call-sentiment call-dashboard; do
status=$(systemctl is-active "$svc" 2>/dev/null || echo "not-found")
printf " %-25s %s\n" "$svc" "$status"
done
echo ""
echo "=== Port Status ==="
for port in 6379 8095 9093; do
status=$(ss -tlnp | grep -q ":$port " && echo "LISTENING" || echo "NOT LISTENING")
printf " Port %-6s %s\n" "$port" "$status"
done
echo ""
echo "=== Redis Health ==="
echo " Ping: $(redis-cli ping 2>/dev/null || echo 'FAILED')"
echo " Memory: $(redis-cli info memory 2>/dev/null | grep used_memory_human | cut -d: -f2 | tr -d '\r')"
echo " Active calls: $(redis-cli hlen calls:active 2>/dev/null || echo '0')"
echo " Recent alerts: $(redis-cli llen calls:recent_alerts 2>/dev/null || echo '0')"
echo ""
echo "=== Resource Usage ==="
echo " CPU: $(top -bn1 | grep "Cpu(s)" | awk '{print $2}')% user"
echo " Memory: $(free -h | awk '/^Mem:/ {print $3 "/" $2}')"
echo " Disk: $(df -h /opt/call-monitor | tail -1 | awk '{print $5 " used"}')"
echo ""
echo "=== Recent Errors ==="
for svc in call-transcription call-sentiment call-dashboard; do
errors=$(journalctl -u "$svc" --since "1 hour ago" -p err --no-pager 2>/dev/null | wc -l)
printf " %-25s %s errors in last hour\n" "$svc" "$errors"
done
chmod +x /opt/call-monitor/diagnose.sh
End of Tutorial 39
This tutorial covered the complete pipeline for real-time call transcription and sentiment analysis: capturing live audio from Asterisk, streaming transcription with Faster-Whisper or Deepgram, sentiment scoring with VADER, real-time distribution via Redis pub/sub, WebSocket delivery to the browser, and a production-ready dashboard with alert management. Combined with Tutorial 35's batch QA pipeline, you now have both real-time monitoring for supervisors and overnight analysis for quality management, covering the full spectrum of call center intelligence.