repositories
loading repo index
repositories
loading repo index
repository
loading code, commits, and activity
public Clawd ADK gateway launch mirror
stars
latest
clone command
git clone gitlawb://did:key:z6Mkq5mY...iFZ5/my-project-publ...
git clone gitlawb://did:key:z6Mkq5mY.../my-project-publ...
2fa351d6 docs: add automaton and perps launch sources 16d ago
| #1 | import json |
| #2 | import logging |
| #3 | import uuid |
| #4 | from typing import Any, Optional |
| #5 | |
| #6 | from embedchain.core.db.database import get_session |
| #7 | from embedchain.core.db.models import ChatHistory as ChatHistoryModel |
| #8 | from embedchain.memory.message import ChatMessage |
| #9 | from embedchain.memory.utils import merge_metadata_dict |
| #10 | |
| #11 | logger = logging.getLogger(__name__) |
| #12 | |
| #13 | |
class ChatHistory:
    """Store and query chat question/answer rounds through a database session.

    All methods operate on the session returned by ``get_session()`` at
    construction time.
    """

    def __init__(self) -> None:
        self.db_session = get_session()

    def add(self, app_id, session_id, chat_message: "ChatMessage") -> Optional[str]:
        """
        Persist one question/answer round.

        :param app_id: The app_id the round belongs to
        :param session_id: The session_id the round belongs to
        :param chat_message: The message pair to store; the metadata of its human
            and AI messages is merged and saved as a JSON string

        :return: The generated id of the stored row, or None if the commit failed
        """
        memory_id = str(uuid.uuid4())
        metadata_dict = merge_metadata_dict(chat_message.human_message.metadata, chat_message.ai_message.metadata)
        # An empty/absent merge result is stored as an empty JSON object.
        metadata = self._serialize_json(metadata_dict) if metadata_dict else "{}"
        self.db_session.add(
            ChatHistoryModel(
                app_id=app_id,
                id=memory_id,
                session_id=session_id,
                question=chat_message.human_message.content,
                answer=chat_message.ai_message.content,
                # get() reads this value back from ``meta_data``; the write must target the
                # same attribute. ``metadata`` is a reserved name on SQLAlchemy declarative
                # classes, so passing it would not populate the column.
                # NOTE(review): confirm the attribute name against ChatHistoryModel.
                meta_data=metadata,
            )
        )
        try:
            self.db_session.commit()
        except Exception as e:
            logger.error(f"Error adding chat memory to db: {e}")
            self.db_session.rollback()
            return None

        logger.info(f"Added chat memory to db with id: {memory_id}")
        return memory_id

    def delete(self, app_id: str, session_id: Optional[str] = None):
        """
        Delete all chat history for a given app_id and session_id.
        This is useful for deleting chat history for a given user.

        A commit failure is logged and rolled back; it is not re-raised.

        :param app_id: The app_id to delete chat history for
        :param session_id: The session_id to delete chat history for. When omitted
            (or empty), rows of every session of the app are deleted

        :return: None
        """
        params = {"app_id": app_id}
        if session_id:
            params["session_id"] = session_id
        self.db_session.query(ChatHistoryModel).filter_by(**params).delete()
        try:
            self.db_session.commit()
        except Exception as e:
            logger.error(f"Error deleting chat history: {e}")
            self.db_session.rollback()

    def get(
        self, app_id, session_id: str = "default", num_rounds=10, fetch_all: bool = False, display_format=False
    ) -> "list[ChatMessage]":
        """
        Get the chat history for a given app_id, ordered by creation time (ascending).

        :param app_id: The app_id to get chat history
        :param session_id: The session_id to get chat history. Defaults to "default".
            Ignored when fetch_all is True
        :param num_rounds: The maximum number of rounds to return. Defaults to 10.
            Ignored when fetch_all is True
        :param fetch_all: Whether to return every round of the app across all
            sessions. Defaults to False
        :param display_format: Whether to return plain dicts instead of ChatMessage
            objects. Defaults to False

        :return: A list of ChatMessage objects, or, when display_format is True, a
            list of dicts with the keys "session_id", "human", "ai", "metadata"
            (the raw stored string) and "timestamp"
        """
        params = {"app_id": app_id}
        if not fetch_all:
            params["session_id"] = session_id
        results = (
            self.db_session.query(ChatHistoryModel).filter_by(**params).order_by(ChatHistoryModel.created_at.asc())
        )
        # NOTE(review): with ascending order the limit keeps the first num_rounds rows of the
        # ordering, not the last ones - confirm this is the intended window.
        results = results.limit(num_rounds) if not fetch_all else results
        history = []
        for result in results:
            metadata = self._deserialize_json(metadata=result.meta_data or "{}")
            # Return list of dict if display_format is True
            if display_format:
                history.append(
                    {
                        "session_id": result.session_id,
                        "human": result.question,
                        "ai": result.answer,
                        "metadata": result.meta_data,
                        "timestamp": result.created_at,
                    }
                )
            else:
                memory = ChatMessage()
                memory.add_user_message(result.question, metadata=metadata)
                memory.add_ai_message(result.answer, metadata=metadata)
                history.append(memory)
        return history

    def count(self, app_id: str, session_id: Optional[str] = None):
        """
        Count the number of chat messages for a given app_id and session_id.

        :param app_id: The app_id to count chat history for
        :param session_id: The session_id to count chat history for. When omitted
            (or empty), rows of every session of the app are counted

        :return: The number of chat messages for a given app_id and session_id
        """
        params = {"app_id": app_id}
        if session_id:
            params["session_id"] = session_id
        return self.db_session.query(ChatHistoryModel).filter_by(**params).count()

    @staticmethod
    def _serialize_json(metadata: dict[str, Any]):
        """Return ``metadata`` encoded as a JSON string."""
        return json.dumps(metadata)

    @staticmethod
    def _deserialize_json(metadata: str):
        """Return the object decoded from the JSON string ``metadata``."""
        return json.loads(metadata)

    def close_connection(self):
        """Close the database session held by this object."""
        # __init__ only ever sets ``db_session``; there is no ``connection`` attribute to close.
        self.db_session.close()
| #128 |