Advanced Memory Vector Database Configuration

This guide covers advanced configuration patterns, custom implementations, and migration strategies for memory vector databases in Redis Agent Memory Server.

Advanced Factory Patterns

Environment-Based Configuration

# my_backends.py
import os
import json
from agent_memory_server.memory_vector_db import MemoryVectorDatabase

def create_configured_backend(embeddings) -> MemoryVectorDatabase:
    """Factory that reads configuration from environment."""
    config = json.loads(os.getenv("MEMORY_VECTOR_DB_CONFIG", "{}"))
    backend_type = os.getenv("BACKEND_TYPE", "redis")

    if backend_type == "redis":
        from agent_memory_server.memory_vector_db_factory import create_redis_memory_vector_db
        return create_redis_memory_vector_db(embeddings)
    elif backend_type == "custom":
        return MyCustomBackend(embeddings, **config)
    else:
        raise ValueError(f"Unsupported backend: {backend_type}")
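
To exercise this factory locally, set the same environment variables it reads before calling it. A minimal sketch with illustrative values:

# usage sketch (illustrative values)
import os

os.environ["BACKEND_TYPE"] = "custom"
os.environ["MEMORY_VECTOR_DB_CONFIG"] = '{"batch_size": 25}'

backend = create_configured_backend(embeddings)  # embeddings from your provider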

Resilient Factory with Fallback

# resilient_factory.py
import logging
from agent_memory_server.memory_vector_db import MemoryVectorDatabase

logger = logging.getLogger(__name__)

def create_resilient_backend(embeddings) -> MemoryVectorDatabase:
    """Factory with fallback to default Redis backend."""
    try:
        # Try the custom backend first; create_custom_backend is assumed to be
        # defined elsewhere (a sketch follows this example)
        return create_custom_backend(embeddings)
    except Exception as e:
        logger.warning(f"Custom backend failed: {e}, falling back to Redis")
        from agent_memory_server.memory_vector_db_factory import create_redis_memory_vector_db
        return create_redis_memory_vector_db(embeddings)
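
The fallback factory calls create_custom_backend, which is not part of the server. A minimal hypothetical sketch, reusing MyCustomBackend from the previous example:

# custom factory sketch (hypothetical)
def create_custom_backend(embeddings) -> MemoryVectorDatabase:
    """Construct the custom backend; any exception here triggers the Redis fallback."""
    return MyCustomBackend(embeddings, batch_size=50)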

Custom MemoryVectorDatabase Implementation

Full Custom Implementation

# custom_backend.py
import logging

from agent_memory_server.memory_vector_db import MemoryVectorDatabase
from agent_memory_server.models import MemoryRecord, MemoryRecordResults

class AdvancedCustomBackend(MemoryVectorDatabase):
    """Advanced custom backend with caching and batch operations."""

    def __init__(self, embeddings, batch_size: int = 50):
        self.embeddings = embeddings
        self.logger = logging.getLogger(__name__)
        self._embedding_cache = {}  # text -> embedding, reused across batches
        self._batch_size = batch_size

    async def add_memories(self, memories: list[MemoryRecord]) -> list[str]:
        """Add memories with optimized batching."""
        if not memories:
            return []

        self.logger.info(f"Adding {len(memories)} memories in batches of {self._batch_size}")

        all_ids = []
        for i in range(0, len(memories), self._batch_size):
            batch = memories[i:i + self._batch_size]
            batch_ids = await self._add_memory_batch(batch)
            all_ids.extend(batch_ids)

        return all_ids

    async def _add_memory_batch(self, memories: list[MemoryRecord]) -> list[str]:
        """Add a batch of memories, reusing cached embeddings where possible."""
        # Embed only texts not already in the cache; repeats are served from it
        uncached = [t for t in {m.text for m in memories} if t not in self._embedding_cache]
        if uncached:
            new_embeddings = await self.embeddings.aembed_documents(uncached)
            self._embedding_cache.update(zip(uncached, new_embeddings))

        # Store each memory alongside its embedding in your backend
        ids = []
        for memory in memories:
            memory_id = await self._store_memory(memory, self._embedding_cache[memory.text])
            ids.append(memory_id)

        return ids

    async def _store_memory(self, memory: MemoryRecord, embedding: list[float]) -> str:
        """Store a single memory in your backend."""
        # Implement your storage logic here
        raise NotImplementedError

    async def search_memories(self, query: str, limit: int = 10, **kwargs) -> MemoryRecordResults:
        """Search with advanced filtering and ranking."""
        # Generate query embedding
        query_embedding = await self.embeddings.aembed_query(query)

        # Perform search in your backend
        results = await self._vector_search(query_embedding, limit, **kwargs)

        return results

    async def _vector_search(self, embedding: list[float], limit: int, **kwargs) -> MemoryRecordResults:
        """Perform vector search in your backend."""
        raise NotImplementedError

    async def delete_memories(self, memory_ids: list[str]) -> int:
        """Delete memories by ID."""
        raise NotImplementedError

    async def update_memories(self, memories: list[MemoryRecord]) -> int:
        """Update existing memories."""
        # Assumes add_memories upserts: re-adding a record with the same ID replaces it
        updated_ids = await self.add_memories(memories)
        return len(updated_ids)

    async def count_memories(self, **filter_kwargs) -> int:
        """Count memories matching filters."""
        # Naive fallback that fetches one large page; override with a native
        # count query in your backend for large datasets
        results = await self.list_memories(limit=100000, **filter_kwargs)
        return results.total

    async def list_memories(self, offset: int = 0, limit: int = 100, **filter_kwargs) -> MemoryRecordResults:
        """List memories with optional filters."""
        raise NotImplementedError

def create_advanced_custom_backend(embeddings) -> AdvancedCustomBackend:
    """Factory for advanced custom backend."""
    return AdvancedCustomBackend(embeddings)
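
The backend only calls aembed_documents and aembed_query on the embeddings object, so any LangChain-style embeddings implementation can be plugged in. A wiring sketch, assuming langchain-openai is installed:

# wiring sketch (assumes langchain-openai)
from langchain_openai import OpenAIEmbeddings

embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
backend = create_advanced_custom_backend(embeddings)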

Migration Strategies

Data Export and Import

# migration_tools.py
import json
import asyncio
from datetime import datetime
from typing import Any
from agent_memory_client import MemoryAPIClient

class MemoryMigrator:
    """Tool for migrating data between memory vector databases."""

    def __init__(self, source_client: MemoryAPIClient, target_client: MemoryAPIClient):
        self.source = source_client
        self.target = target_client

    async def migrate_all_memories(self, batch_size: int = 100) -> dict[str, int]:
        """Migrate all memories from source to target."""
        print("Starting migration...")

        # Export all memories
        memories = await self.export_memories()
        print(f"Exported {len(memories)} memories")

        # Import in batches
        imported_count = await self.import_memories(memories, batch_size)

        # Verification
        verification_results = await self.verify_migration()

        return {
            "exported": len(memories),
            "imported": imported_count,
            "verification_passed": verification_results["success"],
            "missing_memories": verification_results["missing_count"]
        }

    async def export_memories(self, user_id: str | None = None, namespace: str | None = None) -> list[dict[str, Any]]:
        """Export memories from source system."""
        memories = []
        offset = 0
        batch_size = 1000

        while True:
            # An empty query plus offset paging walks the full memory set
            results = await self.source.search_long_term_memory(
                text="",
                user_id=user_id,
                namespace=namespace,
                limit=batch_size,
                offset=offset
            )

            if not results.memories:
                break

            for memory in results.memories:
                memory_dict = {
                    "id": memory.id,
                    "text": memory.text,
                    "memory_type": memory.memory_type,
                    "user_id": memory.user_id,
                    "session_id": memory.session_id,
                    "namespace": memory.namespace,
                    "topics": memory.topics,
                    "entities": memory.entities,
                    "created_at": memory.created_at.isoformat() if memory.created_at else None,
                    "updated_at": memory.updated_at.isoformat() if memory.updated_at else None,
                    "access_count": memory.access_count,
                    "pinned": getattr(memory, "pinned", False)
                }
                memories.append(memory_dict)

            offset += batch_size
            print(f"Exported {len(memories)} memories so far...")

        return memories

    async def import_memories(self, memories: list[dict[str, Any]], batch_size: int = 100) -> int:
        """Import memories to target system."""
        imported_count = 0

        for i in range(0, len(memories), batch_size):
            batch = memories[i:i + batch_size]
            # Drop None-valued fields so the target applies its own defaults
            memory_records = [{k: v for k, v in mem.items() if v is not None} for mem in batch]

            try:
                result = await self.target.create_long_term_memories(memory_records)
                imported_count += len(result.memories)
                print(f"Imported batch {i//batch_size + 1}: {len(result.memories)} memories")
                await asyncio.sleep(0.1)  # brief pause so the target isn't overwhelmed
            except Exception as e:
                print(f"Error importing batch {i//batch_size + 1}: {e}")

        return imported_count

    async def verify_migration(self, sample_size: int = 100) -> dict[str, Any]:
        """Verify migration by sampling memories."""
        source_sample = await self.source.search_long_term_memory(text="", limit=sample_size)

        missing_count = 0
        verified_count = 0

        for memory in source_sample.memories:
            # Look up each sampled memory in the target by a prefix of its text
            target_results = await self.target.search_long_term_memory(
                text=memory.text[:100],
                user_id=memory.user_id,
                limit=5
            )

            found = any(
                result.id == memory.id or result.text == memory.text
                for result in target_results.memories
            )

            if found:
                verified_count += 1
            else:
                missing_count += 1

        success_rate = verified_count / len(source_sample.memories) if source_sample.memories else 0

        return {
            "success": success_rate > 0.95,
            "success_rate": success_rate,
            "verified_count": verified_count,
            "missing_count": missing_count,
            "sample_size": len(source_sample.memories)
        }

    async def export_to_file(self, filename: str, user_id: str | None = None, namespace: str | None = None):
        """Export memories to JSON file."""
        memories = await self.export_memories(user_id, namespace)

        export_data = {
            "export_timestamp": datetime.now().isoformat(),
            "total_count": len(memories),
            "user_id": user_id,
            "namespace": namespace,
            "memories": memories
        }

        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(export_data, f, indent=2, ensure_ascii=False)

        print(f"Exported {len(memories)} memories to {filename}")

    async def import_from_file(self, filename: str, batch_size: int = 100) -> int:
        """Import memories from JSON file."""
        with open(filename, 'r', encoding='utf-8') as f:
            export_data = json.load(f)

        memories = export_data.get("memories", [])
        print(f"Found {len(memories)} memories in {filename}")

        return await self.import_memories(memories, batch_size)
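
A sketch of an end-to-end run. How MemoryAPIClient is constructed varies by client version, so the base_url keyword below is an assumption to check against the agent-memory-client docs:

# run_migration.py (illustrative)
import asyncio
from agent_memory_client import MemoryAPIClient
from migration_tools import MemoryMigrator

async def main():
    source = MemoryAPIClient(base_url="http://old-server:8000")  # constructor args are assumptions
    target = MemoryAPIClient(base_url="http://new-server:8000")
    migrator = MemoryMigrator(source, target)

    # Snapshot to disk first, then migrate and verify
    await migrator.export_to_file("backup.json")
    stats = await migrator.migrate_all_memories(batch_size=100)
    print(stats)

asyncio.run(main())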