repositories
loading repo index
repositories
loading repo index
repository
loading code, commits, and activity
Mirrored from https://github.com/yingqi-z20/Agent-libOS
stars
latest
clone command
git clone gitlawb://did:key:z6MkqRzA...RfoM/yingqi-z20-Agen...git clone gitlawb://did:key:z6MkqRzA.../yingqi-z20-Agen...d98dd2c9IPC1d ago| #1 | from __future__ import annotations |
| #2 | |
| #3 | import hashlib |
| #4 | import os |
| #5 | from dataclasses import dataclass |
| #6 | from typing import Any, Iterable |
| #7 | |
| #8 | from agent_libos.capability.manager import CapabilityManager |
| #9 | from agent_libos.config import DEFAULT_CONFIG |
| #10 | from agent_libos.models.exceptions import CapabilityDenied, HumanApprovalRequired, NotFound, ValidationError |
| #11 | from agent_libos.models import Capability, CapabilityRight, EventType |
| #12 | from agent_libos.runtime.audit_manager import AuditManager |
| #13 | from agent_libos.runtime.event_bus import EventBus |
| #14 | from agent_libos.substrate import FilesystemProvider, LocalFilesystemProvider, ResolvedPath |
| #15 | |
# Module-level aliases for two sections of the default configuration.
# ``_RUNTIME_DEFAULTS`` supplies the default workspace namespace and the default
# human used for approval queries; ``_TOOL_DEFAULTS`` supplies the default text
# encoding, read/listing limits, and the approval preview length used below.
_RUNTIME_DEFAULTS = DEFAULT_CONFIG.runtime
_TOOL_DEFAULTS = DEFAULT_CONFIG.tools
| #18 | |
| #19 | |
@dataclass(frozen=True)
class FileReadResult:
    """Immutable outcome of ``FilesystemAdapter.read_text``."""

    # Resolved path of the file, relative to the adapter root.
    path: str
    # Decoded text of the (possibly truncated) byte prefix that was selected.
    content: str
    # Number of bytes selected from the file after applying the byte limit.
    bytes_read: int
    # True when the file held more bytes than the limit allowed.
    truncated: bool
| #26 | |
| #27 | |
@dataclass(frozen=True)
class FileWriteResult:
    """Immutable outcome of ``FilesystemAdapter.write_text``."""

    # Resolved path of the written file, relative to the adapter root.
    path: str
    # Length in bytes of the text once encoded with the requested encoding.
    bytes_written: int
    # True when the file did not exist immediately before the write.
    created: bool
| #33 | |
| #34 | |
@dataclass(frozen=True)
class DirectoryEntry:
    """Immutable description of one child returned by a directory listing.

    ``FilesystemAdapter.read_directory`` builds these from the attributes of
    the entries yielded by the filesystem provider.
    """

    # Entry name as reported by the provider.
    name: str
    # Entry path as reported by the provider.
    path: str
    # Entry kind as reported by the provider (presumably "file"/"directory";
    # verify against the provider implementation).
    kind: str
    # Size in bytes, or None when the provider reports no size.
    size_bytes: int | None
    # Modification timestamp rendered as a string by the provider.
    modified_at: str
| #42 | |
| #43 | |
@dataclass(frozen=True)
class DirectoryReadResult:
    """Immutable outcome of ``FilesystemAdapter.read_directory``."""

    # Resolved path of the listed directory, relative to the adapter root.
    path: str
    # Entries that were kept after applying the listing limit.
    entries: list[DirectoryEntry]
    # Number of entries in ``entries``.
    count: int
    # True when the directory held more children than the limit allowed.
    truncated: bool
| #50 | |
| #51 | |
@dataclass(frozen=True)
class DirectoryWriteResult:
    """Immutable outcome of ``FilesystemAdapter.write_directory``."""

    # Resolved path of the directory, relative to the adapter root.
    path: str
    # True when the directory did not exist immediately before the call.
    created: bool
| #56 | |
| #57 | |
@dataclass(frozen=True)
class DeleteResult:
    """Immutable outcome of a file or directory deletion."""

    # Resolved path of the target, relative to the adapter root.
    path: str
    # "file" or "directory" for a completed deletion, "missing" when the
    # target was absent and the caller allowed that.
    kind: str
    # True only when something was actually removed.
    deleted: bool
    # Whether a recursive directory deletion was requested.
    recursive: bool = False
| #64 | |
| #65 | |
class FilesystemAdapter:
    """Workspace-contained filesystem primitive.

    Each public operation resolves the caller-supplied path through the
    configured provider, derives a capability resource string from the
    resolved relative path, consults the capability manager, performs the
    provider call, and finally emits an event and records an audit entry.

    Resource strings have the form ``filesystem:<namespace>:<relative>`` for
    files and ``filesystem:<namespace>:<relative>/*`` for directories; the
    whole workspace is ``filesystem:<namespace>:*``.
    """

    def __init__(
        self,
        capabilities: CapabilityManager,
        audit: AuditManager,
        events: EventBus,
        root: str | os.PathLike[str] | None = None,
        namespace: str = _RUNTIME_DEFAULTS.workspace_namespace,
        human: Any | None = None,
        provider: FilesystemProvider | None = None,
    ):
        """Bind the adapter to its collaborators and a filesystem provider.

        Args:
            capabilities: Manager consulted for every permission decision.
            audit: Sink for audit records of completed operations.
            events: Bus on which external read/write events are emitted.
            root: Workspace root used to build a ``LocalFilesystemProvider``
                when ``provider`` is not supplied.
            namespace: Namespace passed to the local provider; ignored when
                an explicit ``provider`` is given.
            human: Optional object exposing ``query(...)`` used to request
                per-use approvals; ``None`` disables interactive approval.
            provider: Pre-built provider; takes precedence over ``root``.

        Raises:
            ValueError: If neither ``root`` nor ``provider`` is supplied.
        """
        self.capabilities = capabilities
        self.audit = audit
        self.events = events
        if provider is None:
            if root is None:
                raise ValueError("FilesystemAdapter requires either root or provider")
            provider = LocalFilesystemProvider(root, namespace=namespace)
        self.provider = provider
        # Root and namespace are always taken from the provider so that they
        # stay consistent with what the provider actually resolves against.
        self.root = provider.root_display
        self.namespace = provider.namespace
        self.human = human

    def read_text(
        self,
        pid: str,
        path: str | os.PathLike[str],
        encoding: str = _TOOL_DEFAULTS.default_text_encoding,
        max_bytes: int = _TOOL_DEFAULTS.filesystem_read_max_bytes,
        cwd: str | os.PathLike[str] | None = None,
    ) -> FileReadResult:
        """Read a file as text, returning at most ``max_bytes`` bytes of it.

        Args:
            pid: Process identifier acting as the capability subject.
            path: Path to read, interpreted relative to ``cwd`` when given.
            encoding: Text encoding used to decode the selected bytes.
            max_bytes: Upper bound on the bytes decoded; validated against the
                configured hard limit.
            cwd: Optional logical working directory for relative paths.

        Returns:
            The decoded content plus the byte count selected and whether the
            file was longer than ``max_bytes``.

        Raises:
            ValidationError: If ``max_bytes`` is not a positive integer within
                the hard limit.
            NotFound: If the target does not exist.
            CapabilityDenied: If the target exists but is not a file.
            UnicodeDecodeError: If the selected bytes cannot be decoded (other
                than a trailing sequence cut off by truncation).
        """
        max_bytes = self._bounded_positive_int(
            max_bytes,
            label="max_bytes",
            hard_limit=_TOOL_DEFAULTS.filesystem_read_hard_limit_bytes,
        )
        target, relative = self._resolve(path, cwd=cwd)
        resource = self.resource_for(relative)
        # The READ check runs before any stat so that existence is not probed
        # without permission.
        self.capabilities.require(pid, resource, CapabilityRight.READ)
        target_state = self.provider.state(target)
        if not target_state.exists:
            raise NotFound(f"file does not exist: {relative}")
        if target_state.kind != "file":
            raise CapabilityDenied(f"path is not a file: {relative}")
        # NOTE(review): the whole file is read and then sliced; the limit
        # bounds what is returned, not what is loaded.
        raw = self.provider.read_bytes(target)
        truncated = len(raw) > max_bytes
        selected = raw[:max_bytes]
        content = self._decode_text_prefix(selected, encoding, truncated=truncated)
        self.events.emit(
            EventType.EXTERNAL_READ,
            source=pid,
            target=resource,
            payload={"adapter": "filesystem", "path": relative, "bytes_read": len(selected), "truncated": truncated},
        )
        self.audit.record(
            actor=pid,
            action="primitive.filesystem.read_text",
            target=resource,
            decision={"path": relative, "bytes_read": len(selected), "truncated": truncated},
        )
        return FileReadResult(path=relative, content=content, bytes_read=len(selected), truncated=truncated)

    def write_text(
        self,
        pid: str,
        path: str | os.PathLike[str],
        text: str,
        encoding: str = _TOOL_DEFAULTS.default_text_encoding,
        overwrite: bool = True,
        cwd: str | os.PathLike[str] | None = None,
    ) -> FileWriteResult:
        """Write ``text`` to a file, creating it or (optionally) replacing it.

        Args:
            pid: Process identifier acting as the capability subject.
            path: Path to write, interpreted relative to ``cwd`` when given.
            text: Content to write.
            encoding: Text encoding passed to the provider.
            overwrite: When False, an existing file is an error.
            cwd: Optional logical working directory for relative paths.

        Returns:
            The relative path, the encoded byte length of ``text``, and
            whether the file was newly created.

        Raises:
            CapabilityDenied: If the target exists but is not a file, or the
                write is not permitted.
            FileExistsError: If the file exists and ``overwrite`` is False.
            HumanApprovalRequired: If a per-use approval was requested.
        """
        target, relative = self._resolve(path, cwd=cwd)
        resource = self.resource_for(relative)
        self._reject_definite_permission_denial(pid, resource, CapabilityRight.WRITE)
        target_state = self.provider.state(target)
        if target_state.exists and target_state.kind != "file":
            raise CapabilityDenied(f"path is not a file: {relative}")
        if target_state.exists and not overwrite:
            raise FileExistsError(f"file already exists: {relative}")
        consumed_once = self._require_write(
            pid=pid,
            resource=resource,
            target=target,
            relative=relative,
            text=text,
            encoding=encoding,
            overwrite=overwrite,
        )
        try:
            # Re-check the target after the permission decision: its state may
            # have changed since the first stat.
            target_state = self.provider.state(target)
            created = not target_state.exists
            if target_state.exists and target_state.kind != "file":
                raise CapabilityDenied(f"path is not a file: {relative}")
            if target_state.exists and not overwrite:
                raise FileExistsError(f"file already exists: {relative}")
            self.provider.write_text(target, text, encoding=encoding, newline="\n")
            bytes_written = len(text.encode(encoding))
            self.events.emit(
                EventType.EXTERNAL_WRITE,
                source=pid,
                target=resource,
                payload={"adapter": "filesystem", "path": relative, "bytes_written": bytes_written, "created": created},
            )
            self.audit.record(
                actor=pid,
                action="primitive.filesystem.write_text",
                target=resource,
                decision={"path": relative, "bytes_written": bytes_written, "created": created},
            )
        finally:
            # A one-time grant is spent by the attempt, whether or not the
            # write itself succeeded.
            if consumed_once:
                self._consume_once(pid, resource, CapabilityRight.WRITE)
        return FileWriteResult(path=relative, bytes_written=bytes_written, created=created)

    def read_directory(
        self,
        pid: str,
        path: str | os.PathLike[str],
        limit: int = _TOOL_DEFAULTS.directory_entry_limit,
        cwd: str | os.PathLike[str] | None = None,
    ) -> DirectoryReadResult:
        """List a directory, returning at most ``limit`` entries.

        Args:
            pid: Process identifier acting as the capability subject.
            path: Directory to list, relative to ``cwd`` when given.
            limit: Maximum number of entries returned; validated against the
                configured hard limit.
            cwd: Optional logical working directory for relative paths.

        Returns:
            The kept entries, their count, and whether more children existed.

        Raises:
            ValidationError: If ``limit`` is not a positive integer within the
                hard limit.
            NotFound: If the directory does not exist.
            CapabilityDenied: If the target exists but is not a directory.
        """
        limit = self._bounded_positive_int(
            limit,
            label="limit",
            hard_limit=_TOOL_DEFAULTS.directory_entry_hard_limit,
        )
        target, relative = self._resolve(path, cwd=cwd)
        resource = self.directory_resource_for(relative)
        self.capabilities.require(pid, resource, CapabilityRight.READ)
        target_state = self.provider.state(target)
        if not target_state.exists:
            raise NotFound(f"directory does not exist: {relative}")
        if target_state.kind != "directory":
            raise CapabilityDenied(f"path is not a directory: {relative}")
        children = list(self.provider.list_directory(target))
        selected = children[:limit]
        # Provider entries are copied field-by-field into this module's own
        # DirectoryEntry type.
        entries = [DirectoryEntry(**entry.__dict__) for entry in selected]
        truncated = len(children) > len(selected)
        self.events.emit(
            EventType.EXTERNAL_READ,
            source=pid,
            target=resource,
            payload={
                "adapter": "filesystem",
                "operation": "read_directory",
                "path": relative,
                "count": len(entries),
                "truncated": truncated,
            },
        )
        self.audit.record(
            actor=pid,
            action="primitive.filesystem.read_directory",
            target=resource,
            decision={"path": relative, "count": len(entries), "truncated": truncated},
        )
        return DirectoryReadResult(path=relative, entries=entries, count=len(entries), truncated=truncated)

    def write_directory(
        self,
        pid: str,
        path: str | os.PathLike[str],
        parents: bool = True,
        exist_ok: bool = True,
        cwd: str | os.PathLike[str] | None = None,
    ) -> DirectoryWriteResult:
        """Create a directory (or accept an existing one when ``exist_ok``).

        Args:
            pid: Process identifier acting as the capability subject.
            path: Directory to create, relative to ``cwd`` when given.
            parents: Forwarded to the provider's ``make_directory``.
            exist_ok: When False, an existing directory is an error.
            cwd: Optional logical working directory for relative paths.

        Returns:
            The relative path and whether the directory was newly created.

        Raises:
            CapabilityDenied: If the target exists but is not a directory, or
                the write is not permitted.
            FileExistsError: If the directory exists and ``exist_ok`` is False.
            HumanApprovalRequired: If a per-use approval was requested.
        """
        target, relative = self._resolve(path, cwd=cwd)
        resource = self.directory_resource_for(relative)
        self._reject_definite_permission_denial(pid, resource, CapabilityRight.WRITE)
        target_state = self.provider.state(target)
        if target_state.exists and target_state.kind != "directory":
            raise CapabilityDenied(f"path is not a directory: {relative}")
        if target_state.exists and not exist_ok:
            raise FileExistsError(f"directory already exists: {relative}")
        consumed_once = self._require_write_operation(
            pid=pid,
            resource=resource,
            target=target,
            relative=relative,
            operation="write_directory",
            primitive="runtime.filesystem.write_directory",
            question=f"Allow this process to create or update directory {relative}?",
            extra_context={"parents": parents, "exist_ok": exist_ok},
        )
        try:
            # Re-check after the permission decision; state may have changed.
            target_state = self.provider.state(target)
            created = not target_state.exists
            if target_state.exists and target_state.kind != "directory":
                raise CapabilityDenied(f"path is not a directory: {relative}")
            if target_state.exists and not exist_ok:
                raise FileExistsError(f"directory already exists: {relative}")
            self.provider.make_directory(target, parents=parents, exist_ok=exist_ok)
            self.events.emit(
                EventType.EXTERNAL_WRITE,
                source=pid,
                target=resource,
                payload={"adapter": "filesystem", "operation": "write_directory", "path": relative, "created": created},
            )
            self.audit.record(
                actor=pid,
                action="primitive.filesystem.write_directory",
                target=resource,
                decision={"path": relative, "created": created, "parents": parents, "exist_ok": exist_ok},
            )
        finally:
            if consumed_once:
                self._consume_once(pid, resource, CapabilityRight.WRITE)
        return DirectoryWriteResult(path=relative, created=created)

    def delete_file(
        self,
        pid: str,
        path: str | os.PathLike[str],
        missing_ok: bool = False,
        cwd: str | os.PathLike[str] | None = None,
    ) -> DeleteResult:
        """Delete a single file.

        Args:
            pid: Process identifier acting as the capability subject.
            path: File to delete, relative to ``cwd`` when given.
            missing_ok: When True, a missing target yields a ``"missing"``
                result instead of an error.
            cwd: Optional logical working directory for relative paths.

        Returns:
            A result with ``kind="file"`` and ``deleted=True`` on success, or
            ``kind="missing"`` and ``deleted=False`` when the target was
            absent and ``missing_ok`` is True.

        Raises:
            NotFound: If the file is missing and ``missing_ok`` is False.
            CapabilityDenied: If the target exists but is not a file, or the
                delete is not permitted.
            HumanApprovalRequired: If a per-use approval was requested.
        """
        target, relative = self._resolve(path, cwd=cwd)
        resource = self.resource_for(relative)
        self._reject_definite_permission_denial(pid, resource, CapabilityRight.DELETE)
        target_state = self.provider.state(target)
        if not target_state.exists and not missing_ok:
            raise NotFound(f"file does not exist: {relative}")
        if target_state.exists and target_state.kind != "file":
            raise CapabilityDenied(f"path is not a file: {relative}")
        consumed_once = self._require_delete(
            pid=pid,
            resource=resource,
            target=target,
            relative=relative,
            operation="delete_file",
            recursive=False,
            missing_ok=missing_ok,
        )
        try:
            # Re-check after the permission decision; state may have changed.
            target_state = self.provider.state(target)
            if not target_state.exists:
                if not missing_ok:
                    raise NotFound(f"file does not exist: {relative}")
                return DeleteResult(path=relative, kind="missing", deleted=False)
            if target_state.kind != "file":
                raise CapabilityDenied(f"path is not a file: {relative}")
            self.provider.delete_file(target)
            self.events.emit(
                EventType.EXTERNAL_WRITE,
                source=pid,
                target=resource,
                payload={"adapter": "filesystem", "operation": "delete_file", "path": relative},
            )
            self.audit.record(
                actor=pid,
                action="primitive.filesystem.delete_file",
                target=resource,
                decision={"path": relative, "deleted": True},
            )
            return DeleteResult(path=relative, kind="file", deleted=True)
        finally:
            if consumed_once:
                self._consume_once(pid, resource, CapabilityRight.DELETE)

    def delete_directory(
        self,
        pid: str,
        path: str | os.PathLike[str],
        recursive: bool = False,
        missing_ok: bool = False,
        cwd: str | os.PathLike[str] | None = None,
    ) -> DeleteResult:
        """Delete a directory; the adapter root itself can never be deleted.

        Args:
            pid: Process identifier acting as the capability subject.
            path: Directory to delete, relative to ``cwd`` when given.
            recursive: Forwarded to the provider's ``delete_directory``.
            missing_ok: When True, a missing target yields a ``"missing"``
                result instead of an error.
            cwd: Optional logical working directory for relative paths.

        Returns:
            A result with ``kind="directory"`` and ``deleted=True`` on
            success, or ``kind="missing"`` and ``deleted=False`` when the
            target was absent and ``missing_ok`` is True.

        Raises:
            CapabilityDenied: If the target is the adapter root, exists but is
                not a directory, or the delete is not permitted.
            NotFound: If the directory is missing and ``missing_ok`` is False.
            HumanApprovalRequired: If a per-use approval was requested.
        """
        target, relative = self._resolve(path, cwd=cwd)
        if target.is_root:
            raise CapabilityDenied("cannot delete filesystem adapter root")
        resource = self.directory_resource_for(relative)
        self._reject_definite_permission_denial(pid, resource, CapabilityRight.DELETE)
        target_state = self.provider.state(target)
        if not target_state.exists and not missing_ok:
            raise NotFound(f"directory does not exist: {relative}")
        if target_state.exists and target_state.kind != "directory":
            raise CapabilityDenied(f"path is not a directory: {relative}")
        consumed_once = self._require_delete(
            pid=pid,
            resource=resource,
            target=target,
            relative=relative,
            operation="delete_directory",
            recursive=recursive,
            missing_ok=missing_ok,
        )
        try:
            # Re-check after the permission decision; state may have changed.
            target_state = self.provider.state(target)
            if not target_state.exists:
                if not missing_ok:
                    raise NotFound(f"directory does not exist: {relative}")
                return DeleteResult(path=relative, kind="missing", deleted=False, recursive=recursive)
            if target_state.kind != "directory":
                raise CapabilityDenied(f"path is not a directory: {relative}")
            self.provider.delete_directory(target, recursive=recursive)
            self.events.emit(
                EventType.EXTERNAL_WRITE,
                source=pid,
                target=resource,
                payload={
                    "adapter": "filesystem",
                    "operation": "delete_directory",
                    "path": relative,
                    "recursive": recursive,
                },
            )
            self.audit.record(
                actor=pid,
                action="primitive.filesystem.delete_directory",
                target=resource,
                decision={"path": relative, "deleted": True, "recursive": recursive},
            )
            return DeleteResult(path=relative, kind="directory", deleted=True, recursive=recursive)
        finally:
            if consumed_once:
                self._consume_once(pid, resource, CapabilityRight.DELETE)

    def grant_workspace(
        self,
        pid: str,
        rights: Iterable[str | CapabilityRight],
        issued_by: str = "filesystem",
    ) -> Capability:
        """Grant ``rights`` on the whole workspace resource to ``pid``."""
        return self.capabilities.grant(
            subject=pid,
            resource=self.workspace_resource(),
            rights=rights,
            issued_by=issued_by,
        )

    def grant_path(
        self,
        pid: str,
        path: str | os.PathLike[str],
        rights: Iterable[str | CapabilityRight],
        issued_by: str = "filesystem",
        cwd: str | os.PathLike[str] | None = None,
    ) -> Capability:
        """Grant ``rights`` on the file resource for ``path`` to ``pid``."""
        return self.capabilities.grant(
            subject=pid,
            resource=self.resource_for_path(path, cwd=cwd),
            rights=rights,
            issued_by=issued_by,
        )

    def grant_directory(
        self,
        pid: str,
        path: str | os.PathLike[str],
        rights: Iterable[str | CapabilityRight],
        issued_by: str = "filesystem",
        cwd: str | os.PathLike[str] | None = None,
    ) -> Capability:
        """Grant ``rights`` on the directory resource for ``path`` to ``pid``."""
        return self.capabilities.grant(
            subject=pid,
            resource=self.directory_resource_for_path(path, cwd=cwd),
            rights=rights,
            issued_by=issued_by,
        )

    def grant_path_list(
        self,
        pid: str,
        *,
        read_files: Iterable[str | os.PathLike[str]] = (),
        write_files: Iterable[str | os.PathLike[str]] = (),
        delete_files: Iterable[str | os.PathLike[str]] = (),
        read_dirs: Iterable[str | os.PathLike[str]] = (),
        write_dirs: Iterable[str | os.PathLike[str]] = (),
        delete_dirs: Iterable[str | os.PathLike[str]] = (),
        issued_by: str = "filesystem",
        cwd: str | os.PathLike[str] | None = None,
    ) -> list[Capability]:
        """Issue one single-right capability per listed path.

        Grants are issued in a fixed order: file read, file write, file
        delete, then directory read, directory write, directory delete.

        Returns:
            The capabilities in the order they were granted.
        """
        grants: list[Capability] = []
        for path in read_files:
            grants.append(self.grant_path(pid, path, [CapabilityRight.READ], issued_by=issued_by, cwd=cwd))
        for path in write_files:
            grants.append(self.grant_path(pid, path, [CapabilityRight.WRITE], issued_by=issued_by, cwd=cwd))
        for path in delete_files:
            grants.append(self.grant_path(pid, path, [CapabilityRight.DELETE], issued_by=issued_by, cwd=cwd))
        for path in read_dirs:
            grants.append(self.grant_directory(pid, path, [CapabilityRight.READ], issued_by=issued_by, cwd=cwd))
        for path in write_dirs:
            grants.append(self.grant_directory(pid, path, [CapabilityRight.WRITE], issued_by=issued_by, cwd=cwd))
        for path in delete_dirs:
            grants.append(self.grant_directory(pid, path, [CapabilityRight.DELETE], issued_by=issued_by, cwd=cwd))
        return grants

    def workspace_resource(self) -> str:
        """Return the wildcard resource string covering the whole namespace."""
        return f"filesystem:{self.namespace}:*"

    def resource_for(self, path: str | os.PathLike[str]) -> str:
        """Return the file resource string for an already-relative path.

        An empty path or ``"."`` maps to ``filesystem:<namespace>:`` with no
        path component.
        """
        relative = self._logical_path(path)
        if relative in {"", "."}:
            return f"filesystem:{self.namespace}:"
        return f"filesystem:{self.namespace}:{relative}"

    def resource_for_path(self, path: str | os.PathLike[str], cwd: str | os.PathLike[str] | None = None) -> str:
        """Resolve ``path`` through the provider and return its file resource."""
        _target, relative = self._resolve(path, cwd=cwd)
        return self.resource_for(relative)

    def directory_resource_for(self, path: str | os.PathLike[str]) -> str:
        """Return the directory resource string for an already-relative path.

        Trailing slashes are dropped; an empty path or ``"."`` maps to the
        workspace-wide resource.
        """
        relative = self._logical_path(path).rstrip("/")
        if relative in {"", "."}:
            return self.workspace_resource()
        return f"filesystem:{self.namespace}:{relative}/*"

    def directory_resource_for_path(
        self,
        path: str | os.PathLike[str],
        cwd: str | os.PathLike[str] | None = None,
    ) -> str:
        """Resolve ``path`` through the provider and return its directory resource."""
        _target, relative = self._resolve(path, cwd=cwd)
        return self.directory_resource_for(relative)

    def resolve_path(
        self,
        path: str | os.PathLike[str],
        cwd: str | os.PathLike[str] | None = None,
    ) -> tuple[ResolvedPath, str]:
        """Public wrapper around ``_resolve``; returns ``(target, relative)``."""
        return self._resolve(path, cwd=cwd)

    def _resolve(
        self,
        path: str | os.PathLike[str],
        cwd: str | os.PathLike[str] | None = None,
    ) -> tuple[ResolvedPath, str]:
        """Resolve ``path`` (joined onto ``cwd``) through the provider.

        Returns:
            The provider's resolved target and that target's relative path.
        """
        target = self.provider.resolve(self._path_with_cwd(path, cwd))
        return target, target.relative

    def _logical_path(self, path: str | os.PathLike[str]) -> str:
        """Return ``path`` as a string with backslashes turned into ``/``."""
        return os.fspath(path).replace("\\", "/")

    def _path_with_cwd(
        self,
        path: str | os.PathLike[str],
        cwd: str | os.PathLike[str] | None,
    ) -> str:
        """Prefix ``path`` with the logical working directory when applicable.

        ``path`` is returned unchanged when it is absolute, when ``cwd`` is
        ``None``, or when ``cwd`` normalises to an empty path or ``"."``.
        Otherwise the normalised ``cwd`` (forward slashes, no leading or
        trailing ``/``) and ``path`` are joined with a single ``/``.
        """
        raw = os.fspath(path)
        if os.path.isabs(raw) or cwd is None or os.fspath(cwd) in {"", "."}:
            return raw
        cwd_path = self._logical_path(cwd).strip("/")
        if cwd_path in {"", "."}:
            return raw
        return f"{cwd_path}/{raw}"

    def _require_write(
        self,
        pid: str,
        resource: str,
        target: ResolvedPath,
        relative: str,
        text: str,
        encoding: str,
        overwrite: bool,
    ) -> bool:
        """Apply the WRITE policy for ``write_text``.

        The content summary (size, hash, preview) is built up front so it can
        be shown to a human approver; building it also encodes ``text`` once
        before anything is written.

        Returns:
            True when a one-time grant must be consumed after the operation.
        """
        return self._require_write_operation(
            pid=pid,
            resource=resource,
            target=target,
            relative=relative,
            operation="write_text",
            primitive="runtime.filesystem.write_text",
            question=f"Allow this process to write {relative}?",
            extra_context={
                "encoding": encoding,
                "overwrite": overwrite,
                **self._content_context(text, encoding),
            },
        )

    def _reject_definite_permission_denial(
        self,
        pid: str,
        resource: str,
        right: CapabilityRight,
    ) -> None:
        """Fail early when the policy is a definite miss or deny.

        For those policies the check is delegated to ``capabilities.require``
        (presumably raising; confirm against the capability manager). Other
        policies fall through so the caller can continue.
        """
        # Do not stat the target before a definite deny/miss; existence and
        # kind are filesystem facts that require some matching policy first.
        policy = self.capabilities.permission_policy(pid, resource, right)
        if policy in {CapabilityManager.MISSING, CapabilityManager.ALWAYS_DENY}:
            self.capabilities.require(pid, resource, right)

    def _require_write_operation(
        self,
        pid: str,
        resource: str,
        target: ResolvedPath,
        relative: str,
        operation: str,
        primitive: str,
        question: str,
        extra_context: dict[str, Any] | None = None,
    ) -> bool:
        """Apply the WRITE policy for ``operation`` on ``resource``.

        Returns:
            False for an always-allow policy, True when a one-time grant must
            be consumed after the operation.

        Raises:
            CapabilityDenied: If the policy denies the write, no policy
                matches, or approval is needed but no human is configured.
            HumanApprovalRequired: After a per-use approval request was sent.
        """
        # This primitive has the concrete path, overwrite state, byte count,
        # and preview needed for a safe per-use human decision.
        return self._require_operation(
            pid=pid,
            resource=resource,
            target=target,
            relative=relative,
            right=CapabilityRight.WRITE,
            verb="write",
            operation=operation,
            primitive=primitive,
            question=question,
            extra_context=extra_context or {},
        )

    def _require_delete(
        self,
        pid: str,
        resource: str,
        target: ResolvedPath,
        relative: str,
        operation: str,
        recursive: bool,
        missing_ok: bool,
    ) -> bool:
        """Apply the DELETE policy for ``operation`` on ``resource``.

        Returns:
            False for an always-allow policy, True when a one-time grant must
            be consumed after the operation.

        Raises:
            CapabilityDenied: If the policy denies the delete, no policy
                matches, or approval is needed but no human is configured.
            HumanApprovalRequired: After a per-use approval request was sent.
        """
        return self._require_operation(
            pid=pid,
            resource=resource,
            target=target,
            relative=relative,
            right=CapabilityRight.DELETE,
            verb="delete",
            operation=operation,
            primitive=f"runtime.filesystem.{operation}",
            question=f"Allow this process to delete {relative}?",
            extra_context={"recursive": recursive, "missing_ok": missing_ok},
        )

    def _require_operation(
        self,
        *,
        pid: str,
        resource: str,
        target: ResolvedPath,
        relative: str,
        right: CapabilityRight,
        verb: str,
        operation: str,
        primitive: str,
        question: str,
        extra_context: dict[str, Any],
    ) -> bool:
        """Shared policy gate behind the write and delete checks.

        Args:
            right: Capability right being exercised.
            verb: Word used in error messages ("write" or "delete").
            operation: Operation name reported in the approval context.
            primitive: Primitive name reported in the approval context.
            question: Question shown to the human approver.
            extra_context: Operation-specific entries merged into the
                approval context.

        Returns:
            False when the policy is always-allow, True when it is allow-once
            (the caller must consume the grant afterwards).

        Raises:
            CapabilityDenied: For an always-deny policy, for ask-each-time
                with no human configured, and for any other policy.
            HumanApprovalRequired: For ask-each-time, after the approval
                request has been submitted.
        """
        policy = self.capabilities.permission_policy(pid, resource, right)
        if policy == CapabilityManager.ALWAYS_ALLOW:
            return False
        if policy == CapabilityManager.ALLOW_ONCE:
            return True
        if policy == CapabilityManager.ALWAYS_DENY:
            raise CapabilityDenied(f"{pid} denied {verb} on {resource}")
        if policy == CapabilityManager.ASK_EACH_TIME:
            if self.human is None:
                raise CapabilityDenied(f"{pid} requires human approval for {verb} on {resource}")
            request_id = self.human.query(
                pid=pid,
                human=_RUNTIME_DEFAULTS.default_human,
                request={
                    "type": "external_operation_approval",
                    "question": question,
                    "requested_once_capability": {
                        "subject": pid,
                        "resource": resource,
                        "rights": [right.value],
                    },
                    "context": self._operation_context(
                        pid=pid,
                        resource=resource,
                        target=target,
                        relative=relative,
                        primitive=primitive,
                        operation=operation,
                        right=right.value,
                        extra=extra_context,
                    ),
                },
                blocking=True,
            )
            # The operation is not performed now; the caller is told which
            # request it is waiting on.
            raise HumanApprovalRequired(
                request_id=request_id,
                message=f"{pid} is waiting for per-use human approval to {verb} {resource}",
            )
        raise CapabilityDenied(f"{pid} lacks {verb} on {resource}")

    def _consume_once(self, pid: str, resource: str, right: CapabilityRight) -> None:
        """Mark the one-time grant for ``right`` on ``resource`` as used."""
        self.capabilities.consume_allow_once(
            subject=pid,
            resource=resource,
            right=right,
            used_by="filesystem",
        )

    def _operation_context(
        self,
        pid: str,
        resource: str,
        target: ResolvedPath,
        relative: str,
        primitive: str,
        operation: str,
        right: str,
        extra: dict[str, Any],
    ) -> dict[str, Any]:
        """Build the context dictionary attached to an approval request.

        The current state of the target is included, along with whether the
        operation would create a new entry or hit an existing file. Keys in
        ``extra`` are merged last and therefore override the standard ones.
        """
        target_state = self._target_state(target)
        will_overwrite = bool(target_state["exists"] and target_state["kind"] == "file")
        return {
            "adapter": "filesystem",
            "primitive": primitive,
            "operation": operation,
            "pid": pid,
            "workspace_root": self.root,
            "path": relative,
            "absolute_path": target.display,
            "resource": resource,
            "right": right,
            "grant_scope": "one_time",
            "will_create": not target_state["exists"],
            "will_overwrite": will_overwrite,
            "target": target_state,
            **extra,
        }

    def _content_context(self, text: str, encoding: str) -> dict[str, Any]:
        """Summarise ``text`` for an approver: byte size, SHA-256, and preview."""
        encoded = text.encode(encoding)
        preview, preview_truncated = self._preview_text(text)
        return {
            "content_bytes": len(encoded),
            "content_sha256": hashlib.sha256(encoded).hexdigest(),
            "content_preview": preview,
            "content_preview_chars": len(preview),
            "content_preview_truncated": preview_truncated,
        }

    def _preview_text(self, text: str, limit: int = _TOOL_DEFAULTS.approval_preview_chars) -> tuple[str, bool]:
        """Return ``(repr of the first ``limit`` characters, was-truncated)``."""
        preview = text[:limit]
        # repr() prevents newlines or prompt-like text from masquerading as
        # separate approval instructions in the human terminal prompt.
        return repr(preview), len(text) > limit

    def _decode_text_prefix(self, data: bytes, encoding: str, *, truncated: bool) -> str:
        """Decode ``data``, tolerating a character cut off by truncation.

        When ``truncated`` is True and the only decoding error runs to the
        very end of ``data``, the bytes before the error are decoded instead.
        Any other decoding error is re-raised.
        """
        try:
            return data.decode(encoding)
        except UnicodeDecodeError as exc:
            if truncated and exc.end == len(data):
                return data[: exc.start].decode(encoding)
            raise

    def _target_state(self, target: ResolvedPath) -> dict[str, Any]:
        """Return the provider's state for ``target`` as a plain dictionary.

        A missing target yields ``{"exists": False, "kind": "missing"}``;
        otherwise kind, size and modification time are included.
        """
        state = self.provider.state(target)
        if not state.exists:
            return {"exists": False, "kind": "missing"}
        return {
            "exists": True,
            "kind": state.kind,
            "size_bytes": state.size_bytes,
            "modified_at": state.modified_at,
        }

    def _bounded_positive_int(self, value: int, *, label: str, hard_limit: int) -> int:
        """Coerce ``value`` to an int and require ``1 <= value <= hard_limit``.

        Args:
            value: Candidate value; anything ``int()`` accepts is coerced.
            label: Parameter name used in error messages.
            hard_limit: Inclusive upper bound.

        Returns:
            The coerced integer.

        Raises:
            ValidationError: If ``value`` cannot be converted to an integer,
                is below 1, or exceeds ``hard_limit``.
        """
        try:
            selected = int(value)
        except (TypeError, ValueError, OverflowError) as exc:
            # OverflowError covers int(float("inf")), which would otherwise
            # escape as a raw built-in error instead of a validation failure.
            raise ValidationError(f"{label} must be an integer") from exc
        if selected < 1:
            raise ValidationError(f"{label} must be >= 1")
        if selected > hard_limit:
            raise ValidationError(f"{label} exceeds hard limit {hard_limit}")
        return selected
| #753 |