repositories
loading repo index
repositories
loading repo index
repository
loading code, commits, and activity
Mirrored from https://github.com/yingqi-z20/Agent-libOS
stars
latest
clone command
git clone gitlawb://did:key:z6MkqRzA...RfoM/yingqi-z20-Agen...
git clone gitlawb://did:key:z6MkqRzA.../yingqi-z20-Agen...
d98dd2c9IPC1d ago
| #1 | from __future__ import annotations |
| #2 | |
| #3 | import asyncio |
| #4 | import inspect |
| #5 | from collections.abc import Awaitable, Callable |
| #6 | from typing import Any |
| #7 | |
| #8 | from agent_libos.config import DEFAULT_CONFIG |
| #9 | from agent_libos.utils.ids import utc_now |
| #10 | from agent_libos.models import ProcessStatus |
| #11 | from agent_libos.runtime.audit_manager import AuditManager |
| #12 | from agent_libos.storage import SQLiteStore |
| #13 | |
| #14 | |
# Signature of a scheduler quantum: it is called with a process pid and may
# return either a plain value or an awaitable that resolves to one.
Quantum = Callable[[str], Any | Awaitable[Any]]
# Scheduler section of the default configuration; its ``poll_interval_s`` and
# ``max_quanta`` attributes are used as default argument values in this module.
_SCHEDULER_DEFAULTS = DEFAULT_CONFIG.scheduler
| #17 | |
| #18 | |
class AsyncProcessScheduler:
    """Cooperative async scheduler for AgentProcess quanta.

    Processes are read from ``store``. Those whose status is ``RUNNABLE`` are
    advanced one quantum at a time, ordered by ``created_at``. Every quantum
    that is started is recorded through ``audit``.
    """

    # Statuses that ``_fail_process_task`` must never overwrite with FAILED.
    TERMINAL_STATUSES = {ProcessStatus.EXITED, ProcessStatus.FAILED, ProcessStatus.KILLED}

    def __init__(
        self,
        store: SQLiteStore,
        audit: AuditManager,
        poll_interval_s: float = _SCHEDULER_DEFAULTS.poll_interval_s,
    ):
        """Bind the scheduler to its process store and audit sink.

        Args:
            store: Source of truth for process records.
            audit: Receives one record per started quantum and per failed task.
            poll_interval_s: Upper bound, in seconds, on how long
                ``arun_until_idle`` waits for a task to finish before it
                re-scans the store for newly runnable processes.
        """
        self.store = store
        self.audit = audit
        self.poll_interval_s = poll_interval_s

    def next_runnable(self) -> str | None:
        """Return the pid of the first runnable process, or ``None`` if there is none."""
        # Delegate so the "runnable" filter and ordering live in one place.
        pids = self.runnable_pids()
        return pids[0] if pids else None

    def runnable_pids(self) -> list[str]:
        """Return the pids of all RUNNABLE processes, sorted by ``created_at``."""
        runnable = [p for p in self.store.list_processes() if p.status == ProcessStatus.RUNNABLE]
        runnable.sort(key=lambda proc: proc.created_at)
        return [proc.pid for proc in runnable]

    async def arun_once(self, quantum: Quantum) -> Any:
        """Run a single quantum for the first runnable process.

        Returns:
            The quantum's result, or ``None`` when no process is runnable.
            Exceptions raised by ``quantum`` propagate to the caller.
        """
        pid = self.next_runnable()
        if pid is None:
            return None
        return await self._run_quantum(pid, quantum)

    def run_once(self, quantum: Quantum) -> Any:
        """Synchronous wrapper around ``arun_once``."""
        return _run_sync(self.arun_once(quantum))

    async def arun_until_idle(self, quantum: Quantum, max_quanta: int = _SCHEDULER_DEFAULTS.max_quanta) -> list[Any]:
        """Advance all runnable processes concurrently until nothing is left to do.

        One asyncio task is started per runnable pid. Each task keeps running
        quanta for its pid until the process stops being RUNNABLE, the quantum
        raises, or the shared budget of ``max_quanta`` is exhausted.

        Args:
            quantum: Callable invoked with a pid for every quantum.
            max_quanta: Total number of quanta that may be reserved across all
                processes during this call.

        Returns:
            The collected per-quantum results. A quantum that raised
            contributes ``{"ok": False, "pid": ..., "error": ...}`` and its
            process is marked FAILED. Results from different processes are
            appended in task-completion order.
        """
        results: list[Any] = []
        tasks: dict[str, asyncio.Task[list[Any]]] = {}
        quanta_used = 0
        quanta_lock = asyncio.Lock()

        async def reserve_quantum() -> bool:
            # The quantum budget is global across process tasks, not per process.
            nonlocal quanta_used
            async with quanta_lock:
                if quanta_used >= max_quanta:
                    return False
                quanta_used += 1
                return True

        async def process_loop(pid: str) -> list[Any]:
            process_results: list[Any] = []
            # NOTE: the budget is reserved before the runnable check, so a pid
            # that stopped being runnable still consumes one unit of budget.
            while await reserve_quantum():
                process = self.store.get_process(pid)
                if process is None or process.status != ProcessStatus.RUNNABLE:
                    break
                try:
                    process_results.append(await self._run_quantum(pid, quantum))
                except Exception as exc:
                    self._fail_process_task(pid, exc)
                    process_results.append({"ok": False, "pid": pid, "error": str(exc)})
                    break
                latest = self.store.get_process(pid)
                if latest is None or latest.status != ProcessStatus.RUNNABLE:
                    break
                # Yield so a sleeping or long-running async tool in another
                # process can advance without this pid monopolizing the loop.
                await asyncio.sleep(0)
            return process_results

        def collect(finished: set[asyncio.Task[list[Any]]]) -> None:
            # Drop finished tasks from the registry and harvest their results.
            for task in finished:
                pid = self._pid_for_task(tasks, task)
                if pid is not None:
                    tasks.pop(pid, None)
                results.extend(task.result())

        try:
            while True:
                # Start one task per runnable pid. Each task keeps advancing its own
                # process until it blocks, exits, fails, or the shared budget is used.
                for pid in self.runnable_pids():
                    if quanta_used >= max_quanta:
                        break
                    if pid not in tasks:
                        tasks[pid] = asyncio.create_task(process_loop(pid), name=f"agent-process:{pid}")

                if not tasks:
                    break

                # The timeout bounds how long we go without re-scanning the
                # store for processes that became runnable in the meantime.
                done, _pending = await asyncio.wait(
                    tasks.values(),
                    timeout=self.poll_interval_s,
                    return_when=asyncio.FIRST_COMPLETED,
                )
                collect(done)

                if quanta_used >= max_quanta and not done:
                    # Budget is spent and no new task can start: drain the
                    # in-flight quanta and stop.
                    done_all, _pending = await asyncio.wait(tasks.values(), return_when=asyncio.ALL_COMPLETED)
                    collect(done_all)
                    break
        finally:
            # On a normal exit the registry is already empty. Tasks are still
            # registered only when we are unwinding from cancellation or an
            # unexpected error; cancel them so they do not keep mutating the
            # store after the caller has given up.
            leftovers = [task for task in tasks.values() if not task.done()]
            for task in leftovers:
                task.cancel()
            if leftovers:
                await asyncio.gather(*leftovers, return_exceptions=True)

        return results

    def run_until_idle(self, quantum: Quantum, max_quanta: int = _SCHEDULER_DEFAULTS.max_quanta) -> list[Any]:
        """Synchronous wrapper around ``arun_until_idle``."""
        return _run_sync(self.arun_until_idle(quantum, max_quanta=max_quanta))

    async def _run_quantum(self, pid: str, quantum: Quantum) -> Any:
        """Run one quantum for ``pid`` and keep its status consistent.

        The process is marked RUNNING for the duration of the quantum and an
        audit record is written. ``quantum(pid)`` is awaited when it returns
        an awaitable.

        Returns:
            The quantum's result, or ``None`` if the process does not exist or
            is not RUNNABLE when the call starts.
        """
        process = self.store.get_process(pid)
        if process is None or process.status != ProcessStatus.RUNNABLE:
            return None
        process.status = ProcessStatus.RUNNING
        process.updated_at = utc_now()
        self.store.update_process(process)
        self.audit.record(actor="scheduler", action="scheduler.run_quantum", target=f"process:{pid}")
        try:
            result = quantum(pid)
            if inspect.isawaitable(result):
                return await result
            return result
        finally:
            latest = self.store.get_process(pid)
            # A primitive may deliberately set WAITING_HUMAN, EXITED, or another
            # status during the quantum. Only restore RUNNABLE for plain returns.
            if latest is not None and latest.status == ProcessStatus.RUNNING:
                latest.status = ProcessStatus.RUNNABLE
                latest.updated_at = utc_now()
                self.store.update_process(latest)

    def _fail_process_task(self, pid: str, exc: Exception) -> None:
        """Mark ``pid`` as FAILED because its quantum raised, and audit the failure.

        A process that is missing or already in a terminal status is left
        untouched. The audit record is written in either case.
        """
        process = self.store.get_process(pid)
        if process is not None and process.status not in self.TERMINAL_STATUSES:
            process.status = ProcessStatus.FAILED
            process.status_message = f"scheduler task failed: {exc}"
            process.updated_at = utc_now()
            self.store.update_process(process)
        self.audit.record(
            actor="scheduler",
            action="scheduler.process_task_failed",
            target=f"process:{pid}",
            decision={"error": str(exc), "error_type": type(exc).__name__},
        )

    def _pid_for_task(self, tasks: dict[str, asyncio.Task[list[Any]]], task: asyncio.Task[list[Any]]) -> str | None:
        """Return the pid whose registered task is ``task``, or ``None`` if it is not registered."""
        for pid, candidate in tasks.items():
            if candidate is task:
                return pid
        return None
| #161 | |
| #162 | |
class SimpleScheduler(AsyncProcessScheduler):
    """Alternate name for ``AsyncProcessScheduler``; adds no behavior of its own."""
| #165 | |
| #166 | |
def _run_sync(awaitable: Awaitable[Any]) -> Any:
    """Drive ``awaitable`` to completion from synchronous code.

    Args:
        awaitable: A coroutine or any other awaitable object.

    Returns:
        Whatever the awaitable resolves to.

    Raises:
        RuntimeError: If an event loop is already running in this thread. A
            nested ``asyncio.run`` is not possible there, so callers must use
            the async APIs instead.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so it is safe to start one below.
        pass
    else:
        if inspect.iscoroutine(awaitable):
            # Close the never-started coroutine so Python does not emit a
            # "coroutine ... was never awaited" RuntimeWarning.
            awaitable.close()
        raise RuntimeError("Cannot use sync scheduler APIs inside a running event loop. Use async APIs instead.")

    # Run outside the ``except`` handler so errors raised by the awaitable do
    # not carry the "no running event loop" error as their implicit context.
    if inspect.iscoroutine(awaitable):
        return asyncio.run(awaitable)

    async def _await_it() -> Any:
        # asyncio.run only accepts coroutines on Python < 3.14; wrap other
        # awaitables so the annotated parameter type is actually honored.
        return await awaitable

    return asyncio.run(_await_it())
| #175 |