repositories
loading repo index
repositories
loading repo index
repository
loading code, commits, and activity
The Living OS cockpit
stars
latest
clone command
git clone gitlawb://did:key:z6Mku78K...XywC/living-os-cockp...git clone gitlawb://did:key:z6Mku78K.../living-os-cockp...59751530feat: surface worker supervisor health in live work8h ago| #1 | import { NextResponse } from 'next/server'; |
| #2 | import { promises as fs } from 'fs'; |
| #3 | import { join, basename } from 'path'; |
| #4 | |
| #5 | export const runtime = 'nodejs'; |
| #6 | |
| #7 | const AETHON_TASK_BASE = process.env.AETHON_TASKS_DIR ?? '/home/kingbau/.config/aethon/tasks'; |
| #8 | const AGENTIC_AUDIT_DIR = process.env.AGENTIC_AUDIT_DIR ?? '/home/kingbau/Documents/Aethon-Core/data/agentic_audit'; |
| #9 | const BOOK_AUDIT_PROGRESS = process.env.BOOK_SOURCE_AUDIT_PROGRESS ?? '/home/kingbau/Documents/Aethon-Core/data/book_source_audit/ongoing/progress.json'; |
| #10 | const CATALOG_HUNTER_PROGRESS = process.env.CATALOG_HUNTER_PROGRESS ?? '/home/kingbau/Documents/Aethon-Core/data/catalog_hunter/ongoing/progress.json'; |
| #11 | const WORKER_SUPERVISOR_STATE = process.env.WORKER_SUPERVISOR_STATE ?? '/home/kingbau/logs/theliving-os/worker-supervisor/state.json'; |
| #12 | |
| #13 | type ProgressEvent = { |
| #14 | task_id?: string; |
| #15 | tool?: string; |
| #16 | message?: string; |
| #17 | progress?: number; |
| #18 | timestamp?: string; |
| #19 | [key: string]: any; |
| #20 | }; |
| #21 | |
| #22 | function progressMessage(event: ProgressEvent | null) { |
| #23 | if (!event) return ''; |
| #24 | if (event.message) return String(event.message); |
| #25 | if (event.tool === 'embed_batch' || event.event === 'embed_batch') { |
| #26 | const embedded = Number(event.embedded ?? event.done ?? 0); |
| #27 | const total = Number(event.total ?? 0); |
| #28 | const pct = total ? Math.round((embedded / total) * 100) : 0; |
| #29 | return `Embedding chunks ${embedded}/${total}${total ? ` (${pct}%)` : ''}.`; |
| #30 | } |
| #31 | if (event.event === 'heartbeat') { |
| #32 | return `Heartbeat fired. Found ${event.found ?? 0} tasks. Processed ${event.processed ?? 0}. Queued ${event.queued ?? 0} for approval.`; |
| #33 | } |
| #34 | return String(event.event ?? event.tool ?? 'Aethon is working.'); |
| #35 | } |
| #36 | |
| #37 | async function latestProgressEvent(): Promise<ProgressEvent | null> { |
| #38 | try { |
| #39 | const files = await fs.readdir(AGENTIC_AUDIT_DIR); |
| #40 | const progressFiles = await Promise.all( |
| #41 | files |
| #42 | .filter(file => file.endsWith('_progress.jsonl') || file.includes('_progress.')) |
| #43 | .map(async file => { |
| #44 | const path = join(AGENTIC_AUDIT_DIR, file); |
| #45 | const stat = await fs.stat(path); |
| #46 | return { file, path, mtime: stat.mtimeMs }; |
| #47 | }), |
| #48 | ); |
| #49 | progressFiles.sort((a, b) => b.mtime - a.mtime); |
| #50 | for (const item of progressFiles.slice(0, 5)) { |
| #51 | const lines = (await fs.readFile(item.path, 'utf-8')).trim().split(/\r?\n/).filter(Boolean); |
| #52 | const last = lines.at(-1); |
| #53 | if (!last) continue; |
| #54 | const parsed = JSON.parse(last); |
| #55 | return { |
| #56 | ...parsed, |
| #57 | message: progressMessage(parsed), |
| #58 | file: item.file, |
| #59 | }; |
| #60 | } |
| #61 | } catch { |
| #62 | return null; |
| #63 | } |
| #64 | return null; |
| #65 | } |
| #66 | |
| #67 | async function readProgress(path: string) { |
| #68 | try { |
| #69 | return JSON.parse(await fs.readFile(path, 'utf-8')); |
| #70 | } catch { |
| #71 | return null; |
| #72 | } |
| #73 | } |
| #74 | |
| #75 | function formatAge(seconds: number | null | undefined) { |
| #76 | if (seconds === null || seconds === undefined || Number.isNaN(Number(seconds))) return 'unknown'; |
| #77 | const value = Number(seconds); |
| #78 | if (value < 90) return `${Math.round(value)}s ago`; |
| #79 | if (value < 5400) return `${Math.round(value / 60)}m ago`; |
| #80 | return `${Math.round(value / 3600)}h ago`; |
| #81 | } |
| #82 | |
| #83 | async function workerSupervisorRows() { |
| #84 | const state = await readProgress(WORKER_SUPERVISOR_STATE); |
| #85 | const workers = Array.isArray(state?.workers) ? state.workers : []; |
| #86 | return workers.map((worker: any) => { |
| #87 | const restartCount = Number(worker.restart_count ?? 0); |
| #88 | const status = String(worker.status ?? 'unknown'); |
| #89 | const lastBeat = formatAge(worker.heartbeat_age_sec); |
| #90 | return { |
| #91 | id: `supervisor-${worker.id ?? worker.title ?? 'worker'}`, |
| #92 | title: `${worker.title ?? worker.id ?? 'Worker'} durability`, |
| #93 | status, |
| #94 | message: `${worker.title ?? worker.id ?? 'Worker'} supervisor is ${status}: ${worker.reason ?? 'no reason reported'}; last beat ${lastBeat}; restarts ${restartCount}.`, |
| #95 | progress: null, |
| #96 | updated_at: worker.updated_at ?? state?.updated_at, |
| #97 | source: WORKER_SUPERVISOR_STATE, |
| #98 | detail: { |
| #99 | ...worker, |
| #100 | supervisor_state_updated_at: state?.updated_at, |
| #101 | }, |
| #102 | }; |
| #103 | }); |
| #104 | } |
| #105 | |
| #106 | async function workerStatusRows() { |
| #107 | const [book, catalog, supervisorRows] = await Promise.all([ |
| #108 | readProgress(BOOK_AUDIT_PROGRESS), |
| #109 | readProgress(CATALOG_HUNTER_PROGRESS), |
| #110 | workerSupervisorRows(), |
| #111 | ]); |
| #112 | const rows = []; |
| #113 | if (book) { |
| #114 | const totalDocuments = Number(book.total_documents ?? book.total ?? 0); |
| #115 | const missingDocuments = Number(book.missing_documents ?? 0); |
| #116 | const needsOcr = Number(book.needs_ocr ?? 0); |
| #117 | const currentPassEmbedded = Number(book.embedded ?? 0); |
| #118 | const accountedDocuments = totalDocuments > 0 |
| #119 | ? Math.max(0, totalDocuments - missingDocuments) |
| #120 | : currentPassEmbedded; |
| #121 | const passNote = currentPassEmbedded > 0 |
| #122 | ? ` Current pass handled ${currentPassEmbedded}.` |
| #123 | : ''; |
| #124 | rows.push({ |
| #125 | id: 'book-source-audit', |
| #126 | title: 'Book/source ingest', |
| #127 | status: book.status ?? 'watching', |
| #128 | message: `Book ingest ${accountedDocuments}/${totalDocuments || '?'} accounted; ${missingDocuments} missing; ${needsOcr} OCR pending.${passNote}`, |
| #129 | progress: totalDocuments ? Math.round((accountedDocuments / totalDocuments) * 1000) / 10 : null, |
| #130 | updated_at: book.updated_at, |
| #131 | source: BOOK_AUDIT_PROGRESS, |
| #132 | detail: { |
| #133 | ...book, |
| #134 | accounted_documents: accountedDocuments, |
| #135 | current_pass_embedded: currentPassEmbedded, |
| #136 | }, |
| #137 | }); |
| #138 | } |
| #139 | if (catalog) { |
| #140 | const acquire = catalog.acquire_summary ?? catalog.acquire ?? {}; |
| #141 | const embed = catalog.embed_summary ?? catalog.embed ?? {}; |
| #142 | rows.push({ |
| #143 | id: 'catalog-hunter', |
| #144 | title: 'Catalog hunter', |
| #145 | status: catalog.status ?? 'watching', |
| #146 | message: `Catalog hunter ${catalog.status ?? 'watching'}${catalog.current_target ? ` · ${catalog.current_target}` : ''}: acquired ${acquire.acquired ?? 0}, embedded ${embed.embedded ?? 0}, chunks ${embed.chunks_added ?? 0}.`, |
| #147 | progress: catalog.current_index ?? null, |
| #148 | updated_at: catalog.updated_at, |
| #149 | source: CATALOG_HUNTER_PROGRESS, |
| #150 | detail: catalog, |
| #151 | }); |
| #152 | } |
| #153 | return [...rows, ...supervisorRows]; |
| #154 | } |
| #155 | |
| #156 | async function readTasksFromDir(dir: string, status: string) { |
| #157 | const fullPath = join(AETHON_TASK_BASE, dir); |
| #158 | try { |
| #159 | const files = await fs.readdir(fullPath); |
| #160 | const tasks = await Promise.all( |
| #161 | files |
| #162 | .filter(f => f.endsWith('.md')) |
| #163 | .map(async (f) => { |
| #164 | const p = join(fullPath, f); |
| #165 | const stat = await fs.stat(p); |
| #166 | const content = await fs.readFile(p, 'utf-8'); |
| #167 | const titleMatch = content.match(/^#\s+(.+)/m); |
| #168 | const priorityMatch = content.match(/Priority:\s*(\w+)/i); |
| #169 | const typeMatch = content.match(/Type:\s*(\S+)/i); |
| #170 | const tierMatch = content.match(/Tier 1:\s*([^\n|]+)/i); |
| #171 | return { |
| #172 | id: basename(f, '.md'), |
| #173 | file: f, |
| #174 | status, |
| #175 | title: titleMatch?.[1]?.trim() ?? f, |
| #176 | priority: priorityMatch?.[1]?.trim() ?? 'MEDIUM', |
| #177 | type: typeMatch?.[1]?.trim() ?? 'unknown', |
| #178 | tier1: tierMatch?.[1]?.trim() ?? '—', |
| #179 | mtime: stat.mtimeMs, |
| #180 | size: stat.size, |
| #181 | }; |
| #182 | }) |
| #183 | ); |
| #184 | return tasks; |
| #185 | } catch { |
| #186 | return []; |
| #187 | } |
| #188 | } |
| #189 | |
| #190 | export async function GET() { |
| #191 | const [queued, inProgress, completed, latestProgress, workerStatus] = await Promise.all([ |
| #192 | readTasksFromDir('queued', 'queued'), |
| #193 | readTasksFromDir('in_progress', 'in_progress'), |
| #194 | readTasksFromDir('completed', 'completed'), |
| #195 | latestProgressEvent(), |
| #196 | workerStatusRows(), |
| #197 | ]); |
| #198 | const liveStatus = latestProgress ? { |
| #199 | ...latestProgress, |
| #200 | message: progressMessage(latestProgress), |
| #201 | } : null; |
| #202 | |
| #203 | return NextResponse.json({ |
| #204 | queued, |
| #205 | inProgress: inProgress.map(task => ({ ...task, liveStatus: liveStatus?.message ?? '' })), |
| #206 | completed: completed.sort((a, b) => b.mtime - a.mtime).slice(0, 10), |
| #207 | totals: { queued: queued.length, inProgress: inProgress.length, completed: completed.length }, |
| #208 | liveStatus, |
| #209 | workerStatus, |
| #210 | }); |
| #211 | } |
| #212 |