repositories
loading repo index
repositories
loading repo index
repository
loading code, commits, and activity
Mirrored from https://github.com/yingqi-z20/Agent-libOS
stars
latest
clone command
git clone gitlawb://did:key:z6MkqRzA...RfoM/yingqi-z20-Agen...
git clone gitlawb://did:key:z6MkqRzA.../yingqi-z20-Agen...
d98dd2c9 IPC 1d ago
| #1 | from __future__ import annotations |
| #2 | |
| #3 | import tempfile |
| #4 | import unittest |
| #5 | from pathlib import Path |
| #6 | |
| #7 | from agent_libos import Runtime |
| #8 | from agent_libos.config import AgentLibOSConfig, ShellCommandRule, ShellDefaults |
| #9 | from agent_libos.models import CapabilityRight, HumanRequestStatus |
| #10 | from agent_libos.models.exceptions import CapabilityDenied, HumanApprovalRequired, ValidationError |
| #11 | from agent_libos.substrate import ( |
| #12 | CommandResult, |
| #13 | LocalClockProvider, |
| #14 | LocalFilesystemProvider, |
| #15 | LocalHumanProvider, |
| #16 | LocalResourceProviderSubstrate, |
| #17 | ) |
| #18 | |
| #19 | |
class ShellPrimitiveTests(unittest.TestCase):
    """Tests for ``runtime.shell`` and the ``run_shell_command`` tool.

    Every test runs against a ``FakeShellProvider`` so no real command is
    executed; the provider records each ``(argv, timeout)`` it receives.
    Runtimes created by the helpers are closed automatically through
    ``addCleanup``.
    """

    def test_whitelist_policy_auto_allows_exact_safe_command(self) -> None:
        """``git status --short`` runs without approval and is audited."""
        runtime, provider = self._runtime_with_fake_shell()
        pid = runtime.process.spawn(image="review-agent:v0", goal="run safe shell")
        runtime.shell.grant_policy(pid, runtime.config.shell.allowlist_auto_else_ask_level, issued_by="test")

        result = runtime.shell.run(pid, ["git", "status", "--short"], timeout=2.0)

        self.assertEqual(result.stdout, "ok\n")
        # The explicit timeout must reach the provider unchanged.
        self.assertEqual(provider.calls, [(["git", "status", "--short"], 2.0)])
        self.assertIn("primitive.shell.run", self._audit_actions(runtime))

    def test_unlisted_command_requires_approval_and_consumes_once(self) -> None:
        """An approval for an unlisted argv permits exactly one execution."""
        runtime, provider = self._runtime_with_fake_shell()
        pid = runtime.process.spawn(image="review-agent:v0", goal="run unlisted shell")
        runtime.shell.grant_policy(pid, runtime.config.shell.allowlist_auto_else_ask_level, issued_by="test")

        # First attempt: blocked, and a human request is queued.
        with self.assertRaises(HumanApprovalRequired):
            runtime.shell.run(pid, ["git", "show", "--stat"])

        pending = runtime.human.pending()
        self.assertEqual(len(pending), 1)
        self.assertEqual(pending[0].payload["context"]["argv"], ["git", "show", "--stat"])
        self.assertEqual(runtime.human.drain_terminal_queue(auto_approve=True)[0].status, HumanRequestStatus.APPROVED)

        # Second attempt: the approval is honoured and the default timeout is used.
        allowed = runtime.shell.run(pid, ["git", "show", "--stat"])
        self.assertEqual(allowed.stdout, "ok\n")
        self.assertEqual(provider.calls, [(["git", "show", "--stat"], runtime.config.tools.shell_timeout_s)])

        # Third attempt: the approval was consumed, so approval is required again.
        with self.assertRaises(HumanApprovalRequired):
            runtime.shell.run(pid, ["git", "show", "--stat"])

    def test_always_deny_shell_policy_overrides_exact_command_grant(self) -> None:
        """The always-deny level wins over an explicit ``shell:git`` grant."""
        runtime, _provider = self._runtime_with_fake_shell()
        pid = runtime.process.spawn(image="review-agent:v0", goal="deny shell")
        runtime.shell.grant_policy(pid, runtime.config.shell.always_deny_level, issued_by="test")
        runtime.capability.grant(pid, "shell:git", [CapabilityRight.EXECUTE], issued_by="test")

        with self.assertRaises(CapabilityDenied):
            runtime.shell.run(pid, ["git", "status", "--short"])

    def test_blacklist_policy_asks_for_nested_shell_interpreter(self) -> None:
        """``bash`` reached through ``env`` still triggers an approval request."""
        runtime, _provider = self._runtime_with_fake_shell()
        pid = runtime.process.spawn(image="review-agent:v0", goal="blacklist shell")
        runtime.shell.grant_policy(pid, runtime.config.shell.blocklist_ask_else_auto_level, issued_by="test")

        with self.assertRaises(HumanApprovalRequired):
            runtime.shell.run(pid, ["env", "bash", "-c", "echo unsafe"])

        request = runtime.human.pending()[0]
        self.assertEqual(request.payload["context"]["matched_rule"], ["bash"])

    def test_path_qualified_whitelist_command_does_not_match_bare_rule(self) -> None:
        """``./git`` is not auto-allowed the way bare ``git`` is."""
        runtime, _provider = self._runtime_with_fake_shell()
        pid = runtime.process.spawn(image="review-agent:v0", goal="path bypass")
        runtime.shell.grant_policy(pid, runtime.config.shell.allowlist_auto_else_ask_level, issued_by="test")

        with self.assertRaises(HumanApprovalRequired):
            runtime.shell.run(pid, ["./git", "status", "--short"])

    def test_shell_tool_uses_runtime_shell_primitive(self) -> None:
        """The ``run_shell_command`` tool reaches the same shell provider."""
        runtime, provider = self._runtime_with_fake_shell()
        pid = runtime.process.spawn(image="review-agent:v0", goal="run shell tool")
        runtime.shell.grant_policy(pid, runtime.config.shell.allowlist_auto_else_ask_level, issued_by="test")

        result = runtime.tools.call(pid, "run_shell_command", {"argv": ["git", "status", "--short"]})

        self.assertTrue(result.ok, result.error)
        self.assertEqual(result.payload["stdout"], "ok\n")
        self.assertEqual(provider.calls, [(["git", "status", "--short"], runtime.config.tools.shell_timeout_s)])

    def test_shell_primitive_truncates_output_before_tool_layer(self) -> None:
        """Configured output caps apply even when the tool asks for larger ones."""
        config = AgentLibOSConfig(
            shell=ShellDefaults(
                max_stdout_chars=3,
                max_stderr_chars=2,
                whitelist=(ShellCommandRule(("tool",)),),
                blacklist=(),
            )
        )
        runtime, provider = self._runtime_with_config(config)
        # Canned output deliberately longer than the configured caps.
        provider.stdout = "abcdef"
        provider.stderr = "wxyz"

        pid = runtime.process.spawn(image="review-agent:v0", goal="bounded shell")
        runtime.shell.grant_policy(pid, config.shell.allowlist_auto_else_ask_level, issued_by="test")

        result = runtime.shell.run(pid, ["tool"])
        tool_result = runtime.tools.call(
            pid,
            "run_shell_command",
            {"argv": ["tool"], "max_stdout_chars": 10, "max_stderr_chars": 10},
        )

        self.assertEqual(result.stdout, "abc")
        self.assertEqual(result.stderr, "wx")
        self.assertTrue(result.stdout_truncated)
        self.assertTrue(result.stderr_truncated)
        # The tool requested 10 characters but still receives the capped output.
        self.assertEqual(tool_result.payload["stdout"], "abc")
        self.assertTrue(tool_result.payload["stdout_truncated"])
        self.assertEqual(tool_result.payload["stderr"], "wx")
        self.assertTrue(tool_result.payload["stderr_truncated"])

    def test_shell_primitive_enforces_timeout_limit_without_tool_schema(self) -> None:
        """A zero timeout and one above the hard limit are both rejected."""
        runtime, _provider = self._runtime_with_fake_shell()
        pid = runtime.process.spawn(image="review-agent:v0", goal="bounded timeout")
        runtime.shell.grant_policy(pid, runtime.config.shell.allowlist_auto_else_ask_level, issued_by="test")

        with self.assertRaises(ValidationError):
            runtime.shell.run(pid, ["git", "status", "--short"], timeout=0)
        with self.assertRaises(ValidationError):
            runtime.shell.run(
                pid,
                ["git", "status", "--short"],
                timeout=runtime.config.shell.timeout_hard_limit_s + 1,
            )

    def _runtime_with_fake_shell(self) -> tuple[Runtime, "FakeShellProvider"]:
        """Return ``(runtime, provider)`` opened without an explicit config."""
        return self._open_runtime()

    def _runtime_with_config(self, config: AgentLibOSConfig) -> tuple[Runtime, "FakeShellProvider"]:
        """Return ``(runtime, provider)`` opened with the given ``config``."""
        return self._open_runtime(config=config)

    def _open_runtime(self, **open_kwargs: object) -> tuple[Runtime, "FakeShellProvider"]:
        """Open a local runtime over a temp workspace and a fake shell provider.

        ``open_kwargs`` are forwarded to ``Runtime.open``. Both the temporary
        directory and the runtime are released through ``addCleanup``.
        """
        temp_dir = tempfile.TemporaryDirectory()
        self.addCleanup(temp_dir.cleanup)
        provider = FakeShellProvider()
        substrate = RecordingShellSubstrate(temp_dir.name, provider)
        runtime = Runtime.open("local", substrate=substrate, **open_kwargs)
        # Cleanups run last-in-first-out, so the runtime is closed before the
        # temporary directory is removed.
        self.addCleanup(runtime.close)
        # Replace the output sink with a no-op (presumably to keep approval
        # prompts out of the test output).
        runtime.substrate.human.output_sink = lambda _message: None
        return runtime, provider

    def _audit_actions(self, runtime: Runtime) -> list[str]:
        """Return the ``action`` of every record in the runtime's audit trace."""
        return [record.action for record in runtime.audit.trace()]
| #180 | |
| #181 | |
class ShellMatcherTests(unittest.TestCase):
    """Tests for how shell command rules are matched against an argv."""

    def test_custom_exact_whitelist_rule_does_not_prefix_match_extra_args(self) -> None:
        """A rule for ``tool safe`` does not auto-allow ``tool safe --extra``."""
        config = AgentLibOSConfig(
            shell=ShellDefaults(whitelist=(ShellCommandRule(("tool", "safe")),), blacklist=())
        )
        runtime, _provider = self._runtime_with_config(config)
        pid = runtime.process.spawn(image="review-agent:v0", goal="exact shell")
        runtime.shell.grant_policy(pid, config.shell.allowlist_auto_else_ask_level, issued_by="test")

        with self.assertRaises(HumanApprovalRequired):
            runtime.shell.run(pid, ["tool", "safe", "--extra"])

    def _runtime_with_config(self, config: AgentLibOSConfig) -> tuple[Runtime, "FakeShellProvider"]:
        """Return ``(runtime, provider)`` opened with ``config`` over a temp workspace.

        Both the temporary directory and the runtime are released through
        ``addCleanup``.
        """
        temp_dir = tempfile.TemporaryDirectory()
        self.addCleanup(temp_dir.cleanup)
        provider = FakeShellProvider()
        substrate = RecordingShellSubstrate(temp_dir.name, provider)
        runtime = Runtime.open("local", substrate=substrate, config=config)
        # Cleanups run last-in-first-out, so the runtime is closed before the
        # temporary directory is removed.
        self.addCleanup(runtime.close)
        # Replace the output sink with a no-op (presumably to keep approval
        # prompts out of the test output).
        runtime.substrate.human.output_sink = lambda _message: None
        return runtime, provider
| #205 | |
| #206 | |
class RecordingShellSubstrate(LocalResourceProviderSubstrate):
    """Local substrate whose shell provider is the fake supplied by the test."""

    def __init__(self, root: str, shell: "FakeShellProvider") -> None:
        """Wire local providers rooted at ``root`` and install ``shell`` as the shell provider."""
        # NOTE(review): super().__init__() is intentionally not called here; the
        # attributes are assigned directly, presumably so that no real shell
        # provider is constructed. Confirm the base class needs no other state.
        self.workspace_root = Path(root).resolve()
        self.workspace_display = str(self.workspace_root)
        self.filesystem = LocalFilesystemProvider(root)
        self.clock = LocalClockProvider()
        self.shell = shell
        self.human = LocalHumanProvider()
| #215 | |
| #216 | |
class FakeShellProvider:
    """Shell provider stand-in that records calls and returns canned output."""

    def __init__(self) -> None:
        # One (argv, timeout) entry per run() call, in call order.
        self.calls: list[tuple[list[str], float]] = []
        # Canned output returned by every run() call; tests may overwrite these.
        self.stdout = "ok\n"
        self.stderr = ""

    def run(self, argv: list[str], *, timeout: float = 30.0, cwd: str | None = None) -> CommandResult:
        """Record ``(argv, timeout)`` and return a result with return code 0.

        ``cwd`` is accepted for signature compatibility but is neither
        recorded nor used. ``argv`` is copied so later mutation by the caller
        does not alter the recorded call or the returned result.
        """
        self.calls.append((list(argv), timeout))
        return CommandResult(argv=list(argv), returncode=0, stdout=self.stdout, stderr=self.stderr)
| #226 | |
| #227 | |
# Run the test suite when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
| #230 |