repositories
loading repo index
repositories
loading repo index
repository
loading code, commits, and activity
Mirrored from https://github.com/yingqi-z20/Agent-libOS
stars
latest
clone command
git clone gitlawb://did:key:z6MkqRzA...RfoM/yingqi-z20-Agen...git clone gitlawb://did:key:z6MkqRzA.../yingqi-z20-Agen...d98dd2c9IPC1d ago| #1 | from __future__ import annotations |
| #2 | |
| #3 | import json |
| #4 | import unittest |
| #5 | from uuid import uuid4 |
| #6 | |
| #7 | from agent_libos import Runtime |
| #8 | from agent_libos.llm.client import LLMCompletion |
| #9 | from agent_libos.models import CapabilityRight, ProcessStatus |
| #10 | |
| #11 | |
class ObjectFileToolTests(unittest.TestCase):
    """Tests for the ``create_object_from_file`` / ``write_object_to_file`` tools.

    Each test opens its own ``"local"`` runtime, writes uniquely named files
    under ``workspace_root/agent_outputs`` and removes them again afterwards.
    """

    def setUp(self) -> None:
        """Open a fresh ``"local"`` runtime for the test."""
        self.runtime = Runtime.open("local")

    def tearDown(self) -> None:
        """Close the runtime opened in ``setUp``."""
        self.runtime.close()

    def _tracked_workspace_path(self, relative_path: str):
        """Return ``workspace_root / relative_path`` and schedule its removal.

        The tests use uuid-based file names, so without cleanup every run
        would leave new files behind in the workspace. ``missing_ok`` keeps
        the cleanup harmless when the file was never created.
        """
        path = self.runtime.workspace_root / relative_path
        self.addCleanup(path.unlink, missing_ok=True)
        return path

    def test_copy_file_via_named_object_without_materializing_content_to_process(self) -> None:
        """Copy a file through a named object without exposing its content.

        The scripted client raises if the sentinel line ever shows up in the
        messages it receives, and the tool results are checked for the
        sentinel as well.
        """
        sentinel = f"OBJECT_COPY_SENTINEL_{uuid4().hex}"
        source = f"agent_outputs/object_copy_source_{uuid4().hex}.txt"
        target = f"agent_outputs/object_copy_target_{uuid4().hex}.txt"
        object_name = f"copy.object.{uuid4().hex}"
        source_path = self._tracked_workspace_path(source)
        target_path = self._tracked_workspace_path(target)
        source_path.parent.mkdir(parents=True, exist_ok=True)
        source_path.write_text(f"alpha\n{sentinel}\nomega\n", encoding="utf-8")

        client = GuardedActionClient(
            actions=[
                {"action": "create_object_from_file", "name": object_name, "path": source},
                {"action": "write_object_to_file", "name": object_name, "path": target},
                {"action": "process_exit", "payload": {"copied": True, "object_name": object_name}},
            ],
            forbidden_text=sentinel,
        )
        self.runtime.llm.client = client
        pid = self.runtime.process.spawn(image="review-agent:v0", goal="copy a file through Object Memory")
        self.runtime.filesystem.grant_path(pid, source, [CapabilityRight.READ], issued_by="test")
        self.runtime.filesystem.grant_path(pid, target, [CapabilityRight.WRITE], issued_by="test")

        # Three scripted actions are expected; the bound of five only guards
        # against an endless loop if the process never exits.
        results = []
        for _ in range(5):
            result = self.runtime.run_next_process_once()
            if result is None:
                break
            results.append(result)
            if self.runtime.process.get(pid).status == ProcessStatus.EXITED:
                break

        action_names = [result["action"]["action"] for result in results if "action" in result]

        # Check the action sequence before indexing into ``results`` so that a
        # short or unexpected run fails with a readable diff, not an IndexError.
        self.assertEqual(action_names, ["create_object_from_file", "write_object_to_file", "process_exit"])
        create_result, write_result = results[0]["result"], results[1]["result"]
        self.assertTrue(create_result["ok"])
        self.assertTrue(write_result["ok"])
        self.assertNotIn(sentinel, json.dumps(create_result, ensure_ascii=False))
        self.assertNotIn(sentinel, json.dumps(write_result, ensure_ascii=False))
        self.assertEqual(target_path.read_text(encoding="utf-8"), source_path.read_text(encoding="utf-8"))
        self.assertEqual(client.calls, 3)

    def test_object_file_tools_enforce_filesystem_and_object_capabilities(self) -> None:
        """Both tools must fail until the matching path capability is granted."""
        source = f"agent_outputs/object_tool_source_{uuid4().hex}.txt"
        target = f"agent_outputs/object_tool_target_{uuid4().hex}.txt"
        object_name = f"secure.object.{uuid4().hex}"
        source_path = self._tracked_workspace_path(source)
        target_path = self._tracked_workspace_path(target)
        source_path.parent.mkdir(parents=True, exist_ok=True)
        source_path.write_text("capability checked", encoding="utf-8")
        pid = self.runtime.process.spawn(image="review-agent:v0", goal="use object file tools")

        # No READ grant yet: creating the object from the file must be denied.
        denied_read = self.runtime.tools.call(pid, "create_object_from_file", {"name": object_name, "path": source})
        self.assertFalse(denied_read.ok)
        self.assertIn("lacks read", denied_read.error or "")

        self.runtime.filesystem.grant_path(pid, source, [CapabilityRight.READ], issued_by="test")
        created = self.runtime.tools.call(pid, "create_object_from_file", {"name": object_name, "path": source})
        self.assertTrue(created.ok)
        # The tool payload must not echo the file content back.
        self.assertNotIn("capability checked", json.dumps(created.payload, ensure_ascii=False))

        # No WRITE grant yet: writing the object out must be denied.
        denied_write = self.runtime.tools.call(pid, "write_object_to_file", {"name": object_name, "path": target})
        self.assertFalse(denied_write.ok)
        self.assertIn("lacks write", denied_write.error or "")

        self.runtime.filesystem.grant_path(pid, target, [CapabilityRight.WRITE], issued_by="test")
        written = self.runtime.tools.call(pid, "write_object_to_file", {"name": object_name, "path": target})
        self.assertTrue(written.ok)
        self.assertEqual(target_path.read_text(encoding="utf-8"), "capability checked")
| #87 | |
| #88 | |
class GuardedActionClient:
    """Scripted stand-in for an LLM client that replays a fixed action list.

    Every call to :meth:`complete_action` first checks that ``forbidden_text``
    does not occur in the messages it was given, then returns the next
    scripted action as a single tool call.
    """

    def __init__(self, actions: list[dict[str, object]], forbidden_text: str):
        """Store the script and the text that must never reach the client.

        Args:
            actions: Actions to replay in order. Each dict needs an
                ``"action"`` key (the tool name); all other keys become the
                tool-call arguments.
            forbidden_text: Text whose presence in the messages fails the
                call. An empty string disables the check.
        """
        self.actions = list(actions)  # copy so the caller's list is not consumed
        self.forbidden_text = forbidden_text
        self.calls = 0

    def complete_action(self, messages: list[dict[str, str]], tools: list[dict[str, object]]) -> LLMCompletion:
        """Return the next scripted action as a completion with one tool call.

        Args:
            messages: Messages for this step; serialized to JSON and searched
                for ``forbidden_text``.
            tools: Accepted for interface compatibility; not used.

        Returns:
            An ``LLMCompletion`` with empty content and one tool call whose
            ``arguments`` is the JSON-encoded remainder of the scripted action.

        Raises:
            AssertionError: If ``forbidden_text`` appears in ``messages``.
            IndexError: If the client is called after the script ran out.
        """
        self.calls += 1
        serialized_messages = json.dumps(messages, ensure_ascii=False)
        if self.forbidden_text and self.forbidden_text in serialized_messages:
            raise AssertionError("file content was materialized into the process prompt")
        if not self.actions:
            # Same exception type as the bare ``pop(0)`` would raise, but with
            # a message that points at the test script instead of a list.
            raise IndexError(
                f"GuardedActionClient has no scripted actions left (call #{self.calls})"
            )
        action = self.actions.pop(0)
        name = str(action["action"])
        args = {key: value for key, value in action.items() if key != "action"}
        return LLMCompletion(
            content="",
            tool_calls=[{"id": f"fake_{self.calls}", "name": name, "arguments": json.dumps(args)}],
        )
| #107 | |
| #108 | |
# Run the tests in this module when the file is executed directly.
if __name__ == "__main__":
    unittest.main()
| #111 |