In this tutorial, we build a real-time, end-to-end voice agent that mimics how modern low-latency conversational systems operate. We model the entire pipeline, from chunked speech input through streaming recognition and incremental language-model reasoning to text-to-speech, while tracking latency at every stage. We focus on practical engineering tradeoffs by working within strict latency budgets and monitoring metrics such as time to first token and time to first audio. Visit the FULL CODES here.
import time
import asyncio
import numpy as np
from collections import deque
from dataclasses import dataclass
from typing import AsyncIterator, List
from enum import Enum
import matplotlib.pyplot as plt
@dataclass
class LatencyMetrics:
    audio_chunk_received: float = 0.0
    asr_started: float = 0.0
    asr_partial: float = 0.0
    asr_complete: float = 0.0
    llm_started: float = 0.0
    llm_first_token: float = 0.0
    llm_complete: float = 0.0
    tts_started: float = 0.0
    tts_first_chunk: float = 0.0
    tts_complete: float = 0.0

    def get_time_to_first_audio(self) -> float:
        return self.tts_first_chunk - self.asr_complete if self.tts_first_chunk and self.asr_complete else 0.0

    def get_total_latency(self) -> float:
        return self.tts_complete - self.audio_chunk_received if self.tts_complete else 0.0
@dataclass
class LatencyBudgets:
    asr_processing: float = 0.1
    asr_finalization: float = 0.3
    llm_first_token: float = 0.5
    llm_token_generation: float = 0.02
    tts_first_chunk: float = 0.2
    tts_chunk_generation: float = 0.05
    time_to_first_audio: float = 1.0
class AgentState(Enum):
    LISTENING = "listening"
    PROCESSING_SPEECH = "processing_speech"
    THINKING = "thinking"
    SPEAKING = "speaking"
    INTERRUPTED = "interrupted"
Here we define the core data structures, state representation, and measurement methods used to track latency throughout the voice pipeline. We formalize the timing signals for the ASR, LLM, and TTS stages so measurements stay consistent across the pipeline, and we establish a clear agent state machine to guide how the system transitions during a conversational turn. See the FULL CODES here.
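To see how the per-stage timestamps compose into a derived metric, here is a minimal standalone sketch (a stripped-down stand-in for the dataclass above, with illustrative timestamp values) of the time-to-first-audio calculation:

```python
from dataclasses import dataclass

@dataclass
class TurnTimestamps:
    # seconds relative to the start of the turn (illustrative values)
    asr_complete: float = 0.0
    tts_first_chunk: float = 0.0

    def time_to_first_audio(self) -> float:
        # only meaningful once both stages have fired
        if self.tts_first_chunk and self.asr_complete:
            return self.tts_first_chunk - self.asr_complete
        return 0.0

ts = TurnTimestamps(asr_complete=0.9, tts_first_chunk=1.6)
print(round(ts.time_to_first_audio(), 2))  # → 0.7
```

The 0.7 s gap is the silence the user actually perceives between finishing speaking and hearing the agent respond, which is why it is the budget we guard most aggressively.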
class AudioInputStream:
    def __init__(self, sample_rate: int = 16000, chunk_duration_ms: int = 100):
        self.sample_rate = sample_rate
        self.chunk_duration_ms = chunk_duration_ms
        self.chunk_size = int(sample_rate * chunk_duration_ms / 1000)

    async def stream_audio(self, text: str) -> AsyncIterator[np.ndarray]:
        chars_per_second = (150 * 5) / 60  # ~150 wpm at ~5 chars per word
        duration_seconds = len(text) / chars_per_second
        num_chunks = int(duration_seconds * 1000 / self.chunk_duration_ms)
        for _ in range(num_chunks):
            chunk = np.random.randn(self.chunk_size).astype(np.float32) * 0.1
            yield chunk
            await asyncio.sleep(self.chunk_duration_ms / 1000)
We simulate real-time input by breaking speech into fixed-duration chunks that arrive asynchronously. We model a realistic speaking rate and streaming behavior to approximate a live microphone feed, and use this stream to exercise the latency-sensitive downstream components. Take a look at the FULL CODES here.
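The chunk arithmetic above can be checked in isolation. A small sketch, assuming the same 150-wpm, 5-characters-per-word speaking model used by the stream:

```python
def chunk_size(sample_rate: int, chunk_duration_ms: int) -> int:
    # samples per chunk = rate (samples/s) * duration (s)
    return int(sample_rate * chunk_duration_ms / 1000)

def num_chunks(text: str, chunk_duration_ms: int = 100,
               wpm: int = 150, chars_per_word: int = 5) -> int:
    # approximate speaking time from a words-per-minute model
    chars_per_second = (wpm * chars_per_word) / 60
    duration_seconds = len(text) / chars_per_second
    return int(duration_seconds * 1000 / chunk_duration_ms)

print(chunk_size(16000, 100))               # → 1600 samples per 100 ms chunk
print(num_chunks("Hello, how are you today?"))  # → 20 chunks (~2 s of speech)
```

At 16 kHz, each 100 ms chunk carries 1,600 samples, so a two-second utterance arrives as roughly twenty chunks that the ASR must keep up with in real time.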
class StreamingASR:
    def __init__(self, latency_budget: float = 0.1):
        self.latency_budget = latency_budget
        self.silence_threshold = 0.5

    async def transcribe_stream(
        self,
        audio_stream: AsyncIterator[np.ndarray],
        ground_truth: str
    ) -> AsyncIterator[tuple[str, bool]]:
        words = ground_truth.split()
        words_transcribed = 0
        silence_duration = 0.0
        chunk_count = 0
        async for chunk in audio_stream:
            chunk_count += 1
            await asyncio.sleep(self.latency_budget)
            # every few chunks, reveal more words as a partial transcript
            if chunk_count % 3 == 0 and words_transcribed < len(words):
                words_transcribed = min(words_transcribed + 2, len(words))
                yield " ".join(words[:words_transcribed]), False
            # once the utterance is fully revealed, accumulate silence
            if words_transcribed >= len(words):
                silence_duration += self.latency_budget
                if silence_duration >= self.silence_threshold:
                    await asyncio.sleep(0.2)  # simulated finalization pass
                    yield ground_truth, True
                    return
        yield ground_truth, True
The streaming ASR module produces partial transcripts and then emits a final result. We reveal words gradually to mimic how modern real-time ASR behaves, and we introduce silence-based endpointing to detect the approximate end of an utterance. Take a look at the FULL CODES here.
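The incremental-reveal behavior can be isolated as a pure function, which makes it easy to reason about without any async machinery. A standalone sketch (the parameter names here are illustrative, not from the original):

```python
def partial_transcripts(ground_truth: str, words_per_partial: int = 2,
                        every_n_chunks: int = 3, total_chunks: int = 30):
    # returns (text, is_final) pairs, mimicking a streaming ASR's partial results:
    # every few audio chunks the transcript grows by a couple of words,
    # then the full utterance is emitted once as the final result
    words = ground_truth.split()
    revealed = 0
    partials = []
    for chunk in range(1, total_chunks + 1):
        if chunk % every_n_chunks == 0 and revealed < len(words):
            revealed = min(revealed + words_per_partial, len(words))
            partials.append((" ".join(words[:revealed]), False))
    partials.append((ground_truth, True))  # final, endpointed result
    return partials

for text, final in partial_transcripts("can you tell me the time"):
    print(final, text)
```

The partials grow monotonically ("can you", then "can you tell me", ...), which is exactly the property downstream consumers rely on when they start speculating on an utterance before it is final.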
class StreamingLLM:
    def __init__(self, time_to_first_token: float = 0.3, tokens_per_second: float = 50):
        self.time_to_first_token = time_to_first_token
        self.tokens_per_second = tokens_per_second

    async def generate_response(self, prompt: str) -> AsyncIterator[str]:
        responses = {
            "hello": "Hello! How can I help you today?",
            "weather": "The weather is sunny with a temperature of 72°F.",
            "time": "The current time is 2:30 PM.",
            "default": "I understand. Let me help you with that."
        }
        response = responses["default"]
        for key in responses:
            if key in prompt.lower():
                response = responses[key]
                break
        await asyncio.sleep(self.time_to_first_token)
        for word in response.split():
            yield word + " "
            await asyncio.sleep(1.0 / self.tokens_per_second)
class StreamingTTS:
    def __init__(self, time_to_first_chunk: float = 0.2, chars_per_second: float = 15):
        self.time_to_first_chunk = time_to_first_chunk
        self.chars_per_second = chars_per_second

    async def synthesize_stream(self, text_stream: AsyncIterator[str]) -> AsyncIterator[np.ndarray]:
        first_chunk = True
        buffer = ""
        async for text in text_stream:
            buffer += text
            if len(buffer) >= 20 or first_chunk:
                if first_chunk:
                    await asyncio.sleep(self.time_to_first_chunk)
                    first_chunk = False
                duration = len(buffer) / self.chars_per_second
                yield np.random.randn(int(16000 * duration)).astype(np.float32) * 0.1
                buffer = ""
                await asyncio.sleep(duration * 0.5)
        if buffer:  # flush any text remaining in the buffer
            duration = len(buffer) / self.chars_per_second
            yield np.random.randn(int(16000 * duration)).astype(np.float32) * 0.1
Here we model a streaming language model and a streaming text-to-speech engine. We generate the response token by token to capture time to first token, then convert the incremental text into audio chunks to simulate early, continuous speech. Take a look at the FULL CODES here.
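Back-of-the-envelope arithmetic shows why this early-start design matters. In a batch pipeline, audio can begin only after the entire response is generated; in a streaming pipeline, TTS starts as soon as a small buffer of tokens is available. A sketch with assumed stage latencies (hypothetical numbers, not measurements):

```python
def first_audio_batch(asr_final: float, ttft: float, tokens: int,
                      tok_s: float, tts_first: float) -> float:
    # batch: wait for the complete LLM response before any synthesis starts
    llm_total = ttft + tokens / tok_s
    return asr_final + llm_total + tts_first

def first_audio_streaming(asr_final: float, ttft: float, tts_first: float,
                          tokens_buffered: int = 4, tok_s: float = 50) -> float:
    # streaming: TTS starts after only a small buffer of tokens is available
    return asr_final + ttft + tokens_buffered / tok_s + tts_first

batch = first_audio_batch(asr_final=0.3, ttft=0.3, tokens=40, tok_s=50, tts_first=0.15)
stream = first_audio_streaming(asr_final=0.3, ttft=0.3, tts_first=0.15)
print(round(batch, 2), round(stream, 2))  # → 1.55 0.83
```

With these assumed numbers, streaming nearly halves time to first audio, and the gap widens with longer responses because the batch path pays for every token before the user hears anything.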
class StreamingVoiceAgent:
    def __init__(self, latency_budgets: LatencyBudgets):
        self.budgets = latency_budgets
        self.audio_stream = AudioInputStream()
        self.asr = StreamingASR(latency_budgets.asr_processing)
        self.llm = StreamingLLM(
            latency_budgets.llm_first_token,
            1.0 / latency_budgets.llm_token_generation
        )
        self.tts = StreamingTTS(
            latency_budgets.tts_first_chunk,
            1.0 / latency_budgets.tts_chunk_generation
        )
        self.state = AgentState.LISTENING
        self.metrics_history: List[LatencyMetrics] = []

    async def process_turn(self, user_input: str) -> LatencyMetrics:
        metrics = LatencyMetrics()
        start_time = time.time()
        metrics.audio_chunk_received = time.time() - start_time
        audio_gen = self.audio_stream.stream_audio(user_input)
        metrics.asr_started = time.time() - start_time
        transcription = ""
        async for text, final in self.asr.transcribe_stream(audio_gen, user_input):
            if final:
                metrics.asr_complete = time.time() - start_time
                transcription = text
        metrics.llm_started = time.time() - start_time
        response = ""
        async for token in self.llm.generate_response(transcription):
            if not metrics.llm_first_token:
                metrics.llm_first_token = time.time() - start_time
            response += token
        metrics.llm_complete = time.time() - start_time
        metrics.tts_started = time.time() - start_time

        async def text_stream():
            for word in response.split():
                yield word + " "

        async for _ in self.tts.synthesize_stream(text_stream()):
            if not metrics.tts_first_chunk:
                metrics.tts_first_chunk = time.time() - start_time
        metrics.tts_complete = time.time() - start_time
        self.metrics_history.append(metrics)
        return metrics
We orchestrate the full voice agent by combining audio input, ASR, LLM, and TTS in a single asynchronous flow. We record timestamps at each transition to compute the latency metrics, and we treat each user interaction as a single experiment for systematic performance analysis. You can check out the FULL CODES here.
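The timestamp bookkeeping pattern in the turn handler can be exercised on its own. A standalone sketch with toy sleeps standing in for the real stages, recording per-stage offsets the same way (using `time.perf_counter`, which is better suited to interval measurement than `time.time`):

```python
import asyncio
import time

async def timed_turn() -> dict:
    # record each stage boundary as an offset from the start of the turn
    marks = {}
    start = time.perf_counter()
    await asyncio.sleep(0.05)   # stand-in for ASR finalization
    marks["asr_complete"] = time.perf_counter() - start
    await asyncio.sleep(0.02)   # stand-in for LLM time-to-first-token
    marks["llm_first_token"] = time.perf_counter() - start
    await asyncio.sleep(0.01)   # stand-in for the first TTS chunk
    marks["tts_first_chunk"] = time.perf_counter() - start
    return marks

marks = asyncio.run(timed_turn())
print({k: round(v, 3) for k, v in marks.items()})
```

Because every mark is an offset from the same `start`, the marks are guaranteed to be monotonically increasing, and any stage duration is just the difference between two adjacent marks.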
async def run_demo():
    budgets = LatencyBudgets(
        asr_processing=0.08,
        llm_first_token=0.3,
        llm_token_generation=0.02,
        tts_first_chunk=0.15,
        time_to_first_audio=0.8
    )
    agent = StreamingVoiceAgent(budgets)
    inputs = [
        "Hello, how are you today?",
        "What's the weather like?",
        "Can you tell me the time?"
    ]
    for text in inputs:
        await agent.process_turn(text)
        await asyncio.sleep(1)

if __name__ == "__main__":
    asyncio.run(run_demo())
We run multiple conversations through the system to observe latency consistency and variance. We use aggressive latency budgets to stress the pipeline under realistic constraints, and these tests let us validate whether the system stays responsive across interactions.
We conclude by showing how to orchestrate a streaming voice agent as an asynchronous pipeline with clear stage boundaries and explicit performance targets. Even with non-trivial computation at each stage, combining partial ASR, token-level LLM streaming, and early-start TTS reduces perceived latency. The approach lets us reason systematically about turn-taking, responsiveness, and optimization, and provides a solid foundation for real-world deployments using production ASR, LLM, and TTS models.
Asif Razzaq is the CEO of Marktechpost Media Inc. A visionary entrepreneur, Asif is committed to harnessing Artificial Intelligence for social good. His most recent venture, Marktechpost, covers machine learning and deep learning news in a way that is both technically sound and accessible to a broad audience, drawing over 2 million monthly views.

