repositories
loading repo index
repositories
loading repo index
repository
loading code, commits, and activity
public Clawd ADK gateway launch mirror
stars
latest
clone command
git clone gitlawb://did:key:z6Mkq5mY...iFZ5/my-project-publ...
git clone gitlawb://did:key:z6Mkq5mY.../my-project-publ...
2fa351d6 docs: add automaton and perps launch sources 16d ago
| #1 | import json |
| #2 | import logging |
| #3 | from datetime import datetime |
| #4 | from typing import Dict |
| #5 | |
| #6 | import numpy as np |
| #7 | import pytz |
| #8 | import valkey |
| #9 | from pydantic import BaseModel |
| #10 | from valkey.exceptions import ResponseError |
| #11 | |
| #12 | from mem0.memory.utils import extract_json |
| #13 | from mem0.vector_stores.base import VectorStoreBase |
| #14 | |
| #15 | logger = logging.getLogger(__name__) |
| #16 | |
# Field layout of the Valkey index: seven TAG fields, two NUMERIC timestamps
# and one vector field. "memory" and "metadata" are declared as TAG rather
# than TEXT for Valkey compatibility.
DEFAULT_FIELDS = [
    *(
        {"name": tag_name, "type": "tag"}
        for tag_name in ("memory_id", "hash", "agent_id", "run_id", "user_id", "memory", "metadata")
    ),
    *({"name": numeric_name, "type": "numeric"} for numeric_name in ("created_at", "updated_at")),
    {
        "name": "embedding",
        "type": "vector",
        "attrs": {"distance_metric": "cosine", "algorithm": "flat", "datatype": "float32"},
    },
]

# Payload keys that are never copied into the JSON "metadata" field.
excluded_keys = set(("user_id", "agent_id", "run_id", "hash", "data", "created_at", "updated_at"))
| #36 | |
| #37 | |
class OutputData(BaseModel):
    """Record returned by the store's get/search methods: an ID, a score and a payload."""

    id: str  # memory identifier (the stored "memory_id" field)
    # Similarity score. NOTE(review): _process_search_results passes None when a
    # document has no vector_score attribute, but the field is declared as a
    # plain float - confirm how validation treats that case.
    score: float
    payload: Dict  # hash, data, timestamps, optional IDs and decoded metadata
| #42 | |
| #43 | |
| #44 | class ValkeyDB(VectorStoreBase): |
| #45 | def __init__( |
| #46 | self, |
| #47 | valkey_url: str, |
| #48 | collection_name: str, |
| #49 | embedding_model_dims: int, |
| #50 | timezone: str = "UTC", |
| #51 | index_type: str = "hnsw", |
| #52 | hnsw_m: int = 16, |
| #53 | hnsw_ef_construction: int = 200, |
| #54 | hnsw_ef_runtime: int = 10, |
| #55 | ): |
| #56 | """ |
| #57 | Initialize the Valkey vector store. |
| #58 | |
| #59 | Args: |
| #60 | valkey_url (str): Valkey URL. |
| #61 | collection_name (str): Collection name. |
| #62 | embedding_model_dims (int): Embedding model dimensions. |
| #63 | timezone (str, optional): Timezone for timestamps. Defaults to "UTC". |
| #64 | index_type (str, optional): Index type ('hnsw' or 'flat'). Defaults to "hnsw". |
| #65 | hnsw_m (int, optional): HNSW M parameter (connections per node). Defaults to 16. |
| #66 | hnsw_ef_construction (int, optional): HNSW ef_construction parameter. Defaults to 200. |
| #67 | hnsw_ef_runtime (int, optional): HNSW ef_runtime parameter. Defaults to 10. |
| #68 | """ |
| #69 | self.embedding_model_dims = embedding_model_dims |
| #70 | self.collection_name = collection_name |
| #71 | self.prefix = f"mem0:{collection_name}" |
| #72 | self.timezone = timezone |
| #73 | self.index_type = index_type.lower() |
| #74 | self.hnsw_m = hnsw_m |
| #75 | self.hnsw_ef_construction = hnsw_ef_construction |
| #76 | self.hnsw_ef_runtime = hnsw_ef_runtime |
| #77 | |
| #78 | # Validate index type |
| #79 | if self.index_type not in ["hnsw", "flat"]: |
| #80 | raise ValueError(f"Invalid index_type: {index_type}. Must be 'hnsw' or 'flat'") |
| #81 | |
| #82 | # Connect to Valkey |
| #83 | try: |
| #84 | self.client = valkey.from_url(valkey_url) |
| #85 | logger.debug(f"Successfully connected to Valkey at {valkey_url}") |
| #86 | except Exception as e: |
| #87 | logger.exception(f"Failed to connect to Valkey at {valkey_url}: {e}") |
| #88 | raise |
| #89 | |
| #90 | # Create the index schema |
| #91 | self._create_index(embedding_model_dims) |
| #92 | |
| #93 | def _build_index_schema(self, collection_name, embedding_dims, distance_metric, prefix): |
| #94 | """ |
| #95 | Build the FT.CREATE command for index creation. |
| #96 | |
| #97 | Args: |
| #98 | collection_name (str): Name of the collection/index |
| #99 | embedding_dims (int): Vector embedding dimensions |
| #100 | distance_metric (str): Distance metric (e.g., "COSINE", "L2", "IP") |
| #101 | prefix (str): Key prefix for the index |
| #102 | |
| #103 | Returns: |
| #104 | list: Complete FT.CREATE command as list of arguments |
| #105 | """ |
| #106 | # Build the vector field configuration based on index type |
| #107 | if self.index_type == "hnsw": |
| #108 | vector_config = [ |
| #109 | "embedding", |
| #110 | "VECTOR", |
| #111 | "HNSW", |
| #112 | "12", # Attribute count: TYPE, FLOAT32, DIM, dims, DISTANCE_METRIC, metric, M, m, EF_CONSTRUCTION, ef_construction, EF_RUNTIME, ef_runtime |
| #113 | "TYPE", |
| #114 | "FLOAT32", |
| #115 | "DIM", |
| #116 | str(embedding_dims), |
| #117 | "DISTANCE_METRIC", |
| #118 | distance_metric, |
| #119 | "M", |
| #120 | str(self.hnsw_m), |
| #121 | "EF_CONSTRUCTION", |
| #122 | str(self.hnsw_ef_construction), |
| #123 | "EF_RUNTIME", |
| #124 | str(self.hnsw_ef_runtime), |
| #125 | ] |
| #126 | elif self.index_type == "flat": |
| #127 | vector_config = [ |
| #128 | "embedding", |
| #129 | "VECTOR", |
| #130 | "FLAT", |
| #131 | "6", # Attribute count: TYPE, FLOAT32, DIM, dims, DISTANCE_METRIC, metric |
| #132 | "TYPE", |
| #133 | "FLOAT32", |
| #134 | "DIM", |
| #135 | str(embedding_dims), |
| #136 | "DISTANCE_METRIC", |
| #137 | distance_metric, |
| #138 | ] |
| #139 | else: |
| #140 | # This should never happen due to constructor validation, but be defensive |
| #141 | raise ValueError(f"Unsupported index_type: {self.index_type}. Must be 'hnsw' or 'flat'") |
| #142 | |
| #143 | # Build the complete command (comma is default separator for TAG fields) |
| #144 | cmd = [ |
| #145 | "FT.CREATE", |
| #146 | collection_name, |
| #147 | "ON", |
| #148 | "HASH", |
| #149 | "PREFIX", |
| #150 | "1", |
| #151 | prefix, |
| #152 | "SCHEMA", |
| #153 | "memory_id", |
| #154 | "TAG", |
| #155 | "hash", |
| #156 | "TAG", |
| #157 | "agent_id", |
| #158 | "TAG", |
| #159 | "run_id", |
| #160 | "TAG", |
| #161 | "user_id", |
| #162 | "TAG", |
| #163 | "memory", |
| #164 | "TAG", |
| #165 | "metadata", |
| #166 | "TAG", |
| #167 | "created_at", |
| #168 | "NUMERIC", |
| #169 | "updated_at", |
| #170 | "NUMERIC", |
| #171 | ] + vector_config |
| #172 | |
| #173 | return cmd |
| #174 | |
def _create_index(self, embedding_model_dims):
    """
    Create the search index for the current collection unless it already exists.

    Args:
        embedding_model_dims (int): Dimensions for the vector embeddings.

    Raises:
        ValueError: If the server does not know the search commands.
        Exception: Any other error raised while probing or creating the index.
    """
    # Step 1: probe for the search module with a harmless listing command.
    try:
        self.client.execute_command("FT._LIST")
    except ResponseError as e:
        module_missing = "unknown command" in str(e).lower()
        if not module_missing:
            logger.exception(f"Error checking search module: {e}")
            raise
        raise ValueError(
            "Valkey search module is not available. Please ensure Valkey is running with the search module enabled. "
            "The search module can be loaded using the --loadmodule option with the valkey-search library. "
            "For installation and setup instructions, refer to the Valkey Search documentation."
        )

    # Step 2: an info lookup that succeeds means there is nothing left to do.
    try:
        self.client.ft(self.collection_name).info()
    except ResponseError as e:
        # Only a "not found" answer means the index has to be created.
        if "not found" not in str(e).lower():
            logger.exception(f"Error checking index existence: {e}")
            raise
    else:
        return

    # Step 3: build the FT.CREATE command; initial creation always uses COSINE.
    create_cmd = self._build_index_schema(
        self.collection_name,
        embedding_model_dims,
        "COSINE",
        self.prefix,
    )

    try:
        self.client.execute_command(*create_cmd)
        logger.info(f"Successfully created {self.index_type.upper()} index {self.collection_name}")
    except Exception as e:
        logger.exception(f"Error creating index {self.collection_name}: {e}")
        raise
| #224 | |
def create_col(self, name=None, vector_size=None, distance=None):
    """
    Drop (if present) and re-create a collection index in Valkey.

    Args:
        name (str, optional): Collection name. Falls back to the current collection_name.
        vector_size (int, optional): Embedding size. Falls back to the current embedding_model_dims.
        distance (str, optional): Distance metric. Falls back to 'COSINE'.

    Returns:
        The index handle obtained from ``self.client.ft(collection_name)``.
    """
    # Resolve each argument against its fallback.
    collection_name = name if name else self.collection_name
    embedding_dims = vector_size if vector_size else self.embedding_model_dims
    distance_metric = distance if distance else "COSINE"
    prefix = f"mem0:{collection_name}"

    # A missing index is expected here, so the drop must not log about it.
    self._drop_index(collection_name, log_level="silent")

    create_cmd = self._build_index_schema(collection_name, embedding_dims, distance_metric, prefix)

    try:
        self.client.execute_command(*create_cmd)
        logger.info(f"Successfully created {self.index_type.upper()} index {collection_name}")

        # Switch this instance over only when an explicit name was given.
        if name:
            self.collection_name = collection_name
            self.prefix = prefix

        return self.client.ft(collection_name)
    except Exception as e:
        logger.exception(f"Error creating collection {collection_name}: {e}")
        raise
| #267 | |
def insert(self, vectors: list, payloads: list = None, ids: list = None):
    """
    Insert vectors and their payloads into the index.

    Each vector is stored as a hash under ``"{prefix}:{id}"``.

    Args:
        vectors (list): List of vectors to insert.
        payloads (list, optional): Payload dicts matching ``vectors``. When omitted,
            an empty payload is used for every vector.
        ids (list, optional): IDs matching ``vectors``. When omitted, a random UUID4
            string is generated for every vector.

    Raises:
        Exception: Any error other than ``KeyError`` is logged and re-raised.
            A ``KeyError`` is logged and that entry is skipped.
    """
    import uuid  # local import: only needed to generate missing IDs

    # Both arguments are declared optional, but zip() cannot iterate over None.
    if payloads is None:
        payloads = [{} for _ in vectors]
    if ids is None:
        ids = [str(uuid.uuid4()) for _ in vectors]

    for vector, payload, vector_id in zip(vectors, payloads, ids):
        try:
            key = f"{self.prefix}:{vector_id}"

            # Stamp the payload with the current time when the caller gave none.
            if "created_at" not in payload:
                payload["created_at"] = datetime.now(pytz.timezone(self.timezone)).isoformat()

            hash_data = {
                "memory_id": vector_id,
                "hash": payload.get("hash", f"hash_{vector_id}"),  # placeholder when absent
                "memory": payload.get("data", f"data_{vector_id}"),  # placeholder when absent
                # Stored as whole epoch seconds so the NUMERIC field can index it.
                "created_at": int(datetime.fromisoformat(payload["created_at"]).timestamp()),
                "embedding": np.array(vector, dtype=np.float32).tobytes(),
            }

            # Owner identifiers are stored only when provided.
            for field in ["agent_id", "run_id", "user_id"]:
                if field in payload:
                    hash_data[field] = payload[field]

            # Everything not held in a dedicated field goes into the JSON metadata blob.
            hash_data["metadata"] = json.dumps({k: v for k, v in payload.items() if k not in excluded_keys})

            self.client.hset(key, mapping=hash_data)
            logger.debug(f"Successfully inserted vector with ID {vector_id}")
        except KeyError as e:
            # Best effort: a missing field skips this entry without aborting the batch.
            logger.error(f"Error inserting vector with ID {vector_id}: Missing required field {e}")
        except Exception as e:
            logger.exception(f"Error inserting vector with ID {vector_id}: {e}")
            raise
| #316 | |
| #317 | def _build_search_query(self, knn_part, filters=None): |
| #318 | """ |
| #319 | Build a search query string with filters. |
| #320 | |
| #321 | Args: |
| #322 | knn_part (str): The KNN part of the query. |
| #323 | filters (dict, optional): Filters to apply to the search. Each key-value pair |
| #324 | becomes a tag filter (@key:{value}). None values are ignored. |
| #325 | Values are used as-is (no validation) - wildcards, lists, etc. are |
| #326 | passed through literally to Valkey search. Multiple filters are |
| #327 | combined with AND logic (space-separated). |
| #328 | |
| #329 | Returns: |
| #330 | str: The complete search query string in format "filter_expr =>[KNN...]" |
| #331 | or "*=>[KNN...]" if no valid filters. |
| #332 | """ |
| #333 | # No filters, just use the KNN search |
| #334 | if not filters or not any(value is not None for key, value in filters.items()): |
| #335 | return f"*=>{knn_part}" |
| #336 | |
| #337 | # Build filter expression |
| #338 | filter_parts = [] |
| #339 | for key, value in filters.items(): |
| #340 | if value is not None: |
| #341 | # Use the correct filter syntax for Valkey |
| #342 | filter_parts.append(f"@{key}:{{{value}}}") |
| #343 | |
| #344 | # No valid filter parts |
| #345 | if not filter_parts: |
| #346 | return f"*=>{knn_part}" |
| #347 | |
| #348 | # Combine filter parts with proper syntax |
| #349 | filter_expr = " ".join(filter_parts) |
| #350 | return f"{filter_expr} =>{knn_part}" |
| #351 | |
def _execute_search(self, query, params):
    """
    Run a search query against the current collection's index.

    Args:
        query (str): The search query to execute.
        params (dict): The query parameters.

    Returns:
        The raw search results returned by the client.

    Raises:
        ResponseError: Logged together with the failing query, then re-raised.
    """
    try:
        index = self.client.ft(self.collection_name)
        return index.search(query, query_params=params)
    except ResponseError as e:
        logger.error(f"Search failed with query '{query}': {e}")
        raise
| #368 | |
def _process_search_results(self, results):
    """
    Turn raw search results into OutputData objects.

    Args:
        results: The search results from Valkey; its ``docs`` attribute is iterated.

    Returns:
        list: One OutputData per document, in result order.
    """
    converted = []
    for doc in results.docs:
        # The score exists only when the query aliased it as "vector_score".
        if hasattr(doc, "vector_score"):
            score = float(doc.vector_score)
        else:
            score = None

        # Mandatory fields; created_at is stored as epoch seconds.
        payload = {
            "hash": doc.hash,
            "data": doc.memory,
            "created_at": self._format_timestamp(int(doc.created_at), self.timezone),
        }

        if hasattr(doc, "updated_at"):
            payload["updated_at"] = self._format_timestamp(int(doc.updated_at), self.timezone)

        # Owner identifiers are copied when the document carries them.
        for optional_name in ("agent_id", "run_id", "user_id"):
            if hasattr(doc, optional_name):
                payload[optional_name] = getattr(doc, optional_name)

        # Metadata is a JSON string whose keys are merged into the payload.
        if hasattr(doc, "metadata"):
            try:
                payload.update(json.loads(extract_json(doc.metadata)))
            except (json.JSONDecodeError, TypeError) as e:
                logger.warning(f"Failed to parse metadata: {e}")

        converted.append(OutputData(id=doc.memory_id, score=score, payload=payload))

    return converted
| #412 | |
def search(self, query: str, vectors: list, limit: int = 5, filters: dict = None, ef_runtime: int = None):
    """
    Run a KNN search for the vectors most similar to the given one.

    Args:
        query (str): The search query (not used when building the Valkey query).
        vectors (list): The vector to search for.
        limit (int, optional): Maximum number of results to return. Defaults to 5.
        filters (dict, optional): Tag filters to apply. Defaults to None.
        ef_runtime (int, optional): Per-query HNSW ef_runtime; ignored for FLAT
            indexes. Defaults to None.

    Returns:
        list: List of OutputData objects.
    """
    # The query vector travels as raw float32 bytes in a query parameter.
    params = {"vec_param": np.array(vectors, dtype=np.float32).tobytes()}

    # EF_RUNTIME is added only for HNSW indexes and only when requested.
    use_ef_runtime = self.index_type == "hnsw" and ef_runtime is not None
    ef_clause = f" EF_RUNTIME {ef_runtime}" if use_ef_runtime else ""
    knn_part = f"[KNN {limit} @embedding $vec_param{ef_clause} AS vector_score]"

    full_query = self._build_search_query(knn_part, filters)
    logger.debug(f"Valkey search query: {full_query}")

    raw_results = self._execute_search(full_query, params)
    return self._process_search_results(raw_results)
| #451 | |
def delete(self, vector_id):
    """
    Remove the hash that stores a vector.

    Args:
        vector_id (str): ID of the vector to delete.

    Raises:
        Exception: Any client error is logged with traceback and re-raised.
    """
    try:
        self.client.delete(f"{self.prefix}:{vector_id}")
        logger.debug(f"Successfully deleted vector with ID {vector_id}")
    except Exception as e:
        logger.exception(f"Error deleting vector with ID {vector_id}: {e}")
        raise
| #466 | |
def update(self, vector_id=None, vector=None, payload=None):
    """
    Update a vector and/or its payload in the index.

    Only the parts that are provided are written: the embedding is left
    untouched when ``vector`` is None, and the payload-derived fields are
    left untouched when ``payload`` is None.

    Args:
        vector_id (str): ID of the vector to update.
        vector (list, optional): New vector data.
        payload (dict, optional): New payload data.

    Raises:
        Exception: Any error other than ``KeyError`` is logged and re-raised.
            A ``KeyError`` is logged and the update is skipped.
    """
    try:
        # Nothing to write: an hset with only the ID would create a stub hash.
        if vector is None and payload is None:
            logger.debug(f"Nothing to update for vector with ID {vector_id}")
            return

        key = f"{self.prefix}:{vector_id}"
        hash_data = {"memory_id": vector_id}

        if payload is not None:
            # Stamp the payload with the current time when the caller gave none.
            if "created_at" not in payload:
                payload["created_at"] = datetime.now(pytz.timezone(self.timezone)).isoformat()

            hash_data["hash"] = payload.get("hash", f"hash_{vector_id}")  # placeholder when absent
            hash_data["memory"] = payload.get("data", f"data_{vector_id}")  # placeholder when absent
            # Timestamps are stored as whole epoch seconds for the NUMERIC fields.
            hash_data["created_at"] = int(datetime.fromisoformat(payload["created_at"]).timestamp())

            if "updated_at" in payload:
                hash_data["updated_at"] = int(datetime.fromisoformat(payload["updated_at"]).timestamp())

            # Owner identifiers are stored only when provided.
            for field in ["agent_id", "run_id", "user_id"]:
                if field in payload:
                    hash_data[field] = payload[field]

            # Everything not held in a dedicated field goes into the JSON metadata blob.
            hash_data["metadata"] = json.dumps({k: v for k, v in payload.items() if k not in excluded_keys})

        # np.array(None, dtype=float32) would serialise a single NaN and silently
        # replace the stored embedding, so the field is written only on request.
        if vector is not None:
            hash_data["embedding"] = np.array(vector, dtype=np.float32).tobytes()

        self.client.hset(key, mapping=hash_data)
        logger.debug(f"Successfully updated vector with ID {vector_id}")
    except KeyError as e:
        logger.error(f"Error updating vector with ID {vector_id}: Missing required field {e}")
    except Exception as e:
        logger.exception(f"Error updating vector with ID {vector_id}: {e}")
        raise
| #517 | |
def _format_timestamp(self, timestamp, timezone=None):
    """
    Render an epoch timestamp as an ISO-8601 string in the given timezone.

    Args:
        timestamp (int): The timestamp to format.
        timezone (str, optional): Timezone name; UTC is used when it is empty or None.

    Returns:
        str: ISO-8601 text with microsecond precision and UTC offset.
    """
    zone_name = timezone if timezone else "UTC"
    zone = pytz.timezone(zone_name)
    moment = datetime.fromtimestamp(timestamp, tz=zone)
    return moment.isoformat(timespec="microseconds")
| #532 | |
| #533 | def _process_document_fields(self, result, vector_id): |
| #534 | """ |
| #535 | Process document fields from a Valkey hash result. |
| #536 | |
| #537 | Args: |
| #538 | result (dict): The hash result from Valkey. |
| #539 | vector_id (str): The vector ID. |
| #540 | |
| #541 | Returns: |
| #542 | dict: The processed payload. |
| #543 | str: The memory ID. |
| #544 | """ |
| #545 | # Create the payload with error handling |
| #546 | payload = {} |
| #547 | |
| #548 | # Convert bytes to string for text fields |
| #549 | for k in result: |
| #550 | if k not in ["embedding"]: |
| #551 | if isinstance(result[k], bytes): |
| #552 | try: |
| #553 | result[k] = result[k].decode("utf-8") |
| #554 | except UnicodeDecodeError: |
| #555 | # If decoding fails, keep the bytes |
| #556 | pass |
| #557 | |
| #558 | # Add required fields with error handling |
| #559 | for field in ["hash", "memory", "created_at"]: |
| #560 | if field in result: |
| #561 | if field == "created_at": |
| #562 | try: |
| #563 | payload[field] = self._format_timestamp(int(result[field]), self.timezone) |
| #564 | except (ValueError, TypeError): |
| #565 | payload[field] = result[field] |
| #566 | else: |
| #567 | payload[field] = result[field] |
| #568 | else: |
| #569 | # Use default values for missing fields |
| #570 | if field == "hash": |
| #571 | payload[field] = "unknown" |
| #572 | elif field == "memory": |
| #573 | payload[field] = "unknown" |
| #574 | elif field == "created_at": |
| #575 | payload[field] = self._format_timestamp( |
| #576 | int(datetime.now(tz=pytz.timezone(self.timezone)).timestamp()), self.timezone |
| #577 | ) |
| #578 | |
| #579 | # Rename memory to data for consistency |
| #580 | if "memory" in payload: |
| #581 | payload["data"] = payload.pop("memory") |
| #582 | |
| #583 | # Add updated_at if available |
| #584 | if "updated_at" in result: |
| #585 | try: |
| #586 | payload["updated_at"] = self._format_timestamp(int(result["updated_at"]), self.timezone) |
| #587 | except (ValueError, TypeError): |
| #588 | payload["updated_at"] = result["updated_at"] |
| #589 | |
| #590 | # Add optional fields |
| #591 | for field in ["agent_id", "run_id", "user_id"]: |
| #592 | if field in result: |
| #593 | payload[field] = result[field] |
| #594 | |
| #595 | # Add metadata |
| #596 | if "metadata" in result: |
| #597 | try: |
| #598 | metadata = json.loads(extract_json(result["metadata"])) |
| #599 | payload.update(metadata) |
| #600 | except (json.JSONDecodeError, TypeError): |
| #601 | logger.warning(f"Failed to parse metadata: {result.get('metadata')}") |
| #602 | |
| #603 | # Use memory_id from result if available, otherwise use vector_id |
| #604 | memory_id = result.get("memory_id", vector_id) |
| #605 | |
| #606 | return payload, memory_id |
| #607 | |
| #608 | def _convert_bytes(self, data): |
| #609 | """Convert bytes data back to string""" |
| #610 | if isinstance(data, bytes): |
| #611 | try: |
| #612 | return data.decode("utf-8") |
| #613 | except UnicodeDecodeError: |
| #614 | return data |
| #615 | if isinstance(data, dict): |
| #616 | return {self._convert_bytes(key): self._convert_bytes(value) for key, value in data.items()} |
| #617 | if isinstance(data, list): |
| #618 | return [self._convert_bytes(item) for item in data] |
| #619 | if isinstance(data, tuple): |
| #620 | return tuple(self._convert_bytes(item) for item in data) |
| #621 | return data |
| #622 | |
def get(self, vector_id):
    """
    Fetch a single stored vector by ID.

    Args:
        vector_id (str): ID of the vector to get.

    Returns:
        OutputData: The stored record; its score is always 0.0.

    Raises:
        KeyError: If no hash exists for ``vector_id`` (re-raised without logging).
        Exception: Any other error is logged with traceback and re-raised.
    """
    try:
        stored = self.client.hgetall(f"{self.prefix}:{vector_id}")
        if not stored:
            raise KeyError(f"Vector with ID {vector_id} not found")

        # The client may hand back bytes for keys and values; normalise to str.
        stored = self._convert_bytes(stored)
        logger.debug(f"Retrieved result keys: {stored.keys()}")

        payload, memory_id = self._process_document_fields(stored, vector_id)
        return OutputData(id=memory_id, payload=payload, score=0.0)
    except KeyError:
        raise
    except Exception as e:
        logger.exception(f"Error getting vector with ID {vector_id}: {e}")
        raise
| #654 | |
def list_cols(self):
    """
    Return the names of all search indices known to the server.

    Returns:
        list: Whatever the FT._LIST command returns.

    Raises:
        Exception: Any client error is logged with traceback and re-raised.
    """
    try:
        index_names = self.client.execute_command("FT._LIST")
        return index_names
    except Exception as e:
        logger.exception(f"Error listing collections: {e}")
        raise
| #668 | |
def _drop_index(self, collection_name, log_level="error"):
    """
    Drop an index by name using the FT.DROPINDEX command.

    Args:
        collection_name (str): Name of the index to drop.
        log_level (str): How to report a missing index: "silent" logs nothing,
            "info" logs at INFO level, anything else (default "error") logs at
            ERROR level.

    Returns:
        bool: True when the index was dropped, False when it did not exist.

    Raises:
        Exception: Any failure other than a missing index is logged and re-raised.
    """
    try:
        self.client.execute_command("FT.DROPINDEX", collection_name)
    except ResponseError as e:
        # A missing index is matched case-insensitively against both wordings
        # used in this class ("Unknown index name" and "not found").
        message = str(e).lower()
        index_missing = "unknown index name" in message or "not found" in message
        if not index_missing:
            # Real error - always log and raise
            logger.error(f"Error deleting index {collection_name}: {e}")
            raise

        if log_level == "info":
            logger.info(f"Index {collection_name} doesn't exist, skipping deletion")
        elif log_level != "silent":
            # Previously the documented "error" level logged nothing at all.
            logger.error(f"Index {collection_name} doesn't exist, skipping deletion")
        return False
    except Exception as e:
        # Non-ResponseError exceptions - always log and raise
        logger.error(f"Error deleting index {collection_name}: {e}")
        raise

    logger.info(f"Successfully deleted index {collection_name}")
    return True
| #697 | |
| #698 | def delete_col(self): |
| #699 | """ |
| #700 | Delete the current collection (index). |
| #701 | """ |
| #702 | return self._drop_index(self.collection_name, log_level="info") |
| #703 | |
def col_info(self, name=None):
    """
    Look up index information for a collection.

    Args:
        name (str, optional): Collection name. Falls back to the current collection_name.

    Returns:
        dict: Whatever the index ``info()`` call returns.

    Raises:
        Exception: Any client error is logged with traceback and re-raised.
    """
    try:
        collection_name = name if name else self.collection_name
        index = self.client.ft(collection_name)
        return index.info()
    except Exception as e:
        logger.exception(f"Error getting collection info for {collection_name}: {e}")
        raise
| #720 | |
def reset(self):
    """
    Drop the current index and create it again.

    Returns:
        bool: True once the index has been re-created.

    Raises:
        Exception: Any failure is logged with traceback and re-raised.
    """
    try:
        index_name = self.collection_name
        logger.warning(f"Resetting index {index_name}...")

        # Drop first, then rebuild with the configured embedding size.
        self.delete_col()
        self._create_index(self.embedding_model_dims)
    except Exception as e:
        logger.exception(f"Error resetting index {self.collection_name}: {e}")
        raise
    else:
        return True
| #739 | |
| #740 | def _build_list_query(self, filters=None): |
| #741 | """ |
| #742 | Build a query for listing vectors. |
| #743 | |
| #744 | Args: |
| #745 | filters (dict, optional): Filters to apply to the list. Each key-value pair |
| #746 | becomes a tag filter (@key:{value}). None values are ignored. |
| #747 | Values are used as-is (no validation) - wildcards, lists, etc. are |
| #748 | passed through literally to Valkey search. |
| #749 | |
| #750 | Returns: |
| #751 | str: The query string. Returns "*" if no valid filters provided. |
| #752 | """ |
| #753 | # Default query |
| #754 | q = "*" |
| #755 | |
| #756 | # Add filters if provided |
| #757 | if filters and any(value is not None for key, value in filters.items()): |
| #758 | filter_conditions = [] |
| #759 | for key, value in filters.items(): |
| #760 | if value is not None: |
| #761 | filter_conditions.append(f"@{key}:{{{value}}}") |
| #762 | |
| #763 | if filter_conditions: |
| #764 | q = " ".join(filter_conditions) |
| #765 | |
| #766 | return q |
| #767 | |
| #768 | def list(self, filters: dict = None, limit: int = None) -> list: |
| #769 | """ |
| #770 | List all recent created memories from the vector store. |
| #771 | |
| #772 | Args: |
| #773 | filters (dict, optional): Filters to apply to the list. Each key-value pair |
| #774 | becomes a tag filter (@key:{value}). None values are ignored. |
| #775 | Values are used as-is without validation - wildcards, special characters, |
| #776 | lists, etc. are passed through literally to Valkey search. |
| #777 | Multiple filters are combined with AND logic. |
| #778 | limit (int, optional): Maximum number of results to return. Defaults to 1000 |
| #779 | if not specified. |
| #780 | |
| #781 | Returns: |
| #782 | list: Nested list format [[MemoryResult(), ...]] matching Redis implementation. |
| #783 | Each MemoryResult contains id and payload with hash, data, timestamps, etc. |
| #784 | """ |
| #785 | try: |
| #786 | # Since Valkey search requires vector format, use a dummy vector search |
| #787 | # that returns all documents by using a zero vector and large K |
| #788 | dummy_vector = [0.0] * self.embedding_model_dims |
| #789 | search_limit = limit if limit is not None else 1000 # Large default |
| #790 | |
| #791 | # Use the existing search method which handles filters properly |
| #792 | search_results = self.search("", dummy_vector, limit=search_limit, filters=filters) |
| #793 | |
| #794 | # Convert search results to list format (match Redis format) |
| #795 | class MemoryResult: |
| #796 | def __init__(self, id: str, payload: dict, score: float = None): |
| #797 | self.id = id |
| #798 | self.payload = payload |
| #799 | self.score = score |
| #800 | |
| #801 | memory_results = [] |
| #802 | for result in search_results: |
| #803 | # Create payload in the expected format |
| #804 | payload = { |
| #805 | "hash": result.payload.get("hash", ""), |
| #806 | "data": result.payload.get("data", ""), |
| #807 | "created_at": result.payload.get("created_at"), |
| #808 | "updated_at": result.payload.get("updated_at"), |
| #809 | } |
| #810 | |
| #811 | # Add metadata (exclude system fields) |
| #812 | for key, value in result.payload.items(): |
| #813 | if key not in ["data", "hash", "created_at", "updated_at"]: |
| #814 | payload[key] = value |
| #815 | |
| #816 | # Create MemoryResult object (matching Redis format) |
| #817 | memory_results.append(MemoryResult(id=result.id, payload=payload)) |
| #818 | |
| #819 | # Return nested list format like Redis |
| #820 | return [memory_results] |
| #821 | |
| #822 | except Exception as e: |
| #823 | logger.exception(f"Error in list method: {e}") |
| #824 | return [[]] # Return empty result on error |
| #825 |