repositories
loading repo index
repositories
loading repo index
repository
loading code, commits, and activity
public Clawd ADK gateway launch mirror
stars
latest
clone command
git clone gitlawb://did:key:z6Mkq5mY...iFZ5/my-project-publ...
git clone gitlawb://did:key:z6Mkq5mY.../my-project-publ...
2fa351d6 docs: add automaton and perps launch sources 16d ago
| #1 | import logging |
| #2 | from typing import Any, Dict, List, Optional |
| #3 | |
| #4 | try: |
| #5 | from elasticsearch import Elasticsearch |
| #6 | from elasticsearch.helpers import bulk |
| #7 | except ImportError: |
| #8 | raise ImportError("Elasticsearch requires extra dependencies. Install with `pip install elasticsearch`") from None |
| #9 | |
| #10 | from pydantic import BaseModel |
| #11 | |
| #12 | from mem0.configs.vector_stores.elasticsearch import ElasticsearchConfig |
| #13 | from mem0.vector_stores.base import VectorStoreBase |
| #14 | |
| #15 | logger = logging.getLogger(__name__) |
| #16 | |
| #17 | |
class OutputData(BaseModel):
    """Record returned by the vector store operations in this module."""

    # Identifier of the stored vector.
    id: str
    # Score attached to the record.
    score: float
    # Payload dictionary associated with the record.
    payload: Dict
| #22 | |
| #23 | |
class ElasticsearchDB(VectorStoreBase):
    """Vector store backed by an Elasticsearch index.

    Documents written by :meth:`insert` carry the embedding in a ``vector``
    field and the caller's payload in a ``metadata`` field. The index used
    for all document operations is ``self.collection_name``.
    """

    def __init__(self, **kwargs):
        """Build the Elasticsearch client and optionally create the index.

        Args:
            **kwargs: Passed unchanged to ``ElasticsearchConfig``. The fields
                read here are ``cloud_id``, ``api_key``, ``host``, ``port``,
                ``user``, ``password``, ``verify_certs``, ``headers``,
                ``collection_name``, ``embedding_model_dims``,
                ``auto_create_index`` and ``custom_search_query``.
        """
        config = ElasticsearchConfig(**kwargs)

        # Options shared by both connection modes.
        common_options = {
            "verify_certs": config.verify_certs,
            "headers": config.headers or {},
        }

        if config.cloud_id:
            self.client = Elasticsearch(
                cloud_id=config.cloud_id,
                api_key=config.api_key,
                **common_options,
            )
        else:
            host = f"{config.host}" if config.port is None else f"{config.host}:{config.port}"
            # Basic auth is only sent when both user and password are set.
            credentials = (config.user, config.password) if (config.user and config.password) else None
            self.client = Elasticsearch(
                hosts=[host],
                basic_auth=credentials,
                **common_options,
            )

        self.collection_name = config.collection_name
        self.embedding_model_dims = config.embedding_model_dims
        # Any falsy value is normalised to None so `search` can test it directly.
        self.custom_search_query = config.custom_search_query or None

        # Create index only if auto_create_index is True
        if config.auto_create_index:
            self.create_index()

    @staticmethod
    def _metadata_term_filters(filters: Dict) -> List[Dict]:
        """Turn ``{key: value}`` filters into term clauses on ``metadata.<key>``."""
        return [{"term": {f"metadata.{key}": value}} for key, value in filters.items()]

    def create_index(self) -> None:
        """Create the index ``self.collection_name`` with its mappings if it doesn't exist."""
        index_settings = {
            "settings": {"index": {"number_of_replicas": 1, "number_of_shards": 5, "refresh_interval": "1s"}},
            "mappings": {
                "properties": {
                    "text": {"type": "text"},
                    "vector": {
                        "type": "dense_vector",
                        "dims": self.embedding_model_dims,
                        "index": True,
                        "similarity": "cosine",
                    },
                    "metadata": {"type": "object", "properties": {"user_id": {"type": "keyword"}}},
                }
            },
        }

        if self.client.indices.exists(index=self.collection_name):
            logger.info("Index %s already exists", self.collection_name)
            return

        self.client.indices.create(index=self.collection_name, body=index_settings)
        logger.info("Created index %s", self.collection_name)

    def create_col(self, name: str, vector_size: int, distance: str = "cosine") -> None:
        """Create a new collection (index in Elasticsearch) if it doesn't exist.

        Args:
            name: Name of the index to create.
            vector_size: Value written to the ``dims`` option of the
                ``vector`` field mapping.
            distance: Value written to the ``similarity`` option of the
                ``vector`` field mapping. Defaults to ``"cosine"``.
        """
        # NOTE(review): this mapping declares `payload` and `id`, while
        # `insert` writes the payload under `metadata` -- confirm which
        # schema is intended for indices created through this method.
        index_settings = {
            "mappings": {
                "properties": {
                    "vector": {
                        "type": "dense_vector",
                        "dims": vector_size,
                        "index": True,
                        # Honour the caller's choice instead of a hard-coded "cosine".
                        "similarity": distance,
                    },
                    "payload": {"type": "object"},
                    "id": {"type": "keyword"},
                }
            }
        }

        if not self.client.indices.exists(index=name):
            self.client.indices.create(index=name, body=index_settings)
            logger.info("Created index %s", name)

    def insert(
        self, vectors: List[List[float]], payloads: Optional[List[Dict]] = None, ids: Optional[List[str]] = None
    ) -> List[OutputData]:
        """Insert vectors into the index.

        Args:
            vectors: Embeddings to store, one per document.
            payloads: Per-vector payloads stored under ``metadata``. When
                ``None``, an empty dict is used for every vector.
            ids: Document ids. When missing or empty, the positional
                strings ``"0"``, ``"1"``, ... are used.

        Returns:
            One ``OutputData`` per inserted vector, with a score of ``1.0``.

        Raises:
            ValueError: If ``vectors``, ``payloads`` and ``ids`` do not all
                have the same length.
        """
        # NOTE(review): the default ids are positional, so two calls that
        # both omit `ids` address the same document ids.
        if not ids:
            ids = [str(position) for position in range(len(vectors))]

        if payloads is None:
            payloads = [{} for _ in vectors]

        # Fail before touching the index: a mismatch would otherwise be
        # silently truncated by zip() or surface as an IndexError later.
        if not (len(vectors) == len(ids) == len(payloads)):
            raise ValueError(
                "vectors, payloads and ids must have the same length "
                f"(got {len(vectors)}, {len(payloads)} and {len(ids)})"
            )

        actions = [
            {
                "_index": self.collection_name,
                "_id": id_,
                "_source": {
                    "vector": vector,
                    "metadata": payload,  # Store all metadata in the metadata field
                },
            }
            for id_, vector, payload in zip(ids, vectors, payloads)
        ]

        bulk(self.client, actions)

        # Default score for inserts is 1.0.
        return [OutputData(id=id_, score=1.0, payload=payload) for id_, payload in zip(ids, payloads)]

    def search(
        self, query: str, vectors: List[float], limit: int = 5, filters: Optional[Dict] = None
    ) -> List[OutputData]:
        """
        Search with two options:
        1. Use custom search query if provided
        2. Use KNN search on vectors with pre-filtering if no custom search query is provided

        Args:
            query: Not used by this method.
            vectors: Query embedding.
            limit: Used as ``k``; ``num_candidates`` is set to twice this value.
            filters: Optional ``{key: value}`` pairs, each matched as a term
                on ``metadata.<key>``.

        Returns:
            One ``OutputData`` per hit, carrying the hit's ``_score``.
        """
        if self.custom_search_query:
            # The custom callable builds the entire request body itself.
            search_query = self.custom_search_query(vectors, limit, filters)
        else:
            knn_clause: Dict[str, Any] = {
                "field": "vector",
                "query_vector": vectors,
                "k": limit,
                "num_candidates": limit * 2,
            }
            if filters:
                knn_clause["filter"] = {"bool": {"must": self._metadata_term_filters(filters)}}
            search_query = {"knn": knn_clause}

        response = self.client.search(index=self.collection_name, body=search_query)

        return [
            OutputData(id=hit["_id"], score=hit["_score"], payload=hit.get("_source", {}).get("metadata", {}))
            for hit in response["hits"]["hits"]
        ]

    def delete(self, vector_id: str) -> None:
        """Delete a vector by ID."""
        self.client.delete(index=self.collection_name, id=vector_id)

    def update(self, vector_id: str, vector: Optional[List[float]] = None, payload: Optional[Dict] = None) -> None:
        """Update a vector and/or its payload.

        Only the arguments that are not ``None`` are included in the partial
        document sent to Elasticsearch.
        """
        doc: Dict[str, Any] = {}
        if vector is not None:
            doc["vector"] = vector
        if payload is not None:
            doc["metadata"] = payload

        self.client.update(index=self.collection_name, id=vector_id, body={"doc": doc})

    def get(self, vector_id: str) -> Optional[OutputData]:
        """Retrieve a vector by ID.

        Returns:
            The stored record with a score of ``1.0``, or ``None`` when the
            lookup or the parsing of its response raises; the failure is
            logged rather than propagated.
        """
        try:
            response = self.client.get(index=self.collection_name, id=vector_id)
            return OutputData(
                id=response["_id"],
                score=1.0,  # Default score for direct get
                payload=response["_source"].get("metadata", {}),
            )
        except KeyError as e:
            logger.warning("Missing key in Elasticsearch response: %s", e)
        except TypeError as e:
            logger.warning("Invalid response type from Elasticsearch: %s", e)
        except Exception as e:
            # Deliberately best-effort: client errors also end up here.
            logger.error("Unexpected error while parsing Elasticsearch response: %s", e)
        return None

    def list_cols(self) -> List[str]:
        """List all collections (indices)."""
        return list(self.client.indices.get_alias().keys())

    def delete_col(self) -> None:
        """Delete the collection (index) named ``self.collection_name``."""
        self.client.indices.delete(index=self.collection_name)

    def col_info(self, name: str) -> Any:
        """Get information about a collection (index)."""
        return self.client.indices.get(index=name)

    def list(self, filters: Optional[Dict] = None, limit: Optional[int] = None) -> List[List[OutputData]]:
        """List memories, optionally restricted by metadata filters.

        Args:
            filters: Optional ``{key: value}`` pairs, each matched as a term
                on ``metadata.<key>``.
            limit: Sent as the request ``size`` when truthy; otherwise no
                ``size`` is sent and the server's default page size applies.

        Returns:
            A single-element list wrapping the list of records, each with a
            score of ``1.0``.
        """
        query: Dict[str, Any] = {"query": {"match_all": {}}}

        if filters:
            query["query"] = {"bool": {"must": self._metadata_term_filters(filters)}}

        if limit:
            query["size"] = limit

        response = self.client.search(index=self.collection_name, body=query)

        results = [
            OutputData(
                id=hit["_id"],
                score=1.0,  # Default score for list operation
                payload=hit.get("_source", {}).get("metadata", {}),
            )
            for hit in response["hits"]["hits"]
        ]

        return [results]

    def reset(self):
        """Reset the index by deleting and recreating it."""
        logger.warning("Resetting index %s...", self.collection_name)
        self.delete_col()
        self.create_index()
| #238 |