Building a Real-Time AI Voice Agent for Asterisk
AudioSocket + Deepgram STT + Groq LLM + Cartesia TTS
A production-tested guide to building a voice AI agent that handles live phone calls through Asterisk with sub-250ms response latency.
Table of Contents
- Why AI Voice Agents Matter
- Architecture Overview
- Prerequisites
- Component Selection: Why This Stack
- Asterisk Configuration
- AGI Script: Session Bootstrapping
- Python Voice Agent: Core Structure
- AudioSocket Protocol Implementation
- Deepgram Streaming STT
- Groq Streaming LLM with Tool Calling
- Cartesia Streaming TTS
- The Token-Streaming Pipeline
- Barge-In and Interruption Handling
- Conversation State Machine
- Tool Calling Integration
- DID-to-Company Context API
- Systemd Service Setup
- Latency Optimization Deep Dive
- Troubleshooting
- Performance Benchmarks
- Production Considerations
1. Why AI Voice Agents Matter
Every missed phone call is lost revenue. For home services companies -- plumbers, electricians, locksmiths -- a single missed call can mean a lost booking worth hundreds of pounds. Human agents cost money, need shifts covered, and still miss calls at 2 AM.
AI voice agents solve this by answering every call, instantly, 24/7. But the bar is high: callers expect a natural conversation, not a clunky IVR menu. They expect to be heard, understood, and helped in real time. If the agent pauses for two seconds after every sentence, callers hang up.
This tutorial documents a production system that achieves sub-250ms mouth-to-ear latency -- the time from when the caller finishes speaking to when they hear the first syllable of the agent's response. That is faster than most human agents. The key insight that makes this possible: token-streaming. Instead of waiting for the LLM to generate a complete sentence, we pipe each token directly into the TTS engine as it arrives. The caller hears the response as the AI is still thinking.
What You Will Build
A Python voice agent that:
- Answers live phone calls through Asterisk via the AudioSocket protocol
- Transcribes speech in real time using Deepgram Nova-3 streaming STT
- Generates responses using Groq's Llama 3.3 70B at 1,665 tokens/second
- Synthesizes speech using Cartesia Sonic-3 with native 8kHz PCM output
- Handles caller interruptions (barge-in) mid-sentence
- Follows an 8-step conversation workflow (greet, understand, quote, collect details, book)
- Calls external APIs to create bookings (tool calling)
- Dynamically greets callers based on which phone number they dialed
2. Architecture Overview
TELEPHONE NETWORK
|
SIP Trunk
|
+-----------v-----------+
| ASTERISK PBX |
| |
| 1. Answer() |
| 2. AGI(setup.agi) |
| - Generate UUID |
| - Write metadata |
| 3. AudioSocket( |
| 127.0.0.1:9099) |
+-----------+------------+
|
TCP (AudioSocket Protocol)
8kHz 16-bit PCM, 20ms frames
|
+-----------v------------+
| PYTHON VOICE AGENT |
| (asyncio TCP server) |
| |
| +-------------------+ |
| | Audio Reader | | +------------------+
Caller | | - Read PCM frames |--------> Deepgram Nova-3 |
speaks | | - Barge-in VAD | | | Streaming STT |
| +-------------------+ | | (WebSocket) |
| | +--------+---------+
| | |
| | Transcript
| | |
| +-------------------+ | +--------v---------+
| | Conversation | | | Groq Llama 3.3 |
| | Manager |<------| 70B specdec |
| | - State machine | | | Streaming LLM |
| | - Message history | | | (HTTP SSE) |
| | - Tool calls | | +--------+---------+
| +-------------------+ | |
| | Token stream
| | |
| +-------------------+ | +--------v---------+
Caller | | Audio Writer | | | Cartesia Sonic-3 |
hears <-----| - Queue playback |<------| Streaming TTS |
| | - 20ms pacing | | | (WebSocket) |
| +-------------------+ | +------------------+
| |
+-------------------------+
Latency Budget (target: <250ms mouth-to-ear):
┌──────────────────────────────────────────────────────┐
│ Deepgram STT final transcript: ~150-200ms │
│ Groq LLM first token (TTFT): ~30-50ms │
│ Cartesia TTS first audio (TTFB): ~50-80ms │
│ AudioSocket frame transmission: ~20ms (1 frame) │
│ │
│ TOTAL: ~200-250ms │
└──────────────────────────────────────────────────────┘
Data Flow Summary
- Caller speaks into their phone. Audio arrives at Asterisk as RTP.
- Asterisk converts RTP to raw PCM and sends it over a TCP socket (AudioSocket protocol) to the Python agent.
- Python agent forwards PCM frames to Deepgram's streaming STT WebSocket.
- Deepgram returns transcript fragments. When the caller finishes speaking (speech_final), the agent has the complete utterance.
- Agent sends the conversation history to Groq's LLM API with streaming enabled.
- Each LLM token is immediately forwarded to Cartesia's TTS WebSocket (continuation API).
- Cartesia returns PCM audio chunks, which are queued and written back through AudioSocket to Asterisk.
- Asterisk converts PCM back to RTP and sends it to the caller's phone.
The critical design choice: steps 6 and 7 happen concurrently. The TTS starts synthesizing audio from the first few tokens while the LLM is still generating the rest of the sentence. This is what cuts the latency from 1-2 seconds down to 200-250ms.
3. Prerequisites
Server Requirements
- Asterisk 16.8+ or 18+ with the res_audiosocket module loaded
- Python 3.11+ with asyncio support
- Linux server (Ubuntu 22.04/Debian 12 recommended)
- At least 2 CPU cores and 2GB RAM for the voice agent process
- A SIP trunk for receiving inbound calls
API Accounts
| Service | Purpose | Pricing Model |
|---|---|---|
| Deepgram | Speech-to-text (Nova-3) | Pay-per-minute (~$0.0043/min) |
| Groq | LLM inference (Llama 3.3 70B) | Pay-per-token (free tier available) |
| Cartesia | Text-to-speech (Sonic-3) | Pay-per-character |
Python Dependencies
pip install websockets aiohttp cartesia
Verify Asterisk AudioSocket Module
asterisk -rx "module show like audiosocket"
Expected output:
Module Description Use Count Status
res_audiosocket.so AudioSocket support 0 Running
app_audiosocket.so AudioSocket application 0 Running
2 modules loaded
If not loaded:
asterisk -rx "module load res_audiosocket.so"
asterisk -rx "module load app_audiosocket.so"
Add to /etc/asterisk/modules.conf to load on startup:
load = res_audiosocket.so
load = app_audiosocket.so
4. Component Selection: Why This Stack
Building this system involved testing multiple providers for each component. Here is what we learned and why we chose this specific stack.
STT: Deepgram Nova-3
Why not Google/AWS/Azure STT? Cloud STT services from the big three add 300-500ms of latency due to their general-purpose architecture. Deepgram is purpose-built for real-time streaming and returns transcript fragments within 150-200ms.
Why Nova-3 specifically?
- Native streaming WebSocket API (not gRPC -- simpler to integrate)
- The endpointing parameter controls how quickly it decides the caller stopped speaking
- The speech_final flag tells you when a complete utterance is ready
- Keyword boosting (keywords=postcode:2) improves accuracy on domain terms
- British English model (language=en-GB) handles UK accents well
LLM: Groq Llama 3.3 70B (specdec)
Why not OpenAI GPT-4? Latency. GPT-4 time-to-first-token (TTFT) is 500ms-2s. Groq's speculative decoding delivers the first token in 30-50ms and sustains 1,665 tokens/second. For voice, speed matters more than the last 5% of intelligence.
Why Llama 3.3 70B over smaller models? The 70B model handles nuanced conversation flow (objection handling, knowing when NOT to say something) significantly better than 8B models. At 1,665 tok/s on Groq, you get the quality of a large model at small-model speeds.
Why specdec variant?
Speculative decoding uses a smaller draft model to predict tokens, then the large model verifies them in parallel. This gives 6x throughput compared to the standard versatile variant (1,665 vs 276 tok/s) with identical output quality.
TTS: Cartesia Sonic-3
Why not ElevenLabs? We started with ElevenLabs Flash v2. It produces excellent voice quality, but:
- Minimum latency of 300-400ms even with their streaming API
- No native 8kHz output -- you get 24kHz or 44.1kHz and must resample, adding CPU overhead and latency
- Their WebSocket API does not support the continuation pattern needed for token-streaming
Why Cartesia?
- Native 8kHz PCM output -- pcm_s16le at sample_rate: 8000 -- no resampling needed
- Continuation API -- send tokens one at a time via the same WebSocket context, and Cartesia synthesizes them as a continuous stream. This is the key to token-streaming.
- 50-80ms TTFB -- first audio bytes arrive within 50-80ms of the first token
- Context-aware prosody -- the continuation API maintains prosody across tokens, so "I can" + "get a" + "plumber out" sounds natural, not choppy
Protocol: AudioSocket
Why not ARI/AMI/AGI for audio?
- AGI can play files but cannot stream bidirectional audio in real time
- ARI can handle media via external channels but adds complexity (Stasis, WebSocket, etc.)
- AudioSocket is a simple TCP protocol designed exactly for this: bidirectional PCM streaming between Asterisk and an external application
AudioSocket frame format:
+--------+--------+--------+--- ... ---+
| Type | Length (big-endian)| Payload |
| 1 byte | 2 bytes | N bytes |
+--------+--------+--------+--- ... ---+
Three frame types matter:
- 0x01 (UUID): First frame, contains the 16-byte call UUID (see the parsing sketch below)
- 0x10 (Audio): PCM audio data (320 bytes = 20ms at 8kHz/16-bit/mono)
- 0x00 (Hangup): Call ended
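The UUID payload is 16 raw bytes, not a printable string. A minimal sketch (the helper name is illustrative) of turning it back into the text form Asterisk stored in ${VA_UUID}:

import uuid

def parse_uuid_frame(payload):
    """Convert the 16 raw bytes of a 0x01 frame into a UUID string."""
    return str(uuid.UUID(bytes=payload))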
5. Asterisk Configuration
Dialplan (extensions.conf or customexte.conf)
The dialplan routes inbound calls to the voice agent. The flow is: answer the call, run the AGI script to generate a UUID and write metadata, then connect AudioSocket to the Python agent.
; ─── AI Voice Agent ─────────────────────────────────────────────
; Route inbound DID calls to the AI voice agent.
; The CALLED variable must be set by the inbound trunk context
; before transferring here (e.g., Set(CALLED=${EXTEN})).
[voice-agent]
exten => voice_agent,1,NoOp(VOICE AGENT: DID=${CALLED} CLI=${CALLERID(num)})
same => n,Answer()
same => n,AGI(voice_agent_setup.agi)
same => n,AudioSocket(${VA_UUID},127.0.0.1:9099)
same => n,NoOp(VOICE AGENT ENDED: ${DIALSTATUS})
same => n,Hangup()
Key points:
- Answer() picks up the call and starts media flowing. This must happen before AudioSocket.
- AGI(voice_agent_setup.agi) generates a unique call ID and writes metadata (DID, caller ID) to a temp file that the Python agent reads.
- AudioSocket(${VA_UUID},127.0.0.1:9099) connects bidirectional audio to the Python TCP server. The UUID is passed as the first frame so the agent can correlate the audio stream with the metadata.
- The agent runs on 127.0.0.1:9099 (localhost only -- no external exposure).
Routing Inbound DIDs to the Agent
In your inbound trunk context, route the call to the voice_agent extension:
[from-trunk]
; Match all inbound DIDs and route to voice agent
exten => _X.,1,NoOp(Inbound call to ${EXTEN} from ${CALLERID(num)})
same => n,Set(CALLED=${EXTEN})
same => n,Goto(voice-agent,voice_agent,1)
Or for specific DIDs only:
[from-trunk]
; Route specific DIDs to voice agent, others to normal handling
exten => 02012345678,1,Set(CALLED=${EXTEN})
same => n,Goto(voice-agent,voice_agent,1)
exten => 02087654321,1,Set(CALLED=${EXTEN})
same => n,Goto(voice-agent,voice_agent,1)
; Default: normal call handling
exten => _X.,1,Goto(default,${EXTEN},1)
6. AGI Script: Session Bootstrapping
The AGI script runs before AudioSocket connects. It generates a UUID for the call session and writes caller metadata to a JSON file that the Python agent reads.
Why a separate AGI script?
Asterisk's AudioSocket() application sends the UUID as the first frame over TCP. The Python agent receives this UUID and needs to look up call metadata (which DID was called, the caller's number). The AGI script bridges this gap: it generates the UUID, sets it as a channel variable for AudioSocket, and writes the metadata to a file the Python agent can read.
voice_agent_setup.agi
Save to /var/lib/asterisk/agi-bin/voice_agent_setup.agi (or /usr/share/asterisk/agi-bin/):
#!/usr/bin/perl
# voice_agent_setup.agi
# Generate UUID and write metadata JSON for the voice agent.
# Called by Asterisk dialplan before AudioSocket().
use strict;
use warnings;
$| = 1; # autoflush STDOUT -- critical for AGI protocol
# ── Read AGI environment variables ──
my %agi;
while (<STDIN>) {
chomp;
last if /^$/;
if (/^agi_(\w+):\s*(.*)$/) {
$agi{$1} = $2;
}
}
# ── Generate UUID ──
# Use the kernel's random UUID generator (no external dependencies)
my $uuid = '';
if (open my $fh, '<', '/proc/sys/kernel/random/uuid') {
$uuid = <$fh>;
chomp $uuid;
close $fh;
}
# ── Get call metadata ──
# CALLED is set in the dialplan before calling AGI
print "GET VARIABLE CALLED\n";
my $resp = <STDIN>;
my $did = '';
if ($resp =~ /\((.+)\)/) {
$did = $1;
}
my $cli = $agi{'callerid'} || '';
# ── Write metadata JSON ──
if ($uuid) {
my $file = "/tmp/va_${uuid}.json";
if (open my $fh, '>', $file) {
print $fh qq({"did":"$did","cli":"$cli"});
close $fh;
}
}
# ── Set channel variable for AudioSocket ──
print "SET VARIABLE VA_UUID $uuid\n";
my $result = <STDIN>;
Make it executable:
chmod +x /var/lib/asterisk/agi-bin/voice_agent_setup.agi
Why Perl? Asterisk's AGI protocol communicates via stdin/stdout line-by-line. Perl handles this natively and is available on every Asterisk system. You could write this in Python or Bash, but Perl is the standard for AGI scripts and adds negligible startup overhead.
Why /tmp/va_{uuid}.json? The Python agent polls for this file (up to 1 second, 50ms intervals) after receiving the UUID frame. Using /tmp means automatic cleanup on reboot. In production, you may want to clean up these files explicitly after each call.
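A minimal sketch of that polling loop, assuming a helper named _load_metadata on the agent and the /tmp/va_{uuid}.json path written by the AGI script:

import asyncio
import json
import time
from pathlib import Path

async def _load_metadata(self, call_uuid, timeout=1.0, interval=0.05):
    """Poll for the metadata file the AGI script writes; give up after `timeout` seconds."""
    path = Path(f"/tmp/va_{call_uuid}.json")
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if path.exists():
            try:
                return json.loads(path.read_text())
            except json.JSONDecodeError:
                pass  # the AGI script may still be writing the file
        await asyncio.sleep(interval)
    return {"did": "", "cli": ""}  # fall back to empty metadata if the file never appears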
7. Python Voice Agent: Core Structure
The voice agent is a single Python file built on asyncio. Here is the high-level structure:
#!/usr/bin/env python3.11
"""
Voice Agent -- AudioSocket + Deepgram STT + Groq LLM + Cartesia TTS
Token-streaming pipeline: ~200-250ms mouth-to-ear latency.
"""
import asyncio
import struct
import json
import os
import time
import logging
import audioop
from pathlib import Path
import aiohttp
from websockets.asyncio.client import connect as ws_connect
from cartesia import AsyncCartesia
# ── Configuration ──
LISTEN_HOST = "127.0.0.1"
LISTEN_PORT = 9099
# Audio: 8kHz 16-bit signed linear PCM, mono
# 320 bytes = 160 samples = 20ms per frame
CHUNK_SIZE = 320
SAMPLE_RATE = 8000
CHUNK_DURATION = CHUNK_SIZE / (SAMPLE_RATE * 2) # 0.02s = 20ms
# Barge-in thresholds
BARGEIN_RMS_THRESHOLD = 800 # RMS energy level to detect speech
BARGEIN_DURATION = 0.3 # seconds of sustained speech to trigger
log = logging.getLogger("voice-agent")
class DeepgramSTT:
"""Streaming speech-to-text via Deepgram Nova-3 WebSocket."""
...
class GroqLLM:
"""Streaming LLM via Groq OpenAI-compatible API."""
...
class CartesiaTTS:
"""Streaming TTS via Cartesia Sonic-3 WebSocket."""
...
class VoiceAgent:
"""Main orchestrator: STT -> LLM -> TTS pipeline."""
...
async def handle_connection(reader, writer):
"""Handle one AudioSocket connection (one call)."""
agent = VoiceAgent()
await agent.handle_call(reader, writer)
async def main():
server = await asyncio.start_server(
handle_connection, LISTEN_HOST, LISTEN_PORT,
)
log.info("Voice Agent listening on %s:%d", LISTEN_HOST, LISTEN_PORT)
async with server:
await server.serve_forever()
if __name__ == "__main__":
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(name)s %(levelname)s %(message)s",
)
asyncio.run(main())
VoiceAgent Class: The Orchestrator
class VoiceAgent:
def __init__(self):
self.stt = DeepgramSTT()
self.llm = GroqLLM()
self.tts = CartesiaTTS()
self.messages = [] # LLM conversation history
self.call_context = None # Company/DID context
self.audio_out_queue = asyncio.Queue() # PCM chunks to send to caller
self.hangup_event = asyncio.Event() # Signals call ended
self.is_speaking = asyncio.Event() # True while agent is talking
self.barge_in_event = asyncio.Event() # Caller interrupted
self.call_uuid = None
self._current_tts_ctx = None # For barge-in cancellation
async def handle_call(self, reader, writer):
"""Main call handler -- the entire call lifecycle."""
# 1. Read UUID frame from AudioSocket
# 2. Read metadata from /tmp/va_{uuid}.json
# 3. Fetch company context from API
# 4. Build system prompt
# 5. Connect STT + TTS WebSockets
# 6. Start background audio I/O tasks
# 7. Speak greeting
# 8. Conversation loop: listen -> think -> speak
...
Each call creates a new VoiceAgent instance. This keeps state isolated between concurrent calls. The handle_call method runs the entire call lifecycle from greeting to hangup.
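The numbered steps above map onto a method roughly like the following. This is a condensed sketch, not the full implementation: _load_metadata, _build_greeting, and _listen_for_utterance are illustrative helper names, it assumes import uuid at the top of the file, and the real method adds error handling and cleanup.

async def handle_call(self, reader, writer):
    # 1. First frame must be the UUID
    frame_type, payload = await read_as_frame(reader)
    if frame_type != AS_TYPE_UUID:
        writer.close()
        return
    self.call_uuid = str(uuid.UUID(bytes=payload))
    # 2-4. Metadata -> company context -> system prompt
    meta = await self._load_metadata(self.call_uuid)
    self.call_context = await self.get_call_context(meta.get("did", ""), meta.get("cli", ""))
    self.messages = [{"role": "system",
                      "content": SYSTEM_PROMPT_TEMPLATE.format(**self.call_context)}]
    # 5. Connect STT + TTS in parallel before speaking
    await asyncio.gather(self.stt.connect(), self.tts.connect())
    # 6. Background audio I/O tasks
    tasks = [asyncio.create_task(self._audio_reader(reader)),
             asyncio.create_task(self._audio_writer(writer))]
    # 7. Greeting via single-shot TTS (no LLM round trip)
    await self._speak(self._build_greeting())
    # 8. Conversation loop: listen -> think -> speak
    while not self.hangup_event.is_set():
        utterance = await self._listen_for_utterance()
        if utterance:
            self.messages.append({"role": "user", "content": utterance})
            reply = await self._think_and_speak()
            if reply:
                self.messages.append({"role": "assistant", "content": reply})
    # Hangup: cancel background tasks and close connections
    for t in tasks:
        t.cancel()
    await self.stt.close()
    writer.close()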
8. AudioSocket Protocol Implementation
AudioSocket is a binary TCP protocol. Every frame has a 3-byte header:
| Byte | Field | Description |
|---|---|---|
| 0 | Type | 0x00=Hangup, 0x01=UUID, 0x10=Audio, 0xFF=Error |
| 1-2 | Length | Big-endian uint16, length of payload |
| 3+ | Payload | Raw data (UUID bytes or PCM audio) |
Reading Frames
# AudioSocket protocol constants
AS_TYPE_HANGUP = 0x00
AS_TYPE_UUID = 0x01
AS_TYPE_AUDIO = 0x10
AS_TYPE_ERROR = 0xFF
async def read_as_frame(reader):
"""Read one AudioSocket frame: 1 byte type + 2 bytes length + payload."""
header = await reader.readexactly(3)
frame_type = header[0]
length = struct.unpack(">H", header[1:3])[0]
payload = b""
if length > 0:
payload = await reader.readexactly(length)
return frame_type, payload
Writing Frames
def make_as_frame(frame_type, payload):
"""Build an AudioSocket frame."""
return struct.pack(">BH", frame_type, len(payload)) + payload
Audio Reader (Background Task)
The audio reader runs continuously, reading PCM frames from Asterisk. When the agent is NOT speaking, audio goes to Deepgram for transcription. When the agent IS speaking, audio is monitored for barge-in detection instead.
async def _audio_reader(self, reader):
"""Read audio from AudioSocket, forward to STT or check barge-in."""
speech_energy_start = None
while not self.hangup_event.is_set():
frame_type, payload = await read_as_frame(reader)
if frame_type == AS_TYPE_HANGUP:
self.hangup_event.set()
return
if frame_type == AS_TYPE_ERROR:
self.hangup_event.set()
return
if frame_type == AS_TYPE_AUDIO and payload:
if self.is_speaking.is_set():
# Agent is talking -- check for barge-in
rms = audioop.rms(payload, 2)
if rms > BARGEIN_RMS_THRESHOLD:
if speech_energy_start is None:
speech_energy_start = time.monotonic()
elif time.monotonic() - speech_energy_start >= BARGEIN_DURATION:
# Caller is interrupting
self.barge_in_event.set()
speech_energy_start = None
# Clear audio queue, cancel TTS
while not self.audio_out_queue.empty():
self.audio_out_queue.get_nowait()
if self._current_tts_ctx:
await self.tts.cancel_context(self._current_tts_ctx)
self.is_speaking.clear()
await self.stt.send_audio(payload)
else:
speech_energy_start = None
else:
# Agent is silent -- forward to STT
speech_energy_start = None
await self.stt.send_audio(payload)
Audio Writer (Background Task)
The audio writer paces PCM chunks at real-time rate (one 320-byte chunk every 20ms):
async def _audio_writer(self, writer):
"""Write queued audio to AudioSocket at real-time rate."""
while not self.hangup_event.is_set():
try:
chunk = await asyncio.wait_for(
self.audio_out_queue.get(), timeout=0.5
)
frame = make_as_frame(AS_TYPE_AUDIO, chunk)
writer.write(frame)
await writer.drain()
await asyncio.sleep(CHUNK_DURATION) # 20ms pacing
except asyncio.TimeoutError:
continue
Why pace at 20ms? If you dump all audio chunks at once, Asterisk buffers them, but the caller hears silence followed by fast-forwarded audio. Real-time pacing ensures smooth playback.
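One refinement worth considering (a sketch, not the production code above): asyncio.sleep(CHUNK_DURATION) drifts slightly over long responses because it ignores the time spent writing and draining. Pacing against a monotonic schedule avoids the drift:

import time

async def _audio_writer(self, writer):
    """Variant of the writer above: pace against a monotonic schedule instead of a fixed sleep."""
    next_send = time.monotonic()
    while not self.hangup_event.is_set():
        try:
            chunk = await asyncio.wait_for(self.audio_out_queue.get(), timeout=0.5)
        except asyncio.TimeoutError:
            next_send = time.monotonic()  # nothing queued -- reset the schedule
            continue
        writer.write(make_as_frame(AS_TYPE_AUDIO, chunk))
        await writer.drain()
        next_send += CHUNK_DURATION
        await asyncio.sleep(max(0.0, next_send - time.monotonic()))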
9. Deepgram Streaming STT
Deepgram's WebSocket API accepts a continuous stream of audio and returns transcript fragments in real time.
Connection Setup
class DeepgramSTT:
def __init__(self):
self.ws = None
self.transcript_queue = asyncio.Queue()
self._recv_task = None
self._keepalive_task = None
async def connect(self):
params = (
"encoding=linear16"
"&sample_rate=8000"
"&channels=1"
"&model=nova-3"
"&language=en-GB"
"&smart_format=true"
"&endpointing=300"
"&interim_results=true"
"&utterance_end_ms=1000"
"&vad_events=true"
"&keywords=postcode:2"
"&keywords=plumber:2"
"&keywords=callout:1"
)
url = f"wss://api.deepgram.com/v1/listen?{params}"
headers = {"Authorization": f"Token {DEEPGRAM_API_KEY}"}
self.ws = await ws_connect(
url, additional_headers=headers,
ping_interval=5, ping_timeout=10,
)
self._recv_task = asyncio.create_task(self._receive_loop())
self._keepalive_task = asyncio.create_task(self._keepalive_loop())
Key Parameters Explained
| Parameter | Value | Why |
|---|---|---|
| encoding=linear16 | PCM format | Matches Asterisk's native audio format |
| sample_rate=8000 | 8kHz | Telephony standard -- do not upsample |
| model=nova-3 | Latest model | Best accuracy + lowest latency |
| language=en-GB | British English | Match your callers' accent |
| endpointing=300 | 300ms | How long Deepgram waits after silence before finalizing. Lower = faster response but may cut off mid-sentence. 300ms is the sweet spot. |
| interim_results=true | Enable | Get partial transcripts while caller is still speaking |
| utterance_end_ms=1000 | 1 second | Fires UtteranceEnd event after 1s of silence -- useful as a safety flush |
| vad_events=true | Enable | Deepgram's server-side voice activity detection events |
| keywords=postcode:2 | Boost weight | Improves recognition of domain-specific terms |
Receiving Transcripts
async def _receive_loop(self):
"""Process transcript results from Deepgram."""
async for msg in self.ws:
data = json.loads(msg)
if data.get("type") == "Results":
alt = data["channel"]["alternatives"][0]
transcript = alt.get("transcript", "").strip()
is_final = data.get("is_final", False)
speech_final = data.get("speech_final", False)
if transcript and is_final:
await self.transcript_queue.put({
"transcript": transcript,
"speech_final": speech_final,
})
elif data.get("type") == "UtteranceEnd":
# Safety flush: if we have accumulated fragments, process them
await self.transcript_queue.put({
"transcript": "",
"speech_final": True,
"utterance_end": True,
})
Understanding Deepgram's Event Model
Deepgram sends several types of results:
- Interim results (is_final=false): Partial transcript while the caller is still speaking. Useful for UI display but we ignore these for LLM input.
- Final results (is_final=true, speech_final=false): A finalized word/phrase, but the caller may still be speaking the same sentence.
- Speech-final results (is_final=true, speech_final=true): The caller has finished their turn. This is our trigger to send to the LLM (see the listening-loop sketch below).
- UtteranceEnd: Fired after utterance_end_ms of silence. Acts as a safety net to flush any accumulated fragments.
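Putting the event model together, the agent's listening step can be a small loop over transcript_queue that accumulates final fragments until a speech-final (or UtteranceEnd) event arrives. A sketch -- the helper name _listen_for_utterance is illustrative:

async def _listen_for_utterance(self):
    """Accumulate finalized fragments until the caller's turn is complete."""
    fragments = []
    while not self.hangup_event.is_set():
        try:
            result = await asyncio.wait_for(self.stt.transcript_queue.get(), timeout=0.5)
        except asyncio.TimeoutError:
            continue
        if result["transcript"]:
            fragments.append(result["transcript"])  # an is_final fragment
        if result.get("speech_final") and fragments:
            return " ".join(fragments)  # caller finished their turn
    return ""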
Keepalive
Deepgram closes idle WebSocket connections after ~30 seconds. Send keepalives to prevent this:
async def _keepalive_loop(self):
while self.ws:
await asyncio.sleep(8)
await self.ws.send(json.dumps({"type": "KeepAlive"}))
Sending Audio
async def send_audio(self, audio_data):
"""Forward raw PCM bytes to Deepgram."""
if self.ws:
await self.ws.send(audio_data)
Graceful Shutdown
async def close(self):
if self._keepalive_task:
self._keepalive_task.cancel()
if self._recv_task:
self._recv_task.cancel()
if self.ws:
await self.ws.send(json.dumps({"type": "CloseStream"}))
await self.ws.close()
10. Groq Streaming LLM with Tool Calling
Groq provides an OpenAI-compatible chat completions API with streaming support. The key advantage is their speculative decoding inference, which delivers tokens at 1,665 tok/s -- fast enough that the LLM is never the bottleneck.
LLM Client
class GroqLLM:
def __init__(self):
self.session = None
async def _ensure_session(self):
if not self.session:
self.session = aiohttp.ClientSession()
async def generate(self, messages):
"""Stream completion tokens. Yields dicts with type 'text' or 'tool_call'."""
await self._ensure_session()
headers = {
"Authorization": f"Bearer {GROQ_API_KEY}",
"Content-Type": "application/json",
}
payload = {
"model": GROQ_MODEL,
"messages": messages,
"tools": [BOOKING_TOOL],
"stream": True,
"temperature": 0.3,
"max_tokens": 250,
}
tool_calls = {}
t0 = time.monotonic()
first_token = True
async with self.session.post(
"https://api.groq.com/openai/v1/chat/completions",
headers=headers, json=payload,
) as resp:
if resp.status != 200:
body = await resp.text()
log.error("Groq API error %d: %s", resp.status, body[:500])
yield {"type": "text", "text": "Bear with me, I'm having a small technical issue."}
return
buffer = ""
async for raw_line in resp.content:
buffer += raw_line.decode()
while "\n" in buffer:
line, buffer = buffer.split("\n", 1)
line = line.strip()
if not line or not line.startswith("data: "):
continue
if line == "data: [DONE]":
break
data = json.loads(line[6:])
choice = data["choices"][0]
delta = choice.get("delta", {})
# Text content -- yield immediately for token-streaming
content = delta.get("content")
if content:
if first_token:
log.info("LLM TTFT: %.0fms",
(time.monotonic() - t0) * 1000)
first_token = False
yield {"type": "text", "text": content}
# Tool calls -- accumulate arguments across chunks
if "tool_calls" in delta:
for tc in delta["tool_calls"]:
idx = tc["index"]
if idx not in tool_calls:
tool_calls[idx] = {
"id": tc.get("id", ""),
"name": tc["function"]["name"],
"arguments": "",
}
tool_calls[idx]["arguments"] += (
tc["function"].get("arguments", "")
)
# Tool call complete
if choice.get("finish_reason") == "tool_calls":
for tc in tool_calls.values():
yield {
"type": "tool_call",
"id": tc["id"],
"name": tc["name"],
"arguments": json.loads(tc["arguments"]),
}
Key Design Decisions
temperature: 0.3 -- Low temperature keeps responses predictable and on-script. Voice conversations need consistency; creative variation in responses confuses callers.
max_tokens: 250 -- Voice responses should be short. One sentence per turn. Capping tokens prevents the LLM from rambling. In practice, responses are 10-30 tokens.
stream: true -- Non-negotiable. Without streaming, you wait for the entire response before starting TTS. With streaming, TTS starts on the first token.
Error handling -- If the LLM API fails, the agent says "Bear with me, I'm having a small technical issue" instead of going silent. Always have a fallback utterance.
11. Cartesia Streaming TTS
Cartesia's Sonic-3 is the critical piece that enables sub-250ms latency. Its continuation API lets you stream tokens into an open WebSocket context, and Cartesia synthesizes them as a continuous audio stream with natural prosody.
TTS Client
class CartesiaTTS:
def __init__(self):
self.client = None
self.connection = None
async def connect(self):
"""Open persistent WebSocket (reused across utterances)."""
self.client = AsyncCartesia(api_key=CARTESIA_API_KEY)
self.connection = await self.client.tts.websocket_connect().__aenter__()
log.info("Cartesia TTS connected")
Two Modes of Operation
The TTS client supports two modes:
1. Single-shot synthesis -- for greetings and short confirmations where you have the complete text:
async def synthesize_streaming(self, text, audio_out_queue):
"""Send complete text, stream audio chunks to queue."""
ctx = self.connection.context()
await ctx.send(
model_id="sonic-3",
transcript=text,
voice={"mode": "id", "id": CARTESIA_VOICE_ID},
output_format={
"container": "raw",
"encoding": "pcm_s16le",
"sample_rate": 8000,
},
continue_=False, # Complete utterance
)
async for response in ctx.receive():
if response.type == "chunk" and response.audio:
pcm_bytes = response.audio
# Split into 320-byte chunks (20ms frames)
for i in range(0, len(pcm_bytes), CHUNK_SIZE):
chunk = pcm_bytes[i:i + CHUNK_SIZE]
if len(chunk) < CHUNK_SIZE:
chunk += b'\x00' * (CHUNK_SIZE - len(chunk))
await audio_out_queue.put(chunk)
2. Token-streaming synthesis -- the key innovation. Opens a TTS context that accepts individual LLM tokens and returns audio as a continuous stream:
async def stream_tokens(self, audio_out_queue, cancel_event):
"""Create a token-streaming TTS context.
Returns (ctx, receive_task) -- the caller pushes tokens into ctx,
and receive_task writes audio to audio_out_queue in the background.
"""
ctx = self.connection.context()
recv_task = asyncio.create_task(
self._receive_audio(ctx, audio_out_queue, cancel_event)
)
return ctx, recv_task
async def _receive_audio(self, ctx, audio_out_queue, cancel_event):
"""Background: receive audio from Cartesia, chunk and queue."""
first_audio = True
t0 = time.monotonic()
async for response in ctx.receive():
if cancel_event.is_set():
break
if response.type == "chunk" and response.audio:
if first_audio:
log.info("TTS TTFB: %.0fms", (time.monotonic() - t0) * 1000)
first_audio = False
pcm_bytes = response.audio
for i in range(0, len(pcm_bytes), CHUNK_SIZE):
if cancel_event.is_set():
return
chunk = pcm_bytes[i:i + CHUNK_SIZE]
if len(chunk) < CHUNK_SIZE:
chunk += b'\x00' * (CHUNK_SIZE - len(chunk))
await audio_out_queue.put(chunk)
The Continuation API
This is the most important API concept in the entire system. When you send a token with continue_=True, Cartesia treats it as part of an ongoing utterance:
# Each LLM token is sent immediately to Cartesia:
await ctx.send(
model_id="sonic-3",
transcript=token, # e.g., "I", " can", " get"
voice={"mode": "id", "id": VOICE_ID},
output_format={...},
continue_=True, # <-- This is the key
)
# When the LLM finishes:
await ctx.no_more_inputs()
With continue_=True:
- Cartesia buffers a few tokens internally to establish prosody context
- It then starts streaming audio while still accepting more tokens
- The audio sounds natural because Cartesia sees the context around each word
- When you call no_more_inputs(), it flushes the remaining audio
Without continue_=True (sending each sentence separately):
- Each sentence starts from scratch prosodically
- You must wait for the complete sentence before synthesizing
- This adds 200-500ms per turn from sentence buffering alone
Barge-In Cancellation
When the caller interrupts, cancel the active TTS context to stop audio immediately:
async def cancel_context(self, ctx):
"""Cancel in-progress TTS for barge-in."""
await self.connection.send({
"context_id": ctx._context_id,
"cancel": True,
})
12. The Token-Streaming Pipeline
This is the core of the system -- the _think_and_speak method that ties LLM and TTS together in a real-time streaming pipeline.
async def _think_and_speak(self):
"""Stream LLM tokens directly to Cartesia TTS.
No sentence-boundary detection -- tokens flow straight through."""
full_response = []
t0 = time.monotonic()
self.barge_in_event.clear()
# Create a TTS streaming context
cancel_event = asyncio.Event()
ctx, recv_task = await self.tts.stream_tokens(
self.audio_out_queue, cancel_event
)
self._current_tts_ctx = ctx
self.is_speaking.set()
tts_voice = {"mode": "id", "id": CARTESIA_VOICE_ID}
tts_format = {
"container": "raw",
"encoding": "pcm_s16le",
"sample_rate": SAMPLE_RATE,
}
try:
# Stream LLM tokens directly to TTS
async for event in self.llm.generate(self.messages):
if self.barge_in_event.is_set():
log.info("Barge-in: aborting stream")
cancel_event.set()
break
if event["type"] == "text":
token = event["text"]
full_response.append(token)
# Send token directly to TTS (continuation mode)
await ctx.send(
model_id=CARTESIA_MODEL,
transcript=token,
voice=tts_voice,
output_format=tts_format,
continue_=True,
)
elif event["type"] == "tool_call":
# Handle tool call (see Section 15)
...
# LLM finished -- flush TTS
if not self.barge_in_event.is_set():
await ctx.no_more_inputs()
await recv_task # Wait for all audio to be received
# Wait for audio queue to drain (all chunks played)
while not self.audio_out_queue.empty():
await asyncio.sleep(0.02)
finally:
self.is_speaking.clear()
self._current_tts_ctx = None
await asyncio.sleep(0.1) # Brief pause before listening
return "".join(full_response)
Why This Is Fast
The traditional approach (used by most voice AI tutorials) looks like this:
User speaks -> STT -> [wait for full transcript] ->
LLM -> [wait for full response] ->
TTS -> [wait for full audio] ->
Play to user
Total: 1.5 - 3.0 seconds
Our approach pipelines everything:
User speaks -> STT (streaming) ->
|-> LLM token 1 -> TTS -> Audio chunk 1 -> Play
|-> LLM token 2 -> TTS -> Audio chunk 2 -> Play
|-> LLM token 3 -> TTS -> Audio chunk 3 -> Play
...
Total: 200-250ms to first audio
The LLM is still generating token 5 when the caller hears the audio from token 1. This is possible because:
- Groq's TTFT is ~30-50ms -- the first token arrives almost instantly
- Cartesia's continuation API accepts individual tokens without waiting for a complete sentence
- No sentence buffering -- we removed the _split_on_sentence_boundary() logic that was adding 200-500ms
13. Barge-In and Interruption Handling
Barge-in is when the caller starts speaking while the agent is still talking. Real humans do this constantly. If your voice agent cannot handle it, the experience feels robotic.
How Barge-In Works
Timeline:
Agent speaking: "I can get a plumber out to you within---"
Caller speaks: "yeah how much?"
Agent detects: [RMS > 800 for 300ms]
Agent stops: [audio queue cleared]
Agent listens: "yeah how much?"
Agent responds: "There's a 49 pound callout..."
Implementation: Energy-Based VAD
We use a simple but effective approach: monitor the RMS (root mean square) energy of incoming audio while the agent is speaking. If the energy exceeds a threshold for a sustained duration, the caller is talking.
# Constants
BARGEIN_RMS_THRESHOLD = 800 # Energy threshold (0-32768 range for 16-bit audio)
BARGEIN_DURATION = 0.3 # Sustained speech duration in seconds
In the _audio_reader method, when is_speaking is set:
rms = audioop.rms(payload, 2) # Calculate RMS of 20ms frame
if rms > BARGEIN_RMS_THRESHOLD:
if speech_energy_start is None:
speech_energy_start = time.monotonic()
elif time.monotonic() - speech_energy_start >= BARGEIN_DURATION:
# Confirmed barge-in
self.barge_in_event.set()
speech_energy_start = None
# 1. Clear the audio output queue (stop pending playback)
while not self.audio_out_queue.empty():
self.audio_out_queue.get_nowait()
# 2. Cancel the active TTS context
if self._current_tts_ctx:
await self.tts.cancel_context(self._current_tts_ctx)
# 3. Resume STT (start listening again)
self.is_speaking.clear()
await self.stt.send_audio(payload)
else:
speech_energy_start = None # Reset if energy drops below threshold
Tuning Barge-In Parameters
| Parameter | Too Low | Too High | Recommended |
|---|---|---|---|
| BARGEIN_RMS_THRESHOLD | False positives from background noise | Misses soft-spoken callers | 600-1000 |
| BARGEIN_DURATION | Triggers on coughs/clicks | Slow to detect interruptions | 0.2-0.4s |
Testing tip: Log the RMS values during real calls to calibrate:
if rms > 200: # Log anything above ambient noise
log.debug("RMS during speech: %d", rms)
Why Not WebRTC VAD or Silero?
More sophisticated VAD models (WebRTC VAD, Silero VAD) could distinguish speech from noise better. But for barge-in detection:
- We do not need to distinguish speech from noise perfectly -- a false positive just means the agent pauses briefly
- RMS calculation on a 20ms frame takes microseconds; neural VAD takes milliseconds
- The 300ms sustained-energy requirement filters out most false positives (coughs, clicks, brief noise)
- Simpler code means fewer failure modes in production
If you deploy in noisy environments (call centers, outdoors), consider upgrading to Silero VAD.
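For reference, here is a sketch of swapping the RMS check for WebRTC VAD (this assumes the third-party py-webrtcvad package; Silero would additionally need torch and 256-sample chunk buffering at 8kHz). Conveniently, each AudioSocket payload is already a 20ms, 8kHz, 16-bit mono frame -- exactly what webrtcvad accepts.

import webrtcvad

vad = webrtcvad.Vad(2)  # aggressiveness 0-3; higher = fewer false positives

def frame_is_speech(payload, sample_rate=8000):
    """True if this 20ms, 16-bit mono PCM frame contains speech (per WebRTC VAD)."""
    return vad.is_speech(payload, sample_rate)

# In _audio_reader, replace `if rms > BARGEIN_RMS_THRESHOLD:` with
# `if frame_is_speech(payload):` and keep the sustained-duration check.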
14. Conversation State Machine
The voice agent follows an 8-step workflow. Rather than implementing an explicit state machine in code, we encode the workflow entirely in the LLM's system prompt and let the model manage state transitions naturally.
The Workflow
Step 1: GREET
"Hello, [Company Name], good [morning/afternoon/evening]."
(Then silence -- wait for caller to state their problem)
|
v
Step 2: UNDERSTAND
Caller states problem. Agent asks ONE follow-up if needed.
"Is it leaking now?" / "Whole house or just one room?"
|
v
Step 3: QUOTE
"I can get a [trade] out to you within thirty minutes to an hour.
There's a [fee] pound callout. He'll quote on-site before starting."
(Wait for agreement)
|
v
Step 4: POSTCODE
"What's your postcode?"
(Confirm with phonetic alphabet)
|
v
Step 5: ADDRESS
"And the house number and street?"
|
v
Step 6: NAME
"And your name?"
|
v
Step 7: BOOK
[Tool call: create_booking]
(Automatic -- no user-facing output)
|
v
Step 8: CONFIRM
"That's booked. The [trade] will be with you within the hour.
Thanks for calling."
System Prompt Design
The system prompt is the most important piece of the entire system. It defines the agent's personality, workflow, and constraints. Here is the template:
SYSTEM_PROMPT_TEMPLATE = """You work at {company_name}, a UK {trade_type} \
company. You answer the phone. Casual British English -- "no worries", \
"lovely", "bear with me". Short replies, 1 sentence max. Never sound scripted.
Never use American English. Never say "sir", "madam", "dear" -- you don't \
know who's calling. Never say "How can I help you?" -- they're calling you, \
they'll tell you.
# Context
- Company: {company_name}
- Trade: {trade_type} / {trade_label}
- Callout: {callout_fee}
- Caller: {caller_id}
- Repeat: {is_repeat}
# Workflow -- follow this exact order, one step per reply
Step 1: Greet. Say "Hello, {company_name}, good [morning/afternoon/evening]."
Then shut up and listen.
Step 2: They tell you the problem. Ask ONE quick follow-up if needed.
Then move on. Don't interrogate.
Step 3: Quote. "I can get a {trade_label} out to you within thirty minutes
to an hour. There's a {callout_fee} pound callout for the {trade_label} to
come out, then he'll quote you on-site before starting anything."
Stop here. Wait for agreement.
Step 4: Postcode. "What's your postcode?" Confirm with phonetic alphabet.
Step 5: Address. "And the house number and street?"
Step 6: Name. "And your name?"
Step 7: Book using create_booking tool. Use caller_id from context -- never
ask for phone number.
Step 8: "That's booked. The {trade_label} will be with you within the hour.
Thanks for calling."
# NEVER do
- Never combine steps -- one thing per reply
- Never ask for phone number
- Never repeat their name back
- Never summarise at the end
- Never quote labour -- only callout fee
- Never give exact time -- "thirty minutes to an hour"
- Never say "call centre", "AI", or system names
# Objections
- Want total price: "Every job's different -- the {trade_label} will quote
on-site before starting. No obligation."
- Too expensive: "No worries. Thanks for calling."
- Are you local: "Yes, we've got someone nearby."
- Abusive: "I'm not able to help further on this call."
"""
Why Prompt-Based State Management Works
You might expect to need an explicit state machine (enum of states, transition functions, etc.). In practice, the LLM handles this naturally when the prompt is well-structured:
- One step per reply -- this constraint prevents the LLM from jumping ahead
- Explicit ordering -- numbered steps with clear triggers for each transition
- Negative constraints -- "NEVER do" rules prevent common LLM failure modes
- Objection handling -- predefined responses for common edge cases keep the LLM on track
The LLM sees the full conversation history and naturally progresses through steps. If the caller says "yes" after the quote, the LLM moves to step 4 (postcode). If they say "how much total?", the LLM handles the objection before continuing.
Dynamic Greeting Based on Time of Day
The greeting is generated in code (not by the LLM) for consistency and speed:
import datetime
hour = datetime.datetime.now().hour
tod = "good morning" if hour < 12 else "good afternoon" if hour < 18 else "good evening"
company = self.call_context.get("company_name", "Home Services")
greeting = f"Hello, {company}, {tod}."
await self._speak(greeting)
self.messages.append({"role": "assistant", "content": greeting})
This saves one LLM round trip. The agent speaks the greeting immediately via single-shot TTS while the first LLM call is reserved for responding to what the caller actually says.
15. Tool Calling Integration
When the agent has collected all required information (name, postcode, address, problem), the LLM invokes the create_booking tool to record the booking in your backend system.
Tool Definition
BOOKING_TOOL = {
"type": "function",
"function": {
"name": "create_booking",
"description": (
"Create a job booking after collecting customer name, postcode, "
"address, and problem description. Call this ONLY after all "
"details are collected."
),
"parameters": {
"type": "object",
"properties": {
"customer_name": {
"type": "string",
"description": "Customer full name",
},
"postcode": {
"type": "string",
"description": "UK postcode with space",
},
"address": {
"type": "string",
"description": "Full street address",
},
"problem_description": {
"type": "string",
"description": "One-line summary of the issue",
},
},
"required": [
"customer_name", "postcode",
"address", "problem_description",
],
},
},
}
Handling Tool Calls in the Pipeline
Tool calls interrupt the token-streaming pipeline. When the LLM decides to call a tool, it stops generating text tokens and instead outputs the tool call arguments. Here is how the agent handles this:
# Inside _think_and_speak:
elif event["type"] == "tool_call":
# 1. Close current TTS context (no more text to speak)
await ctx.no_more_inputs()
await recv_task
self.is_speaking.clear()
self._current_tts_ctx = None
if event["name"] == "create_booking":
# 2. Call the booking API
result = await self.create_booking(event["arguments"])
# 3. Add tool call + result to message history
self.messages.append({
"role": "assistant",
"content": None,
"tool_calls": [{
"id": event["id"],
"type": "function",
"function": {
"name": "create_booking",
"arguments": json.dumps(event["arguments"]),
},
}],
})
self.messages.append({
"role": "tool",
"tool_call_id": event["id"],
"content": json.dumps(result or {"success": True}),
})
# 4. Get confirmation response from LLM
confirm_parts = []
async for ev in self.llm.generate(self.messages):
if ev["type"] == "text":
confirm_parts.append(ev["text"])
confirmation = "".join(confirm_parts)
if confirmation.strip():
await self._speak(confirmation.strip())
Booking API Implementation
The booking API is a simple PHP endpoint (or any backend you prefer):
<?php
// create_booking.php -- Receive booking from voice agent
header('Content-Type: application/json');
// Verify API key
$api_key = $_SERVER['HTTP_X_API_KEY'] ?? '';
if ($api_key !== getenv('VOICE_AGENT_API_KEY')) {
http_response_code(401);
echo json_encode(['error' => 'Unauthorized']);
exit;
}
$data = json_decode(file_get_contents('php://input'), true);
// Validate required fields
$required = ['customer_name', 'postcode', 'address', 'problem_description'];
foreach ($required as $field) {
if (empty($data[$field])) {
http_response_code(400);
echo json_encode(['error' => "Missing field: $field"]);
exit;
}
}
// Insert into database, send notifications, etc.
// ... your business logic here ...
echo json_encode([
'success' => true,
'booking_id' => uniqid('BK-'),
'message' => 'Booking created successfully',
]);
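On the agent side, the create_booking method referenced in the pipeline code posts to this endpoint. A sketch only -- BOOKING_API_URL comes from the agent's environment file, while BOOKING_API_KEY is an assumed variable name; use whatever key your endpoint checks:

import os

BOOKING_API_URL = os.environ.get("BOOKING_API_URL", "")
BOOKING_API_KEY = os.environ.get("BOOKING_API_KEY", "")

async def create_booking(self, arguments):
    """POST the collected details to the booking endpoint; never raise into the call."""
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(
                BOOKING_API_URL,
                json=arguments,
                headers={"X-API-Key": BOOKING_API_KEY},
                timeout=aiohttp.ClientTimeout(total=5),
            ) as resp:
                if resp.status == 200:
                    return await resp.json()
                log.error("Booking API error %d", resp.status)
    except Exception as e:
        log.warning("Booking API unreachable: %s", e)
    log.info("BOOKING_FALLBACK %s", json.dumps(arguments))  # keep a local record (see Fallback Strategy)
    return {"success": False}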
Adding More Tools
To add more tools (e.g., check availability, look up customer history), define them the same way and add them to the tools array in the LLM request:
CHECK_AVAILABILITY_TOOL = {
"type": "function",
"function": {
"name": "check_availability",
"description": "Check engineer availability for a given postcode area",
"parameters": {
"type": "object",
"properties": {
"postcode": {"type": "string"},
"urgency": {"type": "string", "enum": ["emergency", "standard"]},
},
"required": ["postcode"],
},
},
}
# In the LLM payload:
"tools": [BOOKING_TOOL, CHECK_AVAILABILITY_TOOL],
16. DID-to-Company Context API
A single voice agent can serve multiple companies by looking up which company owns the phone number (DID) that was called. When a caller dials a plumber's number, the agent greets them as that plumber. When they dial an electrician's number, same agent, different persona.
Context API Endpoint
<?php
// did_context.php -- Return company context for a DID
header('Content-Type: application/json');
$api_key = $_SERVER['HTTP_X_API_KEY'] ?? '';
if ($api_key !== getenv('VOICE_AGENT_API_KEY')) {
http_response_code(401);
echo json_encode(['error' => 'Unauthorized']);
exit;
}
$data = json_decode(file_get_contents('php://input'), true);
$did = $data['did_number'] ?? '';
$cli = $data['caller_id'] ?? '';
// Look up company by DID
// This could be a database query, config file, etc.
$companies = [
'02012345678' => [
'company_name' => 'Quick Fix Plumbing',
'trade_type' => 'plumbing',
'trade_label' => 'plumber',
'callout_fee' => 49,
'area' => 'London',
],
'01234567890' => [
'company_name' => 'Spark Electrical',
'trade_type' => 'electrical',
'trade_label' => 'electrician',
'callout_fee' => 59,
'area' => 'Manchester',
],
];
$company = $companies[$did] ?? [
'company_name' => 'Home Services',
'trade_type' => 'general maintenance',
'trade_label' => 'engineer',
'callout_fee' => 49,
'area' => 'your area',
];
// Check if repeat caller
$is_repeat = false;
// ... query your database for previous calls from $cli ...
$company['caller_id'] = $cli;
$company['is_repeat'] = $is_repeat;
$company['did_number'] = $did;
$company['greeting'] = "Hello, {$company['company_name']}, how can I help you?";
echo json_encode($company);
Agent-Side Context Fetching
async def get_call_context(self, did, cli):
"""Fetch company context from our API."""
try:
async with aiohttp.ClientSession() as session:
async with session.post(
CONTEXT_API_URL,
json={"did_number": did, "caller_id": cli},
headers={"X-API-Key": CONTEXT_API_KEY},
timeout=aiohttp.ClientTimeout(total=3),
) as resp:
if resp.status == 200:
return await resp.json()
except Exception as e:
log.warning("Context API error: %s", e)
# Fallback defaults if API fails
return {
"company_name": "Home Services",
"trade_type": "plumbing",
"trade_label": "plumber",
"callout_fee": 49,
"caller_id": cli,
"is_repeat": False,
"greeting": "Hello, how can I help you?",
}
The 3-second timeout ensures the call is not delayed if the API is slow. The fallback defaults mean the agent can always answer, even if the context API is down.
17. Systemd Service Setup
Run the voice agent as a systemd service for automatic startup and restart.
Environment File
Create /opt/voice-agent/voice_agent.env:
# Voice Agent API Keys -- chmod 600 this file
GROQ_API_KEY="gsk_YOUR_GROQ_API_KEY_HERE"
GROQ_MODEL="llama-3.3-70b-specdec"
DEEPGRAM_API_KEY="YOUR_DEEPGRAM_API_KEY_HERE"
CARTESIA_API_KEY="sk_car_YOUR_CARTESIA_KEY_HERE"
CARTESIA_VOICE_ID="a01c369f-6d2d-4185-bc20-b32c225eab70"
CARTESIA_MODEL="sonic-3"
CONTEXT_API_URL="http://127.0.0.1/api/did_context.php"
CONTEXT_API_KEY="YOUR_INTERNAL_API_KEY_HERE"
BOOKING_API_URL="http://127.0.0.1/api/create_booking.php"
Secure it:
chmod 600 /opt/voice-agent/voice_agent.env
Service File
Create /etc/systemd/system/voice-agent.service:
[Unit]
Description=AI Voice Agent (AudioSocket + Deepgram STT + Groq LLM + Cartesia TTS)
After=network.target asterisk.service
[Service]
Type=simple
EnvironmentFile=/opt/voice-agent/voice_agent.env
ExecStart=/usr/bin/python3.11 /opt/voice-agent/agent.py
WorkingDirectory=/opt/voice-agent
Restart=always
RestartSec=3
StandardOutput=journal
StandardError=journal
SyslogIdentifier=voice-agent
[Install]
WantedBy=multi-user.target
Enable and Start
systemctl daemon-reload
systemctl enable voice-agent
systemctl start voice-agent
Verify It Is Running
systemctl status voice-agent
● voice-agent.service - AI Voice Agent (AudioSocket + Deepgram STT + Groq LLM + Cartesia TTS)
Loaded: loaded (/etc/systemd/system/voice-agent.service; enabled)
Active: active (running) since ...
View Logs
# Live logs
journalctl -u voice-agent -f
# Last 100 lines
journalctl -u voice-agent -n 100
Restart After Code Changes
systemctl restart voice-agent
18. Latency Optimization Deep Dive
Achieving sub-250ms mouth-to-ear latency requires optimizing every stage of the pipeline. Here is what matters most, in order of impact.
1. Token-Streaming (Biggest Win: -500ms to -1.5s)
The single most impactful optimization. Instead of buffering the LLM's complete response and then synthesizing, stream each token directly to TTS.
Before (sentence buffering):
LLM generates "I can get a plumber out to you." -> 400ms
TTS synthesizes entire sentence -> 300ms
Total: 700ms before caller hears anything
After (token-streaming):
LLM token "I" -> TTS starts synthesizing -> 80ms
LLM token " can" -> TTS continues
LLM token " get" -> Caller already hearing "I can..."
Total: ~80ms before caller hears first syllable
2. Deepgram Endpointing Tuning (-100ms to -300ms)
The endpointing parameter controls how long Deepgram waits after the caller stops speaking before finalizing the transcript.
endpointing=500 (default) -- 500ms of silence before finalizing
endpointing=300 (our setting) -- 300ms, more responsive
endpointing=150 -- Very aggressive, may cut off mid-sentence
Start at 300ms. Go lower only if your callers speak in short, decisive phrases.
3. Choose the Right Groq Model (-200ms+ TTFT)
| Model | TTFT | Throughput | Quality |
|---|---|---|---|
| llama-3.3-70b-versatile | ~100ms | 276 tok/s | Excellent |
| llama-3.3-70b-specdec | ~30-50ms | 1,665 tok/s | Identical |
| llama-3.1-8b-instant | ~20ms | 3,000+ tok/s | Good |
The specdec variant gives identical output quality to versatile at 6x the speed. Use it.
4. Keep TTS WebSocket Alive (Save ~200ms Per Turn)
Opening a new WebSocket connection for each utterance adds 100-200ms of TLS handshake overhead. Keep the Cartesia WebSocket open for the duration of the call:
# Connect once at call start
await self.tts.connect()
# Reuse for every utterance
ctx = self.connection.context() # New context, same connection
5. Native 8kHz PCM Output (Save CPU + ~20ms)
Cartesia supports native 8kHz PCM output. If you use a TTS that outputs 24kHz or 44.1kHz, you must resample to 8kHz for telephony:
# BAD: Receive 24kHz, resample to 8kHz
import audioop
audio_8k = audioop.ratecv(audio_24k, 2, 1, 24000, 8000, None)[0]
# GOOD: Request 8kHz natively from Cartesia
output_format={
"container": "raw",
"encoding": "pcm_s16le",
"sample_rate": 8000, # Native 8kHz -- no resampling needed
}
6. Short System Prompts (-10ms to -30ms TTFT)
Groq's TTFT increases with prompt length. Keep your system prompt under 500 tokens. Remove verbose instructions and use concise rules.
7. Limit max_tokens (Save Tail Latency)
Set max_tokens: 250 to prevent the LLM from generating unnecessarily long responses. Voice responses should be one sentence. If the LLM rambles, the caller hears a monologue and disengages.
8. Pre-Connect WebSockets
Connect Deepgram and Cartesia WebSockets immediately after receiving the call, before speaking the greeting:
# Connect in parallel
await asyncio.gather(
self.stt.connect(),
self.tts.connect(),
)
# Both ready -- no connection delay on first interaction
Latency Budget Summary
| Component | Time | Notes |
|---|---|---|
| STT finalization | 150-200ms | Depends on endpointing setting |
| Groq TTFT | 30-50ms | With specdec variant |
| Network (server to Groq) | 10-30ms | Depends on server location |
| Cartesia TTFB | 50-80ms | From first token to first audio |
| AudioSocket frame | 20ms | One 20ms frame |
| Total | ~200-250ms | Mouth-to-ear |
19. Troubleshooting
Audio Issues
Problem: Caller hears garbled/distorted audio
Cause: Audio format mismatch. Asterisk sends 8kHz 16-bit signed-integer PCM (slin). If your TTS outputs a different format, the audio will be garbled.
Fix: Verify your TTS output format matches exactly:
output_format={
"container": "raw",
"encoding": "pcm_s16le", # Signed 16-bit little-endian
"sample_rate": 8000, # 8kHz, not 16kHz or 24kHz
}
Problem: Audio sounds choppy or has gaps
Cause: Audio writer is not pacing at real-time rate.
Fix: Ensure you sleep for exactly one chunk duration (20ms) between writes:
await asyncio.sleep(CHUNK_DURATION) # 0.02 seconds
Problem: Caller hears echo of their own voice
Cause: The audio reader is forwarding caller audio back through the TTS output path.
Fix: Only forward audio to STT when the agent is not speaking (is_speaking flag).
Problem: Silence at the start of the call
Cause: Answer() not called before AudioSocket() in the dialplan, or the TTS connection is slow to establish.
Fix: Ensure Answer() runs before AudioSocket() in the dialplan, and pre-connect the STT/TTS WebSockets as soon as the AudioSocket connection arrives, before speaking the greeting.
Connection Issues
Problem: Deepgram STT connection closed unexpectedly
Cause: Deepgram closes idle connections after ~30 seconds.
Fix: Send keepalive messages every 8-10 seconds:
await self.ws.send(json.dumps({"type": "KeepAlive"}))
Problem: Groq API error 429: Rate limit exceeded
Cause: Too many concurrent requests or tokens per minute.
Fix: Implement retry with exponential backoff, or upgrade your Groq plan. For production, consider having a fallback model (e.g., Groq's 8B model or a local Ollama instance).
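A sketch of what that retry could look like (a hypothetical wrapper around the Groq request; keep attempts and delays low because a caller is waiting on the line):

import random

async def post_with_backoff(session, url, headers, payload, max_attempts=3):
    """Retry on 429 with jittered exponential backoff; return the last response otherwise."""
    delay = 0.25
    for attempt in range(max_attempts):
        resp = await session.post(url, headers=headers, json=payload)
        if resp.status != 429 or attempt == max_attempts - 1:
            return resp  # caller streams the body or handles the error
        resp.release()  # free the connection before retrying
        await asyncio.sleep(delay + random.uniform(0, 0.1))
        delay *= 2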
Problem: AudioSocket connection refused
Cause: Voice agent is not running or is listening on the wrong port.
Fix:
# Check if agent is listening
ss -tlnp | grep 9099
# Check service status
systemctl status voice-agent
# Check logs for startup errors
journalctl -u voice-agent -n 50
Latency Issues
Problem: Latency is 1-2 seconds instead of 200-250ms
Common causes:
- Sentence buffering -- You are waiting for complete sentences before sending to TTS. Use token-streaming instead.
- Wrong Groq model -- Using versatile instead of specdec. Check your GROQ_MODEL setting.
- TTS reconnecting -- Creating a new WebSocket for each utterance. Keep the connection alive.
- High endpointing -- Deepgram's endpointing is set too high. Try 300ms.
Problem: First response is slow, subsequent ones are fast
Cause: WebSocket connections being established on first use.
Fix: Pre-connect both Deepgram and Cartesia WebSockets at call start (before the greeting).
Barge-In Issues
Problem: Agent does not stop when caller interrupts
Cause: BARGEIN_RMS_THRESHOLD is too high, or BARGEIN_DURATION is too long.
Fix: Lower the threshold (try 500) and shorten the duration (try 0.2s). Log RMS values to calibrate:
log.debug("Barge-in check: RMS=%d threshold=%d", rms, BARGEIN_RMS_THRESHOLD)
Problem: Agent stops talking due to background noise
Cause: BARGEIN_RMS_THRESHOLD is too low.
Fix: Increase the threshold (try 1000-1200). Test with actual phone calls, not just headset mic.
Metadata Issues
Problem: Agent greets with wrong company name or "Home Services" default
Cause: The context API is not returning the correct DID mapping, or the CALLED variable is not set in the dialplan.
Fix:
# Test the context API directly
curl -X POST http://127.0.0.1/api/did_context.php \
-H "X-API-Key: YOUR_API_KEY" \
-H "Content-Type: application/json" \
-d '{"did_number":"02012345678","caller_id":"07700900000"}'
Check that Set(CALLED=${EXTEN}) is in the dialplan before the Goto(voice-agent,...).
20. Performance Benchmarks
Real-world measurements from production calls:
Latency (Measured at Component Level)
| Metric | P50 | P90 | P99 |
|---|---|---|---|
| STT final transcript | 170ms | 220ms | 350ms |
| LLM TTFT (Groq specdec) | 35ms | 55ms | 120ms |
| TTS TTFB (Cartesia) | 60ms | 85ms | 150ms |
| Total mouth-to-ear | 220ms | 280ms | 400ms |
Resource Usage
| Metric | Value |
|---|---|
| Memory per call | ~30MB |
| CPU per call | ~5% of one core |
| Concurrent calls tested | 10 simultaneous |
| WebSocket connections per call | 2 (Deepgram + Cartesia) |
| HTTP connections per call | 1 (Groq) + 1-2 (context/booking APIs) |
Cost Per Call (Average 3-minute call)
| Service | Cost |
|---|---|
| Deepgram STT | ~$0.013 |
| Groq LLM | ~$0.002 |
| Cartesia TTS | ~$0.005 |
| Total | ~$0.02/call |
At $0.02 per call, the AI agent costs less per call than the electricity to keep a human agent's monitor on.
21. Production Considerations
Concurrent Call Handling
Each call creates a new VoiceAgent instance with its own asyncio tasks. The TCP server handles multiple concurrent connections natively. Test with your expected peak concurrency.
For high concurrency (50+ simultaneous calls), consider:
- Running multiple agent processes behind a load balancer
- Connection pooling for the Groq HTTP client
- Monitoring WebSocket connection limits on Deepgram and Cartesia
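Within a single process, a simple cap keeps an overload from degrading every active call at once. A sketch (MAX_CALLS is an assumed constant); closing the socket makes AudioSocket() return so the dialplan can fall through to voicemail or a human:

MAX_CALLS = 20
active_calls = 0

async def handle_connection(reader, writer):
    global active_calls
    if active_calls >= MAX_CALLS:
        writer.close()  # AudioSocket() returns; the dialplan continues to the next priority
        return
    active_calls += 1
    try:
        agent = VoiceAgent()
        await agent.handle_call(reader, writer)
    finally:
        active_calls -= 1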
Call Recording
AudioSocket gives you raw PCM audio in both directions. To record calls, fork the audio streams to a file:
# In _audio_reader:
if self.recording_file:
self.recording_file.write(payload) # Caller's audio
# In _audio_writer:
if self.recording_file:
self.recording_file.write(chunk) # Agent's audio
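A sketch using the standard-library wave module, keeping the two directions in separate files so the 20ms chunks from caller and agent don't interleave into one unintelligible stream (paths are illustrative):

import wave

def open_recording(path):
    """Open a WAV file matching the call audio format: 8kHz, 16-bit, mono."""
    wf = wave.open(path, "wb")
    wf.setnchannels(1)
    wf.setsampwidth(2)
    wf.setframerate(8000)
    return wf

# At call start:
#   self.rec_in = open_recording(f"/var/spool/voice-agent/{self.call_uuid}_caller.wav")
#   self.rec_out = open_recording(f"/var/spool/voice-agent/{self.call_uuid}_agent.wav")
# In _audio_reader:  self.rec_in.writeframes(payload)
# In _audio_writer:  self.rec_out.writeframes(chunk)
# On hangup:         self.rec_in.close(); self.rec_out.close()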
Graceful Shutdown
Handle SIGTERM to close active calls cleanly:
import signal
def handle_shutdown(signum, frame):
log.info("Shutdown signal received")
# The asyncio server will stop accepting new connections
# Active calls will finish naturally
raise SystemExit(0)
signal.signal(signal.SIGTERM, handle_shutdown)
Monitoring
Log structured metrics for each call:
log.info("CALL_METRICS uuid=%s duration=%.1f turns=%d bargein_count=%d "
"avg_latency_ms=%d booking=%s",
self.call_uuid, duration, turn_count, bargein_count,
avg_latency, booking_created)
Feed these into Prometheus/Grafana for dashboards tracking:
- Average response latency per turn
- Barge-in frequency (indicates latency or verbosity issues)
- Booking conversion rate
- Call duration distribution
- Error rate by component (STT/LLM/TTS failures)
Fallback Strategy
What happens when an API goes down?
| Component Down | Fallback |
|---|---|
| Deepgram | Try Google STT or buffer audio and retry |
| Groq | Fall back to llama-3.1-8b-instant (faster, less capable) or local Ollama |
| Cartesia | Fall back to a pre-recorded "Sorry, we're experiencing issues" message |
| Context API | Use hardcoded defaults (generic company name) |
| Booking API | Log booking details locally, process later |
Security
- Run the voice agent on 127.0.0.1 only -- never expose port 9099 externally
- Store API keys in the environment file with chmod 600
- Rate-limit your context and booking APIs
- Sanitize any data from the LLM before inserting into your database (the LLM could hallucinate SQL injection if a caller says something adversarial)
- Log calls but do not log API keys or full audio to external services
Summary
The key principles behind this system:
Stream everything. Never wait for a complete result when you can process partial results. This applies to STT (streaming transcripts), LLM (streaming tokens), and TTS (continuation API).
Pick components for latency, not features. ElevenLabs sounds slightly better than Cartesia in a side-by-side comparison. But Cartesia's continuation API and native 8kHz output cut 300ms off every response. For voice, speed wins.
Keep the LLM on a tight leash. Short system prompts, low temperature, max_tokens cap, explicit workflow steps, and comprehensive "NEVER do" rules. The LLM is a tool, not an autonomous agent.
Handle interruptions. Barge-in is not optional. Real callers interrupt constantly. Energy-based VAD is simple, fast, and good enough.
Fail gracefully. Every external API will fail eventually. Have fallbacks for each component and always have a "sorry, technical issue" utterance ready.
The complete system runs as a single Python file (1,000 lines), a Perl AGI script (30 lines), and six lines of Asterisk dialplan. Total infrastructure cost: ~$0.02 per call.
Built and tested in production handling real inbound calls for UK home services companies. If you have questions or want to discuss implementation details, reach out.