repositories
loading repo index
repositories
loading repo index
repository
loading code, commits, and activity
public Clawd ADK gateway launch mirror
stars
latest
clone command
git clone gitlawb://did:key:z6Mkq5mY...iFZ5/my-project-publ...
git clone gitlawb://did:key:z6Mkq5mY.../my-project-publ...
2fa351d6 docs: add automaton and perps launch sources 16d ago
| #1 | import hashlib |
| #2 | import logging |
| #3 | from typing import Any, Optional |
| #4 | |
| #5 | from embedchain.config.add_config import ChunkerConfig |
| #6 | from embedchain.helpers.json_serializable import JSONSerializable |
| #7 | from embedchain.models.data_type import DataType |
| #8 | |
| #9 | logger = logging.getLogger(__name__) |
| #10 | |
| #11 | |
class BaseChunker(JSONSerializable):
    """Load data through a loader and split it into de-duplicated chunks.

    The chunker delegates the actual splitting to ``text_splitter`` (see
    `get_chunks`) and tags every chunk's metadata with the chunker's data
    type and the document id.
    """

    def __init__(self, text_splitter):
        """Initialize the chunker.

        :param text_splitter: Object whose `split_text(content)` method is
        called by `get_chunks` to split content into chunks.
        """
        self.text_splitter = text_splitter
        # Stays None until `set_data_type` is called. `create_chunks` reads
        # `self.data_type.value`, so the data type must be set before use.
        self.data_type = None

    def create_chunks(
        self,
        loader,
        src,
        app_id=None,
        config: Optional[ChunkerConfig] = None,
        **kwargs: Any,
    ):
        """
        Loads data and chunks it.

        :param loader: The loader whose `load_data` method is used to create
        the raw data. Its result is read as a mapping with a "data" entry
        (records, each with "content" and "meta_data") and a "doc_id" entry.
        :param src: The data to be handled by the loader. Can be a URL for
        remote sources or local content for local loaders.
        :param app_id: App id used to generate the doc_id. When not None it
        is also prefixed to every chunk id.
        :param config: Optional chunker config; its `min_chunk_size` is the
        minimum chunk length kept. Defaults to 1 when no config is given.
        :param kwargs: Extra keyword arguments forwarded to
        `loader.load_data`.
        :return: A dict with the keys "documents" (chunk texts), "ids"
        (chunk ids), "metadatas" (one metadata dict per chunk) and "doc_id".
        """
        documents = []
        chunk_ids = []
        metadatas = []
        # Chunk ids already emitted in this call, used to drop duplicates.
        seen_ids = set()

        min_chunk_size = config.min_chunk_size if config is not None else 1
        logger.info("Skipping chunks smaller than %s characters", min_chunk_size)

        data_result = loader.load_data(src, **kwargs)
        data_records = data_result["data"]
        doc_id = data_result["doc_id"]
        # Prefix app_id in the document id if app_id is not None to
        # distinguish between different documents stored in the same
        # elasticsearch or opensearch index
        doc_id = f"{app_id}--{doc_id}" if app_id is not None else doc_id

        for data in data_records:
            content = data["content"]

            # NOTE: this is the loader's own dict; it is updated in place and
            # the same object is appended for every chunk of this record.
            metadata = data["meta_data"]
            # add data type to meta data to allow query using data type
            metadata["data_type"] = self.data_type.value
            metadata["doc_id"] = doc_id

            # TODO: Currently defaulting to the src as the url. This is done intentionally since some
            # of the data types like 'gmail' loader doesn't have the url in the meta data.
            url = metadata.get("url", src)

            for chunk in self.get_chunks(content):
                # The id is derived from the chunk text and its url, so an
                # identical chunk from the same url yields the same id.
                chunk_id = hashlib.sha256((chunk + url).encode()).hexdigest()
                if app_id is not None:
                    chunk_id = f"{app_id}--{chunk_id}"
                if chunk_id in seen_ids or len(chunk) < min_chunk_size:
                    continue
                seen_ids.add(chunk_id)
                chunk_ids.append(chunk_id)
                documents.append(chunk)
                metadatas.append(metadata)

        return {
            "documents": documents,
            "ids": chunk_ids,
            "metadatas": metadatas,
            "doc_id": doc_id,
        }

    def get_chunks(self, content):
        """
        Returns chunks using text splitter instance.

        Override in child class if custom logic.

        :param content: The content passed to the text splitter's
        `split_text` method.
        :return: Whatever `split_text` returns; `create_chunks` iterates it.
        """
        return self.text_splitter.split_text(content)

    def set_data_type(self, data_type: DataType):
        """
        set the data type of chunker

        :param data_type: The data type whose `value` is written into each
        chunk's metadata by `create_chunks`.
        """
        self.data_type = data_type

        # TODO: This should be done during initialization. This means it has to be done in the child classes.

    @staticmethod
    def get_word_count(documents) -> int:
        """Return the total number of whitespace-separated words.

        Splitting on any whitespace (rather than on a single space) means
        empty documents count as zero words, runs of spaces do not produce
        empty "words", and newlines and tabs separate words too.

        :param documents: Iterable of strings.
        :return: Sum of the word counts of all documents.
        """
        return sum(len(document.split()) for document in documents)
| #95 |