repositories
loading repo index
repositories
loading repo index
repository
loading code, commits, and activity
public Clawd ADK gateway launch mirror
stars
latest
clone command
git clone gitlawb://did:key:z6Mkq5mY...iFZ5/my-project-publ...
git clone gitlawb://did:key:z6Mkq5mY.../my-project-publ...
2fa351d6 docs: add automaton and perps launch sources (16d ago)
| #1 | import base64 |
| #2 | import hashlib |
| #3 | import logging |
| #4 | import os |
| #5 | from email import message_from_bytes |
| #6 | from email.utils import parsedate_to_datetime |
| #7 | from textwrap import dedent |
| #8 | from typing import Optional |
| #9 | |
| #10 | from bs4 import BeautifulSoup |
| #11 | |
| #12 | try: |
| #13 | from google.auth.transport.requests import Request |
| #14 | from google.oauth2.credentials import Credentials |
| #15 | from google_auth_oauthlib.flow import InstalledAppFlow |
| #16 | from googleapiclient.discovery import build |
| #17 | except ImportError: |
| #18 | raise ImportError( |
| #19 | 'Gmail requires extra dependencies. Install with `pip install --upgrade "embedchain[gmail]"`' |
| #20 | ) from None |
| #21 | |
| #22 | from embedchain.loaders.base_loader import BaseLoader |
| #23 | from embedchain.utils.misc import clean_string |
| #24 | |
| #25 | logger = logging.getLogger(__name__) |
| #26 | |
| #27 | |
class GmailReader:
    """Fetch Gmail messages matching a search query and parse them into dicts.

    The reader talks to the Gmail API through a ``service`` object (built via
    ``googleapiclient.discovery.build`` when none is supplied) using the
    read-only Gmail scope.
    """

    SCOPES = ["https://www.googleapis.com/auth/gmail.readonly"]

    def __init__(self, query: str, service=None, results_per_page: int = 10):
        """Store the query and obtain a Gmail API service.

        Args:
            query: Gmail search expression passed as ``q`` to ``messages.list``.
            service: Pre-built Gmail API service. When falsy, one is created
                from local OAuth credentials (see ``_get_credentials``).
            results_per_page: Stored on the instance.
        """
        self.query = query
        self.service = service or self._initialize_service()
        # NOTE(review): this value is stored but no method of this class reads
        # it; `load_emails` does not forward it to the API call.
        self.results_per_page = results_per_page

    @staticmethod
    def _initialize_service():
        """Build a Gmail v1 API service from locally stored OAuth credentials."""
        credentials = GmailReader._get_credentials()
        return build("gmail", "v1", credentials=credentials)

    @staticmethod
    def _get_credentials():
        """Load, refresh, or interactively obtain OAuth credentials.

        Reads ``credentials.json`` and ``token.json`` by relative path. When
        the cached token is missing or invalid it is refreshed (if a refresh
        token exists) or a local-server OAuth flow is run on port 8080; the
        resulting credentials are then written back to ``token.json``.

        Raises:
            FileNotFoundError: If ``credentials.json`` does not exist.
        """
        if not os.path.exists("credentials.json"):
            raise FileNotFoundError("Missing 'credentials.json'. Download it from your Google Developer account.")

        creds = (
            Credentials.from_authorized_user_file("token.json", GmailReader.SCOPES)
            if os.path.exists("token.json")
            else None
        )

        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file("credentials.json", GmailReader.SCOPES)
                creds = flow.run_local_server(port=8080)
            # Persist the new/refreshed token so the next run can reuse it.
            with open("token.json", "w") as token:
                token.write(creds.to_json())
        return creds

    def load_emails(self) -> list[dict]:
        """Return parsed emails for every message id in one ``messages.list`` response.

        A single list request is issued for ``self.query``; each returned
        message id is then fetched in raw form and parsed.
        """
        response = self.service.users().messages().list(userId="me", q=self.query).execute()
        messages = response.get("messages", [])

        return [self._parse_email(self._get_email(message["id"])) for message in messages]

    def _get_email(self, message_id: str):
        """Fetch one message in ``raw`` format and return its decoded bytes."""
        raw_message = self.service.users().messages().get(userId="me", id=message_id, format="raw").execute()
        # The API returns the RFC 822 message as URL-safe base64 text.
        return base64.urlsafe_b64decode(raw_message["raw"])

    def _parse_email(self, raw_email) -> dict:
        """Parse raw message bytes into a dict.

        Returns:
            Dict with keys ``subject``, ``from``, ``to``, ``date`` and ``body``.
        """
        mime_msg = message_from_bytes(raw_email)
        return {
            "subject": self._get_header(mime_msg, "Subject"),
            "from": self._get_header(mime_msg, "From"),
            "to": self._get_header(mime_msg, "To"),
            "date": self._format_date(mime_msg),
            "body": self._get_body(mime_msg),
        }

    @staticmethod
    def _get_header(mime_msg, header_name: str) -> str:
        """Return the named header as a string, or ``""`` when it is absent."""
        # Under the default compat32 policy a header containing raw 8-bit data
        # comes back as an `email.header.Header` object rather than `str`;
        # coerce so callers always receive the annotated type.
        return str(mime_msg.get(header_name, ""))

    @staticmethod
    def _format_date(mime_msg) -> Optional[str]:
        """Return the ``Date`` header in ISO 8601 form.

        Returns:
            The ISO-formatted date, or ``None`` when the header is missing or
            cannot be parsed as an RFC 2822 date.
        """
        date_header = GmailReader._get_header(mime_msg, "Date")
        if not date_header:
            return None
        try:
            return parsedate_to_datetime(date_header).isoformat()
        except (TypeError, ValueError):
            # Malformed dates raise ValueError (TypeError on older Pythons);
            # one bad header should not abort loading the whole mailbox.
            return None

    @staticmethod
    def _get_body(mime_msg) -> str:
        """Return the decoded text body of a message.

        For multipart messages the parts are walked in order and the first
        ``text/plain`` part that is not an attachment, or the first
        ``text/html`` part, is returned. Non-multipart messages are decoded
        directly. Returns ``""`` when no suitable part is found.
        """

        def decode_payload(part) -> str:
            payload = part.get_payload(decode=True)
            if payload is None:
                return ""
            charset = part.get_content_charset() or "utf-8"
            try:
                # errors="replace" yields the same text as a strict decode
                # when the bytes are valid, and degrades gracefully otherwise.
                return payload.decode(charset, errors="replace")
            except LookupError:
                # The declared charset is not a codec Python knows about.
                return payload.decode("utf-8", errors="replace")

        if mime_msg.is_multipart():
            for part in mime_msg.walk():
                ctype = part.get_content_type()
                cdispo = str(part.get("Content-Disposition"))

                if ctype == "text/plain" and "attachment" not in cdispo:
                    return decode_payload(part)
                elif ctype == "text/html":
                    return decode_payload(part)
        else:
            return decode_payload(mime_msg)

        return ""
| #113 | |
| #114 | |
class GmailLoader(BaseLoader):
    """Loader that turns Gmail search results into content/metadata records."""

    def load_data(self, query: str):
        """Fetch emails matching ``query`` and package them for ingestion.

        Returns:
            Dict with ``doc_id`` (a SHA-256 hex digest derived from the query
            and the rendered contents) and ``data`` (one
            ``{"content", "meta_data"}`` entry per email).
        """
        emails = GmailReader(query=query).load_emails()
        logger.info(f"Gmail Loader: {len(emails)} emails found for query '{query}'")

        # Each record pairs the rendered text with the parsed email dict.
        data = [{"content": self._process_email(message), "meta_data": message} for message in emails]
        doc_id = self._generate_doc_id(query, data)
        return {"doc_id": doc_id, "data": data}

    @staticmethod
    def _process_email(email: dict) -> str:
        """Render one parsed email as a plain-text summary block.

        The body is passed through BeautifulSoup's ``get_text`` and then
        ``clean_string`` before being embedded in the summary.
        """
        soup = BeautifulSoup(email["body"], "html.parser")
        content = clean_string(soup.get_text())
        rendered = f"""
            Email from '{email['from']}' to '{email['to']}'
            Subject: {email['subject']}
            Date: {email['date']}
            Content: {content}
        """
        return dedent(rendered)

    @staticmethod
    def _generate_doc_id(query: str, data: list[dict]) -> str:
        """Return the SHA-256 hex digest of the query followed by all contents joined with ", "."""
        joined_contents = ", ".join(record["content"] for record in data)
        hasher = hashlib.sha256()
        hasher.update((query + joined_contents).encode())
        return hasher.hexdigest()
| #145 |