repositories
loading repo index
repositories
loading repo index
repository
loading code, commits, and activity
Mirrored from https://github.com/yingqi-z20/Agent-libOS
stars
latest
clone command
git clone gitlawb://did:key:z6MkqRzA...RfoM/yingqi-z20-Agen...git clone gitlawb://did:key:z6MkqRzA.../yingqi-z20-Agen...d98dd2c9IPC1d ago| #1 | from __future__ import annotations |
| #2 | |
| #3 | import argparse |
| #4 | import asyncio |
| #5 | import json |
| #6 | from uuid import uuid4 |
| #7 | |
| #8 | from agent_libos import Runtime |
| #9 | from agent_libos.config import DEFAULT_CONFIG |
| #10 | from agent_libos.models import CapabilityRight, ProcessStatus |
| #11 | from agent_libos.utils.serde import to_jsonable |
| #12 | |
| #13 | _RUNTIME_DEFAULTS = DEFAULT_CONFIG.runtime |
| #14 | _SCRIPT_DEFAULTS = DEFAULT_CONFIG.scripts |
| #15 | _TOOL_DEFAULTS = DEFAULT_CONFIG.tools |
| #16 | |
| #17 | |
| #18 | def main() -> None: |
| #19 | parser = argparse.ArgumentParser(description="Run a real LLM smoke test for write_text_file.") |
| #20 | parser.add_argument("--path", default=f"agent_outputs/llm_goal_{uuid4().hex[:8]}.txt") |
| #21 | parser.add_argument("--content", default="Agent libOS LLM write-file smoke test passed.\n") |
| #22 | parser.add_argument("--max-quanta", type=int, default=_SCRIPT_DEFAULTS.llm_write_smoke_max_quanta) |
| #23 | args = parser.parse_args() |
| #24 | asyncio.run(amain(args)) |
| #25 | |
| #26 | |
| #27 | async def amain(args: argparse.Namespace) -> None: |
| #28 | runtime = Runtime.open(_RUNTIME_DEFAULTS.local_store_target) |
| #29 | try: |
| #30 | goal = ( |
| #31 | f"Use the write_text_file tool to create workspace file {args.path!r} " |
| #32 | f"with exactly this content: {args.content!r}. After the file is written, exit." |
| #33 | ) |
| #34 | pid = runtime.process.spawn(image=_RUNTIME_DEFAULTS.coding_image_id, goal=goal) |
| #35 | runtime.filesystem.grant_workspace(pid, [CapabilityRight.WRITE], issued_by="smoke-test") |
| #36 | results = await runtime.arun_until_idle(max_quanta=args.max_quanta) |
| #37 | target = runtime.workspace_root / args.path |
| #38 | file_exists = target.exists() |
| #39 | actual_content = target.read_text(encoding=_TOOL_DEFAULTS.default_text_encoding) if file_exists else None |
| #40 | process = runtime.process.get(pid) |
| #41 | summary = { |
| #42 | "pid": pid, |
| #43 | "target": str(target), |
| #44 | "file_exists": file_exists, |
| #45 | "content_matches": actual_content == args.content, |
| #46 | "process_status": process.status.value, |
| #47 | "actions": [_action_name(result) for result in results], |
| #48 | "results": to_jsonable(results), |
| #49 | "audit_records": len(runtime.audit.trace()), |
| #50 | } |
| #51 | print(json.dumps(summary, indent=2, ensure_ascii=False)) |
| #52 | if not file_exists or actual_content != args.content: |
| #53 | raise SystemExit(2) |
| #54 | if process.status != ProcessStatus.EXITED: |
| #55 | raise SystemExit(3) |
| #56 | finally: |
| #57 | runtime.close() |
| #58 | |
| #59 | |
| #60 | def _action_name(result: object) -> str | None: |
| #61 | if not isinstance(result, dict): |
| #62 | return None |
| #63 | action = result.get("action") |
| #64 | if isinstance(action, dict): |
| #65 | return action.get("action") |
| #66 | return None |
| #67 | |
| #68 | |
| #69 | if __name__ == "__main__": |
| #70 | main() |
| #71 |