AsyncMemory gives you a non-blocking interface to Mem0’s storage layer so Python applications can add, search, and manage memories directly from async code. Use it when you embed Mem0 inside FastAPI services, background workers, or any workflow that relies on asyncio.
You’ll use this when…
  • Your agent already runs in an async framework and you need memory calls to await cleanly.
  • You want to embed Mem0’s storage locally without sending requests through the synchronous client.
  • You plan to mix memory operations with other async APIs (OpenAI, HTTP calls, databases).
AsyncMemory expects a running event loop. Always call it inside async def functions or through helpers like asyncio.run() to avoid runtime errors.
Working in TypeScript? The Node SDK still uses synchronous calls—use Memory there and rely on Python’s AsyncMemory when you need awaited operations.
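The event-loop requirement above can be sketched with plain asyncio. Here, run_search is a hypothetical stand-in for your own AsyncMemory calls, so the pattern runs on its own:

```python
import asyncio

async def run_search():
    # Stand-in for: memory = AsyncMemory(); await memory.search(...)
    await asyncio.sleep(0)
    return {"results": []}

# Inside async code you simply write: await run_search()
# From synchronous code, hand the coroutine to an event loop explicitly:
result = asyncio.run(run_search())
```

Calling run_search() without awaiting it (or without a running loop) yields an unexecuted coroutine object, which is the usual source of "coroutine was never awaited" warnings.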

Feature anatomy

  • Direct storage access: AsyncMemory talks to the same backends as the synchronous client but keeps everything in-process for lower latency.
  • Method parity: Each memory operation (add, search, get_all, delete, etc.) mirrors the synchronous API, letting you reuse payload shapes.
  • Concurrent execution: Non-blocking I/O lets you schedule multiple memory tasks with asyncio.gather.
  • Scoped organization: Continue using user_id, agent_id, and run_id to separate memories across sessions and agents.
| Operation | Async signature | Notes |
| --- | --- | --- |
| Create memories | await memory.add(...) | Same arguments as synchronous Memory.add. |
| Search memories | await memory.search(...) | Returns a dict with results, identical shape. |
| List memories | await memory.get_all(...) | Filter by user_id, agent_id, run_id. |
| Retrieve memory | await memory.get(memory_id=...) | Raises ValueError if the ID is invalid. |
| Update memory | await memory.update(memory_id=..., data=...) | Accepts partial updates. |
| Delete memory | await memory.delete(memory_id=...) | Returns a confirmation payload. |
| Delete in bulk | await memory.delete_all(...) | Requires at least one scope filter. |
| History | await memory.history(memory_id=...) | Fetches the change log for auditing. |

Configure it

Initialize the client

import asyncio
from mem0 import AsyncMemory

# Default configuration
memory = AsyncMemory()

# Custom configuration
from mem0.configs.base import MemoryConfig
custom_config = MemoryConfig(
    # Your custom configuration here
)
memory = AsyncMemory(config=custom_config)
Run await memory.search(...) once right after initialization. If it completes without errors (an empty results list is expected on a fresh store), your configuration works.
Keep configuration objects close to the async client so you can reuse them across workers without recreating vector store connections.

Manage lifecycle and concurrency

import asyncio
from contextlib import asynccontextmanager
from mem0 import AsyncMemory

@asynccontextmanager
async def get_memory():
    memory = AsyncMemory()
    try:
        yield memory
    finally:
        # Clean up resources if needed
        pass

async def safe_memory_usage():
    async with get_memory() as memory:
        return await memory.search("test query", user_id="alice")
Wrap the client in an async context manager when you need a clean shutdown (for example, inside FastAPI startup/shutdown hooks).
async def batch_operations():
    memory = AsyncMemory()

    tasks = [
        memory.add(
            messages=[{"role": "user", "content": f"Message {i}"}],
            user_id=f"user_{i}"
        )
        for i in range(5)
    ]

    results = await asyncio.gather(*tasks, return_exceptions=True)
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i} completed successfully")
When concurrency works correctly, successful tasks return memory IDs while failures surface as exceptions in the results list.
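If the backend can't absorb unbounded fan-out, wrap each call in a semaphore before handing the tasks to asyncio.gather. This sketch uses a fake_add coroutine as a stand-in for memory.add so the pattern runs on its own; swap in real calls and tune the limit to your backend's capacity:

```python
import asyncio

async def fake_add(i: int) -> str:
    # Stand-in for: await memory.add(messages=..., user_id=f"user_{i}")
    await asyncio.sleep(0)
    return f"memory-{i}"

async def bounded_batch(n: int = 10, limit: int = 3):
    semaphore = asyncio.Semaphore(limit)

    async def guarded(i: int) -> str:
        async with semaphore:  # at most `limit` calls in flight at once
            return await fake_add(i)

    return await asyncio.gather(*(guarded(i) for i in range(n)), return_exceptions=True)

results = asyncio.run(bounded_batch())
```

All tasks are still scheduled up front, but the semaphore ensures only `limit` of them touch the backend concurrently, which keeps throughput predictable.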

Add resilience with retries

import asyncio
from mem0 import AsyncMemory

async def with_timeout_and_retry(operation, max_retries=3, timeout=10.0):
    for attempt in range(max_retries):
        try:
            return await asyncio.wait_for(operation(), timeout=timeout)
        except asyncio.TimeoutError:
            print(f"Timeout on attempt {attempt + 1}")
        except Exception as exc:
            print(f"Error on attempt {attempt + 1}: {exc}")

        if attempt < max_retries - 1:
            await asyncio.sleep(2 ** attempt)

    raise RuntimeError(f"Operation failed after {max_retries} attempts")

async def robust_memory_search():
    memory = AsyncMemory()

    async def search_operation():
        return await memory.search("test query", user_id="alice")

    return await with_timeout_and_retry(search_operation)
Always cap retries—runaway loops can keep the event loop busy and block other tasks.

See it in action

Core operations

# Create memories
result = await memory.add(
    messages=[
        {"role": "user", "content": "I'm travelling to SF"},
        {"role": "assistant", "content": "That's great to hear!"}
    ],
    user_id="alice"
)

# Search memories
results = await memory.search(
    query="Where am I travelling?",
    user_id="alice"
)

# List memories
all_memories = await memory.get_all(user_id="alice")

# Get a specific memory
specific_memory = await memory.get(memory_id="memory-id-here")

# Update a memory
updated_memory = await memory.update(
    memory_id="memory-id-here",
    data="I'm travelling to Seattle"
)

# Delete a memory
await memory.delete(memory_id="memory-id-here")

# Delete scoped memories
await memory.delete_all(user_id="alice")
Confirm each call returns the same response fields as the synchronous client (IDs, results, or confirmation objects). Missing keys usually mean the coroutine wasn’t awaited.
delete_all requires at least one of user_id, agent_id, or run_id. Provide all three to narrow deletion to a single session.

Scoped organization

await memory.add(
    messages=[{"role": "user", "content": "I prefer vegetarian food"}],
    user_id="alice",
    agent_id="diet-assistant",
    run_id="consultation-001"
)

all_user_memories = await memory.get_all(user_id="alice")
agent_memories = await memory.get_all(user_id="alice", agent_id="diet-assistant")
session_memories = await memory.get_all(user_id="alice", run_id="consultation-001")
specific_memories = await memory.get_all(
    user_id="alice",
    agent_id="diet-assistant",
    run_id="consultation-001"
)

history = await memory.history(memory_id="memory-id-here")
Use history when you need audit trails for compliance or debugging update logic.

Blend with other async APIs

import asyncio
from openai import AsyncOpenAI
from mem0 import AsyncMemory

async_openai_client = AsyncOpenAI()
async_memory = AsyncMemory()

async def chat_with_memories(message: str, user_id: str = "default_user") -> str:
    search_result = await async_memory.search(query=message, user_id=user_id, limit=3)
    relevant_memories = search_result["results"]
    memories_str = "\n".join(f"- {entry['memory']}" for entry in relevant_memories)

    system_prompt = (
        "You are a helpful AI. Answer the question based on query and memories.\n"
        f"User Memories:\n{memories_str}"
    )

    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": message},
    ]

    response = await async_openai_client.chat.completions.create(
        model="gpt-4.1-nano-2025-04-14",
        messages=messages
    )

    assistant_response = response.choices[0].message.content
    messages.append({"role": "assistant", "content": assistant_response})
    await async_memory.add(messages, user_id=user_id)

    return assistant_response
When everything is wired correctly, the OpenAI response should incorporate recent memories and the follow-up add call should persist the new assistant turn.

Handle errors gracefully

from mem0 import AsyncMemory
from mem0.configs.base import MemoryConfig

async def handle_initialization_errors():
    try:
        config = MemoryConfig(
            vector_store={"provider": "chroma", "config": {"path": "./chroma_db"}},
            llm={"provider": "openai", "config": {"model": "gpt-4.1-nano-2025-04-14"}}
        )
        AsyncMemory(config=config)
        print("AsyncMemory initialized successfully")
    except ValueError as err:
        print(f"Configuration error: {err}")
    except ConnectionError as err:
        print(f"Connection error: {err}")

async def handle_memory_operation_errors():
    memory = AsyncMemory()
    try:
        await memory.get(memory_id="non-existent-id")
    except ValueError as err:
        print(f"Invalid memory ID: {err}")

    try:
        await memory.search(query="", user_id="alice")
    except ValueError as err:
        print(f"Invalid search query: {err}")
Catch and log ValueError exceptions from invalid inputs—async stack traces can otherwise disappear inside background tasks.
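One way to keep those exceptions visible is a done-callback that retrieves and logs failures from fire-and-forget tasks. This is a generic asyncio sketch; failing_add is a hypothetical stand-in for a memory call that raises on invalid input:

```python
import asyncio
import logging

logging.basicConfig(level=logging.ERROR)
logger = logging.getLogger(__name__)
captured = []

def report_task_errors(task: asyncio.Task) -> None:
    # Retrieving the exception logs it instead of letting it vanish
    # with a "Task exception was never retrieved" warning.
    if task.cancelled():
        return
    exc = task.exception()
    if exc is not None:
        captured.append(exc)
        logger.error("Background memory task failed: %r", exc)

async def main():
    async def failing_add():
        # Stand-in for a memory operation given an invalid payload
        raise ValueError("invalid memory payload")

    task = asyncio.create_task(failing_add())
    task.add_done_callback(report_task_errors)
    # Give the task and its callback a chance to run before the loop exits.
    await asyncio.sleep(0.01)

asyncio.run(main())
```

Attach the callback to any background task you don't await directly, so every ValueError ends up in your logs with full context.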

Serve through FastAPI

from fastapi import FastAPI, HTTPException
from mem0 import AsyncMemory

app = FastAPI()
memory = AsyncMemory()

@app.post("/memories/")
async def add_memory(messages: list, user_id: str):
    try:
        result = await memory.add(messages=messages, user_id=user_id)
        return {"status": "success", "data": result}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))

@app.get("/memories/search")
async def search_memories(query: str, user_id: str, limit: int = 10):
    try:
        result = await memory.search(query=query, user_id=user_id, limit=limit)
        return {"status": "success", "data": result}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
Create one AsyncMemory instance per process when using FastAPI—startup hooks are a good place to configure and reuse it.

Instrument logging

import logging
import time
from functools import wraps

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def log_async_operation(operation_name):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            start_time = time.time()
            logger.info(f"Starting {operation_name}")
            try:
                result = await func(*args, **kwargs)
                duration = time.time() - start_time
                logger.info(f"{operation_name} completed in {duration:.2f}s")
                return result
            except Exception as exc:
                duration = time.time() - start_time
                logger.error(f"{operation_name} failed after {duration:.2f}s: {exc}")
                raise
        return wrapper
    return decorator

@log_async_operation("Memory Add")
async def logged_memory_add(memory, messages, user_id):
    return await memory.add(messages=messages, user_id=user_id)
Logged durations give you the baseline needed to spot regressions once AsyncMemory is in production.

Verify the feature is working

  • Run a quick add/search cycle and confirm the returned memory content matches your input.
  • Inspect application logs to ensure async tasks complete without blocking the event loop.
  • In FastAPI or other frameworks, hit health endpoints to verify the shared client handles concurrent requests.
  • Monitor retry counters—unexpected spikes indicate configuration or connectivity issues.

Best practices

  1. Keep operations awaited: Forgetting await is the fastest way to miss writes—lint for it or add helper wrappers.
  2. Scope deletions carefully: Always supply user_id, agent_id, or run_id to avoid purging too much data.
  3. Batch writes thoughtfully: Use asyncio.gather for throughput but cap concurrency based on backend capacity.
  4. Log errors with context: Capture user and agent scopes to triage failures quickly.
  5. Reuse clients: Instantiate AsyncMemory once per worker to avoid repeated backend handshakes.
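The client-reuse advice in point 5 can be sketched as a lazy, lock-guarded singleton. Here make_client is a hypothetical factory standing in for AsyncMemory(config=...), so the pattern is testable on its own:

```python
import asyncio

_client = None
_client_lock = asyncio.Lock()
creations = 0

def make_client():
    # Stand-in for: AsyncMemory(config=shared_config)
    global creations
    creations += 1
    return object()

async def get_shared_client():
    """Create the client once per process and reuse it afterwards."""
    global _client
    async with _client_lock:  # guard against concurrent first calls
        if _client is None:
            _client = make_client()
    return _client

async def main():
    a, b = await asyncio.gather(get_shared_client(), get_shared_client())
    return a is b

same = asyncio.run(main())
```

Even when two coroutines race to initialize, the lock ensures the factory runs exactly once and both callers receive the same instance.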

Troubleshooting

| Issue | Possible causes | Fix |
| --- | --- | --- |
| Initialization fails | Missing dependencies, invalid config | Validate MemoryConfig settings and environment variables. |
| Slow operations | Large datasets, network latency | Cache heavy queries and tune vector store parameters. |
| Memory not found | Invalid ID or deleted record | Check the ID source and handle soft-deleted states. |
| Connection timeouts | Network issues, overloaded backend | Apply retries/backoff and inspect infrastructure health. |
| Out-of-memory errors | Oversized batches | Reduce concurrency or chunk operations into smaller sets. |