repositories
loading repo index
repositories
loading repo index
repository
loading code, commits, and activity
public Clawd ADK gateway launch mirror
stars
latest
clone command
git clone gitlawb://did:key:z6Mkq5mY...iFZ5/my-project-publ...git clone gitlawb://did:key:z6Mkq5mY.../my-project-publ...2fa351d6docs: add automaton and perps launch sources15d ago| #1 | """ |
| #2 | Shared test fixtures for Mnemosyne test suite. |
| #3 | |
| #4 | Provides fixtures that handle SQLite thread-local connection cleanup |
| #5 | to prevent "database is locked" and UNIQUE constraint collisions |
| #6 | between tests. |
| #7 | """ |
| #8 | |
| #9 | import pytest |
| #10 | |
| #11 | |
| #12 | def _close_cached_connections(): |
| #13 | """Close and reset thread-local SQLite connection caches in both modules.""" |
| #14 | for mod_path in ( |
| #15 | "mnemosyne.core.beam", |
| #16 | "mnemosyne.core.memory", |
| #17 | ): |
| #18 | try: |
| #19 | import importlib |
| #20 | mod = importlib.import_module(mod_path) |
| #21 | tl = getattr(mod, "_thread_local", None) |
| #22 | if tl is not None and hasattr(tl, "conn") and tl.conn is not None: |
| #23 | try: |
| #24 | tl.conn.close() |
| #25 | except Exception: |
| #26 | pass |
| #27 | tl.conn = None |
| #28 | if hasattr(tl, "db_path"): |
| #29 | tl.db_path = None |
| #30 | except Exception: |
| #31 | pass |
| #32 | |
| #33 | # Reset the global Mnemosyne default instance to avoid cross-test |
| #34 | # contamination of the singleton |
| #35 | try: |
| #36 | from mnemosyne.core import memory as _mem_mod |
| #37 | _mem_mod._default_instance = None |
| #38 | _mem_mod._default_bank = "default" |
| #39 | except Exception: |
| #40 | pass |
| #41 | |
| #42 | # Reset hermes_plugin singleton |
| #43 | try: |
| #44 | import hermes_plugin |
| #45 | hermes_plugin._memory_instance = None |
| #46 | hermes_plugin._current_session_id = None |
| #47 | hermes_plugin._triple_store = None |
| #48 | except Exception: |
| #49 | pass |
| #50 | |
| #51 | # Reset host LLM backend registry to prevent cross-test contamination. |
| #52 | # The registry is a process-global; a test that forgets to unregister |
| #53 | # would otherwise bleed into the next. |
| #54 | try: |
| #55 | from mnemosyne.core import llm_backends as _llm_backends_mod |
| #56 | _llm_backends_mod._backend = None |
| #57 | except Exception: |
| #58 | pass |
| #59 | |
| #60 | |
| #61 | @pytest.fixture(autouse=True) |
| #62 | def _reset_thread_local_connections(): |
| #63 | """ |
| #64 | Auto-use fixture that resets thread-local SQLite connection caches |
| #65 | before and after every test. This prevents connection leakage between |
| #66 | tests that use different database paths. |
| #67 | |
| #68 | Both mnemosyne.core.beam and mnemosyne.core.memory maintain their own |
| #69 | thread-local caches (_thread_local.conn / _thread_local.db_path). |
| #70 | When tests create instances with different db_paths, the old connection |
| #71 | is never closed, leading to "database is locked" errors. |
| #72 | """ |
| #73 | _close_cached_connections() |
| #74 | yield |
| #75 | _close_cached_connections() |
| #76 |