repositories
loading repo index
repositories
loading repo index
repository
loading code, commits, and activity
public Clawd ADK gateway launch mirror
stars
latest
clone command
git clone gitlawb://did:key:z6Mkq5mY...iFZ5/my-project-publ...
git clone gitlawb://did:key:z6Mkq5mY.../my-project-publ...
2fa351d6
docs: add automaton and perps launch sources
16d ago
| #1 | import json |
| #2 | import logging |
| #3 | from contextlib import contextmanager |
| #4 | from typing import Any, List, Optional |
| #5 | |
| #6 | from pydantic import BaseModel |
| #7 | |
| #8 | # Try to import psycopg (psycopg3) first, then fall back to psycopg2 |
| #9 | try: |
| #10 | from psycopg.types.json import Json |
| #11 | from psycopg_pool import ConnectionPool |
| #12 | PSYCOPG_VERSION = 3 |
| #13 | logger = logging.getLogger(__name__) |
| #14 | logger.info("Using psycopg (psycopg3) with ConnectionPool for PostgreSQL connections") |
| #15 | except ImportError: |
| #16 | try: |
| #17 | from psycopg2.extras import Json, execute_values |
| #18 | from psycopg2.pool import ThreadedConnectionPool as ConnectionPool |
| #19 | PSYCOPG_VERSION = 2 |
| #20 | logger = logging.getLogger(__name__) |
| #21 | logger.info("Using psycopg2 with ThreadedConnectionPool for PostgreSQL connections") |
| #22 | except ImportError: |
| #23 | raise ImportError( |
| #24 | "Neither 'psycopg' nor 'psycopg2' library is available. " |
| #25 | "Please install one of them using 'pip install psycopg[pool]' or 'pip install psycopg2'" |
| #26 | ) |
| #27 | |
| #28 | from mem0.vector_stores.base import VectorStoreBase |
| #29 | |
| #30 | logger = logging.getLogger(__name__) |
| #31 | |
| #32 | |
class OutputData(BaseModel):
    """One record returned by the vector store's read operations."""

    # Row identifier, rendered as a string by the callers in this module.
    id: Optional[str]
    # Distance reported by a similarity search; the plain lookups pass None.
    score: Optional[float]
    # JSON payload stored alongside the vector.
    payload: Optional[dict]
| #37 | |
| #38 | |
class PGVector(VectorStoreBase):
    """Vector store backed by a PostgreSQL table that uses the pgvector extension.

    Each collection is one table with the columns ``id`` (UUID), ``vector``
    and ``payload`` (JSONB). Connections are taken from a pool; the pool type
    depends on which driver was imported (``PSYCOPG_VERSION``).

    NOTE: ``collection_name`` is interpolated into SQL as an unquoted
    identifier throughout this class, so it must come from trusted
    configuration, never from end-user input.
    """

    def __init__(
        self,
        dbname,
        collection_name,
        embedding_model_dims,
        user,
        password,
        host,
        port,
        diskann,
        hnsw,
        minconn=1,
        maxconn=5,
        sslmode=None,
        connection_string=None,
        connection_pool=None,
    ):
        """
        Initialize the PGVector database.

        Connection setup priority: ``connection_pool`` > ``connection_string``
        > individual parameters. The collection table is created if it is not
        already listed by ``list_cols()``.

        Args:
            dbname (str): Database name
            collection_name (str): Collection name
            embedding_model_dims (int): Dimension of the embedding vector
            user (str): Database user
            password (str): Database password
            host (str, optional): Database host
            port (int, optional): Database port
            diskann (bool, optional): Use DiskANN for faster search
            hnsw (bool, optional): Use HNSW for faster search
            minconn (int): Minimum number of connections to keep in the connection pool
            maxconn (int): Maximum number of connections allowed in the connection pool
            sslmode (str, optional): SSL mode for PostgreSQL connection (e.g., 'require', 'prefer', 'disable')
            connection_string (str, optional): PostgreSQL connection string, either a
                ``postgresql://`` URI or a keyword/value DSN (overrides individual
                connection parameters)
            connection_pool (Any, optional): Ready-made connection pool (overrides connection
                string and individual parameters). It must match the imported driver:
                ``_get_cursor`` calls ``.connection()`` on it under psycopg3 and
                ``.getconn()`` / ``.putconn()`` under psycopg2.
        """
        self.collection_name = collection_name
        self.use_diskann = diskann
        self.use_hnsw = hnsw
        self.embedding_model_dims = embedding_model_dims
        self.connection_pool = None

        if connection_pool is not None:
            # Use the provided connection pool as-is.
            self.connection_pool = connection_pool
        else:
            if not connection_string:
                connection_string = self._build_connection_uri(user, password, host, port, dbname)
            if sslmode:
                connection_string = self._apply_sslmode(connection_string, sslmode)

            if PSYCOPG_VERSION == 3:
                # psycopg3 ConnectionPool
                self.connection_pool = ConnectionPool(
                    conninfo=connection_string,
                    min_size=minconn,
                    max_size=maxconn,
                    open=True,
                )
            else:
                # psycopg2 ThreadedConnectionPool
                self.connection_pool = ConnectionPool(minconn=minconn, maxconn=maxconn, dsn=connection_string)

        if collection_name not in self.list_cols():
            self.create_col()

    @staticmethod
    def _build_connection_uri(user, password, host, port, dbname) -> str:
        """Build a ``postgresql://`` URI from individual connection parameters.

        User, password and database name are percent-encoded so that reserved
        characters (``@``, ``/``, ``:``, spaces, ...) cannot corrupt the URI.
        """
        from urllib.parse import quote

        return (
            f"postgresql://{quote(str(user), safe='')}:{quote(str(password), safe='')}"
            f"@{host}:{port}/{quote(str(dbname), safe='')}"
        )

    @staticmethod
    def _apply_sslmode(connection_string: str, sslmode: str) -> str:
        """Return ``connection_string`` with its ``sslmode`` set to ``sslmode``.

        URI-style strings get the setting as a query parameter; keyword/value
        DSNs get a ``sslmode=<value>`` token. An existing setting is replaced
        and every other parameter is left exactly as it was.
        """
        import re
        from urllib.parse import quote, urlsplit, urlunsplit

        if connection_string.startswith(("postgresql://", "postgres://")):
            parts = urlsplit(connection_string)
            # Work on the raw query text so other parameters keep their
            # original encoding.
            kept = [
                param
                for param in parts.query.split("&")
                if param and param.partition("=")[0] != "sslmode"
            ]
            kept.append(f"sslmode={quote(str(sslmode), safe='')}")
            return urlunsplit(parts._replace(query="&".join(kept)))

        setting = f"sslmode={sslmode}"
        # The callable replacement keeps backslashes in `sslmode` literal.
        updated, replaced = re.subn(
            r"(?<!\S)sslmode\s*=\s*\S*",
            lambda _match: setting,
            connection_string,
        )
        return updated if replaced else f"{connection_string} {setting}"

    @staticmethod
    def _build_filter_clause(filters: Optional[dict]):
        """Translate ``filters`` into a parameterized ``WHERE`` clause.

        Every key/value pair becomes an equality test on the text form of the
        payload field (``payload->>key``), combined with ``AND``.

        Returns:
            tuple: ``(clause, params)``; ``clause`` is ``""`` when there are no filters.
        """
        if not filters:
            return "", []

        conditions = []
        params = []
        for key, value in filters.items():
            conditions.append("payload->>%s = %s")
            if isinstance(value, bool):
                # ->> renders JSON booleans as 'true'/'false', whereas
                # str(True) is 'True' and would never match.
                text = "true" if value else "false"
            else:
                text = str(value)
            params.extend([key, text])
        return "WHERE " + " AND ".join(conditions), params

    @contextmanager
    def _get_cursor(self, commit: bool = False):
        """
        Unified context manager to get a cursor from the appropriate pool.

        Commits after the managed block when ``commit`` is true, rolls back
        and re-raises on any exception, and returns the connection to the pool.

        Args:
            commit (bool): Commit the transaction after the block succeeds.

        Yields:
            A database cursor.
        """
        if PSYCOPG_VERSION == 3:
            # psycopg3 auto-manages commit/rollback and pool return
            with self.connection_pool.connection() as conn:
                with conn.cursor() as cur:
                    try:
                        yield cur
                        if commit:
                            conn.commit()
                    except Exception:
                        conn.rollback()
                        logger.error("Error in cursor context (psycopg3)", exc_info=True)
                        raise
        else:
            # psycopg2 manual getconn/putconn
            conn = self.connection_pool.getconn()
            cur = None
            try:
                # Created inside the try so that a failure here still returns
                # the connection to the pool.
                cur = conn.cursor()
                yield cur
                if commit:
                    conn.commit()
            except Exception as exc:
                conn.rollback()
                logger.error(f"Error occurred: {exc}")
                raise
            finally:
                if cur is not None:
                    cur.close()
                self.connection_pool.putconn(conn)

    def create_col(self) -> None:
        """
        Create a new collection (table in PostgreSQL).
        Will also initialize vector search index if specified.

        A DiskANN index is created only when it was requested, the embedding
        dimension is below 2000 and the ``vectorscale`` extension is installed.
        The HNSW index is considered only when the DiskANN branch is not taken.
        """
        table = self.collection_name
        with self._get_cursor(commit=True) as cur:
            cur.execute("CREATE EXTENSION IF NOT EXISTS vector")
            cur.execute(
                f"""
                CREATE TABLE IF NOT EXISTS {table} (
                    id UUID PRIMARY KEY,
                    vector vector({self.embedding_model_dims}),
                    payload JSONB
                );
                """
            )
            if self.use_diskann and self.embedding_model_dims < 2000:
                cur.execute("SELECT * FROM pg_extension WHERE extname = 'vectorscale'")
                if cur.fetchone():
                    # Create DiskANN index if extension is installed for faster search
                    cur.execute(
                        f"""
                        CREATE INDEX IF NOT EXISTS {table}_diskann_idx
                        ON {table}
                        USING diskann (vector);
                        """
                    )
            elif self.use_hnsw:
                cur.execute(
                    f"""
                    CREATE INDEX IF NOT EXISTS {table}_hnsw_idx
                    ON {table}
                    USING hnsw (vector vector_cosine_ops)
                    """
                )

    def insert(self, vectors: List[List[float]], payloads=None, ids=None) -> None:
        """
        Insert vectors into the collection.

        Args:
            vectors (List[List[float]]): Vectors to insert.
            payloads (List[dict], optional): One payload per vector. Defaults to empty payloads.
            ids (List[str], optional): One ID per vector. Defaults to freshly generated UUIDs.

        Raises:
            ValueError: If ``vectors``, ``payloads`` and ``ids`` differ in length.
        """
        logger.info(f"Inserting {len(vectors)} vectors into collection {self.collection_name}")
        if len(vectors) == 0:
            return

        payloads = [{} for _ in vectors] if payloads is None else list(payloads)
        if ids is None:
            import uuid

            ids = [str(uuid.uuid4()) for _ in vectors]
        else:
            ids = list(ids)

        # zip() would silently drop the surplus rows, so fail loudly instead.
        if not (len(vectors) == len(payloads) == len(ids)):
            raise ValueError(
                "vectors, payloads and ids must have the same length "
                f"(got {len(vectors)}, {len(payloads)} and {len(ids)})"
            )

        rows = [
            (row_id, vector, json.dumps(payload))
            for row_id, vector, payload in zip(ids, vectors, payloads)
        ]
        with self._get_cursor(commit=True) as cur:
            if PSYCOPG_VERSION == 3:
                cur.executemany(
                    f"INSERT INTO {self.collection_name} (id, vector, payload) VALUES (%s, %s, %s)",
                    rows,
                )
            else:
                execute_values(
                    cur,
                    f"INSERT INTO {self.collection_name} (id, vector, payload) VALUES %s",
                    rows,
                )

    def search(
        self,
        query: str,
        vectors: List[float],
        limit: Optional[int] = 5,
        filters: Optional[dict] = None,
    ) -> List[OutputData]:
        """
        Search for similar vectors.

        Args:
            query (str): Query. Not used by this implementation.
            vectors (List[float]): Query vector.
            limit (int, optional): Number of results to return. Defaults to 5.
            filters (Dict, optional): Filters to apply to the search. Defaults to None.

        Returns:
            list: Search results ordered by ascending distance; ``score`` holds
            the value of the ``<=>`` operator.
        """
        filter_clause, filter_params = self._build_filter_clause(filters)

        with self._get_cursor() as cur:
            cur.execute(
                f"""
                SELECT id, vector <=> %s::vector AS distance, payload
                FROM {self.collection_name}
                {filter_clause}
                ORDER BY distance
                LIMIT %s
                """,
                (vectors, *filter_params, limit),
            )
            rows = cur.fetchall()

        return [OutputData(id=str(row[0]), score=float(row[1]), payload=row[2]) for row in rows]

    def delete(self, vector_id: str) -> None:
        """
        Delete a vector by ID.

        Args:
            vector_id (str): ID of the vector to delete.
        """
        with self._get_cursor(commit=True) as cur:
            cur.execute(f"DELETE FROM {self.collection_name} WHERE id = %s", (vector_id,))

    def update(
        self,
        vector_id: str,
        vector: Optional[List[float]] = None,
        payload: Optional[dict] = None,
    ) -> None:
        """
        Update a vector and its payload.

        Args:
            vector_id (str): ID of the vector to update.
            vector (List[float], optional): Updated vector. ``None`` or an empty
                vector leaves the stored vector untouched.
            payload (Dict, optional): Updated payload. ``None`` leaves the stored
                payload untouched; an empty dict replaces it with ``{}``.
        """
        # Explicit None/len checks: truthiness would skip an intentional empty
        # payload and raises for array-like vectors.
        has_vector = vector is not None and len(vector) > 0
        has_payload = payload is not None
        if not (has_vector or has_payload):
            return

        with self._get_cursor(commit=True) as cur:
            if has_vector:
                cur.execute(
                    f"UPDATE {self.collection_name} SET vector = %s WHERE id = %s",
                    (vector, vector_id),
                )
            if has_payload:
                # `Json` is imported from whichever driver is in use, so one
                # code path serves both psycopg versions.
                cur.execute(
                    f"UPDATE {self.collection_name} SET payload = %s WHERE id = %s",
                    (Json(payload), vector_id),
                )

    def get(self, vector_id: str) -> Optional[OutputData]:
        """
        Retrieve a vector by ID.

        Args:
            vector_id (str): ID of the vector to retrieve.

        Returns:
            Optional[OutputData]: Retrieved record (``score`` is ``None``), or
            ``None`` when no row has that ID.
        """
        with self._get_cursor() as cur:
            cur.execute(
                f"SELECT id, vector, payload FROM {self.collection_name} WHERE id = %s",
                (vector_id,),
            )
            row = cur.fetchone()

        if not row:
            return None
        return OutputData(id=str(row[0]), score=None, payload=row[2])

    def list_cols(self) -> List[str]:
        """
        List all collections.

        Returns:
            List[str]: Names of all tables in the ``public`` schema.
        """
        with self._get_cursor() as cur:
            cur.execute("SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'")
            return [row[0] for row in cur.fetchall()]

    def delete_col(self) -> None:
        """Delete a collection."""
        with self._get_cursor(commit=True) as cur:
            cur.execute(f"DROP TABLE IF EXISTS {self.collection_name}")

    def col_info(self) -> dict[str, Any]:
        """
        Get information about a collection.

        Returns:
            Dict[str, Any]: ``name``, ``count`` (row count) and ``size``
            (pretty-printed total relation size) of the collection table.
        """
        with self._get_cursor() as cur:
            cur.execute(
                f"""
                SELECT
                    table_name,
                    (SELECT COUNT(*) FROM {self.collection_name}) as row_count,
                    (SELECT pg_size_pretty(pg_total_relation_size('{self.collection_name}'))) as total_size
                FROM information_schema.tables
                WHERE table_schema = 'public' AND table_name = %s
                """,
                (self.collection_name,),
            )
            row = cur.fetchone()
        return {"name": row[0], "count": row[1], "size": row[2]}

    # NOTE: from here on the name `list` in the class namespace is this method,
    # not the builtin, so annotations in this class use `typing.List`.
    def list(
        self,
        filters: Optional[dict] = None,
        limit: Optional[int] = 100,
    ) -> List[List[OutputData]]:
        """
        List all vectors in a collection.

        Args:
            filters (Dict, optional): Filters to apply to the list.
            limit (int, optional): Number of vectors to return. Defaults to 100.

        Returns:
            List[List[OutputData]]: A one-element list whose only item is the
            list of records. The extra nesting is kept for existing callers.
        """
        filter_clause, filter_params = self._build_filter_clause(filters)

        statement = f"""
            SELECT id, vector, payload
            FROM {self.collection_name}
            {filter_clause}
            LIMIT %s
        """

        with self._get_cursor() as cur:
            cur.execute(statement, (*filter_params, limit))
            rows = cur.fetchall()

        return [[OutputData(id=str(row[0]), score=None, payload=row[2]) for row in rows]]

    def __del__(self) -> None:
        """
        Close the database connection pool when the object is deleted.
        """
        # The attribute is missing when __init__ failed before assigning it.
        pool = getattr(self, "connection_pool", None)
        if pool is None:
            return
        try:
            # Close pool appropriately
            if PSYCOPG_VERSION == 3:
                pool.close()
            else:
                pool.closeall()
        except Exception:
            # Deliberately best-effort: a finalizer must never raise.
            pass

    def reset(self) -> None:
        """Reset the index by deleting and recreating it."""
        logger.warning(f"Resetting index {self.collection_name}...")
        self.delete_col()
        self.create_col()
| #405 |