repositories
loading repo index
repositories
loading repo index
repository
loading code, commits, and activity
public Clawd ADK gateway launch mirror
stars
latest
clone command
git clone gitlawb://did:key:z6Mkq5mY...iFZ5/my-project-publ...
git clone gitlawb://did:key:z6Mkq5mY.../my-project-publ...
2fa351d6 docs: add automaton and perps launch sources 16d ago
| #1 | import concurrent.futures |
| #2 | import logging |
| #3 | import os |
| #4 | from string import Template |
| #5 | from typing import Optional |
| #6 | |
| #7 | import numpy as np |
| #8 | from openai import OpenAI |
| #9 | from tqdm import tqdm |
| #10 | |
| #11 | from embedchain.config.evaluation.base import GroundednessConfig |
| #12 | from embedchain.evaluation.base import BaseMetric |
| #13 | from embedchain.utils.evaluation import EvalData, EvalMetric |
| #14 | |
# Module-level logger named after this module; `Groundedness.evaluate` reports
# per-data-point evaluation failures through it.
logger = logging.getLogger(__name__)
| #16 | |
| #17 | |
class Groundedness(BaseMetric):
    """
    Metric for groundedness of answer from the given contexts.

    For every data point the chat model is first asked to break the answer into
    claim statements (one per line), then asked for one verdict per claim given
    the contexts. The score of a data point is the sum of the verdict scores
    divided by the number of claim statements.
    """

    def __init__(self, config: Optional[GroundednessConfig] = None):
        """
        Set up the metric and its OpenAI client.

        :param config: metric configuration; `GroundednessConfig()` is used when omitted.
        :raises ValueError: if no API key is found in the config or in `OPENAI_API_KEY`.
        """
        super().__init__(name=EvalMetric.GROUNDEDNESS.value)
        self.config = config or GroundednessConfig()
        # An explicit key in the config wins over the environment variable.
        api_key = self.config.api_key or os.getenv("OPENAI_API_KEY")
        if not api_key:
            raise ValueError("Please set the OPENAI_API_KEY environment variable or pass the `api_key` in config.")
        self.client = OpenAI(api_key=api_key)

    def _get_completion(self, prompt: str) -> str:
        """
        Send the prompt as a single user message and return the stripped reply text.
        """
        response = self.client.chat.completions.create(
            model=self.config.model,
            messages=[{"role": "user", "content": prompt}],
        )
        # The API types `content` as optional; treat a missing reply as an empty string
        # instead of failing with AttributeError on `None.strip()`.
        return (response.choices[0].message.content or "").strip()

    def _generate_answer_claim_prompt(self, data: EvalData) -> str:
        """
        Generate the prompt for the given data.

        Fills the `question` and `answer` placeholders of `config.answer_claims_prompt`.
        """
        return Template(self.config.answer_claims_prompt).substitute(question=data.question, answer=data.answer)

    def _get_claim_statements(self, prompt: str) -> np.ndarray:
        """
        Get claim statements from the answer.

        The model reply is split on newlines; each line is trimmed and lines that are
        empty or whitespace-only are dropped so they do not count as claims.
        """
        result = self._get_completion(prompt)
        statements = (line.strip() for line in result.split("\n"))
        return np.array([statement for statement in statements if statement])

    def _generate_claim_inference_prompt(self, data: EvalData, claim_statements: list[str]) -> str:
        """
        Generate the claim inference prompt for the given data and claim statements.

        Fills the `context` and `claim_statements` placeholders of
        `config.claims_inference_prompt`, each joined with newlines.
        """
        return Template(self.config.claims_inference_prompt).substitute(
            context="\n".join(data.contexts), claim_statements="\n".join(claim_statements)
        )

    def _get_claim_verdict_scores(self, prompt: str) -> np.ndarray:
        """
        Get verdicts for claim statements.

        Each non-blank line of the reply must be "1", "0" or "-1"; these map to
        1, 0 and NaN respectively.

        :raises KeyError: if a line holds any other verdict.
        :raises ValueError: if the reply contains no verdicts at all.
        """
        result = self._get_completion(prompt)
        verdict_score_map = {"1": 1, "0": 0, "-1": np.nan}
        # Trim each line so trailing spaces or "\r" do not turn a valid verdict into a KeyError.
        verdicts = [line.strip() for line in result.split("\n")]
        verdict_scores = np.array([verdict_score_map[verdict] for verdict in verdicts if verdict], dtype=float)
        if verdict_scores.size == 0:
            raise ValueError("The model returned no claim verdicts.")
        return verdict_scores

    def _compute_score(self, data: EvalData) -> float:
        """
        Compute the groundedness score for a single data point.

        :raises ValueError: if no claim statements could be extracted from the answer.
        """
        answer_claims_prompt = self._generate_answer_claim_prompt(data)
        claim_statements = self._get_claim_statements(answer_claims_prompt)
        if claim_statements.size == 0:
            # Without claims the ratio below would divide by zero (NaN/inf); fail before
            # spending a second API call so `evaluate` logs and skips the data point.
            raise ValueError("No claim statements could be extracted from the answer.")

        claim_inference_prompt = self._generate_claim_inference_prompt(data, claim_statements)
        verdict_scores = self._get_claim_verdict_scores(claim_inference_prompt)
        # NOTE(review): a "-1" verdict is NaN and `np.sum` propagates it, so the score of
        # such a data point is NaN. The number of verdicts is also not checked against the
        # number of claims. Confirm both are intended.
        return np.sum(verdict_scores) / claim_statements.size

    def evaluate(self, dataset: list[EvalData]):
        """
        Evaluate the dataset and returns the average groundedness score.

        Data points are scored concurrently on a thread pool. A data point whose
        scoring raises is logged and left out of the average. Returns 0.0 when no
        data point could be scored.
        """
        results = []

        with concurrent.futures.ThreadPoolExecutor() as executor:
            future_to_data = {executor.submit(self._compute_score, data): data for data in dataset}
            for future in tqdm(
                concurrent.futures.as_completed(future_to_data),
                total=len(future_to_data),
                desc="Evaluating Groundedness",
            ):
                data = future_to_data[future]
                try:
                    score = future.result()
                except Exception as e:
                    # Best effort: one failing data point must not abort the whole run.
                    logger.error("Error while evaluating groundedness for data point %s: %s", data, e)
                else:
                    results.append(score)

        return np.mean(results) if results else 0.0
| #105 |