Tutorial 37: AI Voice Agent — OpenAI Realtime API + Asterisk
Build a Production Voice Agent Using OpenAI's Native Speech-to-Speech API with Asterisk AudioSocket
A complete, production-ready guide to connecting phone calls directly to OpenAI's Realtime API through Asterisk AudioSocket -- delivering sub-300ms latency voice conversations without any intermediate STT or TTS pipeline. Includes complete Python and Node.js bridge servers, function calling, production deployment, and monitoring.
Table of Contents
- Introduction
- Architecture Overview
- Prerequisites
- Asterisk AudioSocket Configuration
- OpenAI Realtime API Deep Dive
- Bridge Server — Python
- Bridge Server — Node.js Alternative
- System Prompt and Conversation Design
- Function Calling and Tools
- Production Deployment
- Cost Management
- Monitoring and Analytics
- Troubleshooting
- Advanced Topics
1. Introduction
The Speech-to-Speech Revolution
Every previous voice agent architecture required the same three-stage pipeline: capture the caller's audio, transcribe it to text (STT), send the text to an LLM, convert the response back to speech (TTS), and stream it back to the caller. Each stage adds latency, each stage is a failure point, and each stage loses information -- the caller's tone, pacing, emotion, and emphasis are all discarded by the STT step and must be artificially reconstructed by the TTS step.
OpenAI's Realtime API changes this fundamentally. It accepts raw audio and returns raw audio. There is no intermediate text step. The model hears the caller's voice directly and generates speech directly. It understands tone. It can whisper, laugh, or match the caller's energy. And because it eliminates two of the three pipeline stages, it is significantly faster than any STT + LLM + TTS chain.
This tutorial builds a complete production system that connects standard telephone calls (PSTN, SIP, or VoIP) to OpenAI's Realtime API through Asterisk's AudioSocket protocol. A caller dials a phone number, Asterisk answers, and within 300 milliseconds they are having a natural voice conversation with an AI agent that can look up customer records, book appointments, transfer calls, and perform any business logic you define.
Comparison with Other Approaches
If you have followed this tutorial series, you have already seen two other voice agent architectures:
- Tutorial 03 covered the local pipeline approach: Whisper (STT) running on your own hardware, a local LLM for conversation, and Piper or Coqui for TTS. Everything runs on your servers, no external API calls, complete privacy and control.
- Tutorial 23 covered the ElevenLabs Conversational AI approach: a managed cloud platform that handles STT, LLM, and TTS as a service, with WebSocket streaming for low latency.
OpenAI Realtime sits between these two extremes. It is a cloud API (not self-hosted), but you control the infrastructure between Asterisk and the API (the bridge server). It gives you more architectural control than a fully managed platform while being simpler than running your own ML models.
Latency Comparison
| Component | Local Pipeline | ElevenLabs Conv AI | OpenAI Realtime |
|---|---|---|---|
| STT | ~80ms (Whisper small) | ~100ms (Deepgram) | N/A (native) |
| LLM Processing | ~120ms (Llama 3 8B) | ~150ms (GPT-4o) | ~200ms (GPT-4o) |
| TTS | ~50ms (Piper) | ~150ms (ElevenLabs) | N/A (native) |
| Network Overhead | ~0ms (local) | ~50ms (2 hops) | ~50ms (1 hop) |
| AudioSocket Bridge | ~5ms | N/A | ~5ms |
| Total End-to-End | ~255ms | ~450ms | ~255ms |
| Perceived by Caller | Fast | Noticeable | Fast |
The latency numbers tell a compelling story. The local pipeline is fast because everything runs on-premises, but it requires expensive GPU hardware and delivers inferior conversation quality (small local LLMs cannot match GPT-4o). OpenAI Realtime matches the local pipeline's speed while delivering GPT-4o-class intelligence -- the best of both worlds, at the cost of per-minute API charges.
Cost Per Minute Comparison
| Approach | Estimated Cost/Min | Notes |
|---|---|---|
| Local Pipeline | ~$0.002 | Hardware amortized, electricity only |
| ElevenLabs Conv AI | ~$0.12 | STT + LLM + TTS combined |
| OpenAI Realtime | ~$0.08 | Audio input + audio output tokens |
| Retell AI | ~$0.10 | Platform + LLM + telephony |
| Vapi | ~$0.11 | Platform + LLM + voice |
OpenAI Realtime is competitive with managed platforms while giving you full control over the call flow, business logic, and integration points. The local pipeline wins on cost but loses on quality and maintenance burden.
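Using the table's estimates, a quick back-of-envelope calculation shows how call volume drives the monthly bill (the volumes plugged in below are illustrative, not from the tutorial):

```python
def monthly_cost(calls_per_day: int, avg_minutes: float,
                 cost_per_minute: float, days: int = 30) -> float:
    """Rough monthly spend: volume x duration x per-minute rate."""
    return calls_per_day * avg_minutes * cost_per_minute * days

# 200 calls/day at ~3 minutes each on OpenAI Realtime (~$0.08/min)
print(round(monthly_cost(200, 3, 0.08), 2))  # 1440.0
```

At a few hundred calls a day the managed-API cost is tolerable; at thousands of concurrent calls the local pipeline's ~$0.002/min starts to justify its hardware and maintenance burden.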
When to Use OpenAI Realtime
Choose OpenAI Realtime when:
- You need GPT-4o-class conversation quality (complex reasoning, nuanced responses)
- Low latency is critical (customer-facing calls where pauses lose trust)
- You want native voice understanding (tone detection, emotional matching)
- You already run Asterisk and want to integrate AI into existing call flows
- You need function calling during voice conversations (CRM lookups, bookings)
Choose the local pipeline instead when:
- Privacy requirements prohibit sending audio to third-party APIs
- You handle thousands of concurrent calls (cost adds up fast)
- You need to run in air-gapped environments
- Call volume is high but conversations are simple (IVR replacement)
Choose a managed platform (ElevenLabs/Retell) instead when:
- You do not run your own telephony infrastructure
- You want built-in phone number provisioning and call routing
- You need pre-built integrations (Salesforce, HubSpot, Calendly)
- Development speed matters more than per-minute cost
2. Architecture Overview
System Components
┌─────────────────────────┐
│ OpenAI Realtime API │
│ (wss://api.openai. │
│ com/v1/realtime) │
└────────┬────────────────┘
│ WebSocket
│ (base64 PCM 24kHz)
│
┌──────────┐ SIP/PSTN ┌──────────┐ TCP ┌──────────────────┐
│ Caller │ ──────────────>│ Asterisk │ ────────── │ Bridge Server │
│ (Phone) │<──────────────│ Server │<────────── │ (Python/Node) │
└──────────┘ RTP Audio └──────────┘ AudioSocket└──────────────────┘
(raw PCM │
16kHz) │ HTTP
│
┌────────┴────────┐
│ Your Backend │
│ (CRM, Booking, │
│ Database) │
└─────────────────┘
Data Flow
- Caller dials in -- the call arrives at Asterisk via SIP trunk or PSTN gateway
- Asterisk answers -- the dialplan routes the call to an AudioSocket application
- AudioSocket connects -- Asterisk opens a TCP connection to the bridge server, streaming raw PCM audio at 16kHz 16-bit signed linear
- Bridge server connects to OpenAI -- opens a WebSocket to the Realtime API, configures the session (voice, system prompt, tools)
- Audio flows bidirectionally:
- Caller audio: AudioSocket (16kHz PCM) -> resample to 24kHz -> base64 encode -> send to OpenAI
- AI audio: OpenAI sends base64 PCM 24kHz -> decode + resample to 16kHz -> send back through AudioSocket
- Function calls -- when the AI decides to use a tool, the bridge server receives the function call event, executes the business logic (API call, database query), and sends the result back to OpenAI
- Call ends -- either the caller hangs up (Asterisk closes the AudioSocket connection) or the AI triggers a hangup function
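On the wire, the caller-audio leg of step 5 is just a JSON event carrying base64 audio. A minimal sketch of that wrapping (resampling itself is a separate step, covered in the bridge server sections):

```python
import base64
import json

def caller_audio_event(pcm_24k: bytes) -> str:
    """Wrap a chunk of resampled caller audio (24kHz PCM16) in the
    input_audio_buffer.append event the Realtime API expects."""
    return json.dumps({
        "type": "input_audio_buffer.append",
        "audio": base64.b64encode(pcm_24k).decode("ascii"),
    })

print(caller_audio_event(b"\x00\x01\x02\x03"))
```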
Why AudioSocket
Asterisk provides several ways to connect external applications to call audio:
| Method | Protocol | Latency | Complexity | Audio Quality |
|---|---|---|---|---|
| AGI + EAGI | stdin/stdout | High | Medium | Limited |
| ARI + External Media | HTTP/WS | Medium | High | Good |
| AMI + Conference | TCP | Medium | High | Good |
| AudioSocket | TCP | Low | Low | Excellent |
AudioSocket is the clear winner for real-time voice AI. It provides:
- Raw PCM streaming -- no codec overhead, no RTP parsing, no jitter buffer complications
- Bidirectional audio -- read and write on the same TCP socket
- Simple protocol -- 3-byte header (type + length) followed by payload
- Low latency -- direct audio path, no intermediate processing
- Asterisk-native -- built into Asterisk 16+ as `res_audiosocket`
AudioSocket Protocol
The AudioSocket protocol is intentionally minimal. Every message consists of:
┌──────────┬──────────────┬─────────────────────┐
│ Type (1B)│ Length (2B) │ Payload (variable) │
│ │ (big-endian) │ │
└──────────┴──────────────┴─────────────────────┘
Message types:
- `0x00` -- Hangup: the call has ended (terminate message)
- `0x01` -- UUID: the call's unique identifier (sent once at connection start)
- `0x02` -- Silence: indicates silence on the line (no payload, or zero-filled payload)
- `0x10` -- Audio: raw PCM audio data (16-bit signed linear; 8kHz by default, 16kHz when the channel format is slin16)
- `0xFF` -- Error: an error occurred
The bridge server reads audio frames (type 0x10), processes them, and writes audio frames back. When the call ends, it receives a hangup message (type 0x00) or the TCP connection closes.
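The framing is simple enough to pack and parse in a few lines of Python (a standalone sketch, independent of the full bridge code later in this tutorial):

```python
import struct

KIND_AUDIO = 0x10  # raw PCM payload

def pack_frame(kind: int, payload: bytes) -> bytes:
    """Build one AudioSocket frame: 1-byte type + 2-byte big-endian length + payload."""
    return struct.pack(">BH", kind, len(payload)) + payload

def parse_header(header: bytes) -> tuple[int, int]:
    """Split a 3-byte frame header into (type, payload_length)."""
    msg_type, length = struct.unpack(">BH", header)
    return msg_type, length

frame = pack_frame(KIND_AUDIO, b"\x00\x00" * 320)  # one 20ms frame at 16kHz
print(parse_header(frame[:3]))  # (16, 640)
```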
3. Prerequisites
System Requirements
| Component | Minimum | Recommended |
|---|---|---|
| Asterisk Version | 16.0+ | 18.0+ or 20.0+ |
| OS | Debian 11 / Ubuntu 22.04 | Debian 12 / Ubuntu 24.04 |
| Python | 3.10+ | 3.11+ |
| Node.js (alternative) | 18+ | 20+ |
| RAM | 512MB (bridge only) | 2GB+ |
| CPU | 1 core | 2+ cores |
| Network | Stable internet | Low-latency connection |
Software Dependencies
Asterisk modules:
- `res_audiosocket` -- AudioSocket application and channel driver
- `codec_slin` -- Signed linear codec support (usually built-in)
Python packages:
websockets>=12.0
numpy>=1.24.0
scipy>=1.11.0 # For audio resampling
aiohttp>=3.9.0 # For health endpoint and backend API calls
python-dotenv>=1.0.0 # For environment variable management
structlog>=23.0.0 # For structured logging
prometheus-client>=0.20.0 # For metrics (optional)
Node.js packages:
ws>=8.16.0
dotenv>=16.4.0
pino>=8.19.0
prom-client>=15.1.0 # For metrics (optional)
OpenAI API Setup
- Sign up or log in at platform.openai.com
- Navigate to API Keys and create a new key with Realtime API access
- Ensure your account has access to the `gpt-4o-realtime-preview` model
- Note: The Realtime API requires a paid account. Free-tier accounts do not have access
- Set a spending limit appropriate to your expected call volume
# Test your API key (should return a WebSocket upgrade, not an error)
curl -s -o /dev/null -w "%{http_code}" \
-H "Authorization: Bearer YOUR_API_KEY" \
-H "OpenAI-Beta: realtime=v1" \
"https://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview"
# Expected: 426 (Upgrade Required) -- this means auth works but needs WebSocket
# If you get 401: API key is invalid
# If you get 403: Account does not have Realtime access
4. Asterisk AudioSocket Configuration
Verify AudioSocket Module
First, confirm that res_audiosocket is available on your Asterisk installation.
# Check if the module is loaded
asterisk -rx "module show like audiosocket"
# Expected output:
# Module Description Use Count Status
# res_audiosocket.so AudioSocket support 0 Running
# If not loaded, load it manually
asterisk -rx "module load res_audiosocket"
# If the module does not exist, you need to compile Asterisk with AudioSocket support
# or install it from your package manager
If res_audiosocket.so is missing, you need to rebuild Asterisk with AudioSocket enabled:
# On Debian/Ubuntu with Asterisk source
cd /usr/src/asterisk-20.*/
make menuselect
# Navigate to: Resource Modules -> res_audiosocket (enable it)
make && make install
# On ViciBox/openSUSE
# AudioSocket is included by default in Asterisk 18+ packages
# Restart Asterisk after installation
systemctl restart asterisk
Dialplan Configuration
Add the AudioSocket dialplan to your Asterisk configuration. The exact file depends on your setup -- for standalone Asterisk use extensions.conf, for ViciDial use extensions_custom.conf or a separate included file.
; /etc/asterisk/extensions_custom.conf (or extensions.conf)
;
; AI Voice Agent -- AudioSocket to Bridge Server
;
; Route inbound calls to the AI agent via AudioSocket.
; The bridge server runs on the same machine (localhost) or a
; dedicated server. Port 9093 is the default for the bridge.
[ai-agent]
; Main entry point for AI-handled calls
exten => s,1,NoOp(AI Voice Agent -- Call from ${CALLERID(num)})
same => n,Answer()
same => n,Wait(0.5)
same => n,Set(CHANNEL(audioreadformat)=slin16)
same => n,Set(CHANNEL(audiowriteformat)=slin16)
same => n,AudioSocket(${CHANNEL(uniqueid)},YOUR_SERVER_IP:9093)
same => n,Hangup()
; Alternative: Route specific DIDs to the AI agent
[from-trunk]
; Send calls to DID 442012345678 to the AI agent
exten => 442012345678,1,NoOp(Inbound to AI Agent DID)
same => n,Goto(ai-agent,s,1)
; Send calls to DID 18005551234 to the AI agent
exten => 18005551234,1,NoOp(Inbound to AI Agent DID - US)
same => n,Goto(ai-agent,s,1)
; All other DIDs handled normally
exten => _X.,1,NoOp(Normal inbound call)
same => n,Goto(default,${EXTEN},1)
Key Dialplan Notes
Answer() before AudioSocket() -- You must answer the call before sending it to AudioSocket. Without this, Asterisk will not set up the audio path.
Wait(0.5) -- A brief pause after answering ensures the audio path is fully established before the bridge server starts receiving audio. Without this, you may get a burst of silence or garbage data at the start.
Set(CHANNEL(audioreadformat)=slin16) -- Forces the audio to 16kHz 16-bit signed linear. This is critical. The default Asterisk audio is 8kHz, which sounds terrible for voice AI. The 16kHz format provides much better audio quality for the OpenAI model to work with.
AudioSocket(uuid,host:port) -- The first argument is a UUID that identifies the call. Using ${CHANNEL(uniqueid)} ensures each call has a unique identifier. The second argument is the bridge server's address.
AudioSocket with ViciDial
If you are running ViciDial and want to route specific inbound groups to the AI agent, add the context to your custom dialplan and configure the inbound DID or inbound group to use it:
; /etc/asterisk/extensions_custom.conf
[ai-agent-vicidial]
exten => s,1,NoOp(ViciDial AI Agent -- CID: ${CALLERID(num)})
same => n,Answer()
same => n,Wait(0.5)
same => n,Set(CHANNEL(audioreadformat)=slin16)
same => n,Set(CHANNEL(audiowriteformat)=slin16)
same => n,Set(CDR(accountcode)=ai-agent)
same => n,AudioSocket(${CHANNEL(uniqueid)},127.0.0.1:9093)
same => n,Hangup()
Then in the ViciDial admin, set the DID's custom context to ai-agent-vicidial or configure the inbound group's routing to point to this context.
Codec Configuration
Ensure your Asterisk installation supports the signed linear codec at 16kHz:
# Verify codec availability
asterisk -rx "core show codecs" | grep -i slin
# Expected output should include:
# slin signed linear (16 bit) PCM
# slin16 signed linear (16 bit) PCM, 16kHz
# slin24 signed linear (16 bit) PCM, 24kHz
# slin48 signed linear (16 bit) PCM, 48kHz
If slin16 is not listed, make sure the resampling translator module is loaded (the signed linear formats themselves are part of the Asterisk core; conversion between their sample rates is provided by codec_resample):
asterisk -rx "module load codec_resample"
Testing the AudioSocket Route
Before building the bridge server, verify that Asterisk correctly routes calls and attempts the AudioSocket connection:
# Start a simple TCP listener to verify Asterisk connects
# (This will accept the connection and immediately close it)
nc -l -p 9093 &
# Make a test call to the AI agent context
# From the Asterisk CLI:
asterisk -rx "channel originate Local/s@ai-agent application Playback silence/1"
# You should see the connection attempt in the nc output
# Kill nc after testing
kill %1
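If you want more than nc's bare connection check, the standalone listener below (a sketch using the tutorial's default port) accepts one connection and decodes the initial UUID frame that Asterisk sends -- kind 0x01 in the AudioSocket reference implementation:

```python
import socket
import struct
import uuid

def decode_uuid_frame(frame: bytes) -> str:
    """Decode the initial UUID frame Asterisk sends on connect
    (kind 0x01, 16-byte payload in the AudioSocket reference implementation)."""
    kind, length = struct.unpack(">BH", frame[:3])
    if length != 16:
        raise ValueError(f"unexpected frame: kind=0x{kind:02x} len={length}")
    return str(uuid.UUID(bytes=frame[3:19]))

def listen_once(host: str = "0.0.0.0", port: int = 9093) -> str:
    """Accept one AudioSocket connection and print the call's UUID."""
    with socket.create_server((host, port)) as srv:
        conn, addr = srv.accept()
        with conn:
            frame = conn.recv(19)  # 3-byte header + 16-byte UUID payload
            call_id = decode_uuid_frame(frame)
            print(f"connection from {addr}, call UUID: {call_id}")
            return call_id

# Run listen_once(), then originate a test call from the Asterisk CLI as above.
```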
5. OpenAI Realtime API Deep Dive
Connection Establishment
The OpenAI Realtime API uses WebSocket connections. The connection URL and authentication are:
URL: wss://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview
Headers:
Authorization: Bearer YOUR_API_KEY
OpenAI-Beta: realtime=v1
Once connected, you configure the session by sending a session.update event. The connection remains open for the duration of the conversation.
Audio Format
The Realtime API works with PCM audio in a specific format:
| Parameter | Value |
|---|---|
| Encoding | PCM 16-bit signed little-endian |
| Sample Rate | 24,000 Hz (24kHz) |
| Channels | 1 (mono) |
| Transport | Base64 encoded |
This is important because Asterisk AudioSocket provides audio at 16kHz and the Realtime API expects 24kHz. The bridge server must resample between these two rates.
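A practical consequence for the bridge: every frame changes size on the way through. A quick sanity check of the byte math (plain arithmetic, no dependencies):

```python
SAMPLE_WIDTH = 2  # 16-bit PCM = 2 bytes per sample, mono

def frame_bytes(sample_rate_hz: int, frame_ms: int) -> int:
    """Size in bytes of one mono PCM frame of the given duration."""
    return sample_rate_hz * frame_ms // 1000 * SAMPLE_WIDTH

print(frame_bytes(16_000, 20))  # 640  -- one 20ms AudioSocket frame from Asterisk
print(frame_bytes(24_000, 20))  # 960  -- the same 20ms after resampling for OpenAI
```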
Event Types
The Realtime API communicates through JSON events over the WebSocket. Here are the events you will use most frequently:
Client -> Server (you send these):
| Event | Purpose |
|---|---|
| `session.update` | Configure session (voice, instructions, tools, VAD) |
| `input_audio_buffer.append` | Send audio data (base64 PCM 24kHz) |
| `input_audio_buffer.commit` | Manually commit the audio buffer (when VAD is off) |
| `conversation.item.create` | Add a text message to the conversation |
| `response.create` | Request a response (when VAD is off) |
| `response.cancel` | Cancel an in-progress response |
Server -> Client (you receive these):
| Event | Purpose |
|---|---|
| `session.created` | Connection established, session is ready |
| `session.updated` | Session configuration confirmed |
| `input_audio_buffer.speech_started` | VAD detected speech start |
| `input_audio_buffer.speech_stopped` | VAD detected speech end |
| `response.audio.delta` | Chunk of audio response (base64 PCM 24kHz) |
| `response.audio.done` | Audio response complete |
| `response.audio_transcript.delta` | Chunk of transcript text |
| `response.audio_transcript.done` | Full transcript available |
| `response.function_call_arguments.delta` | Chunk of function call args |
| `response.function_call_arguments.done` | Function call ready to execute |
| `response.done` | Full response complete |
| `error` | An error occurred |
| `rate_limits.updated` | Current rate limit status |
Session Configuration
The session configuration is the most important message you send. It defines the AI's behavior for the entire conversation:
{
"type": "session.update",
"session": {
"modalities": ["text", "audio"],
"instructions": "You are a friendly receptionist for Acme Plumbing...",
"voice": "alloy",
"input_audio_format": "pcm16",
"output_audio_format": "pcm16",
"input_audio_transcription": {
"model": "whisper-1"
},
"turn_detection": {
"type": "server_vad",
"threshold": 0.5,
"prefix_padding_ms": 300,
"silence_duration_ms": 500,
"create_response": true
},
"tools": [
{
"type": "function",
"name": "check_appointment",
"description": "Look up an existing appointment by customer phone number",
"parameters": {
"type": "object",
"properties": {
"phone_number": {
"type": "string",
"description": "Customer phone number in E.164 format"
}
},
"required": ["phone_number"]
}
}
],
"tool_choice": "auto",
"temperature": 0.7,
"max_response_output_tokens": 4096
}
}
Voice Options
The Realtime API offers several voices, each with a distinct character:
| Voice | Character | Best For |
|---|---|---|
alloy |
Neutral, balanced | General purpose, professional |
ash |
Warm, confident | Customer service, sales |
ballad |
Soft, gentle | Healthcare, counseling |
coral |
Friendly, upbeat | Booking, hospitality |
echo |
Clear, authoritative | Technical support, instructions |
sage |
Calm, measured | Finance, legal, formal |
shimmer |
Bright, energetic | Marketing, entertainment |
verse |
Conversational, natural | General purpose, casual |
For a professional receptionist handling inbound calls, alloy or ash work best. For a more casual, friendly booking agent, coral or verse are better choices.
Voice Activity Detection (VAD)
The turn_detection configuration controls how the API decides when the caller has finished speaking and it is the AI's turn to respond. This is critical for natural conversation flow.
"turn_detection": {
"type": "server_vad",
"threshold": 0.5,
"prefix_padding_ms": 300,
"silence_duration_ms": 500,
"create_response": true
}
Parameters explained:
- `threshold` (0.0-1.0): How sensitive the VAD is. Lower values detect quieter speech but may trigger on background noise. Higher values require louder speech. Start with 0.5 and adjust based on your call environment.
- `prefix_padding_ms`: How much audio before the detected speech start to include. 300ms ensures you capture the beginning of words that the VAD might otherwise cut off.
- `silence_duration_ms`: How long the caller must be silent before the API considers them done speaking. 500ms is a good default -- short enough that the agent feels responsive, long enough that normal speech pauses (breathing, thinking) do not trigger a premature response.
- `create_response`: When true, the API automatically generates a response when it detects the caller has stopped speaking. When false, you must manually send `response.create` events.
Tuning tips:
- If the AI interrupts callers mid-sentence: increase `silence_duration_ms` to 700-1000ms
- If the AI takes too long to respond after the caller stops: decrease `silence_duration_ms` to 300-400ms
- If the AI responds to background noise: increase `threshold` to 0.6-0.7
- If the AI misses quiet callers: decrease `threshold` to 0.3-0.4
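Because `session.update` can be sent at any point in an open session, these knobs can even be retuned mid-call. A small helper for building such an event (a sketch; sending it over the WebSocket is left to your bridge code):

```python
import json

def vad_update_event(threshold: float, silence_ms: int,
                     prefix_ms: int = 300) -> str:
    """Build a session.update event that retunes server VAD parameters."""
    return json.dumps({
        "type": "session.update",
        "session": {
            "turn_detection": {
                "type": "server_vad",
                "threshold": threshold,
                "prefix_padding_ms": prefix_ms,
                "silence_duration_ms": silence_ms,
                "create_response": True,
            }
        },
    })

# e.g. on a noisy line: raise the threshold, lengthen the silence window
# await ws.send(vad_update_event(threshold=0.7, silence_ms=800))
```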
Interruption Handling
One of the Realtime API's best features is natural interruption handling. When the caller starts speaking while the AI is still generating audio:
- The API sends `input_audio_buffer.speech_started`
- The API processes the caller's new input
- A new response is generated
This means the bridge server must also stop playing audio to the caller when an interruption is detected. The bridge server code handles this by clearing its audio output buffer when it receives a speech_started event.
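Concretely, that buffer-clearing logic can be sketched in isolation (a minimal illustration; class and function names here are not from the bridge code below):

```python
import base64
from collections import deque

class PlaybackBuffer:
    """Queued AI audio awaiting playback to the caller."""
    def __init__(self) -> None:
        self.chunks: deque[bytes] = deque()

    def push(self, pcm: bytes) -> None:
        self.chunks.append(pcm)

    def clear(self) -> int:
        """Drop all pending audio on barge-in; returns chunks discarded."""
        dropped = len(self.chunks)
        self.chunks.clear()
        return dropped

def on_event(event: dict, buffer: PlaybackBuffer) -> None:
    """Handle the two events involved in interruption handling."""
    if event["type"] == "input_audio_buffer.speech_started":
        buffer.clear()  # caller is talking over the AI: stop playback now
    elif event["type"] == "response.audio.delta":
        buffer.push(base64.b64decode(event["delta"]))
```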
6. Bridge Server -- Python
This is the core component -- the bridge between Asterisk's AudioSocket protocol and OpenAI's Realtime WebSocket API. The Python implementation uses asyncio for concurrent handling of both connections.
Project Structure
ai-voice-bridge/
├── bridge.py # Main bridge server
├── audiosocket.py # AudioSocket protocol handler
├── resampler.py # Audio resampling (16kHz <-> 24kHz)
├── config.py # Configuration management
├── requirements.txt # Python dependencies
├── .env # Environment variables (not committed)
├── .env.example # Example environment file
└── systemd/
└── ai-voice-bridge.service # systemd unit file
Configuration (config.py)
"""
Configuration management for AI Voice Bridge.
Loads settings from environment variables with sensible defaults.
"""
import os
from dataclasses import dataclass, field
from dotenv import load_dotenv
load_dotenv()
@dataclass
class Config:
"""Bridge server configuration."""
# OpenAI
openai_api_key: str = field(
default_factory=lambda: os.getenv("OPENAI_API_KEY", "")
)
openai_model: str = field(
default_factory=lambda: os.getenv("OPENAI_MODEL", "gpt-4o-realtime-preview")
)
openai_voice: str = field(
default_factory=lambda: os.getenv("OPENAI_VOICE", "alloy")
)
openai_url: str = field(
default_factory=lambda: os.getenv(
"OPENAI_REALTIME_URL",
"wss://api.openai.com/v1/realtime"
)
)
# AudioSocket server
listen_host: str = field(
default_factory=lambda: os.getenv("LISTEN_HOST", "0.0.0.0")
)
listen_port: int = field(
default_factory=lambda: int(os.getenv("LISTEN_PORT", "9093"))
)
# Voice agent behavior
system_prompt: str = field(
default_factory=lambda: os.getenv("SYSTEM_PROMPT", "You are a helpful assistant.")
)
vad_threshold: float = field(
default_factory=lambda: float(os.getenv("VAD_THRESHOLD", "0.5"))
)
vad_silence_ms: int = field(
default_factory=lambda: int(os.getenv("VAD_SILENCE_MS", "500"))
)
vad_prefix_padding_ms: int = field(
default_factory=lambda: int(os.getenv("VAD_PREFIX_PADDING_MS", "300"))
)
temperature: float = field(
default_factory=lambda: float(os.getenv("TEMPERATURE", "0.7"))
)
# Health endpoint
health_port: int = field(
default_factory=lambda: int(os.getenv("HEALTH_PORT", "9094"))
)
# Logging
log_level: str = field(
default_factory=lambda: os.getenv("LOG_LEVEL", "INFO")
)
log_transcripts: bool = field(
default_factory=lambda: os.getenv("LOG_TRANSCRIPTS", "true").lower() == "true"
)
def validate(self):
"""Validate required configuration."""
if not self.openai_api_key:
raise ValueError("OPENAI_API_KEY environment variable is required")
if not self.system_prompt:
raise ValueError("SYSTEM_PROMPT must not be empty")
Environment File (.env.example)
# OpenAI Configuration
OPENAI_API_KEY=sk-your-api-key-here
OPENAI_MODEL=gpt-4o-realtime-preview
OPENAI_VOICE=alloy
# Bridge Server
LISTEN_HOST=0.0.0.0
LISTEN_PORT=9093
HEALTH_PORT=9094
# Voice Agent
SYSTEM_PROMPT=You are a friendly receptionist for Acme Services. Answer calls professionally, collect the caller's name, phone number, and the nature of their inquiry. If they want to book an appointment, use the book_appointment function.
VAD_THRESHOLD=0.5
VAD_SILENCE_MS=500
VAD_PREFIX_PADDING_MS=300
TEMPERATURE=0.7
# Logging
LOG_LEVEL=INFO
LOG_TRANSCRIPTS=true
Audio Resampler (resampler.py)
"""
Audio resampling between Asterisk (16kHz) and OpenAI Realtime API (24kHz).
Uses scipy for high-quality resampling. The ratio is always 2:3 or 3:2,
which scipy handles efficiently.
"""
import numpy as np
from scipy.signal import resample_poly
class AudioResampler:
"""Resample PCM audio between 16kHz and 24kHz."""
@staticmethod
def resample_16k_to_24k(pcm_data: bytes) -> bytes:
"""
Resample 16kHz PCM to 24kHz PCM.
Used for: Asterisk AudioSocket -> OpenAI Realtime API
Args:
pcm_data: Raw PCM bytes (16-bit signed LE, 16kHz mono)
Returns:
Raw PCM bytes (16-bit signed LE, 24kHz mono)
"""
if not pcm_data:
return b""
# Convert bytes to numpy int16 array
samples = np.frombuffer(pcm_data, dtype=np.int16)
# Resample: 16kHz -> 24kHz = multiply by 3/2
resampled = resample_poly(samples, up=3, down=2).astype(np.int16)
return resampled.tobytes()
@staticmethod
def resample_24k_to_16k(pcm_data: bytes) -> bytes:
"""
Resample 24kHz PCM to 16kHz PCM.
Used for: OpenAI Realtime API -> Asterisk AudioSocket
Args:
pcm_data: Raw PCM bytes (16-bit signed LE, 24kHz mono)
Returns:
Raw PCM bytes (16-bit signed LE, 16kHz mono)
"""
if not pcm_data:
return b""
# Convert bytes to numpy int16 array
samples = np.frombuffer(pcm_data, dtype=np.int16)
# Resample: 24kHz -> 16kHz = multiply by 2/3
resampled = resample_poly(samples, up=2, down=3).astype(np.int16)
return resampled.tobytes()
AudioSocket Protocol Handler (audiosocket.py)
"""
AudioSocket protocol handler for Asterisk.
Implements the AudioSocket wire protocol:
- 1 byte: message type
- 2 bytes: payload length (big-endian)
- N bytes: payload
Message types:
- 0x00: Hangup (call has ended)
- 0x01: UUID (call identifier)
- 0x02: Silence
- 0x10: Audio data (PCM)
- 0xFF: Error
"""
import asyncio
import struct
import uuid
import structlog
logger = structlog.get_logger()
# AudioSocket message types
MSG_HANGUP = 0x00
MSG_UUID = 0x01
MSG_SILENCE = 0x02
MSG_AUDIO = 0x10
MSG_ERROR = 0xFF
class AudioSocketHandler:
"""
Handles a single AudioSocket connection from Asterisk.
Usage:
handler = AudioSocketHandler(reader, writer)
call_id = await handler.read_uuid()
# Read audio in a loop
async for audio_chunk in handler.read_audio():
process(audio_chunk)
# Write audio back
await handler.write_audio(pcm_data)
"""
def __init__(
self,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
):
self.reader = reader
self.writer = writer
self.call_id: str | None = None
self._closed = False
async def read_message(self) -> tuple[int, bytes]:
"""
Read a single AudioSocket message.
Returns:
Tuple of (message_type, payload)
Raises:
ConnectionError: If the connection is closed
"""
# Read 3-byte header: type (1) + length (2)
header = await self.reader.readexactly(3)
msg_type = header[0]
payload_len = struct.unpack(">H", header[1:3])[0]
# Read payload
payload = b""
if payload_len > 0:
payload = await self.reader.readexactly(payload_len)
return msg_type, payload
async def read_uuid(self) -> str:
"""
Read the initial UUID message that identifies the call.
This must be the first message read after connection.
Returns:
The call UUID as a string
"""
msg_type, payload = await self.read_message()
if msg_type != MSG_UUID:
            raise ValueError(
                f"Expected UUID message (0x{MSG_UUID:02x}), got 0x{msg_type:02x}"
            )
self.call_id = uuid.UUID(bytes=payload).hex
logger.info("audiosocket.uuid_received", call_id=self.call_id)
return self.call_id
async def read_audio(self):
"""
Async generator that yields audio chunks from the AudioSocket.
Yields PCM audio data (bytes) until hangup or connection close.
"""
try:
while not self._closed:
msg_type, payload = await self.read_message()
if msg_type == MSG_AUDIO:
yield payload
elif msg_type == MSG_SILENCE:
# Silence frame -- yield silence bytes to keep timing
# 20ms of silence at 16kHz 16-bit = 640 bytes
yield b"\x00" * 640
elif msg_type == MSG_HANGUP:
logger.info(
"audiosocket.hangup_received",
call_id=self.call_id,
)
self._closed = True
return
elif msg_type == MSG_ERROR:
logger.error(
"audiosocket.error_received",
call_id=self.call_id,
payload=payload.hex(),
)
self._closed = True
return
except asyncio.IncompleteReadError:
logger.info(
"audiosocket.connection_closed",
call_id=self.call_id,
)
self._closed = True
except Exception as e:
logger.error(
"audiosocket.read_error",
call_id=self.call_id,
error=str(e),
)
self._closed = True
async def write_audio(self, pcm_data: bytes):
"""
Write PCM audio data back to Asterisk through the AudioSocket.
Args:
pcm_data: Raw PCM bytes (16-bit signed LE, 16kHz mono)
"""
if self._closed:
return
try:
# Build AudioSocket frame: type (0x10) + length (2 bytes) + data
# Send in chunks of 640 bytes (20ms at 16kHz 16-bit)
chunk_size = 640
for i in range(0, len(pcm_data), chunk_size):
chunk = pcm_data[i : i + chunk_size]
header = struct.pack(">BH", MSG_AUDIO, len(chunk))
self.writer.write(header + chunk)
await self.writer.drain()
except Exception as e:
logger.error(
"audiosocket.write_error",
call_id=self.call_id,
error=str(e),
)
self._closed = True
async def send_hangup(self):
"""Send a hangup message to Asterisk."""
if self._closed:
return
try:
header = struct.pack(">BH", MSG_HANGUP, 0)
self.writer.write(header)
await self.writer.drain()
except Exception:
pass
finally:
self._closed = True
def close(self):
"""Close the AudioSocket connection."""
self._closed = True
try:
self.writer.close()
except Exception:
pass
@property
def is_closed(self) -> bool:
return self._closed
Main Bridge Server (bridge.py)
"""
AI Voice Bridge Server
Bridges Asterisk AudioSocket connections to OpenAI's Realtime API.
Each incoming call creates:
1. An AudioSocket handler (reads/writes PCM from/to Asterisk)
2. A WebSocket connection to OpenAI Realtime API
3. Two async tasks: one for caller->AI audio, one for AI->caller audio
Audio flow:
Caller -> Asterisk -> AudioSocket (16kHz PCM)
-> resample to 24kHz -> base64 encode
-> WebSocket -> OpenAI Realtime API
-> response audio (base64 24kHz PCM)
-> decode + resample to 16kHz
-> AudioSocket -> Asterisk -> Caller
"""
import asyncio
import base64
import json
import signal
import sys
import time
from collections import deque
from datetime import datetime, timezone
import structlog
import websockets
from aiohttp import web
from audiosocket import AudioSocketHandler
from config import Config
from resampler import AudioResampler
# Configure structured logging
structlog.configure(
processors=[
structlog.stdlib.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.JSONRenderer(),
],
wrapper_class=structlog.BoundLogger,
context_class=dict,
logger_factory=structlog.PrintLoggerFactory(),
)
logger = structlog.get_logger()
class CallSession:
"""
Manages a single call session between AudioSocket and OpenAI Realtime.
"""
def __init__(self, config: Config, call_id: str):
self.config = config
self.call_id = call_id
self.resampler = AudioResampler()
# Audio output buffer (PCM 16kHz to send to Asterisk)
self.output_buffer: deque[bytes] = deque()
self.output_event = asyncio.Event()
# State tracking
self.is_active = True
self.start_time = time.time()
self.ai_speaking = False
# Transcript logging
self.transcript: list[dict] = []
# WebSocket connection to OpenAI
self.ws = None
# Metrics
self.audio_bytes_in = 0
self.audio_bytes_out = 0
self.response_count = 0
self.function_calls = 0
async def connect_openai(self):
"""Establish WebSocket connection to OpenAI Realtime API."""
url = f"{self.config.openai_url}?model={self.config.openai_model}"
headers = {
"Authorization": f"Bearer {self.config.openai_api_key}",
"OpenAI-Beta": "realtime=v1",
}
logger.info(
"openai.connecting",
call_id=self.call_id,
model=self.config.openai_model,
)
self.ws = await websockets.connect(
url,
additional_headers=headers,
max_size=2**24, # 16MB max message size
ping_interval=20,
ping_timeout=10,
)
# Wait for session.created event
msg = await self.ws.recv()
event = json.loads(msg)
if event["type"] != "session.created":
raise RuntimeError(
f"Expected session.created, got {event['type']}"
)
logger.info(
"openai.connected",
call_id=self.call_id,
session_id=event.get("session", {}).get("id"),
)
# Configure the session
await self._configure_session()
async def _configure_session(self):
"""Send session configuration to OpenAI."""
session_config = {
"type": "session.update",
"session": {
"modalities": ["text", "audio"],
"instructions": self.config.system_prompt,
"voice": self.config.openai_voice,
"input_audio_format": "pcm16",
"output_audio_format": "pcm16",
"input_audio_transcription": {
"model": "whisper-1",
},
"turn_detection": {
"type": "server_vad",
"threshold": self.config.vad_threshold,
"prefix_padding_ms": self.config.vad_prefix_padding_ms,
"silence_duration_ms": self.config.vad_silence_ms,
"create_response": True,
},
"tools": self._get_tools(),
"tool_choice": "auto",
"temperature": self.config.temperature,
"max_response_output_tokens": 4096,
},
}
await self.ws.send(json.dumps(session_config))
logger.info("openai.session_configured", call_id=self.call_id)
def _get_tools(self) -> list[dict]:
"""
Define the tools (functions) available to the AI agent.
Override or extend this method to add your business-specific tools.
"""
return [
{
"type": "function",
"name": "transfer_call",
"description": (
"Transfer the caller to a human agent or specific "
"department. Use when the caller requests to speak with "
"a person or when the issue requires human intervention."
),
"parameters": {
"type": "object",
"properties": {
"department": {
"type": "string",
"enum": [
"sales",
"support",
"billing",
"manager",
],
"description": "Department to transfer to",
},
"reason": {
"type": "string",
"description": "Brief reason for the transfer",
},
},
"required": ["department", "reason"],
},
},
{
"type": "function",
"name": "end_call",
"description": (
"End the phone call after the conversation is complete "
"and the caller has confirmed they have no more questions."
),
"parameters": {
"type": "object",
"properties": {
"summary": {
"type": "string",
"description": "Brief summary of the call",
},
"disposition": {
"type": "string",
"enum": [
"resolved",
"callback_needed",
"transferred",
"voicemail",
],
"description": "Call outcome",
},
},
"required": ["summary", "disposition"],
},
},
{
"type": "function",
"name": "lookup_customer",
"description": (
"Look up customer information by phone number. "
"Returns name, account status, and recent interactions."
),
"parameters": {
"type": "object",
"properties": {
"phone_number": {
"type": "string",
"description": (
"Customer phone number"
),
},
},
"required": ["phone_number"],
},
},
{
"type": "function",
"name": "book_appointment",
"description": (
"Book a service appointment for the caller."
),
"parameters": {
"type": "object",
"properties": {
"customer_name": {
"type": "string",
"description": "Full name of the customer",
},
"phone_number": {
"type": "string",
"description": "Contact phone number",
},
"service_type": {
"type": "string",
"description": (
"Type of service needed"
),
},
"preferred_date": {
"type": "string",
"description": (
"Preferred date (YYYY-MM-DD format)"
),
},
"preferred_time": {
"type": "string",
"enum": ["morning", "afternoon", "evening"],
"description": "Preferred time of day",
},
"address": {
"type": "string",
"description": "Service address",
},
"notes": {
"type": "string",
"description": (
"Additional notes about the service needed"
),
},
},
"required": [
"customer_name",
"phone_number",
"service_type",
"preferred_date",
"preferred_time",
"address",
],
},
},
]
async def handle_caller_audio(self, audio_socket: AudioSocketHandler):
"""
Read audio from AudioSocket and forward to OpenAI.
Runs as an async task for the duration of the call.
"""
try:
audio_accumulator = bytearray()
# Send audio in ~100ms chunks to OpenAI (balance latency vs overhead)
# 16kHz * 2 bytes * 0.1s = 3200 bytes
chunk_threshold = 3200
async for pcm_chunk in audio_socket.read_audio():
if not self.is_active:
break
self.audio_bytes_in += len(pcm_chunk)
audio_accumulator.extend(pcm_chunk)
if len(audio_accumulator) >= chunk_threshold:
# Resample 16kHz -> 24kHz for OpenAI
resampled = self.resampler.resample_16k_to_24k(
bytes(audio_accumulator)
)
# Base64 encode and send to OpenAI
encoded = base64.b64encode(resampled).decode("ascii")
event = {
"type": "input_audio_buffer.append",
"audio": encoded,
}
await self.ws.send(json.dumps(event))
audio_accumulator.clear()
# Send any remaining audio
if audio_accumulator:
resampled = self.resampler.resample_16k_to_24k(
bytes(audio_accumulator)
)
encoded = base64.b64encode(resampled).decode("ascii")
event = {
"type": "input_audio_buffer.append",
"audio": encoded,
}
await self.ws.send(json.dumps(event))
except Exception as e:
logger.error(
"caller_audio.error",
call_id=self.call_id,
error=str(e),
)
finally:
self.is_active = False
logger.info(
"caller_audio.stopped",
call_id=self.call_id,
bytes_received=self.audio_bytes_in,
)
async def handle_openai_events(self, audio_socket: AudioSocketHandler):
"""
Receive events from OpenAI and handle them.
Runs as an async task for the duration of the call.
"""
try:
async for msg in self.ws:
if not self.is_active:
break
event = json.loads(msg)
event_type = event.get("type", "")
if event_type == "response.audio.delta":
await self._handle_audio_delta(event, audio_socket)
elif event_type == "response.audio.done":
self.ai_speaking = False
logger.debug(
"openai.audio_response_complete",
call_id=self.call_id,
)
elif event_type == "response.audio_transcript.done":
transcript_text = event.get("transcript", "")
if transcript_text and self.config.log_transcripts:
self.transcript.append({
"role": "assistant",
"text": transcript_text,
"timestamp": datetime.now(timezone.utc).isoformat(),
})
logger.info(
"transcript.assistant",
call_id=self.call_id,
text=transcript_text,
)
elif event_type == "conversation.item.input_audio_transcription.completed":
transcript_text = event.get("transcript", "")
if transcript_text and self.config.log_transcripts:
self.transcript.append({
"role": "caller",
"text": transcript_text,
"timestamp": datetime.now(timezone.utc).isoformat(),
})
logger.info(
"transcript.caller",
call_id=self.call_id,
text=transcript_text,
)
elif event_type == "input_audio_buffer.speech_started":
# Caller started speaking (barge-in). Drop any locally
# queued AI audio; chunks already written to AudioSocket
# cannot be recalled, so keep write chunks small.
if self.ai_speaking:
self.output_buffer.clear()
self.ai_speaking = False
logger.info(
"caller.interrupted_ai",
call_id=self.call_id,
)
elif event_type == "input_audio_buffer.speech_stopped":
logger.debug(
"caller.speech_stopped",
call_id=self.call_id,
)
elif event_type == "response.function_call_arguments.done":
await self._handle_function_call(event)
elif event_type == "response.done":
self.response_count += 1
elif event_type == "error":
error_info = event.get("error", {})
logger.error(
"openai.error",
call_id=self.call_id,
error_type=error_info.get("type"),
error_message=error_info.get("message"),
error_code=error_info.get("code"),
)
elif event_type == "rate_limits.updated":
limits = event.get("rate_limits", [])
for limit in limits:
if limit.get("remaining", 999) < 10:
logger.warning(
"openai.rate_limit_low",
call_id=self.call_id,
name=limit.get("name"),
remaining=limit.get("remaining"),
)
elif event_type == "session.updated":
logger.debug(
"openai.session_updated",
call_id=self.call_id,
)
except websockets.ConnectionClosed as e:
logger.warning(
"openai.connection_closed",
call_id=self.call_id,
code=e.code,
reason=e.reason,
)
except Exception as e:
logger.error(
"openai_events.error",
call_id=self.call_id,
error=str(e),
)
finally:
self.is_active = False
logger.info(
"openai_events.stopped",
call_id=self.call_id,
responses=self.response_count,
function_calls=self.function_calls,
)
async def _handle_audio_delta(
self, event: dict, audio_socket: AudioSocketHandler
):
"""Process an audio delta event from OpenAI and send to caller."""
self.ai_speaking = True
# Decode base64 audio (24kHz PCM)
audio_b64 = event.get("delta", "")
if not audio_b64:
return
pcm_24k = base64.b64decode(audio_b64)
# Resample 24kHz -> 16kHz for Asterisk
pcm_16k = self.resampler.resample_24k_to_16k(pcm_24k)
self.audio_bytes_out += len(pcm_16k)
# Write directly to AudioSocket
await audio_socket.write_audio(pcm_16k)
async def _handle_function_call(self, event: dict):
"""Handle a function call from the AI."""
self.function_calls += 1
func_name = event.get("name", "")
call_id = event.get("call_id", "")
args_str = event.get("arguments", "{}")
try:
args = json.loads(args_str)
except json.JSONDecodeError:
args = {}
logger.info(
"function_call.received",
call_id=self.call_id,
function=func_name,
arguments=args,
)
# Execute the function and get the result
result = await self._execute_function(func_name, args)
# Send the function result back to OpenAI
response_event = {
"type": "conversation.item.create",
"item": {
"type": "function_call_output",
"call_id": call_id,
"output": json.dumps(result),
},
}
await self.ws.send(json.dumps(response_event))
# Trigger a new response after the function result
await self.ws.send(json.dumps({"type": "response.create"}))
logger.info(
"function_call.completed",
call_id=self.call_id,
function=func_name,
result=result,
)
async def _execute_function(
self, name: str, args: dict
) -> dict:
"""
Execute a function call.
This is where you implement your business logic.
Replace the placeholder implementations with real API calls,
database queries, etc.
"""
if name == "transfer_call":
department = args.get("department", "support")
reason = args.get("reason", "Caller requested transfer")
# In production, you would trigger a call transfer here
# by sending AMI commands to Asterisk or updating a database flag
logger.info(
"function.transfer_call",
call_id=self.call_id,
department=department,
reason=reason,
)
return {
"status": "success",
"message": f"Transferring to {department}. "
f"Please hold while I connect you.",
}
elif name == "end_call":
summary = args.get("summary", "Call completed")
disposition = args.get("disposition", "resolved")
logger.info(
"function.end_call",
call_id=self.call_id,
summary=summary,
disposition=disposition,
)
# Mark the session inactive. The audio loops check this flag,
# so teardown begins right away; to let the AI finish a spoken
# goodbye, defer setting it until the next response.audio.done.
self.is_active = False
return {
"status": "success",
"message": "Call ending. Say goodbye to the caller.",
}
elif name == "lookup_customer":
phone = args.get("phone_number", "")
# Replace with real CRM/database lookup
logger.info(
"function.lookup_customer",
call_id=self.call_id,
phone=phone,
)
return {
"status": "found",
"customer": {
"name": "John Smith",
"account_id": "CUST-12345",
"status": "active",
"last_service": "2025-12-15",
"notes": "Preferred morning appointments",
},
}
elif name == "book_appointment":
logger.info(
"function.book_appointment",
call_id=self.call_id,
customer=args.get("customer_name"),
service=args.get("service_type"),
date=args.get("preferred_date"),
)
# Replace with real booking system API call
return {
"status": "confirmed",
"booking_id": "BK-2026-0314-001",
"date": args.get("preferred_date", "TBD"),
"time_slot": args.get("preferred_time", "morning"),
"message": (
"Appointment confirmed. The customer will receive "
"a confirmation text message."
),
}
else:
logger.warning(
"function.unknown",
call_id=self.call_id,
function=name,
)
return {"status": "error", "message": f"Unknown function: {name}"}
def get_duration(self) -> float:
"""Get call duration in seconds."""
return time.time() - self.start_time
def get_summary(self) -> dict:
"""Get call summary for logging."""
return {
"call_id": self.call_id,
"duration_seconds": round(self.get_duration(), 1),
"audio_in_bytes": self.audio_bytes_in,
"audio_out_bytes": self.audio_bytes_out,
"responses": self.response_count,
"function_calls": self.function_calls,
"transcript_turns": len(self.transcript),
}
class BridgeServer:
"""
Main bridge server that listens for AudioSocket connections
and bridges them to OpenAI Realtime API.
"""
def __init__(self, config: Config):
self.config = config
self.active_calls: dict[str, CallSession] = {}
self._server = None
self._health_app = None
self._start_time = time.time()
async def start(self):
"""Start the bridge server and health endpoint."""
self.config.validate()
# Start the AudioSocket TCP server
self._server = await asyncio.start_server(
self._handle_connection,
self.config.listen_host,
self.config.listen_port,
)
logger.info(
"bridge.started",
host=self.config.listen_host,
port=self.config.listen_port,
health_port=self.config.health_port,
)
# Start health endpoint
health_task = asyncio.create_task(self._start_health_server())
# Run the server
async with self._server:
await self._server.serve_forever()
async def _handle_connection(
self,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
):
"""Handle a new AudioSocket connection from Asterisk."""
audio_socket = AudioSocketHandler(reader, writer)
session = None
try:
# Read the call UUID
call_id = await audio_socket.read_uuid()
# Create a new call session
session = CallSession(self.config, call_id)
self.active_calls[call_id] = session
logger.info(
"call.started",
call_id=call_id,
active_calls=len(self.active_calls),
)
# Connect to OpenAI
await session.connect_openai()
# Run caller audio and OpenAI event handlers concurrently
caller_task = asyncio.create_task(
session.handle_caller_audio(audio_socket)
)
openai_task = asyncio.create_task(
session.handle_openai_events(audio_socket)
)
# Wait for either task to complete (call ends)
done, pending = await asyncio.wait(
[caller_task, openai_task],
return_when=asyncio.FIRST_COMPLETED,
)
# Cancel the other task
for task in pending:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
except Exception as e:
logger.error(
"call.error",
call_id=audio_socket.call_id,
error=str(e),
error_type=type(e).__name__,
)
finally:
# Clean up
if session:
summary = session.get_summary()
logger.info("call.ended", **summary)
# Log full transcript
if session.transcript and self.config.log_transcripts:
logger.info(
"call.transcript",
call_id=session.call_id,
transcript=session.transcript,
)
# Close OpenAI WebSocket
if session.ws:
await session.ws.close()
# Remove from active calls
self.active_calls.pop(session.call_id, None)
# Close AudioSocket
audio_socket.close()
async def _start_health_server(self):
"""Start the HTTP health check endpoint."""
app = web.Application()
app.router.add_get("/health", self._health_handler)
app.router.add_get("/metrics", self._metrics_handler)
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, "0.0.0.0", self.config.health_port)
await site.start()
logger.info(
"health.started",
port=self.config.health_port,
)
async def _health_handler(self, request: web.Request) -> web.Response:
"""Health check endpoint."""
return web.json_response({
"status": "healthy",
"active_calls": len(self.active_calls),
"uptime_seconds": round(time.time() - self._start_time, 1)
if hasattr(self, "_start_time")
else 0,
})
async def _metrics_handler(self, request: web.Request) -> web.Response:
"""Prometheus-compatible metrics endpoint."""
lines = []
lines.append(
f"ai_voice_bridge_active_calls {len(self.active_calls)}"
)
for call_id, session in self.active_calls.items():
lines.append(
f'ai_voice_bridge_call_duration_seconds{{call_id="{call_id}"}} '
f"{session.get_duration():.1f}"
)
return web.Response(
text="\n".join(lines) + "\n",
content_type="text/plain",
)
async def main():
"""Entry point."""
config = Config()
# Set up signal handlers for graceful shutdown
loop = asyncio.get_running_loop()
server = BridgeServer(config)
server._start_time = time.time()
# Handle Ctrl+C and SIGTERM
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(
sig,
lambda: asyncio.create_task(shutdown(server)),
)
await server.start()
async def shutdown(server: BridgeServer):
"""Graceful shutdown."""
logger.info(
"bridge.shutting_down",
active_calls=len(server.active_calls),
)
# Close all active calls
for call_id, session in list(server.active_calls.items()):
session.is_active = False
if session.ws:
await session.ws.close()
# Stop the server
if server._server:
server._server.close()
await server._server.wait_closed()
# Exit
sys.exit(0)
if __name__ == "__main__":
asyncio.run(main())
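Before moving on, the resampling arithmetic the bridge relies on is easy to sanity-check in isolation: 100 ms of caller audio is 3200 bytes at 16 kHz (1600 samples) and becomes 4800 bytes at 24 kHz (2400 samples) before base64 encoding. Here is a standalone check using scipy (already listed in requirements.txt); this sketch exercises only the sample-rate math, not the tutorial's resampler.py:

```python
import numpy as np
from scipy.signal import resample_poly

# 100 ms of 16 kHz mono 16-bit PCM: 16000 * 0.1 = 1600 samples = 3200 bytes
pcm_16k = (np.sin(2 * np.pi * 440 * np.arange(1600) / 16000) * 10000).astype(np.int16)
assert pcm_16k.nbytes == 3200

# Upsample 16 kHz -> 24 kHz (ratio 3:2), as done for caller audio
pcm_24k = resample_poly(pcm_16k, up=3, down=2).astype(np.int16)
assert pcm_24k.nbytes == 4800  # 2400 samples, still 100 ms of audio

# Downsample back 24 kHz -> 16 kHz (ratio 2:3), as done for OpenAI's replies
round_trip = resample_poly(pcm_24k, up=2, down=3).astype(np.int16)
assert round_trip.nbytes == 3200
```

If these byte counts drift, the AI will sound pitch-shifted: audio resampled at the wrong ratio plays back too fast or too slow.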
Requirements File (requirements.txt)
websockets>=14.0  # "additional_headers" kwarg requires the >=14 asyncio client
numpy>=1.24.0
scipy>=1.11.0
aiohttp>=3.9.0
python-dotenv>=1.0.0
structlog>=23.0.0
prometheus-client>=0.20.0
Running the Bridge Server
# Create project directory
mkdir -p /opt/ai-voice-bridge
cd /opt/ai-voice-bridge
# Create virtual environment
python3.11 -m venv venv
source venv/bin/activate
# Install dependencies
pip install -r requirements.txt
# Copy configuration
cp .env.example .env
# Edit .env with your OpenAI API key and settings
nano .env
# Run the bridge server
python bridge.py
# Expected output:
# {"event": "bridge.started", "host": "0.0.0.0", "port": 9093, ...}
# {"event": "health.started", "port": 9094}
Testing End-to-End
With the bridge server running, make a test call:
# From the Asterisk CLI, originate a test call
asterisk -rx "channel originate Local/s@ai-agent application Wait 60"
# You should see logs in the bridge server:
# {"event": "audiosocket.uuid_received", "call_id": "..."}
# {"event": "openai.connecting", ...}
# {"event": "openai.connected", ...}
# {"event": "openai.session_configured", ...}
# {"event": "call.started", "active_calls": 1}
# From a SIP phone or softphone, call the DID configured in your dialplan
# You should hear the AI agent respond to your voice
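You can also exercise the bridge without Asterisk at all by hand-crafting AudioSocket frames: one type byte, a two-byte big-endian length, then the payload. The script below is a minimal smoke test, assuming the bridge is listening on the default 127.0.0.1:9093; it sends a UUID frame, 100 ms of silence, and a hangup:

```python
import socket
import struct
import uuid

MSG_UUID, MSG_HANGUP, MSG_AUDIO = 0x00, 0x02, 0x10

def frame(msg_type: int, payload: bytes = b"") -> bytes:
    """AudioSocket wire format: type byte, big-endian u16 length, payload."""
    return struct.pack(">BH", msg_type, len(payload)) + payload

call_id = uuid.uuid4()
frames = (
    frame(MSG_UUID, call_id.bytes)        # 16-byte call UUID
    + frame(MSG_AUDIO, b"\x00" * 3200)    # 100 ms of 16 kHz 16-bit silence
    + frame(MSG_HANGUP)                   # empty-payload hangup
)

try:
    with socket.create_connection(("127.0.0.1", 9093), timeout=5) as sock:
        sock.sendall(frames)
        print("frames sent:", len(frames), "bytes")
except OSError as exc:
    print("bridge not reachable:", exc)
```

On the bridge side you should see a `call.started` log with the generated UUID, followed almost immediately by `call.ended` once the hangup frame arrives.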
7. Bridge Server -- Node.js Alternative
If your team prefers JavaScript/TypeScript, here is an equivalent bridge server implementation using Node.js. The architecture is identical -- the same AudioSocket protocol, the same OpenAI Realtime API events, the same bidirectional audio flow.
Project Structure
ai-voice-bridge-node/
├── bridge.js # Main bridge server
├── audiosocket.js # AudioSocket protocol handler
├── resampler.js # Audio resampling (16kHz <-> 24kHz)
├── package.json # Dependencies
├── .env # Environment variables
└── systemd/
└── ai-voice-bridge.service
Package Configuration (package.json)
{
"name": "ai-voice-bridge",
"version": "1.0.0",
"description": "Bridge between Asterisk AudioSocket and OpenAI Realtime API",
"main": "bridge.js",
"scripts": {
"start": "node bridge.js",
"dev": "node --watch bridge.js"
},
"dependencies": {
"ws": "^8.16.0",
"dotenv": "^16.4.0",
"pino": "^8.19.0",
"pino-pretty": "^10.3.0"
}
}
AudioSocket Handler (audiosocket.js)
/**
* AudioSocket protocol handler for Asterisk.
*
* Protocol: 1 byte type + 2 bytes length (big-endian) + payload
*
* Types:
* 0x00 - UUID
* 0x01 - Silence
* 0x02 - Hangup
* 0x10 - Audio (PCM)
* 0xFF - Error
*/
const { EventEmitter } = require("events");
const MSG_UUID = 0x00;
const MSG_SILENCE = 0x01;
const MSG_HANGUP = 0x02;
const MSG_AUDIO = 0x10;
const MSG_ERROR = 0xff;
class AudioSocketHandler extends EventEmitter {
constructor(socket) {
super();
this.socket = socket;
this.callId = null;
this.closed = false;
this._buffer = Buffer.alloc(0);
socket.on("data", (data) => this._onData(data));
socket.on("close", () => this._onClose());
socket.on("error", (err) => this._onError(err));
}
_onData(data) {
// Accumulate data in buffer
this._buffer = Buffer.concat([this._buffer, data]);
// Process complete messages
while (this._buffer.length >= 3) {
const msgType = this._buffer[0];
const payloadLen = this._buffer.readUInt16BE(1);
// Check if we have the full message
if (this._buffer.length < 3 + payloadLen) {
break; // Wait for more data
}
const payload = this._buffer.subarray(3, 3 + payloadLen);
this._buffer = this._buffer.subarray(3 + payloadLen);
this._handleMessage(msgType, payload);
}
}
_handleMessage(msgType, payload) {
switch (msgType) {
case MSG_UUID: {
// Parse the 16-byte UUID into canonical 8-4-4-4-12 form
const hex = payload.toString("hex");
this.callId =
hex.slice(0, 8) +
"-" +
hex.slice(8, 12) +
"-" +
hex.slice(12, 16) +
"-" +
hex.slice(16, 20) +
"-" +
hex.slice(20);
this.emit("uuid", this.callId);
break;
}
case MSG_AUDIO:
this.emit("audio", payload);
break;
case MSG_SILENCE:
// Emit 20ms of silence at 16kHz 16-bit = 640 bytes
this.emit("audio", Buffer.alloc(640));
break;
case MSG_HANGUP:
this.closed = true;
this.emit("hangup");
break;
case MSG_ERROR:
this.closed = true;
this.emit("error", new Error(`AudioSocket error: ${payload.toString("hex")}`));
break;
}
}
writeAudio(pcmData) {
if (this.closed) return;
try {
// Send in 640-byte chunks (20ms at 16kHz 16-bit)
const chunkSize = 640;
for (let i = 0; i < pcmData.length; i += chunkSize) {
const chunk = pcmData.subarray(i, Math.min(i + chunkSize, pcmData.length));
const header = Buffer.alloc(3);
header[0] = MSG_AUDIO;
header.writeUInt16BE(chunk.length, 1);
this.socket.write(Buffer.concat([header, chunk]));
}
} catch (err) {
this.closed = true;
this.emit("error", err);
}
}
sendHangup() {
if (this.closed) return;
try {
const header = Buffer.alloc(3);
header[0] = MSG_HANGUP;
header.writeUInt16BE(0, 1);
this.socket.write(header);
} catch (_) {
// Ignore errors during hangup
}
this.closed = true;
}
_onClose() {
this.closed = true;
this.emit("close");
}
_onError(err) {
this.closed = true;
this.emit("error", err);
}
close() {
this.closed = true;
try {
this.socket.destroy();
} catch (_) {
// Ignore
}
}
}
module.exports = { AudioSocketHandler };
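The hex-slicing above rebuilds the canonical 8-4-4-4-12 UUID form from the frame's raw 16 bytes. As a cross-check, Python's uuid module performs the identical big-endian conversion (the payload here is an illustrative value, not a real call ID):

```python
import uuid

raw = bytes(range(16))  # illustrative 16-byte payload, as read from a UUID frame

# Equivalent of the handler's hex-slicing: 8-4-4-4-12 groups
h = raw.hex()
sliced = f"{h[:8]}-{h[8:12]}-{h[12:16]}-{h[16:20]}-{h[20:]}"

# Python's uuid module produces the same canonical string
assert sliced == str(uuid.UUID(bytes=raw))
print(sliced)  # 00010203-0405-0607-0809-0a0b0c0d0e0f
```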
Audio Resampler (resampler.js)
/**
* Audio resampling between 16kHz and 24kHz using linear interpolation.
*
* For production use with higher quality requirements, consider using
* the 'node-audio-resampler' or 'libsamplerate' bindings.
* Linear interpolation is sufficient for voice and adds zero latency.
*/
class AudioResampler {
/**
* Resample 16kHz PCM to 24kHz PCM (ratio 3:2)
* @param {Buffer} pcmData - 16-bit signed LE PCM at 16kHz
* @returns {Buffer} - 16-bit signed LE PCM at 24kHz
*/
static resample16kTo24k(pcmData) {
if (!pcmData || pcmData.length === 0) return Buffer.alloc(0);
const inputSamples = pcmData.length / 2; // 16-bit = 2 bytes per sample
const outputSamples = Math.floor((inputSamples * 3) / 2);
const output = Buffer.alloc(outputSamples * 2);
for (let i = 0; i < outputSamples; i++) {
// Map output sample index to input sample position
const srcPos = (i * 2) / 3;
const srcIndex = Math.floor(srcPos);
const frac = srcPos - srcIndex;
let sample;
if (srcIndex + 1 < inputSamples) {
// Linear interpolation between two samples
const s1 = pcmData.readInt16LE(srcIndex * 2);
const s2 = pcmData.readInt16LE((srcIndex + 1) * 2);
sample = Math.round(s1 + frac * (s2 - s1));
} else {
sample = pcmData.readInt16LE(srcIndex * 2);
}
// Clamp to int16 range
sample = Math.max(-32768, Math.min(32767, sample));
output.writeInt16LE(sample, i * 2);
}
return output;
}
/**
* Resample 24kHz PCM to 16kHz PCM (ratio 2:3)
* @param {Buffer} pcmData - 16-bit signed LE PCM at 24kHz
* @returns {Buffer} - 16-bit signed LE PCM at 16kHz
*/
static resample24kTo16k(pcmData) {
if (!pcmData || pcmData.length === 0) return Buffer.alloc(0);
const inputSamples = pcmData.length / 2;
const outputSamples = Math.floor((inputSamples * 2) / 3);
const output = Buffer.alloc(outputSamples * 2);
for (let i = 0; i < outputSamples; i++) {
const srcPos = (i * 3) / 2;
const srcIndex = Math.floor(srcPos);
const frac = srcPos - srcIndex;
let sample;
if (srcIndex + 1 < inputSamples) {
const s1 = pcmData.readInt16LE(srcIndex * 2);
const s2 = pcmData.readInt16LE((srcIndex + 1) * 2);
sample = Math.round(s1 + frac * (s2 - s1));
} else {
sample = pcmData.readInt16LE(srcIndex * 2);
}
sample = Math.max(-32768, Math.min(32767, sample));
output.writeInt16LE(sample, i * 2);
}
return output;
}
}
module.exports = { AudioResampler };
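Two properties make this resampler easy to validate: a constant (DC) signal must pass through unchanged, and frame-aligned input must produce exact 3:2 and 2:3 sample counts. The following Python mirror of the same index math is a verification sketch only, not the tutorial's resampler.py:

```python
def resample_linear(samples: list[int], up: int, down: int) -> list[int]:
    """Linear-interpolation resampler mirroring the JS implementation."""
    n_in = len(samples)
    n_out = (n_in * up) // down
    out = []
    for i in range(n_out):
        src = i * down / up              # fractional source position
        j = int(src)
        frac = src - j
        if j + 1 < n_in:
            # Interpolate between the two neighboring input samples
            s = round(samples[j] + frac * (samples[j + 1] - samples[j]))
        else:
            s = samples[j]
        out.append(max(-32768, min(32767, s)))  # clamp to int16 range
    return out

# A DC signal survives resampling exactly
dc = [1000] * 1600                       # 100 ms at 16 kHz
up24 = resample_linear(dc, 3, 2)         # 16 kHz -> 24 kHz
assert len(up24) == 2400 and set(up24) == {1000}

# And the round trip restores the original sample count
back = resample_linear(up24, 2, 3)       # 24 kHz -> 16 kHz
assert len(back) == 1600
```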
Main Bridge Server (bridge.js)
/**
* AI Voice Bridge Server (Node.js)
*
* Bridges Asterisk AudioSocket connections to OpenAI Realtime API.
*/
require("dotenv").config();
const net = require("net");
const http = require("http");
const WebSocket = require("ws");
const pino = require("pino");
const { AudioSocketHandler } = require("./audiosocket");
const { AudioResampler } = require("./resampler");
const logger = pino({
level: process.env.LOG_LEVEL || "info",
transport:
process.env.NODE_ENV !== "production"
? { target: "pino-pretty" }
: undefined,
});
// Configuration
const CONFIG = {
openaiApiKey: process.env.OPENAI_API_KEY || "",
openaiModel: process.env.OPENAI_MODEL || "gpt-4o-realtime-preview",
openaiVoice: process.env.OPENAI_VOICE || "alloy",
listenHost: process.env.LISTEN_HOST || "0.0.0.0",
listenPort: parseInt(process.env.LISTEN_PORT || "9093"),
healthPort: parseInt(process.env.HEALTH_PORT || "9094"),
systemPrompt: process.env.SYSTEM_PROMPT || "You are a helpful assistant.",
vadThreshold: parseFloat(process.env.VAD_THRESHOLD || "0.5"),
vadSilenceMs: parseInt(process.env.VAD_SILENCE_MS || "500"),
vadPrefixPaddingMs: parseInt(process.env.VAD_PREFIX_PADDING_MS || "300"),
temperature: parseFloat(process.env.TEMPERATURE || "0.7"),
logTranscripts: (process.env.LOG_TRANSCRIPTS || "true") === "true",
};
if (!CONFIG.openaiApiKey) {
logger.fatal("OPENAI_API_KEY environment variable is required");
process.exit(1);
}
// Track active calls
const activeCalls = new Map();
const startTime = Date.now();
/**
* Handle a single call session
*/
async function handleCall(socket) {
const audioSocket = new AudioSocketHandler(socket);
let openaiWs = null;
let callId = null;
let aiSpeaking = false;
let audioBytesSent = 0;
let audioBytesReceived = 0;
let transcript = [];
const callStartTime = Date.now();
// Wait for UUID
audioSocket.on("uuid", async (id) => {
callId = id;
activeCalls.set(callId, { startTime: callStartTime });
logger.info({ callId, activeCalls: activeCalls.size }, "Call started");
try {
// Connect to OpenAI Realtime API
const url =
`wss://api.openai.com/v1/realtime?model=${CONFIG.openaiModel}`;
openaiWs = new WebSocket(url, {
headers: {
Authorization: `Bearer ${CONFIG.openaiApiKey}`,
"OpenAI-Beta": "realtime=v1",
},
maxPayload: 16 * 1024 * 1024,
});
openaiWs.on("open", () => {
logger.info({ callId }, "Connected to OpenAI Realtime API");
// Configure session
const sessionConfig = {
type: "session.update",
session: {
modalities: ["text", "audio"],
instructions: CONFIG.systemPrompt,
voice: CONFIG.openaiVoice,
input_audio_format: "pcm16",
output_audio_format: "pcm16",
input_audio_transcription: { model: "whisper-1" },
turn_detection: {
type: "server_vad",
threshold: CONFIG.vadThreshold,
prefix_padding_ms: CONFIG.vadPrefixPaddingMs,
silence_duration_ms: CONFIG.vadSilenceMs,
create_response: true,
},
tools: getTools(),
tool_choice: "auto",
temperature: CONFIG.temperature,
max_response_output_tokens: 4096,
},
};
openaiWs.send(JSON.stringify(sessionConfig));
});
// Handle OpenAI events
openaiWs.on("message", (data) => {
const event = JSON.parse(data.toString());
switch (event.type) {
case "response.audio.delta": {
aiSpeaking = true;
const pcm24k = Buffer.from(event.delta, "base64");
const pcm16k = AudioResampler.resample24kTo16k(pcm24k);
audioBytesReceived += pcm16k.length;
audioSocket.writeAudio(pcm16k);
break;
}
case "response.audio.done":
aiSpeaking = false;
break;
case "response.audio_transcript.done":
if (event.transcript && CONFIG.logTranscripts) {
transcript.push({
role: "assistant",
text: event.transcript,
timestamp: new Date().toISOString(),
});
logger.info(
{ callId, role: "assistant", text: event.transcript },
"Transcript"
);
}
break;
case "conversation.item.input_audio_transcription.completed":
if (event.transcript && CONFIG.logTranscripts) {
transcript.push({
role: "caller",
text: event.transcript,
timestamp: new Date().toISOString(),
});
logger.info(
{ callId, role: "caller", text: event.transcript },
"Transcript"
);
}
break;
case "input_audio_buffer.speech_started":
if (aiSpeaking) {
aiSpeaking = false;
logger.info({ callId }, "Caller interrupted AI");
}
break;
case "response.function_call_arguments.done":
handleFunctionCall(event, openaiWs, callId);
break;
case "error":
logger.error(
{
callId,
errorType: event.error?.type,
errorMessage: event.error?.message,
},
"OpenAI error"
);
break;
case "rate_limits.updated":
for (const limit of event.rate_limits || []) {
if (limit.remaining < 10) {
logger.warn(
{ callId, name: limit.name, remaining: limit.remaining },
"Rate limit low"
);
}
}
break;
}
});
openaiWs.on("close", (code, reason) => {
logger.info(
{ callId, code, reason: reason.toString() },
"OpenAI connection closed"
);
cleanup();
});
openaiWs.on("error", (err) => {
logger.error({ callId, error: err.message }, "OpenAI WebSocket error");
cleanup();
});
} catch (err) {
logger.error({ callId, error: err.message }, "Failed to connect to OpenAI");
cleanup();
}
});
// Forward caller audio to OpenAI
let audioAccumulator = Buffer.alloc(0);
const chunkThreshold = 3200; // ~100ms at 16kHz 16-bit
audioSocket.on("audio", (pcmData) => {
if (!openaiWs || openaiWs.readyState !== WebSocket.OPEN) return;
audioBytesSent += pcmData.length;
audioAccumulator = Buffer.concat([audioAccumulator, pcmData]);
if (audioAccumulator.length >= chunkThreshold) {
const resampled = AudioResampler.resample16kTo24k(audioAccumulator);
const encoded = resampled.toString("base64");
openaiWs.send(
JSON.stringify({
type: "input_audio_buffer.append",
audio: encoded,
})
);
audioAccumulator = Buffer.alloc(0);
}
});
// Handle hangup
audioSocket.on("hangup", () => {
logger.info({ callId }, "Caller hung up");
cleanup();
});
audioSocket.on("close", () => {
cleanup();
});
function cleanup() {
const duration = ((Date.now() - callStartTime) / 1000).toFixed(1);
if (callId) {
activeCalls.delete(callId);
logger.info(
{
callId,
durationSeconds: parseFloat(duration),
audioBytesSent,
audioBytesReceived,
transcriptTurns: transcript.length,
},
"Call ended"
);
if (transcript.length > 0 && CONFIG.logTranscripts) {
logger.info({ callId, transcript }, "Full transcript");
}
}
if (openaiWs && openaiWs.readyState === WebSocket.OPEN) {
openaiWs.close();
}
openaiWs = null;
audioSocket.close();
}
}
/**
* Handle function calls from OpenAI
*/
function handleFunctionCall(event, ws, callId) {
const funcName = event.name || "";
const funcCallId = event.call_id || "";
let args = {};
try {
args = JSON.parse(event.arguments || "{}");
} catch (_) {
args = {};
}
logger.info({ callId, function: funcName, arguments: args }, "Function call");
// Execute the function (replace with real business logic)
let result;
switch (funcName) {
case "transfer_call":
result = {
status: "success",
message: `Transferring to ${args.department}. Please hold.`,
};
break;
case "end_call":
result = {
status: "success",
message: "Call ending. Say goodbye to the caller.",
};
break;
case "lookup_customer":
result = {
status: "found",
customer: {
name: "John Smith",
account_id: "CUST-12345",
status: "active",
},
};
break;
case "book_appointment":
result = {
status: "confirmed",
booking_id: "BK-2026-0314-001",
date: args.preferred_date || "TBD",
time_slot: args.preferred_time || "morning",
};
break;
default:
result = { status: "error", message: `Unknown function: ${funcName}` };
}
// Send result back to OpenAI
ws.send(
JSON.stringify({
type: "conversation.item.create",
item: {
type: "function_call_output",
call_id: funcCallId,
output: JSON.stringify(result),
},
})
);
// Trigger response after function result
ws.send(JSON.stringify({ type: "response.create" }));
logger.info({ callId, function: funcName, result }, "Function call completed");
}
/**
* Define available tools
*/
function getTools() {
return [
{
type: "function",
name: "transfer_call",
description:
"Transfer the caller to a human agent or specific department.",
parameters: {
type: "object",
properties: {
department: {
type: "string",
enum: ["sales", "support", "billing", "manager"],
},
reason: { type: "string" },
},
required: ["department", "reason"],
},
},
{
type: "function",
name: "end_call",
description: "End the phone call after conversation is complete.",
parameters: {
type: "object",
properties: {
summary: { type: "string" },
disposition: {
type: "string",
enum: ["resolved", "callback_needed", "transferred", "voicemail"],
},
},
required: ["summary", "disposition"],
},
},
{
type: "function",
name: "lookup_customer",
description: "Look up customer information by phone number.",
parameters: {
type: "object",
properties: {
phone_number: { type: "string" },
},
required: ["phone_number"],
},
},
{
type: "function",
name: "book_appointment",
description: "Book a service appointment for the caller.",
parameters: {
type: "object",
properties: {
customer_name: { type: "string" },
phone_number: { type: "string" },
service_type: { type: "string" },
preferred_date: { type: "string" },
preferred_time: {
type: "string",
enum: ["morning", "afternoon", "evening"],
},
address: { type: "string" },
notes: { type: "string" },
},
required: [
"customer_name",
"phone_number",
"service_type",
"preferred_date",
"preferred_time",
"address",
],
},
},
];
}
// ── TCP Server (AudioSocket) ──
const tcpServer = net.createServer((socket) => {
logger.info(
{ remote: `${socket.remoteAddress}:${socket.remotePort}` },
"New AudioSocket connection"
);
handleCall(socket);
});
tcpServer.listen(CONFIG.listenPort, CONFIG.listenHost, () => {
logger.info(
{ host: CONFIG.listenHost, port: CONFIG.listenPort },
"AudioSocket bridge server started"
);
});
// ── Health HTTP Server ──
const healthServer = http.createServer((req, res) => {
if (req.url === "/health") {
res.writeHead(200, { "Content-Type": "application/json" });
res.end(
JSON.stringify({
status: "healthy",
active_calls: activeCalls.size,
uptime_seconds: Math.round((Date.now() - startTime) / 1000),
})
);
} else if (req.url === "/metrics") {
let lines = [`ai_voice_bridge_active_calls ${activeCalls.size}`];
for (const [id, info] of activeCalls) {
const dur = ((Date.now() - info.startTime) / 1000).toFixed(1);
lines.push(
`ai_voice_bridge_call_duration_seconds{call_id="${id}"} ${dur}`
);
}
res.writeHead(200, { "Content-Type": "text/plain" });
res.end(lines.join("\n") + "\n");
} else {
res.writeHead(404);
res.end("Not Found\n");
}
});
healthServer.listen(CONFIG.healthPort, () => {
logger.info({ port: CONFIG.healthPort }, "Health server started");
});
// ── Graceful Shutdown ──
process.on("SIGINT", shutdown);
process.on("SIGTERM", shutdown);
function shutdown() {
logger.info({ activeCalls: activeCalls.size }, "Shutting down");
tcpServer.close();
healthServer.close();
process.exit(0);
}
Running the Node.js Bridge
# Create project directory
mkdir -p /opt/ai-voice-bridge-node
cd /opt/ai-voice-bridge-node
# Initialize and install
npm init -y
npm install ws dotenv pino pino-pretty
# Copy the files (bridge.js, audiosocket.js, resampler.js)
# Edit .env with your API key
# Run
node bridge.js
8. System Prompt and Conversation Design
Choosing the Right Voice
The voice you select sets the entire tone of every conversation. Here is a practical guide based on real-world testing across different use cases:
| Use Case | Recommended Voice | Why |
|---|---|---|
| UK Home Services Receptionist | ash | Warm and confident, sounds like a professional receptionist |
| US Medical Office | sage | Calm and measured, reassuring for patients |
| Restaurant Reservations | coral | Friendly and upbeat, matches hospitality energy |
| Technical Support | echo | Clear and authoritative, inspires confidence |
| General Business | alloy | Neutral, works for everything, offends nobody |
| After-Hours Answering | verse | Conversational and natural, does not sound robotic |
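The voice is fixed in the session configuration, and it generally cannot be changed once the model has started speaking, so set it immediately after the WebSocket connects. A minimal sketch of building the `session.update` payload -- `build_session_update` is an illustrative helper, and the voice set below mirrors the table above rather than OpenAI's full voice list:

```python
import json

# Voices from the table above (OpenAI offers a few more)
SUPPORTED_VOICES = {"alloy", "ash", "coral", "echo", "sage", "verse"}

def build_session_update(voice: str, instructions: str) -> str:
    """Build a session.update payload selecting a voice.

    Send this right after connecting -- the voice cannot be swapped
    once the model has produced audio in the session.
    """
    if voice not in SUPPORTED_VOICES:
        raise ValueError(f"Unsupported voice: {voice}")
    return json.dumps({
        "type": "session.update",
        "session": {"voice": voice, "instructions": instructions},
    })
```

Validating the voice name up front turns a silent misconfiguration into an immediate startup error.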
System Prompt Template
The system prompt is the single most important factor in conversation quality. Here is a production-tested template that you can adapt to any business:
You are Sarah, a professional receptionist for {{COMPANY_NAME}}. You answer
inbound phone calls and help callers with their enquiries.
PERSONALITY:
- Warm, professional, and efficient
- Use natural conversational language, not corporate speak
- Keep responses concise -- this is a phone call, not an email
- Match the caller's energy level (excited caller = more upbeat response)
- Use filler words sparingly but naturally ("right", "sure", "of course")
- Never say "I'm an AI" or "as an AI language model" -- you are Sarah
CONVERSATION FLOW:
1. Greet the caller: "Good morning/afternoon, {{COMPANY_NAME}}, Sarah speaking.
How can I help?"
2. Listen to their request and identify the type of enquiry
3. For booking requests:
a. Collect: full name, callback number, address/postcode, issue description
b. Confirm the details back to the caller
c. Use the book_appointment function
d. Provide the booking confirmation
4. For general enquiries:
a. Answer from your knowledge base if possible
b. If you cannot answer, offer to take a message or transfer
5. Always end with: "Is there anything else I can help with?"
6. Close: "Thank you for calling, have a lovely day!"
RULES:
- NEVER give pricing information -- always say "Our engineer will assess
on-site and provide a quote before starting any work"
- NEVER diagnose problems -- say "I'll make sure the right engineer is sent out"
- If asked about availability, say "Let me check..." then use the
check_availability function
- If the caller is angry or upset, acknowledge their frustration:
"I completely understand, let me get this sorted for you right away"
- If you cannot help, offer to transfer: "Let me put you through to
someone who can help with that"
- Keep the conversation under 3 minutes when possible
- If the caller goes off-topic, gently redirect: "Of course -- now,
let me just get a few details so we can help you with that"
KNOWLEDGE BASE:
- {{COMPANY_NAME}} provides {{SERVICES}}
- Service area: {{SERVICE_AREA}}
- Operating hours: {{HOURS}}
- Emergency callouts available 24/7 (additional charges may apply)
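The `{{PLACEHOLDER}}` markers above need to be filled in before the prompt is sent in `session.update`. A small sketch of one way to render the template safely -- the `render_prompt` helper is hypothetical, but failing loudly on unfilled markers keeps a half-configured agent from answering live calls:

```python
import re

TEMPLATE = """You are Sarah, a professional receptionist for {{COMPANY_NAME}}.
- {{COMPANY_NAME}} provides {{SERVICES}}
- Service area: {{SERVICE_AREA}}
- Operating hours: {{HOURS}}"""

def render_prompt(template: str, values: dict) -> str:
    """Replace {{PLACEHOLDER}} markers; raise on any left unfilled."""
    rendered = template
    for key, value in values.items():
        rendered = rendered.replace("{{" + key + "}}", value)
    leftover = re.findall(r"\{\{(\w+)\}\}", rendered)
    if leftover:
        raise ValueError(f"Unfilled placeholders: {leftover}")
    return rendered
```

Load the values from environment variables or a per-tenant config file so the same bridge code can serve multiple businesses.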
Turn Detection Tuning
The VAD (Voice Activity Detection) parameters have a dramatic impact on conversation naturalness. Here are tested configurations for different scenarios:
// Fast-paced conversation (booking agent, short calls)
{
"threshold": 0.5,
"prefix_padding_ms": 200,
"silence_duration_ms": 400,
"create_response": true
}
// Thoughtful conversation (support, complex enquiries)
{
"threshold": 0.4,
"prefix_padding_ms": 400,
"silence_duration_ms": 800,
"create_response": true
}
// Noisy environment (caller in car, on street)
{
"threshold": 0.7,
"prefix_padding_ms": 300,
"silence_duration_ms": 600,
"create_response": true
}
// Manual turn control (advanced, not recommended for most)
{
"type": "server_vad",
"threshold": 0.5,
"prefix_padding_ms": 300,
"silence_duration_ms": 500,
"create_response": false
}
When create_response is false, you must manually trigger responses by sending response.create events. This gives you precise control over timing but requires sophisticated client-side logic to determine when the caller has finished speaking.
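As a sketch of what that client-side logic can look like: with server VAD still enabled, the API continues to commit the input buffer and emit `speech_stopped` events, and your code answers each one with an explicit `response.create` on its own schedule. The `on_realtime_event` helper below is illustrative, not part of any SDK:

```python
import json
from typing import Optional

def on_realtime_event(raw_event: str) -> Optional[str]:
    """Return a payload to send back to OpenAI, or None.

    Intended for sessions configured with create_response=false:
    server VAD still detects and commits each caller turn, and we
    decide when (and whether) to trigger the model's reply.
    """
    event = json.loads(raw_event)
    if event.get("type") == "input_audio_buffer.speech_stopped":
        # Add any extra gating here (hold-off timers, barge-in grace, ...)
        return json.dumps({"type": "response.create"})
    return None
```

Keeping the decision in a pure function like this makes the turn-taking policy easy to unit-test without a live WebSocket.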
Handling Interruptions
The Realtime API supports natural interruptions -- when the caller speaks while the AI is talking, the AI stops and listens. However, you need to handle the audio pipeline correctly:
When `input_audio_buffer.speech_started` is received:
- Clear any buffered output audio that has not been sent to Asterisk yet
- The API automatically cancels its current response
- A new response will be generated based on the caller's input
Common interruption scenarios and how to handle them:
- Caller corrects the AI: The AI should acknowledge the correction naturally
- Caller says "yes" or "no" early: The AI should respond to the confirmation
- Background noise triggers interruption: Increase the VAD threshold
- Caller and AI speak simultaneously: The API handles this gracefully -- the AI yields
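The first step -- clearing buffered output -- is worth making explicit, since it is the one part the API cannot do for you. A minimal sketch of a flushable playback queue (the `PlaybackBuffer` class is illustrative; your bridge may already hold outbound audio in a different structure):

```python
from collections import deque

class PlaybackBuffer:
    """Outbound audio queue between OpenAI and Asterisk.

    On input_audio_buffer.speech_started (caller barge-in), flush the
    queue so the caller does not keep hearing the cancelled response.
    """
    def __init__(self):
        self._frames = deque()

    def push(self, frame: bytes) -> None:
        self._frames.append(frame)

    def pop(self):
        """Next frame to play, or None when the queue is empty."""
        return self._frames.popleft() if self._frames else None

    def clear(self) -> int:
        """Drop all pending frames; returns how many were discarded."""
        dropped = len(self._frames)
        self._frames.clear()
        return dropped
```

Logging the return value of `clear()` is a cheap way to see how often (and how deeply) callers interrupt the agent.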
Transfer Workflow
When the AI needs to transfer a call to a human agent, the flow is:
1. AI says "Let me transfer you now, please hold for a moment"
2. AI calls the `transfer_call` function
3. Your bridge server receives the function call
4. The bridge server sends an AMI (Asterisk Manager Interface) command to transfer the channel
5. The function result is returned to tell the AI the transfer is initiated
6. Asterisk performs the transfer
Here is how to implement the AMI transfer in the bridge server:
# Add to bridge.py (called from the _execute_function method)
import asyncio

async def _transfer_via_ami(self, channel: str, destination: str):
    """Send AMI commands to redirect (transfer) the call.

    Uses asyncio streams so the AMI round trip never blocks the
    event loop that is shuttling audio for other active calls.
    """
    ami_host = os.getenv("AMI_HOST", "127.0.0.1")
    ami_port = int(os.getenv("AMI_PORT", "5038"))
    ami_user = os.getenv("AMI_USER", "admin")
    ami_pass = os.getenv("AMI_PASS", "YOUR_AMI_PASSWORD")
    reader, writer = await asyncio.open_connection(ami_host, ami_port)
    await reader.readline()  # Read the AMI banner line
    # Login
    writer.write(
        f"Action: Login\r\n"
        f"Username: {ami_user}\r\n"
        f"Secret: {ami_pass}\r\n\r\n".encode()
    )
    await writer.drain()
    await reader.readuntil(b"\r\n\r\n")  # Login response
    # Redirect (transfer) the channel
    writer.write(
        f"Action: Redirect\r\n"
        f"Channel: {channel}\r\n"
        f"Context: transfer-destinations\r\n"
        f"Exten: {destination}\r\n"
        f"Priority: 1\r\n\r\n".encode()
    )
    await writer.drain()
    await reader.readuntil(b"\r\n\r\n")  # Redirect response
    # Logoff and close
    writer.write(b"Action: Logoff\r\n\r\n")
    await writer.drain()
    writer.close()
    await writer.wait_closed()
Add matching dialplan contexts for transfer destinations:
; /etc/asterisk/extensions_custom.conf
[transfer-destinations]
; Transfer to sales queue
exten => sales,1,NoOp(Transferring to Sales)
same => n,Queue(sales-queue,t,,,60)
same => n,Hangup()
; Transfer to support queue
exten => support,1,NoOp(Transferring to Support)
same => n,Queue(support-queue,t,,,60)
same => n,Hangup()
; Transfer to manager direct line
exten => manager,1,NoOp(Transferring to Manager)
same => n,Dial(SIP/manager-phone,30)
same => n,Hangup()
; Transfer to billing
exten => billing,1,NoOp(Transferring to Billing)
same => n,Queue(billing-queue,t,,,60)
same => n,Hangup()
9. Function Calling and Tools
How Function Calling Works in Realtime API
Function calling (tools) in the Realtime API works much like the standard Chat Completions API, except it happens live during a voice conversation. One schema difference to note: Realtime tool definitions are flattened -- `name` and `parameters` sit at the top level rather than inside a nested `function` object, as the examples below show. The flow is:
1. You define tools in the `session.update` configuration
2. During conversation, the AI decides a tool is needed
3. The API sends a `response.function_call_arguments.done` event with the function name and arguments
4. Your bridge server executes the function (API call, database query, etc.)
5. You send back a `conversation.item.create` event with the function result
6. You send a `response.create` event to prompt the AI to continue speaking
7. The AI incorporates the function result into its next spoken response
The key difference from text-based function calling: the AI keeps talking while you process the function. It says something like "Let me check that for you..." and then seamlessly incorporates the result when it arrives.
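Steps 3 through 6 of that flow can be wired up in a few lines. Here is a hedged Python sketch of the round trip, mirroring what the Node.js `handleFunctionCall` above does; `handle_function_event` and the `execute` callback are illustrative names, and `ws` is assumed to be a websockets-style connection with an async `send()`:

```python
import json

async def handle_function_event(ws, event: dict, execute):
    """Run one function-call round trip against the Realtime API.

    `event` is the parsed response.function_call_arguments.done payload,
    `execute(name, args)` is an async callable holding business logic.
    """
    try:
        args = json.loads(event.get("arguments") or "{}")
    except json.JSONDecodeError:
        args = {}
    result = await execute(event["name"], args)
    # Return the result as a function_call_output item
    await ws.send(json.dumps({
        "type": "conversation.item.create",
        "item": {
            "type": "function_call_output",
            "call_id": event["call_id"],
            "output": json.dumps(result),
        },
    }))
    # Ask the model to speak about the result
    await ws.send(json.dumps({"type": "response.create"}))
```

The same `call_id` from the incoming event must be echoed back, or the model cannot associate the output with its pending call.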
Defining Tools
Tools are defined in the session configuration. Each tool has a name, description, and JSON Schema parameters:
tools = [
{
"type": "function",
"name": "check_availability",
"description": (
"Check available appointment slots for a given date and "
"service type. Returns available time slots."
),
"parameters": {
"type": "object",
"properties": {
"date": {
"type": "string",
"description": "Date to check (YYYY-MM-DD)",
},
"service_type": {
"type": "string",
"enum": [
"plumbing",
"electrical",
"heating",
"drainage",
"locksmith",
],
"description": "Type of service",
},
"postcode": {
"type": "string",
"description": "Customer postcode for area coverage check",
},
},
"required": ["date", "service_type"],
},
},
{
"type": "function",
"name": "create_lead",
"description": (
"Create a new lead/enquiry in the CRM system with the "
"caller's details and their requirements."
),
"parameters": {
"type": "object",
"properties": {
"caller_name": {
"type": "string",
"description": "Full name of the caller",
},
"phone_number": {
"type": "string",
"description": "Callback phone number",
},
"email": {
"type": "string",
"description": "Email address (optional)",
},
"service_needed": {
"type": "string",
"description": "Description of the service required",
},
"urgency": {
"type": "string",
"enum": ["emergency", "urgent", "standard", "flexible"],
"description": "How urgent is the request",
},
"address": {
"type": "string",
"description": "Service address including postcode",
},
"notes": {
"type": "string",
"description": "Any additional notes from the call",
},
},
"required": [
"caller_name",
"phone_number",
"service_needed",
"urgency",
"address",
],
},
},
{
"type": "function",
"name": "get_caller_history",
"description": (
"Retrieve previous call history and service records for "
"a phone number. Helps identify returning customers."
),
"parameters": {
"type": "object",
"properties": {
"phone_number": {
"type": "string",
"description": "Phone number to look up",
},
},
"required": ["phone_number"],
},
},
{
"type": "function",
"name": "send_sms",
"description": (
"Send an SMS confirmation to the caller with booking "
"details or a reference number."
),
"parameters": {
"type": "object",
"properties": {
"phone_number": {
"type": "string",
"description": "Phone number to send SMS to",
},
"message": {
"type": "string",
"description": "SMS message content (max 160 characters)",
},
},
"required": ["phone_number", "message"],
},
},
]
Implementing Real Function Handlers
Replace the placeholder implementations with real API calls. Here is a practical example using aiohttp for HTTP backend calls:
import aiohttp
class FunctionHandler:
"""Handles function calls from the AI agent."""
def __init__(self, config):
self.config = config
self.backend_url = os.getenv("BACKEND_API_URL", "http://localhost:8080")
self.backend_key = os.getenv("BACKEND_API_KEY", "")
self._session = None
async def _get_session(self):
"""Get or create aiohttp session."""
if self._session is None or self._session.closed:
self._session = aiohttp.ClientSession(
headers={
"Authorization": f"Bearer {self.backend_key}",
"Content-Type": "application/json",
},
timeout=aiohttp.ClientTimeout(total=5),
)
return self._session
async def execute(self, name: str, args: dict) -> dict:
"""Execute a function and return the result."""
handler = getattr(self, f"_fn_{name}", None)
if handler is None:
return {"status": "error", "message": f"Unknown function: {name}"}
try:
return await handler(args)
except aiohttp.ClientError as e:
logger.error(f"Backend API error in {name}: {e}")
return {
"status": "error",
"message": "I'm having trouble accessing our system right now.",
}
except Exception as e:
logger.error(f"Function {name} failed: {e}")
return {
"status": "error",
"message": "Something went wrong. Please try again.",
}
async def _fn_check_availability(self, args: dict) -> dict:
"""Check appointment availability via backend API."""
session = await self._get_session()
async with session.get(
f"{self.backend_url}/api/availability",
params={
"date": args.get("date"),
"service": args.get("service_type"),
"postcode": args.get("postcode", ""),
},
) as resp:
if resp.status == 200:
data = await resp.json()
return {
"status": "success",
"available_slots": data.get("slots", []),
"message": "Slots retrieved successfully",
}
return {
"status": "unavailable",
"message": "No slots available for this date",
}
async def _fn_create_lead(self, args: dict) -> dict:
"""Create a lead in the CRM."""
session = await self._get_session()
async with session.post(
f"{self.backend_url}/api/leads",
json=args,
) as resp:
if resp.status in (200, 201):
data = await resp.json()
return {
"status": "created",
"lead_id": data.get("id"),
"reference": data.get("reference"),
"message": "Lead created successfully",
}
return {
"status": "error",
"message": "Failed to create lead",
}
async def _fn_get_caller_history(self, args: dict) -> dict:
"""Look up caller history."""
session = await self._get_session()
phone = args.get("phone_number", "")
async with session.get(
f"{self.backend_url}/api/customers/search",
params={"phone": phone},
) as resp:
if resp.status == 200:
data = await resp.json()
if data.get("found"):
return {
"status": "found",
"customer_name": data.get("name"),
"previous_jobs": data.get("jobs_count", 0),
"last_service_date": data.get("last_service"),
"notes": data.get("notes", ""),
}
return {
"status": "not_found",
"message": "No previous records for this number",
}
async def _fn_send_sms(self, args: dict) -> dict:
"""Send SMS via backend API."""
session = await self._get_session()
async with session.post(
f"{self.backend_url}/api/sms/send",
json={
"to": args.get("phone_number"),
"body": args.get("message"),
},
) as resp:
if resp.status == 200:
return {
"status": "sent",
"message": "SMS sent successfully",
}
return {
"status": "failed",
"message": "Failed to send SMS",
}
async def close(self):
"""Close the HTTP session."""
if self._session and not self._session.closed:
await self._session.close()
Function Call Timing Considerations
Function calls during voice conversations have timing constraints that do not exist in text-based chat:
- Fast functions (< 500ms): Customer lookup, availability check. The AI can say "Let me check..." and the result arrives before the pause feels awkward.
- Medium functions (500ms - 2s): Booking creation, CRM updates. The AI should fill the time naturally: "I'm just creating that booking for you now..."
- Slow functions (> 2s): External API calls, complex queries. Consider returning a partial result immediately and following up, or restructuring the flow so the AI can continue talking while waiting.
# Example: Handling slow functions with a timeout
async def _fn_check_availability_with_timeout(self, args: dict) -> dict:
try:
return await asyncio.wait_for(
self._fn_check_availability(args),
timeout=3.0,
)
except asyncio.TimeoutError:
return {
"status": "pending",
"message": (
"The system is taking a moment. Please tell the caller "
"we will text them available slots within 5 minutes."
),
}
10. Production Deployment
systemd Service File
# /etc/systemd/system/ai-voice-bridge.service
[Unit]
Description=AI Voice Bridge - Asterisk AudioSocket to OpenAI Realtime
After=network.target asterisk.service
Wants=network-online.target
[Service]
Type=simple
User=asterisk
Group=asterisk
WorkingDirectory=/opt/ai-voice-bridge
Environment=PATH=/opt/ai-voice-bridge/venv/bin:/usr/bin:/bin
EnvironmentFile=/opt/ai-voice-bridge/.env
ExecStart=/opt/ai-voice-bridge/venv/bin/python bridge.py
Restart=always
RestartSec=5
StartLimitBurst=5
StartLimitIntervalSec=60
# Security hardening
NoNewPrivileges=yes
ProtectSystem=strict
ProtectHome=yes
ReadWritePaths=/opt/ai-voice-bridge/logs
PrivateTmp=yes
# Resource limits
LimitNOFILE=65535
MemoryMax=1G
# Logging
StandardOutput=journal
StandardError=journal
SyslogIdentifier=ai-voice-bridge
[Install]
WantedBy=multi-user.target
# Enable and start
systemctl daemon-reload
systemctl enable ai-voice-bridge
systemctl start ai-voice-bridge
# Check status
systemctl status ai-voice-bridge
# View logs
journalctl -u ai-voice-bridge -f
For Node.js deployment:
# /etc/systemd/system/ai-voice-bridge.service
[Unit]
Description=AI Voice Bridge (Node.js)
After=network.target asterisk.service
Wants=network-online.target
[Service]
Type=simple
User=asterisk
Group=asterisk
WorkingDirectory=/opt/ai-voice-bridge-node
EnvironmentFile=/opt/ai-voice-bridge-node/.env
ExecStart=/usr/bin/node bridge.js
Restart=always
RestartSec=5
# Resource limits
LimitNOFILE=65535
MemoryMax=1G
StandardOutput=journal
StandardError=journal
SyslogIdentifier=ai-voice-bridge
[Install]
WantedBy=multi-user.target
Environment Variable Management
Never hardcode API keys. Use environment files with restricted permissions:
# Create the env file
cat > /opt/ai-voice-bridge/.env << 'EOF'
OPENAI_API_KEY=sk-your-key-here
OPENAI_MODEL=gpt-4o-realtime-preview
OPENAI_VOICE=alloy
LISTEN_PORT=9093
HEALTH_PORT=9094
SYSTEM_PROMPT=You are Sarah, a receptionist for Acme Plumbing...
VAD_THRESHOLD=0.5
VAD_SILENCE_MS=500
TEMPERATURE=0.7
LOG_LEVEL=INFO
LOG_TRANSCRIPTS=true
BACKEND_API_URL=http://localhost:8080
BACKEND_API_KEY=your-backend-api-key
AMI_HOST=127.0.0.1
AMI_PORT=5038
AMI_USER=admin
AMI_PASS=your-ami-password
EOF
# Restrict permissions
chmod 600 /opt/ai-voice-bridge/.env
chown asterisk:asterisk /opt/ai-voice-bridge/.env
Reconnection Logic
The bridge server should handle OpenAI WebSocket disconnections gracefully. The Python implementation already handles this per-call (each call creates a new WebSocket connection). For additional resilience, add connection retry logic:
async def connect_openai_with_retry(self, max_retries=3, base_delay=1.0):
"""Connect to OpenAI with exponential backoff retry."""
for attempt in range(max_retries):
try:
await self.connect_openai()
return # Success
except Exception as e:
if attempt == max_retries - 1:
raise # Final attempt failed
delay = base_delay * (2 ** attempt)
logger.warning(
"openai.retry",
call_id=self.call_id,
attempt=attempt + 1,
delay=delay,
error=str(e),
)
await asyncio.sleep(delay)
Concurrent Call Handling
The bridge server handles multiple concurrent calls out of the box -- each AudioSocket connection runs in its own async task with its own OpenAI WebSocket. However, you should be aware of limits:
| Resource | Limit | Impact |
|---|---|---|
| OpenAI Realtime connections | ~100 concurrent (Tier 4) | Hard limit, calls fail |
| Bridge server memory | ~5MB per call | Plan RAM accordingly |
| Bridge server CPU | ~0.1 core per call | Resampling is the bottleneck |
| Network bandwidth | ~128kbps per call direction | Minimal |
For high concurrency (50+ simultaneous calls), consider:
# Add a connection limiter to the bridge server
class BridgeServer:
def __init__(self, config: Config):
# ...
self.max_concurrent_calls = int(
os.getenv("MAX_CONCURRENT_CALLS", "50")
)
self._semaphore = asyncio.Semaphore(self.max_concurrent_calls)
async def _handle_connection(self, reader, writer):
if self._semaphore.locked():  # All permits in use
logger.warning("bridge.at_capacity")
# Play a "please try again" message or queue the call
writer.close()
return
async with self._semaphore:
await self._handle_connection_inner(reader, writer)
Firewall Configuration
Open only the necessary ports:
# AudioSocket (Asterisk -> Bridge, typically same host or LAN only)
iptables -A INPUT -p tcp --dport 9093 -s 127.0.0.1 -j ACCEPT
iptables -A INPUT -p tcp --dport 9093 -s YOUR_ASTERISK_IP -j ACCEPT
# Health endpoint (monitoring)
iptables -A INPUT -p tcp --dport 9094 -s YOUR_MONITORING_IP -j ACCEPT
# Block everything else on these ports
iptables -A INPUT -p tcp --dport 9093 -j DROP
iptables -A INPUT -p tcp --dport 9094 -j DROP
11. Cost Management
OpenAI Realtime API Pricing
The Realtime API bills based on audio tokens. At gpt-4o-realtime-preview's launch rates, audio works out to roughly 10 input tokens and 20 output tokens per second -- about $0.06 per minute of input and $0.24 per minute of output. Pricing has changed since launch (and cached-context discounts apply), so verify against OpenAI's current pricing page before budgeting.
| Component | Rate | Notes |
|---|---|---|
| Audio Input | $100 / 1M tokens (~$0.06 / min) | Caller's speech |
| Audio Output | $200 / 1M tokens (~$0.24 / min) | AI's speech |
| Text Input (tools) | $0.005 / 1K tokens | System prompt, tool definitions |
| Text Output (tools) | $0.020 / 1K tokens | Function call arguments |
Cost Per Call Estimates
| Call Length | Audio In (tokens) | Audio Out (tokens) | Text (tokens) | Estimated Cost |
|---|---|---|---|---|
| 1 minute | ~300 | ~600 | ~500 | ~$0.15 |
| 3 minutes | ~900 | ~1,800 | ~1,000 | ~$0.46 |
| 5 minutes | ~1,500 | ~3,000 | ~1,500 | ~$0.76 |
| 10 minutes | ~3,000 | ~6,000 | ~2,500 | ~$1.51 |
Note: These are estimates. Actual costs vary based on how much the AI speaks versus listens, tool usage frequency, and system prompt length.
Monthly Cost Projections
| Daily Calls | Avg Duration | Monthly Cost |
|---|---|---|
| 10 calls | 3 min | ~$135 |
| 50 calls | 3 min | ~$675 |
| 100 calls | 3 min | ~$1,350 |
| 500 calls | 3 min | ~$6,750 |
| 1000 calls | 2 min | ~$9,000 |
Cost Optimization Strategies
1. Minimize system prompt length
Every token in your system prompt is billed as text input for every response. A 500-token prompt with 20 AI turns per call = 10,000 text input tokens per call.
# BAD: 800+ tokens
You are Sarah, a professional and highly experienced receptionist who has been
working at Acme Plumbing Services for over 10 years. You have deep knowledge
of all plumbing services including emergency callouts, boiler installations,
bathroom refurbishments, kitchen plumbing, drain unblocking, and central
heating repairs. You always maintain a warm and friendly tone while being
efficient and professional. [... continues for paragraphs ...]
# GOOD: 200 tokens
You are Sarah, receptionist at Acme Plumbing. Be warm, professional, concise.
Collect: name, phone, address, issue. Book via book_appointment function.
Never give prices. Never diagnose. Keep calls under 3 minutes.
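That multiplication is worth keeping in a helper so you can compare prompt drafts before deploying them. A small sketch -- the `prompt_overhead` function is illustrative, and it uses the simple re-billing model described above, ignoring any context caching:

```python
def prompt_overhead(prompt_tokens: int, turns_per_call: int,
                    rate_per_1k: float = 0.005) -> tuple:
    """Text-input tokens (and dollars) spent on the system prompt per call.

    Simple re-billing model: the prompt sits in the context of every AI
    turn, so multiply rather than counting it once per call.
    """
    tokens = prompt_tokens * turns_per_call
    return tokens, round(tokens / 1_000 * rate_per_1k, 4)

# Comparing the BAD and GOOD prompts above over a 20-turn call
bad_tokens, bad_cost = prompt_overhead(800, 20)
good_tokens, good_cost = prompt_overhead(200, 20)
```

Multiply the per-call difference by your daily call volume to see what trimming the prompt is actually worth.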
2. Use concise tool descriptions
Tool descriptions are sent as text tokens. Keep them short and actionable.
3. Shorten conversations
Design your prompt to guide callers through the flow efficiently. Every extra minute of conversation costs money.
4. Monitor and set alerts
# Add to .env
COST_ALERT_DAILY_THRESHOLD=50.00 # Alert if daily cost exceeds $50
# Add to bridge.py - track estimated costs
class CallSession:
    def estimate_cost(self) -> float:
        """Rough cost estimate using per-minute audio rates."""
        minutes = self.get_duration() / 60
        audio_in_minutes = minutes * 0.5    # ~50% of the call is caller speech
        audio_out_minutes = minutes * 0.5   # ~50% is AI speech
        text_tokens = 1000  # Approximate for system prompt + tools
        cost = (
            audio_in_minutes * 0.06          # Audio input at ~$0.06/min
            + audio_out_minutes * 0.24       # Audio output at ~$0.24/min
            + (text_tokens / 1_000) * 0.005  # Text input at $0.005/1K
        )
        return round(cost, 4)
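The alert threshold itself does nothing unless something checks it. A minimal sketch of a daily accumulator that consumes the per-call estimates -- the `DailyCostGuard` class is illustrative; wire its `True` return to your real alerting channel (email, Slack, PagerDuty):

```python
from datetime import date

class DailyCostGuard:
    """Accumulate per-call cost estimates against a daily budget.

    `threshold` mirrors the COST_ALERT_DAILY_THRESHOLD setting above;
    the `today` parameter exists so tests can inject a fake clock.
    """
    def __init__(self, threshold: float, today=date.today):
        self.threshold = threshold
        self._today = today
        self._day = today()
        self.total = 0.0
        self.alerted = False

    def add(self, call_cost: float) -> bool:
        """Record one call's cost; return True the moment the alert fires."""
        if self._today() != self._day:  # New day: reset counter and alert state
            self._day = self._today()
            self.total = 0.0
            self.alerted = False
        self.total += call_cost
        if self.total >= self.threshold and not self.alerted:
            self.alerted = True
            return True
        return False
```

Firing only once per day (via the `alerted` flag) keeps a busy afternoon from flooding your inbox.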
5. Use response length limits
Set max_response_output_tokens to prevent the AI from giving long-winded responses:
{
"max_response_output_tokens": 2048
}
For a phone receptionist, even 1024 tokens per response is more than enough. The AI should speak in short, conversational sentences.
6. Implement call duration limits
Add a maximum call duration to prevent unusually long calls from running up costs:
# Add to handle_caller_audio
MAX_CALL_DURATION = 600 # 10 minutes
async def handle_caller_audio(self, audio_socket):
try:
async for pcm_chunk in audio_socket.read_audio():
if not self.is_active:
break
if self.get_duration() > MAX_CALL_DURATION:
logger.warning(
"call.duration_limit",
call_id=self.call_id,
duration=self.get_duration(),
)
# Tell the AI to wrap up
await self.ws.send(json.dumps({
"type": "conversation.item.create",
"item": {
"type": "message",
"role": "system",
"content": [{
"type": "input_text",
"text": "The call has been going on for 10 minutes. "
"Please wrap up the conversation politely."
}]
}
}))
await self.ws.send(json.dumps({"type": "response.create"}))
break
# ... rest of audio handling
except Exception as e:
    logger.error("caller_audio.error", call_id=self.call_id, error=str(e))
12. Monitoring and Analytics
Prometheus Metrics
Add comprehensive metrics to the bridge server for monitoring with Prometheus and Grafana:
"""
metrics.py - Prometheus metrics for the AI Voice Bridge
"""
from prometheus_client import (
Counter,
Gauge,
Histogram,
Info,
generate_latest,
)
# Service info
bridge_info = Info("ai_voice_bridge", "AI Voice Bridge service info")
bridge_info.info({
"version": "1.0.0",
"runtime": "python",
})
# Call metrics
calls_total = Counter(
"ai_voice_bridge_calls_total",
"Total number of calls handled",
["status"], # completed, error, timeout
)
active_calls = Gauge(
"ai_voice_bridge_active_calls",
"Number of currently active calls",
)
call_duration = Histogram(
"ai_voice_bridge_call_duration_seconds",
"Call duration in seconds",
buckets=[10, 30, 60, 120, 180, 300, 600],
)
# Audio metrics
audio_bytes_received = Counter(
"ai_voice_bridge_audio_bytes_received_total",
"Total audio bytes received from callers",
)
audio_bytes_sent = Counter(
"ai_voice_bridge_audio_bytes_sent_total",
"Total audio bytes sent to callers",
)
# OpenAI metrics
openai_connections = Counter(
"ai_voice_bridge_openai_connections_total",
"Total OpenAI WebSocket connections",
["status"], # connected, failed
)
openai_responses = Counter(
"ai_voice_bridge_openai_responses_total",
"Total responses generated by OpenAI",
)
function_calls_total = Counter(
"ai_voice_bridge_function_calls_total",
"Total function calls made",
["function_name", "status"], # success, error
)
# Latency metrics
response_latency = Histogram(
"ai_voice_bridge_response_latency_seconds",
"Time from caller speech end to first AI audio byte",
buckets=[0.1, 0.2, 0.3, 0.5, 0.7, 1.0, 2.0],
)
# Error metrics
errors_total = Counter(
"ai_voice_bridge_errors_total",
"Total errors encountered",
["type"], # openai_error, audiosocket_error, function_error
)
# Cost tracking
estimated_cost = Counter(
"ai_voice_bridge_estimated_cost_dollars",
"Estimated cumulative API cost in dollars",
)
Integrating Metrics into the Bridge
# Add to bridge.py - update the health/metrics handler
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST
async def _metrics_handler(self, request: web.Request) -> web.Response:
"""Prometheus-compatible metrics endpoint."""
return web.Response(
body=generate_latest(),
content_type=CONTENT_TYPE_LATEST,
)
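To keep those metrics accurate across every exit path (normal hangup, error, timeout), wrap each call's lifetime in a context manager. A sketch -- `track_call` is an illustrative helper, and `metrics` is any namespace exposing the same `inc()`/`dec()`/`observe()`/`labels()` methods as the prometheus_client objects defined in metrics.py, which also makes it easy to test with fakes:

```python
import time
from contextlib import contextmanager

@contextmanager
def track_call(metrics):
    """Record call start/end, duration, and final status in one place.

    Using try/finally guarantees the active-calls gauge is decremented
    even when the call handler raises.
    """
    metrics.active_calls.inc()
    started = time.monotonic()
    status = "completed"
    try:
        yield
    except Exception:
        status = "error"
        raise
    finally:
        metrics.active_calls.dec()
        metrics.call_duration.observe(time.monotonic() - started)
        metrics.calls_total.labels(status=status).inc()
```

Usage in the bridge would look like `with track_call(metrics): await session.run()`.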
Prometheus Scrape Configuration
# Add to /etc/prometheus/prometheus.yml
scrape_configs:
- job_name: "ai-voice-bridge"
scrape_interval: 15s
static_configs:
- targets: ["YOUR_SERVER_IP:9094"]
labels:
service: "ai-voice-bridge"
environment: "production"
Grafana Dashboard
Create a Grafana dashboard for monitoring the voice agent. Here is a dashboard JSON you can import:
{
"dashboard": {
"title": "AI Voice Agent - OpenAI Realtime Bridge",
"panels": [
{
"title": "Active Calls",
"type": "stat",
"targets": [
{
"expr": "ai_voice_bridge_active_calls",
"legendFormat": "Active Calls"
}
],
"fieldConfig": {
"defaults": {
"thresholds": {
"steps": [
{ "color": "green", "value": 0 },
{ "color": "yellow", "value": 10 },
{ "color": "red", "value": 25 }
]
}
}
}
},
{
"title": "Calls Per Hour",
"type": "timeseries",
"targets": [
{
"expr": "rate(ai_voice_bridge_calls_total[1h]) * 3600",
"legendFormat": "{{ status }}"
}
]
},
{
"title": "Average Call Duration",
"type": "stat",
"targets": [
{
"expr": "rate(ai_voice_bridge_call_duration_seconds_sum[1h]) / rate(ai_voice_bridge_call_duration_seconds_count[1h])",
"legendFormat": "Avg Duration (s)"
}
]
},
{
"title": "Response Latency (p50/p95/p99)",
"type": "timeseries",
"targets": [
{
"expr": "histogram_quantile(0.50, rate(ai_voice_bridge_response_latency_seconds_bucket[5m]))",
"legendFormat": "p50"
},
{
"expr": "histogram_quantile(0.95, rate(ai_voice_bridge_response_latency_seconds_bucket[5m]))",
"legendFormat": "p95"
},
{
"expr": "histogram_quantile(0.99, rate(ai_voice_bridge_response_latency_seconds_bucket[5m]))",
"legendFormat": "p99"
}
]
},
{
"title": "Function Calls",
"type": "timeseries",
"targets": [
{
"expr": "rate(ai_voice_bridge_function_calls_total[5m]) * 60",
"legendFormat": "{{ function_name }} ({{ status }})"
}
]
},
{
"title": "Estimated Daily Cost ($)",
"type": "stat",
"targets": [
{
"expr": "increase(ai_voice_bridge_estimated_cost_dollars[24h])",
"legendFormat": "Daily Cost"
}
],
"fieldConfig": {
"defaults": {
"unit": "currencyUSD",
"thresholds": {
"steps": [
{ "color": "green", "value": 0 },
{ "color": "yellow", "value": 25 },
{ "color": "red", "value": 50 }
]
}
}
}
},
{
"title": "Errors",
"type": "timeseries",
"targets": [
{
"expr": "rate(ai_voice_bridge_errors_total[5m]) * 60",
"legendFormat": "{{ type }}"
}
]
}
]
}
}
Call Logging and Analytics
For detailed call analytics, log every call to a database or file. Here is a simple file-based approach:
import json
from pathlib import Path
from datetime import datetime, timezone

# Uses the module-level logger defined earlier in bridge.py

class CallLogger:
    """Log call details and transcripts to JSON files."""

    def __init__(self, log_dir: str = "/var/log/ai-voice-bridge/calls"):
        self.log_dir = Path(log_dir)
        self.log_dir.mkdir(parents=True, exist_ok=True)

    def log_call(self, session: CallSession):
        """Write call details to a JSON file."""
        call_data = {
            "call_id": session.call_id,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "duration_seconds": round(session.get_duration(), 1),
            "audio_bytes_in": session.audio_bytes_in,
            "audio_bytes_out": session.audio_bytes_out,
            "responses": session.response_count,
            "function_calls": session.function_calls,
            "estimated_cost": session.estimate_cost(),
            "transcript": session.transcript,
        }
        filename = (
            f"{datetime.now(timezone.utc).strftime('%Y%m%d')}/"
            f"{session.call_id}.json"
        )
        filepath = self.log_dir / filename
        filepath.parent.mkdir(parents=True, exist_ok=True)
        with open(filepath, "w") as f:
            json.dump(call_data, f, indent=2)
        logger.info(
            "call.logged",
            call_id=session.call_id,
            path=str(filepath),
        )
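With calls logged one JSON file per call under a `YYYYMMDD/` directory, daily analytics reduces to a small aggregation pass. A minimal sketch (the `summarize_day` helper and its field names are illustrative, matching the log layout above):

```python
import json
from pathlib import Path

def summarize_day(log_dir: str, day: str) -> dict:
    """Aggregate the per-call JSON logs for one day.

    `day` is a YYYYMMDD string matching the directory layout
    used by CallLogger above.
    """
    total = {"calls": 0, "duration_seconds": 0.0, "estimated_cost": 0.0}
    for path in Path(log_dir, day).glob("*.json"):
        data = json.loads(path.read_text())
        total["calls"] += 1
        total["duration_seconds"] += data.get("duration_seconds", 0.0)
        total["estimated_cost"] += data.get("estimated_cost", 0.0)
    return total
```

Run it from a nightly cron job and ship the totals wherever your reporting lives; the per-call files remain the source of truth for transcripts.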
13. Troubleshooting
Audio Format Issues
Problem: No audio from AI / silence on the line
# Check 1: Verify AudioSocket is connecting
journalctl -u ai-voice-bridge -f
# Look for: "audiosocket.uuid_received" and "openai.connected"
# Check 2: Verify Asterisk audio format
asterisk -rx "core show channel SIP/trunk-00000001" | grep -i format
# Should show: slin16
# Check 3: Verify the bridge receives audio
# Add debug logging temporarily:
# LOG_LEVEL=DEBUG in .env
# Check 4: Test audio resampling independently
python3 -c "
from resampler import AudioResampler
# Generate 1 second of silence at 16kHz
silence_16k = b'\x00' * 32000
result = AudioResampler.resample_16k_to_24k(silence_16k)
print(f'Input: {len(silence_16k)} bytes, Output: {len(result)} bytes')
# Expected: Input: 32000 bytes, Output: 48000 bytes
"
Problem: Audio sounds distorted or robotic
This is usually caused by incorrect sample rate handling:
# Verify Asterisk is sending 16kHz, not 8kHz
# In your dialplan, ensure these lines are present:
# Set(CHANNEL(audioreadformat)=slin16)
# Set(CHANNEL(audiowriteformat)=slin16)
# If you cannot use slin16 (older Asterisk), modify the resampler:
# Change resample_16k_to_24k to resample_8k_to_24k (ratio 3:1)
# Change resample_24k_to_16k to resample_24k_to_8k (ratio 1:3)
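If you do need the 8 kHz path, the 3:1 and 1:3 ratios are straightforward to implement. A minimal numpy sketch (these helpers are illustrative, not the tutorial's `AudioResampler`; note the naive 3:1 decimation skips the anti-aliasing low-pass filter a production resampler would apply):

```python
import numpy as np

def resample_8k_to_24k(pcm: bytes) -> bytes:
    """Upsample 16-bit mono PCM from 8 kHz to 24 kHz (1:3) via linear interpolation."""
    samples = np.frombuffer(pcm, dtype=np.int16).astype(np.float64)
    n = len(samples)
    # Three output samples per input sample, interpolated between neighbors
    x_new = np.linspace(0, n - 1, n * 3)
    out = np.interp(x_new, np.arange(n), samples)
    return out.astype(np.int16).tobytes()

def resample_24k_to_8k(pcm: bytes) -> bytes:
    """Downsample 24 kHz to 8 kHz (3:1) by keeping every third sample (no filtering)."""
    samples = np.frombuffer(pcm, dtype=np.int16)
    return samples[::3].tobytes()
```

One second of 8 kHz audio (16,000 bytes) should come back as 48,000 bytes at 24 kHz, mirroring the 16 kHz sanity check above.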
Problem: Audio has clicking or popping sounds
Usually caused by audio frame boundaries not aligning:
# Ensure you send complete frames to AudioSocket
# Each frame should be exactly 640 bytes (20ms at 16kHz 16-bit)
# The write_audio method in audiosocket.py handles chunking
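If you are writing frames yourself rather than going through `write_audio`, a small accumulator guarantees the 640-byte alignment regardless of how OpenAI chunks its audio deltas. An illustrative sketch:

```python
class FrameChunker:
    """Buffer arbitrary PCM chunks and emit exact 640-byte AudioSocket
    frames (20 ms at 16 kHz, 16-bit mono). Leftover bytes stay buffered
    until the next push."""

    FRAME_BYTES = 640

    def __init__(self):
        self._buf = bytearray()

    def push(self, pcm: bytes) -> list[bytes]:
        """Add a chunk; return all complete frames now available."""
        self._buf.extend(pcm)
        frames = []
        while len(self._buf) >= self.FRAME_BYTES:
            frames.append(bytes(self._buf[: self.FRAME_BYTES]))
            del self._buf[: self.FRAME_BYTES]
        return frames
```

Feed every outbound chunk through `push()` and send only the returned frames; partial tails are never written to the socket, which is what eliminates the boundary clicks.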
WebSocket Disconnections
Problem: OpenAI WebSocket closes unexpectedly
# Check the close code and reason:
# 1000 = Normal closure
# 1001 = Going away (server shutdown)
# 1006 = Abnormal closure (network issue)
# 1008 = Policy violation (usually auth issue)
# 1011 = Server error
# 4001 = Authentication failed
# 4003 = Rate limited
# For auth issues:
curl -s "https://api.openai.com/v1/models" \
-H "Authorization: Bearer YOUR_API_KEY" | python3 -m json.tool
# If 401: API key is invalid or expired
# For rate limiting:
# Check your OpenAI dashboard for current usage and limits
# Implement backoff in the bridge server (see reconnection logic)
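A sketch of the backoff schedule such reconnection logic typically uses (the function name and parameters are illustrative, not part of the bridge's API):

```python
import random

def backoff_delays(max_attempts: int = 5, base: float = 1.0, cap: float = 30.0):
    """Yield exponentially growing reconnect delays with jitter.

    Delay doubles each attempt, is capped at `cap` seconds, and is
    multiplied by a random factor in [0.5, 1.0) so that many bridges
    reconnecting at once do not hammer the API in lockstep.
    """
    for attempt in range(max_attempts):
        delay = min(cap, base * (2 ** attempt))
        yield delay * (0.5 + random.random() / 2)
```

In the bridge, wrap the WebSocket connect in a loop: on failure, `await asyncio.sleep(delay)` for each yielded delay, and give up (or queue the call) once the generator is exhausted.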
Problem: WebSocket connection hangs on establishment
# Test connectivity to OpenAI from your server
curl -v -N \
-H "Authorization: Bearer YOUR_API_KEY" \
-H "OpenAI-Beta: realtime=v1" \
-H "Upgrade: websocket" \
-H "Connection: Upgrade" \
"https://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview"
# Check DNS resolution
dig api.openai.com
# Check if a firewall is blocking WebSocket upgrades
# Some corporate firewalls strip Upgrade headers
Latency Issues
Problem: AI takes too long to respond (> 1 second)
# Measure component latency:
# 1. AudioSocket -> Bridge: should be < 5ms (local TCP)
# 2. Resampling: should be < 2ms per frame
# 3. Bridge -> OpenAI: depends on network (ping api.openai.com)
# 4. OpenAI processing: typically 200-400ms
# 5. OpenAI -> Bridge -> Asterisk: same as #3 + #1
# Test network latency
ping api.openai.com
# If > 100ms, consider a server closer to OpenAI's infrastructure (US)
# Check if the issue is VAD-related
# If VAD silence_duration_ms is too high, the AI waits too long
# after the caller stops speaking. Try reducing to 400ms.
# Check CPU usage (resampling is CPU-intensive)
top -p $(pgrep -f bridge.py)
# If CPU > 80% per call, the server is underpowered
Problem: Echo or feedback loop
# This happens when AI audio feeds back into the input
# AudioSocket is full-duplex, so echo cancellation is needed
# Solution 1: Enable Asterisk's built-in echo cancellation
# In sip.conf or pjsip.conf:
# echocancel=yes
# echocancelwhenbridged=yes
# Solution 2: Add silence detection in the bridge
# When the AI is speaking, attenuate or mute the input audio
# This is a simple approach that prevents echo:
# Add to handle_caller_audio in bridge.py (requires: import numpy as np)
async def handle_caller_audio(self, audio_socket):
    async for pcm_chunk in audio_socket.read_audio():
        if not self.is_active:
            break
        # Simple echo prevention: reduce input volume while AI speaks
        if self.ai_speaking:
            # Convert to numpy, attenuate by 90%, convert back
            samples = np.frombuffer(pcm_chunk, dtype=np.int16)
            samples = (samples * 0.1).astype(np.int16)
            pcm_chunk = samples.tobytes()
        # ... rest of processing
Rate Limits
OpenAI enforces rate limits on the Realtime API:
| Tier | Max Concurrent | Tokens/Min | Requests/Min |
|---|---|---|---|
| Tier 1 | 10 | 200,000 | 100 |
| Tier 2 | 25 | 500,000 | 200 |
| Tier 3 | 50 | 1,000,000 | 500 |
| Tier 4 | 100 | 2,000,000 | 1,000 |
When you hit a rate limit:
- The API sends a `rate_limits.updated` event showing remaining capacity
- If you exceed the limit, the WebSocket connection may be closed with code 4003
- Implement exponential backoff and a call queue for overflow
# Add rate limit tracking
class RateLimitTracker:
    def __init__(self):
        self.limits = {}

    def update(self, rate_limits: list):
        for limit in rate_limits:
            self.limits[limit["name"]] = {
                "limit": limit.get("limit"),
                "remaining": limit.get("remaining"),
                "reset_seconds": limit.get("reset_seconds"),
            }

    def can_accept_call(self) -> bool:
        """Check if we have capacity for another call."""
        for info in self.limits.values():
            if info.get("remaining", 999) < 5:
                return False
        return True
Common Errors and Solutions
| Error | Cause | Solution |
|---|---|---|
| module res_audiosocket not found | Module not compiled | Rebuild Asterisk with AudioSocket support |
| Connection refused on port 9093 | Bridge not running | Check systemd status, port conflicts |
| WebSocket 401 Unauthorized | Invalid API key | Verify OPENAI_API_KEY in .env |
| WebSocket 429 Too Many Requests | Rate limited | Reduce concurrent calls, upgrade tier |
| Audio sounds choppy | Network jitter or CPU overload | Use a closer server, optimize resampling |
| AI does not respond | VAD threshold too high | Lower VAD_THRESHOLD to 0.3-0.4 |
| AI interrupts caller | VAD silence too short | Increase VAD_SILENCE_MS to 700+ |
| No audio to caller | Wrong audio format | Verify slin16 in dialplan |
| Call drops after 30s | WebSocket idle timeout | Ensure audio is being sent continuously |
| Function calls fail | Backend API unreachable | Check BACKEND_API_URL, test connectivity |
14. Advanced Topics
Multi-Language Support
The Realtime API supports multiple languages. You can build a multi-language agent by detecting the caller's language and adjusting the session:
# Option 1: Language-specific DIDs
# Route different DIDs to different system prompts
LANGUAGE_PROMPTS = {
    "en": "You are Sarah, a receptionist for Acme Services. Speak English.",
    "es": "Eres María, recepcionista de Acme Services. Habla en español.",
    "fr": "Vous êtes Sophie, réceptionniste chez Acme Services. Parlez en français.",
    "it": "Sei Sara, receptionist di Acme Services. Parla in italiano.",
    "de": "Du bist Lisa, Rezeptionistin bei Acme Services. Sprich auf Deutsch.",
}
# In the dialplan, pass the language as a variable:
# Set(LANGUAGE=es)
# AudioSocket(${CHANNEL(uniqueid)},127.0.0.1:9093)
# In the bridge, read the language from the DID or channel variable
# and select the appropriate system prompt
# Option 2: Auto-detect language from the first few seconds
# Let the AI handle it with a multi-language system prompt
MULTILINGUAL_PROMPT = """
You are a multilingual receptionist for Acme Services.
LANGUAGE DETECTION:
- Listen to the caller's first words
- Respond in the SAME language they use
- If they switch languages mid-call, switch with them
- Supported languages: English, Spanish, French, Italian, German
Greet initially in English. If the caller responds in another language,
immediately switch and continue in their language for the rest of the call.
[... rest of the prompt in English - the AI will translate on the fly ...]
"""
Warm Transfer
A warm transfer is when the AI hands off to a human agent while providing context about the call. This requires coordination between the bridge server and Asterisk:
async def warm_transfer(self, department: str, context: str):
    """
    Execute a warm transfer:
    1. AI tells the caller to hold
    2. Bridge creates a conference
    3. Bridge calls the human agent
    4. Bridge whispers the context to the human agent
    5. Bridge bridges the caller into the conference
    """
    # Step 1: Tell the AI to inform the caller
    await self.ws.send(json.dumps({
        "type": "conversation.item.create",
        "item": {
            "type": "message",
            "role": "system",
            "content": [{
                "type": "input_text",
                "text": (
                    "Tell the caller you are transferring them now and "
                    "they will be connected shortly. Then stop speaking."
                ),
            }],
        },
    }))
    await self.ws.send(json.dumps({"type": "response.create"}))

    # Step 2: Wait for the AI to finish speaking. A fixed sleep is the
    # simple approach; waiting for the response.done event is more robust.
    await asyncio.sleep(3)

    # Step 3: Send AMI commands for attended transfer
    # This creates a second call to the agent and bridges them
    ami_commands = [
        # Originate a call to the human agent
        {
            "Action": "Originate",
            "Channel": f"SIP/{department}-queue",
            "Context": "warm-transfer",
            "Exten": "s",
            "Priority": "1",
            "CallerID": f"AI Transfer <{self.call_id[:10]}>",
            "Variable": f"TRANSFER_CONTEXT={context}",
            "Async": "yes",
        },
    ]
    # Execute via AMI...
; Dialplan for warm transfer
[warm-transfer]
exten => s,1,NoOp(Warm transfer - context: ${TRANSFER_CONTEXT})
same => n,Answer()
; Play the context to the human agent (whisper)
same => n,Playback(beep)
same => n,SayAlpha(${TRANSFER_CONTEXT:0:20})
same => n,Playback(beep)
; Bridge the agent to the original caller
same => n,Bridge(${TRANSFER_CHANNEL})
same => n,Hangup()
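The `# Execute via AMI...` step above is deliberately elided. For reference, AMI actions are plain `Key: Value` lines separated by CRLF and terminated by a blank line, sent over TCP (default port 5038) after a `Login` action. A minimal serializer sketch (it assumes you open the socket and handle login and response parsing yourself):

```python
def format_ami_action(action: dict) -> bytes:
    """Serialize an AMI action dict into the wire format Asterisk expects:
    'Key: Value' lines joined by CRLF, terminated by a blank line."""
    lines = [f"{key}: {value}" for key, value in action.items()]
    return ("\r\n".join(lines) + "\r\n\r\n").encode("utf-8")
```

Each dict in `ami_commands` can be passed through this and written to the AMI socket; Asterisk replies with a similarly formatted `Response:` block that you should read and check before proceeding.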
Recording Both Sides
To record both the caller and the AI for quality assurance, you have two options:
Option 1: Asterisk-side recording (recommended)
; In the dialplan, before AudioSocket:
exten => s,1,NoOp(AI Voice Agent)
same => n,Answer()
same => n,Wait(0.5)
same => n,Set(CHANNEL(audioreadformat)=slin16)
same => n,Set(CHANNEL(audiowriteformat)=slin16)
same => n,MixMonitor(/var/spool/asterisk/monitor/${UNIQUEID}.wav,b)
same => n,AudioSocket(${CHANNEL(uniqueid)},YOUR_SERVER_IP:9093)
same => n,StopMixMonitor()
same => n,Hangup()
Option 2: Bridge-side recording (for separate tracks)
import wave
from datetime import datetime
from pathlib import Path

class AudioRecorder:
    """Record caller and AI audio to separate WAV files."""

    def __init__(self, call_id: str, output_dir: str = "/var/spool/ai-recordings"):
        self.call_id = call_id
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
        base = self.output_dir / f"{timestamp}_{call_id}"
        # Caller audio (16kHz input from AudioSocket)
        self.caller_file = wave.open(str(base) + "_caller.wav", "wb")
        self.caller_file.setnchannels(1)
        self.caller_file.setsampwidth(2)
        self.caller_file.setframerate(16000)
        # AI audio (16kHz output to AudioSocket)
        self.ai_file = wave.open(str(base) + "_ai.wav", "wb")
        self.ai_file.setnchannels(1)
        self.ai_file.setsampwidth(2)
        self.ai_file.setframerate(16000)

    def write_caller(self, pcm_data: bytes):
        """Write caller audio chunk."""
        self.caller_file.writeframes(pcm_data)

    def write_ai(self, pcm_data: bytes):
        """Write AI audio chunk."""
        self.ai_file.writeframes(pcm_data)

    def close(self):
        """Close both files."""
        self.caller_file.close()
        self.ai_file.close()
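Separate tracks are also easy to combine afterward for QA listening. A sketch that interleaves the two mono recordings into one stereo WAV, caller on the left channel and AI on the right (the helper name is illustrative):

```python
import wave

def merge_to_stereo(caller_path: str, ai_path: str, out_path: str) -> None:
    """Merge two 16 kHz mono WAVs into one stereo WAV for review.

    Caller audio goes to the left channel, AI audio to the right.
    The shorter track is zero-padded so both channels stay aligned.
    """
    with wave.open(caller_path, "rb") as c, wave.open(ai_path, "rb") as a:
        left = c.readframes(c.getnframes())
        right = a.readframes(a.getnframes())
    n = max(len(left), len(right))
    left = left.ljust(n, b"\x00")
    right = right.ljust(n, b"\x00")
    # Interleave 16-bit samples: L0 R0 L1 R1 ...
    out = bytearray()
    for i in range(0, n, 2):
        out += left[i:i + 2] + right[i:i + 2]
    with wave.open(out_path, "wb") as w:
        w.setnchannels(2)
        w.setsampwidth(2)
        w.setframerate(16000)
        w.writeframes(bytes(out))
```

Reviewers can then pan left/right in any audio player to isolate one side of the conversation.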
A/B Testing Prompts
Test different system prompts to optimize conversation quality and conversion rates:
import hashlib

class PromptABTester:
    """A/B test different system prompts."""

    def __init__(self):
        self.variants = {
            "control": {
                "prompt": "You are Sarah, a professional receptionist...",
                "voice": "alloy",
                "vad_silence_ms": 500,
            },
            "friendly": {
                "prompt": "You are Emma, a warm and chatty receptionist...",
                "voice": "coral",
                "vad_silence_ms": 600,
            },
            "efficient": {
                "prompt": "You are Alex, a brisk and efficient receptionist...",
                "voice": "echo",
                "vad_silence_ms": 400,
            },
        }

    def get_variant(self, call_id: str) -> tuple[str, dict]:
        """
        Deterministically assign a variant based on call ID.
        This ensures the same caller always gets the same variant
        within a test period.
        """
        hash_val = int(hashlib.md5(call_id.encode()).hexdigest(), 16)
        variant_names = list(self.variants.keys())
        selected = variant_names[hash_val % len(variant_names)]
        return selected, self.variants[selected]

    def log_result(self, call_id: str, variant: str, metrics: dict):
        """Log A/B test results for analysis."""
        logger.info(
            "ab_test.result",
            call_id=call_id,
            variant=variant,
            duration=metrics.get("duration"),
            function_calls=metrics.get("function_calls"),
            disposition=metrics.get("disposition"),
            estimated_cost=metrics.get("cost"),
        )
To use A/B testing, modify the CallSession._configure_session method to select the variant:
# In bridge.py
ab_tester = PromptABTester()

class CallSession:
    async def _configure_session(self):
        variant_name, variant = ab_tester.get_variant(self.call_id)
        self.variant_name = variant_name
        logger.info(
            "ab_test.assigned",
            call_id=self.call_id,
            variant=variant_name,
        )
        session_config = {
            "type": "session.update",
            "session": {
                "instructions": variant["prompt"],
                "voice": variant["voice"],
                "turn_detection": {
                    "type": "server_vad",
                    "silence_duration_ms": variant["vad_silence_ms"],
                    # ... other VAD settings
                },
                # ... rest of config
            },
        }
        await self.ws.send(json.dumps(session_config))
Dynamic Session Updates
You can update the session configuration mid-call. This is useful for:
- Changing the system prompt after identifying the caller's intent
- Adjusting VAD settings if the call environment changes
- Adding or removing tools based on conversation progress
async def update_session_prompt(self, new_instructions: str):
    """Update the system prompt mid-conversation."""
    await self.ws.send(json.dumps({
        "type": "session.update",
        "session": {
            "instructions": new_instructions,
        },
    }))
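The same mechanism covers tools: `session.update` replaces the fields you send, so the `tools` array you provide becomes the session's full tool set, and omitting an entry effectively removes it. A sketch (the example tool definition and helper name are illustrative):

```python
import json

# Hypothetical tool definition for illustration
BOOKING_TOOL = {
    "type": "function",
    "name": "book_appointment",
    "description": "Book an appointment slot for the caller.",
    "parameters": {
        "type": "object",
        "properties": {"slot": {"type": "string"}},
        "required": ["slot"],
    },
}

def build_tool_update(tools: list[dict]) -> str:
    """Build a session.update payload that sets the session's tool list."""
    return json.dumps({
        "type": "session.update",
        "session": {"tools": tools},
    })
```

In the bridge this would be sent over the existing WebSocket, e.g. `await self.ws.send(build_tool_update([BOOKING_TOOL]))` once the caller's intent makes the booking flow relevant.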
Scaling to Multiple Servers
For high-volume deployments (100+ concurrent calls), deploy multiple bridge servers behind a load balancer:
┌──────────────┐
│ HAProxy / │
Asterisk ───────────────>│ TCP Load │
(AudioSocket) │ Balancer │
└──────┬───────┘
│
┌─────────────┼─────────────┐
│ │ │
┌─────┴─────┐ ┌────┴──────┐ ┌────┴──────┐
│ Bridge #1 │ │ Bridge #2 │ │ Bridge #3 │
│ (20 calls)│ │ (20 calls)│ │ (20 calls)│
└───────────┘ └───────────┘ └───────────┘
HAProxy configuration for TCP load balancing:
# /etc/haproxy/haproxy.cfg
frontend ai_voice_bridge
    bind *:9093
    mode tcp
    default_backend bridge_servers

backend bridge_servers
    mode tcp
    balance leastconn
    server bridge1 10.0.0.11:9093 check
    server bridge2 10.0.0.12:9093 check
    server bridge3 10.0.0.13:9093 check
Integrating with Existing IVR
If you have an existing IVR and want to add AI handling for specific menu options:
; Existing IVR with AI option
[ivr-main]
exten => s,1,Answer()
same => n,Playback(welcome-message)
same => n,Background(press-1-for-sales&press-2-for-support&press-3-for-ai)
same => n,WaitExten(5)
exten => 1,1,Goto(sales-queue,s,1)
exten => 2,1,Goto(support-queue,s,1)
; Route to AI agent
exten => 3,1,NoOp(Caller chose AI agent)
same => n,Set(CHANNEL(audioreadformat)=slin16)
same => n,Set(CHANNEL(audiowriteformat)=slin16)
same => n,AudioSocket(${CHANNEL(uniqueid)},127.0.0.1:9093)
same => n,Hangup()
; Timeout / invalid - go to AI agent as fallback
exten => t,1,Goto(3,1)
exten => i,1,Goto(3,1)
Summary
This tutorial covered the complete pipeline for building an AI voice agent using OpenAI's Realtime API with Asterisk AudioSocket:
- Architecture: Phone -> Asterisk -> AudioSocket -> Bridge Server -> OpenAI Realtime API
- AudioSocket: Simple TCP protocol for bidirectional PCM audio streaming
- Bridge Server: Complete Python and Node.js implementations with audio resampling, event handling, and function calling
- Conversation Design: Voice selection, VAD tuning, interruption handling, transfer workflows
- Function Calling: Real-time tool execution during voice conversations
- Production Deployment: systemd services, security, concurrent call handling
- Cost Management: Pricing breakdown, optimization strategies, budget alerts
- Monitoring: Prometheus metrics, Grafana dashboards, call logging
- Troubleshooting: Audio format issues, WebSocket problems, latency debugging
- Advanced Topics: Multi-language, warm transfer, recording, A/B testing
The OpenAI Realtime API represents the current best balance of quality, latency, and development effort for voice AI. The native speech-to-speech model eliminates the complexity of managing separate STT and TTS services, while delivering conversation quality that matches or exceeds the best pipeline-based approaches.
Start with the Python bridge server, test with a single DID, and iterate on your system prompt based on real call recordings. The prompt is the product -- everything else is plumbing.
Related Tutorials:
- Tutorial 03: AI Voice Agent -- Local Pipeline (Whisper + LLM + Piper)
- Tutorial 23: AI Voice Agent -- ElevenLabs Conversational AI
- Tutorial 28: Voice Agent Prompt Engineering and Conversation Design
- Tutorial 29: Voice Agent Stack Comparison and Booking Backend
Tutorial #37 in the Production VoIP Tutorial Series Difficulty: Advanced | Reading Time: ~75 minutes Technologies: Python, Node.js, OpenAI Realtime API, WebSocket, Asterisk, AudioSocket, asyncio