repositories
loading repo index
repositories
loading repo index
repository
loading code, commits, and activity
public Clawd ADK gateway launch mirror
stars
latest
clone command
git clone gitlawb://did:key:z6Mkq5mY...iFZ5/my-project-publ...git clone gitlawb://did:key:z6Mkq5mY.../my-project-publ...2fa351d6docs: add automaton and perps launch sources16d ago| #1 | """ |
| #2 | Letta (formerly MemGPT) memory provider importer. |
| #3 | |
| #4 | Extracts memories from Letta's AgentFile (.af) format and imports |
| #5 | them into Mnemosyne. Letta uses a hierarchical memory model: |
| #6 | - Memory Blocks (core/working memory) → working_memory |
| #7 | - Archival Memory (long-term vector storage) → episodic_memory |
| #8 | - Message History → episodic with source="letta_message" |
| #9 | - Tools & Agent Config → metadata |
| #10 | |
| #11 | Extraction methods: |
| #12 | 1. AgentFile (.af) export via Letta SDK: client.agents.export_file(agent_id) |
| #13 | 2. Archival memory via SDK search |
| #14 | 3. Direct .af file parsing (offline) |
| #15 | """ |
| #16 | |
| #17 | import json |
| #18 | from datetime import datetime |
| #19 | from typing import List, Dict, Optional, Any |
| #20 | from pathlib import Path |
| #21 | |
| #22 | from mnemosyne.core.importers.base import BaseImporter, ImporterResult |
| #23 | |
| #24 | |
| #25 | class LettaImporter(BaseImporter): |
| #26 | """Import memories from Letta into Mnemosyne. |
| #27 | |
| #28 | Usage: |
| #29 | importer = LettaImporter( |
| #30 | api_key="sk-xxx", # Letta API key |
| #31 | agent_id="agent-uuid", # optional: specific agent |
| #32 | base_url=None, # for self-hosted Letta |
| #33 | agent_file_path=None, # path to a .af file (offline import) |
| #34 | ) |
| #35 | result = importer.run(mnemosyne_instance) |
| #36 | """ |
| #37 | |
| #38 | provider_name = "letta" |
| #39 | |
| #40 | def __init__(self, api_key: str = None, agent_id: str = None, |
| #41 | base_url: str = None, agent_file_path: str = None, |
| #42 | **kwargs): |
| #43 | super().__init__(**kwargs) |
| #44 | self.api_key = api_key |
| #45 | self.agent_id = agent_id |
| #46 | self.base_url = base_url |
| #47 | self.agent_file_path = agent_file_path |
| #48 | |
| #49 | def extract(self) -> List[Dict]: |
| #50 | """Extract memories from Letta. |
| #51 | |
| #52 | Priority: 1) offline .af file, 2) SDK export, 3) REST API. |
| #53 | """ |
| #54 | # Offline .af file |
| #55 | if self.agent_file_path: |
| #56 | return self._extract_from_file(self.agent_file_path) |
| #57 | |
| #58 | # Try SDK |
| #59 | try: |
| #60 | return self._extract_via_sdk() |
| #61 | except (ImportError, Exception): |
| #62 | pass |
| #63 | |
| #64 | # Try REST API |
| #65 | try: |
| #66 | return self._extract_via_rest() |
| #67 | except Exception: |
| #68 | pass |
| #69 | |
| #70 | raise RuntimeError( |
| #71 | "Could not extract memories from Letta. " |
| #72 | "Provide --agent-file-path for offline .af import, " |
| #73 | "or install the SDK: pip install letta-client" |
| #74 | ) |
| #75 | |
| #76 | def _extract_from_file(self, filepath: str) -> List[Dict]: |
| #77 | """Parse a Letta AgentFile (.af) directly.""" |
| #78 | path = Path(filepath) |
| #79 | if not path.exists(): |
| #80 | raise FileNotFoundError(f"AgentFile not found: {filepath}") |
| #81 | |
| #82 | # .af files can be JSON, YAML, or TOML |
| #83 | content = path.read_text() |
| #84 | |
| #85 | # Try JSON first |
| #86 | try: |
| #87 | data = json.loads(content) |
| #88 | except json.JSONDecodeError: |
| #89 | # Try YAML |
| #90 | try: |
| #91 | import yaml |
| #92 | data = yaml.safe_load(content) |
| #93 | except (ImportError, Exception): |
| #94 | # Try TOML |
| #95 | try: |
| #96 | import tomllib |
| #97 | data = tomllib.loads(content) |
| #98 | except (ImportError, Exception): |
| #99 | raise RuntimeError( |
| #100 | f"Could not parse AgentFile: {filepath}. " |
| #101 | "Supported formats: JSON, YAML, TOML" |
| #102 | ) |
| #103 | |
| #104 | return self._parse_agent_data(data) |
| #105 | |
| #106 | def _extract_via_sdk(self) -> List[Dict]: |
| #107 | """Extract using Letta Python SDK.""" |
| #108 | from letta_client import Letta |
| #109 | |
| #110 | client = Letta( |
| #111 | api_key=self.api_key, |
| #112 | base_url=self.base_url, |
| #113 | ) |
| #114 | |
| #115 | if self.agent_id: |
| #116 | # Export specific agent |
| #117 | agent_file = client.agents.export_file(self.agent_id) |
| #118 | return self._parse_agent_data(agent_file) |
| #119 | else: |
| #120 | # List all agents and export each |
| #121 | agents = client.agents.list() |
| #122 | all_data = [] |
| #123 | for agent in agents: |
| #124 | try: |
| #125 | agent_file = client.agents.export_file(agent.id) |
| #126 | all_data.extend(self._parse_agent_data(agent_file)) |
| #127 | except Exception: |
| #128 | continue |
| #129 | return all_data |
| #130 | |
| #131 | def _extract_via_rest(self) -> List[Dict]: |
| #132 | """Extract using Letta REST API directly.""" |
| #133 | import urllib.request |
| #134 | |
| #135 | base = self.base_url or "https://api.letta.com" |
| #136 | headers = {} |
| #137 | if self.api_key: |
| #138 | headers["Authorization"] = f"Bearer {self.api_key}" |
| #139 | headers["Content-Type"] = "application/json" |
| #140 | |
| #141 | if self.agent_id: |
| #142 | url = f"{base}/v1/agents/{self.agent_id}/export" |
| #143 | else: |
| #144 | url = f"{base}/v1/agents/export" |
| #145 | |
| #146 | req = urllib.request.Request(url, headers=headers, method="POST") |
| #147 | with urllib.request.urlopen(req, timeout=30) as resp: |
| #148 | data = json.loads(resp.read().decode()) |
| #149 | |
| #150 | return self._parse_agent_data(data) |
| #151 | |
| #152 | def _parse_agent_data(self, data: dict) -> List[Dict]: |
| #153 | """Parse AgentFile data into memory dicts.""" |
| #154 | memories = [] |
| #155 | agent_id = data.get("agent_id", data.get("id", "letta_agent")) |
| #156 | agent_name = data.get("agent_name", data.get("name", "unknown")) |
| #157 | |
| #158 | # 1. Memory Blocks (core working memory) |
| #159 | blocks = data.get("memory_blocks", data.get("blocks", {})) |
| #160 | if isinstance(blocks, list): |
| #161 | for block in blocks: |
| #162 | label = block.get("label", block.get("name", "memory")) |
| #163 | value = block.get("value", block.get("content", "")) |
| #164 | if not value: |
| #165 | continue |
| #166 | content = f"[{label}] {value}" |
| #167 | memories.append({ |
| #168 | "content": content, |
| #169 | "source": "letta_block", |
| #170 | "importance": 0.8 if block.get("read_only") else 0.6, |
| #171 | "metadata": { |
| #172 | "letta_label": label, |
| #173 | "letta_block_id": block.get("id", block.get("uuid", "")), |
| #174 | "letta_agent_id": agent_id, |
| #175 | "letta_agent_name": agent_name, |
| #176 | }, |
| #177 | }) |
| #178 | |
| #179 | # If blocks is a dict of label→value |
| #180 | elif isinstance(blocks, dict): |
| #181 | for label, value in blocks.items(): |
| #182 | if not value: |
| #183 | continue |
| #184 | memories.append({ |
| #185 | "content": f"[{label}] {value}", |
| #186 | "source": "letta_block", |
| #187 | "importance": 0.7, |
| #188 | "metadata": { |
| #189 | "letta_label": label, |
| #190 | "letta_agent_id": agent_id, |
| #191 | "letta_agent_name": agent_name, |
| #192 | }, |
| #193 | }) |
| #194 | |
| #195 | # 2. Message History |
| #196 | messages = data.get("messages", data.get("message_history", [])) |
| #197 | for msg in messages: |
| #198 | role = msg.get("role", msg.get("role_type", "user")) |
| #199 | content = msg.get("content", msg.get("text", "")) |
| #200 | if not content: |
| #201 | continue |
| #202 | ts = msg.get("created_at", msg.get("timestamp")) |
| #203 | memories.append({ |
| #204 | "content": content, |
| #205 | "source": "letta_message", |
| #206 | "importance": 0.3, |
| #207 | "metadata": { |
| #208 | "letta_role": role, |
| #209 | "letta_agent_id": agent_id, |
| #210 | "letta_agent_name": agent_name, |
| #211 | "_timestamp": ts or datetime.now().isoformat(), |
| #212 | }, |
| #213 | }) |
| #214 | |
| #215 | # 3. System prompt / persona |
| #216 | sys_prompt = data.get("system_prompt", data.get("persona", "")) |
| #217 | if sys_prompt: |
| #218 | memories.append({ |
| #219 | "content": f"[system_prompt] {sys_prompt[:2000]}", |
| #220 | "source": "letta_system", |
| #221 | "importance": 0.9, |
| #222 | "metadata": { |
| #223 | "letta_agent_id": agent_id, |
| #224 | "letta_agent_name": agent_name, |
| #225 | }, |
| #226 | }) |
| #227 | |
| #228 | return memories |
| #229 | |
| #230 | def transform(self, raw_data: List[Dict]) -> List[Dict]: |
| #231 | """Transform Letta data to Mnemosyne format.""" |
| #232 | memories = [] |
| #233 | for item in raw_data: |
| #234 | content = item.get("content", "") |
| #235 | if not content: |
| #236 | continue |
| #237 | |
| #238 | memories.append({ |
| #239 | "content": content, |
| #240 | "source": item.get("source", "letta_import"), |
| #241 | "importance": float(item.get("importance", 0.5)), |
| #242 | "metadata": item.get("metadata", {}), |
| #243 | "valid_until": None, |
| #244 | "scope": "session", |
| #245 | "_author_id": f"letta_agent:{item.get('metadata', {}).get('letta_agent_id', 'unknown')}", |
| #246 | "_author_type": "agent", |
| #247 | "_channel_id": None, |
| #248 | "_timestamp": item.get("metadata", {}).get("_timestamp"), |
| #249 | }) |
| #250 | |
| #251 | return memories |
| #252 | |
| #253 | def run(self, mnemosyne, dry_run=False, session_id=None, channel_id=None): |
| #254 | """Override run to handle identity-aware import.""" |
| #255 | result = ImporterResult(provider=self.provider_name, |
| #256 | started_at=datetime.now().isoformat()) |
| #257 | |
| #258 | try: |
| #259 | raw_data = self.extract() |
| #260 | result.total = len(raw_data) |
| #261 | if result.total == 0: |
| #262 | result.errors.append("No memories found in Letta export") |
| #263 | return result |
| #264 | if not self.validate(raw_data): |
| #265 | result.errors.append("Validation failed") |
| #266 | return result |
| #267 | |
| #268 | memories = self.transform(raw_data) |
| #269 | if dry_run: |
| #270 | result.imported = len(memories) |
| #271 | return result |
| #272 | |
| #273 | for mem_dict in memories: |
| #274 | try: |
| #275 | author_id = mem_dict.pop("_author_id", None) |
| #276 | author_type = mem_dict.pop("_author_type", None) |
| #277 | chan = mem_dict.pop("_channel_id", None) or channel_id |
| #278 | ts = mem_dict.pop("_timestamp", None) |
| #279 | meta = mem_dict.get("metadata", {}) |
| #280 | if ts: |
| #281 | meta["imported_at_original"] = ts |
| #282 | |
| #283 | mid = mnemosyne.remember( |
| #284 | content=mem_dict["content"], |
| #285 | source=mem_dict.get("source", self.provider_name), |
| #286 | importance=mem_dict.get("importance", 0.5), |
| #287 | metadata=meta, |
| #288 | valid_until=mem_dict.get("valid_until"), |
| #289 | scope=mem_dict.get("scope", "session"), |
| #290 | ) |
| #291 | if author_id or author_type or chan: |
| #292 | try: |
| #293 | mnemosyne.beam.conn.execute(""" |
| #294 | UPDATE working_memory |
| #295 | SET author_id = COALESCE(author_id, ?), |
| #296 | author_type = COALESCE(author_type, ?), |
| #297 | channel_id = COALESCE(channel_id, ?) |
| #298 | WHERE id = ? |
| #299 | """, (author_id, author_type, chan, mid)) |
| #300 | mnemosyne.beam.conn.commit() |
| #301 | except Exception: |
| #302 | pass |
| #303 | result.memory_ids.append(mid) |
| #304 | result.imported += 1 |
| #305 | except Exception as e: |
| #306 | result.failed += 1 |
| #307 | result.errors.append(f"Failed: {str(e)[:100]}") |
| #308 | |
| #309 | except Exception as e: |
| #310 | result.errors.append(f"Letta import failed: {e}") |
| #311 | |
| #312 | result.finished_at = datetime.now().isoformat() |
| #313 | return result |
| #314 |