repositories
loading repo index
repositories
loading repo index
repository
loading code, commits, and activity
public Clawd ADK gateway launch mirror
stars
latest
clone command
git clone gitlawb://did:key:z6Mkq5mY...iFZ5/my-project-publ...
git clone gitlawb://did:key:z6Mkq5mY.../my-project-publ...
2fa351d6 · docs: add automaton and perps launch sources · 15d ago
| #1 | """ |
| #2 | Tests for Mnemosyne memory compression + pattern detection. |
| #3 | """ |
| #4 | |
| #5 | import pytest |
| #6 | from datetime import datetime, timedelta |
| #7 | |
| #8 | from mnemosyne.core.patterns import ( |
| #9 | MemoryCompressor, PatternDetector, CompressionStats, |
| #10 | DetectedPattern |
| #11 | ) |
| #12 | |
| #13 | |
| #14 | # ─── CompressionStats ─────────────────────────────────────────────── |
| #15 | |
class TestCompressionStats:
    """Checks for the ``savings_percent`` value exposed by CompressionStats."""

    def test_basic_stats(self):
        """A 100 -> 70 compression reports savings within 0.001 of 30.0."""
        stats = CompressionStats(
            original_size=100,
            compressed_size=70,
            ratio=0.7,
            method="dict",
        )
        deviation = stats.savings_percent - 30.0
        assert abs(deviation) < 0.001

    def test_zero_size(self):
        """Zero-sized input reports exactly 0.0 savings."""
        stats = CompressionStats(
            original_size=0,
            compressed_size=0,
            ratio=1.0,
            method="none",
        )
        assert stats.savings_percent == 0.0

    def test_no_savings(self):
        """Equal original and compressed sizes report exactly 0.0 savings."""
        stats = CompressionStats(
            original_size=100,
            compressed_size=100,
            ratio=1.0,
            method="dict",
        )
        assert stats.savings_percent == 0.0
| #28 | |
| #29 | |
| #30 | # ─── MemoryCompressor ─────────────────────────────────────────────── |
| #31 | |
class TestMemoryCompressor:
    """Round-trip and method-selection checks for MemoryCompressor."""

    def test_dict_compression(self):
        """Dictionary compression reports its method and never grows the text."""
        compressor = MemoryCompressor()
        original = "remember that the user said something important"
        packed, stats = compressor.compress(original, method="dict")
        assert stats.method == "dict"
        assert len(packed) <= len(original)

    def test_dict_decompression(self):
        """Dictionary compression followed by decompression restores the text."""
        compressor = MemoryCompressor()
        original = "remember that the user said something"
        packed, _ = compressor.compress(original, method="dict")
        restored = compressor.decompress(packed, method="dict")
        assert restored == original

    def test_rle_compression(self):
        """RLE on a highly repetitive string reports its method and round-trips."""
        compressor = MemoryCompressor()
        original = "aaaaabbbbbccccc"  # 15 chars in three runs of five
        packed, stats = compressor.compress(original, method="rle")
        assert stats.method == "rle"
        # Only the round trip is asserted here; the achieved size is not checked.
        restored = compressor.decompress(packed, method="rle")
        assert restored == original

    def test_rle_no_repetition(self):
        """RLE on a string without runs still round-trips."""
        compressor = MemoryCompressor()
        original = "abcdefghijklmnop"  # every character is distinct
        packed, _ = compressor.compress(original, method="rle")
        # The encoded form may be larger than the input; only fidelity matters.
        restored = compressor.decompress(packed, method="rle")
        assert restored == original

    def test_semantic_compression_long(self):
        """Semantic compression shortens a 600-character input."""
        compressor = MemoryCompressor()
        original = "x" * 600
        packed, stats = compressor.compress(original, method="semantic")
        assert stats.method == "semantic"
        assert len(packed) < len(original)

    def test_semantic_compression_short(self):
        """Semantic compression leaves a short input unchanged."""
        compressor = MemoryCompressor()
        original = "Short text"
        packed, _ = compressor.compress(original, method="semantic")
        assert packed == original

    def test_auto_compression(self):
        """Auto mode resolves to either the dict or the rle method."""
        compressor = MemoryCompressor()
        # Input that contains phrases the dictionary method is meant to target.
        original = "remember that the user said mnemosyne is great"
        _, stats = compressor.compress(original, method="auto")
        assert stats.method in ("dict", "rle")

    def test_compress_batch(self):
        """Batch compression returns one tagged entry per input memory."""
        compressor = MemoryCompressor()
        memories = [
            {"content": "remember that the user said hello"},
            {"content": "the user asked about mnemosyne"},
            {"content": "conversation about memory systems"},
        ]
        packed, stats = compressor.compress_batch(memories, method="dict")
        assert len(packed) == 3
        assert stats.memories_compressed == 3
        assert all("_compressed" in entry for entry in packed)

    def test_decompress_unknown_method(self):
        """An unrecognised method name hands the text back untouched."""
        compressor = MemoryCompressor()
        original = "some text"
        restored = compressor.decompress(original, method="unknown")
        assert restored == original

    def test_empty_compression(self):
        """Compressing the empty string yields the empty string."""
        compressor = MemoryCompressor()
        packed, _ = compressor.compress("")
        assert packed == ""
| #107 | |
| #108 | |
| #109 | # ─── PatternDetector ──────────────────────────────────────────────── |
| #110 | |
class TestPatternDetector:
    """Checks for PatternDetector's temporal, content and sequence detection."""

    def test_detect_temporal_hour_pattern(self):
        """Memories clustered around 09:00 yield at least one temporal pattern."""
        finder = PatternDetector(min_confidence=0.3)
        start = datetime(2026, 1, 1, 9, 0, 0)
        schedule = [
            ("Morning meeting", timedelta()),
            ("Code review", timedelta(hours=1)),
            ("Standup", timedelta(days=1)),
            ("Planning", timedelta(days=2)),
        ]
        memories = [
            {"content": label, "timestamp": (start + offset).isoformat()}
            for label, offset in schedule
        ]
        patterns = finder.detect_temporal(memories)
        assert len(patterns) > 0
        assert any(found.pattern_type == "temporal" for found in patterns)

    def test_detect_temporal_empty(self):
        """No memories means no temporal patterns."""
        finder = PatternDetector()
        patterns = finder.detect_temporal([])
        assert len(patterns) == 0

    def test_detect_temporal_insufficient_data(self):
        """A single memory yields no temporal patterns."""
        finder = PatternDetector()
        lone_memory = {"content": "One", "timestamp": datetime.now().isoformat()}
        patterns = finder.detect_temporal([lone_memory])
        assert len(patterns) == 0

    def test_detect_content_keywords(self):
        """A keyword repeated across memories shows up in a pattern description."""
        finder = PatternDetector(min_confidence=0.1)
        sentences = (
            "The user likes Python programming",
            "Python is great for scripting",
            "The user prefers Python over Java",
            "Something unrelated",
        )
        memories = [{"content": sentence} for sentence in sentences]
        patterns = finder.detect_content(memories)
        assert len(patterns) > 0
        assert any("python" in found.description.lower() for found in patterns)

    def test_detect_content_cooccurrence(self):
        """Words that recur together produce a co-occurrence pattern."""
        finder = PatternDetector(min_confidence=0.1)
        sentences = (
            "The user likes Python programming and Rust language",
            "Python programming and Rust language are both great",
            "Comparing Python programming with Rust language",
        )
        memories = [{"content": sentence} for sentence in sentences]
        patterns = finder.detect_content(memories)
        # Per the original author's note, words must be 5+ characters to count.
        cooccurring = [
            found
            for found in patterns
            if "co-occurring" in found.description.lower()
        ]
        assert len(cooccurring) > 0

    def test_detect_content_empty(self):
        """No memories means no content patterns."""
        finder = PatternDetector()
        patterns = finder.detect_content([])
        assert len(patterns) == 0

    def test_detect_sequence(self):
        """Alternating user/agent sources are reported as a sequence pattern."""
        finder = PatternDetector(min_confidence=0.1)
        turns = [
            ("User asks question", "user", "2026-01-01T09:00:00"),
            ("Agent responds", "agent", "2026-01-01T09:01:00"),
            ("User asks again", "user", "2026-01-01T09:05:00"),
            ("Agent responds again", "agent", "2026-01-01T09:06:00"),
        ]
        memories = [
            {"content": text, "source": speaker, "timestamp": stamp}
            for text, speaker, stamp in turns
        ]
        patterns = finder.detect_sequence(memories)
        assert len(patterns) > 0
        assert any(
            "user" in found.description.lower()
            and "agent" in found.description.lower()
            for found in patterns
        )

    def test_detect_sequence_insufficient(self):
        """A single memory yields no sequence patterns."""
        finder = PatternDetector()
        patterns = finder.detect_sequence([{"content": "Only one"}])
        assert len(patterns) == 0

    def test_detect_all_combined(self):
        """detect_all returns at least one pattern of a known type."""
        finder = PatternDetector(min_confidence=0.1)
        start = datetime(2026, 1, 1, 9, 0, 0)
        turns = [
            ("The user likes Python", "user", timedelta()),
            ("Agent suggests Rust", "agent", timedelta(minutes=1)),
            ("User asks about Python", "user", timedelta(days=1)),
            ("Agent responds", "agent", timedelta(days=1, minutes=1)),
        ]
        memories = [
            {
                "content": text,
                "source": speaker,
                "timestamp": (start + offset).isoformat(),
            }
            for text, speaker, offset in turns
        ]
        patterns = finder.detect_all(memories)
        assert len(patterns) > 0
        # Only one of the three known pattern types is required to be present.
        types = {found.pattern_type for found in patterns}
        assert any(kind in types for kind in ("temporal", "content", "sequence"))

    def test_detect_all_sorted_by_confidence(self):
        """When two or more patterns come back, the first outranks the second."""
        finder = PatternDetector(min_confidence=0.1)
        memories = [
            {"content": "Python Python Python", "timestamp": "2026-01-01T09:00:00"},
            {"content": "Rust Rust", "timestamp": "2026-01-01T10:00:00"},
        ]
        patterns = finder.detect_all(memories)
        if len(patterns) < 2:
            # Nothing to compare; the ordering check does not apply.
            return
        leader, runner_up = patterns[0], patterns[1]
        assert leader.confidence >= runner_up.confidence

    def test_summarize_patterns(self):
        """The summary exposes the expected keys and the memory count."""
        finder = PatternDetector(min_confidence=0.1)
        start = datetime(2026, 1, 1, 9, 0, 0)
        follow_up = start + timedelta(minutes=1)
        memories = [
            {
                "content": "Python is great",
                "source": "user",
                "timestamp": start.isoformat(),
            },
            {
                "content": "Agent agrees",
                "source": "agent",
                "timestamp": follow_up.isoformat(),
            },
        ]
        summary = finder.summarize_patterns(memories)
        assert "total_memories" in summary
        assert "patterns_found" in summary
        assert summary["total_memories"] == 2

    def test_pattern_to_dict(self):
        """to_dict carries over the pattern type, confidence and samples."""
        fields = {
            "pattern_type": "content",
            "description": "Test pattern",
            "confidence": 0.85,
            "samples": ["sample1", "sample2"],
            "metadata": {"key": "value"},
        }
        exported = DetectedPattern(**fields).to_dict()
        assert exported["pattern_type"] == "content"
        assert exported["confidence"] == 0.85
        assert exported["samples"] == ["sample1", "sample2"]

    def test_high_confidence_filter(self):
        """A 0.9 confidence floor filters out weakly supported content patterns."""
        finder = PatternDetector(min_confidence=0.9)
        memories = [
            {"content": "Python is mentioned once"},
            {"content": "Something else entirely"},
        ]
        patterns = finder.detect_content(memories)
        assert len(patterns) == 0
| #243 | |
| #244 | |
| #245 | # ─── Mnemosyne wrapper integration (C26) ──────────────────────────── |
| #246 | |
class TestMnemosynePatternMethods:
    """Regression coverage for [C26].

    Mnemosyne.detect_patterns() and summarize_patterns() used to call
    self.get_all_memories(), which did not exist, so the first call made
    without an explicit memories argument raised AttributeError.
    """

    @staticmethod
    def _new_store(tmp_path):
        """Build a Mnemosyne instance backed by a database under *tmp_path*."""
        from mnemosyne.core.memory import Mnemosyne

        return Mnemosyne(session_id="c26", db_path=tmp_path / "c26.db")

    def test_detect_patterns_no_args_does_not_raise(self, tmp_path):
        """detect_patterns() without arguments returns a list."""
        store = self._new_store(tmp_path)
        store.remember("Morning standup notes", source="meeting", importance=0.6)
        store.remember("User likes Python over Java", source="user", importance=0.7)
        found = store.detect_patterns()
        assert isinstance(found, list)

    def test_summarize_patterns_no_args_does_not_raise(self, tmp_path):
        """summarize_patterns() without arguments returns a populated dict."""
        store = self._new_store(tmp_path)
        store.remember("Morning standup notes", source="meeting", importance=0.6)
        store.remember("Afternoon review", source="meeting", importance=0.6)
        summary = store.summarize_patterns()
        assert isinstance(summary, dict)
        assert "total_memories" in summary
        assert summary["total_memories"] >= 2

    def test_get_all_memories_returns_working_and_episodic(self, tmp_path):
        """get_all_memories must merge working_memory and episodic_memory rows.

        The episodic row is created through the public consolidate_to_episodic
        API rather than raw SQL, so this test keeps working if the
        episodic_memory schema later gains a NOT NULL column.
        """
        store = self._new_store(tmp_path)
        first_id = store.remember("Working item one", source="user", importance=0.5)
        store.remember("Working item two", source="agent", importance=0.5)
        store.beam.consolidate_to_episodic(
            summary="Episodic summary one",
            source_wm_ids=[first_id],
            source="consolidation",
            importance=0.6,
        )

        rows = store.get_all_memories()
        assert isinstance(rows, list)
        contents = [row["content"] for row in rows]
        for expected in (
            "Working item one",
            "Working item two",
            "Episodic summary one",
        ):
            assert expected in contents
        # PatternDetector reads these fields from every row.
        for row in rows:
            for field in ("content", "timestamp", "source"):
                assert field in row

    def test_get_all_memories_excludes_invalidated(self, tmp_path):
        """Invalidated memories must not surface in pattern analysis."""
        store = self._new_store(tmp_path)
        store.remember("Keep me visible", source="user", importance=0.5)
        dropped_id = store.remember(
            "Forget about this rule", source="user", importance=0.5
        )
        store.invalidate(dropped_id)

        contents = [row["content"] for row in store.get_all_memories()]
        assert "Keep me visible" in contents
        assert "Forget about this rule" not in contents
| #312 |