repositories
loading repo index
repositories
loading repo index
repository
loading code, commits, and activity
public Clawd ADK gateway launch mirror
stars
latest
clone command
git clone gitlawb://did:key:z6Mkq5mY...iFZ5/my-project-publ...
git clone gitlawb://did:key:z6Mkq5mY.../my-project-publ...
2fa351d6 docs: add automaton and perps launch sources 16d ago
| #1 | """ |
| #2 | Cognee memory provider importer. |
| #3 | |
| #4 | Cognee uses a triple-store architecture: Kùzu graph + LanceDB vectors + SQLite. |
| #5 | Extraction via get_graph_data() returns (nodes, edges) tuples. |
| #6 | Nodes → episodic memories, edges → triples. |
| #7 | |
| #8 | Extraction methods: |
| #9 | 1. Python SDK: cognee.graph_db.get_graph_data() |
| #10 | 2. REST API: GET /datasets/{id}/data |
| #11 | 3. Direct addition via add_data_points() |
| #12 | """ |
| #13 | |
| #14 | import json |
| #15 | from datetime import datetime |
| #16 | from typing import List, Dict, Optional, Any |
| #17 | |
| #18 | from mnemosyne.core.importers.base import BaseImporter, ImporterResult |
| #19 | |
| #20 | |
class CogneeImporter(BaseImporter):
    """Import memories from Cognee into Mnemosyne.

    ``extract`` picks one of three strategies:

    1. Direct read of the SQLite store under ``data_dir`` (only when
       ``direct_db`` is true *and* ``data_dir`` is set).
    2. The Cognee Python SDK.
    3. The Cognee REST API at ``rest_base_url``.

    Usage:
        importer = CogneeImporter(
            dataset_id="my-dataset",     # optional: filter by dataset
            data_dir="./.cognee-data",   # path to Cognee data directory
            direct_db=True,              # if True, read the SQLite store directly
        )
        result = importer.run(mnemosyne_instance)
    """

    provider_name = "cognee"

    # Base URL of the Cognee REST API used by ``_extract_via_rest``.
    # Override on a subclass or instance to target a non-local server.
    rest_base_url = "http://localhost:8000/api/v1"

    # Seconds to wait for the REST API before giving up.
    rest_timeout = 30

    def __init__(self, dataset_id: Optional[str] = None,
                 data_dir: Optional[str] = None,
                 direct_db: bool = False, **kwargs):
        """Store the import options; extra keyword args go to the base class.

        Args:
            dataset_id: Optional dataset to restrict the REST extraction to.
                It is also used as the channel id of every imported memory.
            data_dir: Path to the Cognee data directory (direct mode only).
            direct_db: When true (and ``data_dir`` is set) read the SQLite
                store directly instead of going through the SDK / REST API.
        """
        super().__init__(**kwargs)
        self.dataset_id = dataset_id
        self.data_dir = data_dir
        self.direct_db = direct_db

    def extract(self) -> List[Dict]:
        """Extract raw memory dicts from Cognee.

        Returns:
            A list of dicts with at least a ``content`` key.

        Raises:
            RuntimeError: if both the SDK and the REST strategies fail. The
                message lists why each one failed.
        """
        if self.direct_db and self.data_dir:
            return self._extract_direct()

        # Try the SDK first, then the REST API. Remember why each attempt
        # failed so the final error is actionable instead of a bare hint.
        failures = []
        for label, strategy in (("sdk", self._extract_via_sdk),
                                ("rest", self._extract_via_rest)):
            try:
                return strategy()
            except Exception as exc:
                failures.append(f"{label}: {type(exc).__name__}: {exc}")

        details = "; ".join(failures)
        raise RuntimeError(
            "Could not extract from Cognee. Install: pip install cognee"
            f" ({details})"
        )

    def _extract_via_sdk(self) -> List[Dict]:
        """Extract using the Cognee Python SDK.

        Raises:
            ImportError: if the ``cognee`` package is not installed.
        """
        import asyncio

        import cognee

        async def _fetch():
            # NOTE(review): relies on ``cognee.graph_db.get_graph_data``
            # being available in the installed SDK version -- confirm.
            graph_data = await cognee.graph_db.get_graph_data()
            return self._parse_graph_data(graph_data)

        # asyncio.run() raises RuntimeError when an event loop is already
        # running in this thread; extract() then falls back to REST.
        return asyncio.run(_fetch())

    def _extract_via_rest(self) -> List[Dict]:
        """Extract using the Cognee REST API at ``rest_base_url``."""
        import urllib.parse
        import urllib.request

        base = self.rest_base_url.rstrip("/")
        if self.dataset_id:
            # Percent-encode the id so characters such as '/', '?' or spaces
            # cannot change the request path.
            dataset = urllib.parse.quote(str(self.dataset_id), safe="")
            url = f"{base}/datasets/{dataset}/data"
        else:
            url = f"{base}/datasets/data"

        req = urllib.request.Request(url)
        with urllib.request.urlopen(req, timeout=self.rest_timeout) as resp:
            data = json.loads(resp.read().decode())
        return self._parse_api_data(data)

    def _extract_direct(self) -> List[Dict]:
        """Read chunks straight from Cognee's SQLite file ``cognee_db``.

        Returns:
            One dict per row of the ``data_chunks`` table, ordered by
            ``created_at``; an empty list when the file does not exist or
            the table cannot be queried.
        """
        import sqlite3
        from pathlib import Path

        sqlite_path = Path(self.data_dir or "./.cognee-data") / "cognee_db"
        if not sqlite_path.exists():
            return []

        items = []
        conn = sqlite3.connect(str(sqlite_path))
        conn.row_factory = sqlite3.Row
        try:
            rows = conn.execute(
                "SELECT * FROM data_chunks ORDER BY created_at"
            ).fetchall()
            for raw_row in rows:
                # sqlite3.Row supports bracket access but not .get();
                # convert to a dict so the reads-with-default below work.
                row = dict(raw_row)
                items.append({
                    "content": row.get("text") or row.get("content") or "",
                    "source": "cognee_direct",
                    "metadata": {
                        "chunk_id": row.get("id") or "",
                        "document_id": row.get("document_id") or "",
                    },
                    "timestamp": row.get("created_at"),
                })
        except sqlite3.Error:
            # Best effort: a missing table/column or an unreadable file
            # yields whatever was collected so far. Only database errors are
            # tolerated, so programming errors are no longer hidden.
            pass
        finally:
            conn.close()

        return items

    def _parse_graph_data(self, graph_data) -> List[Dict]:
        """Parse a Cognee graph into memory dicts.

        Args:
            graph_data: Either a ``(nodes, edges)`` pair or a mapping with
                ``"nodes"`` / ``"edges"`` keys. Nodes are ``(id, props)``
                sequences or dicts; edges are
                ``(source, target, relationship[, props])`` sequences or
                dicts.

        Returns:
            One ``type == "node"`` dict per node that has printable
            properties, followed by one ``type == "edge"`` dict per edge.
        """
        if isinstance(graph_data, (tuple, list)) and len(graph_data) == 2:
            nodes, edges = graph_data
        else:
            nodes = graph_data.get("nodes", [])
            edges = graph_data.get("edges", [])

        items = []
        hidden_keys = ("id", "node_id", "embedding")

        # Nodes -> episodic-like memories
        for node in nodes or []:
            if isinstance(node, (tuple, list)) and len(node) >= 2:
                node_id, props = node[0], node[1]
            else:
                node_id = node.get("id", node.get("node_id", ""))
                props = node.get("properties", node)

            if not isinstance(props, dict):
                continue
            content = "; ".join(
                f"{key}: {value}"
                for key, value in props.items()
                if key not in hidden_keys
            )
            if content:
                items.append({
                    "content": content,
                    "source": "cognee_node",
                    "node_id": str(node_id),
                    "type": "node",
                    # Copy so later metadata edits never touch the source
                    # graph object.
                    "metadata": dict(props),
                })

        # Edges -> triples (subject -> predicate -> object)
        for edge in edges or []:
            if isinstance(edge, (tuple, list)) and len(edge) >= 3:
                src, tgt, rel = edge[0], edge[1], edge[2]
                # The properties element is optional.
                props = edge[3] if len(edge) > 3 else {}
            else:
                src = edge.get("source", edge.get("source_node_id", ""))
                tgt = edge.get("target", edge.get("target_node_id", ""))
                rel = edge.get("relationship", edge.get("label", ""))
                props = edge.get("properties", {})

            items.append({
                "content": f"{src} {rel} {tgt}",
                "source": "cognee_edge",
                "type": "edge",
                "metadata": {
                    "source_node": str(src),
                    "target_node": str(tgt),
                    "relationship": str(rel),
                    **(props if isinstance(props, dict) else {}),
                },
            })

        return items

    def _parse_api_data(self, data) -> List[Dict]:
        """Parse a Cognee REST API response into memory dicts.

        Args:
            data: Either a list of items or a mapping holding the list under
                ``"data"``, ``"items"`` or ``"results"``.

        Returns:
            One dict per item that has non-empty ``content``, ``text`` or
            ``name``; items that are not mappings are skipped.
        """
        if isinstance(data, list):
            items = data
        else:
            # ``or`` chain: a key that is present but null/empty falls
            # through to the next candidate instead of ending the lookup.
            items = (data.get("data") or data.get("items")
                     or data.get("results") or [])

        result = []
        for item in items:
            if not isinstance(item, dict):
                continue
            content = (item.get("content") or item.get("text")
                       or item.get("name") or "")
            if content:
                result.append({
                    "content": content,
                    "source": "cognee_api",
                    "metadata": item.get("metadata") or {},
                    "timestamp": item.get("created_at"),
                })
        return result

    def transform(self, raw_data: List[Dict]) -> List[Dict]:
        """Transform extracted Cognee items to Mnemosyne memory dicts.

        Items without content are dropped. Keys starting with ``_`` carry
        identity/timestamp data that ``run`` pops before storing.

        Args:
            raw_data: Items as produced by ``extract``.

        Returns:
            A new list of memory dicts; ``raw_data`` is left unmodified.
        """
        memories = []
        for item in raw_data:
            content = item.get("content", "")
            if not content:
                continue

            item_type = item.get("type", "")
            if item_type == "edge":
                source, importance = "cognee_triple", 0.6
            elif item_type == "node":
                source, importance = "cognee_node", 0.5
            else:
                source, importance = "cognee_import", 0.5

            # Work on a copy: the metadata may be shared with the raw item.
            raw_meta = item.get("metadata")
            meta = dict(raw_meta) if isinstance(raw_meta, dict) else {}
            if item.get("node_id"):
                meta["_cognee_node_id"] = item["node_id"]

            memories.append({
                "content": content,
                "source": source,
                "importance": importance,
                "metadata": meta,
                "valid_until": None,
                "scope": "session",
                "_author_id": "cognee_system",
                "_author_type": "system",
                "_channel_id": self.dataset_id,
                "_timestamp": item.get("timestamp"),
            })

        return memories

    def _stamp_identity(self, mnemosyne, mid, author_id, author_type, chan):
        """Best-effort: fill unset identity columns of a stored memory."""
        try:
            # COALESCE keeps any value that is already set on the row.
            mnemosyne.beam.conn.execute("""
                UPDATE working_memory
                SET author_id = COALESCE(author_id, ?),
                    author_type = COALESCE(author_type, ?),
                    channel_id = COALESCE(channel_id, ?)
                WHERE id = ?
            """, (author_id, author_type, chan, mid))
            mnemosyne.beam.conn.commit()
        except Exception:
            # The memory itself is already stored; failing to stamp the
            # identity columns must not fail the import of that memory.
            pass

    def run(self, mnemosyne, dry_run=False, session_id=None, channel_id=None):
        """Run the import: extract, validate, transform and store.

        Args:
            mnemosyne: Target instance; must provide ``remember(...)`` and
                ``beam.conn`` (a DB connection with ``execute``/``commit``).
            dry_run: When true, count what would be imported and store
                nothing.
            session_id: Accepted for interface compatibility; not used here.
            channel_id: Channel used when the importer has no ``dataset_id``.

        Returns:
            The ``ImporterResult`` with counters, errors and memory ids.
            ``finished_at`` is set on every path, including early returns.
        """
        result = ImporterResult(provider=self.provider_name,
                                started_at=datetime.now().isoformat())
        try:
            raw_data = self.extract()
            result.total = len(raw_data)
            if result.total == 0:
                result.errors.append("No memories found in Cognee")
                return result
            if not self.validate(raw_data):
                result.errors.append("Validation failed")
                return result

            memories = self.transform(raw_data)
            if dry_run:
                result.imported = len(memories)
                return result

            for mem_dict in memories:
                try:
                    author_id = mem_dict.pop("_author_id", None)
                    author_type = mem_dict.pop("_author_type", None)
                    # The importer's dataset id wins over the argument.
                    chan = mem_dict.pop("_channel_id", None) or channel_id
                    ts = mem_dict.pop("_timestamp", None)
                    meta = mem_dict.get("metadata") or {}
                    if ts:
                        meta["imported_at_original"] = ts

                    mid = mnemosyne.remember(
                        content=mem_dict["content"],
                        source=mem_dict.get("source", self.provider_name),
                        importance=mem_dict.get("importance", 0.5),
                        metadata=meta,
                        valid_until=mem_dict.get("valid_until"),
                        scope=mem_dict.get("scope", "session"),
                    )
                    if author_id or author_type or chan:
                        self._stamp_identity(
                            mnemosyne, mid, author_id, author_type, chan
                        )
                    result.memory_ids.append(mid)
                    result.imported += 1
                except Exception as e:
                    # One bad memory must not abort the whole import.
                    result.failed += 1
                    result.errors.append(f"Failed: {str(e)[:100]}")
        except Exception as e:
            result.errors.append(f"Cognee import failed: {e}")
        finally:
            # Runs for the early ``return result`` paths above as well.
            result.finished_at = datetime.now().isoformat()
        return result
| #299 |