repositories
loading repo index
repositories
loading repo index
repository
loading code, commits, and activity
public Clawd ADK gateway launch mirror
stars
latest
clone command
git clone gitlawb://did:key:z6Mkq5mY...iFZ5/my-project-publ...git clone gitlawb://did:key:z6Mkq5mY.../my-project-publ...2fa351d6docs: add automaton and perps launch sources16d ago| #1 | """ |
| #2 | Mnemosyne Memory Compression + Pattern Detection |
| #3 | ================================================== |
| #4 | |
| #5 | Compress memory content and detect recurring patterns. |
| #6 | |
| #7 | Compression strategies: |
| #8 | - Run-length encoding for repetitive sequences |
| #9 | - Dictionary-based compression for common phrases |
| #10 | - Semantic compression: summarize similar memories |
| #11 | |
| #12 | Pattern detection: |
| #13 | - Temporal patterns: recurring times, intervals |
| #14 | - Content patterns: co-occurring topics, sequences |
| #15 | - Sequence patterns: ordered memory chains |
| #16 | """ |
| #17 | |
| #18 | import re |
| #19 | import json |
| #20 | import math |
| #21 | from datetime import datetime, timedelta |
| #22 | from typing import List, Dict, Optional, Any, Callable, Tuple, Set |
| #23 | from dataclasses import dataclass, field |
| #24 | from collections import Counter, defaultdict |
| #25 | |
| #26 | |
@dataclass
class CompressionStats:
    """Size bookkeeping reported by a compression operation.

    All fields default to zero/empty so an instance can be created first
    and filled in later. The sizes are whatever unit the producer passes
    in; ``MemoryCompressor`` supplies UTF-8 byte counts.
    """

    original_size: int = 0        # size of the input before compression
    compressed_size: int = 0      # size of the output after compression
    ratio: float = 0.0            # compressed_size / original_size, set by the producer
    method: str = ""              # name of the strategy that produced the output
    patterns_found: int = 0
    memories_compressed: int = 0  # number of memories covered by these stats

    @property
    def savings_percent(self) -> float:
        """Return the share of the original size that was removed, in percent.

        Yields ``0.0`` when ``original_size`` is zero (nothing to measure
        against) and a negative value when the output grew.
        """
        if self.original_size != 0:
            return (1.0 - self.compressed_size / self.original_size) * 100
        return 0.0
| #42 | |
| #43 | |
class MemoryCompressor:
    """
    Compress memory content using multiple strategies.

    Strategies are applied in order of aggressiveness:
    1. Dictionary-based: replace common phrases with single-character tokens
    2. Run-length: collapse repeated character sequences into ``[c*N]``
    3. Semantic: shorten very long content (placeholder for an LLM summary)

    NOTE(review): neither reversible format escapes its own markers.
    Dictionary tokens are bare control characters (several of them are
    tab / newline / carriage return), and RLE uses the literal text
    ``[c*N]``; content that already contains those will be altered by
    ``decompress``. Confirm callers only round-trip content that is free of
    them, or introduce an escaping scheme.
    """

    # Run-length encoding: only runs of at least this many characters are
    # replaced by a marker, and a single marker never covers more than
    # ``_RLE_MAX_RUN`` characters.
    _RLE_MIN_RUN = 4
    _RLE_MAX_RUN = 255

    # Semantic truncation layout: head + marker + tail, applied only to
    # content larger than ``_SEMANTIC_MIN_BYTES`` UTF-8 bytes.
    _SEMANTIC_MIN_BYTES = 500
    _SEMANTIC_HEAD_CHARS = 250
    _SEMANTIC_TAIL_CHARS = 100
    _SEMANTIC_MARKER = " [...] "

    def __init__(self, dictionary: Optional[Dict[str, str]] = None):
        """Create a compressor.

        Args:
            dictionary: Mapping of phrase -> replacement token. When omitted
                (or empty) the built-in default dictionary is used.
        """
        self.dictionary = dictionary or self._build_default_dict()
        self._stats = CompressionStats()

    @staticmethod
    def _build_default_dict() -> Dict[str, str]:
        """Build a default compression dictionary for common phrases.

        Every phrase maps to a distinct single control character
        (``\\x01`` .. ``\\x0D``). The tokens are written as escape sequences
        rather than raw bytes so they survive editors, diffs and renderers
        that strip or normalise control characters; a stripped token turns
        into ``""`` and makes the mapping irreversible.
        """
        return {
            "remember that ": "\x01",
            "the user said ": "\x02",
            "the user asked ": "\x03",
            "the user wants ": "\x04",
            "conversation about ": "\x05",
            "please note that ": "\x06",
            "important: ": "\x07",
            "user preference: ": "\x08",
            "project context: ": "\x09",
            "api key ": "\x0A",
            "token ": "\x0B",
            "session ": "\x0C",
            "mnemosyne ": "\x0D",
        }

    def compress(self, content: str, method: str = "dict") -> Tuple[str, "CompressionStats"]:
        """
        Compress a single memory content string.

        Args:
            content: The memory content to compress
            method: Compression method — "dict", "rle", "semantic", or "auto".
                Any other value returns the content unchanged with
                ``stats.method == "none"``.

        Returns:
            Tuple of (compressed_content, stats). ``stats.method`` names the
            strategy that actually produced the output, which is the value
            to pass to ``decompress``.
        """
        if method == "auto":
            dict_out, dict_stats = self._dict_compress(content)
            if dict_stats.savings_percent >= 5:
                return dict_out, dict_stats
            # Dictionary pass saved less than 5%: try RLE, but never hand
            # back the larger of the two results (RLE can grow the content,
            # e.g. "aaaa" -> "[a*4]"). Ties go to RLE.
            rle_out, rle_stats = self._rle_compress(content)
            if dict_stats.compressed_size < rle_stats.compressed_size:
                return dict_out, dict_stats
            return rle_out, rle_stats

        if method == "dict":
            return self._dict_compress(content)
        if method == "rle":
            return self._rle_compress(content)
        if method == "semantic":
            return self._semantic_compress_single(content)

        # Unknown method: pass the content through untouched.
        size = len(content.encode("utf-8"))
        return content, CompressionStats(
            original_size=size, compressed_size=size,
            ratio=1.0, method="none"
        )

    def _dict_compress(self, content: str) -> Tuple[str, "CompressionStats"]:
        """Dictionary-based compression: replace each known phrase by its token.

        Phrases are applied in dictionary order. A phrase mapped to an empty
        token is simply removed and cannot be restored by ``decompress``.
        """
        original_size = len(content.encode("utf-8"))
        compressed = content
        for phrase, token in self.dictionary.items():
            if not phrase:
                # str.replace("", token) would insert the token between
                # every character; an empty phrase can never be meant.
                continue
            compressed = compressed.replace(phrase, token)
        compressed_size = len(compressed.encode("utf-8"))
        ratio = compressed_size / original_size if original_size > 0 else 1.0
        stats = CompressionStats(
            original_size=original_size, compressed_size=compressed_size,
            ratio=ratio, method="dict"
        )
        return compressed, stats

    @classmethod
    def _encode_run(cls, char: str, length: int) -> str:
        """Encode one run of ``length`` copies of ``char``."""
        if length >= cls._RLE_MIN_RUN:
            return f"[{char}*{length}]"
        return char * length

    def _rle_compress(self, content: str) -> Tuple[str, "CompressionStats"]:
        """Run-length encoding for repeated characters.

        Runs of four or more identical characters become ``[c*N]``; shorter
        runs are copied verbatim. Runs longer than 255 are split into
        several markers.
        """
        original_size = len(content.encode("utf-8"))
        if not content:
            return content, CompressionStats(original_size=0, compressed_size=0, ratio=1.0, method="rle")

        pieces = []
        run_char = content[0]
        run_len = 1
        for ch in content[1:]:
            if ch == run_char and run_len < self._RLE_MAX_RUN:
                run_len += 1
            else:
                pieces.append(self._encode_run(run_char, run_len))
                run_char, run_len = ch, 1
        # Handle last run
        pieces.append(self._encode_run(run_char, run_len))

        compressed_str = "".join(pieces)
        compressed_size = len(compressed_str.encode("utf-8"))
        ratio = compressed_size / original_size if original_size > 0 else 1.0
        stats = CompressionStats(
            original_size=original_size, compressed_size=compressed_size,
            ratio=ratio, method="rle"
        )
        return compressed_str, stats

    def _semantic_compress_single(self, content: str) -> Tuple[str, "CompressionStats"]:
        """Semantic compression for a single memory (placeholder for LLM-based).

        For now this only truncates: content above 500 UTF-8 bytes is reduced
        to its first 250 characters, the marker `` [...] `` and its last 100
        characters. This is lossy and has no matching decompression.
        """
        original_size = len(content.encode("utf-8"))
        # The size gate is in bytes but the slices are in characters, so
        # also require enough characters for head and tail not to overlap;
        # otherwise short multi-byte content would be duplicated and grow.
        kept_chars = (self._SEMANTIC_HEAD_CHARS + self._SEMANTIC_TAIL_CHARS
                      + len(self._SEMANTIC_MARKER))
        if original_size > self._SEMANTIC_MIN_BYTES and len(content) > kept_chars:
            compressed = (content[:self._SEMANTIC_HEAD_CHARS]
                          + self._SEMANTIC_MARKER
                          + content[-self._SEMANTIC_TAIL_CHARS:])
        else:
            compressed = content
        compressed_size = len(compressed.encode("utf-8"))
        ratio = compressed_size / original_size if original_size > 0 else 1.0
        stats = CompressionStats(
            original_size=original_size, compressed_size=compressed_size,
            ratio=ratio, method="semantic"
        )
        return compressed, stats

    def compress_batch(self, memories: List[Dict[str, Any]],
                       method: str = "auto") -> Tuple[List[Dict[str, Any]], "CompressionStats"]:
        """
        Compress a batch of memories.

        Each memory is shallow-copied; the copy gets the compressed
        ``content`` plus ``_compressed`` and ``_compression_method`` keys
        (the method actually used for that memory). The input dicts are not
        modified.

        Returns:
            Tuple of (compressed_memories, aggregate_stats)
        """
        total_original = 0
        total_compressed = 0
        compressed_memories = []

        for mem in memories:
            packed, item_stats = self.compress(mem.get("content", ""), method=method)
            total_original += item_stats.original_size
            total_compressed += item_stats.compressed_size
            new_mem = dict(mem)
            new_mem["content"] = packed
            new_mem["_compressed"] = True
            new_mem["_compression_method"] = item_stats.method
            compressed_memories.append(new_mem)

        ratio = total_compressed / total_original if total_original > 0 else 1.0
        stats = CompressionStats(
            original_size=total_original,
            compressed_size=total_compressed,
            ratio=ratio,
            method=method,
            memories_compressed=len(memories)
        )
        return compressed_memories, stats

    def decompress(self, content: str, method: str = "dict") -> str:
        """Decompress content compressed with the given method.

        Args:
            content: Text produced by ``compress``.
            method: "dict" or "rle". Any other value (including "semantic",
                which is lossy) returns ``content`` unchanged.

        Returns:
            The expanded text.
        """
        if method == "dict":
            # Reverse dictionary. Empty tokens are skipped: replacing ""
            # would insert the phrase between every character.
            reverse = {token: phrase for phrase, token in self.dictionary.items() if token}
            for token, phrase in reverse.items():
                content = content.replace(token, phrase)
            return content
        if method == "rle":
            # Expand RLE sequences like [a*5] -> aaaaa. DOTALL lets the
            # repeated character be a newline, which the encoder can emit.
            def expand(match):
                return match.group(1) * int(match.group(2))
            return re.sub(r'\[(.)\*(\d+)\]', expand, content, flags=re.DOTALL)
        return content
| #222 | |
| #223 | |
@dataclass
class DetectedPattern:
    """One recurring regularity found in a collection of memories."""

    pattern_type: str  # "temporal", "content", "sequence"
    description: str   # human-readable sentence describing the pattern
    confidence: float  # 0.0 - 1.0
    samples: List[str] = field(default_factory=list)         # example items illustrating the pattern
    metadata: Dict[str, Any] = field(default_factory=dict)   # detector-specific details

    def to_dict(self) -> Dict[str, Any]:
        """Return the pattern as a plain dictionary keyed by field name.

        ``samples`` and ``metadata`` are the same objects held by the
        instance, not copies.
        """
        exported = ("pattern_type", "description", "confidence", "samples", "metadata")
        return {name: getattr(self, name) for name in exported}
| #241 | |
| #242 | |
class PatternDetector:
    """
    Detect recurring patterns in memory data.

    Pattern types:
    - Temporal: recurring times, daily/weekly patterns
    - Content: co-occurring topics, frequent keywords
    - Sequence: ordered chains of related memories

    Memories are plain dicts. The detectors read the keys ``content``,
    ``timestamp`` (falling back to ``created_at``) and ``source``; all of
    them are optional.
    """

    # Keywords are runs of at least five ASCII letters.
    _KEYWORD_RE = re.compile(r'\b[a-zA-Z]{5,}\b')

    # Words too generic to count as a topic.
    _STOPWORDS = frozenset({
        "about", "after", "before", "being", "could", "doing", "every", "having", "might",
        "other", "should", "their", "there", "these", "those", "through", "under", "where",
        "which", "while", "would", "mnemosyne", "memory", "memories",
    })

    _DAY_NAMES = ("Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun")

    def __init__(self, min_confidence: float = 0.6):
        """Create a detector that reports patterns with confidence >= ``min_confidence``."""
        self.min_confidence = min_confidence

    @staticmethod
    def _timestamp_of(mem: Dict[str, Any]) -> Any:
        """Return the memory's ``timestamp``, or ``created_at`` if that is missing/empty."""
        return mem.get("timestamp") or mem.get("created_at")

    @staticmethod
    def _content_of(mem: Dict[str, Any]) -> str:
        """Return the memory's ``content``, treating a missing or ``None`` value as ""."""
        return mem.get("content") or ""

    @staticmethod
    def _parse_timestamp(value: Any) -> Optional[datetime]:
        """Turn a timestamp value into a ``datetime``; return ``None`` if it cannot be parsed.

        Accepts ``datetime`` objects as-is and ISO-8601 strings (a ``Z``
        suffix is rewritten to ``+00:00`` before parsing).
        """
        if isinstance(value, datetime):
            return value
        try:
            return datetime.fromisoformat(value.replace("Z", "+00:00"))
        except (AttributeError, TypeError, ValueError):
            # Not a string, or not a parseable ISO timestamp: best-effort skip.
            return None

    def detect_temporal(self, memories: List[Dict[str, Any]]) -> List["DetectedPattern"]:
        """Detect temporal patterns in memory timestamps.

        Looks at the hour-of-day (top 3) and day-of-week (top 2)
        distributions of all parseable timestamps. Confidence is the
        fraction of timestamps that fall on that hour/day. Fewer than three
        usable timestamps yields no patterns.
        """
        patterns = []
        timestamps = []
        for mem in memories:
            raw = self._timestamp_of(mem)
            if not raw:
                continue
            parsed = self._parse_timestamp(raw)
            if parsed is not None:
                timestamps.append(parsed)

        if len(timestamps) < 3:
            return patterns

        total = len(timestamps)

        # Hour-of-day distribution
        hour_counts = Counter(t.hour for t in timestamps)
        for hour, count in hour_counts.most_common(3):
            confidence = count / total
            if confidence >= self.min_confidence:
                patterns.append(DetectedPattern(
                    pattern_type="temporal",
                    description=f"Memories frequently created at {hour:02d}:00 ({count}/{total} times)",
                    confidence=confidence,
                    samples=[t.isoformat() for t in timestamps if t.hour == hour][:3],
                    metadata={"hour": hour, "count": count, "total": total}
                ))

        # Day-of-week distribution
        day_counts = Counter(t.weekday() for t in timestamps)
        for day, count in day_counts.most_common(2):
            confidence = count / total
            if confidence >= self.min_confidence:
                day_name = self._DAY_NAMES[day]
                patterns.append(DetectedPattern(
                    pattern_type="temporal",
                    description=f"Memories frequently created on {day_name} ({count}/{total} times)",
                    confidence=confidence,
                    samples=[t.isoformat() for t in timestamps if t.weekday() == day][:3],
                    metadata={"day": day_name, "count": count, "total": total}
                ))

        return patterns

    def detect_content(self, memories: List[Dict[str, Any]]) -> List["DetectedPattern"]:
        """Detect content patterns (co-occurring topics, frequent keywords).

        Keywords are lower-cased words of five or more letters that are not
        stopwords. Reports up to five frequent keywords and, when there are
        at least three memories, up to three keyword pairs that appear in
        the same memory.
        """
        patterns = []
        texts = [self._content_of(m) for m in memories]
        lowered = [text.lower() for text in texts]

        # Simple keyword extraction (words > 4 chars, frequency > 1)
        words = [w for w in self._KEYWORD_RE.findall(" ".join(lowered))
                 if w not in self._STOPWORDS]
        word_counts = Counter(words)
        total_words = len(words)

        for word, count in word_counts.most_common(5):
            # A word is fully "confident" once it makes up 5% of all
            # keywords, with a floor of three occurrences.
            confidence = min(1.0, count / max(3, total_words * 0.05))
            if count >= 2 and confidence >= self.min_confidence:
                samples = [text for text, low in zip(texts, lowered) if word in low][:3]
                patterns.append(DetectedPattern(
                    pattern_type="content",
                    description=f"Frequent topic: '{word}' appears {count} times",
                    confidence=confidence,
                    samples=samples,
                    metadata={"word": word, "count": count}
                ))

        # Co-occurrence: pairs of keywords that appear together
        if len(memories) >= 3:
            cooccurrence = Counter()
            for low in lowered:
                # Sorting gives each unordered pair one canonical (w1 < w2) form.
                mem_words = sorted(set(self._KEYWORD_RE.findall(low)) - self._STOPWORDS)
                for i, w1 in enumerate(mem_words):
                    for w2 in mem_words[i + 1:]:
                        cooccurrence[(w1, w2)] += 1

            # Break count ties alphabetically so the top three do not depend
            # on set iteration order (which varies between interpreter runs).
            ranked = sorted(cooccurrence.items(), key=lambda item: (-item[1], item[0]))[:3]
            for (w1, w2), count in ranked:
                confidence = min(1.0, count / len(memories))
                if count >= 2 and confidence >= self.min_confidence:
                    patterns.append(DetectedPattern(
                        pattern_type="content",
                        description=f"Co-occurring topics: '{w1}' + '{w2}' appear together {count} times",
                        confidence=confidence,
                        samples=[text for text, low in zip(texts, lowered)
                                 if w1 in low and w2 in low][:3],
                        metadata={"word1": w1, "word2": w2, "count": count}
                    ))

        return patterns

    def detect_sequence(self, memories: List[Dict[str, Any]]) -> List["DetectedPattern"]:
        """Detect sequence patterns (ordered chains of related memories).

        Memories that carry a timestamp (``timestamp`` or ``created_at``,
        the same rule ``detect_temporal`` uses) are ordered by it, and
        consecutive ``source`` values are counted as pairs. A pair seen at
        least twice is reported with up to two content excerpts.
        """
        patterns = []
        if len(memories) < 3:
            return patterns

        # Sort by timestamp
        dated = [m for m in memories if self._timestamp_of(m)]
        try:
            sorted_mems = sorted(dated, key=self._timestamp_of)
        except TypeError:
            # Mixed timestamp types (e.g. str and datetime) are not mutually
            # comparable; fall back to ordering by their string form.
            sorted_mems = sorted(dated, key=lambda m: str(self._timestamp_of(m)))

        # Look for source sequences
        sources = [m.get("source", "unknown") for m in sorted_mems]
        source_pairs = list(zip(sources, sources[1:]))
        pair_counts = Counter(source_pairs)

        for (s1, s2), count in pair_counts.most_common(3):
            confidence = min(1.0, count / max(2, len(sources) - 1))
            if count >= 2 and confidence >= self.min_confidence:
                samples = []
                for i, pair in enumerate(source_pairs):
                    if pair == (s1, s2):
                        before = self._content_of(sorted_mems[i])[:50]
                        after = self._content_of(sorted_mems[i + 1])[:50]
                        samples.append(f"{before}... -> {after}...")
                        if len(samples) >= 2:
                            break
                patterns.append(DetectedPattern(
                    pattern_type="sequence",
                    description=f"Sequence pattern: '{s1}' often followed by '{s2}' ({count} times)",
                    confidence=confidence,
                    samples=samples,
                    metadata={"source1": s1, "source2": s2, "count": count}
                ))

        return patterns

    def detect_all(self, memories: List[Dict[str, Any]]) -> List["DetectedPattern"]:
        """Run all pattern detectors and return combined results, highest confidence first."""
        patterns = []
        patterns.extend(self.detect_temporal(memories))
        patterns.extend(self.detect_content(memories))
        patterns.extend(self.detect_sequence(memories))
        # Sort by confidence descending (stable: ties keep detector order)
        patterns.sort(key=lambda p: p.confidence, reverse=True)
        return patterns

    def summarize_patterns(self, memories: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Summarize detected patterns as a dictionary of plain data.

        Returns:
            A dict with the memory count, the number of patterns, the
            patterns grouped by type (each as a dict) and ``top_pattern``,
            the highest-confidence pattern or ``None`` when nothing was found.
        """
        patterns = self.detect_all(memories)

        def of_type(kind: str) -> List[Dict[str, Any]]:
            return [p.to_dict() for p in patterns if p.pattern_type == kind]

        return {
            "total_memories": len(memories),
            "patterns_found": len(patterns),
            "temporal_patterns": of_type("temporal"),
            "content_patterns": of_type("content"),
            "sequence_patterns": of_type("sequence"),
            "top_pattern": patterns[0].to_dict() if patterns else None,
        }
| #413 |