repositories
loading repo index
repositories
loading repo index
repository
loading code, commits, and activity
public Clawd ADK gateway launch mirror
stars
latest
clone command
git clone gitlawb://did:key:z6Mkq5mY...iFZ5/my-project-publ...git clone gitlawb://did:key:z6Mkq5mY.../my-project-publ...2fa351d6docs: add automaton and perps launch sources16d ago| #1 | import hashlib |
| #2 | import re |
| #3 | |
| #4 | from mem0.configs.prompts import ( |
| #5 | FACT_RETRIEVAL_PROMPT, |
| #6 | USER_MEMORY_EXTRACTION_PROMPT, |
| #7 | AGENT_MEMORY_EXTRACTION_PROMPT, |
| #8 | ) |
| #9 | |
| #10 | |
| #11 | def get_fact_retrieval_messages(message, is_agent_memory=False): |
| #12 | """Get fact retrieval messages based on the memory type. |
| #13 | |
| #14 | Args: |
| #15 | message: The message content to extract facts from |
| #16 | is_agent_memory: If True, use agent memory extraction prompt, else use user memory extraction prompt |
| #17 | |
| #18 | Returns: |
| #19 | tuple: (system_prompt, user_prompt) |
| #20 | """ |
| #21 | if is_agent_memory: |
| #22 | return AGENT_MEMORY_EXTRACTION_PROMPT, f"Input:\n{message}" |
| #23 | else: |
| #24 | return USER_MEMORY_EXTRACTION_PROMPT, f"Input:\n{message}" |
| #25 | |
| #26 | |
| #27 | def get_fact_retrieval_messages_legacy(message): |
| #28 | """Legacy function for backward compatibility.""" |
| #29 | return FACT_RETRIEVAL_PROMPT, f"Input:\n{message}" |
| #30 | |
| #31 | |
| #32 | def parse_messages(messages): |
| #33 | response = "" |
| #34 | for msg in messages: |
| #35 | if msg["role"] == "system": |
| #36 | response += f"system: {msg['content']}\n" |
| #37 | if msg["role"] == "user": |
| #38 | response += f"user: {msg['content']}\n" |
| #39 | if msg["role"] == "assistant": |
| #40 | response += f"assistant: {msg['content']}\n" |
| #41 | return response |
| #42 | |
| #43 | |
| #44 | def format_entities(entities): |
| #45 | if not entities: |
| #46 | return "" |
| #47 | |
| #48 | formatted_lines = [] |
| #49 | for entity in entities: |
| #50 | simplified = f"{entity['source']} -- {entity['relationship']} -- {entity['destination']}" |
| #51 | formatted_lines.append(simplified) |
| #52 | |
| #53 | return "\n".join(formatted_lines) |
| #54 | |
| #55 | |
| #56 | def remove_code_blocks(content: str) -> str: |
| #57 | """ |
| #58 | Removes enclosing code block markers ```[language] and ``` from a given string. |
| #59 | |
| #60 | Remarks: |
| #61 | - The function uses a regex pattern to match code blocks that may start with ``` followed by an optional language tag (letters or numbers) and end with ```. |
| #62 | - If a code block is detected, it returns only the inner content, stripping out the markers. |
| #63 | - If no code block markers are found, the original content is returned as-is. |
| #64 | """ |
| #65 | pattern = r"^```[a-zA-Z0-9]*\n([\s\S]*?)\n```$" |
| #66 | match = re.match(pattern, content.strip()) |
| #67 | match_res=match.group(1).strip() if match else content.strip() |
| #68 | return re.sub(r"<think>.*?</think>", "", match_res, flags=re.DOTALL).strip() |
| #69 | |
| #70 | |
| #71 | |
| #72 | def extract_json(text): |
| #73 | """ |
| #74 | Extracts JSON content from a string, removing enclosing triple backticks and optional 'json' tag if present. |
| #75 | If no code block is found, returns the text as-is. |
| #76 | """ |
| #77 | text = text.strip() |
| #78 | match = re.search(r"```(?:json)?\s*(.*?)\s*```", text, re.DOTALL) |
| #79 | if match: |
| #80 | json_str = match.group(1) |
| #81 | else: |
| #82 | json_str = text # assume it's raw JSON |
| #83 | return json_str |
| #84 | |
| #85 | |
| #86 | def get_image_description(image_obj, llm, vision_details): |
| #87 | """ |
| #88 | Get the description of the image |
| #89 | """ |
| #90 | |
| #91 | if isinstance(image_obj, str): |
| #92 | messages = [ |
| #93 | { |
| #94 | "role": "user", |
| #95 | "content": [ |
| #96 | { |
| #97 | "type": "text", |
| #98 | "text": "A user is providing an image. Provide a high level description of the image and do not include any additional text.", |
| #99 | }, |
| #100 | {"type": "image_url", "image_url": {"url": image_obj, "detail": vision_details}}, |
| #101 | ], |
| #102 | }, |
| #103 | ] |
| #104 | else: |
| #105 | messages = [image_obj] |
| #106 | |
| #107 | response = llm.generate_response(messages=messages) |
| #108 | return response |
| #109 | |
| #110 | |
| #111 | def parse_vision_messages(messages, llm=None, vision_details="auto"): |
| #112 | """ |
| #113 | Parse the vision messages from the messages |
| #114 | """ |
| #115 | returned_messages = [] |
| #116 | for msg in messages: |
| #117 | if msg["role"] == "system": |
| #118 | returned_messages.append(msg) |
| #119 | continue |
| #120 | |
| #121 | # Handle message content |
| #122 | if isinstance(msg["content"], list): |
| #123 | # Multiple image URLs in content |
| #124 | description = get_image_description(msg, llm, vision_details) |
| #125 | returned_messages.append({"role": msg["role"], "content": description}) |
| #126 | elif isinstance(msg["content"], dict) and msg["content"].get("type") == "image_url": |
| #127 | # Single image content |
| #128 | image_url = msg["content"]["image_url"]["url"] |
| #129 | try: |
| #130 | description = get_image_description(image_url, llm, vision_details) |
| #131 | returned_messages.append({"role": msg["role"], "content": description}) |
| #132 | except Exception: |
| #133 | raise Exception(f"Error while downloading {image_url}.") |
| #134 | else: |
| #135 | # Regular text content |
| #136 | returned_messages.append(msg) |
| #137 | |
| #138 | return returned_messages |
| #139 | |
| #140 | |
| #141 | def process_telemetry_filters(filters): |
| #142 | """ |
| #143 | Process the telemetry filters |
| #144 | """ |
| #145 | if filters is None: |
| #146 | return {} |
| #147 | |
| #148 | encoded_ids = {} |
| #149 | if "user_id" in filters: |
| #150 | encoded_ids["user_id"] = hashlib.md5(filters["user_id"].encode()).hexdigest() |
| #151 | if "agent_id" in filters: |
| #152 | encoded_ids["agent_id"] = hashlib.md5(filters["agent_id"].encode()).hexdigest() |
| #153 | if "run_id" in filters: |
| #154 | encoded_ids["run_id"] = hashlib.md5(filters["run_id"].encode()).hexdigest() |
| #155 | |
| #156 | return list(filters.keys()), encoded_ids |
| #157 | |
| #158 | |
| #159 | def sanitize_relationship_for_cypher(relationship) -> str: |
| #160 | """Sanitize relationship text for Cypher queries by replacing problematic characters.""" |
| #161 | char_map = { |
| #162 | "...": "_ellipsis_", |
| #163 | "…": "_ellipsis_", |
| #164 | "。": "_period_", |
| #165 | ",": "_comma_", |
| #166 | ";": "_semicolon_", |
| #167 | ":": "_colon_", |
| #168 | "!": "_exclamation_", |
| #169 | "?": "_question_", |
| #170 | "(": "_lparen_", |
| #171 | ")": "_rparen_", |
| #172 | "【": "_lbracket_", |
| #173 | "】": "_rbracket_", |
| #174 | "《": "_langle_", |
| #175 | "》": "_rangle_", |
| #176 | "'": "_apostrophe_", |
| #177 | '"': "_quote_", |
| #178 | "\\": "_backslash_", |
| #179 | "/": "_slash_", |
| #180 | "|": "_pipe_", |
| #181 | "&": "_ampersand_", |
| #182 | "=": "_equals_", |
| #183 | "+": "_plus_", |
| #184 | "*": "_asterisk_", |
| #185 | "^": "_caret_", |
| #186 | "%": "_percent_", |
| #187 | "$": "_dollar_", |
| #188 | "#": "_hash_", |
| #189 | "@": "_at_", |
| #190 | "!": "_bang_", |
| #191 | "?": "_question_", |
| #192 | "(": "_lparen_", |
| #193 | ")": "_rparen_", |
| #194 | "[": "_lbracket_", |
| #195 | "]": "_rbracket_", |
| #196 | "{": "_lbrace_", |
| #197 | "}": "_rbrace_", |
| #198 | "<": "_langle_", |
| #199 | ">": "_rangle_", |
| #200 | } |
| #201 | |
| #202 | # Apply replacements and clean up |
| #203 | sanitized = relationship |
| #204 | for old, new in char_map.items(): |
| #205 | sanitized = sanitized.replace(old, new) |
| #206 | |
| #207 | return re.sub(r"_+", "_", sanitized).strip("_") |
| #208 | |
| #209 |