repositories
loading repo index
repositories
loading repo index
repository
loading code, commits, and activity
Mirrored from https://github.com/yingqi-z20/Agent-libOS
stars
latest
clone command
git clone gitlawb://did:key:z6MkqRzA...RfoM/yingqi-z20-Agen...git clone gitlawb://did:key:z6MkqRzA.../yingqi-z20-Agen...d98dd2c9IPC1d ago| #1 | from __future__ import annotations |
| #2 | |
| #3 | import json |
| #4 | from typing import Any |
| #5 | |
| #6 | from agent_libos.models import EventPriority, EventType, ProcessMessage, ProcessMessageKind, ProcessMessageStatus, ProcessStatus |
| #7 | from agent_libos.models.exceptions import NotFound, ProcessError, ProcessMessageWaitRequired |
| #8 | from agent_libos.runtime.audit_manager import AuditManager |
| #9 | from agent_libos.runtime.event_bus import EventBus |
| #10 | from agent_libos.storage import SQLiteStore |
| #11 | from agent_libos.utils.ids import new_id, utc_now |
| #12 | |
| #13 | |
class ProcessMessageManager:
    """Per-process message queues with explicit read/ack semantics.

    Messages are persisted through ``store``; every state-changing operation is
    mirrored to ``audit`` and, where relevant, announced on ``events``.  A
    process may park itself with ``receive(block=True)``; the wait filters are
    serialized into the process ``status_message`` so that a later ``post`` can
    decide whether the new message should wake it.

    Note: the public method ``list`` shadows the builtin inside the *class
    body* only.  Method bodies still resolve ``list`` to the builtin, and
    annotations are never evaluated because the module uses
    ``from __future__ import annotations``.
    """

    # Statuses in which a process may neither be sent messages nor wait for them.
    TERMINAL_STATUSES = {ProcessStatus.EXITED, ProcessStatus.FAILED, ProcessStatus.KILLED}
    # Marker at the start of ``status_message`` for a process parked by
    # ``receive(block=True)``; the JSON-encoded wait filters follow it.
    WAIT_STATUS_PREFIX = "waiting_message:"

    def __init__(self, store: SQLiteStore, audit: AuditManager, events: EventBus):
        """Keep references to the persistence, audit and event collaborators."""
        self.store = store
        self.audit = audit
        self.events = events

    def post(
        self,
        *,
        sender: str,
        recipient_pid: str,
        kind: ProcessMessageKind | str = ProcessMessageKind.NORMAL,
        channel: str = "default",
        correlation_id: str | None = None,
        reply_to: str | None = None,
        subject: str = "",
        body: str = "",
        payload: dict[str, Any] | None = None,
    ) -> ProcessMessage:
        """Store a new unread message for ``recipient_pid`` and announce it.

        No relationship check is made between ``sender`` and the recipient;
        use ``send_from_process`` for process-to-process delivery.

        Returns:
            The stored ``ProcessMessage``.

        Raises:
            NotFound: the recipient process does not exist.
            ProcessError: the recipient is in a terminal status, or ``channel``
                is blank or longer than 128 characters.
        """
        recipient = self.store.get_process(recipient_pid)
        if recipient is None:
            raise NotFound(f"process not found: {recipient_pid}")
        if recipient.status in self.TERMINAL_STATUSES:
            raise ProcessError(f"cannot post message to terminal process: {recipient_pid}")
        selected_kind = ProcessMessageKind(kind)
        now = utc_now()
        message = ProcessMessage(
            message_id=new_id("pmsg"),
            sender=sender,
            recipient_pid=recipient_pid,
            kind=selected_kind,
            channel=self._normalize_channel(channel),
            correlation_id=correlation_id,
            reply_to=reply_to,
            # Coerce None / non-string values so the stored fields are always strings.
            subject=str(subject or ""),
            body=str(body or ""),
            # Copy so later mutation of the caller's dict cannot alter the message.
            payload=dict(payload or {}),
            status=ProcessMessageStatus.UNREAD,
            created_at=now,
            updated_at=now,
        )
        self.store.insert_process_message(message)
        self.events.emit(
            EventType.PROCESS_MESSAGE_POSTED,
            source=sender,
            target=recipient_pid,
            payload={
                "message_id": message.message_id,
                "kind": message.kind.value,
                "channel": message.channel,
                "correlation_id": message.correlation_id,
                "reply_to": message.reply_to,
                "subject": message.subject,
                "sender": sender,
            },
            priority=EventPriority.HIGH if message.kind == ProcessMessageKind.INTERRUPT else EventPriority.NORMAL,
        )
        self.audit.record(
            actor=sender,
            action="process.message.post",
            target=f"process:{recipient_pid}",
            decision={
                "message_id": message.message_id,
                "kind": message.kind.value,
                "channel": message.channel,
                "correlation_id": message.correlation_id,
                "reply_to": message.reply_to,
                "subject": message.subject,
            },
        )
        # Done last so a woken recipient finds the message already persisted.
        self._wake_if_waiting_for_message(message)
        return message

    def send_from_process(
        self,
        sender_pid: str,
        recipient_pid: str,
        *,
        kind: ProcessMessageKind | str = ProcessMessageKind.NORMAL,
        channel: str = "default",
        correlation_id: str | None = None,
        reply_to: str | None = None,
        subject: str = "",
        body: str = "",
        payload: dict[str, Any] | None = None,
    ) -> ProcessMessage:
        """Post a message on behalf of a process, enforcing the relationship rule.

        A process may only message itself, its parent, or one of its direct
        children.

        Raises:
            NotFound: either process does not exist.
            ProcessError: the processes are unrelated, or ``post`` rejects the
                message.
        """
        self._require_related_process(sender_pid, recipient_pid)
        return self.post(
            sender=sender_pid,
            recipient_pid=recipient_pid,
            kind=kind,
            channel=channel,
            correlation_id=correlation_id,
            reply_to=reply_to,
            subject=subject,
            body=body,
            payload=payload,
        )

    def unread(
        self,
        pid: str,
        *,
        kind: ProcessMessageKind | str | None = None,
        sender: str | None = None,
        channel: str | None = None,
        correlation_id: str | None = None,
        reply_to: str | None = None,
        message_ids: list[str] | None = None,
    ) -> list[ProcessMessage]:
        """Return the unread messages of ``pid`` that match the given filters.

        Equivalent to ``list(pid, include_acked=False, ...)`` without a limit.

        Raises:
            NotFound: the process does not exist.
        """
        return self.list(
            pid,
            include_acked=False,
            kind=kind,
            sender=sender,
            channel=channel,
            correlation_id=correlation_id,
            reply_to=reply_to,
            message_ids=message_ids,
        )

    def list(
        self,
        pid: str,
        *,
        include_acked: bool = False,
        kind: ProcessMessageKind | str | None = None,
        sender: str | None = None,
        channel: str | None = None,
        correlation_id: str | None = None,
        reply_to: str | None = None,
        message_ids: list[str] | None = None,
        limit: int | None = None,
    ) -> list[ProcessMessage]:
        """Return messages addressed to ``pid`` that match the given filters.

        Args:
            include_acked: when false only unread messages are queried; when
                true no status filter is passed to the store.
            limit: keep at most this many messages from the front of the store
                result; negative values are treated as ``0``.

        Raises:
            NotFound: the process does not exist.
        """
        self._require_process(pid)
        messages = self.store.list_process_messages(
            pid,
            status=None if include_acked else ProcessMessageStatus.UNREAD,
            kind=ProcessMessageKind(kind) if kind is not None else None,
            sender=sender,
            channel=self._normalize_channel(channel) if channel is not None else None,
            correlation_id=correlation_id,
            reply_to=reply_to,
            message_ids=message_ids,
        )
        if limit is not None:
            return messages[: max(0, int(limit))]
        return messages

    def receive(
        self,
        pid: str,
        *,
        block: bool = False,
        include_acked: bool = False,
        kind: ProcessMessageKind | str | None = None,
        sender: str | None = None,
        channel: str | None = None,
        correlation_id: str | None = None,
        reply_to: str | None = None,
        message_ids: list[str] | None = None,
        limit: int | None = None,
    ) -> list[ProcessMessage]:
        """Return matching messages, optionally parking the process when none exist.

        With ``block=False`` this behaves like ``list``.  With ``block=True``
        and no matching message, the process is switched to ``WAITING_EVENT``,
        its ``status_message`` records the filters, and
        ``ProcessMessageWaitRequired`` is raised.

        Raises:
            NotFound: the process does not exist.
            ProcessError: a terminal process asked to wait, or ``channel`` is
                invalid.
            ProcessMessageWaitRequired: ``block`` is true and nothing matches.
        """
        filters = self._filters(
            kind=kind,
            sender=sender,
            channel=channel,
            correlation_id=correlation_id,
            reply_to=reply_to,
            message_ids=message_ids,
        )
        # Query without the limit: whether to wait must depend on what is
        # queued, not on how many messages the caller wants back.  Otherwise
        # ``limit <= 0`` would park the process while messages are pending.
        matching = self.list(pid, include_acked=include_acked, limit=None, **filters)
        visible = matching if limit is None else matching[: max(0, int(limit))]
        if matching or not block:
            return visible
        process = self._require_process(pid)
        if process.status in self.TERMINAL_STATUSES:
            # Parking would overwrite the terminal status, after which ``post``
            # could deliver to the process and wake it back to RUNNABLE.
            raise ProcessError(f"terminal process cannot wait for messages: {pid}")
        process.status = ProcessStatus.WAITING_EVENT
        process.status_message = self._wait_status_message(filters)
        process.updated_at = utc_now()
        self.store.update_process(process)
        self.audit.record(
            actor=pid,
            action="process.message.wait",
            target=f"process:{pid}",
            decision={"filters": filters, "block": True},
        )
        raise ProcessMessageWaitRequired(
            recipient_pid=pid,
            filters=filters,
            message=f"{pid} is waiting for process message filters={filters}",
        )

    def ack(
        self,
        pid: str,
        message_ids: list[str] | None = None,
        *,
        kind: ProcessMessageKind | str | None = None,
        sender: str | None = None,
        channel: str | None = None,
        correlation_id: str | None = None,
        reply_to: str | None = None,
    ) -> list[ProcessMessage]:
        """Mark unread messages of ``pid`` as acknowledged.

        Args:
            message_ids: restrict the acknowledgement to these ids.  ``None``
                and an empty list both mean "no id restriction", so every
                unread message matching the other filters is acknowledged.

        Returns:
            The messages that were acknowledged (possibly empty).  An event
            and an audit record are produced only when at least one message
            was acknowledged.

        Raises:
            NotFound: the process does not exist.
        """
        selected_ids = set(message_ids or [])
        messages = self.list(
            pid,
            include_acked=False,
            kind=kind,
            sender=sender,
            channel=channel,
            correlation_id=correlation_id,
            reply_to=reply_to,
        )
        if selected_ids:
            messages = [message for message in messages if message.message_id in selected_ids]
        now = utc_now()
        for message in messages:
            message.status = ProcessMessageStatus.ACKED
            message.acked_at = now
            message.updated_at = now
            self.store.update_process_message(message)
        if messages:
            self.events.emit(
                EventType.PROCESS_MESSAGE_ACKED,
                source=pid,
                target=pid,
                payload={"message_ids": [message.message_id for message in messages], "count": len(messages)},
            )
            self.audit.record(
                actor=pid,
                action="process.message.ack",
                target=f"process:{pid}",
                decision={"message_ids": [message.message_id for message in messages], "count": len(messages)},
            )
        return messages

    def notice(
        self,
        pid: str,
        *,
        kind: ProcessMessageKind | str,
        phase: str,
        source: str = "runtime",
    ) -> dict[str, Any] | None:
        """Announce that ``pid`` has unread messages of ``kind``.

        Returns:
            The notice payload that was emitted and audited, or ``None`` when
            there is nothing unread of that kind (nothing is emitted then).

        Raises:
            NotFound: the process does not exist.
        """
        messages = self.unread(pid, kind=kind)
        if not messages:
            return None
        selected_kind = ProcessMessageKind(kind)
        payload = {
            "phase": phase,
            "kind": selected_kind.value,
            "count": len(messages),
            "message_ids": [message.message_id for message in messages],
            "channels": sorted({message.channel for message in messages}),
            # Messages without a correlation id are left out of the summary.
            "correlation_ids": sorted({message.correlation_id for message in messages if message.correlation_id}),
            "instruction": "Call read_process_messages or receive_process_messages to inspect and acknowledge unread process messages.",
        }
        self.events.emit(
            EventType.PROCESS_MESSAGE_NOTICE,
            source=source,
            target=pid,
            payload=payload,
            priority=EventPriority.HIGH if selected_kind == ProcessMessageKind.INTERRUPT else EventPriority.NORMAL,
        )
        self.audit.record(
            actor=source,
            action="process.message.notice",
            target=f"process:{pid}",
            decision=payload,
        )
        return payload

    def _require_related_process(self, sender_pid: str, recipient_pid: str) -> None:
        """Raise unless the sender is the recipient, its child, or its parent.

        Raises:
            NotFound: either process does not exist.
            ProcessError: the two processes are not directly related.
        """
        sender = self._require_process(sender_pid)
        recipient = self._require_process(recipient_pid)
        related = (
            sender.pid == recipient.pid
            or sender.parent_pid == recipient.pid
            or recipient.parent_pid == sender.pid
        )
        if not related:
            raise ProcessError(f"{sender_pid} can only message itself, its parent, or its direct children")

    def _require_process(self, pid: str):
        """Return the stored process for ``pid`` or raise ``NotFound``."""
        process = self.store.get_process(pid)
        if process is None:
            raise NotFound(f"process not found: {pid}")
        return process

    def _wake_if_waiting_for_message(self, message: ProcessMessage) -> None:
        """Make the recipient runnable if it is parked on filters ``message`` satisfies.

        Does nothing when the recipient is missing, is not in
        ``WAITING_EVENT``, is waiting for something other than a process
        message, or its recorded filters do not match.
        """
        process = self.store.get_process(message.recipient_pid)
        if process is None or process.status != ProcessStatus.WAITING_EVENT:
            return
        filters = self._filters_from_wait_status(process.status_message)
        if filters is None or not self._message_matches(message, filters):
            return
        process.status = ProcessStatus.RUNNABLE
        process.status_message = None
        process.updated_at = utc_now()
        self.store.update_process(process)
        self.audit.record(
            actor="process.message",
            action="process.message.wait_wake",
            target=f"process:{process.pid}",
            decision={"message_id": message.message_id, "filters": filters},
        )

    def _message_matches(self, message: ProcessMessage, filters: dict[str, Any]) -> bool:
        """Return whether an unread ``message`` satisfies every non-``None`` filter."""
        if message.status != ProcessMessageStatus.UNREAD:
            return False
        # ``kind`` is stored in the filters as the enum *value* (see ``_filters``).
        if filters.get("kind") is not None and message.kind.value != filters["kind"]:
            return False
        for field in ("sender", "channel", "correlation_id", "reply_to"):
            expected = filters.get(field)
            if expected is not None and getattr(message, field) != expected:
                return False
        message_ids = filters.get("message_ids")
        if message_ids is not None and message.message_id not in message_ids:
            return False
        return True

    def _filters(
        self,
        *,
        kind: ProcessMessageKind | str | None = None,
        sender: str | None = None,
        channel: str | None = None,
        correlation_id: str | None = None,
        reply_to: str | None = None,
        message_ids: list[str] | None = None,
    ) -> dict[str, Any]:
        """Build the canonical filter dict.

        ``kind`` is reduced to its enum value and ``channel`` is normalized;
        the result is passed to ``json.dumps`` by ``_wait_status_message``.
        ``None`` means "do not filter on this field".
        """
        return {
            "kind": ProcessMessageKind(kind).value if kind is not None else None,
            "sender": sender,
            "channel": self._normalize_channel(channel) if channel is not None else None,
            "correlation_id": correlation_id,
            "reply_to": reply_to,
            "message_ids": list(message_ids) if message_ids is not None else None,
        }

    def _wait_status_message(self, filters: dict[str, Any]) -> str:
        """Encode wait filters as ``WAIT_STATUS_PREFIX`` followed by sorted-key JSON."""
        return f"{self.WAIT_STATUS_PREFIX}{json.dumps(filters, sort_keys=True)}"

    def _filters_from_wait_status(self, status_message: str | None) -> dict[str, Any] | None:
        """Decode the filters written by ``_wait_status_message``.

        Returns ``None`` when the status message is empty, lacks the wait
        prefix, is not valid JSON, or does not decode to a dict.
        """
        if not status_message or not status_message.startswith(self.WAIT_STATUS_PREFIX):
            return None
        try:
            decoded = json.loads(status_message[len(self.WAIT_STATUS_PREFIX) :])
        except json.JSONDecodeError:
            return None
        return decoded if isinstance(decoded, dict) else None

    def _normalize_channel(self, channel: str | None) -> str:
        """Return the stripped channel name, using ``"default"`` for ``None`` or ``""``.

        Raises:
            ProcessError: the channel is whitespace-only or longer than 128
                characters after stripping.
        """
        selected = (channel or "default").strip()
        if not selected:
            raise ProcessError("process message channel must be non-empty")
        if len(selected) > 128:
            raise ProcessError("process message channel is too long")
        return selected
| #394 |