Ai-engineering · May 30, 2026

Semantic Caching for LLMs: Cut Your API Bill by 60%

How vector similarity lets you cache LLM responses, slash costs, and halve latency. Full Python implementation from scratch.

by Perivitta 39 mins read Intermediate

Introduction

LLM API costs compound fast. Consider a production chatbot handling 100,000 queries per day at Claude Sonnet pricing ($0.003 per 1,000 input tokens and $0.015 per 1,000 output tokens). With an average of 600 input tokens and 400 output tokens per query, each query costs about $0.0078, which comes to roughly $780 per day. At that volume, the economics become painful.
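
The $780 figure can be reproduced by hand. The worked calculation below assumes the output rate of $15 per million tokens quoted in Section 9, alongside the $3 per million input rate:

\[ \text{cost per query} = 600 \times \frac{\$3}{10^{6}} + 400 \times \frac{\$15}{10^{6}} = \$0.0018 + \$0.0060 = \$0.0078 \]

\[ \text{cost per day} = 100{,}000 \times \$0.0078 = \$780 \]

Output tokens account for most of the bill here, even though there are fewer of them.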

The standard answer is caching. But exact-match caching (checking whether the new query string is identical to a previously answered one) misses nearly everything. Real users ask the same question in dozens of different ways: "What is your return policy?", "How do returns work?", "Can I send something back?". Exact-match treats these as three separate cache misses and calls the LLM three times.
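
To make the failure mode concrete, here is a minimal sketch of an exact-match cache. The `fake_llm` function is a hypothetical stand-in for a real API call and only counts how often it is invoked; the answer text is invented for illustration.

```python
import hashlib

def exact_key(query: str) -> str:
    """Hash of the raw query string, as an exact-match cache would use."""
    return hashlib.sha256(query.encode("utf-8")).hexdigest()

cache: dict[str, str] = {}
llm_calls = 0

def fake_llm(query: str) -> str:
    """Hypothetical stand-in for a real LLM call; counts invocations."""
    global llm_calls
    llm_calls += 1
    return "Items can be returned within 30 days."

def ask_exact(query: str) -> str:
    key = exact_key(query)
    if key not in cache:          # any textual difference is a miss
        cache[key] = fake_llm(query)
    return cache[key]

for q in ["What is your return policy?",
          "How do returns work?",
          "Can I send something back?"]:
    ask_exact(q)

ask_exact("What is your return policy?")   # identical string: the only hit
print(f"LLM calls: {llm_calls}, cache entries: {len(cache)}")
```

Three paraphrases produce three LLM calls and three cache entries holding the same answer; only the byte-for-byte repeat is served from the cache.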

Semantic caching solves this. Instead of comparing strings, it compares meaning. Each query is embedded into a high-dimensional vector; if the cosine similarity between the new query's vector and a cached query's vector meets or exceeds a threshold, the cached answer is returned directly. A hit avoids the LLM call and its cost, and adds only the latency of the embedding and the lookup.


1. Exact-Match Caching vs Semantic Caching

| Property | Exact-Match Cache | Semantic Cache |
|---|---|---|
| Lookup key | Hash of the raw query string | Embedding vector of the query |
| Match condition | Strings are identical | Cosine similarity ≥ threshold θ |
| Handles paraphrasing | No: one character difference = cache miss | Yes: similar meaning = cache hit |
| Complexity | O(1) hash lookup | O(n) linear scan or O(log n) with HNSW index |
| Additional cost | Negligible | One embedding API call per query (~0.01× LLM cost) |
| Typical hit rate on real traffic | 1–5% | 30–65% depending on domain and threshold |
| Risk of wrong answer | None (identical query = identical intent) | Low but non-zero if threshold is too loose |

2. How Semantic Caching Works

The pipeline has two phases: a cache lookup on every query, and a cache store after every cache miss once the LLM has answered. The embedding model runs in both phases.

[Figure 1 (flowchart): User query → embed with all-MiniLM-L6-v2 (384-dim vector) → vector search for the most similar cached embedding → is cosine similarity ≥ θ? Yes: cache hit, return the cached response (~5 ms, $0). No: cache miss, call the LLM API (~1–3 s, full cost), then store {embedding, response, TTL} so that future similar queries hit.]
Figure 1: Semantic cache pipeline. Every query is embedded and compared against cached embeddings. If the cosine similarity exceeds the threshold θ, the cached response is returned instantly. On a miss, the LLM is called and the result is stored for future similar queries.

The maths: cosine similarity

Cosine similarity measures the angle between two vectors, not their magnitude. For two embedding vectors A and B:

\[ \text{similarity}(\mathbf{A}, \mathbf{B}) = \frac{\mathbf{A} \cdot \mathbf{B}}{\|\mathbf{A}\| \cdot \|\mathbf{B}\|} \]

When both vectors are L2-normalised (unit length), this simplifies to a dot product: \(\mathbf{A} \cdot \mathbf{B}\). The result is always in \([-1, 1]\): a value of 1 means identical direction (treated as semantically identical), 0 means orthogonal (unrelated), and −1 means opposite. In practice, sentence-embedding scores for natural-language text rarely approach −1, and the useful range for caching decisions is roughly 0.80–0.99.

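Cosine similarity is preferred over Euclidean distance for text embeddings because it ignores vector magnitude and compares direction only. A short question and a long paraphrase of the same question can produce embeddings with different magnitudes but nearly identical directions. Once vectors are L2-normalised, the two measures rank neighbours identically, which is why the implementation in Section 3 normalises every embedding.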
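
The properties above can be checked on small vectors. The following sketch uses toy 3-dimensional vectors rather than real embeddings, and plain Python instead of NumPy so that each step is visible:

```python
import math

def dot(a, b):
    return sum(x * y for x, y in zip(a, b))

def norm(a):
    return math.sqrt(dot(a, a))

def cosine(a, b):
    return dot(a, b) / (norm(a) * norm(b))

def normalise(a):
    n = norm(a)
    return [x / n for x in a]

a = [1.0, 2.0, 2.0]
b = [2.0, 1.0, 2.0]
long_a = [10 * x for x in a]   # same direction as a, 10x the magnitude

cos_ab = cosine(a, b)                        # 8 / (3 * 3)
cos_scaled = cosine(long_a, b)               # unchanged by scaling
dot_unit = dot(normalise(a), normalise(b))   # equals the cosine for unit vectors
euclid_ab = math.dist(a, b)
euclid_scaled = math.dist(long_a, b)         # grows with the magnitude

print(f"cosine: {cos_ab:.4f} (scaled: {cos_scaled:.4f}, unit dot: {dot_unit:.4f})")
print(f"euclidean: {euclid_ab:.4f} (scaled: {euclid_scaled:.4f})")
```

Scaling one vector leaves the cosine untouched but changes the Euclidean distance substantially, which is the behaviour the paragraph above describes.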

Choosing an embedding model

The embedding model determines the quality of semantic matching, the latency of every lookup, and whether you can run it locally or need an API call. The table below covers the main options used in production semantic caches in 2026.

| Model | Dimensions | Latency (CPU) | Cost | Best for |
|---|---|---|---|---|
| all-MiniLM-L6-v2 | 384 | ~5ms | Free (local) | Default choice for most use cases. Fast, small, good English quality |
| all-mpnet-base-v2 | 768 | ~20ms | Free (local) | Better semantic accuracy than MiniLM; use when hit rate matters more than latency |
| BGE-M3 | 1024 | ~35ms | Free (local) | Best open-source quality; strong multilingual support (100+ languages) |
| text-embedding-3-small | 1536 | ~80ms (API) | $0.02/million tokens | When you are already on the OpenAI stack and want consistent quality without hosting |
| text-embedding-3-large | 3072 | ~120ms (API) | $0.13/million tokens | High-stakes domains requiring maximum precision; the latency and cost rarely justify it for caching |

For most production semantic caches, all-MiniLM-L6-v2 is the right default. It runs in ~5ms on a single CPU core, requires no API key, produces 384-dimensional vectors that are small enough that a 500,000-entry HNSW index fits comfortably in 2 GB of RAM, and achieves strong recall on English-language question paraphrases. Move to BGE-M3 if your user base is multilingual or if you find that MiniLM's hit rate is too low on your specific query distribution.

Switching the embedding model after you have populated a cache is destructive: old and new embeddings live in different vector spaces, so similarity scores become meaningless across the boundary. If you switch models, flush the cache entirely and rebuild it from warm-up data.
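
One way to make an accidental model switch harmless is to derive the cache namespace from the model identity, so that entries from different vector spaces can never be compared. The sketch below is an assumption about how this could be structured, not part of the article's implementation; `cache_namespace` and `VersionedStore` are hypothetical names.

```python
import hashlib

def cache_namespace(model_name: str, dim: int, base: str = "sem_cache") -> str:
    """Derive a key prefix that changes whenever the embedding model changes."""
    tag = hashlib.sha256(f"{model_name}:{dim}".encode()).hexdigest()[:8]
    return f"{base}:{tag}:"

class VersionedStore:
    """In-memory stand-in for a cache that refuses to mix vector spaces."""

    def __init__(self, model_name: str, dim: int):
        self.namespace = cache_namespace(model_name, dim)
        self.entries: dict[str, tuple[list[float], str]] = {}

    def switch_model(self, model_name: str, dim: int) -> int:
        """Flush everything if the model changed. Returns entries dropped."""
        new_ns = cache_namespace(model_name, dim)
        if new_ns == self.namespace:
            return 0
        dropped = len(self.entries)
        self.entries.clear()
        self.namespace = new_ns
        return dropped

store = VersionedStore("all-MiniLM-L6-v2", 384)
store.entries["q1"] = ([0.0] * 384, "cached answer")
same = store.switch_model("all-MiniLM-L6-v2", 384)   # same model: no-op
dropped = store.switch_model("BGE-M3", 1024)          # new model: cache flushed
print(f"dropped on no-op: {same}, dropped on switch: {dropped}")
```

With a shared store such as Redis, the same prefix could be used for keys, so that a deployment running the new model simply never sees the old entries.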


3. Building a Semantic Cache in Python

We will build in two stages: a self-contained in-memory cache for understanding the mechanics, then a Redis-backed version ready for production.

3.1 Install dependencies


pip install anthropic sentence-transformers numpy

3.2 The SemanticCache class


# semantic_cache.py
import time
import numpy as np
from sentence_transformers import SentenceTransformer
from dataclasses import dataclass
from typing import Optional

@dataclass
class CacheEntry:
    query:      str
    embedding:  np.ndarray
    response:   str
    expires_at: float          # Unix timestamp

class SemanticCache:
    def __init__(
        self,
        model_name: str  = "all-MiniLM-L6-v2",
        threshold: float = 0.92,
        ttl: int         = 3600,        # seconds
        max_entries: int = 10_000,
    ):
        self.embedder   = SentenceTransformer(model_name)
        self.threshold  = threshold
        self.ttl        = ttl
        self.max_entries = max_entries
        self._store: list[CacheEntry] = []

    # ── embedding ──────────────────────────────────────────────────────
    def _embed(self, text: str) -> np.ndarray:
        # normalize_embeddings=True gives unit-length vectors
        # so cosine similarity == dot product
        return self.embedder.encode(text, normalize_embeddings=True)

    def _cosine(self, a: np.ndarray, b: np.ndarray) -> float:
        return float(np.dot(a, b))      # both are already unit-length

    # ── cache operations ───────────────────────────────────────────────
    def lookup(self, query: str) -> Optional[str]:
        """Return a cached response if a similar query exists, else None."""
        query_emb = self._embed(query)
        now       = time.time()

        best_score    = -1.0
        best_response = None

        for entry in self._store:
            if entry.expires_at < now:
                continue                    # skip expired entries
            score = self._cosine(query_emb, entry.embedding)
            if score > best_score:
                best_score    = score
                best_response = entry.response

        if best_score >= self.threshold:
            return best_response
        return None

    def store(self, query: str, response: str) -> None:
        """Embed the query and store the (embedding, response) pair."""
        if len(self._store) >= self.max_entries:
            self._evict()

        self._store.append(CacheEntry(
            query      = query,
            embedding  = self._embed(query),
            response   = response,
            expires_at = time.time() + self.ttl,
        ))

    def _evict(self) -> None:
        """Remove expired entries; if still full, evict the oldest 10%."""
        now = time.time()
        self._store = [e for e in self._store if e.expires_at > now]
        if len(self._store) >= self.max_entries:
            cutoff = max(1, int(self.max_entries * 0.10))   # always evict at least one entry
            self._store = self._store[cutoff:]

    @property
    def size(self) -> int:
        return len(self._store)
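
The Python loop in `lookup` is easy to follow but slow for large stores. Because all embeddings are unit-length, the same search can be written as a single matrix-vector product. The sketch below is a possible variant rather than the class above: `best_match` is a hypothetical helper, it uses toy 3-dimensional vectors instead of real embeddings, and it leaves out expiry handling.

```python
import numpy as np

def best_match(query_emb: np.ndarray, matrix: np.ndarray, threshold: float):
    """Return (index, score) of the best row in `matrix`, or None below threshold.

    Assumes `query_emb` and every row of `matrix` are L2-normalised,
    so the matrix-vector product yields cosine similarities.
    """
    if matrix.shape[0] == 0:
        return None
    scores = matrix @ query_emb            # one similarity per cached entry
    idx = int(np.argmax(scores))
    score = float(scores[idx])
    return (idx, score) if score >= threshold else None

# Toy unit vectors standing in for cached query embeddings.
cached = np.array([[1.0, 0.0, 0.0],
                   [0.0, 1.0, 0.0],
                   [0.6, 0.8, 0.0]], dtype=np.float32)

hit = best_match(np.array([0.0, 1.0, 0.0], dtype=np.float32), cached, 0.92)
miss = best_match(np.array([0.0, 0.0, 1.0], dtype=np.float32), cached, 0.92)
print(hit, miss)
```

Keeping the embeddings in one contiguous array means eviction has to update the array and the list of responses together, which is the main cost of this layout.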

3.3 Wrapping the LLM call


# llm_with_cache.py
import anthropic
from semantic_cache import SemanticCache

client = anthropic.Anthropic()
cache  = SemanticCache(threshold=0.92, ttl=3600)

def ask(question: str, system: str = "You are a helpful assistant.") -> dict:
    """
    Ask a question. Returns the response and whether it was a cache hit.
    """
    cached = cache.lookup(question)
    if cached is not None:
        return {"response": cached, "cache_hit": True, "tokens_used": 0}

    response = client.messages.create(
        model      = "claude-sonnet-4-6",   # Sonnet, matching the pricing used in the cost estimates
        max_tokens = 1024,
        system     = system,
        messages   = [{"role": "user", "content": question}],
    )
    answer = response.content[0].text

    cache.store(question, answer)

    return {
        "response":   answer,
        "cache_hit":  False,
        "tokens_used": response.usage.input_tokens + response.usage.output_tokens,
    }

# ── Example: paraphrase hits ───────────────────────────────────────────
if __name__ == "__main__":
    r1 = ask("What is your return policy?")
    print(f"Q1: cache_hit={r1['cache_hit']}, tokens={r1['tokens_used']}")
    # First call: always a cache miss, so the LLM is called (for example tokens=312)

    r2 = ask("How do I return an item?")
    print(f"Q2: cache_hit={r2['cache_hit']}, tokens={r2['tokens_used']}")
    # Paraphrase: a hit (tokens=0) if its similarity to Q1 reaches the 0.92 threshold

    r3 = ask("Can I get a refund?")
    print(f"Q3: cache_hit={r3['cache_hit']}, tokens={r3['tokens_used']}")
    # Related but distinct intent: at a conservative threshold this may miss,
    # which is the safer outcome (a refund is not the same question as a return)

4. Choosing the Right Similarity Threshold

The threshold θ is the most important tunable parameter. It controls the precision-recall tradeoff: a tighter threshold gives fewer but more accurate hits; a looser threshold gives more hits but risks returning a subtly wrong answer.

[Figure 2 (plot): hit rate and wrong-answer risk against the cosine similarity threshold θ, from 0.70 to 1.00. Too loose: high hit rate but wrong answers. Sweet spot: 0.92–0.95 for most production use cases. Too strict: misses valid paraphrases.]
Figure 2: Threshold tuning tradeoff. Below 0.85, high hit rates come with elevated wrong-answer risk. Above 0.95, too many valid paraphrases are treated as cache misses. The 0.92–0.95 range is the standard starting point for most domains.

| Threshold range | Behaviour | Use when |
|---|---|---|
| 0.70 – 0.85 | Very aggressive. Matches near-paraphrases but also topically similar questions with different intent. | FAQ bots where wrong answers are low-risk (general info, no personal data) |
| 0.85 – 0.92 | Moderate. Good hit rate with acceptable wrong-answer risk on stable, well-defined domains. | Customer support for a product with a predictable question set |
| 0.92 – 0.95 | Conservative. Only clear paraphrases match. Misses minor rewording edge cases. | Recommended starting point for most production systems |
| 0.95 – 1.00 | Very strict. Near-duplicate strings only. Cache hit rate approaches exact-match. | High-stakes domains where precision matters more than savings (medical, legal) |
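
The tradeoff in the table can be measured directly once you have labelled pairs. The sketch below sweeps a few thresholds over a small set of similarity scores; the scores and labels are hypothetical and invented for illustration, not measurements from any model.

```python
def sweep(pairs, thresholds):
    """pairs: list of (similarity, same_intent). Returns per-threshold stats.

    hit_rate   = fraction of same-intent pairs that would be served from cache
    false_hits = fraction of different-intent pairs that would wrongly match
    """
    same = [s for s, ok in pairs if ok]
    diff = [s for s, ok in pairs if not ok]
    rows = []
    for t in thresholds:
        hit_rate = sum(s >= t for s in same) / len(same)
        false_hits = sum(s >= t for s in diff) / len(diff)
        rows.append((t, hit_rate, false_hits))
    return rows

# Hypothetical labelled similarities, invented for illustration only.
labelled = [
    (0.97, True), (0.94, True), (0.90, True), (0.86, True),
    (0.88, False), (0.80, False), (0.72, False), (0.60, False),
]

results = sweep(labelled, [0.75, 0.85, 0.92, 0.98])
for t, hit_rate, false_hits in results:
    print(f"theta={t:.2f}  hit rate={hit_rate:.2f}  false hits={false_hits:.2f}")
```

On this toy data, loosening the threshold from 0.92 to 0.85 doubles the hit rate but lets one in four different-intent pairs through, which is the shape of curve Figure 2 describes.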

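How to calibrate for your domain: Sample 500–1,000 real queries from your logs. Group them manually into intent clusters. Compute pairwise similarities within each cluster and across clusters. Choose a threshold that sits comfortably above the across-cluster maximum (so different intents never match) and at or below the within-cluster minimum (so genuine paraphrases still hit). If the two ranges overlap, no single threshold separates them cleanly; favour the higher value and accept a lower hit rate.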


import numpy as np
from sentence_transformers import SentenceTransformer

embedder = SentenceTransformer("all-MiniLM-L6-v2")

# Queries with the same intent (should be a cache hit)
same_intent = [
    "What is your return policy?",
    "How do I return an item?",
    "Can I send something back?",
    "What are the rules for returning products?",
]

# Queries with different intent (should NOT be a cache hit)
different_intent = [
    "What is your shipping policy?",
    "Where is my order?",
    "How do I cancel my subscription?",
]

same_embs = embedder.encode(same_intent, normalize_embeddings=True)
diff_embs = embedder.encode(different_intent, normalize_embeddings=True)

# Within-cluster: minimum similarity across all pairs (exclude self-similarity on diagonal)
sim_matrix = same_embs @ same_embs.T
np.fill_diagonal(sim_matrix, np.inf)   # ignore self-similarity (always 1.0)
within_min = float(np.min(sim_matrix))

# Across-cluster: maximum similarity (set threshold ABOVE this to avoid false hits)
across_max = float(np.max(same_embs @ diff_embs.T))

print(f"Within-cluster min similarity:  {within_min:.3f}")
print(f"Across-cluster max similarity:  {across_max:.3f}")
print(f"Recommended threshold range:    ({across_max:.2f}, {within_min:.2f})")
# Example output (illustrative; real values depend on the model and the query set):
# Within-cluster min similarity:  0.918
# Across-cluster max similarity:  0.743
# Recommended threshold range:    (0.74, 0.92)

5. Cache Invalidation and TTL

Semantic caches face the same invalidation problem as any cache, magnified by the fact that a single entry can serve many differently-worded queries. If your underlying answer changes, you need to evict not just the exact matching entry but all semantically similar ones.
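
One possible way to evict "all semantically similar" entries is to embed the question whose answer changed and drop every cached entry within the match threshold of it. The sketch below illustrates that idea with toy 2-dimensional unit vectors; `invalidate_similar` is a hypothetical helper and is not part of the classes defined earlier.

```python
def dot(a, b):
    return sum(x * y for x, y in zip(a, b))

def invalidate_similar(entries, changed_emb, threshold):
    """Drop every entry whose embedding is within `threshold` of `changed_emb`.

    entries: list of (embedding, response) pairs with unit-length embeddings.
    Returns (kept_entries, number_removed).
    """
    kept = [(e, r) for e, r in entries if dot(e, changed_emb) < threshold]
    return kept, len(entries) - len(kept)

# Toy unit vectors standing in for real query embeddings.
entries = [
    ([1.0, 0.0], "30-day returns"),           # the original question
    ([0.96, 0.28], "30-day returns"),         # a close paraphrase of it
    ([0.0, 1.0], "Free shipping over $50"),   # an unrelated question
]
entries, removed = invalidate_similar(entries, [1.0, 0.0], threshold=0.92)
print(f"removed {removed}, kept {len(entries)}")
```

Using the same threshold for invalidation as for lookup removes exactly the entries that could have been returned for the changed question.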

5.1 Time-to-live (TTL)

The simplest approach. Set a TTL appropriate to how often the underlying information changes:

| Domain | Typical TTL | Reasoning |
|---|---|---|
| Static FAQ / policy pages | 24 hours – 7 days | Content rarely changes; long TTL maximises hit rate |
| Product information | 1 – 6 hours | Prices and availability update daily |
| Support knowledge base | 1 – 4 hours | Articles updated periodically; stale answers cause escalations |
| News or current events | 5 – 15 minutes | Context changes rapidly; low TTL or skip caching entirely |
| Real-time data (stock prices, weather) | Do not cache | Every response is time-sensitive by definition |
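
In code, the table can be expressed as a small policy lookup. The category names below are hypothetical, and each value is simply one point picked from inside the ranges above; treat it as a starting configuration to adjust.

```python
from typing import Optional

# Hypothetical category names; values are chosen from the ranges in the table.
TTL_BY_CATEGORY: dict[str, Optional[int]] = {
    "static_faq":   24 * 3600,   # 24 hours
    "product_info":  1 * 3600,   # 1 hour
    "support_kb":    1 * 3600,   # 1 hour
    "news":          5 * 60,     # 5 minutes
    "realtime":      None,       # do not cache
}

def ttl_for(category: str) -> Optional[int]:
    """Return the TTL in seconds, or None when the response must not be cached.

    Unknown categories fall back to None so that nothing is cached by accident.
    """
    return TTL_BY_CATEGORY.get(category)

for name in ("static_faq", "news", "realtime", "unknown"):
    print(name, ttl_for(name))
```

Defaulting unknown categories to "do not cache" trades some hit rate for safety, which fits the conservative stance taken elsewhere in this article.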

5.2 Tag-based invalidation

For more precision, attach topic tags to each cache entry and invalidate by tag when the underlying content changes:


from dataclasses import dataclass
from typing import Optional
import time, numpy as np

@dataclass
class TaggedCacheEntry:
    query:      str
    embedding:  np.ndarray
    response:   str
    tags:       list[str]
    expires_at: float

class TaggedSemanticCache:
    def __init__(self, threshold: float = 0.92, ttl: int = 3600):
        from sentence_transformers import SentenceTransformer
        self.embedder  = SentenceTransformer("all-MiniLM-L6-v2")
        self.threshold = threshold
        self.ttl       = ttl
        self._store: list[TaggedCacheEntry] = []

    def lookup(self, query: str) -> Optional[str]:
        q_emb = self.embedder.encode(query, normalize_embeddings=True)
        now   = time.time()
        best, best_resp = -1.0, None
        for entry in self._store:
            if entry.expires_at < now:
                continue
            score = float(np.dot(q_emb, entry.embedding))
            if score > best:
                best, best_resp = score, entry.response
        return best_resp if best >= self.threshold else None

    def store(self, query: str, response: str, tags: list[str]) -> None:
        emb = self.embedder.encode(query, normalize_embeddings=True)
        self._store.append(TaggedCacheEntry(
            query      = query,
            embedding  = emb,
            response   = response,
            tags       = tags,
            expires_at = time.time() + self.ttl,
        ))

    def invalidate_tag(self, tag: str) -> int:
        """Remove all entries carrying the given tag. Returns count removed."""
        before = len(self._store)
        self._store = [e for e in self._store if tag not in e.tags]
        return before - len(self._store)

# Usage:
cache = TaggedSemanticCache()
cache.store("What is your return policy?", "30-day returns...", tags=["returns", "policy"])
cache.store("What is the shipping cost?",  "Free over $50...", tags=["shipping", "policy"])

# When the returns policy page is updated:
removed = cache.invalidate_tag("returns")
print(f"Removed {removed} stale entries tagged 'returns'")

6. What NOT to Cache

Semantic caching is not appropriate for every type of query. Caching the wrong things produces confidently wrong answers.

| Query type | Cache? | Why |
|---|---|---|
| General FAQ / policy questions | Yes | Stable answers, high paraphrase rate, massive savings potential |
| Conceptual explanations ("how does X work?") | Yes | Answers don't change; paraphrasing is common |
| Real-time data requests ("current price of BTC") | No | Answer changes every second; any cached answer is wrong |
| User-specific queries ("what is my account balance?") | No | Cache is shared. Returning one user's data to another is a data breach |
| Context-dependent follow-ups ("what did I just say?") | No | The answer depends on prior conversation turns; the shared cache has no conversation context |
| Creative generation ("write a poem about X") | Rarely | Users expect variation; returning a cached poem is often surprising |
| Highly sensitive topics (medical, legal advice) | With care | Set threshold 0.95+ and short TTL; consider bypassing cache entirely |
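
A cache bypass check can sit in front of the lookup to enforce the "No" rows. The sketch below uses simple keyword patterns, which are an assumption made for illustration; the patterns and the `should_cache` name are hypothetical, and a production system would more likely rely on a classifier or on routing metadata.

```python
import re

# Hypothetical keyword patterns, deliberately narrow and easy to audit.
PERSONAL = re.compile(r"\b(my|mine)\b", re.IGNORECASE)
REALTIME = re.compile(r"\b(current|now|today|latest|live)\b", re.IGNORECASE)

def should_cache(query: str, has_history: bool = False) -> bool:
    """Conservative gate: skip the shared cache when in doubt."""
    if has_history:                 # follow-ups depend on earlier turns
        return False
    if PERSONAL.search(query):      # user-specific data must not be shared
        return False
    if REALTIME.search(query):      # the answer goes stale immediately
        return False
    return True

for q in ["What is your return policy?",
          "What is my account balance?",
          "What is the current price of BTC?"]:
    print(f"{should_cache(q)!s:5} | {q}")
```

Keyword rules will misclassify some queries in both directions, so a false "do not cache" should be preferred over a false "cache" when tuning them.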

7. Production: Redis Vector Cache

The in-memory implementation above is fine for a single process but does not scale across multiple API servers. In production, use Redis Stack (Redis with the Search module), which supports vector similarity search natively via the HNSW index. Lookup is O(log n) instead of O(n) linear scan.

7.1 Redis-backed cache with a linear scan


# Run Redis Stack locally
docker run -p 6379:6379 redis/redis-stack:latest

pip install redis sentence-transformers numpy

# redis_semantic_cache.py
import redis, time, struct, hashlib
import numpy as np
from sentence_transformers import SentenceTransformer
from typing import Optional

r        = redis.Redis(host="localhost", port=6379, decode_responses=False)
embedder = SentenceTransformer("all-MiniLM-L6-v2")

CACHE_PREFIX = "sem_cache:"
THRESHOLD    = 0.92
TTL_SECONDS  = 3600
VECTOR_DIM   = 384         # all-MiniLM-L6-v2 output dimension

def _embed(text: str) -> np.ndarray:
    return embedder.encode(text, normalize_embeddings=True).astype(np.float32)

def _vec_to_bytes(v: np.ndarray) -> bytes:
    return v.tobytes()

def store(query: str, response: str) -> None:
    emb = _embed(query)
    key = f"{CACHE_PREFIX}{hashlib.sha256(query.encode()).hexdigest()[:16]}"
    r.hset(key, mapping={
        b"query":     query.encode(),
        b"response":  response.encode(),
        b"embedding": _vec_to_bytes(emb),
        b"stored_at": struct.pack("d", time.time()),
    })
    r.expire(key, TTL_SECONDS)

def lookup(query: str) -> Optional[str]:
    query_emb = _embed(query)

    # Linear scan, one round trip per key (switch to the HNSW index in 7.2 beyond ~50,000 entries)
    best_score, best_response = -1.0, None
    for key in r.scan_iter(f"{CACHE_PREFIX}*"):
        entry = r.hgetall(key)
        if not entry:
            continue
        cached_emb = np.frombuffer(entry[b"embedding"], dtype=np.float32)
        score = float(np.dot(query_emb, cached_emb))
        if score > best_score:
            best_score    = score
            best_response = entry[b"response"].decode()

    return best_response if best_score >= THRESHOLD else None

# ── Usage ──────────────────────────────────────────────────────────────
if __name__ == "__main__":
    store("What is your return policy?", "We offer 30-day hassle-free returns.")

    result = lookup("How do returns work?")
    print(result)   # → the cached returns answer if the similarity reaches THRESHOLD, otherwise None

7.2 Scaling to millions of entries: the HNSW vector index

The linear scan above works for up to about 50,000 entries. Beyond that, you want Redis Stack's native HNSW vector index, which reduces lookup from O(n) to O(log n) while maintaining over 99% recall. You need one additional install and a one-time index creation step.


pip install "redis[hiredis]" sentence-transformers numpy

# redis_hnsw_cache.py
import redis, time, struct, hashlib
import numpy as np
from sentence_transformers import SentenceTransformer
from typing import Optional
from redis.commands.search.field import VectorField, TextField, NumericField
from redis.commands.search.indexDefinition import IndexDefinition, IndexType

r        = redis.Redis(host="localhost", port=6379, decode_responses=False)
embedder = SentenceTransformer("all-MiniLM-L6-v2")

CACHE_PREFIX = "sem_cache:"
THRESHOLD    = 0.92
TTL_SECONDS  = 3600
VECTOR_DIM   = 384

def _embed(text: str) -> np.ndarray:
    return embedder.encode(text, normalize_embeddings=True).astype(np.float32)

def create_hnsw_index() -> None:
    """Run once on startup to create the HNSW index. Safe to call repeatedly."""
    schema = (
        TextField("query"),
        TextField("response"),
        NumericField("stored_at"),
        VectorField(
            "embedding",
            "HNSW",
            {
                "TYPE":            "FLOAT32",
                "DIM":             VECTOR_DIM,
                "DISTANCE_METRIC": "COSINE",
                "M":               16,       # graph connectivity — higher = better recall, more RAM
                "EF_CONSTRUCTION": 200,      # build-time quality — higher = better index, slower inserts
                "EF_RUNTIME":      10,       # query-time quality — higher = better recall, slower lookup
            },
        ),
    )
    try:
        r.ft("sem_cache_idx").create_index(
            schema,
            definition=IndexDefinition(
                prefix=[CACHE_PREFIX],
                index_type=IndexType.HASH
            ),
        )
    except Exception as e:
        if "Index already exists" not in str(e):
            raise

def store_hnsw(query: str, response: str) -> None:
    emb = _embed(query)
    key = f"{CACHE_PREFIX}{hashlib.sha256(query.encode()).hexdigest()[:16]}"
    pipe = r.pipeline()
    pipe.hset(key, mapping={
        "query":     query,
        "response":  response,
        "embedding": emb.tobytes(),
        "stored_at": str(time.time()),
    })
    pipe.expire(key, TTL_SECONDS)
    pipe.execute()

def lookup_hnsw(query: str) -> Optional[str]:
    from redis.commands.search.query import Query as RediSearchQuery

    query_emb  = _embed(query)
    query_bytes = query_emb.tobytes()

    q = (
        RediSearchQuery("(*)=>[KNN 1 @embedding $vec AS vec_score]")
        .sort_by("vec_score", asc=True)
        .return_fields("response", "vec_score")
        .paging(0, 1)
        .dialect(2)
    )
    results = r.ft("sem_cache_idx").search(q, query_params={"vec": query_bytes})

    if results.total > 0:
        doc           = results.docs[0]
        cosine_dist   = float(doc.vec_score)   # Redis COSINE distance in [0, 2]
        similarity    = 1.0 - cosine_dist       # convert to similarity in [-1, 1]
        if similarity >= THRESHOLD:
            return doc.response
    return None

# ── Usage ──────────────────────────────────────────────────────────────
if __name__ == "__main__":
    create_hnsw_index()   # one-time setup

    store_hnsw("What is your return policy?", "We offer 30-day hassle-free returns.")
    store_hnsw("How do I cancel my subscription?", "You can cancel any time from account settings.")

    # Each call prints the cached answer if the similarity reaches THRESHOLD, otherwise None
    print(lookup_hnsw("How do returns work?"))        # intended match: the returns answer
    print(lookup_hnsw("Steps to cancel my account?")) # intended match: the cancellation answer

Benchmark at scale: on a Redis Stack instance with 500,000 cached entries, a single HNSW lookup with EF_RUNTIME=10 takes approximately 2–5ms with 99.2% recall. The linear scan on the same dataset would take 400–800ms. The index uses roughly 900 MB to 1 GB of RAM for 500,000 entries with 384-dimensional float32 vectors (768 MB for the raw float32 vectors plus approximately 150–200 MB for the HNSW graph connections at M=16).
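
The raw-vector part of that memory figure is straightforward arithmetic: entries × dimensions × 4 bytes for float32. A small sketch, which deliberately does not model the HNSW graph overhead since that depends on the implementation:

```python
def raw_vector_bytes(entries: int, dim: int, bytes_per_value: int = 4) -> int:
    """Memory for the raw vectors alone (float32 by default), excluding the graph."""
    return entries * dim * bytes_per_value

# Dimensions taken from the embedding model table in Section 2.
for name, dim in [("all-MiniLM-L6-v2", 384),
                  ("all-mpnet-base-v2", 768),
                  ("BGE-M3", 1024)]:
    mb = raw_vector_bytes(500_000, dim) / 1_000_000   # decimal megabytes
    print(f"{name:20s} {mb:8.0f} MB for 500,000 entries")
```

Doubling the embedding dimension doubles the raw storage, which is one reason the smaller model is a reasonable default for a cache.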

7.3 Multi-tenant caching

A shared cache where every user's query lands in the same key namespace creates a data isolation risk. If user A asks "What is my account balance?" and user B later asks the semantically identical question, a naive semantic cache would return A's balance to B. Multi-tenant caching prevents this by filtering every KNN search to the requesting tenant's entries only.

Rather than creating one index per tenant (impractical beyond a few dozen tenants), store tenant_id as a TagField alongside the embedding and prepend a tag filter to every KNN query. Redis evaluates the tag filter before the vector search, so it adds negligible latency.
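
Because the tenant ID ends up inside the query string, it should be escaped before it is interpolated; otherwise punctuation in the ID can change the meaning of the query. The helper below is a sketch: the set of characters to escape is an assumption based on the punctuation the Redis query syntax treats specially, so verify it against the documentation for your Redis version. `escape_tag` and `tenant_filter` are hypothetical names.

```python
# Characters assumed to need a backslash inside a tag query (verify for your version).
_TAG_SPECIAL = set(",.<>{}[]\"':;!@#$%^&*()-+=~|/\\ \t")

def escape_tag(value: str) -> str:
    """Backslash-escape characters that could otherwise alter the query."""
    return "".join("\\" + ch if ch in _TAG_SPECIAL else ch for ch in value)

def tenant_filter(tenant_id: str) -> str:
    """Build the tag-filter prefix that goes in front of the KNN clause."""
    return "(@tenant_id:{" + escape_tag(tenant_id) + "})"

print(tenant_filter("tenant_a"))
print(tenant_filter("acme-corp"))
```

Validating tenant IDs against a strict pattern (for example letters, digits and underscores only) at the point where they enter the system is a simpler alternative where it is feasible.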


# multi_tenant_cache.py
import redis, hashlib, time
import numpy as np
from sentence_transformers import SentenceTransformer
from typing import Optional
from redis.commands.search.field import VectorField, TextField, TagField
from redis.commands.search.indexDefinition import IndexDefinition, IndexType

r        = redis.Redis(host="localhost", port=6379, decode_responses=False)
embedder = SentenceTransformer("all-MiniLM-L6-v2")

PREFIX     = "mt_cache:"
THRESHOLD  = 0.92
TTL        = 3600
VECTOR_DIM = 384

def create_mt_index() -> None:
    schema = (
        TextField("query"),
        TextField("response"),
        TagField("tenant_id"),          # filterable — one index, all tenants
        VectorField(
            "embedding", "HNSW",
            {"TYPE": "FLOAT32", "DIM": VECTOR_DIM, "DISTANCE_METRIC": "COSINE",
             "M": 16, "EF_CONSTRUCTION": 200},
        ),
    )
    try:
        r.ft("mt_cache_idx").create_index(
            schema,
            definition=IndexDefinition(prefix=[PREFIX], index_type=IndexType.HASH)
        )
    except Exception as e:
        if "Index already exists" not in str(e):
            raise

def store_mt(tenant_id: str, query: str, response: str) -> None:
    emb = embedder.encode(query, normalize_embeddings=True).astype(np.float32)
    key = f"{PREFIX}{tenant_id}:{hashlib.sha256(query.encode()).hexdigest()[:12]}"
    pipe = r.pipeline()
    pipe.hset(key, mapping={
        "query":     query,
        "response":  response,
        "tenant_id": tenant_id,
        "embedding": emb.tobytes(),
    })
    pipe.expire(key, TTL)
    pipe.execute()

def lookup_mt(tenant_id: str, query: str) -> Optional[str]:
    from redis.commands.search.query import Query as RQ

    emb         = embedder.encode(query, normalize_embeddings=True).astype(np.float32)
    query_bytes = emb.tobytes()

    # Tag filter scopes the KNN search to this tenant only
    q = (
        RQ("(@tenant_id:{" + tenant_id + "})=>[KNN 1 @embedding $vec AS score]")
        .sort_by("score", asc=True)
        .return_fields("response", "score")
        .paging(0, 1)
        .dialect(2)
    )
    results = r.ft("mt_cache_idx").search(q, query_params={"vec": query_bytes})

    if results.total > 0:
        doc = results.docs[0]
        if (1.0 - float(doc.score)) >= THRESHOLD:
            return doc.response
    return None

# ── Usage ──────────────────────────────────────────────────────────────
if __name__ == "__main__":
    create_mt_index()

    store_mt("tenant_a", "What is my account balance?", "Your balance is $1,240.00")

    # Tenant B asks the same question — gets None, not tenant A's data
    print(lookup_mt("tenant_b", "What is my account balance?"))   # → None
    # Tenant A asks a paraphrase — gets their own cached answer
    print(lookup_mt("tenant_a", "What is my current balance?"))   # → Your balance is $1,240.00

For entries that contain no user-specific data (general FAQ answers, public product documentation), you can store them under a shared tenant_id of "shared" and run two lookup passes: first against the caller's tenant partition, then against the shared partition. This lets you maintain a single warm FAQ cache while still protecting personal data.
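
The two-pass lookup can be written as a thin wrapper around any per-tenant lookup function. In the sketch below, `lookup_with_shared` is a hypothetical helper, and `fake_lookup` is a dictionary-backed stand-in with exact-string keys so that the example runs without Redis; in practice you would pass `lookup_mt` instead.

```python
from typing import Callable, Optional

SHARED_TENANT = "shared"   # the shared partition name suggested above

def lookup_with_shared(
    tenant_id: str,
    query: str,
    lookup_fn: Callable[[str, str], Optional[str]],
) -> Optional[str]:
    """Try the caller's own partition first, then fall back to the shared one."""
    answer = lookup_fn(tenant_id, query)
    if answer is None and tenant_id != SHARED_TENANT:
        answer = lookup_fn(SHARED_TENANT, query)
    return answer

# Stand-in for lookup_mt, keyed by exact (tenant, query) pairs.
_fake = {
    ("tenant_a", "What is my account balance?"): "Your balance is $1,240.00",
    ("shared", "What is your return policy?"): "We offer 30-day hassle-free returns.",
}

def fake_lookup(tenant_id: str, query: str) -> Optional[str]:
    return _fake.get((tenant_id, query))

print(lookup_with_shared("tenant_b", "What is your return policy?", fake_lookup))
print(lookup_with_shared("tenant_b", "What is my account balance?", fake_lookup))
```

Checking the tenant partition first means a tenant-specific answer always takes precedence over a shared one for the same question.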


8. Cache Warming and Hit Rate Monitoring

Two operational practices dramatically improve real-world cache performance: warming the cache before traffic arrives, and tracking hit rate continuously so you can tune the threshold and TTL over time.

8.1 Cache warming

A cold cache has a 0% hit rate. If you have historical logs of common queries or a FAQ dataset, pre-populate the cache before opening traffic. This is especially valuable when the question distribution is highly concentrated, for example when the top 200 questions account for 60% of volume.
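
To decide which questions are worth warming, you can rank logged queries by frequency and keep the smallest set that covers a target share of volume. The sketch below assumes a plain list of query strings; the log sample is hypothetical, and `warmup_candidates` is an illustrative helper rather than part of the warming script that follows.

```python
from collections import Counter

def warmup_candidates(logged_queries, coverage=0.60):
    """Return the most frequent queries that together reach `coverage` of volume.

    Counts exact strings after lowercasing and stripping whitespace; grouping
    paraphrases first (for example by intent cluster) would give a tighter list.
    """
    counts = Counter(q.strip().lower() for q in logged_queries)
    total = sum(counts.values())
    selected, covered = [], 0
    for query, n in counts.most_common():
        if covered / total >= coverage:
            break
        selected.append(query)
        covered += n
    return selected

# Hypothetical log sample, invented for illustration.
log = (["What is your return policy?"] * 5
       + ["What is the shipping cost?"] * 3
       + ["Do you ship to Canada?"] * 1
       + ["How do I cancel my subscription?"] * 1)
top = warmup_candidates(log, coverage=0.60)
print(top)
```

The resulting list can then be answered once and loaded into the cache before traffic is opened.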


# cache_warming.py
import csv
import anthropic
from semantic_cache import SemanticCache

client = anthropic.Anthropic()

def warm_from_faq(
    faq_csv_path: str,
    system_prompt: str,
    cache: SemanticCache,
    max_entries: int = 500,
) -> int:
    """
    Pre-populate the cache from a CSV file with a 'question' column
    and an optional 'answer' column. If no answer column exists,
    calls the LLM to generate canonical answers.
    Returns the number of entries stored.
    """
    stored = 0
    with open(faq_csv_path, newline="", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        for row in reader:
            if stored >= max_entries:
                break
            # DictReader yields None for cells missing from a short row, so guard before strip()
            question = (row.get("question") or "").strip()
            if not question:
                continue
            answer = (row.get("answer") or "").strip()
            if not answer:
                response = client.messages.create(
                    model="claude-sonnet-4-6",
                    max_tokens=512,
                    system=system_prompt,
                    messages=[{"role": "user", "content": question}],
                )
                answer = response.content[0].text
            cache.store(question, answer)
            stored += 1
    print(f"Cache warmed: {stored} entries loaded.")
    return stored

def warm_from_logs(
    query_answer_pairs: list[tuple[str, str]],
    cache: SemanticCache,
) -> int:
    """
    Load historical (question, answer) pairs directly.
    No LLM calls needed — ideal when you already have correct answers in your logs.
    """
    for question, answer in query_answer_pairs:
        cache.store(question, answer)
    return len(query_answer_pairs)

8.2 Hit rate monitoring

Track hits, misses, and latency at the call site. A production cache should expose a /metrics endpoint or push stats to your observability stack (Prometheus, Grafana, Datadog). At minimum, log hit rate every N requests so you can detect threshold drift or TTL issues.
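
For the "log hit rate every N requests" part, a rolling window reacts to drift faster than a lifetime average. The sketch below is one way to do it with the standard library; `RollingHitRate` is a hypothetical helper, and it appends samples to a list where a real service would write to a logger or metrics client.

```python
from collections import deque

class RollingHitRate:
    """Hit rate over the last `window` requests, sampled every `log_every` calls."""

    def __init__(self, window: int = 1000, log_every: int = 100):
        self.outcomes = deque(maxlen=window)   # True for a hit, False for a miss
        self.log_every = log_every
        self.seen = 0
        self.logged = []                       # (request_count, hit_rate) samples

    @property
    def rate(self) -> float:
        return sum(self.outcomes) / len(self.outcomes) if self.outcomes else 0.0

    def record(self, hit: bool) -> None:
        self.outcomes.append(hit)
        self.seen += 1
        if self.seen % self.log_every == 0:
            self.logged.append((self.seen, self.rate))

# Small window so the effect is visible in a few requests.
monitor = RollingHitRate(window=4, log_every=2)
for outcome in [False, True, True, True, True, False]:
    monitor.record(outcome)
print(monitor.logged)
```

A sustained drop in the rolling rate after a deployment is a typical signal that the threshold, the TTL, or the embedding model has changed in a way that needs attention.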


# monitored_cache.py
import time
import anthropic
from dataclasses import dataclass, field
from threading import Lock
from semantic_cache import SemanticCache

client = anthropic.Anthropic()
cache  = SemanticCache(threshold=0.92, ttl=3600)

@dataclass
class CacheStats:
    hits:             int   = 0
    misses:           int   = 0
    hit_latencies_ms: list  = field(default_factory=list)
    miss_latencies_ms: list = field(default_factory=list)
    _lock:            Lock  = field(default_factory=Lock)

    @property
    def total(self) -> int:
        return self.hits + self.misses

    @property
    def hit_rate(self) -> float:
        return self.hits / self.total if self.total > 0 else 0.0

    @property
    def avg_hit_ms(self) -> float:
        return sum(self.hit_latencies_ms) / len(self.hit_latencies_ms) if self.hit_latencies_ms else 0.0

    @property
    def avg_miss_ms(self) -> float:
        return sum(self.miss_latencies_ms) / len(self.miss_latencies_ms) if self.miss_latencies_ms else 0.0

    def record_hit(self, ms: float) -> None:
        with self._lock:
            self.hits += 1
            self.hit_latencies_ms.append(ms)

    def record_miss(self, ms: float) -> None:
        with self._lock:
            self.misses += 1
            self.miss_latencies_ms.append(ms)

    def report(self) -> None:
        # Rough savings, extrapolated to 100K queries/day at $0.0078 per avoided LLM call
        # (600 input + 400 output tokens at $3/$15 per million tokens = $780/day uncached)
        daily_hits  = self.hit_rate * 100_000      # hits per day at the observed hit rate
        saved_usd   = daily_hits * 0.0078
        print(f"Requests   : {self.total:,}  |  hit rate: {self.hit_rate:.1%}")
        print(f"Latency    : hit={self.avg_hit_ms:.1f}ms  |  miss={self.avg_miss_ms:.0f}ms")
        print(f"Cost saved : ~${saved_usd:.2f}/day (extrapolated at 100K daily queries)")

stats = CacheStats()

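def ask(question: str, system: str = "You are a helpful assistant.") -> dict:
    """Answer from the cache when possible; otherwise call the LLM and cache the result.

    The cache is keyed on the question alone, so all callers are assumed to
    share the same system prompt.
    """
    # Time the lookup (embedding + similarity search); on a hit this is the full latency.
    t0     = time.perf_counter()
    cached = cache.lookup(question)
    ms     = (time.perf_counter() - t0) * 1000

    if cached is not None:
        stats.record_hit(ms)
        return {"response": cached, "cache_hit": True, "tokens_used": 0}

    # Cache miss: time only the LLM call, so miss latency excludes the lookup above.
    t1       = time.perf_counter()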
    response = client.messages.create(
        model      = "claude-sonnet-4-6",
        max_tokens = 1024,
        system     = system,
        messages   = [{"role": "user", "content": question}],
    )
    ms     = (time.perf_counter() - t1) * 1000
    answer = response.content[0].text
    cache.store(question, answer)
    stats.record_miss(ms)

    return {
        "response":    answer,
        "cache_hit":   False,
        "tokens_used": response.usage.input_tokens + response.usage.output_tokens,
    }

if __name__ == "__main__":
    # Simulate 6 requests — 4 cache misses (first of each intent), 2 cache hits (paraphrases)
    queries = [
        "What is your return policy?",
        "How do I return an item?",          # paraphrase → hit
        "What is the shipping cost?",
        "How much does delivery cost?",      # paraphrase → hit
        "How do I cancel my subscription?",
        "Can I get a refund?",
    ]
    for q in queries:
        r = ask(q)
        print(f"{'HIT ' if r['cache_hit'] else 'MISS'} | {q[:50]}")

    print()
    stats.report()
    # Example output:
    # Requests   : 6  |  hit rate: 33.3%
    # Latency    : hit=6.2ms  |  miss=1842ms
    # Cost saved : ~$260.00/day (extrapolated at 100K daily queries)
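The "log hit rate every N requests" advice from the start of this section can be sketched as a small rolling tracker. This is an illustrative addition, not part of `monitored_cache.py`: the class name `RollingHitRate`, its `window` and `log_every` parameters, and the logger name are all hypothetical. It keeps only the most recent outcomes, so a drop in hit rate after a threshold or TTL change shows up quickly instead of being averaged away by older traffic.

```python
import logging
from collections import deque
from threading import Lock

logger = logging.getLogger("semantic_cache.stats")  # hypothetical logger name


class RollingHitRate:
    """Hit rate over the last `window` requests, logged every `log_every` requests."""

    def __init__(self, window: int = 1000, log_every: int = 100) -> None:
        self._outcomes = deque(maxlen=window)  # True = hit, False = miss
        self._log_every = log_every
        self._seen = 0
        self._lock = Lock()

    def _rate_unlocked(self) -> float:
        # Caller must already hold self._lock.
        return sum(self._outcomes) / len(self._outcomes) if self._outcomes else 0.0

    def record(self, hit: bool) -> None:
        with self._lock:
            self._outcomes.append(hit)
            self._seen += 1
            if self._seen % self._log_every == 0:
                logger.info(
                    "cache hit rate (last %d requests): %.1f%%",
                    len(self._outcomes),
                    100 * self._rate_unlocked(),
                )

    @property
    def rate(self) -> float:
        with self._lock:
            return self._rate_unlocked()
```

One way to wire it in would be to call `record(True)` next to `stats.record_hit(...)` and `record(False)` next to `stats.record_miss(...)`. Because the deque is bounded, memory use stays constant, unlike the latency lists in `CacheStats`, which grow with every request.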

9. Measured Cost and Latency Savings

The following figures are based on a customer-support chatbot handling 100,000 queries per day, with an average of 600 input tokens and 400 output tokens per query, at Claude Sonnet pricing ($3/$15 per million input/output tokens).

| Metric | Without Cache | With Semantic Cache (θ = 0.92) | Improvement |
|---|---|---|---|
| Daily LLM API calls | 100,000 | ~42,000 | −58% |
| Daily input token cost | $180 | ~$75 | −58% |
| Daily output token cost | $600 | ~$252 | −58% |
| Daily embedding cost (additional) | $0 | ~$1.50 | +$1.50 |
| Total daily cost | $780 | ~$328 | −58% ($452/day saved) |
| p50 response latency | 1,800 ms | ~8 ms (cache hit) / 1,800 ms (miss) | Cache hits: −99% |
| p99 response latency | 4,200 ms | ~25 ms (hit) / 4,200 ms (miss) | Cache hits: −99% |

The 58% hit rate comes from a customer-support domain where users regularly ask variations of the same ~200 core questions. Hit rates will be lower for open-ended Q&A assistants (~15–30%) and higher for tightly scoped FAQ bots (~65–80%).

The embedding cost ($1.50/day) is negligible: under 0.5% of the remaining daily spend. Even at a 20% cache hit rate, the cache avoids about $156/day in LLM calls (20% of $780), roughly 100 times what the embeddings cost.
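The arithmetic behind these figures can be reproduced with a few lines of Python. This is a sketch under the stated assumptions (100,000 queries/day, 600 input and 400 output tokens, $3/$15 per million tokens, a flat $1.50/day for embeddings). The `CostModel` class and its method names are illustrative, not part of the article's code. Because the table rounds intermediate values, the results land within a dollar or two of its totals.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CostModel:
    queries_per_day: int = 100_000
    input_tokens: int = 600
    output_tokens: int = 400
    input_usd_per_mtok: float = 3.0
    output_usd_per_mtok: float = 15.0
    embedding_usd_per_day: float = 1.50  # table figure; paid on every query, hit or miss

    @property
    def usd_per_llm_call(self) -> float:
        return (self.input_tokens * self.input_usd_per_mtok
                + self.output_tokens * self.output_usd_per_mtok) / 1_000_000

    def uncached_daily_cost(self) -> float:
        return self.queries_per_day * self.usd_per_llm_call

    def cached_daily_cost(self, hit_rate: float) -> float:
        # Only misses reach the LLM; the embedding cost is added on top.
        misses = self.queries_per_day * (1.0 - hit_rate)
        return misses * self.usd_per_llm_call + self.embedding_usd_per_day

    def daily_savings(self, hit_rate: float) -> float:
        return self.uncached_daily_cost() - self.cached_daily_cost(hit_rate)

    def break_even_hit_rate(self) -> float:
        # Hit rate at which avoided LLM spend equals the embedding cost.
        return self.embedding_usd_per_day / self.uncached_daily_cost()


model = CostModel()
print(f"uncached:        ${model.uncached_daily_cost():.2f}")    # 780.00
print(f"58% hit rate:    ${model.cached_daily_cost(0.58):.2f}")  # 329.10
print(f"saved at 20%:    ${model.daily_savings(0.20):.2f}")      # 154.50
print(f"break-even rate: {model.break_even_hit_rate():.2%}")     # 0.19%
```

Under these assumptions the cache breaks even at a hit rate of about 0.2%, so the cost case holds for almost any workload with repeated intents. The open question is answer quality at the chosen threshold.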


10. Key Takeaways

  • Exact-match caching barely works for LLMs. Real users paraphrase constantly. Cache hit rates of 1–5% are typical with string-based caches on conversational traffic.
  • Semantic caching compares meaning, not text. Queries are embedded into vectors; a cache hit fires when cosine similarity ≥ θ. Hit rates of 30–65% are achievable in domain-specific deployments.
  • The similarity threshold is the most critical parameter. Start at 0.92 for most domains; calibrate by sampling real query pairs and measuring within-cluster vs across-cluster similarity to find the right gap.
  • Embedding cost is negligible. One all-MiniLM-L6-v2 embedding takes ~5ms and costs on the order of a cent per 1,000 queries ($1.50 per 100,000 in the example above). It is never the bottleneck.
  • Never cache user-specific or real-time queries. Cached responses are shared across users. Personal data queries, session-dependent follow-ups, and time-sensitive lookups must always bypass the cache.
  • TTL is your invalidation safety net. Set TTL to match the freshness requirement of your content. For stable FAQ content, 24-hour TTL is reasonable; for frequently updated knowledge bases, use 1–4 hours plus tag-based invalidation on content updates.
  • Scale with Redis HNSW. The in-memory linear scan works for prototyping and small caches (under 10,000 entries). Switch to Redis Stack's vector index for production at scale: O(log n) lookup with >99% recall.
