
Building a Real-Time AI Voice Agent for Asterisk


AudioSocket + Deepgram STT + Groq LLM + Cartesia TTS

A production-tested guide to building a voice AI agent that handles live phone calls through Asterisk with sub-250ms response latency.


Table of Contents

  1. Why AI Voice Agents Matter
  2. Architecture Overview
  3. Prerequisites
  4. Component Selection: Why This Stack
  5. Asterisk Configuration
  6. AGI Script: Session Bootstrapping
  7. Python Voice Agent: Core Structure
  8. AudioSocket Protocol Implementation
  9. Deepgram Streaming STT
  10. Groq Streaming LLM with Tool Calling
  11. Cartesia Streaming TTS
  12. The Token-Streaming Pipeline
  13. Barge-In and Interruption Handling
  14. Conversation State Machine
  15. Tool Calling Integration
  16. DID-to-Company Context API
  17. Systemd Service Setup
  18. Latency Optimization Deep Dive
  19. Troubleshooting
  20. Performance Benchmarks
  21. Production Considerations

1. Why AI Voice Agents Matter

Every missed phone call is lost revenue. For home services companies -- plumbers, electricians, locksmiths -- a single missed call can mean a lost booking worth hundreds of pounds. Human agents cost money, need shifts covered, and still miss calls at 2 AM.

AI voice agents solve this by answering every call, instantly, 24/7. But the bar is high: callers expect a natural conversation, not a clunky IVR menu. They expect to be heard, understood, and helped in real time. If the agent pauses for two seconds after every sentence, callers hang up.

This tutorial documents a production system that achieves sub-250ms mouth-to-ear latency -- the time from when the caller finishes speaking to when they hear the first syllable of the agent's response. That is faster than most human agents. The key insight that makes this possible: token-streaming. Instead of waiting for the LLM to generate a complete sentence, we pipe each token directly into the TTS engine as it arrives. The caller hears the response as the AI is still thinking.

What You Will Build

A Python voice agent that:

  - answers every inbound call through Asterisk's AudioSocket application
  - transcribes caller speech in real time with Deepgram Nova-3
  - generates replies with Groq's Llama 3.3 70B (specdec), streamed token by token
  - speaks with Cartesia Sonic-3, piping LLM tokens straight into TTS
  - handles barge-in, tool calling, and per-DID company context

2. Architecture Overview

                          TELEPHONE NETWORK
                                |
                           SIP Trunk
                                |
                    +-----------v-----------+
                    |      ASTERISK PBX      |
                    |                        |
                    |  1. Answer()           |
                    |  2. AGI(setup.agi)     |
                    |     - Generate UUID    |
                    |     - Write metadata   |
                    |  3. AudioSocket(       |
                    |       127.0.0.1:9099)  |
                    +-----------+------------+
                                |
                    TCP (AudioSocket Protocol)
                    8kHz 16-bit PCM, 20ms frames
                                |
                    +-----------v------------+
                    |   PYTHON VOICE AGENT    |
                    |   (asyncio TCP server)  |
                    |                         |
                    |  +-------------------+  |
                    |  | Audio Reader      |  |    +------------------+
          Caller    |  | - Read PCM frames |--------> Deepgram Nova-3 |
          speaks    |  | - Barge-in VAD    |  |    | Streaming STT    |
                    |  +-------------------+  |    | (WebSocket)      |
                    |                         |    +--------+---------+
                    |                         |             |
                    |                         |        Transcript
                    |                         |             |
                    |  +-------------------+  |    +--------v---------+
                    |  | Conversation      |  |    | Groq Llama 3.3   |
                    |  | Manager           |<------| 70B specdec      |
                    |  | - State machine   |  |    | Streaming LLM    |
                    |  | - Message history |  |    | (HTTP SSE)       |
                    |  | - Tool calls      |  |    +--------+---------+
                    |  +-------------------+  |             |
                    |                         |       Token stream
                    |                         |             |
                    |  +-------------------+  |    +--------v---------+
          Caller    |  | Audio Writer      |  |    | Cartesia Sonic-3 |
          hears  <-----| - Queue playback  |<------| Streaming TTS    |
                    |  | - 20ms pacing     |  |    | (WebSocket)      |
                    |  +-------------------+  |    +------------------+
                    |                         |
                    +-------------------------+

    Latency Budget (target: <250ms mouth-to-ear):
    ┌──────────────────────────────────────────────────────┐
    │ Deepgram STT final transcript:    ~150-200ms         │
    │ Groq LLM first token (TTFT):      ~30-50ms           │
    │ Cartesia TTS first audio (TTFB):  ~50-80ms           │
    │ AudioSocket frame transmission:   ~20ms (1 frame)    │
    │                                                      │
    │ TOTAL:                            ~200-250ms         │
    └──────────────────────────────────────────────────────┘

Data Flow Summary

  1. Caller speaks into their phone. Audio arrives at Asterisk as RTP.
  2. Asterisk converts RTP to raw PCM and sends it over a TCP socket (AudioSocket protocol) to the Python agent.
  3. Python agent forwards PCM frames to Deepgram's streaming STT WebSocket.
  4. Deepgram returns transcript fragments. When the caller finishes speaking (speech_final), the agent has the complete utterance.
  5. Agent sends the conversation history to Groq's LLM API with streaming enabled.
  6. Each LLM token is immediately forwarded to Cartesia's TTS WebSocket (continuation API).
  7. Cartesia returns PCM audio chunks, which are queued and written back through AudioSocket to Asterisk.
  8. Asterisk converts PCM back to RTP and sends it to the caller's phone.

The critical design choice: steps 6 and 7 happen concurrently. The TTS starts synthesizing audio from the first few tokens while the LLM is still generating the rest of the sentence. This is what cuts the latency from 1-2 seconds down to 200-250ms.
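The overlap of steps 6 and 7 is, mechanically, just two coroutines sharing a queue. Here is a toy sketch of that pattern (the function names and token list are illustrative, not taken from the agent):

```python
import asyncio

async def llm_tokens(queue):
    """Stand-in for step 6: push LLM tokens into the pipe as they arrive."""
    for token in ["I", " can", " get", " someone", " out", " today."]:
        await queue.put(token)
        await asyncio.sleep(0.01)   # simulated inter-token gap
    await queue.put(None)           # end-of-stream sentinel

async def tts_consumer(queue):
    """Stand-in for step 7: consume each token the moment it appears."""
    spoken = []
    while (token := await queue.get()) is not None:
        spoken.append(token)        # real system: forward to the TTS WebSocket
    return "".join(spoken)

async def main():
    queue = asyncio.Queue()
    # Producer and consumer run concurrently -- "synthesis" starts on token 1
    _, text = await asyncio.gather(llm_tokens(queue), tts_consumer(queue))
    return text

print(asyncio.run(main()))  # I can get someone out today.
```

The real pipeline in Section 12 does exactly this, with Groq as the producer and Cartesia's continuation context as the consumer.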


3. Prerequisites

Server Requirements

  - A Linux server running Asterisk with the AudioSocket modules (bundled since Asterisk 18)
  - Python 3.11+
  - Outbound HTTPS/WSS access to the Deepgram, Groq, and Cartesia APIs

API Accounts

Service    Purpose                         Pricing Model
Deepgram   Speech-to-text (Nova-3)         Pay-per-minute (~$0.0043/min)
Groq       LLM inference (Llama 3.3 70B)   Pay-per-token (free tier available)
Cartesia   Text-to-speech (Sonic-3)        Pay-per-character

Python Dependencies

pip install websockets aiohttp cartesia

Verify Asterisk AudioSocket Module

asterisk -rx "module show like audiosocket"

Expected output:

Module              Description                       Use Count  Status
res_audiosocket.so  AudioSocket support               0          Running
app_audiosocket.so  AudioSocket application           0          Running
2 modules loaded

If not loaded:

asterisk -rx "module load res_audiosocket.so"
asterisk -rx "module load app_audiosocket.so"

Add to /etc/asterisk/modules.conf to load on startup:

load = res_audiosocket.so
load = app_audiosocket.so

4. Component Selection: Why This Stack

Building this system involved testing multiple providers for each component. Here is what we learned and why we chose this specific stack.

STT: Deepgram Nova-3

Why not Google/AWS/Azure STT? Cloud STT services from the big three add 300-500ms of latency due to their general-purpose architecture. Deepgram is purpose-built for real-time streaming and returns transcript fragments within 150-200ms.

Why Nova-3 specifically?

  - Fastest path to a final transcript in this stack (~150-200ms)
  - Solid accuracy on 8kHz telephony-bandwidth audio, with en-GB support
  - Keyword boosting for domain terms (postcodes, trade vocabulary)

LLM: Groq Llama 3.3 70B (specdec)

Why not OpenAI GPT-4? Latency. GPT-4 time-to-first-token (TTFT) is 500ms-2s. Groq's speculative decoding delivers the first token in 30-50ms and sustains 1,665 tokens/second. For voice, speed matters more than the last 5% of intelligence.

Why Llama 3.3 70B over smaller models? The 70B model handles nuanced conversation flow (objection handling, knowing when NOT to say something) significantly better than 8B models. At 1,665 tok/s on Groq, you get the quality of a large model at small-model speeds.

Why specdec variant? Speculative decoding uses a smaller draft model to predict tokens, then the large model verifies them in parallel. This gives 6x throughput compared to the standard versatile variant (1,665 vs 276 tok/s) with identical output quality.

TTS: Cartesia Sonic-3

Why not ElevenLabs? We started with ElevenLabs Flash v2. It produces excellent voice quality, but the switch came down to the two things this pipeline lives on: time-to-first-byte and token-level continuation with stable prosody.

Why Cartesia?

  - The continuation API accepts individual LLM tokens on an open WebSocket context and synthesizes them as one continuous utterance with natural prosody
  - ~50-80ms time-to-first-byte
  - Native raw PCM output at 8kHz -- no resampling before AudioSocket
  - Contexts can be cancelled mid-synthesis, which keeps barge-in clean

Protocol: AudioSocket

Why not ARI/AMI/AGI for audio?

  - AMI is a control/event interface; it carries no media at all
  - AGI (even EAGI) only exposes inbound audio on a file descriptor -- there is no clean full-duplex path
  - ARI's externalMedia hands you RTP, which means jitter buffers, sequence numbers, and payload handling you do not need on localhost
  - AudioSocket is a plain TCP stream of raw PCM frames, parsed in a few lines of Python

AudioSocket frame format:

+--------+---------------------+-----------+
| Type   | Length (big-endian) | Payload   |
| 1 byte | 2 bytes             | N bytes   |
+--------+---------------------+-----------+

Three frame types matter:

  - 0x01 (UUID): the first frame on every connection, identifying the call
  - 0x10 (Audio): 320 bytes of 8kHz 16-bit PCM per 20ms frame, in both directions
  - 0x00 (Hangup): the call ended -- close the socket

(A fourth type, 0xFF, signals an error; treat it like a hangup.)


5. Asterisk Configuration

Dialplan (extensions.conf or extensions_custom.conf)

The dialplan routes inbound calls to the voice agent. The flow is: answer the call, run the AGI script to generate a UUID and write metadata, then connect AudioSocket to the Python agent.

; ─── AI Voice Agent ─────────────────────────────────────────────
; Route inbound DID calls to the AI voice agent.
; The CALLED variable must be set by the inbound trunk context
; before transferring here (e.g., Set(CALLED=${EXTEN})).

[voice-agent]
exten => voice_agent,1,NoOp(VOICE AGENT: DID=${CALLED} CLI=${CALLERID(num)})
same => n,Answer()
same => n,AGI(voice_agent_setup.agi)
same => n,AudioSocket(${VA_UUID},127.0.0.1:9099)
same => n,NoOp(VOICE AGENT ENDED: ${DIALSTATUS})
same => n,Hangup()

Key points:

  - Answer() must run before the AGI so the channel is up when AudioSocket starts
  - The AGI script sets ${VA_UUID}, which AudioSocket() sends to the agent as its first frame
  - AudioSocket() blocks until the TCP connection closes or the caller hangs up
  - 127.0.0.1:9099 must match LISTEN_HOST/LISTEN_PORT in the Python agent

Routing Inbound DIDs to the Agent

In your inbound trunk context, route the call to the voice_agent extension:

[from-trunk]
; Match all inbound DIDs and route to voice agent
exten => _X.,1,NoOp(Inbound call to ${EXTEN} from ${CALLERID(num)})
same => n,Set(CALLED=${EXTEN})
same => n,Goto(voice-agent,voice_agent,1)

Or for specific DIDs only:

[from-trunk]
; Route specific DIDs to voice agent, others to normal handling
exten => 02012345678,1,Set(CALLED=${EXTEN})
same => n,Goto(voice-agent,voice_agent,1)

exten => 02087654321,1,Set(CALLED=${EXTEN})
same => n,Goto(voice-agent,voice_agent,1)

; Default: normal call handling
exten => _X.,1,Goto(default,${EXTEN},1)

6. AGI Script: Session Bootstrapping

The AGI script runs before AudioSocket connects. It generates a UUID for the call session and writes caller metadata to a JSON file that the Python agent reads.

Why a separate AGI script?

Asterisk's AudioSocket() application sends the UUID as the first frame over TCP. The Python agent receives this UUID and needs to look up call metadata (which DID was called, the caller's number). The AGI script bridges this gap: it generates the UUID, sets it as a channel variable for AudioSocket, and writes the metadata to a file the Python agent can read.

voice_agent_setup.agi

Save to /var/lib/asterisk/agi-bin/voice_agent_setup.agi (or /usr/share/asterisk/agi-bin/):

#!/usr/bin/perl
# voice_agent_setup.agi
# Generate UUID and write metadata JSON for the voice agent.
# Called by Asterisk dialplan before AudioSocket().
use strict;
use warnings;
$| = 1;  # autoflush STDOUT -- critical for AGI protocol

# ── Read AGI environment variables ──
my %agi;
while (<STDIN>) {
    chomp;
    last if /^$/;
    if (/^agi_(\w+):\s*(.*)$/) {
        $agi{$1} = $2;
    }
}

# ── Generate UUID ──
# Use the kernel's random UUID generator (no external dependencies)
my $uuid = '';
if (open my $fh, '<', '/proc/sys/kernel/random/uuid') {
    $uuid = <$fh>;
    chomp $uuid;
    close $fh;
}

# ── Get call metadata ──
# CALLED is set in the dialplan before calling AGI
print "GET VARIABLE CALLED\n";
my $resp = <STDIN>;
my $did = '';
if ($resp =~ /\((.+)\)/) {
    $did = $1;
}
my $cli = $agi{'callerid'} || '';

# ── Write metadata JSON ──
if ($uuid) {
    my $file = "/tmp/va_${uuid}.json";
    if (open my $fh, '>', $file) {
        print $fh qq({"did":"$did","cli":"$cli"});
        close $fh;
    }
}

# ── Set channel variable for AudioSocket ──
print "SET VARIABLE VA_UUID $uuid\n";
my $result = <STDIN>;

Make it executable:

chmod +x /var/lib/asterisk/agi-bin/voice_agent_setup.agi

Why Perl? Asterisk's AGI protocol communicates via stdin/stdout line-by-line. Perl handles this natively and is available on every Asterisk system. You could write this in Python or Bash, but Perl is the standard for AGI scripts and has zero startup overhead.

Why /tmp/va_{uuid}.json? The Python agent polls for this file (up to 1 second, 50ms intervals) after receiving the UUID frame. Using /tmp means automatic cleanup on reboot. In production, you may want to clean up these files explicitly after each call.
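That polling loop is small enough to sketch in full. This is an illustrative version (the helper name read_call_metadata is not from the source; the path, 1-second timeout, and 50ms interval follow the description above):

```python
import asyncio
import json
import time
from pathlib import Path

async def read_call_metadata(uuid: str, timeout: float = 1.0,
                             interval: float = 0.05) -> dict:
    """Poll for /tmp/va_{uuid}.json, written by the AGI script."""
    path = Path(f"/tmp/va_{uuid}.json")
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if path.exists():
            data = json.loads(path.read_text())
            path.unlink(missing_ok=True)   # explicit per-call cleanup
            return data
        await asyncio.sleep(interval)      # 50ms polling interval
    return {"did": "", "cli": ""}          # proceed with empty metadata
```

Deleting the file after a successful read gives you the explicit cleanup mentioned above, instead of waiting for a reboot to empty /tmp.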


7. Python Voice Agent: Core Structure

The voice agent is a single Python file built on asyncio. Here is the high-level structure:

#!/usr/bin/env python3.11
"""
Voice Agent -- AudioSocket + Deepgram STT + Groq LLM + Cartesia TTS
Token-streaming pipeline: ~200-250ms mouth-to-ear latency.
"""

import asyncio
import struct
import json
import os
import time
import logging
import audioop
from pathlib import Path

import aiohttp
from websockets.asyncio.client import connect as ws_connect
from cartesia import AsyncCartesia

# ── Configuration ──
LISTEN_HOST = "127.0.0.1"
LISTEN_PORT = 9099

# Audio: 8kHz 16-bit signed linear PCM, mono
# 320 bytes = 160 samples = 20ms per frame
CHUNK_SIZE = 320
SAMPLE_RATE = 8000
CHUNK_DURATION = CHUNK_SIZE / (SAMPLE_RATE * 2)  # 0.02s = 20ms

# Barge-in thresholds
BARGEIN_RMS_THRESHOLD = 800   # RMS energy level to detect speech
BARGEIN_DURATION = 0.3        # seconds of sustained speech to trigger

log = logging.getLogger("voice-agent")


class DeepgramSTT:
    """Streaming speech-to-text via Deepgram Nova-3 WebSocket."""
    ...

class GroqLLM:
    """Streaming LLM via Groq OpenAI-compatible API."""
    ...

class CartesiaTTS:
    """Streaming TTS via Cartesia Sonic-3 WebSocket."""
    ...

class VoiceAgent:
    """Main orchestrator: STT -> LLM -> TTS pipeline."""
    ...


async def handle_connection(reader, writer):
    """Handle one AudioSocket connection (one call)."""
    agent = VoiceAgent()
    await agent.handle_call(reader, writer)


async def main():
    server = await asyncio.start_server(
        handle_connection, LISTEN_HOST, LISTEN_PORT,
    )
    log.info("Voice Agent listening on %s:%d", LISTEN_HOST, LISTEN_PORT)
    async with server:
        await server.serve_forever()


if __name__ == "__main__":
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(name)s %(levelname)s %(message)s",
    )
    asyncio.run(main())

VoiceAgent Class: The Orchestrator

class VoiceAgent:
    def __init__(self):
        self.stt = DeepgramSTT()
        self.llm = GroqLLM()
        self.tts = CartesiaTTS()
        self.messages = []           # LLM conversation history
        self.call_context = None     # Company/DID context
        self.audio_out_queue = asyncio.Queue()  # PCM chunks to send to caller
        self.hangup_event = asyncio.Event()     # Signals call ended
        self.is_speaking = asyncio.Event()      # True while agent is talking
        self.barge_in_event = asyncio.Event()   # Caller interrupted
        self.call_uuid = None
        self._current_tts_ctx = None            # For barge-in cancellation

    async def handle_call(self, reader, writer):
        """Main call handler -- the entire call lifecycle."""
        # 1. Read UUID frame from AudioSocket
        # 2. Read metadata from /tmp/va_{uuid}.json
        # 3. Fetch company context from API
        # 4. Build system prompt
        # 5. Connect STT + TTS WebSockets
        # 6. Start background audio I/O tasks
        # 7. Speak greeting
        # 8. Conversation loop: listen -> think -> speak
        ...

Each call creates a new VoiceAgent instance. This keeps state isolated between concurrent calls. The handle_call method runs the entire call lifecycle from greeting to hangup.


8. AudioSocket Protocol Implementation

AudioSocket is a binary TCP protocol. Every frame has a 3-byte header:

Byte  Field    Description
0     Type     0x00=Hangup, 0x01=UUID, 0x10=Audio, 0xFF=Error
1-2   Length   Big-endian uint16, length of payload
3+    Payload  Raw data (UUID bytes or PCM audio)

Reading Frames

# AudioSocket protocol constants
AS_TYPE_HANGUP = 0x00
AS_TYPE_UUID   = 0x01
AS_TYPE_AUDIO  = 0x10
AS_TYPE_ERROR  = 0xFF


async def read_as_frame(reader):
    """Read one AudioSocket frame: 1 byte type + 2 bytes length + payload."""
    header = await reader.readexactly(3)
    frame_type = header[0]
    length = struct.unpack(">H", header[1:3])[0]
    payload = b""
    if length > 0:
        payload = await reader.readexactly(length)
    return frame_type, payload

Writing Frames

def make_as_frame(frame_type, payload):
    """Build an AudioSocket frame."""
    return struct.pack(">BH", frame_type, len(payload)) + payload
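A quick round-trip sanity check of the two helpers, using an in-memory asyncio.StreamReader so no real socket is needed:

```python
import asyncio
import struct

AS_TYPE_AUDIO = 0x10

def make_as_frame(frame_type, payload):
    """Build an AudioSocket frame: 1-byte type + 2-byte length + payload."""
    return struct.pack(">BH", frame_type, len(payload)) + payload

async def read_as_frame(reader):
    """Read one AudioSocket frame back off the stream."""
    header = await reader.readexactly(3)
    frame_type = header[0]
    (length,) = struct.unpack(">H", header[1:3])
    payload = await reader.readexactly(length) if length else b""
    return frame_type, payload

async def roundtrip():
    # Feed a packed frame into a StreamReader as if it arrived over TCP
    reader = asyncio.StreamReader()
    reader.feed_data(make_as_frame(AS_TYPE_AUDIO, b"\x00" * 320))
    reader.feed_eof()
    return await read_as_frame(reader)

frame_type, payload = asyncio.run(roundtrip())
assert frame_type == AS_TYPE_AUDIO and len(payload) == 320
```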

Audio Reader (Background Task)

The audio reader runs continuously, reading PCM frames from Asterisk. When the agent is NOT speaking, audio goes to Deepgram for transcription. When the agent IS speaking, audio is monitored for barge-in detection instead.

async def _audio_reader(self, reader):
    """Read audio from AudioSocket, forward to STT or check barge-in."""
    speech_energy_start = None

    while not self.hangup_event.is_set():
        frame_type, payload = await read_as_frame(reader)

        if frame_type == AS_TYPE_HANGUP:
            self.hangup_event.set()
            return

        if frame_type == AS_TYPE_ERROR:
            self.hangup_event.set()
            return

        if frame_type == AS_TYPE_AUDIO and payload:
            if self.is_speaking.is_set():
                # Agent is talking -- check for barge-in
                rms = audioop.rms(payload, 2)
                if rms > BARGEIN_RMS_THRESHOLD:
                    if speech_energy_start is None:
                        speech_energy_start = time.monotonic()
                    elif time.monotonic() - speech_energy_start >= BARGEIN_DURATION:
                        # Caller is interrupting
                        self.barge_in_event.set()
                        speech_energy_start = None
                        # Clear audio queue, cancel TTS
                        while not self.audio_out_queue.empty():
                            self.audio_out_queue.get_nowait()
                        if self._current_tts_ctx:
                            await self.tts.cancel_context(self._current_tts_ctx)
                        self.is_speaking.clear()
                        await self.stt.send_audio(payload)
                else:
                    speech_energy_start = None
            else:
                # Agent is silent -- forward to STT
                speech_energy_start = None
                await self.stt.send_audio(payload)

Audio Writer (Background Task)

The audio writer paces PCM chunks at real-time rate (one 320-byte chunk every 20ms):

async def _audio_writer(self, writer):
    """Write queued audio to AudioSocket at real-time rate."""
    while not self.hangup_event.is_set():
        try:
            chunk = await asyncio.wait_for(
                self.audio_out_queue.get(), timeout=0.5
            )
            frame = make_as_frame(AS_TYPE_AUDIO, chunk)
            writer.write(frame)
            await writer.drain()
            await asyncio.sleep(CHUNK_DURATION)  # 20ms pacing
        except asyncio.TimeoutError:
            continue

Why pace at 20ms? If you dump all audio chunks at once, Asterisk buffers them, but the caller hears silence followed by fast-forwarded audio. Real-time pacing ensures smooth playback.
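For reference, the arithmetic behind the 20ms figure:

```python
# 8kHz, 16-bit (2-byte) mono PCM, as used throughout this system
SAMPLE_RATE = 8000            # samples per second
BYTES_PER_SAMPLE = 2          # 16-bit signed linear
CHUNK_SIZE = 320              # bytes per AudioSocket audio frame

samples_per_chunk = CHUNK_SIZE // BYTES_PER_SAMPLE   # 160 samples
chunk_ms = samples_per_chunk / SAMPLE_RATE * 1000    # duration of one frame
frames_per_second = 1000 / chunk_ms                  # frames needed per second

print(chunk_ms, frames_per_second)  # 20.0 50.0
```

So the writer must emit exactly 50 frames per second of speech; sleeping CHUNK_DURATION after each frame holds that rate.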


9. Deepgram Streaming STT

Deepgram's WebSocket API accepts a continuous stream of audio and returns transcript fragments in real time.

Connection Setup

class DeepgramSTT:
    def __init__(self):
        self.ws = None
        self.transcript_queue = asyncio.Queue()
        self._recv_task = None
        self._keepalive_task = None

    async def connect(self):
        params = (
            "encoding=linear16"
            "&sample_rate=8000"
            "&channels=1"
            "&model=nova-3"
            "&language=en-GB"
            "&smart_format=true"
            "&endpointing=300"
            "&interim_results=true"
            "&utterance_end_ms=1000"
            "&vad_events=true"
            "&keywords=postcode:2"
            "&keywords=plumber:2"
            "&keywords=callout:1"
        )
        url = f"wss://api.deepgram.com/v1/listen?{params}"
        headers = {"Authorization": f"Token {DEEPGRAM_API_KEY}"}

        self.ws = await ws_connect(
            url, additional_headers=headers,
            ping_interval=5, ping_timeout=10,
        )
        self._recv_task = asyncio.create_task(self._receive_loop())
        self._keepalive_task = asyncio.create_task(self._keepalive_loop())

Key Parameters Explained

  - encoding=linear16 -- matches Asterisk's native PCM audio format
  - sample_rate=8000 -- telephony standard; do not upsample
  - model=nova-3 -- latest model: best accuracy and lowest latency
  - language=en-GB -- match your callers' accent
  - endpointing=300 -- how long Deepgram waits after silence before finalizing. Lower = faster response but may cut off mid-sentence; 300ms is the sweet spot
  - interim_results=true -- get partial transcripts while the caller is still speaking
  - utterance_end_ms=1000 -- fires an UtteranceEnd event after 1s of silence; useful as a safety flush
  - vad_events=true -- Deepgram's server-side voice activity detection events
  - keywords=postcode:2 -- boosts recognition of domain-specific terms (weight 2)
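Rather than hand-assembling the query string, you can build it with urlencode; doseq=True emits the repeated keywords parameters. (The colon in each keyword weight gets percent-encoded, which the server decodes as usual.) A sketch:

```python
from urllib.parse import urlencode

# Same parameters as the connect() method above, as (key, value) pairs;
# the list value expands into repeated keywords= entries
params = [
    ("encoding", "linear16"),
    ("sample_rate", "8000"),
    ("channels", "1"),
    ("model", "nova-3"),
    ("language", "en-GB"),
    ("smart_format", "true"),
    ("endpointing", "300"),
    ("interim_results", "true"),
    ("utterance_end_ms", "1000"),
    ("vad_events", "true"),
    ("keywords", ["postcode:2", "plumber:2", "callout:1"]),
]
query = urlencode(params, doseq=True)
url = f"wss://api.deepgram.com/v1/listen?{query}"
```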

Receiving Transcripts

async def _receive_loop(self):
    """Process transcript results from Deepgram."""
    async for msg in self.ws:
        data = json.loads(msg)

        if data.get("type") == "Results":
            alt = data["channel"]["alternatives"][0]
            transcript = alt.get("transcript", "").strip()
            is_final = data.get("is_final", False)
            speech_final = data.get("speech_final", False)

            if transcript and is_final:
                await self.transcript_queue.put({
                    "transcript": transcript,
                    "speech_final": speech_final,
                })

        elif data.get("type") == "UtteranceEnd":
            # Safety flush: if we have accumulated fragments, process them
            await self.transcript_queue.put({
                "transcript": "",
                "speech_final": True,
                "utterance_end": True,
            })

Understanding Deepgram's Event Model

Deepgram sends several types of results:

  1. Interim results (is_final=false): Partial transcript while the caller is still speaking. Useful for UI display but we ignore these for LLM input.
  2. Final results (is_final=true, speech_final=false): A finalized word/phrase, but the caller may still be speaking the same sentence.
  3. Speech-final results (is_final=true, speech_final=true): The caller has finished their turn. This is our trigger to send to the LLM.
  4. UtteranceEnd: Fired after utterance_end_ms of silence. Acts as a safety net to flush any accumulated fragments.
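Under this event model, a listen step drains transcript_queue until the turn closes. A sketch of what that might look like (wait_for_utterance is an illustrative helper, not part of the source):

```python
import asyncio

async def wait_for_utterance(transcript_queue: asyncio.Queue) -> str:
    """Accumulate final fragments until speech_final ends the caller's turn."""
    fragments = []
    while True:
        result = await transcript_queue.get()
        if result.get("transcript"):
            fragments.append(result["transcript"])
        # speech_final -- or the UtteranceEnd safety flush -- closes the turn,
        # provided we have actually accumulated some speech
        if result.get("speech_final") and fragments:
            return " ".join(fragments)

async def demo():
    q = asyncio.Queue()
    await q.put({"transcript": "I need a plumber", "speech_final": False})
    await q.put({"transcript": "in SW1 today", "speech_final": True})
    return await wait_for_utterance(q)

print(asyncio.run(demo()))  # I need a plumber in SW1 today
```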

Keepalive

Deepgram closes the WebSocket if no audio arrives for roughly 10 seconds. Send keepalives so the connection survives silent stretches:

async def _keepalive_loop(self):
    while self.ws:
        await asyncio.sleep(8)
        await self.ws.send(json.dumps({"type": "KeepAlive"}))

Sending Audio

async def send_audio(self, audio_data):
    """Forward raw PCM bytes to Deepgram."""
    if self.ws:
        await self.ws.send(audio_data)

Graceful Shutdown

async def close(self):
    if self._keepalive_task:
        self._keepalive_task.cancel()
    if self._recv_task:
        self._recv_task.cancel()
    if self.ws:
        await self.ws.send(json.dumps({"type": "CloseStream"}))
        await self.ws.close()

10. Groq Streaming LLM with Tool Calling

Groq provides an OpenAI-compatible chat completions API with streaming support. The key advantage is their speculative decoding inference, which delivers tokens at 1,665 tok/s -- fast enough that the LLM is never the bottleneck.

LLM Client

class GroqLLM:
    def __init__(self):
        self.session = None

    async def _ensure_session(self):
        if not self.session:
            self.session = aiohttp.ClientSession()

    async def generate(self, messages):
        """Stream completion tokens. Yields dicts with type 'text' or 'tool_call'."""
        await self._ensure_session()

        headers = {
            "Authorization": f"Bearer {GROQ_API_KEY}",
            "Content-Type": "application/json",
        }
        payload = {
            "model": GROQ_MODEL,
            "messages": messages,
            "tools": [BOOKING_TOOL],
            "stream": True,
            "temperature": 0.3,
            "max_tokens": 250,
        }

        tool_calls = {}
        t0 = time.monotonic()
        first_token = True

        async with self.session.post(
            "https://api.groq.com/openai/v1/chat/completions",
            headers=headers, json=payload,
        ) as resp:
            if resp.status != 200:
                body = await resp.text()
                log.error("Groq API error %d: %s", resp.status, body[:500])
                yield {"type": "text", "text": "Bear with me, I'm having a small technical issue."}
                return

            buffer = ""
            async for raw_line in resp.content:
                buffer += raw_line.decode()
                while "\n" in buffer:
                    line, buffer = buffer.split("\n", 1)
                    line = line.strip()
                    if not line or not line.startswith("data: "):
                        continue
                    if line == "data: [DONE]":
                        break

                    data = json.loads(line[6:])
                    choice = data["choices"][0]
                    delta = choice.get("delta", {})

                    # Text content -- yield immediately for token-streaming
                    content = delta.get("content")
                    if content:
                        if first_token:
                            log.info("LLM TTFT: %.0fms",
                                     (time.monotonic() - t0) * 1000)
                            first_token = False
                        yield {"type": "text", "text": content}

                    # Tool calls -- accumulate arguments across chunks
                    if "tool_calls" in delta:
                        for tc in delta["tool_calls"]:
                            idx = tc["index"]
                            if idx not in tool_calls:
                                tool_calls[idx] = {
                                    "id": tc.get("id", ""),
                                    "name": tc["function"]["name"],
                                    "arguments": "",
                                }
                            tool_calls[idx]["arguments"] += (
                                tc["function"].get("arguments", "")
                            )

                    # Tool call complete
                    if choice.get("finish_reason") == "tool_calls":
                        for tc in tool_calls.values():
                            yield {
                                "type": "tool_call",
                                "id": tc["id"],
                                "name": tc["name"],
                                "arguments": json.loads(tc["arguments"]),
                            }

Key Design Decisions

temperature: 0.3 -- Low temperature keeps responses predictable and on-script. Voice conversations need consistency; creative variation in responses confuses callers.

max_tokens: 250 -- Voice responses should be short. One sentence per turn. Capping tokens prevents the LLM from rambling. In practice, responses are 10-30 tokens.

stream: true -- Non-negotiable. Without streaming, you wait for the entire response before starting TTS. With streaming, TTS starts on the first token.

Error handling -- If the LLM API fails, the agent says "Bear with me, I'm having a small technical issue" instead of going silent. Always have a fallback utterance.
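Because generate() yields dicts, the caller dispatches on type. A sketch of that dispatch loop, fully self-contained -- the fake event stream, the consume helper, and the create_booking tool name are all stand-ins, not source code:

```python
import asyncio

async def fake_llm_events():
    """Stand-in for GroqLLM.generate(): yields the two event shapes."""
    for ev in [
        {"type": "text", "text": "Booking"},
        {"type": "text", "text": " you in."},
        {"type": "tool_call", "id": "call_1", "name": "create_booking",
         "arguments": {"postcode": "SW1A 1AA"}},
    ]:
        yield ev

async def consume(events, send_token, run_tool):
    """Dispatch streamed events: text goes to TTS, tool calls to a handler."""
    parts = []
    async for event in events:
        if event["type"] == "text":
            parts.append(event["text"])
            await send_token(event["text"])    # forward to TTS immediately
        elif event["type"] == "tool_call":
            await run_tool(event["name"], event["arguments"])
    return "".join(parts)

async def demo():
    tokens, tools = [], []
    async def send_token(t): tokens.append(t)
    async def run_tool(name, args): tools.append((name, args))
    text = await consume(fake_llm_events(), send_token, run_tool)
    return text, tokens, tools

text, tokens, tools = asyncio.run(demo())
assert text == "Booking you in."
assert tools == [("create_booking", {"postcode": "SW1A 1AA"})]
```

The real _think_and_speak in Section 12 is this loop with Cartesia's context as send_token.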


11. Cartesia Streaming TTS

Cartesia's Sonic-3 is the critical piece that enables sub-250ms latency. Its continuation API lets you stream tokens into an open WebSocket context, and Cartesia synthesizes them as a continuous audio stream with natural prosody.

TTS Client

class CartesiaTTS:
    def __init__(self):
        self.client = None
        self.connection = None

    async def connect(self):
        """Open persistent WebSocket (reused across utterances)."""
        self.client = AsyncCartesia(api_key=CARTESIA_API_KEY)
        self.connection = await self.client.tts.websocket_connect().__aenter__()
        log.info("Cartesia TTS connected")

Two Modes of Operation

The TTS client supports two modes:

1. Single-shot synthesis -- for greetings and short confirmations where you have the complete text:

async def synthesize_streaming(self, text, audio_out_queue):
    """Send complete text, stream audio chunks to queue."""
    ctx = self.connection.context()

    await ctx.send(
        model_id="sonic-3",
        transcript=text,
        voice={"mode": "id", "id": CARTESIA_VOICE_ID},
        output_format={
            "container": "raw",
            "encoding": "pcm_s16le",
            "sample_rate": 8000,
        },
        continue_=False,  # Complete utterance
    )

    async for response in ctx.receive():
        if response.type == "chunk" and response.audio:
            pcm_bytes = response.audio
            # Split into 320-byte chunks (20ms frames)
            for i in range(0, len(pcm_bytes), CHUNK_SIZE):
                chunk = pcm_bytes[i:i + CHUNK_SIZE]
                if len(chunk) < CHUNK_SIZE:
                    chunk += b'\x00' * (CHUNK_SIZE - len(chunk))
                await audio_out_queue.put(chunk)

2. Token-streaming synthesis -- the key innovation. Opens a TTS context that accepts individual LLM tokens and returns audio as a continuous stream:

async def stream_tokens(self, audio_out_queue, cancel_event):
    """Create a token-streaming TTS context.

    Returns (ctx, receive_task) -- the caller pushes tokens into ctx,
    and receive_task writes audio to audio_out_queue in the background.
    """
    ctx = self.connection.context()
    recv_task = asyncio.create_task(
        self._receive_audio(ctx, audio_out_queue, cancel_event)
    )
    return ctx, recv_task

async def _receive_audio(self, ctx, audio_out_queue, cancel_event):
    """Background: receive audio from Cartesia, chunk and queue."""
    first_audio = True
    t0 = time.monotonic()

    async for response in ctx.receive():
        if cancel_event.is_set():
            break

        if response.type == "chunk" and response.audio:
            if first_audio:
                log.info("TTS TTFB: %.0fms", (time.monotonic() - t0) * 1000)
                first_audio = False

            pcm_bytes = response.audio
            for i in range(0, len(pcm_bytes), CHUNK_SIZE):
                if cancel_event.is_set():
                    return
                chunk = pcm_bytes[i:i + CHUNK_SIZE]
                if len(chunk) < CHUNK_SIZE:
                    chunk += b'\x00' * (CHUNK_SIZE - len(chunk))
                await audio_out_queue.put(chunk)

The Continuation API

This is the most important API concept in the entire system. When you send a token with continue_=True, Cartesia treats it as part of an ongoing utterance:

# Each LLM token is sent immediately to Cartesia:
await ctx.send(
    model_id="sonic-3",
    transcript=token,          # e.g., "I", " can", " get"
    voice={"mode": "id", "id": VOICE_ID},
    output_format={...},
    continue_=True,            # <-- This is the key
)

# When the LLM finishes:
await ctx.no_more_inputs()

With continue_=True:

- Cartesia treats every token as part of one continuous utterance, so prosody flows naturally across token boundaries
- Audio starts streaming back as soon as enough text has accumulated -- typically within 50-80ms of the first token
- You never have to buffer tokens into sentences on your side

Without continue_=True (sending each sentence separately):

- Each send is an independent utterance with its own prosody, so pitch audibly resets between sentences
- You must buffer tokens into complete sentences first, adding 200-500ms before synthesis can even begin

Barge-In Cancellation

When the caller interrupts, cancel the active TTS context to stop audio immediately:

async def cancel_context(self, ctx):
    """Cancel in-progress TTS for barge-in."""
    # Note: _context_id is a private SDK attribute -- pin your cartesia
    # package version so an upgrade doesn't silently break this.
    await self.connection.send({
        "context_id": ctx._context_id,
        "cancel": True,
    })

12. The Token-Streaming Pipeline

This is the core of the system -- the _think_and_speak method that ties LLM and TTS together in a real-time streaming pipeline.

async def _think_and_speak(self):
    """Stream LLM tokens directly to Cartesia TTS.
    No sentence-boundary detection -- tokens flow straight through."""
    full_response = []
    t0 = time.monotonic()
    self.barge_in_event.clear()

    # Create a TTS streaming context
    cancel_event = asyncio.Event()
    ctx, recv_task = await self.tts.stream_tokens(
        self.audio_out_queue, cancel_event
    )
    self._current_tts_ctx = ctx
    self.is_speaking.set()

    tts_voice = {"mode": "id", "id": CARTESIA_VOICE_ID}
    tts_format = {
        "container": "raw",
        "encoding": "pcm_s16le",
        "sample_rate": SAMPLE_RATE,
    }

    try:
        # Stream LLM tokens directly to TTS
        async for event in self.llm.generate(self.messages):
            if self.barge_in_event.is_set():
                log.info("Barge-in: aborting stream")
                cancel_event.set()
                break

            if event["type"] == "text":
                token = event["text"]
                full_response.append(token)

                # Send token directly to TTS (continuation mode)
                await ctx.send(
                    model_id=CARTESIA_MODEL,
                    transcript=token,
                    voice=tts_voice,
                    output_format=tts_format,
                    continue_=True,
                )

            elif event["type"] == "tool_call":
                # Handle tool call (see Section 15)
                ...

        # LLM finished -- flush TTS
        if not self.barge_in_event.is_set():
            await ctx.no_more_inputs()
            await recv_task  # Wait for all audio to be received

            # Wait for audio queue to drain (all chunks played)
            while not self.audio_out_queue.empty():
                await asyncio.sleep(0.02)

    finally:
        self.is_speaking.clear()
        self._current_tts_ctx = None
        await asyncio.sleep(0.1)  # Brief pause before listening

    return "".join(full_response)

Why This Is Fast

The traditional approach (used by most voice AI tutorials) looks like this:

User speaks -> STT -> [wait for full transcript] ->
LLM -> [wait for full response] ->
TTS -> [wait for full audio] ->
Play to user

Total: 1.5 - 3.0 seconds

Our approach pipelines everything:

User speaks -> STT (streaming) ->
             |-> LLM token 1 -> TTS -> Audio chunk 1 -> Play
             |-> LLM token 2 -> TTS -> Audio chunk 2 -> Play
             |-> LLM token 3 -> TTS -> Audio chunk 3 -> Play
             ...

Total: 200-250ms to first audio

The LLM is still generating token 5 when the caller hears the audio from token 1. This is possible because:

  1. Groq's TTFT is ~30-50ms -- the first token arrives almost instantly
  2. Cartesia's continuation API accepts individual tokens without waiting for a complete sentence
  3. No sentence buffering -- we removed the _split_on_sentence_boundary() logic that was adding 200-500ms

13. Barge-In and Interruption Handling

Barge-in is when the caller starts speaking while the agent is still talking. Real humans do this constantly. If your voice agent cannot handle it, the experience feels robotic.

How Barge-In Works

Timeline:
  Agent speaking:  "I can get a plumber out to you within---"
  Caller speaks:                                  "yeah how much?"
  Agent detects:                                     [RMS > 800 for 300ms]
  Agent stops:                                       [audio queue cleared]
  Agent listens:                                     "yeah how much?"
  Agent responds:                                                    "There's a 49 pound callout..."

Implementation: Energy-Based VAD

We use a simple but effective approach: monitor the RMS (root mean square) energy of incoming audio while the agent is speaking. If the energy exceeds a threshold for a sustained duration, the caller is talking.

# Constants
BARGEIN_RMS_THRESHOLD = 800   # Energy threshold (0-32768 range for 16-bit audio)
BARGEIN_DURATION = 0.3        # Sustained speech duration in seconds

In the _audio_reader method, when is_speaking is set:

rms = audioop.rms(payload, 2)  # Calculate RMS of 20ms frame

if rms > BARGEIN_RMS_THRESHOLD:
    if speech_energy_start is None:
        speech_energy_start = time.monotonic()
    elif time.monotonic() - speech_energy_start >= BARGEIN_DURATION:
        # Confirmed barge-in
        self.barge_in_event.set()
        speech_energy_start = None

        # 1. Clear the audio output queue (stop pending playback)
        while not self.audio_out_queue.empty():
            self.audio_out_queue.get_nowait()

        # 2. Cancel the active TTS context
        if self._current_tts_ctx:
            await self.tts.cancel_context(self._current_tts_ctx)

        # 3. Resume STT (start listening again)
        self.is_speaking.clear()
        await self.stt.send_audio(payload)
else:
    speech_energy_start = None  # Reset if energy drops below threshold
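One caveat: the audioop module used above was removed from the standard library in Python 3.13 (PEP 594). The service here runs Python 3.11, but if you deploy on a newer interpreter, a pure-Python stand-in for audioop.rms is a few lines (a sketch; assumes a little-endian host, which matches the slin byte order):

```python
import array
import math

def rms_16le(frame: bytes) -> int:
    """RMS of a 16-bit little-endian mono PCM frame (audioop.rms stand-in)."""
    samples = array.array("h")  # signed 16-bit, native byte order
    samples.frombytes(frame)
    if not samples:
        return 0
    return int(math.sqrt(sum(s * s for s in samples) / len(samples)))
```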

Tuning Barge-In Parameters

| Parameter | Too Low | Too High | Recommended |
|---|---|---|---|
| BARGEIN_RMS_THRESHOLD | False positives from background noise | Misses soft-spoken callers | 600-1000 |
| BARGEIN_DURATION | Triggers on coughs/clicks | Slow to detect interruptions | 0.2-0.4s |

Testing tip: Log the RMS values during real calls to calibrate:

if rms > 200:  # Log anything above ambient noise
    log.debug("RMS during speech: %d", rms)

Why Not WebRTC VAD or Silero?

More sophisticated VAD models (WebRTC VAD, Silero VAD) can distinguish speech from noise better. But for barge-in detection, simple energy thresholding has practical advantages:

- It costs almost nothing per 20ms frame -- no model to load or warm up
- A false trigger is cheap: the agent pauses briefly and resumes listening
- Telephone audio is band-limited and usually quiet enough that a tuned threshold works well

If you deploy in noisy environments (call centers, outdoors), consider upgrading to Silero VAD.


14. Conversation State Machine

The voice agent follows an 8-step workflow. Rather than implementing an explicit state machine in code, we encode the workflow entirely in the LLM's system prompt and let the model manage state transitions naturally.

The Workflow

Step 1: GREET
  "Hello, [Company Name], good [morning/afternoon/evening]."
  (Then silence -- wait for caller to state their problem)
       |
       v
Step 2: UNDERSTAND
  Caller states problem. Agent asks ONE follow-up if needed.
  "Is it leaking now?" / "Whole house or just one room?"
       |
       v
Step 3: QUOTE
  "I can get a [trade] out to you within thirty minutes to an hour.
   There's a [fee] pound callout. He'll quote on-site before starting."
  (Wait for agreement)
       |
       v
Step 4: POSTCODE
  "What's your postcode?"
  (Confirm with phonetic alphabet)
       |
       v
Step 5: ADDRESS
  "And the house number and street?"
       |
       v
Step 6: NAME
  "And your name?"
       |
       v
Step 7: BOOK
  [Tool call: create_booking]
  (Automatic -- no user-facing output)
       |
       v
Step 8: CONFIRM
  "That's booked. The [trade] will be with you within the hour.
   Thanks for calling."

System Prompt Design

The system prompt is the most important piece of the entire system. It defines the agent's personality, workflow, and constraints. Here is the template:

SYSTEM_PROMPT_TEMPLATE = """You work at {company_name}, a UK {trade_type} \
company. You answer the phone. Casual British English -- "no worries", \
"lovely", "bear with me". Short replies, 1 sentence max. Never sound scripted.

Never use American English. Never say "sir", "madam", "dear" -- you don't \
know who's calling. Never say "How can I help you?" -- they're calling you, \
they'll tell you.

# Context
- Company: {company_name}
- Trade: {trade_type} / {trade_label}
- Callout: {callout_fee}
- Caller: {caller_id}
- Repeat: {is_repeat}

# Workflow -- follow this exact order, one step per reply

Step 1: Greet. Say "Hello, {company_name}, good [morning/afternoon/evening]."
Then shut up and listen.

Step 2: They tell you the problem. Ask ONE quick follow-up if needed.
Then move on. Don't interrogate.

Step 3: Quote. "I can get a {trade_label} out to you within thirty minutes
to an hour. There's a {callout_fee} pound callout for the {trade_label} to
come out, then he'll quote you on-site before starting anything."
Stop here. Wait for agreement.

Step 4: Postcode. "What's your postcode?" Confirm with phonetic alphabet.

Step 5: Address. "And the house number and street?"

Step 6: Name. "And your name?"

Step 7: Book using create_booking tool. Use caller_id from context -- never
ask for phone number.

Step 8: "That's booked. The {trade_label} will be with you within the hour.
Thanks for calling."

# NEVER do
- Never combine steps -- one thing per reply
- Never ask for phone number
- Never repeat their name back
- Never summarise at the end
- Never quote labour -- only callout fee
- Never give exact time -- "thirty minutes to an hour"
- Never say "call centre", "AI", or system names

# Objections
- Want total price: "Every job's different -- the {trade_label} will quote
  on-site before starting. No obligation."
- Too expensive: "No worries. Thanks for calling."
- Are you local: "Yes, we've got someone nearby."
- Abusive: "I'm not able to help further on this call."
"""

Why Prompt-Based State Management Works

You might expect to need an explicit state machine (enum of states, transition functions, etc.). In practice, the LLM handles this naturally when the prompt is well-structured:

  1. One step per reply -- this constraint prevents the LLM from jumping ahead
  2. Explicit ordering -- numbered steps with clear triggers for each transition
  3. Negative constraints -- "NEVER do" rules prevent common LLM failure modes
  4. Objection handling -- predefined responses for common edge cases keep the LLM on track

The LLM sees the full conversation history and naturally progresses through steps. If the caller says "yes" after the quote, the LLM moves to step 4 (postcode). If they say "how much total?", the LLM handles the objection before continuing.

Dynamic Greeting Based on Time of Day

The greeting is generated in code (not by the LLM) for consistency and speed:

import datetime

hour = datetime.datetime.now().hour
tod = "good morning" if hour < 12 else "good afternoon" if hour < 18 else "good evening"
company = self.call_context.get("company_name", "Home Services")
greeting = f"Hello, {company}, {tod}."

await self._speak(greeting)
self.messages.append({"role": "assistant", "content": greeting})

This saves one LLM round trip. The agent speaks the greeting immediately via single-shot TTS while the first LLM call is reserved for responding to what the caller actually says.


15. Tool Calling Integration

When the agent has collected all required information (name, postcode, address, problem), the LLM invokes the create_booking tool to record the booking in your backend system.

Tool Definition

BOOKING_TOOL = {
    "type": "function",
    "function": {
        "name": "create_booking",
        "description": (
            "Create a job booking after collecting customer name, postcode, "
            "address, and problem description. Call this ONLY after all "
            "details are collected."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "customer_name": {
                    "type": "string",
                    "description": "Customer full name",
                },
                "postcode": {
                    "type": "string",
                    "description": "UK postcode with space",
                },
                "address": {
                    "type": "string",
                    "description": "Full street address",
                },
                "problem_description": {
                    "type": "string",
                    "description": "One-line summary of the issue",
                },
            },
            "required": [
                "customer_name", "postcode",
                "address", "problem_description",
            ],
        },
    },
}

Handling Tool Calls in the Pipeline

Tool calls interrupt the token-streaming pipeline. When the LLM decides to call a tool, it stops generating text tokens and instead outputs the tool call arguments. Here is how the agent handles this:

# Inside _think_and_speak:
elif event["type"] == "tool_call":
    # 1. Close current TTS context (no more text to speak)
    await ctx.no_more_inputs()
    await recv_task

    self.is_speaking.clear()
    self._current_tts_ctx = None

    if event["name"] == "create_booking":
        # 2. Call the booking API
        result = await self.create_booking(event["arguments"])

        # 3. Add tool call + result to message history
        self.messages.append({
            "role": "assistant",
            "content": None,
            "tool_calls": [{
                "id": event["id"],
                "type": "function",
                "function": {
                    "name": "create_booking",
                    "arguments": json.dumps(event["arguments"]),
                },
            }],
        })
        self.messages.append({
            "role": "tool",
            "tool_call_id": event["id"],
            "content": json.dumps(result or {"success": True}),
        })

        # 4. Get confirmation response from LLM
        confirm_parts = []
        async for ev in self.llm.generate(self.messages):
            if ev["type"] == "text":
                confirm_parts.append(ev["text"])

        confirmation = "".join(confirm_parts)
        if confirmation.strip():
            await self._speak(confirmation.strip())
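The tool_call event above has to be assembled first: in streaming mode the call id and function name arrive on the first delta, and the JSON arguments trickle in as string fragments across many deltas. A sketch of stitching them together, assuming OpenAI-style delta dicts (the exact shape depends on your client library):

```python
import json

def assemble_tool_call(deltas):
    """Accumulate streamed tool-call fragments into one event dict.

    Assumes OpenAI-style deltas: id/name on the first fragment,
    arguments spread across subsequent fragments as JSON text.
    """
    name, call_id, arg_parts = None, None, []
    for d in deltas:
        tc = d.get("tool_calls", [{}])[0]
        call_id = tc.get("id") or call_id        # only present on first delta
        fn = tc.get("function", {})
        name = fn.get("name") or name
        if fn.get("arguments"):
            arg_parts.append(fn["arguments"])    # partial JSON text
    return {
        "type": "tool_call",
        "id": call_id,
        "name": name,
        "arguments": json.loads("".join(arg_parts)),
    }
```

Only call json.loads once the stream signals the tool call is complete -- partial fragments are not valid JSON on their own.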

Booking API Implementation

The booking API is a simple PHP endpoint (or any backend you prefer):

<?php
// create_booking.php -- Receive booking from voice agent
header('Content-Type: application/json');

// Verify API key
$api_key = $_SERVER['HTTP_X_API_KEY'] ?? '';
if ($api_key !== getenv('VOICE_AGENT_API_KEY')) {
    http_response_code(401);
    echo json_encode(['error' => 'Unauthorized']);
    exit;
}

$data = json_decode(file_get_contents('php://input'), true);

// Validate required fields
$required = ['customer_name', 'postcode', 'address', 'problem_description'];
foreach ($required as $field) {
    if (empty($data[$field])) {
        http_response_code(400);
        echo json_encode(['error' => "Missing field: $field"]);
        exit;
    }
}

// Insert into database, send notifications, etc.
// ... your business logic here ...

echo json_encode([
    'success' => true,
    'booking_id' => uniqid('BK-'),
    'message' => 'Booking created successfully',
]);

Adding More Tools

To add more tools (e.g., check availability, look up customer history), define them the same way and add them to the tools array in the LLM request:

CHECK_AVAILABILITY_TOOL = {
    "type": "function",
    "function": {
        "name": "check_availability",
        "description": "Check engineer availability for a given postcode area",
        "parameters": {
            "type": "object",
            "properties": {
                "postcode": {"type": "string"},
                "urgency": {"type": "string", "enum": ["emergency", "standard"]},
            },
            "required": ["postcode"],
        },
    },
}

# In the LLM payload:
"tools": [BOOKING_TOOL, CHECK_AVAILABILITY_TOOL],

16. DID-to-Company Context API

A single voice agent can serve multiple companies by looking up which company owns the phone number (DID) that was called. When a caller dials a plumber's number, the agent greets them as that plumber. When they dial an electrician's number, same agent, different persona.

Context API Endpoint

<?php
// did_context.php -- Return company context for a DID
header('Content-Type: application/json');

$api_key = $_SERVER['HTTP_X_API_KEY'] ?? '';
if ($api_key !== getenv('VOICE_AGENT_API_KEY')) {
    http_response_code(401);
    echo json_encode(['error' => 'Unauthorized']);
    exit;
}

$data = json_decode(file_get_contents('php://input'), true);
$did = $data['did_number'] ?? '';
$cli = $data['caller_id'] ?? '';

// Look up company by DID
// This could be a database query, config file, etc.
$companies = [
    '02012345678' => [
        'company_name' => 'Quick Fix Plumbing',
        'trade_type'   => 'plumbing',
        'trade_label'  => 'plumber',
        'callout_fee'  => 49,
        'area'         => 'London',
    ],
    '01234567890' => [
        'company_name' => 'Spark Electrical',
        'trade_type'   => 'electrical',
        'trade_label'  => 'electrician',
        'callout_fee'  => 59,
        'area'         => 'Manchester',
    ],
];

$company = $companies[$did] ?? [
    'company_name' => 'Home Services',
    'trade_type'   => 'general maintenance',
    'trade_label'  => 'engineer',
    'callout_fee'  => 49,
    'area'         => 'your area',
];

// Check if repeat caller
$is_repeat = false;
// ... query your database for previous calls from $cli ...

$company['caller_id'] = $cli;
$company['is_repeat'] = $is_repeat;
$company['did_number'] = $did;
$company['greeting'] = "Hello, {$company['company_name']}.";

echo json_encode($company);

Agent-Side Context Fetching

async def get_call_context(self, did, cli):
    """Fetch company context from our API."""
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(
                CONTEXT_API_URL,
                json={"did_number": did, "caller_id": cli},
                headers={"X-API-Key": CONTEXT_API_KEY},
                timeout=aiohttp.ClientTimeout(total=3),
            ) as resp:
                if resp.status == 200:
                    return await resp.json()
    except Exception as e:
        log.warning("Context API error: %s", e)

    # Fallback defaults if API fails
    return {
        "company_name": "Home Services",
        "trade_type": "plumbing",
        "trade_label": "plumber",
        "callout_fee": 49,
        "caller_id": cli,
        "is_repeat": False,
        "greeting": "Hello, how can I help you?",
    }

The 3-second timeout ensures the call is not delayed if the API is slow. The fallback defaults mean the agent can always answer, even if the context API is down.


17. Systemd Service Setup

Run the voice agent as a systemd service for automatic startup and restart.

Environment File

Create /opt/voice-agent/voice_agent.env:

# Voice Agent API Keys -- chmod 600 this file
GROQ_API_KEY="gsk_YOUR_GROQ_API_KEY_HERE"
GROQ_MODEL="llama-3.3-70b-specdec"
DEEPGRAM_API_KEY="YOUR_DEEPGRAM_API_KEY_HERE"
CARTESIA_API_KEY="sk_car_YOUR_CARTESIA_KEY_HERE"
CARTESIA_VOICE_ID="a01c369f-6d2d-4185-bc20-b32c225eab70"
CARTESIA_MODEL="sonic-3"
CONTEXT_API_URL="http://127.0.0.1/api/did_context.php"
CONTEXT_API_KEY="YOUR_INTERNAL_API_KEY_HERE"
BOOKING_API_URL="http://127.0.0.1/api/create_booking.php"

Secure it:

chmod 600 /opt/voice-agent/voice_agent.env

Service File

Create /etc/systemd/system/voice-agent.service:

[Unit]
Description=AI Voice Agent (AudioSocket + Deepgram STT + Groq LLM + Cartesia TTS)
After=network.target asterisk.service

[Service]
Type=simple
ExecStart=/usr/bin/python3.11 /opt/voice-agent/agent.py
WorkingDirectory=/opt/voice-agent
Restart=always
RestartSec=3
StandardOutput=journal
StandardError=journal
SyslogIdentifier=voice-agent

[Install]
WantedBy=multi-user.target

Enable and Start

systemctl daemon-reload
systemctl enable voice-agent
systemctl start voice-agent

Verify It Is Running

systemctl status voice-agent
● voice-agent.service - AI Voice Agent (AudioSocket + Deepgram STT + Groq LLM + Cartesia TTS)
   Loaded: loaded (/etc/systemd/system/voice-agent.service; enabled)
   Active: active (running) since ...

View Logs

# Live logs
journalctl -u voice-agent -f

# Last 100 lines
journalctl -u voice-agent -n 100

Restart After Code Changes

systemctl restart voice-agent

18. Latency Optimization Deep Dive

Achieving sub-250ms mouth-to-ear latency requires optimizing every stage of the pipeline. Here is what matters most, in order of impact.

1. Token-Streaming (Biggest Win: -500ms to -1.5s)

The single most impactful optimization. Instead of buffering the LLM's complete response and then synthesizing, stream each token directly to TTS.

Before (sentence buffering):

LLM generates "I can get a plumber out to you."  -> 400ms
TTS synthesizes entire sentence                   -> 300ms
Total: 700ms before caller hears anything

After (token-streaming):

LLM token "I"     -> TTS starts synthesizing     -> 80ms
LLM token " can"  -> TTS continues
LLM token " get"  -> Caller already hearing "I can..."
Total: ~80ms before caller hears first syllable

2. Deepgram Endpointing Tuning (-100ms to -300ms)

The endpointing parameter controls how long Deepgram waits after the caller stops speaking before finalizing the transcript.

endpointing=500  (default) -- 500ms of silence before finalizing
endpointing=300  (our setting) -- 300ms, more responsive
endpointing=150  -- Very aggressive, may cut off mid-sentence

Start at 300ms. Go lower only if your callers speak in short, decisive phrases.
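In practice, endpointing is just a query parameter on the streaming URL. A sketch of building it (parameter names per Deepgram's live-streaming API; the model choice is an assumption -- pick whatever your account supports):

```python
from urllib.parse import urlencode

def deepgram_url(endpointing_ms: int = 300) -> str:
    """Deepgram live-streaming URL tuned for 8kHz telephony audio."""
    params = {
        "model": "nova-2",          # assumed model; substitute your own
        "encoding": "linear16",     # raw 16-bit PCM, matching AudioSocket
        "sample_rate": 8000,
        "endpointing": endpointing_ms,
        "interim_results": "true",
    }
    return "wss://api.deepgram.com/v1/listen?" + urlencode(params)
```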

3. Choose the Right Groq Model (-200ms+ TTFT)

| Model | TTFT | Throughput | Quality |
|---|---|---|---|
| llama-3.3-70b-versatile | ~100ms | 276 tok/s | Excellent |
| llama-3.3-70b-specdec | ~30-50ms | 1,665 tok/s | Identical |
| llama-3.1-8b-instant | ~20ms | 3,000+ tok/s | Good |

The specdec variant gives identical output quality to versatile at 6x the speed. Use it.

4. Keep TTS WebSocket Alive (Save ~200ms Per Turn)

Opening a new WebSocket connection for each utterance adds 100-200ms of TLS handshake overhead. Keep the Cartesia WebSocket open for the duration of the call:

# Connect once at call start
await self.tts.connect()

# Reuse for every utterance
ctx = self.connection.context()  # New context, same connection

5. Native 8kHz PCM Output (Save CPU + ~20ms)

Cartesia supports native 8kHz PCM output. If you use a TTS that outputs 24kHz or 44.1kHz, you must resample to 8kHz for telephony:

# BAD: Receive 24kHz, resample to 8kHz
import audioop
audio_8k = audioop.ratecv(audio_24k, 2, 1, 24000, 8000, None)[0]

# GOOD: Request 8kHz natively from Cartesia
output_format={
    "container": "raw",
    "encoding": "pcm_s16le",
    "sample_rate": 8000,  # Native 8kHz -- no resampling needed
}

6. Short System Prompts (-10ms to -30ms TTFT)

Groq's TTFT increases with prompt length. Keep your system prompt under 500 tokens. Remove verbose instructions and use concise rules.

7. Limit max_tokens (Save Tail Latency)

Set max_tokens: 250 to prevent the LLM from generating unnecessarily long responses. Voice responses should be one sentence. If the LLM rambles, the caller hears a monologue and disengages.

8. Pre-Connect WebSockets

Connect Deepgram and Cartesia WebSockets immediately after receiving the call, before speaking the greeting:

# Connect in parallel
await asyncio.gather(
    self.stt.connect(),
    self.tts.connect(),
)
# Both ready -- no connection delay on first interaction

Latency Budget Summary

| Component | Time | Notes |
|---|---|---|
| STT finalization | 150-200ms | Depends on endpointing setting |
| Groq TTFT | 30-50ms | With specdec variant |
| Network (server to Groq) | 10-30ms | Depends on server location |
| Cartesia TTFB | 50-80ms | From first token to first audio |
| AudioSocket frame | 20ms | One 20ms frame |
| Total | ~200-250ms | Mouth-to-ear |
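To check your own deployment against this budget, timestamp two events per turn: the STT final transcript and the first audio chunk queued for playback. A minimal helper (method names are illustrative -- call them from your STT callback and audio writer):

```python
import time

class TurnTimer:
    """Per-turn mouth-to-ear latency: STT final -> first audio chunk out."""

    def __init__(self):
        self._stt_final = None
        self.latencies_ms = []

    def mark_stt_final(self):
        self._stt_final = time.monotonic()

    def mark_first_audio(self):
        if self._stt_final is not None:
            self.latencies_ms.append((time.monotonic() - self._stt_final) * 1000)
            self._stt_final = None  # one measurement per turn

    def p50(self) -> float:
        s = sorted(self.latencies_ms)
        return s[len(s) // 2] if s else 0.0
```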

19. Troubleshooting

Audio Issues

Problem: Caller hears garbled/distorted audio

Cause: Audio format mismatch. Asterisk sends 8kHz 16-bit signed-integer PCM (slin). If your TTS outputs a different format, the audio will be garbled.

Fix: Verify your TTS output format matches exactly:

output_format={
    "container": "raw",
    "encoding": "pcm_s16le",    # Signed 16-bit little-endian
    "sample_rate": 8000,         # 8kHz, not 16kHz or 24kHz
}

Problem: Audio sounds choppy or has gaps

Cause: Audio writer is not pacing at real-time rate.

Fix: Ensure you sleep for exactly one chunk duration (20ms) between writes:

await asyncio.sleep(CHUNK_DURATION)  # 0.02 seconds
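A plain sleep per frame still drifts: scheduler jitter accumulates over a long utterance and eventually opens audible gaps. Scheduling each frame against an absolute deadline absorbs the jitter. A sketch (writer stands in for the AudioSocket StreamWriter; None is used as an end-of-stream sentinel):

```python
import asyncio
import time

async def paced_writer(queue: asyncio.Queue, writer, chunk_s: float = 0.02):
    """Write 20ms frames at real-time rate using an absolute-deadline clock."""
    deadline = time.monotonic()
    while True:
        chunk = await queue.get()
        if chunk is None:                    # end-of-stream sentinel
            return
        writer.write(chunk)
        await writer.drain()
        deadline += chunk_s
        delay = deadline - time.monotonic()
        if delay > 0:
            await asyncio.sleep(delay)       # absorb jitter, no drift
        else:
            deadline = time.monotonic()      # fell behind; resynchronize
```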

Problem: Caller hears echo of their own voice

Cause: The audio reader is forwarding caller audio back through the TTS output path.

Fix: Only forward audio to STT when the agent is not speaking (is_speaking flag).

Problem: Silence at the start of the call

Cause: Answer() not called before AudioSocket() in the dialplan, or the TTS connection is slow to establish.

Fix: Ensure Answer() is the first action, and pre-connect TTS during AGI execution.

Connection Issues

Problem: Deepgram STT connection closed unexpectedly

Cause: Deepgram closes idle connections after ~30 seconds.

Fix: Send keepalive messages every 8-10 seconds:

await self.ws.send(json.dumps({"type": "KeepAlive"}))

Problem: Groq API error 429: Rate limit exceeded

Cause: Too many concurrent requests or tokens per minute.

Fix: Implement retry with exponential backoff, or upgrade your Groq plan. For production, consider having a fallback model (e.g., Groq's 8B model or a local Ollama instance).

Problem: AudioSocket connection refused

Cause: Voice agent is not running or is listening on the wrong port.

Fix:

# Check if agent is listening
ss -tlnp | grep 9099

# Check service status
systemctl status voice-agent

# Check logs for startup errors
journalctl -u voice-agent -n 50

Latency Issues

Problem: Latency is 1-2 seconds instead of 200-250ms

Common causes:

  1. Sentence buffering -- You are waiting for complete sentences before sending to TTS. Use token-streaming instead.
  2. Wrong Groq model -- Using versatile instead of specdec. Check your GROQ_MODEL setting.
  3. TTS reconnecting -- Creating a new WebSocket for each utterance. Keep the connection alive.
  4. High endpointing -- Deepgram's endpointing is set too high. Try 300ms.

Problem: First response is slow, subsequent ones are fast

Cause: WebSocket connections being established on first use.

Fix: Pre-connect both Deepgram and Cartesia WebSockets at call start (before the greeting).

Barge-In Issues

Problem: Agent does not stop when caller interrupts

Cause: BARGEIN_RMS_THRESHOLD is too high, or BARGEIN_DURATION is too long.

Fix: Lower the threshold (try 500) and shorten the duration (try 0.2s). Log RMS values to calibrate:

log.debug("Barge-in check: RMS=%d threshold=%d", rms, BARGEIN_RMS_THRESHOLD)

Problem: Agent stops talking due to background noise

Cause: BARGEIN_RMS_THRESHOLD is too low.

Fix: Increase the threshold (try 1000-1200). Test with actual phone calls, not just headset mic.

Metadata Issues

Problem: Agent greets with wrong company name or "Home Services" default

Cause: The context API is not returning the correct DID mapping, or the CALLED variable is not set in the dialplan.

Fix:

# Test the context API directly
curl -X POST http://127.0.0.1/api/did_context.php \
  -H "X-API-Key: YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"did_number":"02012345678","caller_id":"07700900000"}'

Check that Set(CALLED=${EXTEN}) is in the dialplan before the Goto(voice-agent,...).


20. Performance Benchmarks

Real-world measurements from production calls:

Latency (Measured at Component Level)

| Metric | P50 | P90 | P99 |
|---|---|---|---|
| STT final transcript | 170ms | 220ms | 350ms |
| LLM TTFT (Groq specdec) | 35ms | 55ms | 120ms |
| TTS TTFB (Cartesia) | 60ms | 85ms | 150ms |
| Total mouth-to-ear | 220ms | 280ms | 400ms |

Resource Usage

| Metric | Value |
|---|---|
| Memory per call | ~30MB |
| CPU per call | ~5% of one core |
| Concurrent calls tested | 10 simultaneous |
| WebSocket connections per call | 2 (Deepgram + Cartesia) |
| HTTP connections per call | 1 (Groq) + 1-2 (context/booking APIs) |

Cost Per Call (Average 3-minute call)

| Service | Cost |
|---|---|
| Deepgram STT | ~$0.013 |
| Groq LLM | ~$0.002 |
| Cartesia TTS | ~$0.005 |
| Total | ~$0.02/call |

At $0.02 per call, the AI agent costs less per call than the electricity to keep a human agent's monitor on.


21. Production Considerations

Concurrent Call Handling

Each call creates a new VoiceAgent instance with its own asyncio tasks. The TCP server handles multiple concurrent connections natively. Test with your expected peak concurrency.

For high concurrency (50+ simultaneous calls), consider:

- Running several agent processes on separate ports and spreading calls across them in the dialplan
- Watching per-call memory (~30MB each) and raising file-descriptor limits accordingly
- Checking the concurrency caps on your Deepgram, Groq, and Cartesia plans -- provider rate limits usually bite before CPU does

Call Recording

AudioSocket gives you raw PCM audio in both directions. Record each direction to its own file -- interleaving both into a single raw file produces garbled playback. Mix the two legs offline after the call:

# In _audio_reader:
if self.rec_caller_file:
    self.rec_caller_file.write(payload)  # Caller's audio

# In _audio_writer:
if self.rec_agent_file:
    self.rec_agent_file.write(chunk)     # Agent's audio
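The captures are headerless 8kHz slin, so most players won't open them directly. Wrapping a capture in a WAV header takes a few lines of the standard wave module (a sketch):

```python
import wave

def raw_to_wav(raw_path: str, wav_path: str, sample_rate: int = 8000):
    """Wrap a raw 8kHz 16-bit mono PCM capture in a WAV header for playback."""
    with open(raw_path, "rb") as f:
        pcm = f.read()
    with wave.open(wav_path, "wb") as w:
        w.setnchannels(1)            # mono
        w.setsampwidth(2)            # 16-bit samples
        w.setframerate(sample_rate)  # 8kHz telephony audio
        w.writeframes(pcm)
```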

Graceful Shutdown

Handle SIGTERM to close active calls cleanly:

import signal

def handle_shutdown(signum, frame):
    log.info("Shutdown signal received")
    # Note: raising here unwinds asyncio.run(), which cancels in-flight
    # call tasks -- it stops the agent but does not drain active calls
    raise SystemExit(0)

signal.signal(signal.SIGTERM, handle_shutdown)
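For a genuinely graceful drain, register the handler on the event loop instead: set an event, close the listener so no new calls arrive, and let per-call tasks run to completion. A sketch (server is the object returned by asyncio.start_server):

```python
import asyncio
import signal

async def serve_until_sigterm(server):
    """Accept calls until SIGTERM, then stop listening without killing calls."""
    stop = asyncio.Event()
    loop = asyncio.get_running_loop()
    loop.add_signal_handler(signal.SIGTERM, stop.set)  # runs safely in the loop
    await stop.wait()
    server.close()              # refuse new AudioSocket connections
    await server.wait_closed()  # listener shut down; call tasks finish on their own
```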

Monitoring

Log structured metrics for each call:

log.info("CALL_METRICS uuid=%s duration=%.1f turns=%d bargein_count=%d "
         "avg_latency_ms=%d booking=%s",
         self.call_uuid, duration, turn_count, bargein_count,
         avg_latency, booking_created)

Feed these into Prometheus/Grafana for dashboards tracking:

- Call volume and average duration
- P50/P90 response latency per turn
- Barge-in frequency (a proxy for the agent talking too much)
- Booking conversion rate

Fallback Strategy

What happens when an API goes down?

| Component Down | Fallback |
|---|---|
| Deepgram | Try Google STT or buffer audio and retry |
| Groq | Fall back to llama-3.1-8b-instant (faster, less capable) or local Ollama |
| Cartesia | Fall back to a pre-recorded "Sorry, we're experiencing issues" message |
| Context API | Use hardcoded defaults (generic company name) |
| Booking API | Log booking details locally, process later |

Security

- Keep API keys in the chmod 600 environment file -- never in code or logs
- Bind the AudioSocket listener to 127.0.0.1 or firewall port 9099; the protocol itself has no authentication
- Require the X-API-Key header on the context and booking endpoints, as shown above
- Avoid logging full transcripts and addresses at INFO level -- bookings and recordings contain personal data


Summary

The key principles behind this system:

  1. Stream everything. Never wait for a complete result when you can process partial results. This applies to STT (streaming transcripts), LLM (streaming tokens), and TTS (continuation API).

  2. Pick components for latency, not features. ElevenLabs sounds slightly better than Cartesia in a side-by-side comparison. But Cartesia's continuation API and native 8kHz output cut 300ms off every response. For voice, speed wins.

  3. Keep the LLM on a tight leash. Short system prompts, low temperature, max_tokens cap, explicit workflow steps, and comprehensive "NEVER do" rules. The LLM is a tool, not an autonomous agent.

  4. Handle interruptions. Barge-in is not optional. Real callers interrupt constantly. Energy-based VAD is simple, fast, and good enough.

  5. Fail gracefully. Every external API will fail eventually. Have fallbacks for each component and always have a "sorry, technical issue" utterance ready.

The complete system runs as a single Python file (1,000 lines), a Perl AGI script (30 lines), and six lines of Asterisk dialplan. Total infrastructure cost: ~$0.02 per call.


Built and tested in production handling real inbound calls for UK home services companies. If you have questions or want to discuss implementation details, reach out.

Need expert help with your setup?

VoIP infrastructure consulting, AI voice agent integration, monitoring stacks, scaling — I've done it all in production.

Get a Free Consultation