repositories
loading repo index
repositories
loading repo index
repository
loading code, commits, and activity
public Clawd ADK gateway launch mirror
stars
latest
clone command
git clone gitlawb://did:key:z6Mkq5mY...iFZ5/my-project-publ...git clone gitlawb://did:key:z6Mkq5mY.../my-project-publ...2fa351d6docs: add automaton and perps launch sources16d ago| #1 | """ |
| #2 | Borrowed from https://github.com/WujiangXu/AgenticMemory/blob/main/utils.py |
| #3 | |
| #4 | @article{xu2025mem, |
| #5 | title={A-mem: Agentic memory for llm agents}, |
| #6 | author={Xu, Wujiang and Liang, Zujie and Mei, Kai and Gao, Hang and Tan, Juntao |
| #7 | and Zhang, Yongfeng}, |
| #8 | journal={arXiv preprint arXiv:2502.12110}, |
| #9 | year={2025} |
| #10 | } |
| #11 | """ |
| #12 | |
| #13 | import statistics |
| #14 | from collections import defaultdict |
| #15 | from typing import Dict, List, Union |
| #16 | |
| #17 | import nltk |
| #18 | from bert_score import score as bert_score |
| #19 | from nltk.translate.bleu_score import SmoothingFunction, sentence_bleu |
| #20 | from nltk.translate.meteor_score import meteor_score |
| #21 | from rouge_score import rouge_scorer |
| #22 | from sentence_transformers import SentenceTransformer |
| #23 | |
| #24 | # from load_dataset import load_locomo_dataset, QA, Turn, Session, Conversation |
| #25 | from sentence_transformers.util import pytorch_cos_sim |
| #26 | |
# Download the NLTK data this module relies on (best effort: a failure is
# reported but does not abort the import).
#   - "punkt" / "punkt_tab": tokenizer data for nltk.word_tokenize; recent
#     NLTK releases look up "punkt_tab" instead of the older "punkt".
#   - "wordnet": synonym lookup used by meteor_score.
try:
    nltk.download("punkt", quiet=True)
    nltk.download("punkt_tab", quiet=True)
    nltk.download("wordnet", quiet=True)
except Exception as e:
    print(f"Error downloading NLTK data: {e}")
| #33 | |
# Shared sentence-embedding model: loaded once at import time and reused by
# calculate_sentence_similarity. It stays None when loading fails, which
# makes the similarity function fall back to 0.0.
sentence_model = None
try:
    sentence_model = SentenceTransformer("all-MiniLM-L6-v2")
except Exception as load_error:
    print(f"Warning: Could not load SentenceTransformer model: {load_error}")
| #40 | |
| #41 | |
def simple_tokenize(text):
    """Lowercase *text* and split it into whitespace-separated tokens.

    Non-string input is converted with ``str()`` first. The characters
    ``.``, ``,``, ``!`` and ``?`` are turned into spaces before splitting,
    so they act as separators; any other punctuation stays inside tokens.
    """
    # One translation pass replaces the four separator characters by spaces.
    separators = str.maketrans(".,!?", "    ")
    return str(text).lower().translate(separators).split()
| #47 | |
| #48 | |
def calculate_rouge_scores(prediction: str, reference: str) -> Dict[str, float]:
    """Return the ROUGE-1, ROUGE-2 and ROUGE-L F-measures of a prediction.

    Args:
        prediction: Generated text to evaluate.
        reference: Ground-truth text to compare against.

    Returns:
        Dict with the keys ``rouge1_f``, ``rouge2_f`` and ``rougeL_f``.
    """
    # Output key -> ROUGE variant requested from the scorer.
    variant_by_key = {"rouge1_f": "rouge1", "rouge2_f": "rouge2", "rougeL_f": "rougeL"}
    rouge = rouge_scorer.RougeScorer(list(variant_by_key.values()), use_stemmer=True)
    # RougeScorer.score takes the target (reference) first, then the prediction.
    result = rouge.score(reference, prediction)
    return {key: result[variant].fmeasure for key, variant in variant_by_key.items()}
| #58 | |
| #59 | |
def calculate_bleu_scores(prediction: str, reference: str) -> Dict[str, float]:
    """Compute smoothed sentence-level BLEU-1 through BLEU-4.

    Both texts are lowercased and tokenized with ``nltk.word_tokenize``.

    Args:
        prediction: Generated text to evaluate.
        reference: Ground-truth text to compare against.

    Returns:
        Dict with the keys ``bleu1`` .. ``bleu4``. A variant whose
        computation raises is reported as 0.0 after printing the error.
    """
    hypothesis = nltk.word_tokenize(prediction.lower())
    references = [nltk.word_tokenize(reference.lower())]

    smoothing = SmoothingFunction().method1
    # n-gram weights per BLEU variant (uniform over the first n orders).
    weights_by_key = {
        "bleu1": (1, 0, 0, 0),
        "bleu2": (0.5, 0.5, 0, 0),
        "bleu3": (0.33, 0.33, 0.33, 0),
        "bleu4": (0.25, 0.25, 0.25, 0.25),
    }

    results = {}
    for key, weights in weights_by_key.items():
        try:
            value = sentence_bleu(references, hypothesis, weights=weights, smoothing_function=smoothing)
        except Exception as err:
            print(f"Error calculating BLEU score: {err}")
            value = 0.0
        results[key] = value
    return results
| #78 | |
| #79 | |
def calculate_bert_scores(prediction: str, reference: str) -> Dict[str, float]:
    """Compute BERTScore precision, recall and F1 for one prediction.

    Args:
        prediction: Generated text to evaluate.
        reference: Ground-truth text to compare against.

    Returns:
        Dict with the keys ``bert_precision``, ``bert_recall`` and
        ``bert_f1``; all three are 0.0 when scoring raises.
    """
    try:
        precision, recall, f1 = bert_score([prediction], [reference], lang="en", verbose=False)
        scores = {
            "bert_precision": precision.item(),
            "bert_recall": recall.item(),
            "bert_f1": f1.item(),
        }
    except Exception as err:
        print(f"Error calculating BERTScore: {err}")
        scores = {"bert_precision": 0.0, "bert_recall": 0.0, "bert_f1": 0.0}
    return scores
| #88 | |
| #89 | |
def calculate_meteor_score(prediction: str, reference: str) -> float:
    """Compute the METEOR score of a prediction against a single reference.

    Both texts are tokenized by plain whitespace splitting.

    Returns:
        The METEOR score, or 0.0 (after printing the error) when the
        computation raises.
    """
    try:
        reference_tokens = reference.split()
        prediction_tokens = prediction.split()
        result = meteor_score([reference_tokens], prediction_tokens)
    except Exception as err:
        print(f"Error calculating METEOR score: {err}")
        result = 0.0
    return result
| #97 | |
| #98 | |
def calculate_sentence_similarity(prediction: str, reference: str) -> float:
    """Return the cosine similarity of the two texts' sentence embeddings.

    Uses the module-level ``sentence_model``.

    Returns:
        The similarity as a float, or 0.0 when the model is unavailable
        or when encoding/comparison raises (the error is printed).
    """
    if sentence_model is None:
        return 0.0

    try:
        # Each text is encoded as its own single-element batch.
        prediction_vec = sentence_model.encode([prediction], convert_to_tensor=True)
        reference_vec = sentence_model.encode([reference], convert_to_tensor=True)
        return float(pytorch_cos_sim(prediction_vec, reference_vec).item())
    except Exception as err:
        print(f"Error calculating sentence similarity: {err}")
        return 0.0
| #114 | |
| #115 | |
def calculate_metrics(prediction: str, reference: str) -> Dict[str, float]:
    """Calculate evaluation metrics for a prediction against a reference.

    Computes a case-insensitive exact match, a token-set F1 (tokens from
    ``simple_tokenize``) and BLEU-1..4 (from ``calculate_bleu_scores``).

    Args:
        prediction: Generated answer. Non-string values are converted with
            ``str()``; ``None`` is treated as missing.
        reference: Ground-truth answer, handled the same way.

    Returns:
        Dict with the keys ``exact_match``, ``f1``, ``bleu1``, ``bleu2``,
        ``bleu3`` and ``bleu4``. The same key set is returned whether or
        not the inputs are empty, so per-metric aggregation over many
        results never mixes in metrics that were not computed.
    """
    # Normalise before the emptiness check so that falsy but meaningful
    # answers (e.g. the integer 0) are scored rather than treated as missing,
    # and whitespace-only strings count as empty.
    prediction = "" if prediction is None else str(prediction).strip()
    reference = "" if reference is None else str(reference).strip()

    if not prediction or not reference:
        return {
            "exact_match": 0,
            "f1": 0.0,
            "bleu1": 0.0,
            "bleu2": 0.0,
            "bleu3": 0.0,
            "bleu4": 0.0,
        }

    exact_match = int(prediction.lower() == reference.lower())

    # Token-based F1 over the sets of unique tokens.
    pred_tokens = set(simple_tokenize(prediction))
    ref_tokens = set(simple_tokenize(reference))
    common_tokens = pred_tokens & ref_tokens

    # No overlap (or a text made only of separator characters) gives F1 = 0
    # and avoids dividing by zero below.
    if not common_tokens:
        f1 = 0.0
    else:
        precision = len(common_tokens) / len(pred_tokens)
        recall = len(common_tokens) / len(ref_tokens)
        f1 = 2 * precision * recall / (precision + recall)

    bleu_scores = calculate_bleu_scores(prediction, reference)

    return {
        "exact_match": exact_match,
        "f1": f1,
        **bleu_scores,
    }
| #165 | |
| #166 | |
def _summarize_values(values: List[float]) -> Dict[str, float]:
    """Return mean, std, median, min, max and count of a non-empty list."""
    return {
        "mean": statistics.mean(values),
        # The sample standard deviation needs at least two data points.
        "std": statistics.stdev(values) if len(values) > 1 else 0.0,
        "median": statistics.median(values),
        "min": min(values),
        "max": max(values),
        "count": len(values),
    }


def aggregate_metrics(
    all_metrics: List[Dict[str, float]], all_categories: List[int]
) -> Dict[str, Dict[str, Union[float, Dict[str, float]]]]:
    """Calculate aggregate statistics for all metrics, overall and per category.

    Args:
        all_metrics: One metrics dict (metric name -> value) per sample.
        all_categories: Category label of each sample, parallel to
            ``all_metrics``. The two lists are paired with ``zip``, so
            surplus entries in the longer list are ignored.

    Returns:
        ``{}`` when ``all_metrics`` is empty. Otherwise a dict with the key
        ``"overall"`` followed by one ``"category_<label>"`` key per
        category in sorted order; each maps metric names to a dict with
        ``mean``, ``std``, ``median``, ``min``, ``max`` and ``count``.
    """
    if not all_metrics:
        return {}

    # Collect the raw values of every metric, overall and per category.
    overall_values = defaultdict(list)
    category_values = defaultdict(lambda: defaultdict(list))
    for metrics, category in zip(all_metrics, all_categories):
        for metric_name, value in metrics.items():
            overall_values[metric_name].append(value)
            category_values[category][metric_name].append(value)

    results = {
        "overall": {name: _summarize_values(values) for name, values in overall_values.items()}
    }

    try:
        ordered_categories = sorted(category_values)
    except TypeError:
        # Labels of mixed types (e.g. int and str) cannot be compared with
        # each other; fall back to a deterministic order instead of failing
        # after all values have been collected.
        ordered_categories = sorted(category_values, key=lambda label: (type(label).__name__, str(label)))

    for category in ordered_categories:
        results[f"category_{category}"] = {
            name: _summarize_values(values) for name, values in category_values[category].items()
        }

    return results
| #212 |