repositories
loading repo index
repositories
loading repo index
repository
loading code, commits, and activity
public Clawd ADK gateway launch mirror
stars
latest
clone command
git clone gitlawb://did:key:z6Mkq5mY...iFZ5/my-project-publ...
git clone gitlawb://did:key:z6Mkq5mY.../my-project-publ...
2fa351d6 docs: add automaton and perps launch sources 16d ago
| #1 | import logging |
| #2 | import os |
| #3 | from typing import Optional, Union |
| #4 | |
| #5 | try: |
| #6 | import pinecone |
| #7 | except ImportError: |
| #8 | raise ImportError( |
| #9 | "Pinecone requires extra dependencies. Install with `pip install pinecone-text pinecone-client`" |
| #10 | ) from None |
| #11 | |
| #12 | from pinecone_text.sparse import BM25Encoder |
| #13 | |
| #14 | from embedchain.config.vector_db.pinecone import PineconeDBConfig |
| #15 | from embedchain.helpers.json_serializable import register_deserializable |
| #16 | from embedchain.utils.misc import chunks |
| #17 | from embedchain.vectordb.base import BaseVectorDB |
| #18 | |
| #19 | logger = logging.getLogger(__name__) |
| #20 | |
| #21 | |
@register_deserializable
class PineconeDB(BaseVectorDB):
    """
    Pinecone as vector database
    """

    def __init__(
        self,
        config: Optional[PineconeDBConfig] = None,
    ):
        """Pinecone as vector database.

        :param config: Pinecone database config, defaults to None
        :type config: PineconeDBConfig, optional
        :raises TypeError: `config` is given but is not a `PineconeDBConfig` instance
        :raises ValueError: propagated from `_setup_pinecone_index` when no API key
            or no index spec is available
        """
        if config is None:
            self.config = PineconeDBConfig()
        else:
            if not isinstance(config, PineconeDBConfig):
                raise TypeError(
                    "config is not a `PineconeDBConfig` instance. "
                    "Please make sure the type is right and that you are passing an instance."
                )
            self.config = config
        self._setup_pinecone_index()

        # Setup BM25Encoder if sparse vectors are to be used
        self.bm25_encoder = None
        self.batch_size = self.config.batch_size
        if self.config.hybrid_search:
            logger.info("Initializing BM25Encoder for sparse vectors..")
            # Prefer an encoder supplied through the config; otherwise use the library default.
            self.bm25_encoder = self.config.bm25_encoder if self.config.bm25_encoder else BM25Encoder.default()

        # Call parent init here because embedder is needed
        super().__init__(config=self.config)

    def _initialize(self):
        """
        This method is needed because `embedder` attribute needs to be set externally before it can be initialized.

        :raises ValueError: no embedder has been set
        """
        if not self.embedder:
            raise ValueError("Embedder not set. Please set an embedder with `set_embedder` before initialization.")

    def _setup_pinecone_index(self):
        """
        Loads the Pinecone index or creates it if not present.

        Sets `self.client` and `self.pinecone_index`.

        :raises ValueError: no API key in config or in the `PINECONE_API_KEY` environment
            variable, or the index has to be created and neither `pod_config` nor
            `serverless_config` is set
        """
        api_key = self.config.api_key or os.environ.get("PINECONE_API_KEY")
        if not api_key:
            raise ValueError("Please set the PINECONE_API_KEY environment variable or pass it in config.")
        self.client = pinecone.Pinecone(api_key=api_key, **self.config.extra_params)
        indexes = self.client.list_indexes().names()
        if indexes is None or self.config.index_name not in indexes:
            # The index does not exist yet: pick a spec (pod config wins over serverless).
            if self.config.pod_config:
                spec = pinecone.PodSpec(**self.config.pod_config)
            elif self.config.serverless_config:
                spec = pinecone.ServerlessSpec(**self.config.serverless_config)
            else:
                raise ValueError("No pod_config or serverless_config found.")

            self.client.create_index(
                name=self.config.index_name,
                metric=self.config.metric,
                dimension=self.config.vector_dimension,
                spec=spec,
            )
        self.pinecone_index = self.client.Index(self.config.index_name)

    def get(self, ids: Optional[list[str]] = None, where: Optional[dict[str, any]] = None, limit: Optional[int] = None):
        """
        Get existing doc ids present in vector database

        :param ids: list of doc ids to check for existence
        :type ids: list[str]
        :param where: accepted for interface compatibility; not used by this implementation
        :type where: dict[str, any]
        :param limit: accepted for interface compatibility; not used by this implementation
        :type limit: int
        :return: the ids that were found and their metadata, as
            `{"ids": [...], "metadatas": [...]}` (both lists empty when `ids` is None)
        :rtype: dict[str, list]
        """
        existing_ids = []
        metadatas = []

        if ids is not None:
            # Fetch in slices of `batch_size` ids per request.
            for start in range(0, len(ids), self.batch_size):
                result = self.pinecone_index.fetch(ids=ids[start : start + self.batch_size])
                # Treat a missing/empty "vectors" entry as "nothing found" instead of crashing.
                vectors = result.get("vectors") or {}
                for vector_id, vector in vectors.items():
                    existing_ids.append(vector_id)
                    metadatas.append(vector.get("metadata"))
        return {"ids": existing_ids, "metadatas": metadatas}

    def add(
        self,
        documents: list[str],
        metadatas: list[object],
        ids: list[str],
        **kwargs: Optional[dict[str, any]],
    ):
        """add data in vector database

        :param documents: list of texts to add
        :type documents: list[str]
        :param metadatas: list of metadata associated with docs; a `None` entry is
            treated as empty metadata
        :type metadatas: list[object]
        :param ids: ids of docs
        :type ids: list[str]
        :param kwargs: extra keyword arguments forwarded to every `upsert` call
        """
        docs = []
        embeddings = self.embedder.embedding_fn(documents)
        for doc_id, text, metadata, embedding in zip(ids, documents, metadatas, embeddings):
            # Insert sparse vectors as well if the user wants to do the hybrid search
            sparse_vector_dict = (
                {"sparse_values": self.bm25_encoder.encode_documents(text)} if self.bm25_encoder else {}
            )
            docs.append(
                {
                    "id": doc_id,
                    "values": embedding,
                    # The raw text is stored in the metadata so `query` can return it.
                    "metadata": {**(metadata or {}), "text": text},
                    **sparse_vector_dict,
                },
            )

        for chunk in chunks(docs, self.batch_size, desc="Adding chunks in batches"):
            self.pinecone_index.upsert(chunk, **kwargs)

    def query(
        self,
        input_query: str,
        n_results: int,
        where: Optional[dict[str, any]] = None,
        raw_filter: Optional[dict[str, any]] = None,
        citations: bool = False,
        app_id: Optional[str] = None,
        **kwargs: Optional[dict[str, any]],
    ) -> Union[list[tuple[str, dict]], list[str]]:
        """
        Query contents from vector database based on vector similarity.

        Args:
            input_query (str): query string.
            n_results (int): Number of similar documents to fetch from the database.
            where (dict[str, any], optional): Filter criteria for the search. Ignored when
                `raw_filter` is given.
            raw_filter (dict[str, any], optional): Advanced raw filter criteria for the search.
                The passed dict is not modified.
            citations (bool, optional): Flag to return context along with metadata. Defaults to False.
            app_id (str, optional): Application ID to be passed to Pinecone.
            **kwargs: Extra parameters merged into the Pinecone query call.

        Returns:
            Union[list[tuple[str, dict]], list[str]]: List of document contexts, optionally with metadata.
        """
        if raw_filter is not None:
            # Work on a shallow copy so adding `app_id` below never mutates the caller's dict.
            query_filter = dict(raw_filter)
        else:
            query_filter = self._generate_filter(where)
        if app_id:
            query_filter["app_id"] = {"$eq": app_id}

        query_vector = self.embedder.embedding_fn([input_query])[0]
        params = {
            "vector": query_vector,
            "filter": query_filter,
            "top_k": n_results,
            "include_metadata": True,
            **kwargs,
        }

        if self.bm25_encoder:
            sparse_query_vector = self.bm25_encoder.encode_queries(input_query)
            params["sparse_vector"] = sparse_query_vector

        data = self.pinecone_index.query(**params)

        contexts = []
        for doc in data.get("matches", []):
            metadata = doc.get("metadata", {})
            text = metadata.get("text")
            if citations:
                # Return the stored metadata enriched with the similarity score.
                contexts.append((text, {**metadata, "score": doc.get("score")}))
            else:
                contexts.append(text)
        return contexts

    def set_collection_name(self, name: str):
        """
        Set the name of the collection. A collection is an isolated space for vectors.

        :param name: Name of the collection.
        :type name: str
        :raises TypeError: `name` is not a string
        """
        if not isinstance(name, str):
            raise TypeError("Collection name must be a string")
        self.config.collection_name = name

    def count(self) -> int:
        """
        Count number of documents/chunks embedded in the database.

        :return: number of documents
        :rtype: int
        """
        data = self.pinecone_index.describe_index_stats()
        return data["total_vector_count"]

    def _get_or_create_db(self):
        """Called during initialization"""
        return self.client

    def reset(self):
        """
        Resets the database. Deletes all embeddings irreversibly.
        """
        # Delete all data from the database
        self.client.delete_index(self.config.index_name)
        # Recreate the (now missing) index and refresh `self.pinecone_index`.
        self._setup_pinecone_index()

    @staticmethod
    def _generate_filter(where: Optional[dict]):
        """Translate a flat `where` mapping into an equality filter.

        :param where: mapping of metadata key to required value, or None
        :type where: dict, optional
        :return: a new dict mapping every key to `{"$eq": value}`; empty when `where` is None
        :rtype: dict
        """
        if where is None:
            return {}
        return {key: {"$eq": value} for key, value in where.items()}

    def delete(self, where: dict):
        """Delete from database.

        Failures are logged and swallowed (best effort); nothing is raised.

        :param where: metadata key/value pairs the vectors to delete must match
        :type where: dict
        """
        # Deleting with filters is not supported for `starter` index type.
        # Follow `https://docs.pinecone.io/docs/metadata-filtering#deleting-vectors-by-metadata-filter` for more details
        db_filter = self._generate_filter(where)
        try:
            self.pinecone_index.delete(filter=db_filter)
        except Exception as e:
            # Deliberately best effort: report through the module logger instead of stdout.
            logger.error("Failed to delete from Pinecone: %s", e)
| #253 |