repositories
loading repo index
repositories
loading repo index
repository
loading code, commits, and activity
public Clawd ADK gateway launch mirror
stars
latest
clone command
git clone gitlawb://did:key:z6Mkq5mY...iFZ5/my-project-publ...git clone gitlawb://did:key:z6Mkq5mY.../my-project-publ...2fa351d6docs: add automaton and perps launch sources16d ago| #1 | """ |
| #2 | Mnemosyne Streaming Memory + Delta Sync |
| #3 | ======================================== |
| #4 | |
| #5 | Real-time memory event streaming and incremental synchronization |
| #6 | between Mnemosyne instances. |
| #7 | |
| #8 | Event-driven architecture: |
| #9 | - Push: callbacks registered on the stream |
| #10 | - Pull: iterate over events as they occur |
| #11 | |
| #12 | Delta sync: |
| #13 | - Diff-based: only changed memories since last sync |
| #14 | - Incremental: track sync checkpoints per peer |
| #15 | """ |
| #16 | |
| #17 | import json |
| #18 | import hashlib |
| #19 | import threading |
| #20 | from datetime import datetime |
| #21 | from typing import List, Dict, Optional, Any, Callable, Iterator, Union |
| #22 | from dataclasses import dataclass, field, asdict |
| #23 | from enum import Enum, auto |
| #24 | from pathlib import Path |
| #25 | |
| #26 | |
class EventType(Enum):
    """Kinds of events carried on a memory stream.

    Member values are generated by ``auto()``, so only the member names are
    meaningful; ``MemoryEvent`` serialises an event type by its name.
    """

    # Declaration order fixes the auto() values (1..5); keep it stable.
    MEMORY_ADDED = auto()
    MEMORY_RECALLED = auto()
    MEMORY_INVALIDATED = auto()
    MEMORY_CONSOLIDATED = auto()
    MEMORY_UPDATED = auto()
| #33 | |
| #34 | |
@dataclass
class MemoryEvent:
    """A memory system event.

    Only ``event_type`` and ``memory_id`` are required. ``timestamp``
    defaults to the local (naive) wall-clock time at construction, formatted
    as an ISO-8601 string.
    """
    event_type: EventType
    memory_id: str
    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
    session_id: Optional[str] = None
    content: Optional[str] = None
    source: Optional[str] = None
    importance: Optional[float] = None
    metadata: Optional[Dict[str, Any]] = None
    delta: Optional[Dict[str, Any]] = None  # Only changed fields for updates

    def to_dict(self) -> Dict[str, Any]:
        """Return the event as a plain dict with ``event_type`` as its name."""
        d = asdict(self)
        d["event_type"] = self.event_type.name
        return d

    def to_json(self) -> str:
        """Serialise the event to JSON.

        ``default=str`` means values JSON cannot encode natively are written
        as their ``str()`` form rather than raising.
        """
        return json.dumps(self.to_dict(), default=str)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "MemoryEvent":
        """Build an event from a dict such as the one ``to_dict`` produces.

        Keys that are not dataclass fields are ignored. ``event_type`` may be
        a member name (``"MEMORY_ADDED"``) or an ``EventType`` instance. The
        input mapping is left unmodified.

        Raises:
            KeyError: if ``event_type`` is missing or is not a known name.
        """
        # Copy into a fresh dict so the caller's mapping is never mutated.
        known = cls.__dataclass_fields__
        kwargs = {k: v for k, v in data.items() if k in known}
        raw_type = kwargs["event_type"]
        if not isinstance(raw_type, EventType):
            kwargs["event_type"] = EventType[raw_type]
        return cls(**kwargs)
| #60 | |
| #61 | |
class MemoryStream:
    """
    Real-time event stream for memory operations.

    Supports both push (callbacks) and pull (iterator) patterns.
    Registration, emission bookkeeping and buffer access are serialised by an
    internal lock; callbacks and iterator delivery run outside it.

    The most recent ``max_buffer`` events are retained and can be read with
    ``get_buffer()``. Iterators returned by ``listen()`` only receive events
    emitted after they were created; the buffer is not replayed to them.
    """

    def __init__(self, max_buffer: int = 1000):
        """Create an empty stream retaining at most ``max_buffer`` events.

        A ``max_buffer`` of zero (or less) disables buffering entirely.
        """
        self._callbacks: Dict[EventType, List[Callable[[MemoryEvent], None]]] = {
            et: [] for et in EventType
        }
        self._any_callbacks: List[Callable[[MemoryEvent], None]] = []
        self._buffer: List[MemoryEvent] = []
        self._max_buffer = max(0, max_buffer)
        self._lock = threading.Lock()
        self._iterators: List["_StreamIterator"] = []

    def on(self, event_type: EventType, callback: Callable[[MemoryEvent], None]) -> None:
        """Register a callback for a specific event type."""
        with self._lock:
            self._callbacks[event_type].append(callback)

    def on_any(self, callback: Callable[[MemoryEvent], None]) -> None:
        """Register a callback for all event types."""
        with self._lock:
            self._any_callbacks.append(callback)

    def off(self, event_type: EventType, callback: Callable[[MemoryEvent], None]) -> None:
        """Remove a callback for a specific event type (no-op if absent)."""
        with self._lock:
            try:
                self._callbacks[event_type].remove(callback)
            except ValueError:
                pass  # Not registered: nothing to remove

    def off_any(self, callback: Callable[[MemoryEvent], None]) -> None:
        """Remove an any-event callback (no-op if absent)."""
        with self._lock:
            try:
                self._any_callbacks.remove(callback)
            except ValueError:
                pass  # Not registered: nothing to remove

    def emit(self, event: MemoryEvent) -> None:
        """Emit an event to all registered callbacks and iterators.

        Type-specific callbacks run first, then any-event callbacks, then the
        event is pushed to every listening iterator. Exceptions raised by
        callbacks are deliberately swallowed so one bad subscriber cannot
        break delivery to the others.
        """
        with self._lock:
            self._buffer.append(event)
            # Trim in place. Slicing with ``[-max:]`` is wrong for max == 0
            # (``[-0:]`` keeps everything), so drop the overflow explicitly.
            overflow = len(self._buffer) - self._max_buffer
            if overflow > 0:
                del self._buffer[:overflow]

            # Snapshot subscribers so they can (un)register during delivery.
            callbacks = self._callbacks[event.event_type] + self._any_callbacks
            iterators = list(self._iterators)

        # Call outside the lock so slow callbacks do not block other emitters.
        for cb in callbacks:
            try:
                cb(event)
            except Exception:
                pass  # Never let a callback break the stream
        for it in iterators:
            it._push(event)

    def listen(self, event_types: Optional[List[EventType]] = None) -> Iterator[MemoryEvent]:
        """Return an iterator that yields events as they occur.

        Args:
            event_types: If given, only events of these types are yielded;
                ``None`` means all types.
        """
        it = _StreamIterator(self, event_types)
        with self._lock:
            self._iterators.append(it)
        return iter(it)

    def _remove_iterator(self, it: "_StreamIterator") -> None:
        """Stop delivering events to ``it`` (no-op if it is not registered)."""
        with self._lock:
            try:
                self._iterators.remove(it)
            except ValueError:
                pass

    def get_buffer(self, event_types: Optional[List[EventType]] = None,
                   since: Optional[str] = None) -> List[MemoryEvent]:
        """Get buffered events, optionally filtered.

        Args:
            event_types: If non-empty, keep only events of these types.
            since: If given, keep only events whose ``timestamp`` string
                compares ``>=`` this value (plain string comparison).
        """
        with self._lock:
            events = list(self._buffer)
        if event_types:
            events = [e for e in events if e.event_type in event_types]
        if since:
            events = [e for e in events if e.timestamp >= since]
        return events

    def clear_buffer(self) -> None:
        """Clear the event buffer."""
        with self._lock:
            self._buffer.clear()
| #157 | |
| #158 | |
class _StreamIterator:
    """Internal blocking iterator fed by a ``MemoryStream``.

    Events pushed by the stream are queued and handed out one at a time by
    ``__next__``, which blocks until an event is available. Consumed events
    are dropped from the queue, so memory use tracks the backlog rather than
    the total number of events ever seen.

    Call ``close()`` (or use the iterator as a context manager) to deregister
    from the stream; iteration then ends once the queued events are drained.
    """

    def __init__(self, stream: "MemoryStream",
                 event_types: "Optional[List[EventType]]" = None):
        # Function-scope import: only this class needs a deque.
        from collections import deque

        self._stream = stream
        self._event_types = event_types
        self._queue = deque()
        self._lock = threading.Lock()
        # Condition on the same lock lets __next__ sleep until _push wakes it.
        self._ready = threading.Condition(self._lock)
        self._closed = False

    def _push(self, event: "MemoryEvent") -> None:
        """Queue ``event`` if it passes the type filter and we are still open."""
        if self._event_types is not None and event.event_type not in self._event_types:
            return
        with self._ready:
            if self._closed:
                return  # A late push from an in-flight emit; ignore it.
            self._queue.append(event)
            self._ready.notify()

    def __iter__(self):
        return self

    def __next__(self) -> "MemoryEvent":
        """Return the next queued event, blocking until one arrives.

        Raises:
            StopIteration: once the iterator is closed and the queue is empty.
        """
        with self._ready:
            while not self._queue:
                if self._closed:
                    raise StopIteration
                # Bounded wait keeps the thread responsive to interrupts on
                # platforms where an untimed lock wait cannot be interrupted.
                self._ready.wait(timeout=0.1)
            return self._queue.popleft()

    def close(self) -> None:
        """Deregister from the stream and end iteration. Safe to call twice."""
        with self._ready:
            if self._closed:
                return
            self._closed = True
            self._ready.notify_all()  # Wake any consumer blocked in __next__
        self._stream._remove_iterator(self)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()

    def __del__(self):
        # Best effort only: the instance may be partially constructed or the
        # interpreter may be shutting down, so never raise from here.
        try:
            self.close()
        except Exception:
            pass
| #190 | |
| #191 | |
@dataclass
class SyncCheckpoint:
    """Per-peer marker recording where the previous delta sync stopped."""
    peer_id: str
    last_sync_at: str
    last_memory_id: Optional[str] = None
    last_rowid: int = 0

    def to_dict(self) -> Dict[str, Any]:
        """Return the checkpoint's fields as a plain dictionary."""
        as_mapping = asdict(self)
        return as_mapping

    def to_json(self) -> str:
        """Return the checkpoint encoded as a JSON object string."""
        payload = self.to_dict()
        return json.dumps(payload)
| #205 | |
| #206 | |
class DeltaSync:
    """
    Incremental memory synchronization between two Mnemosyne instances.

    ``compute_delta`` selects the rows of a table that are new or newer than
    the stored checkpoint for a peer; ``apply_delta`` merges rows received
    from a peer and then records a fresh checkpoint. Checkpoints are kept in
    memory and persisted as one JSON file per peer in ``checkpoint_dir``.

    Table and column names are interpolated into SQL (they cannot be bound as
    parameters), so both are validated as plain identifiers first.
    """

    _CHECKPOINT_PREFIX = "checkpoint_"

    def __init__(self, mnemosyne_instance, checkpoint_dir: Optional[Path] = None):
        """Bind to a Mnemosyne instance and load any saved checkpoints.

        Args:
            mnemosyne_instance: The ``Mnemosyne`` whose ``conn`` is synced.
            checkpoint_dir: Directory for checkpoint files (a ``Path`` or a
                path string); defaults to ``~/.hermes/mnemosyne/sync``.
                Created if missing.

        Raises:
            TypeError: if ``mnemosyne_instance`` is not a ``Mnemosyne``.
        """
        from mnemosyne.core.memory import Mnemosyne
        if not isinstance(mnemosyne_instance, Mnemosyne):
            raise TypeError("DeltaSync requires a Mnemosyne instance")
        self.mnemosyne = mnemosyne_instance
        self.checkpoint_dir = (
            Path(checkpoint_dir) if checkpoint_dir
            else Path.home() / ".hermes" / "mnemosyne" / "sync"
        )
        self.checkpoint_dir.mkdir(parents=True, exist_ok=True)
        self._checkpoints: Dict[str, "SyncCheckpoint"] = {}
        self._lock = threading.Lock()
        self._load_checkpoints()

    @staticmethod
    def _is_identifier(name: Any) -> bool:
        """Return True if ``name`` is an ASCII identifier safe to embed in SQL."""
        return isinstance(name, str) and name.isascii() and name.isidentifier()

    @classmethod
    def _require_table(cls, table: str) -> str:
        """Validate a table name (optionally ``schema.table``) or raise ValueError."""
        parts = table.split(".") if isinstance(table, str) else []
        if not 1 <= len(parts) <= 2 or not all(cls._is_identifier(p) for p in parts):
            raise ValueError(f"Unsafe SQL table name: {table!r}")
        return table

    def _checkpoint_path(self, peer_id: str) -> Path:
        """Return the checkpoint file path for ``peer_id``.

        Raises:
            ValueError: if ``peer_id`` contains a path separator or NUL, which
                would let the file escape ``checkpoint_dir``.
        """
        if any(ch in peer_id for ch in ("/", "\\", "\x00")):
            raise ValueError(f"Invalid peer id for checkpoint file: {peer_id!r}")
        return self.checkpoint_dir / f"{self._CHECKPOINT_PREFIX}{peer_id}.json"

    def _load_checkpoints(self) -> None:
        """Load all saved checkpoints, skipping unreadable or malformed files."""
        if not self.checkpoint_dir.exists():
            return
        for f in self.checkpoint_dir.glob(f"{self._CHECKPOINT_PREFIX}*.json"):
            # Strip only the leading prefix; str.replace would also mangle a
            # peer id that happens to contain "checkpoint_".
            peer_id = f.stem[len(self._CHECKPOINT_PREFIX):]
            try:
                with open(f, "r") as fh:
                    data = json.load(fh)
                self._checkpoints[peer_id] = SyncCheckpoint(**data)
            except (OSError, ValueError, TypeError):
                # Best effort: a corrupt checkpoint just means that peer
                # gets a full sync next time.
                continue

    def _save_checkpoint(self, peer_id: str) -> None:
        """Write the in-memory checkpoint for ``peer_id`` to disk, if any."""
        cp = self._checkpoints.get(peer_id)
        if cp is None:
            return
        self._checkpoint_path(peer_id).write_text(cp.to_json())

    def get_checkpoint(self, peer_id: str) -> "Optional[SyncCheckpoint]":
        """Get the current checkpoint for a peer, or ``None`` if never synced."""
        with self._lock:
            return self._checkpoints.get(peer_id)

    def set_checkpoint(self, peer_id: str, checkpoint: "SyncCheckpoint") -> None:
        """Set and save a checkpoint.

        Raises:
            ValueError: if ``peer_id`` is not usable as a checkpoint file name.
        """
        # Validate before mutating so a bad peer id leaves state untouched.
        self._checkpoint_path(peer_id)
        with self._lock:
            self._checkpoints[peer_id] = checkpoint
            self._save_checkpoint(peer_id)

    def compute_delta(self, peer_id: str, table: str = "working_memory") -> List[Dict[str, Any]]:
        """
        Compute the delta of changed memories since last sync with peer.

        With a checkpoint, rows whose ``rowid`` or ``timestamp`` is greater
        than the checkpoint's are returned; without one, every row is. Each
        row is returned as a full dict with the ``embedding`` column removed.

        Raises:
            ValueError: if ``table`` is not a plain identifier.
        """
        self._require_table(table)
        checkpoint = self.get_checkpoint(peer_id)
        cursor = self.mnemosyne.conn.cursor()

        if checkpoint:
            # Get memories modified since last sync
            cursor.execute(f"""
                SELECT * FROM {table}
                WHERE rowid > ? OR timestamp > ?
                ORDER BY rowid ASC
            """, (checkpoint.last_rowid, checkpoint.last_sync_at))
        else:
            # First sync: send everything
            cursor.execute(f"""
                SELECT * FROM {table}
                ORDER BY rowid ASC
            """)

        # Build dicts from cursor.description so this works whether or not
        # the connection uses a mapping-style row factory.
        columns = [col[0] for col in cursor.description]
        delta = []
        for row in cursor.fetchall():
            mem = dict(zip(columns, row))
            # Strip internal fields
            mem.pop("embedding", None)
            delta.append(mem)
        return delta

    def apply_delta(self, peer_id: str, delta: List[Dict[str, Any]],
                    table: str = "working_memory") -> Dict[str, int]:
        """
        Apply an incoming delta from a peer.

        Existing rows (matched on ``id``) are updated with the non-``None``
        fields of the incoming record, except ``id``, ``rowid``,
        ``timestamp`` and ``created_at``; unknown ids are inserted. Records
        with no ``id``, with nothing to update, or with a key that is not a
        plain identifier are skipped. On success the transaction is committed
        and the peer's checkpoint is advanced; on error it is rolled back.

        Returns stats: {inserted: N, updated: N, skipped: N}

        Raises:
            ValueError: if ``table`` or ``peer_id`` is invalid.
        """
        self._require_table(table)
        self._checkpoint_path(peer_id)  # Fail fast, before touching the DB
        conn = self.mnemosyne.conn
        cursor = conn.cursor()
        stats = {"inserted": 0, "updated": 0, "skipped": 0}
        protected = ("id", "rowid", "timestamp", "created_at")

        try:
            for mem in delta:
                mid = mem.get("id")
                # Keys come from the peer and are spliced into SQL as column
                # names, so reject any record carrying a non-identifier key.
                if not mid or not all(self._is_identifier(col) for col in mem):
                    stats["skipped"] += 1
                    continue

                cursor.execute(f"SELECT 1 FROM {table} WHERE id = ?", (mid,))
                if cursor.fetchone() is not None:
                    updatable = {k: v for k, v in mem.items()
                                 if k not in protected and v is not None}
                    if not updatable:
                        stats["skipped"] += 1
                        continue
                    assignments = ", ".join(f"{col} = ?" for col in updatable)
                    cursor.execute(
                        f"UPDATE {table} SET {assignments} WHERE id = ?",
                        [*updatable.values(), mid],
                    )
                    stats["updated"] += 1
                else:
                    cols = [col for col in mem if col != "rowid"]
                    placeholders = ", ".join("?" * len(cols))
                    cursor.execute(
                        f"INSERT INTO {table} ({', '.join(cols)}) VALUES ({placeholders})",
                        [mem[col] for col in cols],
                    )
                    stats["inserted"] += 1
            conn.commit()
        except Exception:
            # Do not leave a half-applied delta pending on the connection.
            conn.rollback()
            raise

        # Update checkpoint
        cursor.execute(f"SELECT MAX(rowid) FROM {table}")
        max_rowid = cursor.fetchone()[0] or 0
        self.set_checkpoint(peer_id, SyncCheckpoint(
            peer_id=peer_id,
            last_sync_at=datetime.now().isoformat(),
            last_rowid=max_rowid
        ))

        return stats

    def sync_to(self, peer_id: str, table: str = "working_memory") -> Dict[str, Any]:
        """
        Full sync cycle: compute delta for peer, return it.
        The caller is responsible for sending the delta to the peer.
        """
        delta = self.compute_delta(peer_id, table)
        checkpoint = self.get_checkpoint(peer_id)
        return {
            "peer_id": peer_id,
            "table": table,
            "delta": delta,
            "count": len(delta),
            "checkpoint": checkpoint.to_dict() if checkpoint else None
        }

    def sync_from(self, peer_id: str, delta: List[Dict[str, Any]],
                  table: str = "working_memory") -> Dict[str, Any]:
        """
        Full sync cycle: apply delta from peer.
        """
        stats = self.apply_delta(peer_id, delta, table)
        checkpoint = self.get_checkpoint(peer_id)
        return {
            "peer_id": peer_id,
            "table": table,
            "stats": stats,
            "checkpoint": checkpoint.to_dict() if checkpoint else None
        }
| #380 |