repositories
loading repo index
repositories
loading repo index
repository
loading code, commits, and activity
public Clawd ADK gateway launch mirror
stars
latest
clone command
git clone gitlawb://did:key:z6Mkq5mY...iFZ5/my-project-publ...git clone gitlawb://did:key:z6Mkq5mY.../my-project-publ...2fa351d6docs: add automaton and perps launch sources16d ago| #1 | import asyncio |
| #2 | import concurrent |
| #3 | import gc |
| #4 | import hashlib |
| #5 | import json |
| #6 | import logging |
| #7 | import os |
| #8 | import uuid |
| #9 | import warnings |
| #10 | from copy import deepcopy |
| #11 | from datetime import datetime |
| #12 | from typing import Any, Dict, Optional |
| #13 | |
| #14 | import pytz |
| #15 | from pydantic import ValidationError |
| #16 | |
| #17 | from mem0.configs.base import MemoryConfig, MemoryItem |
| #18 | from mem0.configs.enums import MemoryType |
| #19 | from mem0.configs.prompts import ( |
| #20 | PROCEDURAL_MEMORY_SYSTEM_PROMPT, |
| #21 | get_update_memory_messages, |
| #22 | ) |
| #23 | from mem0.exceptions import ValidationError as Mem0ValidationError |
| #24 | from mem0.memory.base import MemoryBase |
| #25 | from mem0.memory.setup import mem0_dir, setup_config |
| #26 | from mem0.memory.storage import SQLiteManager |
| #27 | from mem0.memory.telemetry import capture_event |
| #28 | from mem0.memory.utils import ( |
| #29 | extract_json, |
| #30 | get_fact_retrieval_messages, |
| #31 | parse_messages, |
| #32 | parse_vision_messages, |
| #33 | process_telemetry_filters, |
| #34 | remove_code_blocks, |
| #35 | ) |
| #36 | from mem0.utils.factory import ( |
| #37 | EmbedderFactory, |
| #38 | GraphStoreFactory, |
| #39 | LlmFactory, |
| #40 | VectorStoreFactory, |
| #41 | RerankerFactory, |
| #42 | ) |
| #43 | |
# Process-wide: ignore DeprecationWarnings whose message matches the SWIG
# patterns below ("SwigPy" / "swigvarlink").
for _swig_pattern in (".*SwigPy.*", ".*swigvarlink.*"):
    warnings.filterwarnings("ignore", category=DeprecationWarning, message=_swig_pattern)
del _swig_pattern  # keep the loop variable out of the module namespace

# Module logger, created up front so the helper functions below can use it.
logger = logging.getLogger(__name__)
| #50 | |
| #51 | |
def _safe_deepcopy_config(config):
    """Return a copy of ``config``, falling back to a field-wise rebuild.

    A plain ``deepcopy`` is attempted first. If that raises (for example because
    a field holds a thread lock), a new instance of ``type(config)`` is built
    from a *shallow* mapping of its fields. Every field whose name contains a
    credential-like token ("auth", "credential", "password", "token",
    "secret", "key", "connection_class") is reset to ``None``.

    Args:
        config: A pydantic model (anything with ``model_dump``), a dataclass
            instance, or a plain object exposing ``__dict__``.

    Returns:
        A new instance of ``type(config)``.

    Raises:
        Exception: Whatever ``type(config)(**fields)`` raises when the reduced
            field set cannot rebuild the object (logged as a warning first).
    """
    try:
        return deepcopy(config)
    except Exception as e:
        logger.debug("Deepcopy failed, using JSON serialization: %s", e)

    config_class = type(config)

    if hasattr(config, "model_dump"):
        try:
            clone_dict = config.model_dump(mode="json")
        except Exception:
            clone_dict = dict(config.__dict__)
    elif hasattr(config, "__dataclass_fields__"):
        # `dataclasses.asdict` deep-copies every value and would fail on the very
        # field that made `deepcopy` fail above, so read the fields shallowly.
        # Fields with init=False cannot be passed back to the constructor.
        from dataclasses import fields

        clone_dict = {fld.name: getattr(config, fld.name) for fld in fields(config) if fld.init}
    else:
        clone_dict = dict(config.__dict__)

    sensitive_tokens = ("auth", "credential", "password", "token", "secret", "key", "connection_class")
    for field_name in list(clone_dict):
        if any(token in field_name.lower() for token in sensitive_tokens):
            clone_dict[field_name] = None

    try:
        return config_class(**clone_dict)
    except Exception as reconstruction_error:
        logger.warning(
            "Failed to reconstruct config: %s. Telemetry may be affected.",
            reconstruction_error,
        )
        raise
| #85 | |
| #86 | |
def _build_filters_and_metadata(
    *,  # Enforce keyword-only arguments
    user_id: Optional[str] = None,
    agent_id: Optional[str] = None,
    run_id: Optional[str] = None,
    actor_id: Optional[str] = None,  # For query-time filtering
    input_metadata: Optional[Dict[str, Any]] = None,
    input_filters: Optional[Dict[str, Any]] = None,
) -> tuple[Dict[str, Any], Dict[str, Any]]:
    """
    Derive the storage-metadata template and the query filters for a memory call.

    Every truthy session identifier (`user_id`, `agent_id`, `run_id`) is written
    into BOTH returned dicts. The inputs are deep-copied first, so the caller's
    `input_metadata` / `input_filters` are never modified.

    The query filters may additionally be narrowed to one actor. An explicit
    `actor_id` argument wins; otherwise an `actor_id` already present in
    `input_filters` is kept. The actor is never added to the metadata template.

    Args:
        user_id (Optional[str]): User identifier, for session scoping.
        agent_id (Optional[str]): Agent identifier, for session scoping.
        run_id (Optional[str]): Run identifier, for session scoping.
        actor_id (Optional[str]): Explicit actor identifier for query filtering.
        input_metadata (Optional[Dict[str, Any]]): Starting point for the metadata
            template. Treated as empty when falsy.
        input_filters (Optional[Dict[str, Any]]): Starting point for the query
            filters. Treated as empty when falsy.

    Returns:
        tuple[Dict[str, Any], Dict[str, Any]]: `(base_metadata_template,
        effective_query_filters)`.

    Raises:
        Mem0ValidationError: If none of `user_id`, `agent_id`, `run_id` is truthy.
    """
    metadata_template: Dict[str, Any] = deepcopy(input_metadata) if input_metadata else {}
    query_filters: Dict[str, Any] = deepcopy(input_filters) if input_filters else {}

    candidates = (("user_id", user_id), ("agent_id", agent_id), ("run_id", run_id))
    scoped = [(name, value) for name, value in candidates if value]

    for name, value in scoped:
        metadata_template[name] = value
        query_filters[name] = value

    if not scoped:
        raise Mem0ValidationError(
            message="At least one of 'user_id', 'agent_id', or 'run_id' must be provided.",
            error_code="VALIDATION_001",
            details={"provided_ids": {"user_id": user_id, "agent_id": agent_id, "run_id": run_id}},
            suggestion="Please provide at least one identifier to scope the memory operation."
        )

    # Explicit argument first, then whatever the incoming filters already carried.
    actor = actor_id or query_filters.get("actor_id")
    if actor:
        query_filters["actor_id"] = actor

    return metadata_template, query_filters
| #166 | |
| #167 | |
# Run mem0's one-time setup (`setup_config` from mem0.memory.setup) at import
# time. The module-level `logger` created near the top of this file is reused;
# re-binding it here to the same `logging.getLogger(__name__)` was redundant.
setup_config()
| #170 | |
| #171 | |
| #172 | class Memory(MemoryBase): |
def __init__(self, config: Optional[MemoryConfig] = None):
    """Wire up every backend described by ``config``.

    Creates, in order:
    - the embedder;
    - the primary vector store;
    - the LLM;
    - the SQLite history manager;
    - an optional reranker;
    - an optional graph store;
    - a second vector store pointed at the ``"mem0migrations"`` collection,
      used for telemetry.

    It then emits a ``mem0.init`` telemetry event.

    Args:
        config (MemoryConfig, optional): Full configuration. When omitted, a
            new default ``MemoryConfig()`` is built for this instance.
    """
    # A `MemoryConfig()` default argument would be evaluated once at import time
    # and the same object would be shared by every Memory built without a config.
    self.config = config if config is not None else MemoryConfig()

    self.custom_fact_extraction_prompt = self.config.custom_fact_extraction_prompt
    self.custom_update_memory_prompt = self.config.custom_update_memory_prompt
    self.embedding_model = EmbedderFactory.create(
        self.config.embedder.provider,
        self.config.embedder.config,
        self.config.vector_store.config,
    )
    self.vector_store = VectorStoreFactory.create(
        self.config.vector_store.provider, self.config.vector_store.config
    )
    self.llm = LlmFactory.create(self.config.llm.provider, self.config.llm.config)
    self.db = SQLiteManager(self.config.history_db_path)
    self.collection_name = self.config.vector_store.config.collection_name
    self.api_version = self.config.version

    # Initialize reranker if configured
    self.reranker = None
    if self.config.reranker:
        self.reranker = RerankerFactory.create(
            self.config.reranker.provider,
            self.config.reranker.config,
        )

    self.enable_graph = False
    if self.config.graph_store.config:
        self.graph = GraphStoreFactory.create(self.config.graph_store.provider, self.config)
        self.enable_graph = True
    else:
        self.graph = None

    # Build the telemetry store's config from field values instead of a deepcopy,
    # which can fail on configs holding thread locks.
    vector_config = self.config.vector_store.config
    if hasattr(vector_config, "model_dump"):
        # For pydantic models
        telemetry_config_dict = vector_config.model_dump()
    else:
        # For other objects, copy whichever common attributes exist.
        telemetry_config_dict = {
            attr: getattr(vector_config, attr)
            for attr in ("host", "port", "path", "api_key", "index_name", "dimension", "metric")
            if hasattr(vector_config, attr)
        }

    # Override collection name for telemetry
    telemetry_config_dict["collection_name"] = "mem0migrations"

    # File-based vector stores get their own directory under mem0_dir.
    if self.config.vector_store.provider in ["faiss", "qdrant"]:
        provider_path = f"migrations_{self.config.vector_store.provider}"
        telemetry_config_dict["path"] = os.path.join(mem0_dir, provider_path)
        os.makedirs(telemetry_config_dict["path"], exist_ok=True)

    # Create the config object using the same class as the original
    telemetry_config = vector_config.__class__(**telemetry_config_dict)
    self._telemetry_vector_store = VectorStoreFactory.create(
        self.config.vector_store.provider, telemetry_config
    )
    capture_event("mem0.init", self, {"sync_type": "sync"})
| #234 | |
@classmethod
def from_config(cls, config_dict: Dict[str, Any]):
    """Build a Memory from a plain configuration dictionary.

    The dictionary is first passed through ``_process_config`` (which may add
    derived sections) and then validated into a ``MemoryConfig``.

    Args:
        config_dict: Raw configuration mapping. It is not modified.

    Returns:
        A new instance of ``cls`` built from the validated config.

    Raises:
        pydantic.ValidationError: If the processed configuration is invalid.
    """
    try:
        # Shallow copy so sections injected by `_process_config` do not leak into
        # the caller's dict; then validate the *processed* result.
        processed = cls._process_config(dict(config_dict))
        config = MemoryConfig(**processed)
    except ValidationError as e:
        logger.error(f"Configuration validation error: {e}")
        raise
    return cls(config)
| #244 | |
@staticmethod
def _process_config(config_dict: Dict[str, Any]) -> Dict[str, Any]:
    """Fill in derived configuration sections before validation.

    When a graph store and an embedder are configured but no vector store is,
    the embedder's ``config.embedding_dims`` (if present) is copied into a new
    ``vector_store`` section as ``config.embedding_model_dims``. If the embedder
    does not specify ``embedding_dims``, nothing is injected, and the vector
    store falls back to its defaults.

    Args:
        config_dict: Raw configuration mapping. It is updated in place.

    Returns:
        The same ``config_dict`` object.
    """
    if "graph_store" in config_dict and "vector_store" not in config_dict and "embedder" in config_dict:
        embedder_config = (config_dict["embedder"] or {}).get("config") or {}
        embedding_dims = embedder_config.get("embedding_dims")
        if embedding_dims is not None:
            config_dict["vector_store"] = {"config": {"embedding_model_dims": embedding_dims}}
    return config_dict
| #259 | |
def _should_use_agent_memory_extraction(self, messages, metadata):
    """Choose between agent-oriented and user-oriented fact extraction.

    Agent extraction applies only when ``metadata`` carries a non-None
    ``agent_id`` *and* at least one message has the ``"assistant"`` role.

    Args:
        messages: Iterable of message dicts.
        metadata: Mapping that may contain ``agent_id``.

    Returns:
        bool: True for agent memory extraction, False for user memory extraction.
    """
    # Both conditions are evaluated eagerly before combining them.
    criteria = (
        metadata.get("agent_id") is not None,
        any(entry.get("role") == "assistant" for entry in messages),
    )
    return all(criteria)
| #280 | |
def add(
    self,
    messages,
    *,
    user_id: Optional[str] = None,
    agent_id: Optional[str] = None,
    run_id: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None,
    infer: bool = True,
    memory_type: Optional[str] = None,
    prompt: Optional[str] = None,
):
    """
    Create a new memory.

    Adds new memories scoped to one or more session ids (`user_id`, `agent_id`,
    `run_id`). At least one of them is required.

    Args:
        messages (str, dict, or List[Dict[str, str]]): The message content, a single
            message dict, or a list of messages
            (e.g., `[{"role": "user", "content": "Hello"}, {"role": "assistant", "content": "Hi"}]`)
            to be processed and stored. A plain string becomes one user message.
        user_id (str, optional): ID of the user creating the memory. Defaults to None.
        agent_id (str, optional): ID of the agent creating the memory. Defaults to None.
        run_id (str, optional): ID of the run creating the memory. Defaults to None.
        metadata (dict, optional): Metadata to store with the memory. Defaults to None.
        infer (bool, optional): If True (default), an LLM is used to extract key facts from
            'messages' and decide whether to add, update, or delete related memories.
            If False, 'messages' are added as raw memories directly.
        memory_type (str, optional): Type of memory to create. Defaults to None, which
            creates regular (factual/conversational) memories. The only other accepted
            value is `MemoryType.PROCEDURAL.value` ("procedural_memory"), which creates
            a procedural memory when `agent_id` is also given.
        prompt (str, optional): Prompt used only when creating a procedural memory.
            Defaults to None.

    Returns:
        dict: A dictionary containing the result of the memory addition operation, typically
            including a list of memory items affected (added, updated) under a "results" key,
            and potentially "relations" if graph store is enabled.
            Example for v1.1+: `{"results": [{"id": "...", "memory": "...", "event": "ADD"}]}`

    Raises:
        Mem0ValidationError: If input validation fails (missing session id, invalid
            memory_type, unsupported messages type).
        VectorStoreError: If vector store operations fail.
        GraphStoreError: If graph store operations fail.
        EmbeddingError: If embedding generation fails.
        LLMError: If LLM operations fail.
        DatabaseError: If database operations fail.
    """

    processed_metadata, effective_filters = _build_filters_and_metadata(
        user_id=user_id,
        agent_id=agent_id,
        run_id=run_id,
        input_metadata=metadata,
    )

    if memory_type is not None and memory_type != MemoryType.PROCEDURAL.value:
        raise Mem0ValidationError(
            message=f"Invalid 'memory_type'. Please pass {MemoryType.PROCEDURAL.value} to create procedural memories.",
            error_code="VALIDATION_002",
            details={"provided_type": memory_type, "valid_type": MemoryType.PROCEDURAL.value},
            suggestion=f"Use '{MemoryType.PROCEDURAL.value}' to create procedural memories."
        )

    if isinstance(messages, str):
        messages = [{"role": "user", "content": messages}]
    elif isinstance(messages, dict):
        messages = [messages]
    elif not isinstance(messages, list):
        raise Mem0ValidationError(
            message="messages must be str, dict, or list[dict]",
            error_code="VALIDATION_003",
            details={"provided_type": type(messages).__name__, "valid_types": ["str", "dict", "list[dict]"]},
            suggestion="Convert your input to a string, dictionary, or list of dictionaries."
        )

    # NOTE: with memory_type="procedural_memory" but no agent_id, execution falls
    # through to regular memory extraction below.
    if agent_id is not None and memory_type == MemoryType.PROCEDURAL.value:
        return self._create_procedural_memory(messages, metadata=processed_metadata, prompt=prompt)

    if self.config.llm.config.get("enable_vision"):
        messages = parse_vision_messages(messages, self.llm, self.config.llm.config.get("vision_details"))
    else:
        messages = parse_vision_messages(messages)

    # Each worker gets its own copy of the filters. The graph path may insert a
    # default "user_id", which must not leak into the vector-store search scope or
    # the telemetry that the other thread computes concurrently.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        vector_future = executor.submit(
            self._add_to_vector_store, messages, processed_metadata, dict(effective_filters), infer
        )
        graph_future = executor.submit(self._add_to_graph, messages, dict(effective_filters))

    # Leaving the `with` block has already waited for both futures.
    vector_store_result = vector_future.result()
    graph_result = graph_future.result()

    if self.enable_graph:
        return {
            "results": vector_store_result,
            "relations": graph_result,
        }

    return {"results": vector_store_result}
| #385 | |
def _add_to_vector_store(self, messages, metadata, filters, infer):
    """Persist ``messages`` in the vector store, optionally via LLM fact extraction.

    With ``infer=False``, each valid non-system message is embedded and stored
    verbatim. The method returns right away, and no ``mem0.add`` telemetry is
    emitted on that path.

    With ``infer=True``, the steps are:
    1. An LLM extracts a list of facts from the conversation.
    2. Up to 5 similar existing memories are fetched for each fact.
    3. A second LLM call decides, per entry, whether to ADD, UPDATE, DELETE or
       leave it unchanged (NONE).
    4. Those decisions are applied and a ``mem0.add`` telemetry event is sent.

    Args:
        messages: List of message dicts (``role``, ``content``, optional ``name``).
        metadata: Base metadata; a deep copy is attached to each stored memory.
        filters: Session-scoping filters. Only ``user_id``/``agent_id``/``run_id``
            are used to search existing memories. The whole dict is also passed
            to ``process_telemetry_filters``.
        infer: Whether to run LLM fact extraction and reconciliation.

    Returns:
        list[dict]: One entry per memory added/updated/deleted, each carrying at
        least ``id``, ``memory`` and ``event``. NONE actions are not listed.
    """
    if not infer:
        returned_memories = []
        for message_dict in messages:
            # Only dicts with both a role and content are stored.
            if (
                not isinstance(message_dict, dict)
                or message_dict.get("role") is None
                or message_dict.get("content") is None
            ):
                logger.warning(f"Skipping invalid message format: {message_dict}")
                continue

            # System messages are never stored as memories.
            if message_dict["role"] == "system":
                continue

            # Per-message copy so role/actor_id do not leak between messages.
            per_msg_meta = deepcopy(metadata)
            per_msg_meta["role"] = message_dict["role"]

            # A message's optional "name" is recorded as its actor_id.
            actor_name = message_dict.get("name")
            if actor_name:
                per_msg_meta["actor_id"] = actor_name

            msg_content = message_dict["content"]
            msg_embeddings = self.embedding_model.embed(msg_content, "add")
            mem_id = self._create_memory(msg_content, msg_embeddings, per_msg_meta)

            returned_memories.append(
                {
                    "id": mem_id,
                    "memory": msg_content,
                    "event": "ADD",
                    "actor_id": actor_name if actor_name else None,
                    "role": message_dict["role"],
                }
            )
        return returned_memories

    parsed_messages = parse_messages(messages)

    # Step 1: fact extraction. A configured custom prompt replaces the built-in one.
    if self.config.custom_fact_extraction_prompt:
        system_prompt = self.config.custom_fact_extraction_prompt
        user_prompt = f"Input:\n{parsed_messages}"
    else:
        # Determine if this should use agent memory extraction based on agent_id presence
        # and role types in messages
        is_agent_memory = self._should_use_agent_memory_extraction(messages, metadata)
        system_prompt, user_prompt = get_fact_retrieval_messages(parsed_messages, is_agent_memory)

    response = self.llm.generate_response(
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        response_format={"type": "json_object"},
    )

    # The reply is read as JSON with a "facts" key; any failure here degrades to
    # "no facts" instead of raising.
    try:
        response = remove_code_blocks(response)
        if not response.strip():
            new_retrieved_facts = []
        else:
            try:
                # First try direct JSON parsing
                new_retrieved_facts = json.loads(response)["facts"]
            except json.JSONDecodeError:
                # Try extracting JSON from response using built-in function
                extracted_json = extract_json(response)
                new_retrieved_facts = json.loads(extracted_json)["facts"]
    except Exception as e:
        logger.error(f"Error in new_retrieved_facts: {e}")
        new_retrieved_facts = []

    if not new_retrieved_facts:
        logger.debug("No new facts retrieved from input. Skipping memory update LLM call.")

    # Step 2: embed each fact and collect up to 5 similar existing memories
    # within the same session scope.
    retrieved_old_memory = []
    new_message_embeddings = {}
    # Search for existing memories using the provided session identifiers
    # Use all available session identifiers for accurate memory retrieval
    search_filters = {}
    if filters.get("user_id"):
        search_filters["user_id"] = filters["user_id"]
    if filters.get("agent_id"):
        search_filters["agent_id"] = filters["agent_id"]
    if filters.get("run_id"):
        search_filters["run_id"] = filters["run_id"]
    for new_mem in new_retrieved_facts:
        messages_embeddings = self.embedding_model.embed(new_mem, "add")
        # Keyed by fact text and later passed as `existing_embeddings` to
        # _create_memory/_update_memory (presumably to skip re-embedding; confirm there).
        new_message_embeddings[new_mem] = messages_embeddings
        existing_memories = self.vector_store.search(
            query=new_mem,
            vectors=messages_embeddings,
            limit=5,
            filters=search_filters,
        )
        for mem in existing_memories:
            retrieved_old_memory.append({"id": mem.id, "text": mem.payload.get("data", "")})

    # De-duplicate by memory id; the same memory can match several facts.
    unique_data = {}
    for item in retrieved_old_memory:
        unique_data[item["id"]] = item
    retrieved_old_memory = list(unique_data.values())
    logger.info(f"Total existing memories: {len(retrieved_old_memory)}")

    # mapping UUIDs with integers for handling UUID hallucinations
    # The LLM sees ids "0", "1", ...; temp_uuid_mapping translates them back.
    temp_uuid_mapping = {}
    for idx, item in enumerate(retrieved_old_memory):
        temp_uuid_mapping[str(idx)] = item["id"]
        retrieved_old_memory[idx]["id"] = str(idx)

    # Step 3: ask the LLM how to reconcile the new facts with existing memories.
    # Failures at this step are logged and treated as "no actions".
    if new_retrieved_facts:
        function_calling_prompt = get_update_memory_messages(
            retrieved_old_memory, new_retrieved_facts, self.config.custom_update_memory_prompt
        )

        try:
            response: str = self.llm.generate_response(
                messages=[{"role": "user", "content": function_calling_prompt}],
                response_format={"type": "json_object"},
            )
        except Exception as e:
            logger.error(f"Error in new memory actions response: {e}")
            response = ""

        try:
            if not response or not response.strip():
                logger.warning("Empty response from LLM, no memories to extract")
                new_memories_with_actions = {}
            else:
                response = remove_code_blocks(response)
                new_memories_with_actions = json.loads(response)
        except Exception as e:
            logger.error(f"Invalid JSON response: {e}")
            new_memories_with_actions = {}
    else:
        new_memories_with_actions = {}

    # Step 4: apply each action under the "memory" key. A failure on one entry
    # (e.g. KeyError on a temp id the LLM invented) is logged and that entry is skipped.
    returned_memories = []
    try:
        for resp in new_memories_with_actions.get("memory", []):
            logger.info(resp)
            try:
                action_text = resp.get("text")
                if not action_text:
                    logger.info("Skipping memory entry because of empty `text` field.")
                    continue

                event_type = resp.get("event")
                if event_type == "ADD":
                    memory_id = self._create_memory(
                        data=action_text,
                        existing_embeddings=new_message_embeddings,
                        metadata=deepcopy(metadata),
                    )
                    returned_memories.append({"id": memory_id, "memory": action_text, "event": event_type})
                elif event_type == "UPDATE":
                    self._update_memory(
                        memory_id=temp_uuid_mapping[resp.get("id")],
                        data=action_text,
                        existing_embeddings=new_message_embeddings,
                        metadata=deepcopy(metadata),
                    )
                    returned_memories.append(
                        {
                            "id": temp_uuid_mapping[resp.get("id")],
                            "memory": action_text,
                            "event": event_type,
                            "previous_memory": resp.get("old_memory"),
                        }
                    )
                elif event_type == "DELETE":
                    self._delete_memory(memory_id=temp_uuid_mapping[resp.get("id")])
                    returned_memories.append(
                        {
                            "id": temp_uuid_mapping[resp.get("id")],
                            "memory": action_text,
                            "event": event_type,
                        }
                    )
                elif event_type == "NONE":
                    # Even if content doesn't need updating, update session IDs if provided
                    memory_id = temp_uuid_mapping.get(resp.get("id"))
                    if memory_id and (metadata.get("agent_id") or metadata.get("run_id")):
                        # Update only the session identifiers, keep content the same
                        existing_memory = self.vector_store.get(vector_id=memory_id)
                        updated_metadata = deepcopy(existing_memory.payload)
                        if metadata.get("agent_id"):
                            updated_metadata["agent_id"] = metadata["agent_id"]
                        if metadata.get("run_id"):
                            updated_metadata["run_id"] = metadata["run_id"]
                        # Timestamp is taken in the hard-coded US/Pacific zone.
                        updated_metadata["updated_at"] = datetime.now(pytz.timezone("US/Pacific")).isoformat()

                        self.vector_store.update(
                            vector_id=memory_id,
                            vector=None,  # Keep same embeddings
                            payload=updated_metadata,
                        )
                        logger.info(f"Updated session IDs for memory {memory_id}")
                    else:
                        logger.info("NOOP for Memory.")
            except Exception as e:
                logger.error(f"Error processing memory action: {resp}, Error: {e}")
    except Exception as e:
        logger.error(f"Error iterating new_memories_with_actions: {e}")

    keys, encoded_ids = process_telemetry_filters(filters)
    capture_event(
        "mem0.add",
        self,
        {"version": self.api_version, "keys": keys, "encoded_ids": encoded_ids, "sync_type": "sync"},
    )
    return returned_memories
| #598 | |
def _add_to_graph(self, messages, filters):
    """Send the conversation text to the graph store, if one is enabled.

    Args:
        messages: List of message dicts. System messages and entries without
            (or with ``None``) ``content`` are ignored. The remaining contents
            are joined with newlines.
        filters: Session-scoping filters forwarded to ``self.graph.add``. This
            dict is not modified. If it lacks ``user_id``, the graph store
            receives a copy with ``user_id`` set to ``"user"``.

    Returns:
        Whatever ``self.graph.add`` returns, or ``[]`` when the graph store is
        disabled.
    """
    if not self.enable_graph:
        return []

    # Copy before defaulting user_id: the caller's dict may be read concurrently
    # (e.g. `add` hands the same filters to the vector-store worker).
    graph_filters = dict(filters)
    if graph_filters.get("user_id") is None:
        graph_filters["user_id"] = "user"

    data = "\n".join(
        msg["content"]
        for msg in messages
        if msg.get("content") is not None and msg.get("role") != "system"
    )
    return self.graph.add(data, graph_filters)
| #609 | |
def get(self, memory_id):
    """Fetch a single memory by its ID.

    A ``mem0.get`` telemetry event is emitted before the lookup.

    Args:
        memory_id (str): Identifier of the memory in the vector store.

    Returns:
        dict | None: ``None`` when the vector store returns nothing for the ID.
        Otherwise the dict contains:
        - the dumped ``MemoryItem`` fields;
        - any of ``user_id``, ``agent_id``, ``run_id``, ``actor_id`` and
          ``role`` present in the payload;
        - a ``metadata`` dict with every other payload key, if there are any.
    """
    capture_event("mem0.get", self, {"memory_id": memory_id, "sync_type": "sync"})
    record = self.vector_store.get(vector_id=memory_id)
    if not record:
        return None

    payload = record.payload
    session_keys = ("user_id", "agent_id", "run_id", "actor_id", "role")
    reserved = {"data", "hash", "created_at", "updated_at", "id"}.union(session_keys)

    item = MemoryItem(
        id=record.id,
        memory=payload.get("data", ""),
        hash=payload.get("hash"),
        created_at=payload.get("created_at"),
        updated_at=payload.get("updated_at"),
    ).model_dump()
    # Session identifiers are lifted to the top level, in a fixed order.
    item.update((key, payload[key]) for key in session_keys if key in payload)

    extras = {key: value for key, value in payload.items() if key not in reserved}
    if extras:
        item["metadata"] = extras

    return item
| #652 | |
def get_all(
    self,
    *,
    user_id: Optional[str] = None,
    agent_id: Optional[str] = None,
    run_id: Optional[str] = None,
    filters: Optional[Dict[str, Any]] = None,
    limit: int = 100,
):
    """
    List all memories in a session scope.

    Args:
        user_id (str, optional): user id
        agent_id (str, optional): agent id
        run_id (str, optional): run id
        filters (dict, optional): Additional custom key-value filters to apply to the search.
            These are merged with the ID-based scoping filters. For example,
            `filters={"actor_id": "some_user"}`.
        limit (int, optional): The maximum number of memories to return. Defaults to 100.

    Returns:
        dict: A dictionary containing a list of memories under the "results" key,
            and "relations" as well when the graph store is enabled.
            Example: `{"results": [{"id": "...", "memory": "...", ...}]}`

    Raises:
        Mem0ValidationError: If none of `user_id`, `agent_id`, or `run_id` is provided.
    """
    # `_build_filters_and_metadata` raises when no session id is given, so on
    # success at least one of user_id/agent_id/run_id is in effective_filters.
    _, effective_filters = _build_filters_and_metadata(
        user_id=user_id, agent_id=agent_id, run_id=run_id, input_filters=filters
    )

    keys, encoded_ids = process_telemetry_filters(effective_filters)
    capture_event(
        "mem0.get_all", self, {"limit": limit, "keys": keys, "encoded_ids": encoded_ids, "sync_type": "sync"}
    )

    with concurrent.futures.ThreadPoolExecutor() as executor:
        future_memories = executor.submit(self._get_all_from_vector_store, effective_filters, limit)
        future_graph_entities = (
            executor.submit(self.graph.get_all, effective_filters, limit) if self.enable_graph else None
        )

    # Leaving the `with` block has already waited for the submitted futures.
    all_memories_result = future_memories.result()

    if self.enable_graph:
        return {"results": all_memories_result, "relations": future_graph_entities.result()}

    return {"results": all_memories_result}
| #710 | |
def _get_all_from_vector_store(self, filters, limit):
    """
    List stored memories matching ``filters`` and format them as plain dicts.

    ``vector_store.list`` does not return the same shape for every backend:
    some return the memory objects directly, others wrap them in an outer
    sequence. When the first element of the result is itself a list/tuple,
    that inner sequence is used as the memories.

    Args:
        filters (dict): Filters forwarded to ``vector_store.list``.
        limit (int): Maximum number of records requested from the store.

    Returns:
        list[dict]: One dict per memory holding the ``MemoryItem`` fields
        (``score`` excluded), any promoted identifiers present in the payload
        (``user_id``, ``agent_id``, ``run_id``, ``actor_id``, ``role``), and a
        ``metadata`` dict with the remaining payload keys when there are any.
    """
    listed = self.vector_store.list(filters=filters, limit=limit)

    records = listed
    if isinstance(listed, (tuple, list)) and len(listed) > 0 and isinstance(listed[0], (list, tuple)):
        # Wrapped result: the memories live in the first element.
        records = listed[0]

    promoted = ("user_id", "agent_id", "run_id", "actor_id", "role")
    reserved = {"data", "hash", "created_at", "updated_at", "id"}.union(promoted)

    formatted = []
    for record in records:
        payload = record.payload
        entry = MemoryItem(
            id=record.id,
            memory=payload.get("data", ""),
            hash=payload.get("hash"),
            created_at=payload.get("created_at"),
            updated_at=payload.get("updated_at"),
        ).model_dump(exclude={"score"})

        # Surface session identifiers as top-level fields.
        entry.update({name: payload[name] for name in promoted if name in payload})

        extras = {k: v for k, v in payload.items() if k not in reserved}
        if extras:
            entry["metadata"] = extras

        formatted.append(entry)

    return formatted
| #757 | |
def search(
    self,
    query: str,
    *,
    user_id: Optional[str] = None,
    agent_id: Optional[str] = None,
    run_id: Optional[str] = None,
    limit: int = 100,
    filters: Optional[Dict[str, Any]] = None,
    threshold: Optional[float] = None,
    rerank: bool = True,
):
    """
    Searches for memories based on a query.

    Args:
        query (str): Query to search for.
        user_id (str, optional): ID of the user to search for. Defaults to None.
        agent_id (str, optional): ID of the agent to search for. Defaults to None.
        run_id (str, optional): ID of the run to search for. Defaults to None.
        limit (int, optional): Limit the number of results. Defaults to 100.
        filters (dict, optional): Metadata filters, combined with the ID-based scoping
            filters. Defaults to None. Supported forms:
            - {"key": "value"} - exact match
            - {"key": {"eq": "value"}} - equals
            - {"key": {"ne": "value"}} - not equals
            - {"key": {"in": ["val1", "val2"]}} - in list
            - {"key": {"nin": ["val1", "val2"]}} - not in list
            - {"key": {"gt": 10}} - greater than
            - {"key": {"gte": 10}} - greater than or equal
            - {"key": {"lt": 10}} - less than
            - {"key": {"lte": 10}} - less than or equal
            - {"key": {"contains": "text"}} - contains text
            - {"key": {"icontains": "text"}} - case-insensitive contains
            - {"key": "*"} - wildcard match (any value)
            - {"AND": [filter1, filter2]} - logical AND
            - {"OR": [filter1, filter2]} - logical OR
            - {"NOT": [filter1]} - logical NOT
        threshold (float, optional): Minimum score for a memory to be included in the results.
            Defaults to None.
        rerank (bool, optional): Re-order the vector-store results with the configured reranker,
            if one exists. If reranking raises, a warning is logged and the original order is
            kept. Defaults to True.

    Returns:
        dict: A dictionary containing the search results, typically under a "results" key,
            and potentially "relations" if graph store is enabled.
            Example for v1.1+: `{"results": [{"id": "...", "memory": "...", "score": 0.8, ...}]}`

    Raises:
        ValueError: If none of 'user_id', 'agent_id' or 'run_id' ends up in the effective
            filters, or if `filters` uses an operator `_process_metadata_filters` rejects.
    """
    # Explicit submodule import: `import concurrent` alone does not load `concurrent.futures`.
    import concurrent.futures

    _, effective_filters = _build_filters_and_metadata(
        user_id=user_id, agent_id=agent_id, run_id=run_id, input_filters=filters
    )

    if not any(key in effective_filters for key in ("user_id", "agent_id", "run_id")):
        raise ValueError("At least one of 'user_id', 'agent_id', or 'run_id' must be specified.")

    # Evaluate once; reused for filter translation and telemetry.
    uses_advanced_filters = bool(filters) and self._has_advanced_operators(filters)

    if uses_advanced_filters:
        # NOTE(review): `_build_filters_and_metadata` presumably already merges `filters` into
        # `effective_filters`, so raw logical keys ("AND"/"OR"/"NOT") would remain alongside
        # their processed form ("$or"/"$not"/flattened fields) — confirm backends ignore them.
        effective_filters.update(self._process_metadata_filters(filters))
    elif filters:
        # Simple filters, merge directly
        effective_filters.update(filters)

    keys, encoded_ids = process_telemetry_filters(effective_filters)
    capture_event(
        "mem0.search",
        self,
        {
            "limit": limit,
            "version": self.api_version,
            "keys": keys,
            "encoded_ids": encoded_ids,
            "sync_type": "sync",
            "threshold": threshold,
            "advanced_filters": uses_advanced_filters,
        },
    )

    # Vector and graph lookups run concurrently; `.result()` blocks until each one finishes.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        future_memories = executor.submit(self._search_vector_store, query, effective_filters, limit, threshold)
        future_graph_entities = (
            executor.submit(self.graph.search, query, effective_filters, limit) if self.enable_graph else None
        )

        original_memories = future_memories.result()
        graph_entities = future_graph_entities.result() if future_graph_entities else None

    # Apply reranking if enabled and reranker is available
    if rerank and self.reranker and original_memories:
        try:
            original_memories = self.reranker.rerank(query, original_memories, limit)
        except Exception as e:
            logger.warning(f"Reranking failed, using original results: {e}")

    if self.enable_graph:
        return {"results": original_memories, "relations": graph_entities}

    return {"results": original_memories}
| #857 | |
def _process_metadata_filters(self, metadata_filters: Dict[str, Any]) -> Dict[str, Any]:
    """
    Process enhanced metadata filters and convert them to vector store compatible format.

    Field conditions:
        - ``{"key": value}`` (including the ``"*"`` wildcard) is passed through unchanged.
        - ``{"key": {op: value, ...}}`` keeps every supported operator for that field, so a
          range such as ``{"age": {"gte": 18, "lte": 65}}`` stays one combined condition.
          An empty operator dict contributes nothing.

    Logical operators:
        - ``{"AND": [c1, c2, ...]}``: the conditions are flattened into the top level. Operator
          dicts on the same field are merged (a repeated operator keeps the later value); a
          plain value replaces whatever was there.
        - ``{"OR": [...]}`` / ``{"NOT": [...]}``: each condition is processed and collected
          under ``"$or"`` / ``"$not"`` for the vector store to interpret.

    Args:
        metadata_filters: Enhanced metadata filters with operators.

    Returns:
        Dict of processed filters compatible with vector store.

    Raises:
        ValueError: On an unsupported comparison operator, or when a logical operator is not
            given a list (non-empty for OR/NOT) of condition dicts.
    """
    # Operators are passed through unchanged; each vector store translates them itself.
    supported_operators = {"eq", "ne", "gt", "gte", "lt", "lte", "in", "nin", "contains", "icontains"}

    def process_condition(key: str, condition: Any) -> Dict[str, Any]:
        if not isinstance(condition, dict):
            # Simple equality or "*" wildcard; the vector store decides how to apply it.
            return {key: condition}
        for operator in condition:
            if operator not in supported_operators:
                raise ValueError(f"Unsupported metadata filter operator: {operator}")
        # Keep all operators together so multi-operator conditions (ranges) are not truncated.
        return {key: dict(condition)} if condition else {}

    def merge_into(target: Dict[str, Any], addition: Dict[str, Any]) -> None:
        # Combine operator dicts that target the same field instead of overwriting them.
        for field, cond in addition.items():
            existing = target.get(field)
            if isinstance(existing, dict) and isinstance(cond, dict):
                target[field] = {**existing, **cond}
            else:
                target[field] = cond

    def process_group(group_name: str, condition: Any) -> Dict[str, Any]:
        # One element of an AND/OR/NOT list, processed into a single dict.
        if not isinstance(condition, dict):
            raise ValueError(
                f"{group_name} operator conditions must be dicts, got {type(condition).__name__}"
            )
        processed: Dict[str, Any] = {}
        for sub_key, sub_value in condition.items():
            merge_into(processed, process_condition(sub_key, sub_value))
        return processed

    processed_filters: Dict[str, Any] = {}

    for key, value in metadata_filters.items():
        if key == "AND":
            # Logical AND: combine multiple conditions
            if not isinstance(value, list):
                raise ValueError("AND operator requires a list of conditions")
            for condition in value:
                merge_into(processed_filters, process_group("AND", condition))
        elif key == "OR":
            # Logical OR: pass through to vector store for implementation-specific handling
            if not isinstance(value, list) or not value:
                raise ValueError("OR operator requires a non-empty list of conditions")
            processed_filters["$or"] = [process_group("OR", condition) for condition in value]
        elif key == "NOT":
            # Logical NOT: pass through to vector store for implementation-specific handling
            if not isinstance(value, list) or not value:
                raise ValueError("NOT operator requires a non-empty list of conditions")
            processed_filters["$not"] = [process_group("NOT", condition) for condition in value]
        else:
            merge_into(processed_filters, process_condition(key, value))

    return processed_filters
| #926 | |
def _has_advanced_operators(self, filters: Dict[str, Any]) -> bool:
    """
    Report whether ``filters`` needs the enhanced-filter translation step.

    A filter mapping counts as "advanced" as soon as one top-level entry
    is a logical key (``AND``/``OR``/``NOT``), maps to a dict containing a
    comparison operator key, or has the ``"*"`` wildcard as its value.

    Args:
        filters: Filter mapping to inspect. Anything that is not a dict yields False.

    Returns:
        bool: True on the first advanced construct found, otherwise False.
    """
    if not isinstance(filters, dict):
        return False

    logical_keys = ("AND", "OR", "NOT")
    comparison_ops = ("eq", "ne", "gt", "gte", "lt", "lte", "in", "nin", "contains", "icontains")

    for field, spec in filters.items():
        if field in logical_keys:
            return True
        # Operators are written without a "$" prefix.
        if isinstance(spec, dict) and any(op in comparison_ops for op in spec):
            return True
        if spec == "*":
            return True
    return False
| #953 | |
def _search_vector_store(self, query, filters, limit, threshold: Optional[float] = None):
    """
    Embed ``query``, run a similarity search, and format the hits as dicts.

    Args:
        query (str): Search text. It is embedded with the ``"search"`` action and also
            passed to the store as-is.
        filters (dict): Filters forwarded to ``vector_store.search``.
        limit (int): Maximum number of hits requested from the store.
        threshold (float, optional): When set, hits scoring below it are dropped.

    Returns:
        list[dict]: ``MemoryItem`` fields (including ``score``), promoted identifiers
        found in the payload, and a ``metadata`` dict for any remaining payload keys.
    """
    query_vector = self.embedding_model.embed(query, "search")
    hits = self.vector_store.search(query=query, vectors=query_vector, limit=limit, filters=filters)

    promoted = ("user_id", "agent_id", "run_id", "actor_id", "role")
    reserved = {"data", "hash", "created_at", "updated_at", "id"}.union(promoted)

    kept = []
    for hit in hits:
        payload = hit.payload
        entry = MemoryItem(
            id=hit.id,
            memory=payload.get("data", ""),
            hash=payload.get("hash"),
            created_at=payload.get("created_at"),
            updated_at=payload.get("updated_at"),
            score=hit.score,
        ).model_dump()

        entry.update({name: payload[name] for name in promoted if name in payload})

        extras = {k: v for k, v in payload.items() if k not in reserved}
        if extras:
            entry["metadata"] = extras

        passes_threshold = threshold is None or hit.score >= threshold
        if passes_threshold:
            kept.append(entry)

    return kept
| #991 | |
def update(self, memory_id, data):
    """
    Replace the stored text of a memory and re-embed it.

    The new text is embedded once here, and the vector is passed to
    ``_update_memory`` so it does not need to be recomputed there.

    Args:
        memory_id (str): ID of the memory to update.
        data (str): New content for the memory.

    Returns:
        dict: Confirmation message, ``{"message": "Memory updated successfully!"}``.

    Raises:
        ValueError: Propagated from ``_update_memory`` when the memory cannot be fetched.

    Example:
        >>> m.update(memory_id="mem_123", data="Likes to play tennis on weekends")
        {'message': 'Memory updated successfully!'}
    """
    capture_event("mem0.update", self, {"memory_id": memory_id, "sync_type": "sync"})

    new_vector = self.embedding_model.embed(data, "update")
    self._update_memory(memory_id, data, {data: new_vector})

    return {"message": "Memory updated successfully!"}
| #1013 | |
def delete(self, memory_id):
    """
    Remove a single memory and record the deletion in history.

    Args:
        memory_id (str): ID of the memory to delete.

    Returns:
        dict: Confirmation message, ``{"message": "Memory deleted successfully!"}``.
    """
    telemetry = {"memory_id": memory_id, "sync_type": "sync"}
    capture_event("mem0.delete", self, telemetry)

    self._delete_memory(memory_id)

    return {"message": "Memory deleted successfully!"}
| #1024 | |
def delete_all(self, user_id: Optional[str] = None, agent_id: Optional[str] = None, run_id: Optional[str] = None):
    """
    Delete every memory scoped to the given user, agent and/or run.

    Only memories matching the supplied identifiers are removed; other memories
    in the collection are left alone. To wipe the whole store, use ``reset()``.
    When the graph store is enabled, ``graph.delete_all`` is called with the same filters.

    Args:
        user_id (str, optional): ID of the user to delete memories for. Defaults to None.
        agent_id (str, optional): ID of the agent to delete memories for. Defaults to None.
        run_id (str, optional): ID of the run to delete memories for. Defaults to None.

    Returns:
        dict: ``{"message": "Memories deleted successfully!"}``.

    Raises:
        ValueError: If none of ``user_id``, ``agent_id`` or ``run_id`` is given.
    """
    filters: Dict[str, Any] = {}
    if user_id:
        filters["user_id"] = user_id
    if agent_id:
        filters["agent_id"] = agent_id
    if run_id:
        filters["run_id"] = run_id

    if not filters:
        raise ValueError(
            "At least one filter is required to delete all memories. If you want to delete all memories, use the `reset()` method."
        )

    keys, encoded_ids = process_telemetry_filters(filters)
    capture_event("mem0.delete_all", self, {"keys": keys, "encoded_ids": encoded_ids, "sync_type": "sync"})

    # NOTE(review): no `limit` is passed; if a backend applies a default page size, only that
    # many memories are deleted per call — confirm.
    listed = self.vector_store.list(filters=filters)
    # Backends differ: some wrap the memories in an outer sequence, others return them
    # directly (same handling as `_get_all_from_vector_store`). Empty results are tolerated.
    if isinstance(listed, (list, tuple)) and listed and isinstance(listed[0], (list, tuple)):
        memories = listed[0]
    else:
        memories = listed or []

    for memory in memories:
        self._delete_memory(memory.id)
    # No collection-wide `self.vector_store.reset()` here: this method is scoped by `filters`,
    # and resetting the whole store is what `reset()` is for.

    logger.info(f"Deleted {len(memories)} memories")

    if self.enable_graph:
        self.graph.delete_all(filters)

    return {"message": "Memories deleted successfully!"}
| #1061 | |
def history(self, memory_id):
    """
    Return the recorded change log for one memory.

    Args:
        memory_id (str): ID of the memory whose history is requested.

    Returns:
        list: The changes stored for ``memory_id``, as returned by ``self.db.get_history``.
    """
    capture_event("mem0.history", self, {"memory_id": memory_id, "sync_type": "sync"})
    change_log = self.db.get_history(memory_id)
    return change_log
| #1074 | |
def _create_memory(self, data, existing_embeddings, metadata=None):
    """
    Insert a new memory into the vector store and record an ADD in history.

    Args:
        data (str): Memory text.
        existing_embeddings (dict): Precomputed ``{text: vector}`` map; ``data`` is only
            embedded (action ``"add"``) when it is missing from this map.
        metadata (dict, optional): Extra payload fields. The dict is deep-copied, so the
            caller's object is neither mutated nor shared with the stored payload.

    Returns:
        str: The newly generated (UUID4) memory ID.
    """
    logger.debug(f"Creating memory with {data=}")
    if data in existing_embeddings:
        embeddings = existing_embeddings[data]
    else:
        embeddings = self.embedding_model.embed(data, memory_action="add")
    memory_id = str(uuid.uuid4())

    # Build the payload on a copy: adding data/hash/created_at must not leak into the
    # caller's dict.
    payload = deepcopy(metadata) if metadata else {}
    payload["data"] = data
    payload["hash"] = hashlib.md5(data.encode()).hexdigest()
    payload["created_at"] = datetime.now(pytz.timezone("US/Pacific")).isoformat()

    self.vector_store.insert(
        vectors=[embeddings],
        ids=[memory_id],
        payloads=[payload],
    )
    self.db.add_history(
        memory_id,
        None,
        data,
        "ADD",
        created_at=payload.get("created_at"),
        actor_id=payload.get("actor_id"),
        role=payload.get("role"),
    )
    return memory_id
| #1102 | |
def _create_procedural_memory(self, messages, metadata=None, prompt=None):
    """
    Summarise a conversation into one procedural memory and store it.

    Args:
        messages (list): Conversation messages. They are placed between the system prompt
            and a final user instruction asking for the procedural memory.
        metadata (dict): Metadata for the stored memory. Required; ``memory_type`` is set
            on it to the procedural type.
        prompt (str, optional): System prompt override. Defaults to
            ``PROCEDURAL_MEMORY_SYSTEM_PROMPT``.

    Returns:
        dict: ``{"results": [{"id": <memory_id>, "memory": <text>, "event": "ADD"}]}``.

    Raises:
        ValueError: If ``metadata`` is None. This is checked before the LLM is called.
        Exception: Any error from the LLM call is logged and re-raised.
    """
    # Validate first so a missing argument does not cost an LLM generation.
    if metadata is None:
        raise ValueError("Metadata cannot be None for procedural memory.")

    logger.info("Creating procedural memory")

    parsed_messages = [
        {"role": "system", "content": prompt or PROCEDURAL_MEMORY_SYSTEM_PROMPT},
        *messages,
        {
            "role": "user",
            "content": "Create procedural memory of the above conversation.",
        },
    ]

    try:
        procedural_memory = self.llm.generate_response(messages=parsed_messages)
        procedural_memory = remove_code_blocks(procedural_memory)
    except Exception as e:
        logger.error(f"Error generating procedural memory summary: {e}")
        raise

    metadata["memory_type"] = MemoryType.PROCEDURAL.value
    embeddings = self.embedding_model.embed(procedural_memory, memory_action="add")
    memory_id = self._create_memory(procedural_memory, {procedural_memory: embeddings}, metadata=metadata)
    capture_event("mem0._create_procedural_memory", self, {"memory_id": memory_id, "sync_type": "sync"})

    return {"results": [{"id": memory_id, "memory": procedural_memory, "event": "ADD"}]}
| #1141 | |
def _update_memory(self, memory_id, data, existing_embeddings, metadata=None):
    """
    Overwrite an existing memory's text, vector and payload, and log an UPDATE in history.

    Args:
        memory_id (str): ID of the memory to update.
        data (str): New memory text.
        existing_embeddings (dict): Precomputed ``{text: vector}`` map; ``data`` is only
            embedded (action ``"update"``) when it is missing from this map.
        metadata (dict, optional): Payload fields for the new version. It is deep-copied,
            never mutated. Session identifiers (``user_id``, ``agent_id``, ``run_id``,
            ``actor_id``, ``role``) absent from it are carried over from the stored memory.

    Returns:
        str: ``memory_id``.

    Raises:
        ValueError: If the memory cannot be fetched from the vector store.
    """
    logger.info(f"Updating memory with {data=}")

    try:
        existing_memory = self.vector_store.get(vector_id=memory_id)
    except Exception as e:
        logger.error(f"Error getting memory with ID {memory_id} during update.")
        raise ValueError(f"Error getting memory with ID {memory_id}. Please provide a valid 'memory_id'") from e

    if existing_memory is None:
        # `get` may report an unknown ID by returning None instead of raising; without this
        # check the payload access below would fail with an AttributeError.
        logger.error(f"Error getting memory with ID {memory_id} during update.")
        raise ValueError(f"Error getting memory with ID {memory_id}. Please provide a valid 'memory_id'")

    stored_payload = existing_memory.payload
    prev_value = stored_payload.get("data")

    new_metadata = deepcopy(metadata) if metadata is not None else {}
    new_metadata["data"] = data
    new_metadata["hash"] = hashlib.md5(data.encode()).hexdigest()
    new_metadata["created_at"] = stored_payload.get("created_at")
    new_metadata["updated_at"] = datetime.now(pytz.timezone("US/Pacific")).isoformat()

    # Preserve session identifiers from the stored memory unless the caller supplied them.
    for key in ("user_id", "agent_id", "run_id", "actor_id", "role"):
        if key not in new_metadata and key in stored_payload:
            new_metadata[key] = stored_payload[key]

    if data in existing_embeddings:
        embeddings = existing_embeddings[data]
    else:
        embeddings = self.embedding_model.embed(data, "update")

    self.vector_store.update(
        vector_id=memory_id,
        vector=embeddings,
        payload=new_metadata,
    )
    logger.info(f"Updating memory with ID {memory_id=} with {data=}")

    self.db.add_history(
        memory_id,
        prev_value,
        data,
        "UPDATE",
        created_at=new_metadata["created_at"],
        updated_at=new_metadata["updated_at"],
        actor_id=new_metadata.get("actor_id"),
        role=new_metadata.get("role"),
    )
    return memory_id
| #1195 | |
def _delete_memory(self, memory_id):
    """
    Remove one memory from the vector store and record a DELETE in history.

    Args:
        memory_id (str): ID of the memory to delete.

    Returns:
        str: ``memory_id``.

    Raises:
        ValueError: If the vector store returns no memory for ``memory_id``.
    """
    logger.info(f"Deleting memory with {memory_id=}")
    existing_memory = self.vector_store.get(vector_id=memory_id)
    if existing_memory is None:
        # `get` may report an unknown ID by returning None; fail with a clear error instead
        # of an AttributeError on `.payload`.
        raise ValueError(f"Memory with ID {memory_id} not found. Please provide a valid 'memory_id'")

    payload = existing_memory.payload
    prev_value = payload.get("data", "")
    self.vector_store.delete(vector_id=memory_id)
    self.db.add_history(
        memory_id,
        prev_value,
        None,
        "DELETE",
        actor_id=payload.get("actor_id"),
        role=payload.get("role"),
        is_deleted=1,
    )
    return memory_id
| #1211 | |
def reset(self):
    """
    Reset the memory store by:
        - dropping the ``history`` table and closing the current SQLite connection, if any;
        - opening a fresh ``SQLiteManager`` at ``config.history_db_path``;
        - resetting the vector store through ``VectorStoreFactory.reset`` when the store
          exposes ``reset``, otherwise deleting its collection and creating a new store
          from ``config.vector_store``.
    """
    logger.warning("Resetting all memories")

    if hasattr(self.db, "connection") and self.db.connection:
        self.db.connection.execute("DROP TABLE IF EXISTS history")
        self.db.connection.close()

    self.db = SQLiteManager(self.config.history_db_path)

    if hasattr(self.vector_store, "reset"):
        self.vector_store = VectorStoreFactory.reset(self.vector_store)
    else:
        # Not a skip: the collection is dropped and the store rebuilt from config.
        logger.warning("Vector store does not support reset. Falling back to delete_col() and re-creating the store.")
        self.vector_store.delete_col()
        self.vector_store = VectorStoreFactory.create(
            self.config.vector_store.provider, self.config.vector_store.config
        )
    # NOTE(review): the graph store (self.graph) is not cleared here even when enabled —
    # confirm that is intended.
    capture_event("mem0.reset", self, {"sync_type": "sync"})
| #1236 | |
def chat(self, query):
    """
    Conversational interface placeholder; not available yet.

    Raises:
        NotImplementedError: Always.
    """
    message = "Chat function not implemented yet."
    raise NotImplementedError(message)
| #1239 | |
| #1240 | |
| #1241 | class AsyncMemory(MemoryBase): |
def __init__(self, config: Optional[MemoryConfig] = None):
    """
    Build the async memory client from ``config``.

    Creates the embedder, vector store, LLM and SQLite history DB, plus an optional
    reranker and an optional graph store. It also creates a separate vector store on a
    ``"mem0migrations"`` collection, kept as ``self._telemetry_vector_store``.

    Args:
        config (MemoryConfig, optional): Configuration. When omitted, a fresh
            ``MemoryConfig()`` is built for this instance.
    """
    # A `MemoryConfig()` default would be evaluated once, at definition time, and shared by
    # every instance created without arguments; build a fresh one per instance instead.
    if config is None:
        config = MemoryConfig()
    self.config = config

    self.embedding_model = EmbedderFactory.create(
        self.config.embedder.provider,
        self.config.embedder.config,
        self.config.vector_store.config,
    )
    self.vector_store = VectorStoreFactory.create(
        self.config.vector_store.provider, self.config.vector_store.config
    )
    self.llm = LlmFactory.create(self.config.llm.provider, self.config.llm.config)
    self.db = SQLiteManager(self.config.history_db_path)
    self.collection_name = self.config.vector_store.config.collection_name
    self.api_version = self.config.version

    # Initialize reranker if configured
    self.reranker = None
    if config.reranker:
        self.reranker = RerankerFactory.create(
            config.reranker.provider,
            config.reranker.config
        )

    self.enable_graph = False

    if self.config.graph_store.config:
        provider = self.config.graph_store.provider
        self.graph = GraphStoreFactory.create(provider, self.config)
        self.enable_graph = True
    else:
        self.graph = None

    # Separate "mem0migrations" collection; faiss/qdrant get their own path under mem0_dir.
    telemetry_config = _safe_deepcopy_config(self.config.vector_store.config)
    telemetry_config.collection_name = "mem0migrations"
    if self.config.vector_store.provider in ["faiss", "qdrant"]:
        provider_path = f"migrations_{self.config.vector_store.provider}"
        telemetry_config.path = os.path.join(mem0_dir, provider_path)
        os.makedirs(telemetry_config.path, exist_ok=True)
    self._telemetry_vector_store = VectorStoreFactory.create(self.config.vector_store.provider, telemetry_config)

    capture_event("mem0.init", self, {"sync_type": "async"})
| #1274 | |
| #1275 | telemetry_config = _safe_deepcopy_config(self.config.vector_store.config) |
| #1276 | telemetry_config.collection_name = "mem0migrations" |
| #1277 | if self.config.vector_store.provider in ["faiss", "qdrant"]: |
| #1278 | provider_path = f"migrations_{self.config.vector_store.provider}" |
| #1279 | telemetry_config.path = os.path.join(mem0_dir, provider_path) |
| #1280 | os.makedirs(telemetry_config.path, exist_ok=True) |
| #1281 | self._telemetry_vector_store = VectorStoreFactory.create(self.config.vector_store.provider, telemetry_config) |
| #1282 | |
| #1283 | capture_event("mem0.init", self, {"sync_type": "async"}) |
| #1284 | |
@classmethod
async def from_config(cls, config_dict: Dict[str, Any]):
    """
    Build an ``AsyncMemory`` from a plain configuration dict.

    Args:
        config_dict (dict): Raw configuration. It is normalised by ``_process_config``,
            which may add a ``vector_store`` section to this dict in place, before validation.

    Returns:
        AsyncMemory: A new instance built from the validated ``MemoryConfig``.

    Raises:
        ValidationError: If the configuration is invalid (logged, then re-raised).
    """
    try:
        # Validate the processed result instead of discarding it and relying on in-place mutation.
        processed_config = cls._process_config(config_dict)
        config = MemoryConfig(**processed_config)
    except ValidationError as e:
        logger.error(f"Configuration validation error: {e}")
        raise
    return cls(config)
| #1294 | |
@staticmethod
def _process_config(config_dict: Dict[str, Any]) -> Dict[str, Any]:
    """
    Fill in derived configuration before validation.

    When a graph store and an embedder are configured but no vector store is, a minimal
    ``vector_store`` section is added. Its ``embedding_model_dims`` mirrors the embedder's
    ``embedding_dims``. The dict is modified in place and also returned.

    Args:
        config_dict (dict): Raw configuration.

    Returns:
        dict: The same ``config_dict`` object.

    Raises:
        KeyError: If, in the case above, the embedder section lacks ``config.embedding_dims``.
            ``config_dict`` is left unmodified in that case.
    """
    if "graph_store" in config_dict and "vector_store" not in config_dict and "embedder" in config_dict:
        # Read first so a missing key does not leave a half-built "vector_store" behind.
        embedding_dims = config_dict["embedder"]["config"]["embedding_dims"]
        config_dict["vector_store"] = {"config": {"embedding_model_dims": embedding_dims}}
    return config_dict
| #1309 | |
def _should_use_agent_memory_extraction(self, messages, metadata):
    """
    Decide whether facts should be extracted from the agent's point of view.

    Agent extraction is chosen only when ``metadata`` carries a non-None
    ``agent_id`` *and* at least one message has the ``"assistant"`` role.
    Every other combination falls back to user extraction.

    Args:
        messages: Message dicts; each is queried with ``.get("role")``.
        metadata: Mapping that may hold ``agent_id``.

    Returns:
        bool: True for agent memory extraction, False for user memory extraction.
    """
    agent_scoped = metadata.get("agent_id") is not None
    saw_assistant_turn = any(message.get("role") == "assistant" for message in messages)
    return agent_scoped and saw_assistant_turn
| #1330 | |
async def add(
    self,
    messages,
    *,
    user_id: Optional[str] = None,
    agent_id: Optional[str] = None,
    run_id: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None,
    infer: bool = True,
    memory_type: Optional[str] = None,
    prompt: Optional[str] = None,
    llm=None,
):
    """
    Create a new memory asynchronously.

    Args:
        messages (str or List[Dict[str, str]]): Messages to store in the memory. A bare
            string becomes a single user message; a single dict is wrapped in a list.
        user_id (str, optional): ID of the user creating the memory.
        agent_id (str, optional): ID of the agent creating the memory. Defaults to None.
        run_id (str, optional): ID of the run creating the memory. Defaults to None.
        metadata (dict, optional): Metadata to store with the memory. Defaults to None.
        infer (bool, optional): Whether to infer the memories. Defaults to True.
        memory_type (str, optional): Type of memory to create. Defaults to None.
            Pass "procedural_memory" to create procedural memories (requires ``agent_id``).
        prompt (str, optional): Prompt to use for the memory creation. Defaults to None.
        llm (BaseChatModel, optional): LLM class to use for generating procedural memories.
            Defaults to None. Useful when user is using LangChain ChatModel.

    Returns:
        dict: ``{"results": [...]}`` from the vector-store step, plus ``"relations"`` from
        the graph step when the graph store is enabled.

    Raises:
        ValueError: If ``memory_type`` is given and is not the procedural type.
        Mem0ValidationError: If ``messages`` is not a str, dict or list.
    """
    processed_metadata, effective_filters = _build_filters_and_metadata(
        user_id=user_id, agent_id=agent_id, run_id=run_id, input_metadata=metadata
    )

    procedural = MemoryType.PROCEDURAL.value
    if memory_type is not None and memory_type != procedural:
        raise ValueError(
            f"Invalid 'memory_type'. Please pass {procedural} to create procedural memories."
        )

    # Normalise the accepted input shapes into a list of message dicts.
    if isinstance(messages, str):
        messages = [{"role": "user", "content": messages}]
    elif isinstance(messages, dict):
        messages = [messages]
    elif not isinstance(messages, list):
        raise Mem0ValidationError(
            message="messages must be str, dict, or list[dict]",
            error_code="VALIDATION_003",
            details={"provided_type": type(messages).__name__, "valid_types": ["str", "dict", "list[dict]"]},
            suggestion="Convert your input to a string, dictionary, or list of dictionaries."
        )

    if memory_type == procedural and agent_id is not None:
        return await self._create_procedural_memory(
            messages, metadata=processed_metadata, prompt=prompt, llm=llm
        )

    llm_settings = self.config.llm.config
    if llm_settings.get("enable_vision"):
        messages = parse_vision_messages(messages, self.llm, llm_settings.get("vision_details"))
    else:
        messages = parse_vision_messages(messages)

    # The vector-store and graph writes are scheduled as concurrent tasks.
    vector_store_task = asyncio.create_task(
        self._add_to_vector_store(messages, processed_metadata, effective_filters, infer)
    )
    graph_task = asyncio.create_task(self._add_to_graph(messages, effective_filters))
    vector_store_result, graph_result = await asyncio.gather(vector_store_task, graph_task)

    response = {"results": vector_store_result}
    if self.enable_graph:
        response["relations"] = graph_result
    return response
| #1409 | |
async def _add_to_vector_store(
    self,
    messages: list,
    metadata: dict,
    effective_filters: dict,
    infer: bool,
):
    """
    Persist ``messages`` to the vector store and return the applied memory events.

    With ``infer=False`` every valid, non-system message is embedded and stored
    verbatim as its own memory. With ``infer=True`` the LLM first extracts facts
    from the conversation, the closest existing memories are fetched for each
    fact, and a second LLM call returns ADD / UPDATE / DELETE / NONE actions that
    are then executed concurrently.

    Args:
        messages (list): Chat messages; each valid entry is a dict with "role" and
            "content" and may carry a "name" (stored as ``actor_id``).
        metadata (dict): Base metadata copied into every memory created here.
        effective_filters (dict): Session scope; its user_id / agent_id / run_id
            values restrict the similarity search (infer mode) and feed telemetry.
        infer (bool): Whether to run LLM fact extraction and reconciliation.

    Returns:
        list[dict]: One dict per applied event with "id", "memory" and "event";
        raw mode adds "actor_id" and "role", UPDATE adds "previous_memory".
    """
    if not infer:
        returned_memories = []
        for message_dict in messages:
            if (
                not isinstance(message_dict, dict)
                or message_dict.get("role") is None
                or message_dict.get("content") is None
            ):
                logger.warning(f"Skipping invalid message format (async): {message_dict}")
                continue

            if message_dict["role"] == "system":
                continue

            per_msg_meta = deepcopy(metadata)
            per_msg_meta["role"] = message_dict["role"]

            actor_name = message_dict.get("name")
            if actor_name:
                per_msg_meta["actor_id"] = actor_name

            msg_content = message_dict["content"]
            msg_embeddings = await asyncio.to_thread(self.embedding_model.embed, msg_content, "add")
            # _create_memory expects a {text: embedding} mapping. Passing the bare
            # vector made its cache lookup miss, so every message was embedded twice.
            mem_id = await self._create_memory(msg_content, {msg_content: msg_embeddings}, per_msg_meta)

            returned_memories.append(
                {
                    "id": mem_id,
                    "memory": msg_content,
                    "event": "ADD",
                    "actor_id": actor_name if actor_name else None,
                    "role": message_dict["role"],
                }
            )
        return returned_memories

    # --- Step 1: extract candidate facts with the LLM --------------------------
    parsed_messages = parse_messages(messages)
    if self.config.custom_fact_extraction_prompt:
        system_prompt = self.config.custom_fact_extraction_prompt
        user_prompt = f"Input:\n{parsed_messages}"
    else:
        # Decide between the agent- and user-oriented extraction prompts based on
        # agent_id presence and the roles present in the messages.
        is_agent_memory = self._should_use_agent_memory_extraction(messages, metadata)
        system_prompt, user_prompt = get_fact_retrieval_messages(parsed_messages, is_agent_memory)

    response = await asyncio.to_thread(
        self.llm.generate_response,
        messages=[{"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}],
        response_format={"type": "json_object"},
    )
    try:
        response = remove_code_blocks(response)
        if not response.strip():
            new_retrieved_facts = []
        else:
            try:
                new_retrieved_facts = json.loads(response)["facts"]
            except json.JSONDecodeError:
                # The model may wrap the JSON in prose; fall back to extracting it.
                extracted_json = extract_json(response)
                new_retrieved_facts = json.loads(extracted_json)["facts"]
    except Exception as e:
        logger.error(f"Error in new_retrieved_facts: {e}")
        new_retrieved_facts = []

    if not new_retrieved_facts:
        logger.debug("No new facts retrieved from input. Skipping memory update LLM call.")

    # --- Step 2: find existing memories similar to each fact -------------------
    # Only the session identifiers scope this search.
    search_filters = {
        key: effective_filters[key] for key in ("user_id", "agent_id", "run_id") if effective_filters.get(key)
    }
    new_message_embeddings = {}

    async def process_fact_for_search(new_mem_content):
        embeddings = await asyncio.to_thread(self.embedding_model.embed, new_mem_content, "add")
        # Cached so ADD/UPDATE of the same text can reuse the vector.
        new_message_embeddings[new_mem_content] = embeddings
        existing_mems = await asyncio.to_thread(
            self.vector_store.search,
            query=new_mem_content,
            vectors=embeddings,
            limit=5,
            filters=search_filters,
        )
        return [{"id": mem.id, "text": mem.payload.get("data", "")} for mem in existing_mems]

    search_results_list = await asyncio.gather(*(process_fact_for_search(fact) for fact in new_retrieved_facts))

    # De-duplicate by id, then swap the real ids for short sequential ones so the
    # LLM only has to echo "0", "1", ...; temp_uuid_mapping restores the real ids.
    unique_data = {}
    for result_group in search_results_list:
        for item in result_group:
            unique_data[item["id"]] = item
    retrieved_old_memory = list(unique_data.values())
    logger.info(f"Total existing memories: {len(retrieved_old_memory)}")
    temp_uuid_mapping = {}
    for idx, item in enumerate(retrieved_old_memory):
        temp_uuid_mapping[str(idx)] = item["id"]
        item["id"] = str(idx)

    # --- Step 3: ask the LLM how to reconcile facts with existing memories -----
    new_memories_with_actions = {}
    if new_retrieved_facts:
        function_calling_prompt = get_update_memory_messages(
            retrieved_old_memory, new_retrieved_facts, self.config.custom_update_memory_prompt
        )
        try:
            response = await asyncio.to_thread(
                self.llm.generate_response,
                messages=[{"role": "user", "content": function_calling_prompt}],
                response_format={"type": "json_object"},
            )
        except Exception as e:
            logger.error(f"Error in new memory actions response: {e}")
            response = ""
        try:
            if not response or not response.strip():
                logger.warning("Empty response from LLM, no memories to extract")
            else:
                response = remove_code_blocks(response)
                new_memories_with_actions = json.loads(response)
        except Exception as e:
            logger.error(f"Invalid JSON response: {e}")
            new_memories_with_actions = {}

    def resolve_memory_id(action):
        # The LLM echoes the temporary ids; normalise to str so numeric ids resolve too.
        raw_id = action.get("id")
        return None if raw_id is None else temp_uuid_mapping.get(str(raw_id))

    async def update_session_ids(mem_id, meta):
        # Content is unchanged: only attach the new agent_id / run_id to the payload.
        existing_memory = await asyncio.to_thread(self.vector_store.get, vector_id=mem_id)
        updated_metadata = deepcopy(existing_memory.payload)
        if meta.get("agent_id"):
            updated_metadata["agent_id"] = meta["agent_id"]
        if meta.get("run_id"):
            updated_metadata["run_id"] = meta["run_id"]
        updated_metadata["updated_at"] = datetime.now(pytz.timezone("US/Pacific")).isoformat()

        await asyncio.to_thread(
            self.vector_store.update,
            vector_id=mem_id,
            vector=None,  # Keep same embeddings
            payload=updated_metadata,
        )
        logger.info(f"Updated session IDs for memory {mem_id} (async)")

    # --- Step 4: apply the actions concurrently, then collect results ----------
    returned_memories = []
    try:
        memory_tasks = []
        for resp in new_memories_with_actions.get("memory", []):
            logger.info(resp)
            try:
                action_text = resp.get("text")
                if not action_text:
                    continue
                event_type = resp.get("event")

                if event_type == "ADD":
                    task = asyncio.create_task(
                        self._create_memory(
                            data=action_text,
                            existing_embeddings=new_message_embeddings,
                            metadata=deepcopy(metadata),
                        )
                    )
                    memory_tasks.append((task, resp, "ADD", None))
                elif event_type in ("UPDATE", "DELETE"):
                    target_id = resolve_memory_id(resp)
                    if target_id is None:
                        logger.warning(f"Unknown memory id in {event_type} action (async): {resp}")
                        continue
                    if event_type == "UPDATE":
                        coro = self._update_memory(
                            memory_id=target_id,
                            data=action_text,
                            existing_embeddings=new_message_embeddings,
                            metadata=deepcopy(metadata),
                        )
                    else:
                        coro = self._delete_memory(memory_id=target_id)
                    memory_tasks.append((asyncio.create_task(coro), resp, event_type, target_id))
                elif event_type == "NONE":
                    target_id = resolve_memory_id(resp)
                    if target_id and (metadata.get("agent_id") or metadata.get("run_id")):
                        task = asyncio.create_task(update_session_ids(target_id, metadata))
                        memory_tasks.append((task, resp, "NONE", target_id))
                    else:
                        logger.info("NOOP for Memory (async).")
            except Exception as e:
                logger.error(f"Error processing memory action (async): {resp}, Error: {e}")

        for task, resp, event_type, mem_id in memory_tasks:
            try:
                result_id = await task
                if event_type == "ADD":
                    returned_memories.append({"id": result_id, "memory": resp.get("text"), "event": event_type})
                elif event_type == "UPDATE":
                    returned_memories.append(
                        {
                            "id": mem_id,
                            "memory": resp.get("text"),
                            "event": event_type,
                            "previous_memory": resp.get("old_memory"),
                        }
                    )
                elif event_type == "DELETE":
                    returned_memories.append({"id": mem_id, "memory": resp.get("text"), "event": event_type})
            except Exception as e:
                logger.error(f"Error awaiting memory task (async): {e}")
    except Exception as e:
        logger.error(f"Error in memory processing loop (async): {e}")

    keys, encoded_ids = process_telemetry_filters(effective_filters)
    capture_event(
        "mem0.add",
        self,
        {"version": self.api_version, "keys": keys, "encoded_ids": encoded_ids, "sync_type": "async"},
    )
    return returned_memories
| #1642 | |
async def _add_to_graph(self, messages, filters):
    """
    Send the non-system message contents to the graph store, if graph is enabled.

    Args:
        messages (list): Chat messages (dicts with "role" and "content").
        filters (dict): Session filters. The dict is NOT modified: in ``add`` it is
            shared with the concurrently running vector-store task, so the
            ``user_id="user"`` default is applied to a private copy only.

    Returns:
        Whatever ``self.graph.add`` returns, or ``[]`` when graph is disabled.
    """
    if not self.enable_graph:
        return []

    graph_filters = dict(filters)
    if graph_filters.get("user_id") is None:
        graph_filters["user_id"] = "user"

    # Skip malformed entries instead of failing on a missing role / None content.
    data = "\n".join(
        msg["content"]
        for msg in messages
        if isinstance(msg, dict) and msg.get("content") is not None and msg.get("role") != "system"
    )
    return await asyncio.to_thread(self.graph.add, data, graph_filters)
| #1653 | |
async def get(self, memory_id):
    """
    Fetch a single memory by its ID asynchronously.

    Args:
        memory_id (str): ID of the memory to look up.

    Returns:
        dict | None: The formatted memory (core fields, promoted session/actor
        fields, and any remaining payload keys under "metadata"), or None when
        the vector store has no such record.
    """
    capture_event("mem0.get", self, {"memory_id": memory_id, "sync_type": "async"})
    record = await asyncio.to_thread(self.vector_store.get, vector_id=memory_id)
    if not record:
        return None

    payload = record.payload
    promoted = ("user_id", "agent_id", "run_id", "actor_id", "role")
    reserved = {"data", "hash", "created_at", "updated_at", "id", *promoted}

    formatted = MemoryItem(
        id=record.id,
        memory=payload.get("data", ""),
        hash=payload.get("hash"),
        created_at=payload.get("created_at"),
        updated_at=payload.get("updated_at"),
    ).model_dump()

    # Session/actor identifiers are surfaced at the top level of the result.
    formatted.update({key: payload[key] for key in promoted if key in payload})

    extra = {key: value for key, value in payload.items() if key not in reserved}
    if extra:
        formatted["metadata"] = extra

    return formatted
| #1696 | |
async def get_all(
    self,
    *,
    user_id: Optional[str] = None,
    agent_id: Optional[str] = None,
    run_id: Optional[str] = None,
    filters: Optional[Dict[str, Any]] = None,
    limit: int = 100,
):
    """
    List all memories within the given session scope.

    Args:
        user_id (str, optional): user id
        agent_id (str, optional): agent id
        run_id (str, optional): run id
        filters (dict, optional): Extra key-value filters merged with the ID-based
            scoping filters, e.g. `filters={"actor_id": "some_user"}`.
        limit (int, optional): Maximum number of memories to return. Defaults to 100.

    Returns:
        dict: `{"results": [...]}`; when graph is enabled and the graph store
        exposes `get_all`, a "relations" entry is included as well.

    Raises:
        ValueError: If none of user_id / agent_id / run_id ends up in the filters.
    """
    _, effective_filters = _build_filters_and_metadata(
        user_id=user_id, agent_id=agent_id, run_id=run_id, input_filters=filters
    )

    has_session_scope = any(key in effective_filters for key in ("user_id", "agent_id", "run_id"))
    if not has_session_scope:
        raise ValueError(
            "When 'conversation_id' is not provided (classic mode), "
            "at least one of 'user_id', 'agent_id', or 'run_id' must be specified for get_all."
        )

    keys, encoded_ids = process_telemetry_filters(effective_filters)
    capture_event(
        "mem0.get_all", self, {"limit": limit, "keys": keys, "encoded_ids": encoded_ids, "sync_type": "async"}
    )

    vector_store_task = asyncio.create_task(self._get_all_from_vector_store(effective_filters, limit))

    # self.graph is only touched when graph support is enabled.
    graph_get_all = getattr(self.graph, "get_all", None) if self.enable_graph else None

    graph_task = None
    if callable(graph_get_all):
        if asyncio.iscoroutinefunction(graph_get_all):
            graph_coro = graph_get_all(effective_filters, limit)
        else:
            graph_coro = asyncio.to_thread(graph_get_all, effective_filters, limit)
        graph_task = asyncio.create_task(graph_coro)

    if graph_task is None:
        return {"results": await vector_store_task}

    vector_store_result, graph_entities_result = await asyncio.gather(vector_store_task, graph_task)
    return {"results": vector_store_result, "relations": graph_entities_result}
| #1759 | |
async def _get_all_from_vector_store(self, filters, limit):
    """
    List memories from the vector store and convert them to plain dicts.

    Vector stores disagree on the shape returned by ``list``: some wrap the
    memories in an outer list/tuple, others return them directly. The type of
    the first element decides which shape we have.

    Args:
        filters (dict): Filters forwarded to ``vector_store.list``.
        limit (int): Maximum number of memories requested.

    Returns:
        list[dict]: Formatted memories without a "score" field.
    """
    raw = await asyncio.to_thread(self.vector_store.list, filters=filters, limit=limit)

    memories = raw
    if isinstance(raw, (tuple, list)) and raw and isinstance(raw[0], (list, tuple)):
        # Wrapped shape: unwrap exactly one level.
        memories = raw[0]

    promoted = ("user_id", "agent_id", "run_id", "actor_id", "role")
    reserved = {"data", "hash", "created_at", "updated_at", "id", *promoted}

    def to_dict(mem):
        payload = mem.payload
        item = MemoryItem(
            id=mem.id,
            memory=payload.get("data", ""),
            hash=payload.get("hash"),
            created_at=payload.get("created_at"),
            updated_at=payload.get("updated_at"),
        ).model_dump(exclude={"score"})
        item.update({key: payload[key] for key in promoted if key in payload})
        extra = {key: value for key, value in payload.items() if key not in reserved}
        if extra:
            item["metadata"] = extra
        return item

    return [to_dict(mem) for mem in memories]
| #1806 | |
async def search(
    self,
    query: str,
    *,
    user_id: Optional[str] = None,
    agent_id: Optional[str] = None,
    run_id: Optional[str] = None,
    limit: int = 100,
    filters: Optional[Dict[str, Any]] = None,
    threshold: Optional[float] = None,
    metadata_filters: Optional[Dict[str, Any]] = None,
    rerank: bool = True,
):
    """
    Searches for memories based on a query.

    Args:
        query (str): Query to search for.
        user_id (str, optional): ID of the user to search for. Defaults to None.
        agent_id (str, optional): ID of the agent to search for. Defaults to None.
        run_id (str, optional): ID of the run to search for. Defaults to None.
        limit (int, optional): Limit the number of results. Defaults to 100.
        filters (dict, optional): Additional filters. Plain pairs are merged as-is;
            filters using the operators below are first converted with
            ``_process_metadata_filters``:
            - {"key": "value"} - exact match
            - {"key": {"eq": "value"}} - equals
            - {"key": {"ne": "value"}} - not equals
            - {"key": {"in": ["val1", "val2"]}} - in list
            - {"key": {"nin": ["val1", "val2"]}} - not in list
            - {"key": {"gt": 10}} - greater than
            - {"key": {"gte": 10}} - greater than or equal
            - {"key": {"lt": 10}} - less than
            - {"key": {"lte": 10}} - less than or equal
            - {"key": {"contains": "text"}} - contains text
            - {"key": {"icontains": "text"}} - case-insensitive contains
            - {"key": "*"} - wildcard match (any value)
            - {"AND": [filter1, filter2]} - logical AND
            - {"OR": [filter1, filter2]} - logical OR
            - {"NOT": [filter1]} - logical NOT
        threshold (float, optional): Minimum score for a memory to be included in the results. Defaults to None.
        metadata_filters (dict, optional): Accepted for backward compatibility; this method
            does not read it. Pass such filters via ``filters`` instead.
        rerank (bool, optional): When True and a reranker is configured, results are
            reranked; if reranking raises, the vector-store order is kept. Defaults to True.

    Returns:
        dict: A dictionary containing the search results, typically under a "results" key,
            and potentially "relations" if graph store is enabled.
            Example for v1.1+: `{"results": [{"id": "...", "memory": "...", "score": 0.8, ...}]}`

    Raises:
        ValueError: If none of user_id / agent_id / run_id ends up in the filters.
    """
    _, effective_filters = _build_filters_and_metadata(
        user_id=user_id, agent_id=agent_id, run_id=run_id, input_filters=filters
    )

    if not any(key in effective_filters for key in ("user_id", "agent_id", "run_id")):
        raise ValueError("at least one of 'user_id', 'agent_id', or 'run_id' must be specified ")

    # Computed once; used both to choose the filter path and for telemetry.
    uses_advanced_filters = bool(filters and self._has_advanced_operators(filters))
    if uses_advanced_filters:
        effective_filters.update(self._process_metadata_filters(filters))
    elif filters:
        # Simple filters, merge directly
        effective_filters.update(filters)

    keys, encoded_ids = process_telemetry_filters(effective_filters)
    capture_event(
        "mem0.search",
        self,
        {
            "limit": limit,
            "version": self.api_version,
            "keys": keys,
            "encoded_ids": encoded_ids,
            "sync_type": "async",
            "threshold": threshold,
            "advanced_filters": uses_advanced_filters,
        },
    )

    vector_store_task = asyncio.create_task(self._search_vector_store(query, effective_filters, limit, threshold))

    graph_task = None
    if self.enable_graph:
        graph_search = self.graph.search
        # A coroutine *function* has no __await__ (only coroutine objects do), so the
        # old hasattr check always fell through to to_thread and returned an
        # un-awaited coroutine for async graph stores.
        if asyncio.iscoroutinefunction(graph_search):
            graph_task = asyncio.create_task(graph_search(query, effective_filters, limit))
        else:
            graph_task = asyncio.create_task(asyncio.to_thread(graph_search, query, effective_filters, limit))

    if graph_task:
        original_memories, graph_entities = await asyncio.gather(vector_store_task, graph_task)
    else:
        original_memories = await vector_store_task
        graph_entities = None

    # Apply reranking if enabled and reranker is available
    if rerank and self.reranker and original_memories:
        try:
            # Run reranking in a worker thread to avoid blocking the event loop
            original_memories = await asyncio.to_thread(self.reranker.rerank, query, original_memories, limit)
        except Exception as e:
            logger.warning(f"Reranking failed, using original results: {e}")

    if self.enable_graph:
        return {"results": original_memories, "relations": graph_entities}

    return {"results": original_memories}
| #1913 | |
def _process_metadata_filters(self, metadata_filters: Dict[str, Any]) -> Dict[str, Any]:
    """
    Convert enhanced metadata filters into the generic format handed to vector stores.

    Plain values become equality filters (``"*"`` passes through as a wildcard).
    Operator dicts such as ``{"gte": 18, "lte": 65}`` keep *all* of their
    operators. ``AND`` conditions are merged into the top level, and constraints
    on the same key are combined rather than overwritten. ``OR`` / ``NOT``
    conditions are emitted under ``"$or"`` / ``"$not"`` for each vector store
    to interpret.

    Args:
        metadata_filters: Enhanced metadata filters with operators.

    Returns:
        Dict of processed filters compatible with vector stores.

    Raises:
        ValueError: On an unsupported operator or a malformed AND/OR/NOT clause.
    """
    supported_operators = frozenset(
        ("eq", "ne", "gt", "gte", "lt", "lte", "in", "nin", "contains", "icontains")
    )

    def process_condition(key: str, condition: Any) -> Dict[str, Any]:
        if not isinstance(condition, dict):
            # Simple equality (or the "*" wildcard), passed through unchanged.
            return {key: condition}
        operators = {}
        for operator, value in condition.items():
            if operator not in supported_operators:
                raise ValueError(f"Unsupported metadata filter operator: {operator}")
            operators[operator] = value
        # An empty operator dict contributes no constraint.
        return {key: operators} if operators else {}

    def merge_into(target: Dict[str, Any], conditions: Dict[str, Any]) -> None:
        # Combine operator dicts for the same key (e.g. gte + lte) instead of
        # letting a later condition silently replace an earlier one.
        for key, value in conditions.items():
            existing = target.get(key)
            if isinstance(existing, dict) and isinstance(value, dict):
                existing.update(value)
            else:
                target[key] = value

    def process_clause(condition: Any, operator_name: str) -> Dict[str, Any]:
        if not isinstance(condition, dict):
            raise ValueError(f"{operator_name} conditions must be dicts, got {type(condition).__name__}")
        combined: Dict[str, Any] = {}
        for sub_key, sub_value in condition.items():
            merge_into(combined, process_condition(sub_key, sub_value))
        return combined

    processed_filters: Dict[str, Any] = {}
    for key, value in metadata_filters.items():
        if key == "AND":
            if not isinstance(value, list):
                raise ValueError("AND operator requires a list of conditions")
            for condition in value:
                merge_into(processed_filters, process_clause(condition, "AND"))
        elif key == "OR":
            if not isinstance(value, list) or not value:
                raise ValueError("OR operator requires a non-empty list of conditions")
            processed_filters["$or"] = [process_clause(condition, "OR") for condition in value]
        elif key == "NOT":
            if not isinstance(value, list) or not value:
                raise ValueError("NOT operator requires a non-empty list of conditions")
            processed_filters["$not"] = [process_clause(condition, "NOT") for condition in value]
        else:
            merge_into(processed_filters, process_condition(key, value))

    return processed_filters
| #1982 | |
def _has_advanced_operators(self, filters: Dict[str, Any]) -> bool:
    """
    Report whether ``filters`` needs the enhanced-filter processing path.

    A filter counts as advanced when it uses a logical key (AND / OR / NOT),
    a comparison-operator dict (eq, ne, gt, ... icontains; no ``$`` prefix),
    or the ``"*"`` wildcard value.

    Args:
        filters: Dictionary of filters to inspect; non-dicts are never advanced.

    Returns:
        bool: True as soon as one advanced entry is found.
    """
    if not isinstance(filters, dict):
        return False

    logical_keys = ("AND", "OR", "NOT")
    comparison_ops = ("eq", "ne", "gt", "gte", "lt", "lte", "in", "nin", "contains", "icontains")

    def is_advanced(key, value):
        if key in logical_keys:
            return True
        if isinstance(value, dict) and any(op in comparison_ops for op in value):
            return True
        return value == "*"

    return any(is_advanced(key, value) for key, value in filters.items())
| #2009 | |
async def _search_vector_store(self, query, filters, limit, threshold: Optional[float] = None):
    """
    Embed ``query``, search the vector store, and format the scored hits.

    Args:
        query (str): Search text.
        filters (dict): Filters forwarded to ``vector_store.search``.
        limit (int): Maximum number of hits requested from the store.
        threshold (float, optional): When set, hits whose score is not >= threshold
            are dropped.

    Returns:
        list[dict]: Formatted memories including their "score".
    """
    embeddings = await asyncio.to_thread(self.embedding_model.embed, query, "search")
    memories = await asyncio.to_thread(
        self.vector_store.search, query=query, vectors=embeddings, limit=limit, filters=filters
    )

    promoted_payload_keys = [
        "user_id",
        "agent_id",
        "run_id",
        "actor_id",
        "role",
    ]

    core_and_promoted_keys = {"data", "hash", "created_at", "updated_at", "id", *promoted_payload_keys}

    original_memories = []
    for mem in memories:
        # Apply the threshold first so discarded hits are never built/validated.
        # Written as `not (>=)` to keep the exact original inclusion rule.
        if threshold is not None and not (mem.score >= threshold):
            continue

        memory_item_dict = MemoryItem(
            id=mem.id,
            memory=mem.payload.get("data", ""),
            hash=mem.payload.get("hash"),
            created_at=mem.payload.get("created_at"),
            updated_at=mem.payload.get("updated_at"),
            score=mem.score,
        ).model_dump()

        for key in promoted_payload_keys:
            if key in mem.payload:
                memory_item_dict[key] = mem.payload[key]

        additional_metadata = {k: v for k, v in mem.payload.items() if k not in core_and_promoted_keys}
        if additional_metadata:
            memory_item_dict["metadata"] = additional_metadata

        original_memories.append(memory_item_dict)

    return original_memories
| #2049 | |
async def update(self, memory_id, data):
    """
    Replace the content of a memory asynchronously.

    Args:
        memory_id (str): ID of the memory to update.
        data (str): New content for the memory.

    Returns:
        dict: Confirmation message.

    Example:
        >>> await m.update(memory_id="mem_123", data="Likes to play tennis on weekends")
        {'message': 'Memory updated successfully!'}
    """
    capture_event("mem0.update", self, {"memory_id": memory_id, "sync_type": "async"})

    # Embed once here and hand the vector over keyed by its text.
    new_vector = await asyncio.to_thread(self.embedding_model.embed, data, "update")
    await self._update_memory(memory_id, data, {data: new_vector})

    return {"message": "Memory updated successfully!"}
| #2072 | |
async def delete(self, memory_id):
    """
    Remove a single memory by ID asynchronously.

    Args:
        memory_id (str): ID of the memory to remove.

    Returns:
        dict: Confirmation message.
    """
    telemetry_payload = {"memory_id": memory_id, "sync_type": "async"}
    capture_event("mem0.delete", self, telemetry_payload)
    await self._delete_memory(memory_id)
    return {"message": "Memory deleted successfully!"}
| #2083 | |
async def delete_all(self, user_id=None, agent_id=None, run_id=None):
    """
    Delete every memory in the given session scope asynchronously.

    Args:
        user_id (str, optional): ID of the user to delete memories for. Defaults to None.
        agent_id (str, optional): ID of the agent to delete memories for. Defaults to None.
        run_id (str, optional): ID of the run to delete memories for. Defaults to None.

    Returns:
        dict: Confirmation message.

    Raises:
        ValueError: If no identifier is given (use ``reset()`` to wipe everything).
    """
    filters = {
        key: value
        for key, value in (("user_id", user_id), ("agent_id", agent_id), ("run_id", run_id))
        if value
    }

    if not filters:
        raise ValueError(
            "At least one filter is required to delete all memories. If you want to delete all memories, use the `reset()` method."
        )

    keys, encoded_ids = process_telemetry_filters(filters)
    capture_event("mem0.delete_all", self, {"keys": keys, "encoded_ids": encoded_ids, "sync_type": "async"})
    listed = await asyncio.to_thread(self.vector_store.list, filters=filters)

    # Vector stores differ: some wrap the memories in an outer list/tuple, others
    # return them flat. Blindly taking listed[0] broke the flat case (IndexError on
    # an empty result, or iterating a single memory object).
    if isinstance(listed, (tuple, list)) and listed and isinstance(listed[0], (list, tuple)):
        memories = listed[0]
    else:
        memories = listed or []

    await asyncio.gather(*(self._delete_memory(memory.id) for memory in memories))

    logger.info(f"Deleted {len(memories)} memories")

    if self.enable_graph:
        await asyncio.to_thread(self.graph.delete_all, filters)

    return {"message": "Memories deleted successfully!"}
| #2122 | |
async def history(self, memory_id):
    """
    Return the recorded change history of a memory asynchronously.

    Args:
        memory_id (str): ID of the memory whose history is requested.

    Returns:
        list: The change records from ``self.db.get_history``.
    """
    capture_event("mem0.history", self, {"memory_id": memory_id, "sync_type": "async"})
    changes = await asyncio.to_thread(self.db.get_history, memory_id)
    return changes
| #2135 | |
async def _create_memory(self, data, existing_embeddings, metadata=None):
    """
    Insert a new memory into the vector store and record an ADD history entry.

    Args:
        data (str): Memory text.
        existing_embeddings (dict | None): Optional ``{text: embedding}`` cache. If
            ``data`` is a key, its vector is reused; otherwise (including when a
            non-dict is passed) the text is embedded here.
        metadata (dict, optional): Extra payload fields. The caller's dict is copied,
            not mutated.

    Returns:
        str: The newly generated memory ID (a UUID4 string).
    """
    logger.debug(f"Creating memory with {data=}")
    if isinstance(existing_embeddings, dict) and data in existing_embeddings:
        embeddings = existing_embeddings[data]
    else:
        embeddings = await asyncio.to_thread(self.embedding_model.embed, data, memory_action="add")

    memory_id = str(uuid.uuid4())
    # Work on a copy so data/hash/created_at do not leak into the caller's dict.
    payload = dict(metadata) if metadata else {}
    payload["data"] = data
    payload["hash"] = hashlib.md5(data.encode()).hexdigest()
    payload["created_at"] = datetime.now(pytz.timezone("US/Pacific")).isoformat()

    await asyncio.to_thread(
        self.vector_store.insert,
        vectors=[embeddings],
        ids=[memory_id],
        payloads=[payload],
    )

    await asyncio.to_thread(
        self.db.add_history,
        memory_id,
        None,
        data,
        "ADD",
        created_at=payload.get("created_at"),
        actor_id=payload.get("actor_id"),
        role=payload.get("role"),
    )

    return memory_id
| #2168 | |
async def _create_procedural_memory(self, messages, metadata=None, llm=None, prompt=None):
    """
    Create a procedural memory asynchronously.

    An LLM summarizes the conversation into a single memory text. That text
    is embedded and stored with ``memory_type`` set to the procedural type.

    Args:
        messages (list): Messages to create a procedural memory from. They are
            placed between the system prompt and a final user instruction.
        metadata (dict): Metadata stored with the memory. Required. It is
            copied before ``memory_type`` is added, so the caller's dict is
            not modified.
        llm (optional): External model exposing ``invoke`` whose response has
            a ``content`` attribute. When given, it is used instead of
            ``self.llm`` and ``langchain-core`` must be installed.
            Defaults to None.
        prompt (str, optional): System prompt; falls back to
            ``PROCEDURAL_MEMORY_SYSTEM_PROMPT``. Defaults to None.

    Returns:
        dict: ``{"results": [{"id": <memory id>, "memory": <text>, "event": "ADD"}]}``.

    Raises:
        ValueError: If ``metadata`` is None. This is checked before any LLM call.
        ImportError: If ``llm`` is given and ``langchain-core`` cannot be imported.
    """
    # Validate up front so a missing argument does not cost an LLM round-trip.
    if metadata is None:
        raise ValueError("Metadata cannot be None for procedural memory.")

    if llm is not None:
        # langchain-core is only needed to convert messages for an external model;
        # the built-in self.llm path works without it.
        try:
            from langchain_core.messages.utils import (
                convert_to_messages,  # type: ignore
            )
        except ImportError:
            logger.error(
                "Import error while loading langchain-core. Please install 'langchain-core' to use procedural memory."
            )
            raise

    logger.info("Creating procedural memory")

    parsed_messages = [
        {"role": "system", "content": prompt or PROCEDURAL_MEMORY_SYSTEM_PROMPT},
        *messages,
        {"role": "user", "content": "Create procedural memory of the above conversation."},
    ]

    try:
        if llm is not None:
            parsed_messages = convert_to_messages(parsed_messages)
            response = await asyncio.to_thread(llm.invoke, input=parsed_messages)
            procedural_memory = response.content
        else:
            procedural_memory = await asyncio.to_thread(self.llm.generate_response, messages=parsed_messages)
            procedural_memory = remove_code_blocks(procedural_memory)

    except Exception as e:
        logger.error(f"Error generating procedural memory summary: {e}")
        raise

    # Build a new dict instead of mutating the caller's metadata.
    memory_metadata = {**metadata, "memory_type": MemoryType.PROCEDURAL.value}
    embeddings = await asyncio.to_thread(self.embedding_model.embed, procedural_memory, memory_action="add")
    # Hand over the precomputed embedding so _create_memory does not embed the text again.
    memory_id = await self._create_memory(
        procedural_memory, {procedural_memory: embeddings}, metadata=memory_metadata
    )
    capture_event("mem0._create_procedural_memory", self, {"memory_id": memory_id, "sync_type": "async"})

    return {"results": [{"id": memory_id, "memory": procedural_memory, "event": "ADD"}]}
| #2221 | |
async def _update_memory(self, memory_id, data, existing_embeddings, metadata=None):
    """
    Replace the text and embedding of an existing memory and record an UPDATE history entry.

    Args:
        memory_id (str): ID of the memory to update.
        data (str): New memory text.
        existing_embeddings (dict): Mapping of text -> precomputed embedding.
            When ``data`` is a key, that embedding is reused instead of
            calling the embedding model again.
        metadata (dict, optional): New payload fields. Deep-copied, so the
            caller's dict is not modified.

    Returns:
        str: ``memory_id``.

    Raises:
        ValueError: If the memory cannot be fetched from the vector store,
            either because the lookup raises or because it returns None.
    """
    logger.info(f"Updating memory with {data=}")

    lookup_error = f"Error getting memory with ID {memory_id}. Please provide a valid 'memory_id'"
    try:
        existing_memory = await asyncio.to_thread(self.vector_store.get, vector_id=memory_id)
    except Exception as e:
        logger.error(f"Error getting memory with ID {memory_id} during update.")
        raise ValueError(lookup_error) from e

    # NOTE(review): some vector store backends appear to return None for an unknown id
    # instead of raising; handle that like a failed lookup rather than crashing on `.payload`.
    if existing_memory is None:
        logger.error(f"Error getting memory with ID {memory_id} during update.")
        raise ValueError(lookup_error)

    existing_payload = existing_memory.payload
    prev_value = existing_payload.get("data")

    new_metadata = deepcopy(metadata) if metadata is not None else {}

    new_metadata["data"] = data
    new_metadata["hash"] = hashlib.md5(data.encode()).hexdigest()
    new_metadata["created_at"] = existing_payload.get("created_at")
    new_metadata["updated_at"] = datetime.now(pytz.timezone("US/Pacific")).isoformat()

    # Preserve session/actor identifiers from the stored memory only if not provided in new metadata.
    for key in ("user_id", "agent_id", "run_id", "actor_id", "role"):
        if key not in new_metadata and key in existing_payload:
            new_metadata[key] = existing_payload[key]

    if data in existing_embeddings:
        embeddings = existing_embeddings[data]
    else:
        embeddings = await asyncio.to_thread(self.embedding_model.embed, data, memory_action="update")

    await asyncio.to_thread(
        self.vector_store.update,
        vector_id=memory_id,
        vector=embeddings,
        payload=new_metadata,
    )
    logger.info(f"Updating memory with ID {memory_id=} with {data=}")

    await asyncio.to_thread(
        self.db.add_history,
        memory_id,
        prev_value,
        data,
        "UPDATE",
        created_at=new_metadata["created_at"],
        updated_at=new_metadata["updated_at"],
        actor_id=new_metadata.get("actor_id"),
        role=new_metadata.get("role"),
    )
    return memory_id
| #2278 | |
async def _delete_memory(self, memory_id):
    """
    Delete a memory from the vector store and record a DELETE history entry.

    Args:
        memory_id (str): ID of the memory to delete.

    Returns:
        str: ``memory_id``.

    Raises:
        ValueError: If the vector store returns no memory for ``memory_id``.
    """
    logger.info(f"Deleting memory with {memory_id=}")
    existing_memory = await asyncio.to_thread(self.vector_store.get, vector_id=memory_id)
    # NOTE(review): some vector store backends appear to return None for an unknown id
    # rather than raising; fail with a clear error (before deleting or writing history)
    # instead of an AttributeError on `.payload`.
    if existing_memory is None:
        logger.error(f"Error getting memory with ID {memory_id} during delete.")
        raise ValueError(f"Memory with ID {memory_id} not found. Please provide a valid 'memory_id'")

    payload = existing_memory.payload
    prev_value = payload.get("data", "")

    await asyncio.to_thread(self.vector_store.delete, vector_id=memory_id)
    # History row: no new value for a DELETE event; flagged as deleted.
    await asyncio.to_thread(
        self.db.add_history,
        memory_id,
        prev_value,
        None,
        "DELETE",
        actor_id=payload.get("actor_id"),
        role=payload.get("role"),
        is_deleted=1,
    )

    return memory_id
| #2297 | |
async def reset(self):
    """
    Asynchronously wipe and re-initialize the memory store.

    Steps, in order:
        1. Delete the vector store collection (``delete_col``).
        2. Run ``gc.collect()``, then close the vector store client if it has ``close``.
        3. If a history DB connection is present, drop the ``history`` table and close it.
        4. Recreate the history DB (``SQLiteManager``) and the vector store from ``self.config``.
    """
    logger.warning("Resetting all memories")
    await asyncio.to_thread(self.vector_store.delete_col)

    # NOTE(review): presumably drops lingering references before the client is closed — confirm.
    gc.collect()

    store = self.vector_store
    if hasattr(store, "client") and hasattr(store.client, "close"):
        await asyncio.to_thread(store.client.close)

    connection = getattr(self.db, "connection", None)
    if connection:
        await asyncio.to_thread(connection.execute, "DROP TABLE IF EXISTS history")
        await asyncio.to_thread(connection.close)

    self.db = SQLiteManager(self.config.history_db_path)

    store_cfg = self.config.vector_store
    self.vector_store = VectorStoreFactory.create(store_cfg.provider, store_cfg.config)
    capture_event("mem0.reset", self, {"sync_type": "async"})
| #2323 | |
async def chat(self, query):
    """Placeholder for a chat API; always raises ``NotImplementedError``."""
    message = "Chat function not implemented yet."
    raise NotImplementedError(message)
| #2326 |