repositories
loading repo index
repositories
loading repo index
repository
loading code, commits, and activity
Mirrored from https://github.com/yingqi-z20/Agent-libOS
stars
latest
clone command
git clone gitlawb://did:key:z6MkqRzA...RfoM/yingqi-z20-Agen...
git clone gitlawb://did:key:z6MkqRzA.../yingqi-z20-Agen...
d98dd2c9 IPC 1d ago
| #1 | from __future__ import annotations |
| #2 | |
| #3 | import asyncio |
| #4 | import inspect |
| #5 | from typing import Any, TYPE_CHECKING |
| #6 | |
| #7 | from agent_libos.config import DEFAULT_CONFIG, AgentLibOSConfig |
| #8 | from agent_libos.models.exceptions import HumanApprovalRequired, ProcessMessageWaitRequired, ProcessWaitRequired |
| #9 | from agent_libos.utils.ids import new_id, utc_now |
| #10 | from agent_libos.llm.action_parser import parse_json_action |
| #11 | from agent_libos.llm.client import LLMClient |
| #12 | from agent_libos.llm.context_memory import LLMContextMemory |
| #13 | from agent_libos.llm.prompt import build_system_prompt, build_user_prompt |
| #14 | from agent_libos.llm.tool_protocol import tool_call_to_action |
| #15 | from agent_libos.models import ( |
| #16 | EventType, |
| #17 | HumanRequestStatus, |
| #18 | LLMCallRecord, |
| #19 | ObjectHandle, |
| #20 | ProcessMessageKind, |
| #21 | ProcessStatus, |
| #22 | ViewMode, |
| #23 | ) |
| #24 | |
| #25 | if TYPE_CHECKING: |
| #26 | from agent_libos.runtime.runtime import Runtime |
| #27 | |
| #28 | |
| #29 | class LLMProcessExecutor: |
| #30 | """Runs one model-selected tool action per process quantum.""" |
| #31 | |
def __init__(self, runtime: "Runtime", client: LLMClient | None = None, config: AgentLibOSConfig | None = None):
    """Bind the executor to a runtime, an LLM client, and a configuration.

    A falsy ``config`` falls back to ``DEFAULT_CONFIG``; a falsy ``client``
    is replaced by ``LLMClient.from_env()``.
    """
    self.runtime = runtime
    if not config:
        config = DEFAULT_CONFIG
    self.config = config
    if not client:
        client = LLMClient.from_env()
    self.client = client
    # Parked actions live here rather than in Object Memory: the process has
    # not seen a tool result for them yet. Once the blocking condition clears
    # (human decision, child exit, message arrival) the stored action is
    # dispatched again instead of asking the model for a new one.
    self._pending_human_actions: dict[str, dict[str, Any]] = {}
    self._pending_wait_actions: dict[str, dict[str, Any]] = {}
    self._pending_message_actions: dict[str, dict[str, Any]] = {}
    self.context_memory = LLMContextMemory(runtime)
| #43 | |
| #44 | def run_once(self, pid: str) -> dict[str, Any]: |
| #45 | try: |
| #46 | asyncio.get_running_loop() |
| #47 | except RuntimeError: |
| #48 | return asyncio.run(self.arun_once(pid)) |
| #49 | raise RuntimeError("Cannot call run_once() inside a running event loop. Use await arun_once(...).") |
| #50 | |
async def arun_once(self, pid: str) -> dict[str, Any]:
    """Run one quantum for ``pid``: resume a parked action or select and dispatch a new one.

    Returns a dict describing the outcome:

    * ``{"ok": False, "skipped": True, "status": ...}`` when the process is
      neither RUNNING nor RUNNABLE.
    * a ``waiting_human`` / ``waiting_event`` / ``waiting_message`` dict when
      the action had to be parked.
    * the payload from ``_completed_action_result`` when a tool call finished.
    * ``{"ok": False, "error": ...}`` when the quantum failed; in that case
      the process is exited as failed.
    """
    process = self.runtime.process.get(pid)
    if process.status not in {ProcessStatus.RUNNING, ProcessStatus.RUNNABLE}:
        return {"ok": False, "skipped": True, "status": process.status.value}
    # A parked action takes priority over asking the model again. These
    # resume calls sit outside the try block below, so exceptions they do not
    # handle themselves propagate to the caller.
    if pid in self._pending_human_actions:
        return await self._resume_pending_human_action(pid)
    if pid in self._pending_wait_actions:
        return await self._resume_pending_wait_action(pid)
    if pid in self._pending_message_actions:
        return await self._resume_pending_message_action(pid)
    # Fall back to the configured default image when the process image is
    # missing (or falsy) in the image table.
    image = self.runtime.images.get(process.image_id) or self.runtime.images[self.config.runtime.default_image_id]
    if process.memory_view is None:
        # Give the process an empty read-only view and persist it.
        process.memory_view = self.runtime.memory.create_view(pid, [], mode=ViewMode.READ_ONLY)
        process.updated_at = utc_now()
        self.runtime.store.update_process(process)

    self._notify_interrupt_messages(pid)
    source_view = self.context_memory.view_without_context(pid, process.memory_view)
    source_context = self.runtime.memory.materialize_context(
        pid,
        source_view,
        policy=image.context_policy,
        budget_tokens=process.resource_budget.max_materialized_tokens,
    )
    events = self.runtime.events.list(target=pid)
    capabilities = self.runtime.capability.capabilities_for(pid)
    # The prompt-visible tool list must match the process tool table. The
    # broker still owns the real execute check, but showing extra tools
    # teaches the model to choose actions the process cannot call.
    tools = self.runtime.tools.visible_tools(pid)
    context = self.context_memory.prepare(
        pid=pid,
        image=image,
        process=process,
        source_context=source_context,
        events=events,
        capabilities=capabilities,
        tools=tools,
    )
    messages = [
        {"role": "system", "content": build_system_prompt(image)},
        {
            "role": "user",
            "content": build_user_prompt(
                process=process,
                context=context,
                events=events,
                capabilities=capabilities,
                tools=tools,
            ),
        },
    ]
    self.runtime.audit.record(
        actor=pid,
        action="llm.request",
        target=f"image:{image.image_id}",
        input_refs=context.object_refs,
        decision={"messages": len(messages), "policy": image.context_policy},
    )
    try:
        completion, action = await self._complete_valid_action(
            pid,
            messages,
            self.runtime.tools.openai_tool_schemas(pid),
        )
        # Inner try: the action is known here, so a blocking condition can
        # park it for re-dispatch on a later quantum.
        try:
            result = await self.adispatch(pid, action)
        except HumanApprovalRequired as exc:
            return self._wait_for_human_action(
                pid=pid,
                action=action,
                request_id=exc.request_id,
                message=str(exc),
                content_preview=completion.content[: self.config.llm.content_preview_chars],
                tool_call_count=len(completion.tool_calls),
            )
        except ProcessWaitRequired as exc:
            return self._wait_for_child_action(
                pid=pid,
                action=action,
                child_pid=exc.child_pid,
                message=str(exc),
                content_preview=completion.content[: self.config.llm.content_preview_chars],
                tool_call_count=len(completion.tool_calls),
            )
        except ProcessMessageWaitRequired as exc:
            return self._wait_for_message_action(
                pid=pid,
                action=action,
                filters=exc.filters,
                message=str(exc),
                content_preview=completion.content[: self.config.llm.content_preview_chars],
                tool_call_count=len(completion.tool_calls),
            )
        return self._completed_action_result(
            pid=pid,
            action=action,
            result=result,
            content_preview=completion.content[: self.config.llm.content_preview_chars],
            tool_call_count=len(completion.tool_calls),
        )
    # Outer handlers: the same wait signals raised outside the inner dispatch
    # try (i.e. before an action was available to park). They are audited and
    # reported, but nothing is stored for re-dispatch.
    except HumanApprovalRequired as exc:
        self.runtime.audit.record(
            actor=pid,
            action="llm.action_waiting_human",
            target=f"human_request:{exc.request_id}",
            decision={"request_id": exc.request_id, "message": str(exc)},
        )
        return {"ok": False, "waiting_human": True, "request_id": exc.request_id}
    except ProcessWaitRequired as exc:
        self.runtime.audit.record(
            actor=pid,
            action="llm.action_waiting_child",
            target=f"process:{exc.child_pid}",
            decision={"child_pid": exc.child_pid, "message": str(exc)},
        )
        return {"ok": False, "waiting_event": True, "child_pid": exc.child_pid}
    except ProcessMessageWaitRequired as exc:
        self.runtime.audit.record(
            actor=pid,
            action="llm.action_waiting_message",
            target=f"process:{pid}",
            decision={"recipient_pid": exc.recipient_pid, "filters": exc.filters, "message": str(exc)},
        )
        return {"ok": False, "waiting_message": True, "filters": exc.filters}
    except Exception as exc:
        # Any other failure ends the process as failed and is reported to the
        # caller as an error payload instead of being re-raised.
        self.runtime.process.exit(pid, failed=True, message=f"LLM quantum failed: {exc}")
        self.runtime.audit.record(
            actor=pid,
            action="llm.action_failed",
            target=f"process:{pid}",
            decision={"error": str(exc)},
        )
        return {"ok": False, "error": str(exc)}
| #185 | |
| #186 | def _completed_action_result( |
| #187 | self, |
| #188 | pid: str, |
| #189 | action: dict[str, Any], |
| #190 | result: dict[str, Any], |
| #191 | content_preview: str, |
| #192 | tool_call_count: int, |
| #193 | resumed_after_human: bool = False, |
| #194 | resumed_after_message: bool = False, |
| #195 | ) -> dict[str, Any]: |
| #196 | self.runtime.audit.record( |
| #197 | actor=pid, |
| #198 | action="llm.action", |
| #199 | target=action.get("action"), |
| #200 | decision={ |
| #201 | "action": action, |
| #202 | "result": result, |
| #203 | "content_preview": content_preview, |
| #204 | "tool_call_count": tool_call_count, |
| #205 | "resumed_after_human": resumed_after_human, |
| #206 | "resumed_after_message": resumed_after_message, |
| #207 | }, |
| #208 | ) |
| #209 | payload = {"ok": True, "action": action, "result": result} |
| #210 | if resumed_after_human: |
| #211 | payload["resumed_after_human"] = True |
| #212 | if resumed_after_message: |
| #213 | payload["resumed_after_message"] = True |
| #214 | return payload |
| #215 | |
| #216 | def _wait_for_human_action( |
| #217 | self, |
| #218 | pid: str, |
| #219 | action: dict[str, Any], |
| #220 | request_id: str, |
| #221 | message: str, |
| #222 | content_preview: str, |
| #223 | tool_call_count: int, |
| #224 | ) -> dict[str, Any]: |
| #225 | self._pending_human_actions[pid] = { |
| #226 | "request_id": request_id, |
| #227 | "action": dict(action), |
| #228 | "content_preview": content_preview, |
| #229 | "tool_call_count": tool_call_count, |
| #230 | } |
| #231 | self.runtime.audit.record( |
| #232 | actor=pid, |
| #233 | action="llm.action_waiting_human", |
| #234 | target=f"human_request:{request_id}", |
| #235 | decision={ |
| #236 | "request_id": request_id, |
| #237 | "action": action, |
| #238 | "message": message, |
| #239 | "tool_call_count": tool_call_count, |
| #240 | }, |
| #241 | ) |
| #242 | return {"ok": False, "waiting_human": True, "request_id": request_id} |
| #243 | |
| #244 | def _wait_for_child_action( |
| #245 | self, |
| #246 | pid: str, |
| #247 | action: dict[str, Any], |
| #248 | child_pid: str, |
| #249 | message: str, |
| #250 | content_preview: str, |
| #251 | tool_call_count: int, |
| #252 | ) -> dict[str, Any]: |
| #253 | self._pending_wait_actions[pid] = { |
| #254 | "child_pid": child_pid, |
| #255 | "action": dict(action), |
| #256 | "content_preview": content_preview, |
| #257 | "tool_call_count": tool_call_count, |
| #258 | } |
| #259 | self.runtime.audit.record( |
| #260 | actor=pid, |
| #261 | action="llm.action_waiting_child", |
| #262 | target=f"process:{child_pid}", |
| #263 | decision={ |
| #264 | "child_pid": child_pid, |
| #265 | "action": action, |
| #266 | "message": message, |
| #267 | "tool_call_count": tool_call_count, |
| #268 | }, |
| #269 | ) |
| #270 | return {"ok": False, "waiting_event": True, "child_pid": child_pid} |
| #271 | |
| #272 | def _wait_for_message_action( |
| #273 | self, |
| #274 | pid: str, |
| #275 | action: dict[str, Any], |
| #276 | filters: dict[str, Any], |
| #277 | message: str, |
| #278 | content_preview: str, |
| #279 | tool_call_count: int, |
| #280 | ) -> dict[str, Any]: |
| #281 | self._pending_message_actions[pid] = { |
| #282 | "filters": dict(filters), |
| #283 | "action": dict(action), |
| #284 | "content_preview": content_preview, |
| #285 | "tool_call_count": tool_call_count, |
| #286 | } |
| #287 | self.runtime.audit.record( |
| #288 | actor=pid, |
| #289 | action="llm.action_waiting_message", |
| #290 | target=f"process:{pid}", |
| #291 | decision={ |
| #292 | "filters": filters, |
| #293 | "action": action, |
| #294 | "message": message, |
| #295 | "tool_call_count": tool_call_count, |
| #296 | }, |
| #297 | ) |
| #298 | return {"ok": False, "waiting_message": True, "filters": filters} |
| #299 | |
async def _resume_pending_human_action(self, pid: str) -> dict[str, Any]:
    """Resume the action parked for ``pid`` once its human request is decided.

    * While the request is still PENDING the action stays parked and the
      ``waiting_human`` payload is returned.
    * When APPROVED, the exact stored action is dispatched again. If that
      dispatch needs another approval, a child exit, or a message, the
      action is parked again in the matching queue.
    * Any other status is treated as a rejection and surfaced as a failed
      action result.
    """
    pending = self._pending_human_actions[pid]
    request_id = str(pending["request_id"])
    request = self.runtime.human.get(request_id)
    if request.status == HumanRequestStatus.PENDING:
        return {"ok": False, "waiting_human": True, "request_id": request_id}

    action = dict(pending["action"])
    content_preview = str(pending.get("content_preview", ""))
    tool_call_count = int(pending.get("tool_call_count", 0))
    self._pending_human_actions.pop(pid, None)
    if request.status == HumanRequestStatus.APPROVED:
        # Re-dispatch the exact same action. This preserves the original
        # model decision and prevents hidden progress before approval.
        try:
            result = await self.adispatch(pid, action)
        except HumanApprovalRequired as exc:
            return self._wait_for_human_action(
                pid=pid,
                action=action,
                request_id=exc.request_id,
                message=str(exc),
                content_preview=content_preview,
                tool_call_count=tool_call_count,
            )
        except ProcessWaitRequired as exc:
            # The pending entry was already popped above, so without this
            # handler an approved action that must wait for a child would
            # escape as an exception and be lost. Park it like the other
            # resume paths do.
            return self._wait_for_child_action(
                pid=pid,
                action=action,
                child_pid=exc.child_pid,
                message=str(exc),
                content_preview=content_preview,
                tool_call_count=tool_call_count,
            )
        except ProcessMessageWaitRequired as exc:
            return self._wait_for_message_action(
                pid=pid,
                action=action,
                filters=exc.filters,
                message=str(exc),
                content_preview=content_preview,
                tool_call_count=tool_call_count,
            )
        return self._completed_action_result(
            pid=pid,
            action=action,
            result=result,
            content_preview=content_preview,
            tool_call_count=tool_call_count,
            resumed_after_human=True,
        )

    error = f"human rejected approval request {request_id}"
    # A rejected per-use approval is surfaced as a failed action result, not
    # as a runtime crash, so the process can explain or choose another path.
    self._emit_pending_action_rejected(pid, action, request_id, error)
    result = {"ok": False, "tool_id": None, "result_oid": None, "payload": None, "error": error}
    return self._completed_action_result(
        pid=pid,
        action=action,
        result=result,
        content_preview=content_preview,
        tool_call_count=tool_call_count,
        resumed_after_human=True,
    )
| #354 | |
async def _resume_pending_wait_action(self, pid: str) -> dict[str, Any]:
    """Resume the action parked for ``pid`` once the awaited child has finished.

    The action stays parked while the child is not EXITED, FAILED, or
    KILLED. Otherwise the stored action is dispatched again and parked
    again in the matching queue if the dispatch raises another wait signal.
    """
    parked = self._pending_wait_actions[pid]
    child_pid = str(parked["child_pid"])
    finished_states = {ProcessStatus.EXITED, ProcessStatus.FAILED, ProcessStatus.KILLED}
    if self.runtime.process.get(child_pid).status not in finished_states:
        return {"ok": False, "waiting_event": True, "child_pid": child_pid}

    action = dict(parked["action"])
    self._pending_wait_actions.pop(pid, None)
    # Arguments common to every helper call below.
    shared = {
        "pid": pid,
        "action": action,
        "content_preview": str(parked.get("content_preview", "")),
        "tool_call_count": int(parked.get("tool_call_count", 0)),
    }
    try:
        result = await self.adispatch(pid, action)
    except ProcessWaitRequired as exc:
        return self._wait_for_child_action(child_pid=exc.child_pid, message=str(exc), **shared)
    except HumanApprovalRequired as exc:
        return self._wait_for_human_action(request_id=exc.request_id, message=str(exc), **shared)
    except ProcessMessageWaitRequired as exc:
        return self._wait_for_message_action(filters=exc.filters, message=str(exc), **shared)
    return self._completed_action_result(result=result, resumed_after_human=False, **shared)
| #401 | |
| #402 | async def _resume_pending_message_action(self, pid: str) -> dict[str, Any]: |
| #403 | pending = self._pending_message_actions[pid] |
| #404 | filters = dict(pending.get("filters") or {}) |
| #405 | messages = self.runtime.messages.unread( |
| #406 | pid, |
| #407 | kind=filters.get("kind"), |
| #408 | sender=filters.get("sender"), |
| #409 | channel=filters.get("channel"), |
| #410 | correlation_id=filters.get("correlation_id"), |
| #411 | reply_to=filters.get("reply_to"), |
| #412 | message_ids=filters.get("message_ids"), |
| #413 | ) |
| #414 | if not messages: |
| #415 | return {"ok": False, "waiting_message": True, "filters": filters} |
| #416 | action = dict(pending["action"]) |
| #417 | self._pending_message_actions.pop(pid, None) |
| #418 | try: |
| #419 | result = await self.adispatch(pid, action) |
| #420 | except ProcessMessageWaitRequired as exc: |
| #421 | return self._wait_for_message_action( |
| #422 | pid=pid, |
| #423 | action=action, |
| #424 | filters=exc.filters, |
| #425 | message=str(exc), |
| #426 | content_preview=str(pending.get("content_preview", "")), |
| #427 | tool_call_count=int(pending.get("tool_call_count", 0)), |
| #428 | ) |
| #429 | except ProcessWaitRequired as exc: |
| #430 | return self._wait_for_child_action( |
| #431 | pid=pid, |
| #432 | action=action, |
| #433 | child_pid=exc.child_pid, |
| #434 | message=str(exc), |
| #435 | content_preview=str(pending.get("content_preview", "")), |
| #436 | tool_call_count=int(pending.get("tool_call_count", 0)), |
| #437 | ) |
| #438 | except HumanApprovalRequired as exc: |
| #439 | return self._wait_for_human_action( |
| #440 | pid=pid, |
| #441 | action=action, |
| #442 | request_id=exc.request_id, |
| #443 | message=str(exc), |
| #444 | content_preview=str(pending.get("content_preview", "")), |
| #445 | tool_call_count=int(pending.get("tool_call_count", 0)), |
| #446 | ) |
| #447 | completed = self._completed_action_result( |
| #448 | pid=pid, |
| #449 | action=action, |
| #450 | result=result, |
| #451 | content_preview=str(pending.get("content_preview", "")), |
| #452 | tool_call_count=int(pending.get("tool_call_count", 0)), |
| #453 | resumed_after_message=True, |
| #454 | ) |
| #455 | return completed |
| #456 | |
def _emit_pending_action_rejected(self, pid: str, action: dict[str, Any], request_id: str, error: str) -> None:
    """Emit a TOOL_FAILED event and an audit record for a rejected approval."""
    tool_name = str(action.get("action"))
    try:
        # Prefer the resolved tool id as the event source when available.
        source = f"tool:{self.runtime.tools.resolve(tool_name, pid=pid).tool_id}"
    except Exception:
        # Best effort only: if the tool cannot be resolved, fall back to the
        # raw action name rather than failing the rejection report.
        source = f"tool:{tool_name}"
    event_payload = {
        "error": error,
        "tool_name": tool_name,
        "request_id": request_id,
        "policy_decision": "deny",
        "policy_reason": "human_rejected_per_use_approval",
    }
    self.runtime.events.emit(
        EventType.TOOL_FAILED,
        source=source,
        target=pid,
        payload=event_payload,
    )
    audit_decision = {"request_id": request_id, "action": action, "error": error}
    self.runtime.audit.record(
        actor=pid,
        action="llm.pending_action_rejected",
        target=tool_name,
        decision=audit_decision,
    )
| #483 | |
def _completion_to_action(self, content: str, tool_calls: list[dict[str, Any]]) -> dict[str, Any]:
    """Turn a completion into a single action dict.

    Tool calls are tried from last to first and the first one that converts
    wins. If none converts, ``content`` is parsed as a JSON action.

    Raises:
        ValueError: when neither a tool call nor the content yields an action.
    """
    conversion_errors: list[str] = []
    for candidate in reversed(tool_calls):
        try:
            converted = tool_call_to_action(candidate)
        except Exception as exc:
            conversion_errors.append(str(exc))
        else:
            return converted
    try:
        return parse_json_action(content)
    except Exception as exc:
        suffix = ""
        if conversion_errors:
            suffix = f"; invalid tool calls: {conversion_errors}"
        preview = content[: self.config.llm.content_preview_chars]
        raise ValueError(
            f"no valid tool call or fallback JSON action found: {exc}{suffix}; "
            f"content preview: {preview!r}"
        ) from exc
| #499 | |
| #500 | async def _complete_valid_action( |
| #501 | self, |
| #502 | pid: str, |
| #503 | messages: list[dict[str, Any]], |
| #504 | tools: list[dict[str, Any]], |
| #505 | max_attempts: int | None = None, |
| #506 | ) -> tuple[Any, dict[str, Any]]: |
| #507 | attempt_messages = messages |
| #508 | last_error: Exception | None = None |
| #509 | selected_max_attempts = max_attempts or self.config.llm.action_repair_attempts |
| #510 | for attempt in range(selected_max_attempts): |
| #511 | completion = await self._complete_action_recorded( |
| #512 | pid=pid, |
| #513 | messages=attempt_messages, |
| #514 | tools=tools, |
| #515 | attempt=attempt + 1, |
| #516 | max_attempts=selected_max_attempts, |
| #517 | ) |
| #518 | try: |
| #519 | action = self._completion_to_action(completion.content, completion.tool_calls) |
| #520 | self._validate_dispatchable_action(pid, action) |
| #521 | return completion, action |
| #522 | except ValueError as exc: |
| #523 | last_error = exc |
| #524 | self.runtime.audit.record( |
| #525 | actor=pid, |
| #526 | action="llm.action_repair_requested", |
| #527 | target=f"process:{pid}", |
| #528 | decision={ |
| #529 | "attempt": attempt + 1, |
| #530 | "error": str(exc), |
| #531 | "tool_call_count": len(completion.tool_calls), |
| #532 | "tool_calls_preview": self._tool_call_previews(completion.tool_calls), |
| #533 | "content_preview": completion.content[: self.config.llm.content_preview_chars], |
| #534 | }, |
| #535 | ) |
| #536 | if attempt + 1 >= selected_max_attempts: |
| #537 | break |
| #538 | attempt_messages = [ |
| #539 | *messages, |
| #540 | { |
| #541 | "role": "user", |
| #542 | "content": ( |
| #543 | "The previous model response could not be dispatched: " |
| #544 | f"{exc}. Choose exactly one available OpenAI tool call by its function name. " |
| #545 | f"Available tool names: {sorted(self.runtime.process.get(pid).tool_table)}" |
| #546 | ), |
| #547 | }, |
| #548 | ] |
| #549 | assert last_error is not None |
| #550 | raise last_error |
| #551 | |
| #552 | def _validate_dispatchable_action(self, pid: str, action: dict[str, Any]) -> None: |
| #553 | name = str(action.get("action") or "").strip() |
| #554 | if not name: |
| #555 | raise ValueError("selected action has an empty tool name") |
| #556 | process = self.runtime.process.get(pid) |
| #557 | if name not in process.tool_table: |
| #558 | raise ValueError(f"selected action is not in this process tool table: {name}") |
| #559 | |
| #560 | def _tool_call_previews(self, tool_calls: list[dict[str, Any]]) -> list[dict[str, Any]]: |
| #561 | previews: list[dict[str, Any]] = [] |
| #562 | for tool_call in tool_calls: |
| #563 | raw_args = tool_call.get("arguments") |
| #564 | if isinstance(raw_args, str): |
| #565 | arguments_preview = raw_args[: self.config.llm.tool_arguments_preview_chars] |
| #566 | else: |
| #567 | arguments_preview = repr(raw_args)[: self.config.llm.tool_arguments_preview_chars] |
| #568 | previews.append( |
| #569 | { |
| #570 | "id": tool_call.get("id"), |
| #571 | "call_id": tool_call.get("call_id"), |
| #572 | "name": tool_call.get("name"), |
| #573 | "arguments_type": type(raw_args).__name__, |
| #574 | "arguments_preview": arguments_preview, |
| #575 | } |
| #576 | ) |
| #577 | return previews |
| #578 | |
async def _complete_action_recorded(
    self,
    *,
    pid: str,
    messages: list[dict[str, Any]],
    tools: list[dict[str, Any]],
    attempt: int,
    max_attempts: int,
) -> Any:
    """Call the LLM client once and store an ``LLMCallRecord`` for the call.

    A record with status ``"error"`` is stored and the exception re-raised
    when the client fails; otherwise a record with status ``"ok"`` is stored
    and the completion returned.
    """
    call_id = new_id("llmcall")
    process = self.runtime.process.get(pid)
    created_at = utc_now()
    request_options = {
        "attempt": attempt,
        "max_attempts": max_attempts,
        "purpose": "action_selection",
        "client_class": type(self.client).__name__,
        "real_llm_client": isinstance(self.client, LLMClient),
    }

    def current_image_id() -> Any:
        if process is None:
            return None
        return process.image_id

    try:
        completion = await self._complete_action(messages, tools)
    except Exception as exc:
        failure_record = LLMCallRecord(
            call_id=call_id,
            pid=pid,
            image_id=current_image_id(),
            purpose="action_selection",
            status="error",
            messages=messages,
            tools=tools,
            request_options=request_options,
            error=str(exc),
            created_at=created_at,
            completed_at=utc_now(),
        )
        self.runtime.store.insert_llm_call(failure_record)
        raise
    else:
        # Completion fields are read defensively with getattr because the
        # client may be a stand-in that lacks some of them.
        success_record = LLMCallRecord(
            call_id=call_id,
            pid=pid,
            image_id=current_image_id(),
            purpose="action_selection",
            status="ok",
            api=getattr(completion, "api", None),
            model=getattr(completion, "model", None),
            request_id=getattr(completion, "request_id", None),
            response_id=getattr(completion, "response_id", None),
            messages=messages,
            tools=tools,
            request_options=request_options,
            response_content=str(getattr(completion, "content", "")),
            tool_calls=list(getattr(completion, "tool_calls", []) or []),
            reasoning=getattr(completion, "reasoning", None),
            usage=dict(getattr(completion, "usage", {}) or {}),
            raw_response=getattr(completion, "raw", None),
            created_at=created_at,
            completed_at=utc_now(),
        )
        self.runtime.store.insert_llm_call(success_record)
        return completion
| #641 | |
| #642 | async def _complete_action(self, messages: list[dict[str, Any]], tools: list[dict[str, Any]]) -> Any: |
| #643 | if hasattr(self.client, "acomplete_action"): |
| #644 | result = self.client.acomplete_action(messages, tools) |
| #645 | if inspect.isawaitable(result): |
| #646 | return await result |
| #647 | return result |
| #648 | return await asyncio.to_thread(self.client.complete_action, messages, tools) |
| #649 | |
| #650 | def dispatch(self, pid: str, action: dict[str, Any]) -> dict[str, Any]: |
| #651 | name = str(action["action"]) |
| #652 | args = {key: value for key, value in action.items() if key != "action"} |
| #653 | if notice := self._pre_tool_interrupt_notice(pid, name): |
| #654 | return notice |
| #655 | result = self.runtime.tools.call(pid, name, args) |
| #656 | if result.result_handle is not None: |
| #657 | self._add_to_view(pid, result.result_handle) |
| #658 | post_tool_notice = self._notify_normal_messages(pid) |
| #659 | return { |
| #660 | "ok": result.ok, |
| #661 | "tool_id": result.tool_id, |
| #662 | "result_oid": result.result_handle.oid if result.result_handle else None, |
| #663 | "payload": result.payload, |
| #664 | "error": result.error, |
| #665 | "message_notice": post_tool_notice, |
| #666 | } |
| #667 | |
| #668 | async def adispatch(self, pid: str, action: dict[str, Any]) -> dict[str, Any]: |
| #669 | name = str(action["action"]) |
| #670 | args = {key: value for key, value in action.items() if key != "action"} |
| #671 | if notice := self._pre_tool_interrupt_notice(pid, name): |
| #672 | return notice |
| #673 | result = await self.runtime.tools.acall(pid, name, args) |
| #674 | if result.result_handle is not None: |
| #675 | self._add_to_view(pid, result.result_handle) |
| #676 | post_tool_notice = self._notify_normal_messages(pid) |
| #677 | return { |
| #678 | "ok": result.ok, |
| #679 | "tool_id": result.tool_id, |
| #680 | "result_oid": result.result_handle.oid if result.result_handle else None, |
| #681 | "payload": result.payload, |
| #682 | "error": result.error, |
| #683 | "message_notice": post_tool_notice, |
| #684 | } |
| #685 | |
def _notify_interrupt_messages(self, pid: str) -> dict[str, Any] | None:
    """Request an INTERRUPT-kind message notice for ``pid`` before tool selection."""
    notice_options = {
        "kind": ProcessMessageKind.INTERRUPT,
        "phase": "before_llm_tool_selection",
        "source": "llm.executor",
    }
    return self.runtime.messages.notice(pid, **notice_options)
| #693 | |
| #694 | def _pre_tool_interrupt_notice(self, pid: str, tool_name: str) -> dict[str, Any] | None: |
| #695 | if tool_name in {"read_process_messages", "receive_process_messages"}: |
| #696 | return None |
| #697 | notice = self.runtime.messages.notice( |
| #698 | pid, |
| #699 | kind=ProcessMessageKind.INTERRUPT, |
| #700 | phase="before_tool_call", |
| #701 | source="llm.executor", |
| #702 | ) |
| #703 | if notice is None: |
| #704 | return None |
| #705 | return { |
| #706 | "ok": False, |
| #707 | "tool_id": None, |
| #708 | "result_oid": None, |
| #709 | "payload": {"message_notice": notice}, |
| #710 | "error": "unread interrupt process messages are waiting; call read_process_messages or receive_process_messages first", |
| #711 | "interrupted_by_message": True, |
| #712 | "message_notice": notice, |
| #713 | } |
| #714 | |
def _notify_normal_messages(self, pid: str) -> dict[str, Any] | None:
    """Request a NORMAL-kind message notice for ``pid`` after a tool call."""
    notice_options = {
        "kind": ProcessMessageKind.NORMAL,
        "phase": "after_tool_call",
        "source": "llm.executor",
    }
    return self.runtime.messages.notice(pid, **notice_options)
| #722 | |
| #723 | def _handles_for_oids(self, pid: str, oids: list[str]) -> list[ObjectHandle]: |
| #724 | return [self._handle_for_oid(pid, oid) for oid in oids] |
| #725 | |
| #726 | def _handle_for_oid(self, pid: str, oid: str) -> ObjectHandle: |
| #727 | process = self.runtime.process.get(pid) |
| #728 | if process.memory_view is not None: |
| #729 | for handle in process.memory_view.roots: |
| #730 | if handle.oid == oid: |
| #731 | return handle |
| #732 | return self.runtime.capability.handle_for_object( |
| #733 | pid, |
| #734 | oid, |
| #735 | {"read", "materialize", "link", "diff"}, |
| #736 | issued_by="llm.executor", |
| #737 | ) |
| #738 | |
def _add_to_view(self, pid: str, handle: ObjectHandle) -> None:
    """Make ``handle`` a root of the process memory view and persist the process.

    A read-only view holding just this handle is created when the process
    has none. An existing view gains the handle only if no root already has
    the same oid. The process timestamp is refreshed and the process stored
    in every case.
    """
    process = self.runtime.process.get(pid)
    view = process.memory_view
    if view is None:
        process.memory_view = self.runtime.memory.create_view(pid, [handle], mode=ViewMode.READ_ONLY)
    else:
        already_rooted = any(existing.oid == handle.oid for existing in view.roots)
        if not already_rooted:
            view.roots.append(handle)
    process.updated_at = utc_now()
    self.runtime.store.update_process(process)
| #747 |