repositories
loading repo index
repositories
loading repo index
repository
loading code, commits, and activity
public Clawd ADK gateway launch mirror
stars
latest
clone command
git clone gitlawb://did:key:z6Mkq5mY...iFZ5/my-project-publ...git clone gitlawb://did:key:z6Mkq5mY.../my-project-publ...2fa351d6docs: add automaton and perps launch sources16d ago| #1 | import hashlib |
| #2 | import logging |
| #3 | from typing import Any, Optional |
| #4 | |
| #5 | from embedchain.loaders.base_loader import BaseLoader |
| #6 | |
# One logger per module, named after the module's import path, so callers can
# tune this loader's verbosity via the standard logging hierarchy.
logger = logging.getLogger(name=__name__)
| #8 | |
| #9 | |
class PostgresLoader(BaseLoader):
    """Load the rows returned by a PostgreSQL query as embedchain documents.

    A single psycopg connection and cursor are opened when the loader is
    constructed and are reused by every ``load_data`` call until
    ``close_connection`` is invoked.
    """

    def __init__(self, config: Optional[dict[str, Any]] = None):
        """Open the database connection described by ``config``.

        Args:
            config: Either ``{"url": <conninfo>}``, whose value is passed to
                psycopg unchanged, or a mapping of libpq connection keywords
                (e.g. ``host``, ``port``, ``dbname``, ``user``, ``password``).

        Raises:
            ValueError: If ``config`` is missing or empty.
            ImportError: If ``psycopg`` is not installed.
        """
        super().__init__()
        if not config:
            raise ValueError(f"Must provide the valid config. Received: {config}")

        self.connection = None
        self.cursor = None
        self._setup_loader(config=config)

    def _setup_loader(self, config: dict[str, Any]):
        """Import psycopg, build the connection string and open the connection."""
        try:
            import psycopg
            from psycopg.conninfo import make_conninfo
        except ImportError as e:
            # Implicit concatenation: a backslash continuation inside the
            # literal would embed the next line's indentation in the message.
            raise ImportError(
                "Unable to import required packages. "
                "Run `pip install --upgrade 'embedchain[postgres]'`"
            ) from e

        if "url" in config:
            config_info = config.get("url")
        else:
            # make_conninfo quotes/escapes each value (e.g. passwords with
            # spaces or quotes), which naive "key=value" joining does not.
            config_info = make_conninfo(**config)

        # Deliberately do not log config_info: it usually embeds the password.
        logger.info("Connecting to postgres sql")
        self.connection = psycopg.connect(conninfo=config_info)
        self.cursor = self.connection.cursor()

    @staticmethod
    def _check_query(query):
        """Raise ``ValueError`` unless ``query`` is a string."""
        if not isinstance(query, str):
            raise ValueError(
                f"Invalid postgres query: {query}. Provide the valid source to add from postgres, make sure you are following `https://docs.embedchain.ai/data-sources/postgres`",  # noqa:E501
            )

    def _rollback_quietly(self):
        """Best-effort rollback so one failed query does not block later ones.

        The connection is not opened in autocommit mode, so after an error the
        server rejects further commands on it until the transaction is rolled
        back.
        """
        if self.connection is None:
            return
        try:
            self.connection.rollback()
        except Exception:
            logger.warning("Rollback after a failed postgres query also failed", exc_info=True)

    def load_data(self, query):
        """Run ``query`` and turn every result row into one document.

        Args:
            query: SQL text executed as-is on the shared cursor. It is not
                parameterized, so it must come from a trusted source.

        Returns:
            A dict with ``doc_id`` (SHA-256 hex digest of the query followed by
            the row strings joined with ``", "``) and ``data``, a list holding
            ``{"content": str(row), "meta_data": {"url": query}}`` per row.

        Raises:
            ValueError: If ``query`` is not a string, or executing/fetching
                the query fails (the original error is chained as the cause).
        """
        self._check_query(query)
        try:
            self.cursor.execute(query)
            data_content = [str(row) for row in self.cursor.fetchall()]
            data = [{"content": content, "meta_data": {"url": query}} for content in data_content]
            # Hash input format is kept unchanged so existing doc_ids stay stable.
            doc_id = hashlib.sha256((query + ", ".join(data_content)).encode()).hexdigest()
            return {
                "doc_id": doc_id,
                "data": data,
            }
        except Exception as e:
            self._rollback_quietly()
            raise ValueError(f"Failed to load data using query={query} with: {e}") from e

    def close_connection(self):
        """Close the cursor and connection if open; safe to call repeatedly."""
        if self.cursor:
            self.cursor.close()
            self.cursor = None
        if self.connection:
            self.connection.close()
            self.connection = None
| #74 |