repositories
loading repo index
repositories
loading repo index
repository
loading code, commits, and activity
public Clawd ADK gateway launch mirror
stars
latest
clone command
git clone gitlawb://did:key:z6Mkq5mY...iFZ5/my-project-publ...git clone gitlawb://did:key:z6Mkq5mY.../my-project-publ...2fa351d6docs: add automaton and perps launch sources16d ago| #1 | import logging |
| #2 | import time |
| #3 | from typing import Any, Dict, List, Optional |
| #4 | |
| #5 | try: |
| #6 | from opensearchpy import OpenSearch, RequestsHttpConnection |
| #7 | except ImportError: |
| #8 | raise ImportError("OpenSearch requires extra dependencies. Install with `pip install opensearch-py`") from None |
| #9 | |
| #10 | from pydantic import BaseModel |
| #11 | |
| #12 | from mem0.configs.vector_stores.opensearch import OpenSearchConfig |
| #13 | from mem0.vector_stores.base import VectorStoreBase |
| #14 | |
# Module-level logger; OpenSearchDB reports index creation/readiness and request
# errors through it.
logger = logging.getLogger(__name__)
| #16 | |
| #17 | |
# Result record returned by OpenSearchDB (insert/search/get/list).
# Comments are used instead of a class docstring on purpose: pydantic copies a
# model docstring into its JSON schema description.
class OutputData(BaseModel):
    id: str  # caller-supplied id stored in the document's "id" field
    score: float  # k-NN search score; 1.0 where no score applies (insert/get/list)
    payload: Dict  # metadata dict stored under the document's "payload" field
| #22 | |
| #23 | |
class OpenSearchDB(VectorStoreBase):
    """mem0 vector store backed by an OpenSearch k-NN index.

    Each vector is stored as one document shaped like
    ``{"vector_field": [...], "payload": {...}, "id": "<caller id>"}``.
    Documents are indexed without an explicit ``_id``, so OpenSearch assigns
    its own; ``get``/``update``/``delete`` therefore first run a ``term``
    query on the ``id`` field to locate the internal document id.
    """

    def __init__(self, **kwargs):
        """Connect to OpenSearch and ensure the configured index exists.

        Args:
            **kwargs: Forwarded to ``OpenSearchConfig``. The fields read here are
                ``host``, ``port`` (9200 when falsy), ``http_auth``, ``user``,
                ``password``, ``use_ssl``, ``verify_certs``, ``collection_name``
                and ``embedding_model_dims``.

        Raises:
            TimeoutError: Propagated from ``create_col`` if the index never
                becomes searchable.
        """
        config = OpenSearchConfig(**kwargs)

        # An explicit ``http_auth`` wins; otherwise use basic auth only when both
        # user and password are set.
        if config.http_auth:
            http_auth = config.http_auth
        elif config.user and config.password:
            http_auth = (config.user, config.password)
        else:
            http_auth = None

        # Initialize OpenSearch client
        self.client = OpenSearch(
            hosts=[{"host": config.host, "port": config.port or 9200}],
            http_auth=http_auth,
            use_ssl=config.use_ssl,
            verify_certs=config.verify_certs,
            connection_class=RequestsHttpConnection,
            pool_maxsize=20,
        )

        self.collection_name = config.collection_name
        self.embedding_model_dims = config.embedding_model_dims
        # Blocks until the index answers a search (see create_col).
        self.create_col(self.collection_name, self.embedding_model_dims)

    def create_index(self) -> None:
        """Create OpenSearch index with proper mappings if it doesn't exist.

        NOTE(review): not called from within this class, and its mapping
        ("text"/"metadata" fields, 5 shards, 1 replica, 10s refresh) differs from
        the one ``create_col`` builds and the rest of the class reads/writes
        ("payload"/"id"). Confirm whether external callers still rely on it.
        """
        index_settings = {
            "settings": {
                "index": {"number_of_replicas": 1, "number_of_shards": 5, "refresh_interval": "10s", "knn": True}
            },
            "mappings": {
                "properties": {
                    "text": {"type": "text"},
                    "vector_field": {
                        "type": "knn_vector",
                        "dimension": self.embedding_model_dims,
                        "method": {"engine": "nmslib", "name": "hnsw", "space_type": "cosinesimil"},
                    },
                    "metadata": {"type": "object", "properties": {"user_id": {"type": "keyword"}}},
                }
            },
        }

        if not self.client.indices.exists(index=self.collection_name):
            self.client.indices.create(index=self.collection_name, body=index_settings)
            logger.info(f"Created index {self.collection_name}")
        else:
            logger.info(f"Index {self.collection_name} already exists")

    def create_col(self, name: str, vector_size: int) -> None:
        """Create the k-NN index ``name`` if missing, then wait until it is searchable.

        Args:
            name: Index name.
            vector_size: ``dimension`` of the ``vector_field`` knn_vector mapping.

        Raises:
            TimeoutError: If the index does not answer a ``match_all`` search within
                ``timeout_seconds``; chained from the last search error.
        """
        # NOTE(review): the "nmslib" engine is deprecated in recent OpenSearch
        # releases; confirm it is still accepted by the target cluster version.
        index_settings = {
            "settings": {"index.knn": True},
            "mappings": {
                "properties": {
                    "vector_field": {
                        "type": "knn_vector",
                        "dimension": vector_size,
                        "method": {"engine": "nmslib", "name": "hnsw", "space_type": "cosinesimil"},
                    },
                    "payload": {"type": "object"},
                    "id": {"type": "keyword"},
                }
            },
        }

        if not self.client.indices.exists(index=name):
            logger.warning(f"Creating index {name}, it might take 1-2 minutes...")
            self.client.indices.create(index=name, body=index_settings)

        # Wait for the index to be ready by probing it with a cheap search.
        # The retry count is derived from the interval so the actual timeout and
        # the error message agree.
        timeout_seconds = 180  # 3 minutes timeout
        retry_interval = 0.5  # seconds between probes
        max_retries = int(timeout_seconds / retry_interval)
        last_error: Optional[Exception] = None
        for _ in range(max_retries):
            try:
                self.client.search(index=name, body={"query": {"match_all": {}}})
            except Exception as e:  # not ready yet (or transient error); keep polling
                last_error = e
                time.sleep(retry_interval)
                continue
            # Short grace period after the first successful probe.
            time.sleep(1)
            logger.info(f"Index {name} is ready")
            return
        raise TimeoutError(f"Index {name} creation timed out after {timeout_seconds} seconds") from last_error

    def insert(
        self, vectors: List[List[float]], payloads: Optional[List[Dict]] = None, ids: Optional[List[str]] = None
    ) -> List[OutputData]:
        """Insert vectors into the index.

        Args:
            vectors: Embedding vectors to store.
            payloads: Per-vector metadata; defaults to an empty dict per vector.
                Must have at least ``len(vectors)`` entries.
            ids: Caller ids stored in the ``id`` field. When empty/None, ids
                ``"0" .. "n-1"`` are generated; these are NOT unique across calls.

        Returns:
            One ``OutputData`` (score 1.0) per inserted document. Pairing of
            vectors with ids stops at the shorter of the two lists.

        Raises:
            Exception: Any client error is logged with the failing id and re-raised.
        """
        if not ids:
            ids = [str(i) for i in range(len(vectors))]

        if payloads is None:
            payloads = [{} for _ in range(len(vectors))]

        results = []
        for i, (vec, id_) in enumerate(zip(vectors, ids)):
            body = {
                "vector_field": vec,
                "payload": payloads[i],
                "id": id_,
            }
            try:
                self.client.index(index=self.collection_name, body=body)
            except Exception as e:
                logger.error(f"Error inserting vector {id_}: {e}")
                raise
            results.append(OutputData(id=id_, score=1.0, payload=payloads[i]))  # No score for inserts

        if results:
            # Force refresh so documents are immediately searchable once insert()
            # returns; one refresh per batch instead of one per document.
            self.client.indices.refresh(index=self.collection_name)

        return results

    @staticmethod
    def _build_filter_clauses(filters: Optional[Dict]) -> List[Dict]:
        """Translate user/run/agent scope filters into OpenSearch ``term`` clauses.

        Only ``user_id``, ``run_id`` and ``agent_id`` are honoured, and only when
        their value is truthy; any other keys are ignored. Matching uses the
        ``.keyword`` sub-field of the dynamically mapped ``payload`` object.
        """
        if not filters:
            return []
        return [
            {"term": {f"payload.{key}.keyword": filters[key]}}
            for key in ("user_id", "run_id", "agent_id")
            if filters.get(key)
        ]

    def search(
        self, query: str, vectors: List[float], limit: int = 5, filters: Optional[Dict] = None
    ) -> List[OutputData]:
        """Search for similar vectors using OpenSearch k-NN search with optional filters.

        Args:
            query: Not used by this backend; accepted for interface compatibility.
            vectors: Query embedding.
            limit: Maximum number of results returned.
            filters: Optional ``user_id``/``run_id``/``agent_id`` scope filters.

        Returns:
            Up to ``limit`` hits in OpenSearch score order, or ``[]`` if the search
            fails (the error is logged, not raised).
        """
        # Fetch 2x candidates, presumably so that filtering still leaves roughly
        # ``limit`` hits; the result list is trimmed back to ``limit`` below.
        knn_query = {"knn": {"vector_field": {"vector": vectors, "k": limit * 2}}}

        # Combine knn with filters if needed
        filter_clauses = self._build_filter_clauses(filters)
        if filter_clauses:
            query_clause = {"bool": {"must": knn_query, "filter": filter_clauses}}
        else:
            query_clause = knn_query
        query_body = {"size": limit * 2, "query": query_clause}

        try:
            response = self.client.search(index=self.collection_name, body=query_body)
            hits = response["hits"]["hits"]
            return [
                OutputData(id=hit["_source"].get("id"), score=hit["_score"], payload=hit["_source"].get("payload", {}))
                for hit in hits[:limit]  # Ensure we don't exceed limit
            ]
        except Exception as e:
            logger.error(f"Error during search: {e}")
            return []

    def delete(self, vector_id: str) -> None:
        """Delete a vector by custom ID.

        Only the first document whose ``id`` matches is removed; if the same id was
        inserted more than once, other copies remain. A missing id is a no-op.
        """
        # First, find the document by custom ID
        search_query = {"query": {"term": {"id": vector_id}}}
        response = self.client.search(index=self.collection_name, body=search_query)
        hits = response.get("hits", {}).get("hits", [])
        if not hits:
            return

        # Delete using the actual document ID
        self.client.delete(index=self.collection_name, id=hits[0]["_id"])

    def update(self, vector_id: str, vector: Optional[List[float]] = None, payload: Optional[Dict] = None) -> None:
        """Update a vector and/or its payload, addressed by the custom ``id`` field.

        Args:
            vector_id: Caller id of the document to update.
            vector: New embedding, or None to keep the current one.
            payload: New payload (replaces the stored object's given keys via a
                partial ``doc`` update), or None to keep it.

        A missing id is a no-op. Update failures are logged rather than raised,
        keeping this method best-effort.
        """
        # First, find the document by custom ID
        search_query = {"query": {"term": {"id": vector_id}}}
        response = self.client.search(index=self.collection_name, body=search_query)
        hits = response.get("hits", {}).get("hits", [])
        if not hits:
            return

        opensearch_id = hits[0]["_id"]  # The actual document ID in OpenSearch

        # Prepare updated fields
        doc = {}
        if vector is not None:
            doc["vector_field"] = vector
        if payload is not None:
            doc["payload"] = payload
        if not doc:
            return

        try:
            self.client.update(index=self.collection_name, id=opensearch_id, body={"doc": doc})
        except Exception as e:
            logger.error(f"Error updating vector {vector_id}: {e}")

    def get(self, vector_id: str) -> Optional[OutputData]:
        """Retrieve a vector by ID.

        Returns:
            The first matching document's id and payload (score 1.0), or None when
            no document matches or the lookup fails (the error is logged).
        """
        try:
            search_query = {"query": {"term": {"id": vector_id}}}
            response = self.client.search(index=self.collection_name, body=search_query)
            hits = response["hits"]["hits"]
            if not hits:
                return None
            source = hits[0]["_source"]
            return OutputData(id=source.get("id"), score=1.0, payload=source.get("payload", {}))
        except Exception as e:
            logger.error(f"Error retrieving vector {vector_id}: {str(e)}")
            return None

    def list_cols(self) -> List[str]:
        """List all collections (indices) visible through the alias API."""
        return list(self.client.indices.get_alias().keys())

    def delete_col(self) -> None:
        """Delete this store's collection (index)."""
        self.client.indices.delete(index=self.collection_name)

    def col_info(self, name: str) -> Any:
        """Get information about a collection (index) as returned by ``indices.get``."""
        return self.client.indices.get(index=name)

    def list(self, filters: Optional[Dict] = None, limit: Optional[int] = None) -> List[List[OutputData]]:
        """List all memories with optional filters.

        Args:
            filters: Optional ``user_id``/``run_id``/``agent_id`` scope filters.
            limit: Maximum number of documents; when falsy no ``size`` is sent and
                OpenSearch's default page size applies.

        Returns:
            A one-element list wrapping the list of ``OutputData`` (the
            VectorStore caller expects this wrapped format). On error the wrapper
            still contains an empty list, so callers that unwrap with ``[0]`` do
            not fail; the error is logged.
        """
        try:
            query: Dict = {"query": {"match_all": {}}}

            filter_clauses = self._build_filter_clauses(filters)
            if filter_clauses:
                query["query"] = {"bool": {"filter": filter_clauses}}

            if limit:
                query["size"] = limit

            response = self.client.search(index=self.collection_name, body=query)
            hits = response["hits"]["hits"]

            results = [
                OutputData(id=hit["_source"].get("id"), score=1.0, payload=hit["_source"].get("payload", {}))
                for hit in hits
            ]
            return [results]  # VectorStore expects tuple/list format
        except Exception as e:
            logger.error(f"Error listing vectors: {e}")
            return [[]]

    def reset(self):
        """Reset the index by deleting and recreating it."""
        logger.warning(f"Resetting index {self.collection_name}...")
        self.delete_col()
        self.create_col(self.collection_name, self.embedding_model_dims)
| #293 |