Lore: Reliable AI Response Streaming with TCP-Inspired Buffer Management

When building Lore, a service that provides AI-generated programming concept explanations to beginners, I faced an interesting challenge: How could I ensure reliable delivery of streamed responses while maintaining real-time performance?

The solution drew inspiration from one of networking's most battle-tested protocols - TCP's sliding window mechanism.

If you are new to TCP, I highly recommend reading "What is TCP (Transmission Control Protocol)?"

The Challenge

Streaming AI-generated text presents several reliability concerns:

  • Network interruptions could cause lost chunks of text
  • Client-side processing delays might miss chunks
  • Server-side buffer overflows could drop content
  • Out-of-order delivery would create nonsensical responses

Traditional request/response patterns wouldn't work well here. I needed a way to stream content as it was generated while ensuring every piece reached the client in the correct order.

The TCP Inspiration

TCP's sliding window protocol has managed reliable data transfer across unreliable networks for decades. Its core concepts seemed perfect for this use case:

  • Sequence numbers for tracking chunks
  • Window-based flow control
  • Acknowledgment system
  • Retransmission of lost data

Video Demo

I recorded the initial service demo immediately after I had a reliable, working version. There are many things that need to be done to make it robust and scalable to more than one user, but I'm excited that I got the basics to work.

Implementation

I designed a buffering system that mirrors these TCP concepts:

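import threading


class ChunkBuffer:
    def __init__(self, request_id: int, window_size: int = 5):
        self.request_id: int = request_id
        self.window_size: int = window_size  # max chunks awaiting acknowledgment
        # TextChunk is the service's chunk model, defined elsewhere
        self.buffer: dict[int, TextChunk] = {}
        self.next_sequence: int = 0  # sequence number for the next chunk
        self.last_acked: int = -1  # -1 means nothing has been acknowledged yet
        self.unacked_chunks: dict[int, TextChunk] = {}
        self.last_transmit: dict[int, float] = {}  # sequence -> last send time
        self.condition = threading.Condition()  # lets threads wait and be woken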

The buffer maintains a window of unacknowledged chunks. Chunks are built up as my model generates new text from a streamer.

from transformers import TextIteratorStreamer

# Initialize a streamer so I don't have to wait for the entire response
streamer = TextIteratorStreamer(
    self.tokenizer,
    timeout=None,
    skip_prompt=True,
    skip_special_tokens=True
)

Then a new thread is started in which the generate() method on the model is called.

thread = threading.Thread(
    target=self.model.generate,
    kwargs={
        "input_ids": inputs["input_ids"],
        "streamer": streamer,
        "max_new_tokens": self.settings.MODEL_MAX_LENGTH,
        "do_sample": True,
        "temperature": self.settings.MODEL_TEMPERATURE,
        "top_p": 0.9,
        "top_k": 50,
        "repetition_penalty": 1.2,
        "no_repeat_ngram_size": 3
    }
)
thread.start()

The cool thing about this is that I can now just wait for new bits of text to be generated by the model, and build up chunks to be sent to the client.

# Process generated chunks
current_chunk = ""
for new_text in streamer:
    current_chunk += new_text

    # Buffer chunks when they reach defined size
    if len(current_chunk) >= self.settings.BUFFER_CHUNK_SIZE:
        chunk_obj = buffer.add_chunk(current_chunk)
        current_chunk = ""

        # Publish new chunk to the message bus
        self.valkey.publish(
            f'{self.settings.RESPONSE_CHANNEL_PREFIX}{question.request_id}',
            chunk_obj.model_dump_json()
        )
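The excerpt above publishes a chunk only once the accumulated text reaches the configured size, and it doesn't show what happens to text left over when generation ends. Here is a minimal sketch of the same accumulation logic as a standalone generator that also flushes the remainder. The `chunk_stream` helper is hypothetical (it is not part of Lore's code), and a plain list of strings stands in for the streamer.

```python
from typing import Iterable, Iterator


def chunk_stream(pieces: Iterable[str], chunk_size: int) -> Iterator[str]:
    """Group small text pieces into chunks of at least chunk_size characters."""
    current = ""
    for piece in pieces:
        current += piece
        if len(current) >= chunk_size:
            yield current
            current = ""
    # Flush whatever is left once the source is exhausted
    if current:
        yield current


# A list of strings stands in for the model's streamer (illustrative data)
tokens = ["Recur", "sion ", "is ", "when ", "a ", "function ", "calls ", "itself."]
chunks = list(chunk_stream(tokens, chunk_size=15))
# chunks == ["Recursion is when ", "a function calls ", "itself."]
```

The last chunk is shorter than `chunk_size` because it is the flushed remainder. Without that final step the tail of the response would never reach the client.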

The Flow Control Dance

Just like TCP, the system uses back pressure to prevent overwhelming any component:

  1. The model generates text chunks
  2. Chunks enter the buffer until it reaches window_size
  3. Generation pauses until acknowledgments arrive
  4. Each acknowledgment frees buffer space
  5. Generation resumes when space is available

This creates a natural flow control system where the client's processing speed influences the generation pace.
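The five steps above can be sketched with a `threading.Condition`: the producer waits while the window is full, and each acknowledgment wakes it up. This is a simplified illustration, not Lore's actual `ChunkBuffer`. The `WindowedBuffer` class, its method names, and the per-chunk (rather than cumulative) acknowledgments are all assumptions.

```python
import threading
from typing import Optional


class WindowedBuffer:
    """Blocks the producer once window_size chunks are waiting for an ack."""

    def __init__(self, window_size: int = 5):
        self.window_size = window_size
        self.next_sequence = 0
        self.unacked: dict[int, str] = {}
        self.condition = threading.Condition()

    def add_chunk(self, text: str, timeout: Optional[float] = None) -> Optional[int]:
        """Store a chunk and return its sequence number, or None on timeout."""
        with self.condition:
            has_space = self.condition.wait_for(
                lambda: len(self.unacked) < self.window_size, timeout=timeout
            )
            if not has_space:
                return None
            sequence = self.next_sequence
            self.unacked[sequence] = text
            self.next_sequence += 1
            return sequence

    def acknowledge(self, sequence: int) -> None:
        """Free the slot held by an acknowledged chunk and wake the producer."""
        with self.condition:
            if self.unacked.pop(sequence, None) is not None:
                self.condition.notify_all()


window = WindowedBuffer(window_size=2)
first = window.add_chunk("Hello, ")
second = window.add_chunk("world")
blocked = window.add_chunk("!", timeout=0.05)  # window is full, so this times out
window.acknowledge(first)
third = window.add_chunk("!", timeout=0.05)  # a slot is free again
```

In a real service the producer would typically call `add_chunk` without a timeout from the generation loop, so the model simply pauses until another thread processes an acknowledgment.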

Multi-Threaded Architecture

The service runs multiple concurrent threads to handle different responsibilities:

  1. Question Thread: Listens for incoming questions and initiates response generation
  2. AI Thread: Handles the model inference and chunk generation
  3. Acknowledgment Thread: Processes chunk acknowledgments
  4. Retransmission Thread: Monitors for and handles lost chunks

Here's how the service starts its long-running threads. The text generation thread was shown above.

def start(self) -> None:
    """Start all service threads"""
    try:
        # Create the long-running service threads
        self.question_thread = threading.Thread(
            target=self._process_questions,
            name="question_thread"
        )
        self.ack_thread = threading.Thread(
            target=self._process_acks,
            name="ack_thread"
        )
        self.retransmit_thread = threading.Thread(
            target=self._process_retransmits,
            name="retransmit_thread"
        )

        # Start threads
        self.question_thread.start()
        self.ack_thread.start()
        self.retransmit_thread.start()
    except RuntimeError:
        # The original handler is not shown in this excerpt; re-raising
        # here is an assumption that keeps a failed start visible.
        raise

Monitoring with Prometheus and Grafana

To ensure reliability and performance, the service includes comprehensive monitoring using Prometheus and Grafana running in Docker containers. The Lore service records metrics at key points and exposes them over an HTTP endpoint, which Prometheus scrapes periodically. Grafana, running as a separate service, queries Prometheus for that raw data and displays it in a more visually appealing way.

Prometheus Metrics Collection

The service exposes key metrics through a Prometheus endpoint:

  • Buffer utilization rates
  • Chunk transmission latency
  • Acknowledgment processing times
  • Retransmission counts
  • Model generation speed

Real-Time Delivery with Server-Sent Events

Lore implements real-time response streaming using Server-Sent Events (SSE), enabling chunk-by-chunk delivery of AI responses to clients. The streaming process involves three main components working in concert.

Message Flow

First, Lore generates response chunks and publishes them to Valkey. Each chunk is a self-contained package carrying:

  • Sequence number
  • Text generated by model
  • Request ID
  • Completion status
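As a rough illustration of that payload, here is a sketch using a standard-library dataclass. The service itself calls `model_dump_json()`, which suggests a Pydantic model, so this is a stand-in rather than the real `TextChunk`. The field names and the `to_json` method are assumptions based on the list above.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class TextChunk:
    sequence: int           # position of this chunk in the response
    text: str               # text generated by the model
    request_id: int         # which question this chunk answers
    is_final: bool = False  # completion status

    def to_json(self) -> str:
        """Serialize the chunk for publishing on the message bus."""
        return json.dumps(asdict(self))


chunk = TextChunk(sequence=0, text="A variable is ", request_id=42)
payload = chunk.to_json()
restored = TextChunk(**json.loads(payload))  # what a subscriber would rebuild
```

Carrying the request ID and sequence number in every message means a subscriber can route and order chunks without any other state.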

The Django API acts as the intermediary, subscribing to these chunks from Valkey as they arrive. It creates a streaming response using StreamingHttpResponse and formats each chunk as an SSE event. Throughout this process, the API maintains an open connection, staying ready to receive and forward chunks until the final one arrives.
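To make the SSE formatting step concrete, here is a small framework-free sketch. The `format_sse` and `sse_events` helpers are hypothetical, as are the `sequence` and `is_final` field names. A list of JSON strings stands in for messages received from the Valkey subscription.

```python
import json
from typing import Iterable, Iterator, Optional


def format_sse(data: str, event_id: Optional[int] = None) -> str:
    """Format a payload as one Server-Sent Event, ending with a blank line."""
    lines = []
    if event_id is not None:
        lines.append(f"id: {event_id}")
    # Each line of the payload needs its own "data:" prefix
    lines.extend(f"data: {line}" for line in data.split("\n"))
    return "\n".join(lines) + "\n\n"


def sse_events(messages: Iterable[str]) -> Iterator[str]:
    """Yield one SSE event per JSON message until the final chunk is seen."""
    for message in messages:
        chunk = json.loads(message)
        yield format_sse(message, event_id=chunk["sequence"])
        if chunk.get("is_final"):
            break


# Illustrative messages; in the service these would come from the subscription
messages = [
    json.dumps({"sequence": 0, "text": "Hi", "is_final": False}),
    json.dumps({"sequence": 1, "text": "!", "is_final": True}),
    json.dumps({"sequence": 2, "text": "ignored", "is_final": False}),
]
events = list(sse_events(messages))
```

In Django, a generator like this could be passed to `StreamingHttpResponse` with `content_type="text/event-stream"`, which keeps the connection open while events are yielded.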

On the frontend, the client establishes an SSE connection to receive these chunks in real time. As each chunk arrives, the client processes it and sends back an acknowledgment to confirm receipt. This acknowledgment system is crucial for reliability: if any chunks go missing in transit, they are automatically retransmitted. Connection handling stays clean throughout the process.
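The client-side logic can be sketched as a small reassembly buffer: hold chunks that arrive early, release text only in sequence order, and ignore duplicates produced by retransmission. The real client runs in the browser, so this Python version only illustrates the idea. The `InOrderReceiver` class is hypothetical.

```python
class InOrderReceiver:
    """Releases chunk text strictly in sequence order."""

    def __init__(self) -> None:
        self.expected = 0                 # next sequence number to deliver
        self.pending: dict[int, str] = {}  # chunks that arrived early

    def receive(self, sequence: int, text: str) -> list[str]:
        """Accept one chunk; return the texts that are now deliverable."""
        if sequence >= self.expected:
            # Keep the first copy; retransmitted duplicates are ignored
            self.pending.setdefault(sequence, text)
        ready = []
        while self.expected in self.pending:
            ready.append(self.pending.pop(self.expected))
            self.expected += 1
        return ready


receiver = InOrderReceiver()
a = receiver.receive(0, "Loops ")    # in order: delivered immediately
b = receiver.receive(2, "code.")     # early: held back
c = receiver.receive(1, "repeat ")   # fills the gap: releases both
d = receiver.receive(1, "repeat ")   # duplicate: nothing new to deliver
```

A client built this way would still acknowledge every chunk it receives, including duplicates, so the server can stop retransmitting them.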

This architecture ensures:

  • Real-time delivery of responses
  • Reliable message delivery with acknowledgments
  • Automatic retransmission of lost chunks
  • Clean connection handling
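The retransmission guarantee could work roughly like this: a background loop periodically checks when each unacknowledged chunk was last sent and re-publishes any that have waited too long. The sketch below is an assumption about the approach, not Lore's actual retransmission thread. The `retransmit_expired` function, the 2-second timeout, and the injected `publish` callable are all hypothetical.

```python
from typing import Callable


def retransmit_expired(
    unacked: dict[int, str],
    last_transmit: dict[int, float],
    publish: Callable[[str], None],
    now: float,
    timeout: float = 2.0,
) -> list[int]:
    """Re-publish every unacknowledged chunk not sent within `timeout` seconds."""
    resent = []
    for sequence in sorted(unacked):
        if now - last_transmit[sequence] >= timeout:
            publish(unacked[sequence])
            last_transmit[sequence] = now  # restart the timer for this chunk
            resent.append(sequence)
    return resent


# Illustrative state: chunk 3 was sent 2.0s ago, chunk 4 only 0.5s ago
sent: list[str] = []
unacked = {3: "chunk three", 4: "chunk four"}
last_transmit = {3: 10.0, 4: 11.5}
resent = retransmit_expired(unacked, last_transmit, sent.append, now=12.0)
```

Passing `now` in explicitly keeps the function easy to test. A running service would more likely call it in a loop with `time.monotonic()` and a short sleep between passes.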

Results

The TCP-inspired design of this system has exceeded my expectations in production. With sequence numbers, acknowledgments, and retransmission in place, I have not seen a single chunk lost so far.

The sequencing system ensures responses always arrive in the correct order, while the windowing mechanism provides natural flow control. When the client application processes chunks slowly, the system adapts automatically rather than overwhelming it with data.

Even network interruptions are handled gracefully, with the system picking up right where it left off when the connection is restored.

Conclusion

By borrowing concepts from TCP and implementing a multi-threaded architecture with robust monitoring, I turned a potentially fragile streaming system into a reliable, observable, production-ready service. It's a reminder that often the best solutions come from adapting proven patterns to new contexts while ensuring proper observability.