repositories
loading repo index
repositories
loading repo index
repository
loading code, commits, and activity
Mirrored from https://github.com/yingqi-z20/Agent-libOS
stars
latest
clone command
git clone gitlawb://did:key:z6MkqRzA...RfoM/yingqi-z20-Agen...
git clone gitlawb://did:key:z6MkqRzA.../yingqi-z20-Agen...
d98dd2c9 IPC 1d ago
| #1 | from __future__ import annotations |
| #2 | |
| #3 | import tempfile |
| #4 | import unittest |
| #5 | from datetime import datetime, tzinfo |
| #6 | from pathlib import Path |
| #7 | from typing import Any |
| #8 | |
| #9 | from agent_libos import Runtime |
| #10 | from agent_libos.primitives.shell import ShellAdapter |
| #11 | from agent_libos.models import CapabilityRight |
| #12 | from agent_libos.substrate import ( |
| #13 | CommandResult, |
| #14 | LocalClockProvider, |
| #15 | LocalFilesystemProvider, |
| #16 | LocalHumanProvider, |
| #17 | LocalResourceProviderSubstrate, |
| #18 | LocalShellProvider, |
| #19 | ResolvedPath, |
| #20 | ) |
| #21 | |
| #22 | |
class ResourceProviderSubstrateTests(unittest.TestCase):
    """Check that runtime primitives route their work through injected providers.

    Each test swaps one provider (filesystem, clock, shell or human) for a
    recording/fake implementation and asserts that the fake observed the call.
    Every runtime opened here is closed in a ``finally`` block.
    """

    def test_runtime_filesystem_primitive_uses_injected_provider(self) -> None:
        """The ``write_text_file`` tool resolves and writes via the substrate's filesystem."""
        with tempfile.TemporaryDirectory() as temp_dir:
            substrate = RecordingSubstrate(temp_dir)
            runtime = Runtime.open("local", substrate=substrate)
            try:
                path = "agent_outputs/substrate_write.txt"
                pid = runtime.process.spawn(image="review-agent:v0", goal="write through substrate")
                runtime.filesystem.grant_path(pid, path, [CapabilityRight.WRITE], issued_by="test")

                result = runtime.tools.call(pid, "write_text_file", {"path": path, "content": "via provider"})

                self.assertTrue(result.ok, result.error)
                self.assertIn("resolve", substrate.filesystem.calls)
                self.assertIn("write_text", substrate.filesystem.calls)
                # The recording provider delegates to the local one, so the file must exist on disk.
                self.assertEqual((Path(temp_dir) / path).read_text(encoding="utf-8"), "via provider")
            finally:
                runtime.close()

    def test_runtime_clock_primitive_uses_injected_provider(self) -> None:
        """The ``get_current_time`` and ``sleep`` tools read the substrate's clock."""
        with tempfile.TemporaryDirectory() as temp_dir:
            fake_clock = FakeClockProvider()
            substrate = LocalResourceProviderSubstrate(temp_dir)
            substrate.clock = fake_clock
            runtime = Runtime.open("local", substrate=substrate)
            try:
                pid = runtime.process.spawn(image="base-agent:v0", goal="use fake clock")

                now = runtime.tools.call(pid, "get_current_time", {"timezone": "UTC"})
                slept = runtime.tools.call(pid, "sleep", {"seconds": 0.1})

                self.assertTrue(now.ok, now.error)
                self.assertEqual(now.payload["iso8601"], "2040-01-02T03:04:05+00:00")
                self.assertTrue(slept.ok, slept.error)
                # 0.25 is the gap between the fake clock's two scripted monotonic readings.
                self.assertEqual(slept.payload["elapsed_seconds"], 0.25)
                self.assertEqual(fake_clock.sleeps, [("async", 0.1)])
            finally:
                runtime.close()

    def test_shell_adapter_uses_injected_provider(self) -> None:
        """``ShellAdapter.run`` forwards argv and timeout to the provider it was given."""
        runtime = Runtime.open("local")
        try:
            # Built inside the ``try`` so the runtime is still closed if
            # constructing the adapter raises.
            provider = FakeShellProvider()
            shell = ShellAdapter(runtime.capability, runtime.audit, provider=provider)
            pid = runtime.process.spawn(image="base-agent:v0", goal="run shell through substrate")
            runtime.capability.grant(pid, "shell:git", [CapabilityRight.EXECUTE], issued_by="test")

            result = shell.run(pid, ["git", "status", "--short"], timeout=2.0)

            self.assertEqual(result.stdout, "ok\n")
            self.assertEqual(provider.calls, [(["git", "status", "--short"], 2.0)])
        finally:
            runtime.close()

    def test_runtime_human_primitive_uses_injected_provider(self) -> None:
        """Human output and questions are written to / read from the substrate's human provider."""
        with tempfile.TemporaryDirectory() as temp_dir:
            human = RecordingHumanProvider(answers=["blue"])
            substrate = LocalResourceProviderSubstrate(temp_dir)
            substrate.human = human
            runtime = Runtime.open("local", substrate=substrate)
            try:
                pid = runtime.process.spawn(image="base-agent:v0", goal="use fake human")
                output = runtime.tools.call(pid, "human_output", {"message": "hello"})
                question_id = runtime.human.ask(pid, "Favorite color?", blocking=True)
                processed = runtime.human.drain_terminal_queue()

                self.assertTrue(output.ok, output.error)
                self.assertEqual(human.outputs[0], "hello")
                # The provider receives the question with a trailing space appended.
                self.assertEqual(human.prompts, ["Favorite color? "])
                self.assertEqual(processed[0].request_id, question_id)
                self.assertEqual(processed[0].decision["answer"], "blue")
            finally:
                runtime.close()
| #96 | |
| #97 | |
class RecordingSubstrate:
    """Substrate stand-in whose filesystem provider records the calls it receives.

    The clock, shell and human slots hold the local provider implementations;
    only ``filesystem`` is replaced with a ``RecordingFilesystemProvider``.
    """

    def __init__(self, root: str):
        """Build the providers for the workspace directory *root*."""
        resolved_root = Path(root).resolve()
        self.workspace_root = resolved_root
        self.workspace_display = str(resolved_root)
        # Providers are handed the caller's original ``root`` string, not the resolved path.
        self.filesystem = RecordingFilesystemProvider(root)
        self.clock = LocalClockProvider()
        self.shell = LocalShellProvider(root)
        self.human = LocalHumanProvider()
| #106 | |
| #107 | |
class RecordingFilesystemProvider:
    """Wrap a ``LocalFilesystemProvider`` and record which operations were invoked.

    ``resolve``, ``state`` and ``write_text`` append their own name to
    ``calls`` before delegating to the wrapped provider. Any other attribute
    is forwarded to the wrapped provider without being recorded.
    """

    def __init__(self, root: str):
        """Create the wrapped provider for *root* and mirror its public attributes."""
        self.inner = LocalFilesystemProvider(root)
        self.namespace = self.inner.namespace
        self.root_display = self.inner.root_display
        # Names of the recorded operations, in call order.
        self.calls: list[str] = []

    def resolve(self, path: Any) -> ResolvedPath:
        """Record a ``resolve`` call and return the wrapped provider's result."""
        self.calls.append("resolve")
        return self.inner.resolve(path)

    def state(self, path: ResolvedPath) -> Any:
        """Record a ``state`` call and return the wrapped provider's result."""
        self.calls.append("state")
        return self.inner.state(path)

    def write_text(self, path: ResolvedPath, text: str, encoding: str, newline: str | None = "\n") -> None:
        """Record a ``write_text`` call and perform the write via the wrapped provider."""
        self.calls.append("write_text")
        self.inner.write_text(path, text, encoding, newline)

    def __getattr__(self, name: str) -> Any:
        """Forward attributes not found on the wrapper to the wrapped provider.

        Raises:
            AttributeError: if the wrapped provider is not set yet, or if it
                does not have *name* either.
        """
        # ``__getattr__`` only runs after normal lookup fails. Reading
        # ``self.inner`` here while ``inner`` is unset (e.g. an instance made
        # with ``__new__``) would re-enter this method until RecursionError,
        # so look in the instance dict directly and fail with AttributeError.
        try:
            inner = self.__dict__["inner"]
        except KeyError:
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {name!r}"
            ) from None
        return getattr(inner, name)
| #129 | |
| #130 | |
class FakeClockProvider:
    """Deterministic clock double: frozen wall time, scripted monotonic readings.

    Sleep requests never block; they are appended to ``sleeps`` as
    ``(mode, seconds)`` pairs where mode is ``"sync"`` or ``"async"``.
    """

    def __init__(self):
        """Start with an empty sleep log and two queued monotonic readings."""
        self.sleeps: list[tuple[str, float]] = []
        # Consumed front-to-back by ``monotonic``; the readings are 0.25 apart.
        self._monotonic = [100.0, 100.25]

    def now(self, timezone_: tzinfo) -> datetime:
        """Return 2040-01-02 03:04:05 tagged with *timezone_* (no conversion)."""
        frozen = datetime(2040, 1, 2, 3, 4, 5)
        return frozen.replace(tzinfo=timezone_)

    def monotonic(self) -> float:
        """Return the next scripted reading; raise ``IndexError`` once exhausted."""
        next_reading = self._monotonic.pop(0)
        return next_reading

    def sleep(self, seconds: float) -> None:
        """Log a synchronous sleep request without blocking."""
        self._record("sync", seconds)

    async def asleep(self, seconds: float) -> None:
        """Log an asynchronous sleep request without awaiting anything."""
        self._record("async", seconds)

    def _record(self, mode: str, seconds: float) -> None:
        """Append one ``(mode, seconds)`` entry to the sleep log."""
        self.sleeps.append((mode, seconds))
| #147 | |
| #148 | |
class FakeShellProvider:
    """Shell provider double that records invocations instead of running them."""

    def __init__(self):
        """Start with an empty invocation log."""
        # One ``(argv copy, timeout)`` entry per ``run`` call, in call order.
        self.calls: list[tuple[list[str], float]] = []

    def run(self, argv: list[str], *, timeout: float = 30.0, cwd: str | None = None) -> CommandResult:
        """Log the call and return a canned successful result.

        ``cwd`` is accepted but neither recorded nor used. The logged argv and
        the returned argv are independent copies of *argv*.
        """
        logged_argv = [*argv]
        self.calls.append((logged_argv, timeout))
        returned_argv = [*argv]
        return CommandResult(argv=returned_argv, returncode=0, stdout="ok\n", stderr="")
| #156 | |
| #157 | |
class RecordingHumanProvider:
    """Human provider double that logs output and replays scripted answers."""

    def __init__(self, answers: list[str] | None = None):
        """Store a private copy of *answers*; ``None`` means no scripted answers."""
        self.outputs: list[str] = []
        self.prompts: list[str] = []
        # Copy so that consuming answers never mutates the caller's list.
        self.answers = [] if not answers else list(answers)

    def write(self, message: str) -> None:
        """Append *message* to ``outputs``."""
        self.outputs.append(message)

    def read(self, prompt: str) -> str:
        """Log *prompt* and return the next scripted answer, or ``""`` when none remain."""
        self.prompts.append(prompt)
        if not self.answers:
            return ""
        return self.answers.pop(0)
| #170 | |
| #171 | |
# Run the test cases in this module when the file is executed directly.
if __name__ == "__main__":
    unittest.main()
| #174 |