
How to design a fully streaming voice agent with end-to-end latency budgets, incremental ASR, LLM stream, and real-time TTS

Tech · By Gavin Wallace · 20/01/2026 · 7 Mins Read

In this tutorial, we build a real-time, end-to-end voice agent that mimics how modern low-latency conversational systems operate. We model the entire pipeline, from chunked speech input through streaming recognition and incremental language-model reasoning to text-to-speech, while tracking latency at every stage. By working within strict latency budgets and monitoring metrics such as time to first token and time to first audio, we keep the focus on practical engineering tradeoffs. Visit the FULL CODES here.

import time
import asyncio
import numpy as np
from collections import deque
from dataclasses import dataclass
from typing import AsyncIterator, List
from enum import Enum
import matplotlib.pyplot as plt


@dataclass
class LatencyMetrics:
    audio_chunk_received: float = 0.0
    asr_started: float = 0.0
    asr_partial: float = 0.0
    asr_complete: float = 0.0
    llm_started: float = 0.0
    llm_first_token: float = 0.0
    llm_complete: float = 0.0
    tts_started: float = 0.0
    tts_first_chunk: float = 0.0
    tts_complete: float = 0.0

    def get_time_to_first_audio(self) -> float:
        return self.tts_first_chunk - self.asr_complete if self.tts_first_chunk and self.asr_complete else 0.0

    def get_total_latency(self) -> float:
        return self.tts_complete - self.audio_chunk_received if self.tts_complete else 0.0


@dataclass
class LatencyBudgets:
    asr_processing: float = 0.1
    asr_finalization: float = 0.3
    llm_first_token: float = 0.5
    llm_token_generation: float = 0.02
    tts_first_chunk: float = 0.2
    tts_chunk_generation: float = 0.05
    time_to_first_audio: float = 1.0


class AgentState(Enum):
    LISTENING = "listening"
    PROCESSING_SPEECH = "processing_speech"
    THINKING = "thinking"
    SPEAKING = "speaking"
    INTERRUPTED = "interrupted"

Here we define the core data structures, state representations, and measurement methods used to track latency throughout the voice pipeline. We formalize the timing signals for ASR and the LLM so that measurements stay consistent across stages, and we establish a clear agent state machine that governs how the system transitions during a conversational turn. See the FULL CODES here.
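As a quick standalone illustration (the timestamps below are hypothetical, not measurements from this pipeline), the derived time-to-first-audio metric and its budget check reduce to simple arithmetic over two timestamps:

```python
from dataclasses import dataclass

@dataclass
class TurnStamps:
    # Hypothetical per-turn timestamps, in seconds from turn start
    asr_complete: float = 0.0
    tts_first_chunk: float = 0.0

def time_to_first_audio(t: TurnStamps) -> float:
    # Mirrors the guard in LatencyMetrics: report 0.0 until both stamps exist
    if t.tts_first_chunk and t.asr_complete:
        return t.tts_first_chunk - t.asr_complete
    return 0.0

turn = TurnStamps(asr_complete=0.42, tts_first_chunk=1.15)
ttfa = time_to_first_audio(turn)
within_budget = ttfa <= 1.0  # the 1.0 s time_to_first_audio budget
print(f"TTFA={ttfa:.2f}s, within budget: {within_budget}")
```

The zero-default guard matters: a turn that never produced audio reports 0.0 rather than a misleading negative latency.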

class AudioInputStream:
    def __init__(self, sample_rate: int = 16000, chunk_duration_ms: int = 100):
        self.sample_rate = sample_rate
        self.chunk_duration_ms = chunk_duration_ms
        self.chunk_size = int(sample_rate * chunk_duration_ms / 1000)

    async def stream_audio(self, text: str) -> AsyncIterator[np.ndarray]:
        # Approximate speaking rate: 150 wpm at ~5 characters per word
        chars_per_second = (150 * 5) / 60
        duration_seconds = len(text) / chars_per_second
        num_chunks = int(duration_seconds * 1000 / self.chunk_duration_ms)

        for _ in range(num_chunks):
            chunk = np.random.randn(self.chunk_size).astype(np.float32) * 0.1
            await asyncio.sleep(self.chunk_duration_ms / 1000)
            yield chunk

We simulate real-time input by breaking the speech into fixed-duration chunks that arrive asynchronously. Modeling a realistic speaking rate and streaming behaviour approximates a live microphone feed, and this stream is what exercises the latency-sensitive downstream components. Take a look at the FULL CODES here.
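The chunking arithmetic is worth making explicit: at 16 kHz, a 100 ms chunk holds 1,600 samples, and the speaking-rate model (150 words per minute at roughly 5 characters per word, i.e. 12.5 characters per second) converts text length into a chunk count. A standalone sketch of the same math:

```python
sample_rate = 16000        # Hz
chunk_duration_ms = 100    # duration of one audio chunk
chunk_size = int(sample_rate * chunk_duration_ms / 1000)  # samples per chunk

chars_per_second = (150 * 5) / 60   # 150 wpm * ~5 chars/word = 12.5 chars/s
text = "Hello, how are you today?"  # 25 characters
duration_seconds = len(text) / chars_per_second
num_chunks = int(duration_seconds * 1000 / chunk_duration_ms)

print(chunk_size, duration_seconds, num_chunks)
```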

class StreamingASR:
    def __init__(self, latency_budget: float = 0.1):
        self.latency_budget = latency_budget
        self.silence_threshold = 0.5

    async def transcribe_stream(
        self,
        audio_stream: AsyncIterator[np.ndarray],
        ground_truth: str
    ) -> AsyncIterator[tuple[str, bool]]:
        words = ground_truth.split()
        words_transcribed = 0
        silence_duration = 0.0
        chunk_count = 0

        async for chunk in audio_stream:
            chunk_count += 1
            await asyncio.sleep(self.latency_budget)

            # Reveal another word with every third chunk
            if chunk_count % 3 == 0 and words_transcribed < len(words):
                words_transcribed += 1
                yield " ".join(words[:words_transcribed]), False

            # Once all words are out, accumulate silence until endpointing fires
            if words_transcribed >= len(words):
                silence_duration += self.latency_budget
                if silence_duration >= self.silence_threshold:
                    await asyncio.sleep(0.2)  # finalization cost
                    yield ground_truth, True
                    return

        yield ground_truth, True

The streaming ASR module produces partial transcripts and then emits a final result. We reveal words gradually to mimic how modern real-time ASR behaves, and we add silence-based endpointing to detect the approximate end of an utterance. Take a look at the FULL CODES here.
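Downstream code only needs the (text, is_final) contract, not the ASR internals. A minimal sketch of a consumer, with a stubbed transcript stream standing in for StreamingASR:

```python
import asyncio
from typing import AsyncIterator

async def stub_transcripts() -> AsyncIterator[tuple[str, bool]]:
    # Stand-in for StreamingASR.transcribe_stream: partials, then a final
    yield "what's the", False
    yield "what's the weather", False
    yield "what's the weather like?", True

async def consume() -> str:
    final_text = ""
    async for text, is_final in stub_transcripts():
        if is_final:
            final_text = text  # hand the finalized transcript to the LLM stage
        # Partial results could drive live captions or early intent detection
    return final_text

result = asyncio.run(consume())
print(result)
```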

class StreamingLLM:
    def __init__(self, time_to_first_token: float = 0.3, tokens_per_second: float = 50):
        self.time_to_first_token = time_to_first_token
        self.tokens_per_second = tokens_per_second

    async def generate_response(self, prompt: str) -> AsyncIterator[str]:
        responses = {
            "hello": "Hello! How can I help you today?",
            "weather": "The weather is sunny with a temperature of 72°F.",
            "time": "The current time is 2:30 PM.",
            "default": "I understand. Let me help you with that."
        }

        response = responses["default"]
        for key in responses:
            if key in prompt.lower():
                response = responses[key]
                break

        await asyncio.sleep(self.time_to_first_token)

        for word in response.split():
            yield word + " "
            await asyncio.sleep(1.0 / self.tokens_per_second)


class StreamingTTS:
    def __init__(self, time_to_first_chunk: float = 0.2, chars_per_second: float = 15):
        self.time_to_first_chunk = time_to_first_chunk
        self.chars_per_second = chars_per_second

    async def synthesize_stream(self, text_stream: AsyncIterator[str]) -> AsyncIterator[np.ndarray]:
        first_chunk = True
        buffer = ""

        async for text in text_stream:
            buffer += text
            if len(buffer) >= 20 or first_chunk:
                if first_chunk:
                    await asyncio.sleep(self.time_to_first_chunk)
                    first_chunk = False

                duration = len(buffer) / self.chars_per_second
                yield np.random.randn(int(16000 * duration)).astype(np.float32) * 0.1
                buffer = ""
                await asyncio.sleep(duration * 0.5)

        # Flush any trailing text shorter than the buffer threshold
        if buffer:
            duration = len(buffer) / self.chars_per_second
            yield np.random.randn(int(16000 * duration)).astype(np.float32) * 0.1

Here we model a streaming language model and a streaming text-to-speech engine. The LLM generates its response token by token so we can capture time to first token, and the TTS converts incremental text into audio chunks, simulating speech that starts early and continues as text arrives. Take a look at the FULL CODES here.
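The time-to-first-token measurement generalizes to any async token stream. Here is a self-contained sketch with a stub generator (the delays are illustrative, not from this pipeline) in place of StreamingLLM:

```python
import asyncio
import time
from typing import AsyncIterator

async def stub_tokens(ttft_delay: float = 0.05, per_token: float = 0.01) -> AsyncIterator[str]:
    await asyncio.sleep(ttft_delay)  # simulated time to first token
    for word in "I understand.".split():
        yield word + " "
        await asyncio.sleep(per_token)

async def measure() -> tuple[float, str]:
    start = time.time()
    first_token_at = 0.0
    response = ""
    async for tok in stub_tokens():
        if not first_token_at:
            first_token_at = time.time() - start  # stamp only the first token
        response += tok
    return first_token_at, response

ttft, response = asyncio.run(measure())
print(f"TTFT={ttft:.3f}s, response={response.strip()!r}")
```

The same stamp-on-first-iteration pattern is what process_turn uses for both llm_first_token and tts_first_chunk.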

class StreamingVoiceAgent:
    def __init__(self, latency_budgets: LatencyBudgets):
        self.budgets = latency_budgets
        self.audio_stream = AudioInputStream()
        self.asr = StreamingASR(latency_budgets.asr_processing)
        self.llm = StreamingLLM(
            latency_budgets.llm_first_token,
            1.0 / latency_budgets.llm_token_generation
        )
        self.tts = StreamingTTS(
            latency_budgets.tts_first_chunk,
            1.0 / latency_budgets.tts_chunk_generation
        )
        self.state = AgentState.LISTENING
        self.metrics_history: List[LatencyMetrics] = []

    async def process_turn(self, user_input: str) -> LatencyMetrics:
        metrics = LatencyMetrics()
        start_time = time.time()

        metrics.audio_chunk_received = time.time() - start_time
        audio_gen = self.audio_stream.stream_audio(user_input)

        metrics.asr_started = time.time() - start_time
        transcription = user_input  # fallback if no final transcript is emitted
        async for text, final in self.asr.transcribe_stream(audio_gen, user_input):
            if final:
                metrics.asr_complete = time.time() - start_time
                transcription = text

        metrics.llm_started = time.time() - start_time
        response = ""
        async for token in self.llm.generate_response(transcription):
            if not metrics.llm_first_token:
                metrics.llm_first_token = time.time() - start_time
            response += token

        metrics.llm_complete = time.time() - start_time
        metrics.tts_started = time.time() - start_time

        async def text_stream():
            for word in response.split():
                yield word + " "

        async for _ in self.tts.synthesize_stream(text_stream()):
            if not metrics.tts_first_chunk:
                metrics.tts_first_chunk = time.time() - start_time

        metrics.tts_complete = time.time() - start_time
        self.metrics_history.append(metrics)
        return metrics

We orchestrate the full voice agent by combining streaming audio input, ASR, LLM, and TTS in a single asynchronous flow. Timestamps recorded at each transition feed the latency metrics, and every user interaction is treated as a single experiment for systematic performance analysis. You can check out the FULL CODES here.

async def run_demo():
    budgets = LatencyBudgets(
        asr_processing=0.08,
        llm_first_token=0.3,
        llm_token_generation=0.02,
        tts_first_chunk=0.15,
        time_to_first_audio=0.8
    )

    agent = StreamingVoiceAgent(budgets)

    inputs = [
        "Hello, how are you today?",
        "What's the weather like?",
        "Can you tell me the time?"
    ]

    for text in inputs:
        await agent.process_turn(text)
        await asyncio.sleep(1)


if __name__ == "__main__":
    asyncio.run(run_demo())

We run multiple conversations through the system to observe latency consistency and variance, using deliberately aggressive budgets to stress the pipeline under realistic constraints. These tests let us validate whether the system stays responsive across interactions.

We concluded by showing how to orchestrate a streaming voice agent as an asynchronous pipeline with clear stage boundaries and explicit performance targets. By combining partial ASR, token-level LLM streaming, and early-start TTS, we reduced perceived latency even though the total computation time was unchanged. This approach lets us reason systematically about turn-taking, responsiveness, and optimization, and it is a solid foundation for real-world deployments built on production ASR, LLM, and TTS models.




