repositories
loading repo index
repositories
loading repo index
repository
loading code, commits, and activity
Mirrored from https://github.com/yingqi-z20/Agent-libOS
stars
latest
clone command
git clone gitlawb://did:key:z6MkqRzA...RfoM/yingqi-z20-Agen...git clone gitlawb://did:key:z6MkqRzA.../yingqi-z20-Agen...d98dd2c9IPC1d ago| #1 | from __future__ import annotations |
| #2 | |
| #3 | import ast |
| #4 | from dataclasses import dataclass |
| #5 | from typing import Any |
| #6 | |
| #7 | from agent_libos.models.exceptions import ValidationError |
| #8 | |
| #9 | |
def load_yaml_mapping(text: str) -> dict[str, Any]:
    """Parse *text* as YAML and return its top-level mapping.

    PyYAML's ``safe_load`` is used when the ``yaml`` module can be imported;
    otherwise the in-file ``_StrictYamlParser`` subset parser handles the text.

    Raises:
        ValidationError: If the parsed document is not a ``dict``.
    """
    try:
        import yaml as yaml_module  # type: ignore[import-not-found]
    except ModuleNotFoundError:
        yaml_module = None

    if yaml_module is not None:
        document = yaml_module.safe_load(text)
    else:
        # No PyYAML available: fall back to the strict subset parser.
        document = _StrictYamlParser(text).parse()

    if isinstance(document, dict):
        return document
    raise ValidationError("YAML document must be a mapping")
| #22 | |
| #23 | |
| #24 | @dataclass(frozen=True) |
| #25 | class _Line: |
| #26 | raw: str |
| #27 | lineno: int |
| #28 | |
| #29 | @property |
| #30 | def indent(self) -> int: |
| #31 | return len(self.raw) - len(self.raw.lstrip(" ")) |
| #32 | |
| #33 | @property |
| #34 | def stripped(self) -> str: |
| #35 | return self.raw.strip() |
| #36 | |
| #37 | |
| #38 | class _StrictYamlParser: |
| #39 | """Small YAML subset parser for image registration manifests. |
| #40 | |
| #41 | It intentionally supports only mappings, lists, inline scalars/lists/maps, |
| #42 | and literal/folded block scalars. The runtime prefers PyYAML when installed; |
| #43 | this fallback keeps image manifests usable without adding a parser dependency. |
| #44 | """ |
| #45 | |
| #46 | def __init__(self, text: str): |
| #47 | if "\t" in text: |
| #48 | raise ValidationError("YAML tabs are not supported; use spaces for indentation") |
| #49 | self.lines = [_Line(line.rstrip("\n\r"), index + 1) for index, line in enumerate(text.splitlines())] |
| #50 | |
| #51 | def parse(self) -> Any: |
| #52 | index = self._next_significant(0) |
| #53 | if index >= len(self.lines): |
| #54 | return {} |
| #55 | data, index = self._parse_block(index, self.lines[index].indent) |
| #56 | trailing = self._next_significant(index) |
| #57 | if trailing < len(self.lines): |
| #58 | line = self.lines[trailing] |
| #59 | raise ValidationError(f"unexpected YAML content at line {line.lineno}") |
| #60 | return data |
| #61 | |
| #62 | def _parse_block(self, index: int, indent: int) -> tuple[Any, int]: |
| #63 | line = self.lines[index] |
| #64 | if line.indent != indent: |
| #65 | raise ValidationError(f"unexpected indentation at line {line.lineno}") |
| #66 | if self._normal_text(line).startswith("- "): |
| #67 | return self._parse_list(index, indent) |
| #68 | return self._parse_mapping(index, indent) |
| #69 | |
| #70 | def _parse_mapping(self, index: int, indent: int) -> tuple[dict[str, Any], int]: |
| #71 | result: dict[str, Any] = {} |
| #72 | while True: |
| #73 | index = self._next_significant(index) |
| #74 | if index >= len(self.lines): |
| #75 | return result, index |
| #76 | line = self.lines[index] |
| #77 | if line.indent < indent: |
| #78 | return result, index |
| #79 | if line.indent > indent: |
| #80 | raise ValidationError(f"unexpected nested YAML mapping at line {line.lineno}") |
| #81 | text = self._normal_text(line) |
| #82 | if text.startswith("- "): |
| #83 | return result, index |
| #84 | key, value_text = self._split_key_value(text, line.lineno) |
| #85 | if value_text in {"|", "|-", "|+", ">", ">-", ">+"}: |
| #86 | result[key], index = self._collect_block_scalar(index + 1, line.indent, folded=value_text.startswith(">")) |
| #87 | continue |
| #88 | if value_text == "": |
| #89 | next_index = self._next_significant(index + 1) |
| #90 | if next_index >= len(self.lines) or self.lines[next_index].indent <= indent: |
| #91 | result[key] = {} |
| #92 | index += 1 |
| #93 | continue |
| #94 | result[key], index = self._parse_block(next_index, self.lines[next_index].indent) |
| #95 | continue |
| #96 | result[key] = self._parse_scalar(value_text) |
| #97 | index += 1 |
| #98 | |
| #99 | def _parse_list(self, index: int, indent: int) -> tuple[list[Any], int]: |
| #100 | result: list[Any] = [] |
| #101 | while True: |
| #102 | index = self._next_significant(index) |
| #103 | if index >= len(self.lines): |
| #104 | return result, index |
| #105 | line = self.lines[index] |
| #106 | if line.indent < indent: |
| #107 | return result, index |
| #108 | if line.indent > indent: |
| #109 | raise ValidationError(f"unexpected nested YAML list at line {line.lineno}") |
| #110 | text = self._normal_text(line) |
| #111 | if not text.startswith("- "): |
| #112 | return result, index |
| #113 | item_text = text[2:].strip() |
| #114 | if item_text == "": |
| #115 | next_index = self._next_significant(index + 1) |
| #116 | if next_index >= len(self.lines) or self.lines[next_index].indent <= indent: |
| #117 | result.append(None) |
| #118 | index += 1 |
| #119 | continue |
| #120 | item, index = self._parse_block(next_index, self.lines[next_index].indent) |
| #121 | result.append(item) |
| #122 | continue |
| #123 | if self._looks_like_inline_mapping(item_text): |
| #124 | item, index = self._parse_list_mapping_item(item_text, index + 1, indent) |
| #125 | result.append(item) |
| #126 | continue |
| #127 | result.append(self._parse_scalar(item_text)) |
| #128 | index += 1 |
| #129 | |
| #130 | def _parse_list_mapping_item(self, item_text: str, index: int, list_indent: int) -> tuple[dict[str, Any], int]: |
| #131 | key, value_text = self._split_key_value(item_text, self.lines[index - 1].lineno) |
| #132 | item: dict[str, Any] = {key: self._parse_scalar(value_text) if value_text else {}} |
| #133 | next_index = self._next_significant(index) |
| #134 | if next_index >= len(self.lines) or self.lines[next_index].indent <= list_indent: |
| #135 | return item, index |
| #136 | extra, index = self._parse_mapping(next_index, self.lines[next_index].indent) |
| #137 | item.update(extra) |
| #138 | return item, index |
| #139 | |
| #140 | def _collect_block_scalar(self, index: int, parent_indent: int, folded: bool) -> tuple[str, int]: |
| #141 | collected: list[_Line] = [] |
| #142 | while index < len(self.lines): |
| #143 | line = self.lines[index] |
| #144 | if line.stripped and line.indent <= parent_indent: |
| #145 | break |
| #146 | collected.append(line) |
| #147 | index += 1 |
| #148 | content_indent = min((line.indent for line in collected if line.stripped), default=parent_indent + 1) |
| #149 | parts = [line.raw[content_indent:] if len(line.raw) >= content_indent else "" for line in collected] |
| #150 | if folded: |
| #151 | return "\n".join(" ".join(part.split()) for part in parts).strip() + "\n", index |
| #152 | return "\n".join(parts).rstrip() + ("\n" if parts else ""), index |
| #153 | |
| #154 | def _normal_text(self, line: _Line) -> str: |
| #155 | return self._strip_comment(line.raw[line.indent :]).strip() |
| #156 | |
| #157 | def _strip_comment(self, text: str) -> str: |
| #158 | quote: str | None = None |
| #159 | escaped = False |
| #160 | for index, char in enumerate(text): |
| #161 | if escaped: |
| #162 | escaped = False |
| #163 | continue |
| #164 | if char == "\\": |
| #165 | escaped = True |
| #166 | continue |
| #167 | if quote is not None: |
| #168 | if char == quote: |
| #169 | quote = None |
| #170 | continue |
| #171 | if char in {'"', "'"}: |
| #172 | quote = char |
| #173 | continue |
| #174 | if char == "#" and (index == 0 or text[index - 1].isspace()): |
| #175 | return text[:index] |
| #176 | return text |
| #177 | |
| #178 | def _next_significant(self, index: int) -> int: |
| #179 | while index < len(self.lines): |
| #180 | stripped = self.lines[index].stripped |
| #181 | if stripped and not stripped.startswith("#"): |
| #182 | return index |
| #183 | index += 1 |
| #184 | return index |
| #185 | |
| #186 | def _split_key_value(self, text: str, lineno: int) -> tuple[str, str]: |
| #187 | if ":" not in text: |
| #188 | raise ValidationError(f"expected key/value mapping at line {lineno}") |
| #189 | key, value = text.split(":", 1) |
| #190 | key = key.strip() |
| #191 | if not key: |
| #192 | raise ValidationError(f"empty YAML key at line {lineno}") |
| #193 | return key, value.strip() |
| #194 | |
| #195 | def _looks_like_inline_mapping(self, text: str) -> bool: |
| #196 | if ":" not in text: |
| #197 | return False |
| #198 | key, rest = text.split(":", 1) |
| #199 | return bool(key.strip()) and (rest == "" or rest.startswith(" ")) |
| #200 | |
| #201 | def _parse_scalar(self, text: str) -> Any: |
| #202 | text = text.strip() |
| #203 | lowered = text.lower() |
| #204 | if lowered in {"null", "none", "~"}: |
| #205 | return None |
| #206 | if lowered == "true": |
| #207 | return True |
| #208 | if lowered == "false": |
| #209 | return False |
| #210 | if text.startswith("[") and text.endswith("]"): |
| #211 | inner = text[1:-1].strip() |
| #212 | if not inner: |
| #213 | return [] |
| #214 | return [self._parse_scalar(part) for part in self._split_inline(inner)] |
| #215 | if text.startswith("{") and text.endswith("}"): |
| #216 | inner = text[1:-1].strip() |
| #217 | if not inner: |
| #218 | return {} |
| #219 | result: dict[str, Any] = {} |
| #220 | for part in self._split_inline(inner): |
| #221 | key, value = self._split_key_value(part, 0) |
| #222 | result[key] = self._parse_scalar(value) |
| #223 | return result |
| #224 | if (text.startswith('"') and text.endswith('"')) or (text.startswith("'") and text.endswith("'")): |
| #225 | try: |
| #226 | return ast.literal_eval(text) |
| #227 | except Exception as exc: # pragma: no cover - defensive parse detail |
| #228 | raise ValidationError(f"invalid quoted YAML scalar: {text}") from exc |
| #229 | try: |
| #230 | return int(text) |
| #231 | except ValueError: |
| #232 | pass |
| #233 | try: |
| #234 | return float(text) |
| #235 | except ValueError: |
| #236 | return text |
| #237 | |
| #238 | def _split_inline(self, text: str) -> list[str]: |
| #239 | parts: list[str] = [] |
| #240 | start = 0 |
| #241 | quote: str | None = None |
| #242 | depth = 0 |
| #243 | escaped = False |
| #244 | for index, char in enumerate(text): |
| #245 | if escaped: |
| #246 | escaped = False |
| #247 | continue |
| #248 | if char == "\\": |
| #249 | escaped = True |
| #250 | continue |
| #251 | if quote is not None: |
| #252 | if char == quote: |
| #253 | quote = None |
| #254 | continue |
| #255 | if char in {'"', "'"}: |
| #256 | quote = char |
| #257 | continue |
| #258 | if char in "[{": |
| #259 | depth += 1 |
| #260 | continue |
| #261 | if char in "]}": |
| #262 | depth -= 1 |
| #263 | continue |
| #264 | if char == "," and depth == 0: |
| #265 | parts.append(text[start:index].strip()) |
| #266 | start = index + 1 |
| #267 | parts.append(text[start:].strip()) |
| #268 | return parts |
| #269 |