repositories
loading repo index
repositories
loading repo index
repository
loading code, commits, and activity
public Clawd ADK gateway launch mirror
stars
latest
clone command
git clone gitlawb://did:key:z6Mkq5mY...iFZ5/my-project-publ...
git clone gitlawb://did:key:z6Mkq5mY.../my-project-publ...
2fa351d6 docs: add automaton and perps launch sources 16d ago
| #1 | import hashlib |
| #2 | import logging |
| #3 | import time |
| #4 | from typing import Any, Optional |
| #5 | |
| #6 | import requests |
| #7 | |
| #8 | from embedchain.loaders.base_loader import BaseLoader |
| #9 | from embedchain.utils.misc import clean_string |
| #10 | |
| #11 | logger = logging.getLogger(__name__) |
| #12 | |
| #13 | |
class DiscourseLoader(BaseLoader):
    """Loader that searches a Discourse forum and collects the matching posts.

    The config must contain a ``domain`` entry holding the forum's base URL,
    for example ``{"domain": "https://community.example.com/"}``.
    """

    # Upper bound (seconds) on any single HTTP call. `requests` applies no
    # timeout by default, so without this an unresponsive forum would block
    # the loader forever.
    REQUEST_TIMEOUT = 30

    def __init__(self, config: Optional[dict[str, Any]] = None):
        """Store the forum domain taken from ``config``.

        Raises:
            ValueError: If ``config`` is empty or has no ``domain`` entry.
        """
        super().__init__()
        if not config:
            raise ValueError(
                "DiscourseLoader requires a config. Check the documentation for the correct format - `https://docs.embedchain.ai/components/data-sources/discourse`"  # noqa: E501
            )

        self.domain = config.get("domain")
        if not self.domain:
            raise ValueError(
                "DiscourseLoader requires a domain. Check the documentation for the correct format - `https://docs.embedchain.ai/components/data-sources/discourse`"  # noqa: E501
            )

    def _build_url(self, path: str) -> str:
        """Return ``path`` appended to the configured domain.

        A domain configured without a trailing slash would otherwise be glued
        directly onto the path (``https://hostposts/1.json``).
        """
        base = str(self.domain)
        if not base.endswith("/"):
            base += "/"
        return f"{base}{path}"

    def _check_query(self, query):
        """Validate that ``query`` is a non-empty string.

        Raises:
            ValueError: If ``query`` is empty or not a ``str``.
        """
        if not query or not isinstance(query, str):
            raise ValueError(
                "DiscourseLoader requires a query. Check the documentation for the correct format - `https://docs.embedchain.ai/components/data-sources/discourse`"  # noqa: E501
            )

    def _load_post(self, post_id) -> Optional[dict[str, Any]]:
        """Fetch a single post and return its cleaned content plus metadata.

        Returns:
            A dict with ``content`` and ``meta_data`` keys, or ``None`` when
            the post could not be fetched (the failure is logged).
        """
        post_url = self._build_url(f"posts/{post_id}.json")
        try:
            # The request itself is inside the try so that a connection error
            # or timeout on one post is skipped like an HTTP error would be,
            # instead of aborting the whole load.
            response = requests.get(post_url, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
        except requests.RequestException as e:
            logger.error("Failed to load post %s: %s", post_id, e)
            return None

        response_data = response.json()
        # Fall back to an empty string so a post without a `raw` field does
        # not hand None to clean_string.
        post_contents = clean_string(response_data.get("raw") or "")
        metadata = {
            "url": post_url,
            "created_at": response_data.get("created_at", ""),
            "username": response_data.get("username", ""),
            "topic_slug": response_data.get("topic_slug", ""),
            "score": response_data.get("score", ""),
        }
        return {
            "content": post_contents,
            "meta_data": metadata,
        }

    def load_data(self, query):
        """Search the forum for ``query`` and load every matching post.

        Args:
            query: Non-empty search string.

        Returns:
            A dict with ``doc_id`` (SHA-256 hex digest of the query followed by
            the loaded post contents) and ``data`` (list of post dicts as
            returned by ``_load_post``).

        Raises:
            ValueError: If ``query`` is invalid or the search request returns
                an HTTP error status.
        """
        self._check_query(query)
        data = []
        data_contents = []
        logger.info("Searching data on discourse url: %s, for query: %s", self.domain, query)
        # Pass the query via `params` so requests URL-encodes it; raw
        # interpolation breaks on characters such as `&`, `#` or `%`.
        response = requests.get(
            self._build_url("search.json"),
            params={"q": query},
            timeout=self.REQUEST_TIMEOUT,
        )
        try:
            response.raise_for_status()
        except requests.RequestException as e:
            raise ValueError(f"Failed to search query {query}: {e}") from e

        response_data = response.json()
        # Either key may be absent or null; treat that as "no posts" rather
        # than failing on None.
        grouped_result = response_data.get("grouped_search_result") or {}
        post_ids = grouped_result.get("post_ids") or []
        for post_id in post_ids:
            post_data = self._load_post(post_id)
            if post_data:
                data.append(post_data)
                data_contents.append(post_data.get("content"))
            # Sleep for 0.4 sec, to avoid rate limiting. Check `https://meta.discourse.org/t/api-rate-limits/208405/6`
            time.sleep(0.4)

        doc_id = hashlib.sha256((query + ", ".join(data_contents)).encode()).hexdigest()
        return {"doc_id": doc_id, "data": data}
| #80 |