repositories
loading repo index
repositories
loading repo index
repository
loading code, commits, and activity
Mirrored from https://github.com/yingqi-z20/Agent-libOS
stars
latest
clone command
git clone gitlawb://did:key:z6MkqRzA...RfoM/yingqi-z20-Agen...
git clone gitlawb://did:key:z6MkqRzA.../yingqi-z20-Agen...
d98dd2c9 IPC 1d ago
| #1 | from __future__ import annotations |
| #2 | |
| #3 | import asyncio |
| #4 | import json |
| #5 | import unittest |
| #6 | |
| #7 | from agent_libos import Runtime |
| #8 | from agent_libos.models.exceptions import HumanResponseRequired |
| #9 | from agent_libos.llm.client import LLMCompletion |
| #10 | from agent_libos.models import CapabilityRight, HumanRequestStatus, ProcessStatus |
| #11 | |
| #12 | |
class HumanQuestionToolTests(unittest.TestCase):
    """Tests for the ``ask_human`` tool, via direct tool calls and the async run loop."""

    def setUp(self) -> None:
        """Open a ``"local"`` runtime and collect human-facing output in a list."""
        self.runtime = Runtime.open("local")
        self.human_output: list[str] = []
        self.runtime.substrate.human.output_sink = self.human_output.append

    def tearDown(self) -> None:
        """Close the runtime opened by ``setUp``."""
        self.runtime.close()

    def test_ask_human_tool_waits_and_returns_answer_after_queue_processing(self) -> None:
        """The first call raises ``HumanResponseRequired``; repeating it after the
        terminal queue is drained returns the recorded answer."""
        pid = self.runtime.process.spawn(image="review-agent:v0", goal="ask a human")
        self.runtime.capability.grant(pid, "human:owner", [CapabilityRight.WRITE], issued_by="test")
        prompts: list[str] = []

        def ask():
            # Build the arguments afresh on every call so both invocations pass
            # equal but independent dicts.
            return self.runtime.tools.call(
                pid,
                "ask_human",
                {"question": "Which color should I use?", "context": {"artifact": "draft"}},
            )

        with self.assertRaises(HumanResponseRequired) as raised:
            ask()

        pending = self.runtime.human.pending()[0]
        # list.append returns None, so the lambda records the prompt and then
        # evaluates to the canned answer "blue".
        self.runtime.substrate.human.input_reader = lambda prompt: prompts.append(prompt) or "blue"
        processed = self.runtime.human.drain_terminal_queue()
        result = ask()

        self.assertEqual(raised.exception.request_id, pending.request_id)
        self.assertEqual(pending.payload["type"], "question")
        self.assertEqual(self.runtime.process.get(pid).status, ProcessStatus.RUNNABLE)
        self.assertEqual(processed[0].status, HumanRequestStatus.APPROVED)
        self.assertEqual(processed[0].decision["answer"], "blue")
        self.assertIn("artifact", prompts[0])
        self.assertTrue(result.ok, result.error)
        self.assertEqual(result.payload["answer"], "blue")
        self.assertEqual(result.payload["request_id"], pending.request_id)

    def test_ask_human_tool_cannot_bypass_human_capability(self) -> None:
        """Without a grant on ``human:owner`` the call is denied, nothing is queued,
        and no ``human.query`` action appears in the audit trace."""
        pid = self.runtime.process.spawn(image="review-agent:v0", goal="ask without authority")

        denied = self.runtime.tools.call(pid, "ask_human", {"question": "May I ask?"})

        self.assertFalse(denied.ok)
        self.assertIn("lacks write on human:owner", denied.error or "")
        self.assertEqual(self.runtime.human.pending(), [])
        self.assertNotIn("human.query", self._audit_actions())

    def test_async_runtime_resumes_human_question_with_answer(self) -> None:
        """The async loop reports a waiting-human result first, then resumes the
        ``ask_human`` action with the auto-supplied answer and lets the process exit."""
        self.runtime.llm.client = PlannedActionClient(
            [
                {"action": "ask_human", "question": "What deployment window should I use?"},
                {"action": "process_exit", "payload": {"done": True}},
            ]
        )
        pid = self.runtime.process.spawn(image="base-agent:v0", goal="ask then exit")

        results = asyncio.run(
            self.runtime.arun_until_idle(
                max_quanta=4,
                human_auto_answer="Sunday 02:00 UTC",
            )
        )

        self.assertEqual(self.runtime.process.get(pid).status, ProcessStatus.EXITED)
        self.assertEqual(self.runtime.llm.client.calls, 2)
        self.assertTrue(results[0]["waiting_human"])
        self.assertNotIn("action", results[0])
        # Use a default so a missing result surfaces as a test failure with a
        # message instead of an uncaught StopIteration error.
        ask_result = next((item for item in results if _action_name(item) == "ask_human"), None)
        self.assertIsNotNone(ask_result, "no ask_human action result was returned")
        self.assertEqual(ask_result["result"]["payload"]["answer"], "Sunday 02:00 UTC")
        self.assertEqual(self.runtime.human.list(pid)[0].decision["answer"], "Sunday 02:00 UTC")

    def _audit_actions(self) -> list[str]:
        """Return the ``action`` field of every record in the runtime's audit trace."""
        return [record.action for record in self.runtime.audit.trace()]
| #89 | |
| #90 | |
class PlannedActionClient:
    """Scripted stand-in for the LLM client that replays a fixed list of actions.

    Each planned action is a dict whose ``"action"`` entry names the tool to
    call; every other entry becomes a tool argument.
    """

    def __init__(self, actions: list[dict[str, object]]):
        """Store a shallow copy of *actions* and reset the call counter."""
        # Copy so that popping planned actions never mutates the caller's list.
        self.actions = list(actions)
        self.calls = 0

    def complete_action(self, messages: list[dict[str, str]], tools: list[dict[str, object]]) -> LLMCompletion:
        """Return the next planned action as a single tool call.

        *messages* and *tools* are accepted but not inspected.

        Raises:
            AssertionError: if every planned action has already been consumed.
        """
        # The counter is bumped before the exhaustion check, so a call that
        # fails for lack of planned actions is still counted.
        self.calls += 1
        if not self.actions:
            raise AssertionError("no planned action remains")
        action = self.actions.pop(0)
        name = str(action["action"])
        args = {key: value for key, value in action.items() if key != "action"}
        return LLMCompletion(
            content="",
            tool_calls=[{"id": f"human_question_{self.calls}", "name": name, "arguments": json.dumps(args)}],
        )
| #107 | |
| #108 | |
| #109 | def _action_name(result: object) -> str | None: |
| #110 | if not isinstance(result, dict): |
| #111 | return None |
| #112 | action = result.get("action") |
| #113 | if isinstance(action, dict): |
| #114 | return action.get("action") |
| #115 | return None |
| #116 | |
| #117 | |
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
| #120 |