
StreamingResponseCache

Cache layer for streaming LLM responses. Buffers chunks from a live stream, stores the complete result, and replays the chunks from cache on subsequent identical requests. Callers get a consistent streaming interface regardless of cache state.

Overview

The standard ResponseCache can only cache complete (non-streaming) responses. When your LLM call uses stream=True, the response arrives as a generator that yields chunks one at a time. There is nothing complete to cache until the stream ends.
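A Python generator can be consumed only once, so a stream cannot simply be stored as-is. In this minimal illustration, fake_stream is a hypothetical stand-in for an LLM stream:

```python
def fake_stream():
    # Hypothetical stand-in for an LLM stream: yields chunks one at a time
    yield "Hel"
    yield "lo"


stream = fake_stream()
first_pass = list(stream)   # consumes the generator
second_pass = list(stream)  # already exhausted, so nothing is left

print(first_pass)   # ['Hel', 'lo']
print(second_pass)  # []
```

The chunks have to be copied aside as they pass through. Otherwise they are gone once the caller has read them.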

StreamingResponseCache solves this:

  • Cache miss: forwards chunks to the caller in real time while buffering them. After the stream ends, the full result is stored.
  • Cache hit: replays the stored chunks as a generator, so callers get the same streaming interface.
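The miss/hit behavior above is a "tee and buffer" generator. Below is a minimal, self-contained sketch of that pattern. It assumes a plain dict as the backend. TeeBufferCache and its get_or_stream(key, stream_fn) signature are hypothetical and are not the chengeta_ai implementation.

```python
from typing import Callable, Dict, Hashable, Iterable, Iterator, List


class TeeBufferCache:
    """Hypothetical sketch of buffer-on-miss / replay-on-hit (not the library's code)."""

    def __init__(self) -> None:
        self._store: Dict[Hashable, List[object]] = {}

    def get_or_stream(
        self, key: Hashable, stream_fn: Callable[[], Iterable[object]]
    ) -> Iterator[object]:
        if key in self._store:
            # Cache hit: replay the stored chunks; stream_fn is never called
            yield from self._store[key]
            return
        buffer: List[object] = []
        for chunk in stream_fn():
            buffer.append(chunk)  # keep a copy for the cache
            yield chunk           # forward to the caller in real time
        # Only reached once the caller has consumed the whole stream
        self._store[key] = buffer


calls = []


def stream_fn():
    calls.append(1)  # count how often the "live" stream is opened
    return iter(["a", "b", "c"])


cache = TeeBufferCache()
miss = list(cache.get_or_stream("k", stream_fn))  # live stream, buffered
hit = list(cache.get_or_stream("k", stream_fn))   # replayed from the dict
```

In this sketch, the result is stored only after the caller has consumed the whole stream. If the caller stops early and closes the generator, GeneratorExit is raised at the yield, and the partial buffer is discarded. This is a design choice of the sketch, not documented behavior of StreamingResponseCache.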

Usage

Sync

import openai
from chengeta_ai import CacheManager, InMemoryBackend, CacheKeyBuilder
from chengeta_ai.layers.streaming_cache import StreamingResponseCache

client = openai.OpenAI()
manager = CacheManager(backend=InMemoryBackend(), key_builder=CacheKeyBuilder())
stream_cache = StreamingResponseCache(manager)

def stream_fn(messages):
    # Returns an iterator of ChatCompletionChunk objects
    return client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        stream=True,
    )

messages = [{"role": "user", "content": "Explain caching in 3 sentences"}]

# First call (cache miss): chunks stream live while being buffered, then cached
for chunk in stream_cache.get_or_stream(messages, stream_fn, model_id="gpt-4o"):
    print(chunk.choices[0].delta.content or "", end="", flush=True)

print()

# Second call (cache hit): stored chunks are replayed without a new API request
for chunk in stream_cache.get_or_stream(messages, stream_fn, model_id="gpt-4o"):
    print(chunk.choices[0].delta.content or "", end="", flush=True)

Async

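import asyncio

async_client = openai.AsyncOpenAI()  # `await` requires the async client, not openai.OpenAI()

async def async_stream_fn(messages):
    async for chunk in await async_client.chat.completions.create(..., stream=True):
        yield chunk

async def main():
    async for chunk in stream_cache.aget_or_stream(messages, async_stream_fn, model_id="gpt-4o"):
        print(chunk.choices[0].delta.content or "", end="", flush=True)

asyncio.run(main())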
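An async version of the buffer-and-replay pattern needs one change. `yield from` is not allowed inside an async generator, so the replay uses a plain for loop. The sketch below is hypothetical and is not the library's aget_or_stream. AsyncTeeBufferCache and fake_async_stream are illustrative names, and stream_fn takes no arguments here for brevity.

```python
import asyncio
from typing import AsyncIterator, Callable, Dict, Hashable, List


class AsyncTeeBufferCache:
    """Hypothetical async sketch of buffer-on-miss / replay-on-hit."""

    def __init__(self) -> None:
        self._store: Dict[Hashable, List[object]] = {}

    async def aget_or_stream(
        self, key: Hashable, stream_fn: Callable[[], AsyncIterator[object]]
    ) -> AsyncIterator[object]:
        if key in self._store:
            # Cache hit: replay stored chunks; `yield from` is illegal here
            for chunk in self._store[key]:
                yield chunk
            return
        buffer: List[object] = []
        async for chunk in stream_fn():
            buffer.append(chunk)  # keep a copy for the cache
            yield chunk           # forward to the caller as it arrives
        self._store[key] = buffer


async def fake_async_stream() -> AsyncIterator[str]:
    # Hypothetical stand-in for an async LLM stream
    for piece in ["x", "y"]:
        await asyncio.sleep(0)  # simulate waiting on the network
        yield piece


async def main() -> tuple:
    cache = AsyncTeeBufferCache()
    miss = [c async for c in cache.aget_or_stream("k", fake_async_stream)]
    hit = [c async for c in cache.aget_or_stream("k", fake_async_stream)]
    return miss, hit


miss, hit = asyncio.run(main())
```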

Custom chunk joiner

By default, chunks are stored as a list. Provide chunk_joiner to merge them into a single value before storage:

stream_cache = StreamingResponseCache(
    manager,
    chunk_joiner=lambda chunks: "".join(
        c.choices[0].delta.content or "" for c in chunks
    ),
)
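To see what the joiner above produces, it can be run on stand-in chunks. This sketch assumes chunks shaped like OpenAI streaming chunks (chunk.choices[0].delta.content). make_chunk is a hypothetical helper built with types.SimpleNamespace, and the joiner is written as a named function instead of a lambda.

```python
from types import SimpleNamespace


def make_chunk(content):
    # Hypothetical stand-in mimicking chunk.choices[0].delta.content
    return SimpleNamespace(
        choices=[SimpleNamespace(delta=SimpleNamespace(content=content))]
    )


def chunk_joiner(chunks):
    # Concatenate the text deltas; a delta with no text (None) counts as ""
    return "".join(c.choices[0].delta.content or "" for c in chunks)


chunks = [make_chunk("Cach"), make_chunk("ing "), make_chunk(None), make_chunk("helps.")]
joined = chunk_joiner(chunks)
print(joined)  # Caching helps.
```

The `or ""` guard matters because some streamed chunks carry a delta with no text content.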

API Reference

Parameter     Type               Default           Description
manager       CacheManager       (required)        Cache manager instance
serializer    Serializer | None  PickleSerializer  Serializer for stored chunks
chunk_joiner  Callable | None    None              Merge chunks before storage; None = store as list

Method                                                            Description
get_or_stream(messages, stream_fn, model_id, params, ttl, tags)   Sync generator
aget_or_stream(messages, stream_fn, model_id, params, ttl, tags)  Async generator
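Both methods take messages, model_id, and params, which suggests these determine what counts as an "identical request". The sketch below is a hypothetical key derivation and not CacheKeyBuilder's actual scheme. It hashes a canonical JSON encoding so that logically equal requests map to the same key. request_key is an illustrative name.

```python
import hashlib
import json
from typing import Any, Dict, List, Optional


def request_key(
    messages: List[Dict[str, Any]],
    model_id: str,
    params: Optional[Dict[str, Any]] = None,
) -> str:
    # Hypothetical: canonical JSON (sorted keys, compact separators) so that
    # dicts with the same contents serialize to the same bytes
    payload = json.dumps(
        {"messages": messages, "model_id": model_id, "params": params or {}},
        sort_keys=True,
        separators=(",", ":"),
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


msgs = [{"role": "user", "content": "Explain caching in 3 sentences"}]
k1 = request_key(msgs, "gpt-4o", {"temperature": 0, "max_tokens": 100})
k2 = request_key(msgs, "gpt-4o", {"max_tokens": 100, "temperature": 0})  # same params, reordered
k3 = request_key(msgs, "gpt-4o-mini", {"temperature": 0, "max_tokens": 100})
```

In this sketch, k1 and k2 are equal because parameter order does not matter. k3 differs because the model differs.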