# Advanced Vector Store Configuration
This guide covers advanced configuration patterns, performance optimization, custom implementations, and migration strategies for vector store backends in Redis Agent Memory Server.
## Advanced Factory Patterns
### Multi-Backend Hybrid Factory
Combine multiple backends for different use cases, such as a fast Redis index for recent, frequently accessed memories and a cheaper persistent store for everything else:
```python
# hybrid_factory.py
import os
from datetime import datetime, timezone

from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore


class HybridVectorStore(VectorStore):
    """Hybrid vectorstore that routes operations based on content type."""

    def __init__(self, embeddings: Embeddings):
        # The base class exposes `embeddings` as a read-only property, so
        # store the object privately and override the property below
        self._embeddings = embeddings
        self.fast_store = self._create_fast_store(embeddings)        # Redis for recent data
        self.archive_store = self._create_archive_store(embeddings)  # Cheaper persistent storage

    @property
    def embeddings(self) -> Embeddings:
        return self._embeddings

    def _create_fast_store(self, embeddings: Embeddings) -> VectorStore:
        """Create a fast vectorstore for recent/active memories."""
        from langchain_redis import RedisConfig, RedisVectorStore

        config = RedisConfig(
            index_name="fast_memories",
            redis_url=os.getenv("REDIS_URL", "redis://localhost:6379"),
        )
        return RedisVectorStore(embeddings, config=config)

    def _create_archive_store(self, embeddings: Embeddings) -> VectorStore:
        """Create an archive vectorstore for old/inactive memories."""
        from langchain_chroma import Chroma

        return Chroma(
            persist_directory=os.getenv("ARCHIVE_PERSIST_DIR", "./archive"),
            collection_name="archived_memories",
            embedding_function=embeddings,
        )

    def add_texts(self, texts: list[str], metadatas: list[dict] | None = None, **kwargs):
        """Add texts to the appropriate store based on metadata."""
        if not metadatas:
            metadatas = [{}] * len(texts)

        # Route based on memory age and access patterns
        fast_texts, fast_meta = [], []
        archive_texts, archive_meta = [], []

        for text, meta in zip(texts, metadatas):
            # Recent or frequently accessed memories go to the fast store
            if self._should_use_fast_store(meta):
                fast_texts.append(text)
                fast_meta.append(meta)
            else:
                archive_texts.append(text)
                archive_meta.append(meta)

        # Add to the appropriate stores
        results = []
        if fast_texts:
            results.extend(self.fast_store.add_texts(fast_texts, fast_meta, **kwargs))
        if archive_texts:
            results.extend(self.archive_store.add_texts(archive_texts, archive_meta, **kwargs))
        return results

    def similarity_search(self, query: str, k: int = 4, **kwargs):
        """Search both stores and combine the results."""
        # Split the budget between stores; the fast store is most likely
        # to hold relevant recent data
        fast_results = self.fast_store.similarity_search(query, k=k // 2, **kwargs)
        archive_results = self.archive_store.similarity_search(query, k=k // 2, **kwargs)

        # Concatenate and truncate; swap in a real re-ranker if you need one
        all_results = fast_results + archive_results
        return all_results[:k]

    @classmethod
    def from_texts(cls, texts, embedding, metadatas=None, **kwargs):
        """Required by the VectorStore interface."""
        store = cls(embedding)
        store.add_texts(texts, metadatas, **kwargs)
        return store

    def _should_use_fast_store(self, metadata: dict) -> bool:
        """Decide whether a memory belongs in the fast store."""
        # Example routing logic
        access_count = metadata.get("access_count", 0)
        created_days_ago = self._days_since_created(metadata.get("created_at"))
        return access_count > 5 or created_days_ago < 30

    def _days_since_created(self, created_at: str | None) -> float:
        """Days since creation, assuming ISO 8601 timestamp strings."""
        if not created_at:
            return float("inf")
        created = datetime.fromisoformat(created_at)
        if created.tzinfo is None:
            created = created.replace(tzinfo=timezone.utc)
        return (datetime.now(timezone.utc) - created).total_seconds() / 86400


def create_hybrid_vectorstore(embeddings: Embeddings) -> HybridVectorStore:
    """Factory for the hybrid vectorstore."""
    return HybridVectorStore(embeddings)
```
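To sanity-check the routing before wiring the factory into the server, you can exercise it directly. A minimal sketch, assuming a Redis instance with the search module is running at `REDIS_URL` and that `langchain-openai` is installed (any `Embeddings` implementation works); the timestamps are illustrative:

```python
# hybrid_smoke_test.py — a sketch, not part of the server
from langchain_openai import OpenAIEmbeddings  # assumption: any Embeddings works

from hybrid_factory import create_hybrid_vectorstore

store = create_hybrid_vectorstore(OpenAIEmbeddings())

# High access count -> routed to the Redis fast store
store.add_texts(
    ["User prefers dark mode"],
    metadatas=[{"access_count": 12, "created_at": "2025-01-02T10:00:00+00:00"}],
)

# Old and rarely accessed -> routed to the Chroma archive store
store.add_texts(
    ["User once asked about CSV exports"],
    metadatas=[{"access_count": 1, "created_at": "2023-06-01T00:00:00+00:00"}],
)

# The query fans out to both stores and merges the results
print(store.similarity_search("display preferences", k=4))
```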
## Custom Adapter Implementation
### Advanced Custom Adapter

Subclass `VectorStoreAdapter` when the generic adapter isn't enough; the example below adds embedding caching, batched writes, and retry logic:
```python
# custom_advanced_adapter.py
import asyncio
import logging
import os
from typing import List, Optional

from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings

from agent_memory_server.models import MemoryRecord, MemoryRecordResult
from agent_memory_server.vectorstore_adapter import VectorStoreAdapter


class AdvancedCustomAdapter(VectorStoreAdapter):
    """Advanced custom adapter with caching and batch operations."""

    def __init__(self, vectorstore, embeddings: Embeddings):
        super().__init__(vectorstore, embeddings)
        self.logger = logging.getLogger(__name__)
        self._embedding_cache = {}
        self._batch_size = 50

    async def add_memories(self, memories: List[MemoryRecord]) -> List[str]:
        """Add memories with optimized batching and caching."""
        if not memories:
            return []

        self.logger.info(f"Adding {len(memories)} memories in batches of {self._batch_size}")

        all_ids = []
        # Process in batches
        for i in range(0, len(memories), self._batch_size):
            batch = memories[i:i + self._batch_size]
            batch_ids = await self._add_memory_batch(batch)
            all_ids.extend(batch_ids)
        return all_ids

    async def _add_memory_batch(self, memories: List[MemoryRecord]) -> List[str]:
        """Add a batch of memories with optimizations."""
        # Prepare documents
        documents = []
        for memory in memories:
            # Warm the embedding cache. Note that most vectorstores re-embed
            # inside add_documents, so this mainly speeds up later lookups.
            await self._get_cached_embedding(memory.text)
            doc = Document(
                id=memory.id,
                page_content=memory.text,
                metadata=self._prepare_metadata(memory),
            )
            documents.append(doc)

        # Add to the vectorstore, preferring the async API when available
        try:
            if hasattr(self.vectorstore, "aadd_documents"):
                return await self.vectorstore.aadd_documents(documents)
            return self.vectorstore.add_documents(documents)
        except Exception as e:
            self.logger.error(f"Error adding batch: {e}")
            raise

    async def _get_cached_embedding(self, text: str) -> List[float]:
        """Get an embedding, with in-process caching for performance."""
        # Note: str hashes are salted per process, so this cache is per-process only
        text_hash = hash(text)
        if text_hash in self._embedding_cache:
            return self._embedding_cache[text_hash]

        # Generate a new embedding
        if hasattr(self.embeddings, "aembed_query"):
            embedding = await self.embeddings.aembed_query(text)
        else:
            embedding = self.embeddings.embed_query(text)

        # Cache with a size limit
        if len(self._embedding_cache) < 1000:
            self._embedding_cache[text_hash] = embedding
        return embedding

    def _prepare_metadata(self, memory: MemoryRecord) -> dict:
        """Prepare metadata optimized for the specific backend."""
        metadata = {
            "id_": memory.id,
            "user_id": memory.user_id,
            "namespace": memory.namespace or "default",
            "memory_type": memory.memory_type.value,
            "created_at": memory.created_at.isoformat() if memory.created_at else None,
            "access_count": memory.access_count or 0,
            "pinned": getattr(memory, "pinned", False),
        }

        # Add topics and entities if present, capping array sizes
        if memory.topics:
            metadata["topics"] = memory.topics[:10]
        if memory.entities:
            metadata["entities"] = memory.entities[:10]

        # Remove None values
        return {k: v for k, v in metadata.items() if v is not None}

    async def search_memories(
        self,
        query: str,
        limit: int = 10,
        namespace: Optional[str] = None,
        user_id: Optional[str] = None,
        **kwargs,
    ) -> MemoryRecordResult:
        """Search with advanced filtering and ranking."""
        # Build filter conditions
        filters = {}
        if namespace:
            filters["namespace"] = namespace
        if user_id:
            filters["user_id"] = user_id

        # Perform the search with retry logic
        max_retries = 3
        for attempt in range(max_retries):
            try:
                # Custom search implementation based on your vectorstore
                results = await self._perform_search(query, limit, filters, **kwargs)

                # Post-process results
                processed_results = self._post_process_results(results, query)
                return MemoryRecordResult(
                    memories=processed_results[:limit],
                    total_count=len(processed_results),
                )
            except Exception as e:
                if attempt < max_retries - 1:
                    self.logger.warning(f"Search attempt {attempt + 1} failed: {e}, retrying...")
                    await asyncio.sleep(0.5 * 2 ** attempt)  # Exponential backoff
                else:
                    self.logger.error(f"Search failed after {max_retries} attempts: {e}")
                    raise

    async def _perform_search(self, query: str, limit: int, filters: dict, **kwargs):
        """Perform the actual search operation."""
        # Implementation depends on your specific vectorstore.
        # This is a template - adapt it to your backend.
        if hasattr(self.vectorstore, "asimilarity_search"):
            return await self.vectorstore.asimilarity_search(
                query=query,
                k=limit,
                filter=filters,
                **kwargs,
            )
        return self.vectorstore.similarity_search(
            query=query,
            k=limit,
            filter=filters,
            **kwargs,
        )

    def _post_process_results(self, results: List[Document], query: str) -> List[MemoryRecord]:
        """Post-process search results for ranking."""
        processed = []
        for doc in results:
            try:
                # Convert the document back to a MemoryRecord.
                # (_document_to_memory is not shown: map the Document's
                # metadata back onto MemoryRecord fields for your schema.)
                memory = self._document_to_memory(doc)

                # Add a computed relevance score if not present
                if not hasattr(memory, "relevance_score"):
                    memory.relevance_score = self._calculate_relevance(doc, query)
                processed.append(memory)
            except Exception as e:
                self.logger.warning(f"Error processing result: {e}")
                continue

        # Sort by relevance
        processed.sort(key=lambda x: getattr(x, "relevance_score", 0), reverse=True)
        return processed

    def _calculate_relevance(self, doc: Document, query: str) -> float:
        """Calculate a simple keyword-overlap score as a fallback.

        Replace with more sophisticated scoring if needed.
        """
        query_words = set(query.lower().split())
        text_words = set(doc.page_content.lower().split())
        if not query_words:
            return 0.0
        return len(query_words & text_words) / len(query_words)


def create_advanced_custom_adapter(embeddings: Embeddings) -> AdvancedCustomAdapter:
    """Factory for the advanced custom adapter."""
    # Use any vectorstore backend
    from langchain_chroma import Chroma

    vectorstore = Chroma(
        persist_directory=os.getenv("CUSTOM_PERSIST_DIR", "./custom_data"),
        collection_name="advanced_memories",
        embedding_function=embeddings,
    )
    return AdvancedCustomAdapter(vectorstore, embeddings)
```
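A quick way to exercise the adapter outside the server is a small async driver. A sketch, again assuming `langchain-openai` for embeddings; `search_memories` works against whatever the Chroma collection already contains:

```python
# adapter_smoke_test.py — a sketch, not part of the server
import asyncio

from langchain_openai import OpenAIEmbeddings  # assumption: any Embeddings works

from custom_advanced_adapter import create_advanced_custom_adapter


async def main():
    adapter = create_advanced_custom_adapter(OpenAIEmbeddings())

    # Search whatever the collection currently holds; add_memories() is the
    # write-side counterpart and follows the same async pattern
    results = await adapter.search_memories("user preferences", limit=5, namespace="default")
    for memory in results.memories:
        print(memory.id, getattr(memory, "relevance_score", None))


asyncio.run(main())
```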
## Migration Strategies
### Data Export and Import

The migrator below pulls memories out of one running server and loads them into another through the client API, with batched imports and a sampling-based verification pass:
```python
# migration_tools.py
import asyncio
import json
from datetime import datetime
from typing import Any, Dict, List, Optional

from agent_memory_client import MemoryAPIClient


class VectorStoreMigrator:
    """Tool for migrating data between vector stores."""

    def __init__(self, source_client: MemoryAPIClient, target_client: MemoryAPIClient):
        self.source = source_client
        self.target = target_client

    async def migrate_all_memories(self, batch_size: int = 100) -> Dict[str, int]:
        """Migrate all memories from source to target."""
        print("Starting migration...")

        # Export all memories
        memories = await self.export_memories()
        print(f"Exported {len(memories)} memories")

        # Import in batches
        imported_count = await self.import_memories(memories, batch_size)

        # Verification
        verification_results = await self.verify_migration()

        return {
            "exported": len(memories),
            "imported": imported_count,
            "verification_passed": verification_results["success"],
            "missing_memories": verification_results["missing_count"],
        }

    async def export_memories(
        self,
        user_id: Optional[str] = None,
        namespace: Optional[str] = None,
    ) -> List[Dict[str, Any]]:
        """Export memories from the source system."""
        memories = []
        offset = 0
        batch_size = 1000

        while True:
            # Search with pagination
            results = await self.source.search_long_term_memory(
                text="",  # Empty query to get all
                user_id=user_id,
                namespace=namespace,
                limit=batch_size,
                offset=offset,
            )
            if not results.memories:
                break

            # Convert to an exportable format
            for memory in results.memories:
                memory_dict = {
                    "id": memory.id,
                    "text": memory.text,
                    "memory_type": memory.memory_type,
                    "user_id": memory.user_id,
                    "session_id": memory.session_id,
                    "namespace": memory.namespace,
                    "topics": memory.topics,
                    "entities": memory.entities,
                    "created_at": memory.created_at.isoformat() if memory.created_at else None,
                    "updated_at": memory.updated_at.isoformat() if memory.updated_at else None,
                    "access_count": memory.access_count,
                    "pinned": getattr(memory, "pinned", False),
                }
                memories.append(memory_dict)

            offset += batch_size
            print(f"Exported {len(memories)} memories so far...")

        return memories

    async def import_memories(self, memories: List[Dict[str, Any]], batch_size: int = 100) -> int:
        """Import memories into the target system."""
        imported_count = 0

        for i in range(0, len(memories), batch_size):
            batch = memories[i:i + batch_size]

            # Remove None values and prepare records for import
            memory_records = [
                {k: v for k, v in mem_dict.items() if v is not None}
                for mem_dict in batch
            ]

            try:
                # Import the batch
                result = await self.target.create_long_term_memories(memory_records)
                imported_count += len(result.memories)
                print(f"Imported batch {i // batch_size + 1}: {len(result.memories)} memories")

                # Small delay to avoid overwhelming the target system
                await asyncio.sleep(0.1)
            except Exception as e:
                print(f"Error importing batch {i // batch_size + 1}: {e}")
                # Continue with the next batch

        return imported_count

    async def verify_migration(self, sample_size: int = 100) -> Dict[str, Any]:
        """Verify the migration by sampling memories."""
        # Get a sample from the source
        source_sample = await self.source.search_long_term_memory(
            text="",
            limit=sample_size,
        )

        missing_count = 0
        verified_count = 0

        for memory in source_sample.memories:
            # Try to find the memory in the target
            target_results = await self.target.search_long_term_memory(
                text=memory.text[:100],  # Use the first 100 chars for matching
                user_id=memory.user_id,
                limit=5,
            )

            # Look for an exact match
            found = any(
                result.id == memory.id or result.text == memory.text
                for result in target_results.memories
            )
            if found:
                verified_count += 1
            else:
                missing_count += 1

        success_rate = verified_count / len(source_sample.memories) if source_sample.memories else 0
        return {
            "success": success_rate > 0.95,  # 95% success threshold
            "success_rate": success_rate,
            "verified_count": verified_count,
            "missing_count": missing_count,
            "sample_size": len(source_sample.memories),
        }

    async def export_to_file(
        self,
        filename: str,
        user_id: Optional[str] = None,
        namespace: Optional[str] = None,
    ):
        """Export memories to a JSON file."""
        memories = await self.export_memories(user_id, namespace)

        export_data = {
            "export_timestamp": datetime.now().isoformat(),
            "total_count": len(memories),
            "user_id": user_id,
            "namespace": namespace,
            "memories": memories,
        }
        with open(filename, "w", encoding="utf-8") as f:
            json.dump(export_data, f, indent=2, ensure_ascii=False)
        print(f"Exported {len(memories)} memories to {filename}")

    async def import_from_file(self, filename: str, batch_size: int = 100) -> int:
        """Import memories from a JSON file."""
        with open(filename, "r", encoding="utf-8") as f:
            export_data = json.load(f)

        memories = export_data.get("memories", [])
        print(f"Found {len(memories)} memories in {filename}")
        return await self.import_memories(memories, batch_size)


# Usage example
async def migrate_pinecone_to_redis():
    """Example: migrate from Pinecone to Redis."""
    # Source: current Pinecone-backed server
    source_client = MemoryAPIClient(base_url="http://localhost:8000")

    # Target: new Redis-backed server
    target_client = MemoryAPIClient(base_url="http://localhost:8001")

    migrator = VectorStoreMigrator(source_client, target_client)

    # Option 1: Direct migration
    results = await migrator.migrate_all_memories(batch_size=50)
    print(f"Migration results: {results}")

    # Option 2: File-based migration (safer for large datasets)
    await migrator.export_to_file("memory_export.json")
    # ... Stop the old server, start the new server with the Redis backend ...
    imported = await migrator.import_from_file("memory_export.json")
    print(f"Imported {imported} memories from file")
```
### Zero-Downtime Migration

When you can't afford an export/import window, run both backends in parallel: enable dual writes, backfill historical data, verify consistency, then cut over:
```python
# zero_downtime_migration.py
from datetime import datetime, timedelta

from agent_memory_client import MemoryAPIClient

from migration_tools import VectorStoreMigrator


class ZeroDowntimeMigrator:
    """Perform a migration with zero downtime using a dual-write strategy."""

    def __init__(self, primary_client: MemoryAPIClient, secondary_client: MemoryAPIClient):
        self.primary = primary_client
        self.secondary = secondary_client
        self.migration_start_time = None

    async def start_dual_write_migration(self) -> bool:
        """Run the dual-write phases of the migration."""
        self.migration_start_time = datetime.now()
        print(f"Starting dual-write migration at {self.migration_start_time}")

        # Phase 1: Start writing to both systems
        print("Phase 1: Enabling dual writes...")
        await self._enable_dual_writes()

        # Phase 2: Backfill historical data
        print("Phase 2: Backfilling historical data...")
        await self._backfill_historical_data()

        # Phase 3: Verify consistency
        print("Phase 3: Verifying data consistency...")
        consistency_check = await self._verify_consistency()

        if consistency_check["success"]:
            print("✅ Migration ready for cutover")
            return True
        print("❌ Consistency check failed")
        return False

    async def _enable_dual_writes(self):
        """Configure the system to write to both primary and secondary."""
        # This requires support in the memory server for dual writes
        # during migration; it is left as a deployment-specific step.
        pass

    async def _backfill_historical_data(self):
        """Copy all historical data to the secondary system."""
        migrator = VectorStoreMigrator(self.primary, self.secondary)

        # Only migrate data created before the migration started
        cutoff_time = self.migration_start_time
        print(f"Backfilling data created before {cutoff_time}")

        # Export historical memories
        memories = []
        offset = 0
        batch_size = 1000

        while True:
            results = await self.primary.search_long_term_memory(
                text="",
                limit=batch_size,
                offset=offset,
                created_before=cutoff_time,  # Only historical data
            )
            if not results.memories:
                break
            memories.extend(results.memories)
            offset += batch_size
            print(f"Collected {len(memories)} historical memories...")

        # Import into the secondary
        imported = await migrator.import_memories(
            [self._memory_to_dict(mem) for mem in memories],
            batch_size=100,
        )
        print(f"Backfilled {imported} historical memories")

    async def _verify_consistency(self) -> dict:
        """Verify that both systems hold consistent data."""
        # Sample recent memories from both systems
        sample_size = 1000

        primary_memories = await self.primary.search_long_term_memory(
            text="",
            limit=sample_size,
            created_after=self.migration_start_time - timedelta(hours=1),
        )
        secondary_memories = await self.secondary.search_long_term_memory(
            text="",
            limit=sample_size,
            created_after=self.migration_start_time - timedelta(hours=1),
        )

        # Compare memory IDs
        primary_ids = {mem.id for mem in primary_memories.memories}
        secondary_ids = {mem.id for mem in secondary_memories.memories}

        missing_in_secondary = primary_ids - secondary_ids
        extra_in_secondary = secondary_ids - primary_ids
        consistency_rate = (
            len(primary_ids & secondary_ids) / len(primary_ids) if primary_ids else 1.0
        )

        return {
            "success": consistency_rate > 0.98,  # 98% consistency threshold
            "consistency_rate": consistency_rate,
            "missing_in_secondary": len(missing_in_secondary),
            "extra_in_secondary": len(extra_in_secondary),
            "primary_count": len(primary_ids),
            "secondary_count": len(secondary_ids),
        }

    def _memory_to_dict(self, memory) -> dict:
        """Convert a memory object to a dictionary."""
        return {
            "id": memory.id,
            "text": memory.text,
            "memory_type": memory.memory_type,
            "user_id": memory.user_id,
            "session_id": memory.session_id,
            "namespace": memory.namespace,
            "topics": memory.topics,
            "entities": memory.entities,
            "created_at": memory.created_at.isoformat() if memory.created_at else None,
            "updated_at": memory.updated_at.isoformat() if memory.updated_at else None,
            "access_count": memory.access_count,
            "pinned": getattr(memory, "pinned", False),
        }

    async def complete_cutover(self):
        """Complete the migration by switching traffic to the secondary."""
        print("Completing cutover to secondary system...")

        # Final consistency check
        final_check = await self._verify_consistency()
        if not final_check["success"]:
            raise Exception("Final consistency check failed - aborting cutover")

        # At this point you would:
        # 1. Update configuration to use the secondary as primary
        # 2. Stop dual writes
        # 3. Decommission the old primary
        print("✅ Cutover completed successfully")
        return final_check
```
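The phases compose into a short driver. A sketch, with hypothetical base URLs for the two deployments:

```python
# run_zero_downtime.py — a sketch; the base URLs are hypothetical
import asyncio

from agent_memory_client import MemoryAPIClient

from zero_downtime_migration import ZeroDowntimeMigrator


async def main():
    primary = MemoryAPIClient(base_url="http://localhost:8000")
    secondary = MemoryAPIClient(base_url="http://localhost:8001")

    migrator = ZeroDowntimeMigrator(primary, secondary)
    if await migrator.start_dual_write_migration():
        # Keep dual writes running until you are ready, then cut over
        await migrator.complete_cutover()
    else:
        print("Fix consistency issues and re-run before attempting cutover")


asyncio.run(main())
```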