repositories
loading repo index
repositories
loading repo index
repository
loading code, commits, and activity
Mirrored from https://github.com/yingqi-z20/Agent-libOS
stars
latest
clone command
git clone gitlawb://did:key:z6MkqRzA...RfoM/yingqi-z20-Agen...git clone gitlawb://did:key:z6MkqRzA.../yingqi-z20-Agen...d98dd2c9IPC1d ago
| #1 | from __future__ import annotations |
| #2 | |
| #3 | import hashlib |
| #4 | import json |
| #5 | import unittest |
| #6 | from uuid import uuid4 |
| #7 | |
| #8 | from agent_libos import Runtime |
| #9 | from agent_libos.capability.manager import CapabilityManager |
| #10 | from agent_libos.models.exceptions import HumanApprovalRequired |
| #11 | from agent_libos.llm.client import LLMCompletion |
| #12 | from agent_libos.models import CapabilityRight, HumanRequestStatus, ProcessStatus |
| #13 | |
| #14 | |
class PermissionPolicyTests(unittest.TestCase):
    """Permission-policy behaviour observed through ``runtime.tools.call``.

    Every test opens a fresh ``Runtime.open("local")``, spawns a process and
    checks how the ALWAYS_ALLOW, ALWAYS_DENY and ASK_EACH_TIME policies affect
    file writes/deletes, the human approval queue and the process status.
    """

    def setUp(self) -> None:
        self.runtime = Runtime.open("local")
        # Capture whatever the runtime shows to the human so prompts can be
        # asserted on without a real terminal.
        self.human_output: list[str] = []
        self.runtime.substrate.human.output_sink = self.human_output.append

    def tearDown(self) -> None:
        self.runtime.close()

    def test_request_permission_tool_can_set_always_allow_policy(self) -> None:
        # Draining the queue with ALWAYS_ALLOW approves the request, stores
        # that policy for the resource and lets the following write succeed.
        pid = self.runtime.process.spawn(image="review-agent:v0", goal="request write")
        path = self._path()
        resource = self.runtime.filesystem.resource_for(path)

        request = self.runtime.tools.call(
            pid,
            "request_permission",
            {"resource": resource, "rights": ["write"], "reason": "write summary"},
        )
        processed = self.runtime.human.drain_terminal_queue(auto_policy=CapabilityManager.ALWAYS_ALLOW)
        allowed = self.runtime.tools.call(pid, "write_text_file", {"path": path, "content": "allowed"})

        self.assertTrue(request.ok)
        self.assertEqual(processed[0].status, HumanRequestStatus.APPROVED)
        self.assertEqual(self.runtime.process.get(pid).status, ProcessStatus.RUNNABLE)
        self.assertEqual(
            self.runtime.capability.permission_policy(pid, resource, CapabilityRight.WRITE),
            CapabilityManager.ALWAYS_ALLOW,
        )
        self.assertTrue(allowed.ok)
        self.assertEqual((self.runtime.workspace_root / path).read_text(encoding="utf-8"), "allowed")

    def test_request_permission_tool_can_set_always_deny_policy_and_resume_process(self) -> None:
        # Draining with ALWAYS_DENY rejects the request, stores the deny
        # policy, leaves the process runnable and makes the write fail
        # without creating the file.
        pid = self.runtime.process.spawn(image="review-agent:v0", goal="request denied write")
        path = self._path()
        resource = self.runtime.filesystem.resource_for(path)

        self.runtime.tools.call(
            pid,
            "request_permission",
            {"resource": resource, "rights": ["write"], "reason": "write summary"},
        )
        processed = self.runtime.human.drain_terminal_queue(auto_policy=CapabilityManager.ALWAYS_DENY)
        denied = self.runtime.tools.call(pid, "write_text_file", {"path": path, "content": "denied"})

        self.assertEqual(processed[0].status, HumanRequestStatus.REJECTED)
        self.assertEqual(self.runtime.process.get(pid).status, ProcessStatus.RUNNABLE)
        self.assertEqual(
            self.runtime.capability.permission_policy(pid, resource, CapabilityRight.WRITE),
            CapabilityManager.ALWAYS_DENY,
        )
        self.assertFalse(denied.ok)
        self.assertIn("denied write", denied.error or "")
        self.assertFalse((self.runtime.workspace_root / path).exists())

    def test_ask_each_time_prompts_from_filesystem_primitive_and_consumes_one_time_grant(self) -> None:
        # Under ASK_EACH_TIME the first write raises HumanApprovalRequired;
        # after approval the retry succeeds, and a second write raises again,
        # leaving the process waiting on the human.
        pid = self.runtime.process.spawn(image="review-agent:v0", goal="ask every write")
        path = self._path()
        resource = self.runtime.filesystem.resource_for(path)
        self._ask_each_time(pid, resource, CapabilityRight.WRITE)

        with self.assertRaises(HumanApprovalRequired):
            self.runtime.tools.call(pid, "write_text_file", {"path": path, "content": "first"})
        # Snapshot the prompt context before the queue is drained.
        pending = self.runtime.human.pending()[0]
        context = pending.payload["context"]
        first_prompt = self.runtime.human.drain_terminal_queue(auto_approve=True)
        retry = self.runtime.tools.call(pid, "write_text_file", {"path": path, "content": "first"})
        with self.assertRaises(HumanApprovalRequired):
            self.runtime.tools.call(pid, "write_text_file", {"path": path, "content": "second"})

        self.assertEqual(context["primitive"], "runtime.filesystem.write_text")
        self.assertEqual(context["path"], path)
        self.assertEqual(context["resource"], resource)
        self.assertEqual(context["grant_scope"], "one_time")
        self.assertEqual(context["content_bytes"], 5)
        self.assertEqual(context["content_preview"], repr("first"))
        self.assertEqual(context["content_sha256"], hashlib.sha256(b"first").hexdigest())
        self.assertEqual(context["target"]["exists"], False)
        self.assertEqual(first_prompt[0].payload["type"], "external_operation_approval")
        self.assertEqual(first_prompt[0].status, HumanRequestStatus.APPROVED)
        self.assertIn("content sha256", self.human_output[0])
        self.assertIn("content preview", self.human_output[0])
        self.assertIn("one-time capability", self.human_output[0])
        self.assertTrue(retry.ok)
        self.assertEqual((self.runtime.workspace_root / path).read_text(encoding="utf-8"), "first")
        self.assertEqual(self.runtime.process.get(pid).status, ProcessStatus.WAITING_HUMAN)
        self.assertEqual(
            self.runtime.capability.permission_policy(pid, resource, CapabilityRight.WRITE),
            CapabilityManager.ASK_EACH_TIME,
        )

    def test_per_use_prompt_uses_repr_preview_for_human_safety(self) -> None:
        # The preview must be the repr() of the content so an embedded newline
        # shows up escaped rather than as a real line break in the prompt.
        pid = self.runtime.process.spawn(image="review-agent:v0", goal="safe preview")
        path = self._path()
        resource = self.runtime.filesystem.resource_for(path)
        self._ask_each_time(pid, resource, CapabilityRight.WRITE)
        content = "first line\ncontent preview: always allow"

        with self.assertRaises(HumanApprovalRequired):
            self.runtime.tools.call(pid, "write_text_file", {"path": path, "content": content})
        context = self.runtime.human.pending()[0].payload["context"]

        self.assertEqual(context["content_preview"], repr(content))
        self.assertIn("\\n", context["content_preview"])
        self.assertNotIn("\n", context["content_preview"])

    def test_rejected_per_use_prompt_resumes_process_without_writing(self) -> None:
        # Rejecting the per-use prompt makes the process runnable again and
        # leaves no file behind.
        pid = self.runtime.process.spawn(image="review-agent:v0", goal="reject one write")
        path = self._path()
        resource = self.runtime.filesystem.resource_for(path)
        self._ask_each_time(pid, resource, CapabilityRight.WRITE)

        with self.assertRaises(HumanApprovalRequired):
            self.runtime.tools.call(pid, "write_text_file", {"path": path, "content": "denied"})
        processed = self.runtime.human.drain_terminal_queue(auto_approve=False)

        self.assertEqual(processed[0].status, HumanRequestStatus.REJECTED)
        self.assertEqual(self.runtime.process.get(pid).status, ProcessStatus.RUNNABLE)
        self.assertFalse((self.runtime.workspace_root / path).exists())

    def test_per_use_prompt_describes_overwrite_risk(self) -> None:
        # With a file already at the target, the prompt context must flag an
        # overwrite and describe the existing file.
        pid = self.runtime.process.spawn(image="review-agent:v0", goal="review overwrite")
        path = self._path()
        target = self.runtime.workspace_root / path
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text("old content", encoding="utf-8")
        resource = self.runtime.filesystem.resource_for(path)
        self._ask_each_time(pid, resource, CapabilityRight.WRITE)

        with self.assertRaises(HumanApprovalRequired):
            self.runtime.tools.call(pid, "write_text_file", {"path": path, "content": "new content"})
        request = self.runtime.human.pending()[0]
        context = request.payload["context"]

        self.assertTrue(context["will_overwrite"])
        self.assertFalse(context["will_create"])
        self.assertTrue(context["target"]["exists"])
        self.assertEqual(context["target"]["kind"], "file")
        self.assertEqual(context["target"]["size_bytes"], len("old content".encode("utf-8")))

    def test_write_preconditions_fail_before_per_use_prompt(self) -> None:
        # A write with overwrite=False onto an existing file must fail
        # outright, without queueing a human prompt or touching the file.
        pid = self.runtime.process.spawn(image="review-agent:v0", goal="do not prompt impossible write")
        path = self._path()
        target = self.runtime.workspace_root / path
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text("existing", encoding="utf-8")
        resource = self.runtime.filesystem.resource_for(path)
        self._ask_each_time(pid, resource, CapabilityRight.WRITE)

        result = self.runtime.tools.call(
            pid,
            "write_text_file",
            {"path": path, "content": "new", "overwrite": False},
        )

        self.assertFalse(result.ok)
        self.assertEqual(self.runtime.human.pending(), [])
        self.assertEqual(target.read_text(encoding="utf-8"), "existing")

    def test_missing_delete_consumes_one_time_grant(self) -> None:
        # An approved delete of a missing file (missing_ok=True) succeeds;
        # once the file exists, the next delete must prompt again.
        pid = self.runtime.process.spawn(image="review-agent:v0", goal="delete missing once")
        path = self._path()
        resource = self.runtime.filesystem.resource_for(path)
        self._ask_each_time(pid, resource, CapabilityRight.DELETE)

        with self.assertRaises(HumanApprovalRequired):
            self.runtime.tools.call(pid, "delete_file", {"path": path, "missing_ok": True})
        self.runtime.human.drain_terminal_queue(auto_approve=True)
        retry = self.runtime.tools.call(pid, "delete_file", {"path": path, "missing_ok": True})
        # Create the file only after the approved retry.
        target = self.runtime.workspace_root / path
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text("now present", encoding="utf-8")

        self.assertTrue(retry.ok, retry.error)
        self.assertEqual(
            self.runtime.capability.permission_policy(pid, resource, CapabilityRight.DELETE),
            CapabilityManager.ASK_EACH_TIME,
        )
        with self.assertRaises(HumanApprovalRequired):
            self.runtime.tools.call(pid, "delete_file", {"path": path, "missing_ok": False})

    def test_llm_pending_per_use_approval_does_not_return_action_until_decision(self) -> None:
        # While approval is pending the scheduler step reports waiting_human
        # without an action; after approval the same action is resumed
        # without a second LLM call.
        path = self._path()
        client = FakeActionClient(
            [
                {
                    "action": "write_text_file",
                    "path": path,
                    "content": "approved after waiting",
                }
            ]
        )
        self.runtime.llm.client = client
        pid = self.runtime.process.spawn(image="review-agent:v0", goal="write with per-use approval")
        self._ask_each_time(pid, self.runtime.filesystem.resource_for(path), CapabilityRight.WRITE)

        waiting = self.runtime.run_next_process_once()
        self.assertTrue(waiting["waiting_human"])
        self.assertNotIn("action", waiting)
        self.assertEqual(client.calls, 1)
        self.assertEqual(self.runtime.process.get(pid).status, ProcessStatus.WAITING_HUMAN)
        self.assertNotIn("tool_failed", self._event_types(pid))

        self.runtime.human.drain_terminal_queue(auto_approve=True)
        resumed = self.runtime.run_next_process_once()

        self.assertEqual(client.calls, 1)
        self.assertTrue(resumed["resumed_after_human"])
        self.assertEqual(resumed["action"]["action"], "write_text_file")
        self.assertTrue(resumed["result"]["ok"])
        self.assertEqual(
            (self.runtime.workspace_root / path).read_text(encoding="utf-8"),
            "approved after waiting",
        )

    def _ask_each_time(self, pid: str, resource: object, right: CapabilityRight) -> None:
        """Set the ASK_EACH_TIME policy for ``right`` on ``resource`` for ``pid``."""
        self.runtime.capability.set_permission_policy(
            subject=pid,
            resource=resource,
            rights=[right],
            policy=CapabilityManager.ASK_EACH_TIME,
            issued_by="test",
        )

    def _path(self) -> str:
        """Return a unique workspace-relative path and schedule its removal.

        The file may or may not be created by the test, so the cleanup
        tolerates a missing file. It keeps ``agent_outputs/`` from filling up
        with one leftover file per test run.
        """
        relative = f"agent_outputs/permission_policy_{uuid4().hex}.txt"
        artifact = self.runtime.workspace_root / relative
        self.addCleanup(artifact.unlink, missing_ok=True)
        return relative

    def _event_types(self, pid: str) -> list[str]:
        """Return the ``type.value`` of every event listed for ``pid``."""
        return [event.type.value for event in self.runtime.events.list(target=pid)]
| #277 | |
| #278 | |
class FakeActionClient:
    """Scripted stand-in for the runtime's LLM client.

    Each ``complete_action`` call consumes the next scripted action and
    returns it as a single tool call. ``calls`` counts invocations so tests
    can assert whether the client was queried again.
    """

    def __init__(self, actions: list[dict[str, object]]):
        # Copy so consuming the script never mutates the caller's list.
        self.actions = list(actions)
        self.calls = 0

    def complete_action(self, messages: list[dict[str, str]], tools: list[dict[str, object]]) -> LLMCompletion:
        """Return the next scripted action as an ``LLMCompletion`` tool call.

        ``messages`` and ``tools`` are accepted for interface compatibility
        and ignored. The ``"action"`` key becomes the tool name; all other
        keys are JSON-encoded as the tool arguments.

        Raises:
            IndexError: if no scripted action is left. The call is still
                counted in ``calls``.
        """
        self.calls += 1
        if not self.actions:
            # Same exception type as list.pop(0) on an empty list, but the
            # message now says which fake ran dry and on which call.
            raise IndexError(
                f"FakeActionClient script exhausted on call #{self.calls}: no scripted action left"
            )
        action = self.actions.pop(0)
        name = str(action["action"])
        args = {key: value for key, value in action.items() if key != "action"}
        return LLMCompletion(
            content="",
            tool_calls=[{"id": f"fake_{self.calls}", "name": name, "arguments": json.dumps(args)}],
        )
| #293 | |
| #294 | |
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()
| #297 |