repositories
loading repo index
repositories
loading repo index
repository
loading code, commits, and activity
Mirrored from https://github.com/yingqi-z20/Agent-libOS
stars
latest
clone command
git clone gitlawb://did:key:z6MkqRzA...RfoM/yingqi-z20-Agen...git clone gitlawb://did:key:z6MkqRzA.../yingqi-z20-Agen...d98dd2c9IPC1d ago| #1 | from __future__ import annotations |
| #2 | |
| #3 | from typing import Any |
| #4 | |
| #5 | from agent_libos.utils.ids import new_id, utc_now |
| #6 | from agent_libos.models import AuditRecord |
| #7 | from agent_libos.storage import SQLiteStore |
| #8 | |
| #9 | |
| #10 | class AuditManager: |
| #11 | def __init__(self, store: SQLiteStore): |
| #12 | self.store = store |
| #13 | |
| #14 | def record( |
| #15 | self, |
| #16 | actor: str, |
| #17 | action: str, |
| #18 | target: str | None = None, |
| #19 | input_refs: list[str] | None = None, |
| #20 | output_refs: list[str] | None = None, |
| #21 | capability_refs: list[str] | None = None, |
| #22 | decision: dict[str, Any] | None = None, |
| #23 | correlation_id: str | None = None, |
| #24 | parent_record_id: str | None = None, |
| #25 | ) -> AuditRecord: |
| #26 | record = AuditRecord( |
| #27 | record_id=new_id("audit"), |
| #28 | timestamp=utc_now(), |
| #29 | actor=actor, |
| #30 | action=action, |
| #31 | target=target, |
| #32 | input_refs=input_refs or [], |
| #33 | output_refs=output_refs or [], |
| #34 | capability_refs=capability_refs or [], |
| #35 | decision=decision, |
| #36 | correlation_id=correlation_id, |
| #37 | parent_record_id=parent_record_id, |
| #38 | ) |
| #39 | self.store.insert_audit(record) |
| #40 | return record |
| #41 | |
| #42 | def trace(self, limit: int | None = None) -> list[AuditRecord]: |
| #43 | return self.store.list_audit(limit=limit) |
| #44 | |
| #45 |