repositories
loading repo index
repositories
loading repo index
repository
loading code, commits, and activity
public Clawd ADK gateway launch mirror
stars
latest
clone command
git clone gitlawb://did:key:z6Mkq5mY...iFZ5/my-project-publ...
git clone gitlawb://did:key:z6Mkq5mY.../my-project-publ...
2fa351d6 docs: add automaton and perps launch sources (16d ago)
| #1 | import json |
| #2 | import os |
| #3 | import time |
| #4 | from collections import defaultdict |
| #5 | from concurrent.futures import ThreadPoolExecutor |
| #6 | |
| #7 | from dotenv import load_dotenv |
| #8 | from jinja2 import Template |
| #9 | from openai import OpenAI |
| #10 | from prompts import ANSWER_PROMPT, ANSWER_PROMPT_GRAPH |
| #11 | from tqdm import tqdm |
| #12 | |
| #13 | from mem0 import MemoryClient |
| #14 | |
# Populate os.environ from a .env file so the os.getenv() lookups used by
# MemorySearch (MEM0_API_KEY, MEM0_ORGANIZATION_ID, MEM0_PROJECT_ID, MODEL)
# can be configured per run without exporting them in the shell.
load_dotenv()
| #16 | |
| #17 | |
class MemorySearch:
    """Answer benchmark questions from Mem0 memories and checkpoint results to JSON.

    For each question, both speakers' memories are searched in Mem0, rendered
    into an answer prompt (``ANSWER_PROMPT`` or ``ANSWER_PROMPT_GRAPH``) and sent
    to the OpenAI chat model named by the ``MODEL`` environment variable.
    Result records are accumulated in ``self.results`` (conversation index ->
    list of result dicts) and rewritten to ``output_path`` as they are produced.
    """

    def __init__(self, output_path="results.json", top_k=10, filter_memories=False, is_graph=False):
        """Create the Mem0 and OpenAI clients.

        Args:
            output_path: JSON file that ``self.results`` is (re)written to.
            top_k: Number of memories requested per Mem0 search.
            filter_memories: Forwarded unchanged to ``MemoryClient.search``.
            is_graph: When true, search with ``enable_graph=True`` and answer
                with the graph-aware prompt.
        """
        import threading  # local import: only needed for the results lock

        self.mem0_client = MemoryClient(
            api_key=os.getenv("MEM0_API_KEY"),
            org_id=os.getenv("MEM0_ORGANIZATION_ID"),
            project_id=os.getenv("MEM0_PROJECT_ID"),
        )
        self.top_k = top_k
        self.openai_client = OpenAI()
        self.results = defaultdict(list)
        self.output_path = output_path
        self.filter_memories = filter_memories
        self.is_graph = is_graph
        # Guards self.results and the output file: process_questions_parallel runs
        # process_question on worker threads that record and save concurrently.
        # Re-entrant so _record_result can call _save_results while holding it.
        self._results_lock = threading.RLock()
        self.ANSWER_PROMPT = ANSWER_PROMPT_GRAPH if self.is_graph else ANSWER_PROMPT

    def _save_results(self):
        """Write ``self.results`` to ``output_path`` atomically.

        The JSON goes to a sibling ``.tmp`` file which is then moved over the
        target with ``os.replace``, so an interruption mid-write cannot leave a
        truncated checkpoint behind.
        """
        tmp_path = f"{self.output_path}.tmp"
        with self._results_lock:
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(self.results, f, indent=4)
            os.replace(tmp_path, self.output_path)

    def _record_result(self, key, result):
        """Append ``result`` under ``self.results[key]`` and checkpoint to disk."""
        with self._results_lock:
            self.results[key].append(result)
            self._save_results()

    @staticmethod
    def _format_semantic_memories(memories):
        """Project raw Mem0 hits onto ``{"memory", "timestamp", "score"}`` dicts (score rounded to 2 dp)."""
        return [
            {
                "memory": memory["memory"],
                "timestamp": memory["metadata"]["timestamp"],
                "score": round(memory["score"], 2),
            }
            for memory in memories
        ]

    def _search_once(self, user_id, query):
        """Issue a single ``MemoryClient.search`` call configured for this instance."""
        if self.is_graph:
            print("Searching with graph")
            return self.mem0_client.search(
                query,
                user_id=user_id,
                top_k=self.top_k,
                filter_memories=self.filter_memories,
                enable_graph=True,
                output_format="v1.1",
            )
        return self.mem0_client.search(
            query, user_id=user_id, top_k=self.top_k, filter_memories=self.filter_memories
        )

    def search_memory(self, user_id, query, max_retries=3, retry_delay=1):
        """Search ``user_id``'s Mem0 memories for ``query``, retrying on any exception.

        Args:
            user_id: Mem0 user id whose memories are searched.
            query: Search text (the benchmark question).
            max_retries: Total number of attempts; values below 1 are treated
                as a single attempt.
            retry_delay: Seconds to sleep between failed attempts.

        Returns:
            ``(semantic_memories, graph_memories, elapsed_seconds)``:
            ``semantic_memories`` is a list of ``{"memory", "timestamp", "score"}``
            dicts; ``graph_memories`` is a list of ``{"source", "relationship",
            "target"}`` dicts in graph mode and ``None`` otherwise;
            ``elapsed_seconds`` spans all attempts, including retry sleeps.

        Raises:
            Exception: whatever the last failed ``MemoryClient.search`` call raised.
        """
        attempts = max(1, max_retries)
        start_time = time.perf_counter()
        for attempt in range(1, attempts + 1):
            try:
                memories = self._search_once(user_id, query)
                break
            except Exception:
                if attempt >= attempts:
                    raise
                print("Retrying...")
                time.sleep(retry_delay)
        elapsed = time.perf_counter() - start_time

        if self.is_graph:
            # Graph searches (output_format="v1.1") return {"results": [...], "relations": [...]}.
            semantic_memories = self._format_semantic_memories(memories["results"])
            graph_memories = [
                {"source": relation["source"], "relationship": relation["relationship"], "target": relation["target"]}
                for relation in memories["relations"]
            ]
        else:
            semantic_memories = self._format_semantic_memories(memories)
            graph_memories = None
        return semantic_memories, graph_memories, elapsed

    def answer_question(self, speaker_1_user_id, speaker_2_user_id, question, answer, category):
        """Answer ``question`` from both speakers' memories with the chat model.

        ``answer`` and ``category`` are accepted for interface compatibility but
        are not used here.

        Returns:
            ``(response_text, speaker_1_memories, speaker_2_memories,
            speaker_1_memory_time, speaker_2_memory_time, speaker_1_graph_memories,
            speaker_2_graph_memories, response_time)``.
        """
        speaker_1_memories, speaker_1_graph_memories, speaker_1_memory_time = self.search_memory(
            speaker_1_user_id, question
        )
        speaker_2_memories, speaker_2_graph_memories, speaker_2_memory_time = self.search_memory(
            speaker_2_user_id, question
        )

        search_1_memory = [f"{item['timestamp']}: {item['memory']}" for item in speaker_1_memories]
        search_2_memory = [f"{item['timestamp']}: {item['memory']}" for item in speaker_2_memories]

        answer_prompt = Template(self.ANSWER_PROMPT).render(
            # process_data_file builds ids as f"{name}_{idx}"; strip only the final
            # "_<idx>" so speaker names that themselves contain "_" survive intact.
            speaker_1_user_id=speaker_1_user_id.rsplit("_", 1)[0],
            speaker_2_user_id=speaker_2_user_id.rsplit("_", 1)[0],
            speaker_1_memories=json.dumps(search_1_memory, indent=4),
            speaker_2_memories=json.dumps(search_2_memory, indent=4),
            speaker_1_graph_memories=json.dumps(speaker_1_graph_memories, indent=4),
            speaker_2_graph_memories=json.dumps(speaker_2_graph_memories, indent=4),
            question=question,
        )

        t1 = time.perf_counter()
        response = self.openai_client.chat.completions.create(
            model=os.getenv("MODEL"), messages=[{"role": "system", "content": answer_prompt}], temperature=0.0
        )
        response_time = time.perf_counter() - t1
        return (
            response.choices[0].message.content,
            speaker_1_memories,
            speaker_2_memories,
            speaker_1_memory_time,
            speaker_2_memory_time,
            speaker_1_graph_memories,
            speaker_2_graph_memories,
            response_time,
        )

    def process_question(self, val, speaker_a_user_id, speaker_b_user_id):
        """Answer one QA item and return its result record.

        The record is not stored in ``self.results`` and nothing is written to
        disk here; the callers (``process_data_file`` and
        ``process_questions_parallel``) record and checkpoint it. The previous
        version saved here before the result had been recorded, so that save
        was always one question behind.
        """
        question = val.get("question", "")
        answer = val.get("answer", "")
        category = val.get("category", -1)
        evidence = val.get("evidence", [])
        adversarial_answer = val.get("adversarial_answer", "")

        (
            response,
            speaker_1_memories,
            speaker_2_memories,
            speaker_1_memory_time,
            speaker_2_memory_time,
            speaker_1_graph_memories,
            speaker_2_graph_memories,
            response_time,
        ) = self.answer_question(speaker_a_user_id, speaker_b_user_id, question, answer, category)

        return {
            "question": question,
            "answer": answer,
            "category": category,
            "evidence": evidence,
            "response": response,
            "adversarial_answer": adversarial_answer,
            "speaker_1_memories": speaker_1_memories,
            "speaker_2_memories": speaker_2_memories,
            "num_speaker_1_memories": len(speaker_1_memories),
            "num_speaker_2_memories": len(speaker_2_memories),
            "speaker_1_memory_time": speaker_1_memory_time,
            "speaker_2_memory_time": speaker_2_memory_time,
            "speaker_1_graph_memories": speaker_1_graph_memories,
            "speaker_2_graph_memories": speaker_2_graph_memories,
            "response_time": response_time,
        }

    def process_data_file(self, file_path):
        """Answer every QA item of every conversation in the JSON file at ``file_path``.

        Each item is expected to carry ``"qa"`` (a list of question dicts) and
        ``"conversation"`` with ``"speaker_a"`` / ``"speaker_b"`` names. Results
        are stored under ``self.results[conversation_index]`` and checkpointed
        to ``output_path`` after every question.
        """
        with open(file_path, "r", encoding="utf-8") as f:
            data = json.load(f)

        for idx, item in tqdm(enumerate(data), total=len(data), desc="Processing conversations"):
            qa = item["qa"]
            conversation = item["conversation"]
            speaker_a_user_id = f"{conversation['speaker_a']}_{idx}"
            speaker_b_user_id = f"{conversation['speaker_b']}_{idx}"

            for question_item in tqdm(
                qa, total=len(qa), desc=f"Processing questions for conversation {idx}", leave=False
            ):
                result = self.process_question(question_item, speaker_a_user_id, speaker_b_user_id)
                # Record and checkpoint after every question so partial runs survive interruption.
                self._record_result(idx, result)

        # Final save; also produces an output file when the data file has no conversations.
        self._save_results()

    def process_questions_parallel(
        self, qa_list, speaker_a_user_id, speaker_b_user_id, max_workers=1, *, conversation_idx=None
    ):
        """Answer ``qa_list`` on a thread pool and return the results in input order.

        Args:
            qa_list: QA item dicts, as accepted by ``process_question``.
            speaker_a_user_id: Mem0 user id of the first speaker.
            speaker_b_user_id: Mem0 user id of the second speaker.
            max_workers: Thread-pool size.
            conversation_idx: When given, each result is also appended to
                ``self.results[conversation_idx]`` and checkpointed as it
                completes (in completion order when ``max_workers > 1``). When
                ``None`` (the default, as before), results are only returned.
        """

        def process_single_question(val):
            result = self.process_question(val, speaker_a_user_id, speaker_b_user_id)
            if conversation_idx is not None:
                self._record_result(conversation_idx, result)
            else:
                self._save_results()
            return result

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            results = list(
                tqdm(executor.map(process_single_question, qa_list), total=len(qa_list), desc="Answering Questions")
            )

        # Final save at the end
        self._save_results()

        return results
| #216 |