repositories
loading repo index
repositories
loading repo index
repository
loading code, commits, and activity
Mirrored from https://github.com/yingqi-z20/Agent-libOS
stars
latest
clone command
git clone gitlawb://did:key:z6MkqRzA...RfoM/yingqi-z20-Agen...git clone gitlawb://did:key:z6MkqRzA.../yingqi-z20-Agen...d98dd2c9IPC1d ago| #1 | from __future__ import annotations |
| #2 | |
| #3 | import unittest |
| #4 | from uuid import uuid4 |
| #5 | |
| #6 | from agent_libos import Runtime |
| #7 | from agent_libos.capability.manager import CapabilityManager |
| #8 | from agent_libos.config import DEFAULT_CONFIG |
| #9 | from agent_libos.models.exceptions import HumanApprovalRequired, ValidationError |
| #10 | from agent_libos.models import CapabilityRight, HumanRequestStatus |
| #11 | |
| #12 | _TOOL_DEFAULTS = DEFAULT_CONFIG.tools |
| #13 | DEFAULT_FILESYSTEM_READ_HARD_LIMIT = _TOOL_DEFAULTS.filesystem_read_hard_limit_bytes |
| #14 | DEFAULT_DIRECTORY_ENTRY_HARD_LIMIT = _TOOL_DEFAULTS.directory_entry_hard_limit |
| #15 | |
| #16 | |
| #17 | class FilesystemDirectoryToolTests(unittest.TestCase): |
| #18 | def setUp(self) -> None: |
| #19 | self.runtime = Runtime.open("local") |
| #20 | self.human_output: list[str] = [] |
| #21 | self.runtime.substrate.human.output_sink = self.human_output.append |
| #22 | |
| #23 | def tearDown(self) -> None: |
| #24 | self.runtime.close() |
| #25 | |
| #26 | def test_read_write_and_delete_directory_and_file_tools(self) -> None: |
| #27 | base = f"agent_outputs/fs_ops_{uuid4().hex}" |
| #28 | existing_file = self._write_fixture(f"{base}/existing.txt", "existing") |
| #29 | pid = self.runtime.process.spawn(image="review-agent:v0", goal="filesystem ops") |
| #30 | self.runtime.filesystem.grant_path_list( |
| #31 | pid, |
| #32 | read_dirs=[base], |
| #33 | write_dirs=[base], |
| #34 | delete_dirs=[base], |
| #35 | issued_by="test", |
| #36 | ) |
| #37 | |
| #38 | listed = self.runtime.tools.call(pid, "read_directory", {"path": base}) |
| #39 | made_dir = self.runtime.tools.call(pid, "write_directory", {"path": f"{base}/created/nested"}) |
| #40 | wrote_file = self.runtime.tools.call( |
| #41 | pid, |
| #42 | "write_text_file", |
| #43 | {"path": f"{base}/created/nested/out.txt", "content": "created"}, |
| #44 | ) |
| #45 | deleted_file = self.runtime.tools.call(pid, "delete_file", {"path": existing_file}) |
| #46 | deleted_dir = self.runtime.tools.call( |
| #47 | pid, |
| #48 | "delete_directory", |
| #49 | {"path": f"{base}/created", "recursive": True}, |
| #50 | ) |
| #51 | |
| #52 | self.assertTrue(listed.ok, listed.error) |
| #53 | self.assertEqual([entry["name"] for entry in listed.payload["entries"]], ["existing.txt"]) |
| #54 | self.assertTrue(made_dir.ok, made_dir.error) |
| #55 | self.assertTrue(made_dir.payload["created"]) |
| #56 | self.assertTrue(wrote_file.ok, wrote_file.error) |
| #57 | self.assertTrue(deleted_file.ok, deleted_file.error) |
| #58 | self.assertFalse((self.runtime.workspace_root / existing_file).exists()) |
| #59 | self.assertTrue(deleted_dir.ok, deleted_dir.error) |
| #60 | self.assertFalse((self.runtime.workspace_root / base / "created").exists()) |
| #61 | |
| #62 | def test_delete_requires_delete_capability_not_write_capability(self) -> None: |
| #63 | path = self._write_fixture(f"agent_outputs/delete_denied_{uuid4().hex}.txt", "keep") |
| #64 | pid = self.runtime.process.spawn(image="review-agent:v0", goal="delete denied") |
| #65 | self.runtime.filesystem.grant_path(pid, path, [CapabilityRight.WRITE], issued_by="test") |
| #66 | |
| #67 | denied = self.runtime.tools.call(pid, "delete_file", {"path": path}) |
| #68 | |
| #69 | self.assertFalse(denied.ok) |
| #70 | self.assertIn("lacks delete", denied.error or "") |
| #71 | self.assertTrue((self.runtime.workspace_root / path).exists()) |
| #72 | |
| #73 | def test_delete_ask_each_time_uses_filesystem_primitive_context(self) -> None: |
| #74 | path = self._write_fixture(f"agent_outputs/delete_prompt_{uuid4().hex}.txt", "delete me") |
| #75 | pid = self.runtime.process.spawn(image="review-agent:v0", goal="delete with prompt") |
| #76 | resource = self.runtime.filesystem.resource_for_path(path) |
| #77 | self.runtime.capability.set_permission_policy( |
| #78 | subject=pid, |
| #79 | resource=resource, |
| #80 | rights=[CapabilityRight.DELETE], |
| #81 | policy=CapabilityManager.ASK_EACH_TIME, |
| #82 | issued_by="test", |
| #83 | ) |
| #84 | |
| #85 | with self.assertRaises(HumanApprovalRequired): |
| #86 | self.runtime.tools.call(pid, "delete_file", {"path": path}) |
| #87 | request = self.runtime.human.pending()[0] |
| #88 | processed = self.runtime.human.drain_terminal_queue(auto_approve=True) |
| #89 | retried = self.runtime.tools.call(pid, "delete_file", {"path": path}) |
| #90 | |
| #91 | self.assertEqual(request.payload["context"]["primitive"], "runtime.filesystem.delete_file") |
| #92 | self.assertEqual(request.payload["context"]["operation"], "delete_file") |
| #93 | self.assertEqual(request.payload["context"]["right"], "delete") |
| #94 | self.assertIn("target", request.payload["context"]) |
| #95 | self.assertEqual(processed[0].status, HumanRequestStatus.APPROVED) |
| #96 | self.assertTrue(retried.ok, retried.error) |
| #97 | self.assertFalse((self.runtime.workspace_root / path).exists()) |
| #98 | |
| #99 | def test_truncated_utf8_read_does_not_split_codepoint(self) -> None: |
| #100 | path = self._write_fixture(f"agent_outputs/utf8_{uuid4().hex}.txt", "éx") |
| #101 | pid = self.runtime.process.spawn(image="review-agent:v0", goal="read utf8") |
| #102 | self.runtime.filesystem.grant_path(pid, path, [CapabilityRight.READ], issued_by="test") |
| #103 | |
| #104 | result = self.runtime.filesystem.read_text(pid, path, max_bytes=1) |
| #105 | |
| #106 | self.assertTrue(result.truncated) |
| #107 | self.assertEqual(result.bytes_read, 1) |
| #108 | self.assertEqual(result.content, "") |
| #109 | |
| #110 | def test_filesystem_primitive_enforces_read_limits_without_tool_schema(self) -> None: |
| #111 | path = self._write_fixture(f"agent_outputs/read_limit_{uuid4().hex}.txt", "content") |
| #112 | pid = self.runtime.process.spawn(image="review-agent:v0", goal="read limit") |
| #113 | self.runtime.filesystem.grant_path(pid, path, [CapabilityRight.READ], issued_by="test") |
| #114 | |
| #115 | with self.assertRaises(ValidationError): |
| #116 | self.runtime.filesystem.read_text(pid, path, max_bytes=0) |
| #117 | with self.assertRaises(ValidationError): |
| #118 | self.runtime.filesystem.read_text( |
| #119 | pid, |
| #120 | path, |
| #121 | max_bytes=DEFAULT_FILESYSTEM_READ_HARD_LIMIT + 1, |
| #122 | ) |
| #123 | |
| #124 | def test_directory_primitive_enforces_limit_without_tool_schema(self) -> None: |
| #125 | base = f"agent_outputs/list_limit_{uuid4().hex}" |
| #126 | self._write_fixture(f"{base}/item.txt", "content") |
| #127 | pid = self.runtime.process.spawn(image="review-agent:v0", goal="directory limit") |
| #128 | self.runtime.filesystem.grant_directory(pid, base, [CapabilityRight.READ], issued_by="test") |
| #129 | |
| #130 | with self.assertRaises(ValidationError): |
| #131 | self.runtime.filesystem.read_directory(pid, base, limit=0) |
| #132 | with self.assertRaises(ValidationError): |
| #133 | self.runtime.filesystem.read_directory(pid, base, limit=DEFAULT_DIRECTORY_ENTRY_HARD_LIMIT + 1) |
| #134 | |
| #135 | def _write_fixture(self, path: str, content: str) -> str: |
| #136 | target = self.runtime.workspace_root / path |
| #137 | target.parent.mkdir(parents=True, exist_ok=True) |
| #138 | target.write_text(content, encoding="utf-8") |
| #139 | return path |
| #140 | |
| #141 | |
| #142 | if __name__ == "__main__": |
| #143 | unittest.main() |
| #144 |