Advanced Memory Vector Database Configuration
This guide covers advanced configuration patterns, custom implementations, and migration strategies for memory vector databases in Redis Agent Memory Server.
Advanced Factory Patterns
Environment-Based Configuration
# my_backends.py
import os
import json
from agent_memory_server.memory_vector_db import MemoryVectorDatabase
def create_configured_backend(embeddings) -> MemoryVectorDatabase:
    """Factory that selects and configures a backend from environment variables.

    Environment:
        BACKEND_TYPE: ``"redis"`` (default) or ``"custom"``.
        MEMORY_VECTOR_DB_CONFIG: JSON object of keyword arguments for the
            custom backend. Parsed only when BACKEND_TYPE == "custom", so a
            malformed value cannot break the default Redis path.

    Raises:
        ValueError: If BACKEND_TYPE names an unsupported backend.
        json.JSONDecodeError: If the custom backend is selected and the
            config env var is not valid JSON.
    """
    backend_type = os.getenv("BACKEND_TYPE", "redis")
    if backend_type == "redis":
        from agent_memory_server.memory_vector_db_factory import create_redis_memory_vector_db
        return create_redis_memory_vector_db(embeddings)
    elif backend_type == "custom":
        # Parse the JSON config lazily: only the custom backend consumes it.
        config = json.loads(os.getenv("MEMORY_VECTOR_DB_CONFIG", "{}"))
        return MyCustomBackend(embeddings, **config)
    else:
        raise ValueError(f"Unsupported backend: {backend_type}")
Resilient Factory with Fallback
# resilient_factory.py
import logging
from agent_memory_server.memory_vector_db import MemoryVectorDatabase
logger = logging.getLogger(__name__)
def create_resilient_backend(embeddings) -> MemoryVectorDatabase:
    """Factory with fallback to the default Redis backend.

    Tries the custom backend first; on any failure it logs a warning and
    returns the stock Redis implementation so the server can still start.

    Args:
        embeddings: Embedding model handed through to whichever backend wins.
    """
    try:
        # Try custom backend first
        return create_custom_backend(embeddings)
    except Exception as e:
        # Lazy %-style args: the message is only formatted if the record
        # is actually emitted.
        logger.warning("Custom backend failed: %s, falling back to Redis", e)
        from agent_memory_server.memory_vector_db_factory import create_redis_memory_vector_db
        return create_redis_memory_vector_db(embeddings)
Custom MemoryVectorDatabase Implementation
Full Custom Implementation
# custom_backend.py
from agent_memory_server.memory_vector_db import MemoryVectorDatabase
from agent_memory_server.models import MemoryRecord, MemoryRecordResult, MemoryRecordResults
import logging
class AdvancedCustomBackend(MemoryVectorDatabase):
    """Custom backend skeleton with batched writes.

    Storage-touching methods (`_store_memory`, `_vector_search`,
    `delete_memories`, `list_memories`) are hooks that raise
    NotImplementedError — fill them in for your datastore.
    """

    def __init__(self, embeddings, batch_size: int = 50):
        self.embeddings = embeddings
        self.logger = logging.getLogger(__name__)
        # Reserved for memoizing embeddings; not consulted yet.
        self._embedding_cache = {}
        self._batch_size = batch_size

    async def add_memories(self, memories: list[MemoryRecord]) -> list[str]:
        """Store the given memories, writing them in fixed-size batches."""
        if not memories:
            return []
        self.logger.info(f"Adding {len(memories)} memories in batches of {self._batch_size}")
        stored_ids: list[str] = []
        step = self._batch_size
        for start in range(0, len(memories), step):
            chunk = memories[start:start + step]
            stored_ids.extend(await self._add_memory_batch(chunk))
        return stored_ids

    async def _add_memory_batch(self, memories: list[MemoryRecord]) -> list[str]:
        """Embed one batch of memories and persist each record."""
        contents = [record.text for record in memories]
        vectors = await self.embeddings.aembed_documents(contents)
        return [
            await self._store_memory(record, vector)
            for record, vector in zip(memories, vectors)
        ]

    async def _store_memory(self, memory: MemoryRecord, embedding: list[float]) -> str:
        """Persist a single memory; return its ID. Override for your store."""
        raise NotImplementedError

    async def search_memories(self, query: str, limit: int = 10, **kwargs) -> MemoryRecordResults:
        """Embed the query text and delegate to the backend vector search."""
        query_vector = await self.embeddings.aembed_query(query)
        return await self._vector_search(query_vector, limit, **kwargs)

    async def _vector_search(self, embedding: list[float], limit: int, **kwargs) -> MemoryRecordResults:
        """Run a vector similarity search. Override for your store."""
        raise NotImplementedError

    async def delete_memories(self, memory_ids: list[str]) -> int:
        """Delete memories by ID. Override for your store."""
        raise NotImplementedError

    async def update_memories(self, memories: list[MemoryRecord]) -> int:
        """Update memories by re-adding them; returns the number written."""
        return len(await self.add_memories(memories))

    async def count_memories(self, **filter_kwargs) -> int:
        """Count matching memories by listing with a very large page size."""
        listing = await self.list_memories(limit=100000, **filter_kwargs)
        return listing.total

    async def list_memories(self, offset: int = 0, limit: int = 100, **filter_kwargs) -> MemoryRecordResults:
        """List memories with optional filters. Override for your store."""
        raise NotImplementedError
def create_advanced_custom_backend(embeddings, batch_size: int = 50) -> AdvancedCustomBackend:
    """Factory for the advanced custom backend.

    Args:
        embeddings: Embedding model exposing aembed_documents/aembed_query.
        batch_size: Memories written per storage batch; defaults to 50,
            matching AdvancedCustomBackend's own default, so existing
            callers are unaffected.
    """
    return AdvancedCustomBackend(embeddings, batch_size=batch_size)
Migration Strategies
Data Export and Import
# migration_tools.py
import json
import asyncio
from datetime import datetime
from typing import Any
from agent_memory_client import MemoryAPIClient
class MemoryMigrator:
    """Tool for migrating data between memory vector databases.

    Wraps a source and a target MemoryAPIClient and provides export,
    batched import, sample-based verification, and JSON-file round-trips.
    """

    def __init__(self, source_client: MemoryAPIClient, target_client: MemoryAPIClient):
        self.source = source_client
        self.target = target_client

    async def migrate_all_memories(self, batch_size: int = 100) -> dict[str, int]:
        """Migrate all memories from source to target.

        Args:
            batch_size: Number of memories per import request.

        Returns:
            Summary dict with exported/imported counts, whether sample
            verification passed, and how many sampled memories were missing.
        """
        print("Starting migration...")
        # Export all memories
        memories = await self.export_memories()
        print(f"Exported {len(memories)} memories")
        # Import in batches
        imported_count = await self.import_memories(memories, batch_size)
        # Verification
        verification_results = await self.verify_migration()
        return {
            "exported": len(memories),
            "imported": imported_count,
            "verification_passed": verification_results["success"],
            "missing_memories": verification_results["missing_count"],
        }

    async def export_memories(self, user_id: str | None = None, namespace: str | None = None) -> list[dict[str, Any]]:
        """Export memories from the source system as JSON-safe dicts.

        Pages through the source index until an empty page is returned;
        datetimes are serialized to ISO-8601 strings.
        """
        memories: list[dict[str, Any]] = []
        offset = 0
        page_size = 1000
        while True:
            results = await self.source.search_long_term_memory(
                text="",
                user_id=user_id,
                namespace=namespace,
                limit=page_size,
                offset=offset,
            )
            if not results.memories:
                break
            for memory in results.memories:
                memories.append({
                    "id": memory.id,
                    "text": memory.text,
                    "memory_type": memory.memory_type,
                    "user_id": memory.user_id,
                    "session_id": memory.session_id,
                    "namespace": memory.namespace,
                    "topics": memory.topics,
                    "entities": memory.entities,
                    "created_at": memory.created_at.isoformat() if memory.created_at else None,
                    "updated_at": memory.updated_at.isoformat() if memory.updated_at else None,
                    "access_count": memory.access_count,
                    "pinned": getattr(memory, "pinned", False),
                })
            # Advance by the number of records actually returned: stepping by
            # the requested limit would silently skip records whenever the
            # server caps page sizes below `page_size`.
            offset += len(results.memories)
            print(f"Exported {len(memories)} memories so far...")
        return memories

    async def import_memories(self, memories: list[dict[str, Any]], batch_size: int = 100) -> int:
        """Import memory dicts into the target system in batches.

        Best-effort: a failing batch is reported and skipped so the rest of
        the migration continues. Returns the count the target acknowledged.
        """
        imported_count = 0
        for i in range(0, len(memories), batch_size):
            batch = memories[i:i + batch_size]
            # Drop None-valued fields so the target applies its own defaults.
            memory_records = [{k: v for k, v in mem.items() if v is not None} for mem in batch]
            try:
                result = await self.target.create_long_term_memories(memory_records)
                imported_count += len(result.memories)
                print(f"Imported batch {i//batch_size + 1}: {len(result.memories)} memories")
                # Brief pause between batches to avoid overwhelming the target.
                await asyncio.sleep(0.1)
            except Exception as e:
                print(f"Error importing batch {i//batch_size + 1}: {e}")
        return imported_count

    async def verify_migration(self, sample_size: int = 100) -> dict[str, Any]:
        """Verify migration by checking a sample of source memories exists
        in the target (matched by ID or exact text)."""
        source_sample = await self.source.search_long_term_memory(text="", limit=sample_size)
        missing_count = 0
        verified_count = 0
        for memory in source_sample.memories:
            # Search the target with a text prefix; accept an ID or exact
            # text match among the top results.
            target_results = await self.target.search_long_term_memory(
                text=memory.text[:100],
                user_id=memory.user_id,
                limit=5,
            )
            found = any(
                result.id == memory.id or result.text == memory.text
                for result in target_results.memories
            )
            if found:
                verified_count += 1
            else:
                missing_count += 1
        success_rate = verified_count / len(source_sample.memories) if source_sample.memories else 0
        return {
            # 95% threshold tolerates eventual-consistency lag in the target.
            "success": success_rate > 0.95,
            "success_rate": success_rate,
            "verified_count": verified_count,
            "missing_count": missing_count,
            "sample_size": len(source_sample.memories),
        }

    async def export_to_file(self, filename: str, user_id: str | None = None, namespace: str | None = None):
        """Export memories to a JSON file with a timestamped envelope."""
        memories = await self.export_memories(user_id, namespace)
        export_data = {
            "export_timestamp": datetime.now().isoformat(),
            "total_count": len(memories),
            "user_id": user_id,
            "namespace": namespace,
            "memories": memories,
        }
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(export_data, f, indent=2, ensure_ascii=False)
        # Report the actual destination file.
        print(f"Exported {len(memories)} memories to {filename}")

    async def import_from_file(self, filename: str, batch_size: int = 100) -> int:
        """Import memories from a JSON file produced by export_to_file."""
        with open(filename, 'r', encoding='utf-8') as f:
            export_data = json.load(f)
        memories = export_data.get("memories", [])
        print(f"Found {len(memories)} memories in {filename}")
        return await self.import_memories(memories, batch_size)