repositories
loading repo index
repositories
loading repo index
repository
loading code, commits, and activity
public Clawd ADK gateway launch mirror
stars
latest
clone command
git clone gitlawb://did:key:z6Mkq5mY...iFZ5/my-project-publ...
git clone gitlawb://did:key:z6Mkq5mY.../my-project-publ...
2fa351d6 docs: add automaton and perps launch sources 16d ago

| #1 | import hashlib |
| #2 | import logging |
| #3 | from typing import Any, Optional |
| #4 | |
| #5 | from embedchain.loaders.base_loader import BaseLoader |
| #6 | from embedchain.utils.misc import clean_string |
| #7 | |
| #8 | logger = logging.getLogger(__name__) |
| #9 | |
| #10 | |
class MySQLLoader(BaseLoader):
    """Load the rows returned by a SQL query on a MySQL database as documents.

    The connection and a cursor are opened once, at construction time, from the
    keyword arguments in ``config``; ``load_data`` then reuses that cursor for
    every query.
    """

    def __init__(self, config: Optional[dict[str, Any]]):
        """Validate ``config`` and open the MySQL connection.

        Args:
            config: Keyword arguments forwarded unchanged to
                ``mysql.connector.connection.MySQLConnection``.

        Raises:
            ValueError: If ``config`` is ``None`` or empty, or if the
                connection cannot be established.
            ImportError: If ``mysql.connector`` is not installed.
        """
        super().__init__()
        if not config:
            # A single message string (not two positional args) so that
            # ``str(exc)`` reads as a sentence instead of a tuple repr.
            raise ValueError(
                f"Invalid sql config: {config}. "
                "Provide the correct config, refer `https://docs.embedchain.ai/data-sources/mysql`."
            )

        self.config = config
        self.connection = None
        self.cursor = None
        self._setup_loader(config=config)

    @staticmethod
    def _redact_config(config: dict[str, Any]) -> dict[str, Any]:
        """Return a copy of ``config`` with password-like values masked for display."""
        return {key: "***" if "pass" in str(key).lower() else value for key, value in config.items()}

    def _setup_loader(self, config: dict[str, Any]) -> None:
        """Import the MySQL driver and populate ``self.connection`` / ``self.cursor``.

        Args:
            config: Keyword arguments for ``MySQLConnection``.

        Raises:
            ImportError: If ``mysql.connector`` cannot be imported.
            ValueError: If connecting or creating the cursor fails; the
                underlying driver error is attached as ``__cause__``.
        """
        try:
            # Imported lazily so the driver is only required when this loader is used.
            import mysql.connector as sqlconnector
        except ImportError as e:
            raise ImportError(
                "Unable to import required packages for MySQL loader. Run `pip install --upgrade 'embedchain[mysql]'`."  # noqa: E501
            ) from e

        try:
            self.connection = sqlconnector.connection.MySQLConnection(**config)
            self.cursor = self.connection.cursor()
        except (sqlconnector.Error, IOError) as err:
            logger.error("Connection failed: %s", err)
            # The config is redacted before being echoed back: it normally
            # carries the database password, which must not end up in
            # exception messages, logs or tracebacks.
            raise ValueError(
                f"Unable to connect with the given config: {self._redact_config(config)}. "
                "Please provide the correct configuration to load data from your MySQL DB. "
                "Refer `https://docs.embedchain.ai/data-sources/mysql`."
            ) from err

    @staticmethod
    def _check_query(query) -> None:
        """Ensure ``query`` is a string.

        Raises:
            ValueError: If ``query`` is not a ``str``.
        """
        if not isinstance(query, str):
            raise ValueError(
                f"Invalid mysql query: {query}. "
                "Provide the valid query to add from mysql, "
                "make sure you are following `https://docs.embedchain.ai/data-sources/mysql`"
            )

    def load_data(self, query):
        """Execute ``query`` and return one document per fetched row.

        Args:
            query: SQL statement to execute on the loader's cursor.

        Returns:
            A dict with two keys:
            ``"data"`` is a list of ``{"content", "meta_data"}`` dicts, one per
            row, where the content is ``clean_string(str(row))`` and
            ``meta_data["url"]`` is the query text;
            ``"doc_id"`` is the SHA-256 hex digest of the query followed by
            the row contents joined with ``", "``.

        Raises:
            ValueError: If ``query`` is not a string.
        """
        self._check_query(query=query)
        self.cursor.execute(query)
        contents = [clean_string(str(row)) for row in self.cursor.fetchall()]
        data = [{"content": content, "meta_data": {"url": query}} for content in contents]
        doc_id = hashlib.sha256((query + ", ".join(contents)).encode()).hexdigest()
        return {
            "doc_id": doc_id,
            "data": data,
        }
| #68 |