
Evaluation Metrics

MetricType

Bases: Enum

Enumeration of metric types in Ragas.

Attributes

Name Type Description
SINGLE_TURN str

Represents the single-turn metric type.

MULTI_TURN str

Represents the multi-turn metric type.

Metric dataclass

Metric(_required_columns: Dict[MetricType, Set[str]] = dict(), name: str = '')

Bases: ABC

Abstract base class for metrics in Ragas.

Attributes

Name Type Description
name str

The name of the metric.

required_columns Dict[str, Set[str]]

A dictionary mapping metric type names to sets of required column names. This is a property and raises a ValueError if a column is not in VALID_COLUMNS.

init abstractmethod

init(run_config: RunConfig) -> None

Initialize the metric with the given run configuration.

Parameters

Name Type Description Default
run_config RunConfig

Configuration for the metric run, including timeouts and other settings.

required
Source code in src/ragas/metrics/base.py
@abstractmethod
def init(self, run_config: RunConfig) -> None:
    """
    Initialize the metric with the given run configuration.

    Parameters
    ----------
    run_config : RunConfig
        Configuration for the metric run including timeouts and other settings.
    """
    ...

MetricWithLLM dataclass

MetricWithLLM(_required_columns: Dict[MetricType, Set[str]] = dict(), name: str = '', llm: Optional[BaseRagasLLM] = None, output_type: Optional[MetricOutputType] = None)

Bases: Metric, PromptMixin

A metric class that uses a language model for evaluation.

Attributes

Name Type Description
llm Optional[BaseRagasLLM]

The language model used by the metric. Both BaseRagasLLM and InstructorBaseRagasLLM are accepted at runtime via duck typing (both expose compatible methods).

init

init(run_config: RunConfig) -> None

Initialize the metric with the run configuration and validate that an LLM is present.

Parameters

Name Type Description Default
run_config RunConfig

Configuration for the metric run.

required

Raises

Type Description
ValueError

If no LLM is provided to the metric.

Source code in src/ragas/metrics/base.py
def init(self, run_config: RunConfig) -> None:
    """
    Initialize the metric with run configuration and validate LLM is present.

    Parameters
    ----------
    run_config : RunConfig
        Configuration for the metric run.

    Raises
    ------
    ValueError
        If no LLM is provided to the metric.
    """
    if self.llm is None:
        raise ValueError(
            f"Metric '{self.name}' has no valid LLM provided (self.llm is None). Please instantiate the metric with an LLM to run."
        )
    # Only BaseRagasLLM has set_run_config method, not InstructorBaseRagasLLM
    if isinstance(self.llm, BaseRagasLLM):
        self.llm.set_run_config(run_config)
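A minimal usage sketch for init; the LLM wrapper and model name below are assumptions for illustration, not prescribed by this API:

from langchain_openai import ChatOpenAI
from ragas.llms import LangchainLLMWrapper
from ragas.metrics import Faithfulness
from ragas.run_config import RunConfig

# Wrap any chat model; ChatOpenAI("gpt-4o-mini") is only an example choice.
evaluator_llm = LangchainLLMWrapper(ChatOpenAI(model="gpt-4o-mini"))
metric = Faithfulness(llm=evaluator_llm)
metric.init(RunConfig(timeout=60))  # raises ValueError if no LLM had been provided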

train

train(path: str, demonstration_config: Optional[DemonstrationConfig] = None, instruction_config: Optional[InstructionConfig] = None, callbacks: Optional[Callbacks] = None, run_config: Optional[RunConfig] = None, batch_size: Optional[int] = None, with_debugging_logs=False, raise_exceptions: bool = True) -> None

Train the metric using local JSON data.

Parameters

Name Type Description Default
path str

Path to the local JSON training data file.

required
demonstration_config DemonstrationConfig

Configuration for demonstration optimization.

None
instruction_config InstructionConfig

Configuration for instruction optimization.

None
callbacks Callbacks

List of callback functions.

None
run_config RunConfig

Run configuration.

None
batch_size int

Batch size for training.

None
with_debugging_logs bool

Enable debugging logs.

False
raise_exceptions bool

Whether to raise exceptions during training.

True

Raises

Type Description
ValueError

If the path is not provided or is not a JSON file.

Source code in src/ragas/metrics/base.py
def train(
    self,
    path: str,
    demonstration_config: t.Optional[DemonstrationConfig] = None,
    instruction_config: t.Optional[InstructionConfig] = None,
    callbacks: t.Optional[Callbacks] = None,
    run_config: t.Optional[RunConfig] = None,
    batch_size: t.Optional[int] = None,
    with_debugging_logs=False,
    raise_exceptions: bool = True,
) -> None:
    """
    Train the metric using local JSON data

    Parameters
    ----------
    path : str
        Path to local JSON training data file
    demonstration_config : DemonstrationConfig, optional
        Configuration for demonstration optimization
    instruction_config : InstructionConfig, optional
        Configuration for instruction optimization
    callbacks : Callbacks, optional
        List of callback functions
    run_config : RunConfig, optional
        Run configuration
    batch_size : int, optional
        Batch size for training
    with_debugging_logs : bool, default=False
        Enable debugging logs
    raise_exceptions : bool, default=True
        Whether to raise exceptions during training

    Raises
    ------
    ValueError
        If path is not provided or not a JSON file
    """
    # Validate input parameters
    if not path:
        raise ValueError("Path to training data file must be provided")

    if not path.endswith(".json"):
        raise ValueError("Train data must be in json format")

    run_config = run_config or RunConfig()
    callbacks = callbacks or []

    # Load the dataset from JSON file
    dataset = MetricAnnotation.from_json(path, metric_name=self.name)

    # only optimize the instruction if instruction_config is provided
    if instruction_config is not None:
        self._optimize_instruction(
            instruction_config=instruction_config,
            dataset=dataset,
            callbacks=callbacks,
            run_config=run_config,
            batch_size=batch_size,
            with_debugging_logs=with_debugging_logs,
            raise_exceptions=raise_exceptions,
        )

    # if demonstration_config is provided, optimize the demonstrations
    if demonstration_config is not None:
        self._optimize_demonstration(
            demonstration_config=demonstration_config,
            dataset=dataset,
        )
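A hedged sketch of a train() call; the annotation file path is hypothetical, and the InstructionConfig import path and constructor field shown here are assumptions to be checked against your ragas version:

from ragas.config import InstructionConfig  # assumed import path

# "edited_annotations.json" is a hypothetical file exported from an annotation run.
metric.train(
    path="./edited_annotations.json",                         # must be a local .json file
    instruction_config=InstructionConfig(llm=evaluator_llm),  # assumed constructor field
)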

SingleTurnMetric dataclass

SingleTurnMetric(_required_columns: Dict[MetricType, Set[str]] = dict(), name: str = '')

Bases: Metric

A metric class for evaluating single-turn interactions.

This class provides methods for scoring single-turn samples both synchronously and asynchronously.

single_turn_score

single_turn_score(sample: SingleTurnSample, callbacks: Callbacks = None) -> float

Synchronously score a single-turn sample.

May raise an ImportError if nest_asyncio is not installed in a Jupyter-like environment.

Source code in src/ragas/metrics/base.py
def single_turn_score(
    self,
    sample: SingleTurnSample,
    callbacks: Callbacks = None,
) -> float:
    """
    Synchronously score a single-turn sample.

    May raise ImportError if nest_asyncio is not installed in a Jupyter-like environment.
    """
    callbacks = callbacks or []
    # only get the required columns
    sample = self._only_required_columns_single_turn(sample)
    rm, group_cm = new_group(
        self.name,
        inputs=sample.to_dict(),
        callbacks=callbacks,
        metadata={"type": ChainType.METRIC},
    )

    async def _async_wrapper():
        try:
            result = await self._single_turn_ascore(
                sample=sample, callbacks=group_cm
            )
        except Exception as e:
            if not group_cm.ended:
                rm.on_chain_error(e)
            raise e
        else:
            if not group_cm.ended:
                rm.on_chain_end({"output": result})
            return result

    apply_nest_asyncio()
    score = run(_async_wrapper)

    # track the evaluation event
    _analytics_batcher.add_evaluation(
        EvaluationEvent(
            metrics=[self.name],
            num_rows=1,
            evaluation_type=MetricType.SINGLE_TURN.name,
            language=get_metric_language(self),
        )
    )
    return score
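A usage sketch; the Faithfulness metric and the evaluator_llm wrapper are assumptions carried over from the earlier sketch:

from ragas.dataset_schema import SingleTurnSample
from ragas.metrics import Faithfulness

sample = SingleTurnSample(
    user_input="Where is the Eiffel Tower?",
    response="The Eiffel Tower is in Paris.",
    retrieved_contexts=["The Eiffel Tower is located in Paris, France."],
)
metric = Faithfulness(llm=evaluator_llm)  # evaluator_llm as set up earlier
score = metric.single_turn_score(sample)  # a float such as 1.0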

single_turn_ascore async

single_turn_ascore(sample: SingleTurnSample, callbacks: Callbacks = None, timeout: Optional[float] = None) -> float

Asynchronously score a single-turn sample with an optional timeout.

May raise asyncio.TimeoutError if the scoring process exceeds the specified timeout.

Source code in src/ragas/metrics/base.py
async def single_turn_ascore(
    self,
    sample: SingleTurnSample,
    callbacks: Callbacks = None,
    timeout: t.Optional[float] = None,
) -> float:
    """
    Asynchronously score a single-turn sample with an optional timeout.

    May raise asyncio.TimeoutError if the scoring process exceeds the specified timeout.
    """
    callbacks = callbacks or []
    # only get the required columns
    sample = self._only_required_columns_single_turn(sample)
    rm, group_cm = new_group(
        self.name,
        inputs=sample.to_dict(),
        callbacks=callbacks,
        metadata={"type": ChainType.METRIC},
    )
    try:
        score = await asyncio.wait_for(
            self._single_turn_ascore(sample=sample, callbacks=group_cm),
            timeout=timeout,
        )
    except Exception as e:
        if not group_cm.ended:
            rm.on_chain_error(e)
        raise e
    else:
        if not group_cm.ended:
            rm.on_chain_end({"output": score})

    # track the evaluation event
    _analytics_batcher.add_evaluation(
        EvaluationEvent(
            metrics=[self.name],
            num_rows=1,
            evaluation_type=MetricType.SINGLE_TURN.name,
            language=get_metric_language(self),
        )
    )
    return score
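The asynchronous variant of the same call, reusing the metric and sample from the sketch above and adding a timeout:

import asyncio

async def main() -> float:
    # timeout is optional; asyncio.TimeoutError is raised if it is exceeded
    return await metric.single_turn_ascore(sample, timeout=30.0)

score = asyncio.run(main())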

MultiTurnMetric dataclass

MultiTurnMetric(_required_columns: Dict[MetricType, Set[str]] = dict(), name: str = '')

Bases: Metric

A metric class for evaluating multi-turn conversations.

This class extends the base Metric class to provide functionality for scoring multi-turn conversation samples.

multi_turn_score

multi_turn_score(sample: MultiTurnSample, callbacks: Callbacks = None) -> float

Synchronously score a multi-turn conversation sample.

May raise an ImportError if nest_asyncio is not installed in a Jupyter-like environment.

Source code in src/ragas/metrics/base.py
def multi_turn_score(
    self,
    sample: MultiTurnSample,
    callbacks: Callbacks = None,
) -> float:
    """
    Score a multi-turn conversation sample synchronously.

    May raise ImportError if nest_asyncio is not installed in Jupyter-like environments.
    """
    callbacks = callbacks or []
    sample = self._only_required_columns_multi_turn(sample)
    rm, group_cm = new_group(
        self.name,
        inputs=sample.to_dict(),
        callbacks=callbacks,
        metadata={"type": ChainType.METRIC},
    )

    async def _async_wrapper():
        try:
            result = await self._multi_turn_ascore(
                sample=sample, callbacks=group_cm
            )
        except Exception as e:
            if not group_cm.ended:
                rm.on_chain_error(e)
            raise e
        else:
            if not group_cm.ended:
                rm.on_chain_end({"output": result})
            return result

    apply_nest_asyncio()
    score = run(_async_wrapper)

    # track the evaluation event
    _analytics_batcher.add_evaluation(
        EvaluationEvent(
            metrics=[self.name],
            num_rows=1,
            evaluation_type=MetricType.SINGLE_TURN.name,
            language=get_metric_language(self),
        )
    )
    return score

multi_turn_ascore async

multi_turn_ascore(sample: MultiTurnSample, callbacks: Callbacks = None, timeout: Optional[float] = None) -> float

Asynchronously score a multi-turn conversation sample.

May raise asyncio.TimeoutError if the scoring process exceeds the specified timeout.

Source code in src/ragas/metrics/base.py
async def multi_turn_ascore(
    self,
    sample: MultiTurnSample,
    callbacks: Callbacks = None,
    timeout: t.Optional[float] = None,
) -> float:
    """
    Score a multi-turn conversation sample asynchronously.

    May raise asyncio.TimeoutError if the scoring process exceeds the specified timeout.
    """
    callbacks = callbacks or []
    sample = self._only_required_columns_multi_turn(sample)

    rm, group_cm = new_group(
        self.name,
        inputs=sample.to_dict(),
        callbacks=callbacks,
        metadata={"type": ChainType.METRIC},
    )
    try:
        score = await asyncio.wait_for(
            self._multi_turn_ascore(sample=sample, callbacks=group_cm),
            timeout=timeout,
        )
    except Exception as e:
        if not group_cm.ended:
            rm.on_chain_error(e)
        raise e
    else:
        if not group_cm.ended:
            rm.on_chain_end({"output": score})

    # track the evaluation event
    _analytics_batcher.add_evaluation(
        EvaluationEvent(
            metrics=[self.name],
            num_rows=1,
            evaluation_type=MetricType.SINGLE_TURN.name,
            language=get_metric_language(self),
        )
    )

    return score
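A usage sketch with a multi-turn-capable metric (AspectCritic); evaluator_llm is the assumed LLM wrapper from the earlier sketches:

from ragas.dataset_schema import MultiTurnSample
from ragas.messages import AIMessage, HumanMessage
from ragas.metrics import AspectCritic

conversation = MultiTurnSample(
    user_input=[
        HumanMessage(content="Book a table for two at 7pm."),
        AIMessage(content="Done: a table for two is booked for 7pm tonight."),
    ]
)
critic = AspectCritic(
    name="task_completed",
    definition="Did the assistant complete the user's request?",
    llm=evaluator_llm,
)
score = critic.multi_turn_score(conversation)  # 1 or 0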

Ensember

Combine multiple llm outputs for the same input (n>1) into a single output.

from_discrete

from_discrete(inputs: list[list[Dict]], attribute: str) -> List[Dict]

Simple majority voting for binary values, e.g. [0,0,1] -> 0. Inputs: a list of lists of dicts, each dict containing the verdict for a single input.

Source code in src/ragas/metrics/base.py
def from_discrete(
    self, inputs: list[list[t.Dict]], attribute: str
) -> t.List[t.Dict]:
    """
    Simple majority voting for binary values, ie [0,0,1] -> 0
    inputs: list of list of dicts each containing verdict for a single input
    """

    if not isinstance(inputs, list):
        inputs = [inputs]

    if not all(len(item) == len(inputs[0]) for item in inputs):
        logger.warning("All inputs must have the same length")
        return inputs[0]

    if not all(attribute in item for input in inputs for item in input):
        logger.warning(f"All inputs must have {attribute} attribute")
        return inputs[0]

    if len(inputs) == 1:
        return inputs[0]

    verdict_agg = []
    for i in range(len(inputs[0])):
        item = inputs[0][i]
        verdicts = [inputs[k][i][attribute] for k in range(len(inputs))]
        verdict_counts = dict(Counter(verdicts).most_common())
        item[attribute] = list(verdict_counts.keys())[0]
        verdict_agg.append(item)

    return verdict_agg
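A small sketch of the voting behaviour; the import path for Ensember is an assumption based on the source location above:

from ragas.metrics.base import Ensember  # assumed import path

# Three LLM runs over the same two verdict items.
runs = [
    [{"statement": "s1", "verdict": 1}, {"statement": "s2", "verdict": 0}],
    [{"statement": "s1", "verdict": 1}, {"statement": "s2", "verdict": 1}],
    [{"statement": "s1", "verdict": 0}, {"statement": "s2", "verdict": 0}],
]
aggregated = Ensember().from_discrete(runs, attribute="verdict")
# -> [{"statement": "s1", "verdict": 1}, {"statement": "s2", "verdict": 0}]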

SimpleBaseMetric dataclass

SimpleBaseMetric(name: str, allowed_values: AllowedValuesType = (lambda: ['pass', 'fail'])())

Bases: ABC

Base class for simple metrics that return MetricResult objects.

This class provides the foundation for metrics that evaluate inputs and return structured MetricResult objects containing scores and reasoning.

Attributes

Name Type Description
name str

The name of the metric.

allowed_values AllowedValuesType

Allowed values for the metric output. Can be a list of strings for discrete metrics, a tuple of floats for numeric metrics, or an integer for ranking metrics.

Examples

>>> from ragas.metrics import discrete_metric
>>>
>>> @discrete_metric(name="sentiment", allowed_values=["positive", "negative"])
>>> def sentiment_metric(user_input: str, response: str) -> str:
...     return "positive" if "good" in response else "negative"
>>>
>>> result = sentiment_metric(user_input="How are you?", response="I'm good!")
>>> print(result.value)  # "positive"

score abstractmethod

score(**kwargs) -> 'MetricResult'

Synchronously calculate the metric score.

Parameters

Name Type Description Default
**kwargs dict

Input parameters required by the specific metric implementation.

{}

Returns

Type Description
MetricResult

The evaluation result containing the score and reasoning.

Source code in src/ragas/metrics/base.py
@abstractmethod
def score(self, **kwargs) -> "MetricResult":
    """
    Synchronously calculate the metric score.

    Parameters
    ----------
    **kwargs : dict
        Input parameters required by the specific metric implementation.

    Returns
    -------
    MetricResult
        The evaluation result containing the score and reasoning.
    """
    pass

ascore abstractmethod async

ascore(**kwargs) -> 'MetricResult'

Asynchronously calculate the metric score.

Parameters

Name Type Description Default
**kwargs dict

Input parameters required by the specific metric implementation.

{}

Returns

Type Description
MetricResult

The evaluation result containing the score and reasoning.

Source code in src/ragas/metrics/base.py
@abstractmethod
async def ascore(self, **kwargs) -> "MetricResult":
    """
    Asynchronously calculate the metric score.

    Parameters
    ----------
    **kwargs : dict
        Input parameters required by the specific metric implementation.

    Returns
    -------
    MetricResult
        The evaluation result containing the score and reasoning.
    """
    pass

batch_score

batch_score(inputs: List[Dict[str, Any]]) -> List['MetricResult']

Synchronously calculate scores for a batch of inputs.

Parameters

Name Type Description Default
inputs List[Dict[str, Any]]

List of input dictionaries, each containing parameters for the metric.

required

Returns

Type Description
List[MetricResult]

List of evaluation results, one for each input.

Source code in src/ragas/metrics/base.py
def batch_score(
    self,
    inputs: t.List[t.Dict[str, t.Any]],
) -> t.List["MetricResult"]:
    """
    Synchronously calculate scores for a batch of inputs.

    Parameters
    ----------
    inputs : List[Dict[str, Any]]
        List of input dictionaries, each containing parameters for the metric.

    Returns
    -------
    List[MetricResult]
        List of evaluation results, one for each input.
    """
    return [self.score(**input_dict) for input_dict in inputs]

abatch_score async

abatch_score(inputs: List[Dict[str, Any]]) -> List['MetricResult']

Asynchronously calculate scores for a batch of inputs in parallel.

Parameters

Name Type Description Default
inputs List[Dict[str, Any]]

List of input dictionaries, each containing parameters for the metric.

required

Returns

Type Description
List[MetricResult]

List of evaluation results, one for each input.

Source code in src/ragas/metrics/base.py
async def abatch_score(
    self,
    inputs: t.List[t.Dict[str, t.Any]],
) -> t.List["MetricResult"]:
    """
    Asynchronously calculate scores for a batch of inputs in parallel.

    Parameters
    ----------
    inputs : List[Dict[str, Any]]
        List of input dictionaries, each containing parameters for the metric.

    Returns
    -------
    List[MetricResult]
        List of evaluation results, one for each input.
    """
    async_tasks = []
    for input_dict in inputs:
        # Process input asynchronously
        async_tasks.append(self.ascore(**input_dict))

    # Run all tasks concurrently and return results
    return await asyncio.gather(*async_tasks)
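A sketch of batch scoring, assuming the decorator-based sentiment_metric from the example above inherits batch_score and abatch_score as documented here:

import asyncio

batch = [
    {"user_input": "How are you?", "response": "I'm good!"},
    {"user_input": "How was the service?", "response": "Terrible, honestly."},
]
results = sentiment_metric.batch_score(batch)                   # sequential
# results = asyncio.run(sentiment_metric.abatch_score(batch))   # concurrent
print([r.value for r in results])  # e.g. ["positive", "negative"]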

SimpleLLMMetric dataclass

SimpleLLMMetric(name: str, allowed_values: AllowedValuesType = (lambda: ['pass', 'fail'])(), prompt: Optional[Union[str, 'Prompt']] = None)

Bases: SimpleBaseMetric

An LLM-based metric that uses a prompt to generate structured responses.

save

save(path: Optional[str] = None) -> None

Save the metric configuration to a JSON file.

Parameters

path : str, optional File path to save to. If not provided, saves to "./{metric.name}.json". Use a .gz extension for compression.

Note

If the metric has a response_model, its schema is saved for reference, but the model itself cannot be serialized. You will need to provide it again when loading.

Examples

All of these work:

metric.save()                      # → ./response_quality.json
metric.save("custom.json")         # → ./custom.json
metric.save("/path/to/metrics/")   # → /path/to/metrics/response_quality.json
metric.save("no_extension")        # → ./no_extension.json
metric.save("compressed.json.gz")  # → ./compressed.json.gz (compressed)

Source code in src/ragas/metrics/base.py
def save(self, path: t.Optional[str] = None) -> None:
    """
    Save the metric configuration to a JSON file.

    Parameters:
    -----------
    path : str, optional
        File path to save to. If not provided, saves to "./{metric.name}.json"
        Use .gz extension for compression.

    Note:
    -----
    If the metric has a response_model, its schema will be saved for reference
    but the model itself cannot be serialized. You'll need to provide it when loading.

    Examples:
    ---------
    All these work:
    >>> metric.save()                      # → ./response_quality.json
    >>> metric.save("custom.json")         # → ./custom.json
    >>> metric.save("/path/to/metrics/")   # → /path/to/metrics/response_quality.json
    >>> metric.save("no_extension")        # → ./no_extension.json
    >>> metric.save("compressed.json.gz")  # → ./compressed.json.gz (compressed)
    """
    import gzip
    import json
    import warnings
    from pathlib import Path

    # Handle default path
    if path is None:
        # Default to current directory with metric name as filename
        file_path = Path(f"./{self.name}.json")
    else:
        file_path = Path(path)

        # If path is a directory, append the metric name as filename
        if file_path.is_dir():
            file_path = file_path / f"{self.name}.json"
        # If path has no extension, add .json
        elif not file_path.suffix:
            file_path = file_path.with_suffix(".json")

    # Collect warning messages for data loss
    warning_messages = []

    if hasattr(self, "_response_model") and self._response_model:
        # Only warn for custom response models, not auto-generated ones
        if not getattr(self._response_model, "__ragas_auto_generated__", False):
            warning_messages.append(
                "- Custom response_model will be lost (set it manually after loading)"
            )

    # Serialize the prompt (may add embedding_model warning)
    prompt_data = self._serialize_prompt(warning_messages)

    # Determine the metric type
    metric_type = self.__class__.__name__

    # Get metric-specific config
    config = self._get_metric_config()

    # Emit consolidated warning if there's data loss
    if warning_messages:
        warnings.warn(
            "Some metric components cannot be saved and will be lost:\n"
            + "\n".join(warning_messages)
            + "\n\nYou'll need to provide these when loading the metric."
        )

    data = {
        "format_version": "1.0",
        "metric_type": metric_type,
        "name": self.name,
        "prompt": prompt_data,
        "config": config,
        "response_model_info": self._serialize_response_model_info(),
    }
    try:
        if file_path.suffix == ".gz":
            with gzip.open(file_path, "wt", encoding="utf-8") as f:
                json.dump(data, f, indent=2)
        else:
            with open(file_path, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2)
    except (OSError, IOError) as e:
        raise ValueError(f"Cannot save metric to {file_path}: {e}")

load classmethod

load(path: str, response_model: Optional[Type['BaseModel']] = None, embedding_model: Optional['EmbeddingModelType'] = None) -> 'SimpleLLMMetric'

Load a metric from a JSON file.

Parameters

path : str File path to load from. Supports .gz compressed files.
response_model : Optional[Type[BaseModel]] Pydantic model to use for response validation. Required for custom SimpleLLMMetrics.
embedding_model : Optional[Any] Embedding model for DynamicFewShotPrompt. Required if the original metric used one.

Returns

SimpleLLMMetric The loaded metric instance.

Raises

ValueError If the file cannot be loaded, is invalid, or is missing required models.

Source code in src/ragas/metrics/base.py
@classmethod
def load(
    cls,
    path: str,
    response_model: t.Optional[t.Type["BaseModel"]] = None,
    embedding_model: t.Optional["EmbeddingModelType"] = None,
) -> "SimpleLLMMetric":
    """
    Load a metric from a JSON file.

    Parameters:
    -----------
    path : str
        File path to load from. Supports .gz compressed files.
    response_model : Optional[Type[BaseModel]]
        Pydantic model to use for response validation. Required for custom SimpleLLMMetrics.
    embedding_model : Optional[Any]
        Embedding model for DynamicFewShotPrompt. Required if the original used one.

    Returns:
    --------
    SimpleLLMMetric
        Loaded metric instance

    Raises:
    -------
    ValueError
        If file cannot be loaded, is invalid, or missing required models
    """
    import gzip
    import json
    from pathlib import Path

    file_path = Path(path)

    # Load JSON data
    try:
        if file_path.suffix == ".gz":
            with gzip.open(file_path, "rt", encoding="utf-8") as f:
                data = json.load(f)
        else:
            with open(file_path, "r", encoding="utf-8") as f:
                data = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError, OSError) as e:
        raise ValueError(f"Cannot load metric from {path}: {e}")

    # Validate format
    if data.get("format_version") != "1.0":
        import warnings

        warnings.warn(
            f"Loading metric with format version {data.get('format_version')}, expected 1.0"
        )

    # Reconstruct the prompt
    prompt = cls._deserialize_prompt(data["prompt"], embedding_model)

    # Get config
    config = data.get("config", {})

    # Create the metric instance
    metric = cls(name=data["name"], prompt=prompt, **config)

    # Set response model if provided
    if response_model:
        metric._response_model = response_model

    return metric
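A save/load round-trip sketch; response_quality, MyResponseModel, and the SimpleLLMMetric import path are hypothetical placeholders:

from ragas.metrics import SimpleLLMMetric  # assumed import path

response_quality.save("./metrics/response_quality.json.gz")  # .gz -> compressed
restored = SimpleLLMMetric.load(
    "./metrics/response_quality.json.gz",
    response_model=MyResponseModel,  # needed again if a custom response model was used
)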

get_correlation abstractmethod

get_correlation(gold_labels: List[str], predictions: List[str]) -> float

Calculate the correlation between gold scores and predicted scores. This is a placeholder method and should be implemented based on the specific metric.

Source code in src/ragas/metrics/base.py
@abstractmethod
def get_correlation(
    self, gold_labels: t.List[str], predictions: t.List[str]
) -> float:
    """
    Calculate the correlation between gold scores and predicted scores.
    This is a placeholder method and should be implemented based on the specific metric.
    """
    pass

align_and_validate

align_and_validate(dataset: 'Dataset', embedding_model: 'EmbeddingModelType', llm: 'BaseRagasLLM', test_size: float = 0.2, random_state: int = 42, **kwargs: Dict[str, Any])

Args: dataset: the experiment to align the metric with. embedding_model: the embedding model used for dynamic few-shot prompting. llm: the LLM instance to use for scoring.

Align the metric with the specified experiments and validate it against a gold standard experiment. This method combines alignment and validation into a single step.

Source code in src/ragas/metrics/base.py
def align_and_validate(
    self,
    dataset: "Dataset",
    embedding_model: "EmbeddingModelType",
    llm: "BaseRagasLLM",
    test_size: float = 0.2,
    random_state: int = 42,
    **kwargs: t.Dict[str, t.Any],
):
    """
    Args:
        dataset: experiment to align the metric with.
        embedding_model: The embedding model used for dynamic few-shot prompting.
        llm: The LLM instance to use for scoring.

    Align the metric with the specified experiments and validate it against a gold standard experiment.
    This method combines alignment and validation into a single step.
    """
    train_dataset, test_dataset = dataset.train_test_split(
        test_size=test_size, random_state=random_state
    )

    self.align(train_dataset, embedding_model, **kwargs)  # type: ignore
    return self.validate_alignment(llm, test_dataset)  # type: ignore

align

align(train_dataset: 'Dataset', embedding_model: 'EmbeddingModelType', **kwargs: Dict[str, Any])

Args: train_dataset: the train_dataset to align the metric with. embedding_model: the embedding model used for dynamic few-shot prompting.

Align the metric with the specified experiments via different optimization methods.

Source code in src/ragas/metrics/base.py
def align(
    self,
    train_dataset: "Dataset",
    embedding_model: "EmbeddingModelType",
    **kwargs: t.Dict[str, t.Any],
):
    """
    Args:
        train_dataset: train_dataset to align the metric with.
        embedding_model: The embedding model used for dynamic few-shot prompting.

    Align the metric with the specified experiments by different optimization methods.
    """

    # get prompt
    if not self.prompt:
        raise Exception("prompt not passed")
    from ragas.prompt.simple_prompt import Prompt

    self.prompt = (
        self.prompt if isinstance(self.prompt, Prompt) else Prompt(self.prompt)
    )
    # Extract specific parameters for from_prompt method
    max_similar_examples_val = kwargs.get("max_similar_examples", 3)
    similarity_threshold_val = kwargs.get("similarity_threshold", 0.7)
    max_similar_examples = (
        int(max_similar_examples_val)
        if isinstance(max_similar_examples_val, (int, str))
        else 3
    )
    similarity_threshold = (
        float(similarity_threshold_val)
        if isinstance(similarity_threshold_val, (int, float, str))
        else 0.7
    )
    # Convert BaseRagasEmbeddings to BaseRagasEmbedding if needed
    if hasattr(embedding_model, "embed_query"):
        # For legacy BaseRagasEmbeddings, we need to wrap it
        # Create a wrapper that implements BaseRagasEmbedding interface
        class EmbeddingWrapper:
            def __init__(self, legacy_embedding):
                self.legacy_embedding = legacy_embedding

            def embed_text(self, text: str, **kwargs) -> t.List[float]:
                return self.legacy_embedding.embed_query(text)

            async def aembed_text(self, text: str, **kwargs) -> t.List[float]:
                return await self.legacy_embedding.aembed_query(text)

        actual_embedding_model = EmbeddingWrapper(embedding_model)
    else:
        # Already BaseRagasEmbedding
        actual_embedding_model = embedding_model

    from ragas.prompt.dynamic_few_shot import DynamicFewShotPrompt

    self.prompt = DynamicFewShotPrompt.from_prompt(
        self.prompt,
        actual_embedding_model,  # type: ignore[arg-type]
        max_similar_examples,
        similarity_threshold,
    )
    train_dataset.reload()
    total_items = len(train_dataset)
    input_vars = self.get_variables()
    output_vars = [self.name, f"{self.name}_reason"]

    from rich.progress import Progress

    with Progress() as progress:
        task = progress.add_task("Processing examples", total=total_items)
        for row in train_dataset:
            inputs = {
                var: train_dataset.get_row_value(row, var) for var in input_vars
            }
            inputs = {k: v for k, v in inputs.items() if v is not None}
            output = {
                var: train_dataset.get_row_value(row, var) for var in output_vars
            }
            output = {k: v for k, v in output.items() if v is not None}

            if output:
                self.prompt.add_example(inputs, output)
            progress.update(task, advance=1)

validate_alignment

validate_alignment(llm: 'BaseRagasLLM', test_dataset: 'Dataset', mapping: Dict[str, str] = {})

Args: llm: the LLM instance to use for scoring. test_dataset: a Dataset instance containing the gold standard scores. mapping: a dictionary mapping the variable names expected by the metric to their corresponding names in the gold experiment.

Validate the alignment of the metric by comparing its scores against a gold standard experiment. This method computes Cohen's kappa and the agreement rate between the gold standard scores and the metric's predicted scores.

Source code in src/ragas/metrics/base.py
def validate_alignment(
    self,
    llm: "BaseRagasLLM",
    test_dataset: "Dataset",
    mapping: t.Dict[str, str] = {},
):
    """
    Args:
        llm: The LLM instance to use for scoring.
        test_dataset: An Dataset instance containing the gold standard scores.
        mapping: A dictionary mapping variable names expected by metrics to their corresponding names in the gold experiment.

    Validate the alignment of the metric by comparing the scores against a gold standard experiment.
    This method computes the Cohen's Kappa score and agreement rate between the gold standard scores and
    the predicted scores from the metric.
    """

    test_dataset.reload()
    gold_scores_raw = [
        test_dataset.get_row_value(row, self.name) for row in test_dataset
    ]
    pred_scores = []
    for row in test_dataset:
        values = {
            v: (
                test_dataset.get_row_value(row, v)
                if v not in mapping
                else test_dataset.get_row_value(row, mapping.get(v, v))
            )
            for v in self.get_variables()
        }
        score = self.score(llm=llm, **values)
        pred_scores.append(score.value)

    # Convert to strings for correlation calculation, filtering out None values
    gold_scores = [str(score) for score in gold_scores_raw if score is not None]
    pred_scores_str = [str(score) for score in pred_scores if score is not None]

    df = test_dataset.to_pandas()
    df[f"{self.name}_pred"] = pred_scores
    correlation = self.get_correlation(gold_scores, pred_scores_str)
    agreement_rate = sum(
        x == y for x, y in zip(gold_scores, pred_scores_str)
    ) / len(gold_scores)
    return {
        "correlation": correlation,
        "agreement_rate": agreement_rate,
        "df": df,
    }

create_auto_response_model

create_auto_response_model(name: str, **fields) -> Type['BaseModel']

Create a response model and mark it as auto-generated by Ragas.

This function creates a Pydantic model using create_model and marks it with a special attribute to indicate that it was auto-generated. This allows the save() method to distinguish between auto-generated models (which are recreated on load) and custom user models.

Parameters

Name Type Description Default
name str

Name for the model class.

required
**fields

Field definitions in create_model format. Each field is specified as: field_name=(type, default_or_field_info).

{}

Returns

Type Description
Type[BaseModel]

A Pydantic model class marked as auto-generated.

Examples

>>> from pydantic import Field
>>> # Simple model with required fields
>>> ResponseModel = create_auto_response_model(
...     "ResponseModel",
...     value=(str, ...),
...     reason=(str, ...)
... )
>>>
>>> # Model with Field validators and descriptions
>>> ResponseModel = create_auto_response_model(
...     "ResponseModel",
...     value=(str, Field(..., description="The predicted value")),
...     reason=(str, Field(..., description="Reasoning for the prediction"))
... )
Source code in src/ragas/metrics/base.py
def create_auto_response_model(name: str, **fields) -> t.Type["BaseModel"]:
    """
    Create a response model and mark it as auto-generated by Ragas.

    This function creates a Pydantic model using create_model and marks it
    with a special attribute to indicate it was auto-generated. This allows
    the save() method to distinguish between auto-generated models (which
    are recreated on load) and custom user models.

    Parameters
    ----------
    name : str
        Name for the model class
    **fields
        Field definitions in create_model format.
        Each field is specified as: field_name=(type, default_or_field_info)

    Returns
    -------
    Type[BaseModel]
        Pydantic model class marked as auto-generated

    Examples
    --------
    >>> from pydantic import Field
    >>> # Simple model with required fields
    >>> ResponseModel = create_auto_response_model(
    ...     "ResponseModel",
    ...     value=(str, ...),
    ...     reason=(str, ...)
    ... )
    >>>
    >>> # Model with Field validators and descriptions
    >>> ResponseModel = create_auto_response_model(
    ...     "ResponseModel",
    ...     value=(str, Field(..., description="The predicted value")),
    ...     reason=(str, Field(..., description="Reasoning for the prediction"))
    ... )
    """
    from pydantic import create_model

    model = create_model(name, **fields)
    setattr(model, "__ragas_auto_generated__", True)  # type: ignore[attr-defined]
    return model

AnswerCorrectness dataclass

AnswerCorrectness(_required_columns: Dict[MetricType, Set[str]] = (lambda: {SINGLE_TURN: {'user_input', 'response', 'reference'}})(), name: str = 'answer_correctness', embeddings: Optional[Union[BaseRagasEmbeddings, BaseRagasEmbedding]] = None, llm: Optional[BaseRagasLLM] = None, output_type: Optional[MetricOutputType] = None, correctness_prompt: PydanticPrompt = CorrectnessClassifier(), statement_generator_prompt: PydanticPrompt = StatementGeneratorPrompt(), weights: list[float] = (lambda: [0.75, 0.25])(), beta: float = 1.0, answer_similarity: Optional[AnswerSimilarity] = None, max_retries: int = 1)

Bases: MetricWithLLM, MetricWithEmbeddings, SingleTurnMetric

Measures answer correctness compared to the ground truth as a combination of factuality and semantic similarity. A usage sketch follows the attribute list below.

Attributes

Name Type Description
name string

The name of the metric.

weights list[float]

A list of two weights for factuality and semantic similarity respectively, defaults to [0.75, 0.25].

answer_similarity Optional[AnswerSimilarity]

The AnswerSimilarity object.
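A usage sketch; evaluator_llm and evaluator_embeddings stand for your own LLM and embedding wrappers (e.g. LangchainLLMWrapper / LangchainEmbeddingsWrapper) and are assumptions here:

from ragas.dataset_schema import SingleTurnSample
from ragas.metrics import AnswerCorrectness

metric = AnswerCorrectness(
    llm=evaluator_llm,
    embeddings=evaluator_embeddings,
    weights=[0.75, 0.25],  # factuality vs. semantic similarity
)
sample = SingleTurnSample(
    user_input="When was Einstein born?",
    response="Einstein was born in 1879 in Germany.",
    reference="Albert Einstein was born in 1879.",
)
score = metric.single_turn_score(sample)  # combined score in [0, 1]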

ResponseRelevancy dataclass

ResponseRelevancy(_required_columns: Dict[MetricType, Set[str]] = (lambda: {SINGLE_TURN: {'user_input', 'response'}})(), name: str = 'answer_relevancy', embeddings: Optional[Union[BaseRagasEmbeddings, BaseRagasEmbedding]] = None, llm: Optional[BaseRagasLLM] = None, output_type: Optional[MetricOutputType] = None, question_generation: PydanticPrompt = ResponseRelevancePrompt(), strictness: int = 3)

Bases: MetricWithLLM, MetricWithEmbeddings, SingleTurnMetric

Scores the relevancy of the answer to the given question. Answers with incomplete, redundant, or unnecessary information are penalized. The score ranges from 0 to 1, with 1 being the best.

Attributes

Name Type Description
name string

The name of the metric.

strictness int

Here indicates the number of questions generated per answer. The ideal range is between 3 and 5.

embeddings Embedding

A langchain wrapper around an Embedding object, e.g. HuggingFaceEmbeddings('BAAI/bge-base-en').

SemanticSimilarity dataclass

SemanticSimilarity(_required_columns: Dict[MetricType, Set[str]] = (lambda: {SINGLE_TURN: {'reference', 'response'}})(), name: str = 'semantic_similarity', embeddings: Optional[Union[BaseRagasEmbeddings, BaseRagasEmbedding]] = None, is_cross_encoder: bool = False, threshold: Optional[float] = None)

Bases: MetricWithEmbeddings, SingleTurnMetric

Scores the semantic similarity of the ground truth against the generated answer. Cross-encoder scores are used to quantify semantic similarity. SAS paper: https://arxiv.org/pdf/2108.06130.pdf

Attributes

Name Type Description
name str
model_name

The model to be used for calculating semantic similarity. Defaults to open-ai-embeddings; select a cross-encoder model for best results: https://huggingface.co/spaces/mteb/leaderboard

threshold Optional[float]

If a threshold is given, it is used to map the output to binary; defaults to 0.5.

AspectCritic

AspectCritic(name: str, definition: str, llm: Optional[BaseRagasLLM] = None, required_columns: Optional[Dict[MetricType, Set[str]]] = None, output_type: Optional[MetricOutputType] = BINARY, single_turn_prompt: Optional[PydanticPrompt] = None, multi_turn_prompt: Optional[PydanticPrompt] = None, strictness: int = 1, max_retries: int = 1)

Bases: MetricWithLLM, SingleTurnMetric, MultiTurnMetric

Judges the submission to give a binary result based on the criteria specified in the metric definition.

Attributes

Name Type Description
name str

The name of the metric.

definition str

The criteria to judge the submission against, e.g. "Is the submission spreading fake information?"

strictness int

The number of self-consistency checks made. The final judgement is decided by majority vote.

Source code in src/ragas/metrics/_aspect_critic.py
def __init__(
    self,
    name: str,
    definition: str,
    llm: t.Optional[BaseRagasLLM] = None,
    required_columns: t.Optional[t.Dict[MetricType, t.Set[str]]] = None,
    output_type: t.Optional[MetricOutputType] = MetricOutputType.BINARY,
    single_turn_prompt: t.Optional[PydanticPrompt] = None,
    multi_turn_prompt: t.Optional[PydanticPrompt] = None,
    strictness: int = 1,
    max_retries: int = 1,
):
    self._required_columns = required_columns or {
        MetricType.SINGLE_TURN: {
            "user_input:optional",
            "response:optional",
            "retrieved_contexts:optional",
            "reference:optional",
            "reference_contexts:optional",
        },
        MetricType.MULTI_TURN: {
            "user_input:optional",
            "reference:optional",
        },
    }
    super().__init__(
        name=name,
        _required_columns=self._required_columns,
        llm=llm,
        output_type=output_type,
    )

    self._definition = definition
    self.single_turn_prompt = single_turn_prompt or SingleTurnAspectCriticPrompt()
    self.multi_turn_prompt = multi_turn_prompt or MultiTurnAspectCriticPrompt()
    self.max_retries = max_retries

    # update the instruction for the prompts with the definition
    instruction = f"Evaluate the Input based on the criterial defined. Use only 'Yes' (1) and 'No' (0) as verdict.\nCriteria Definition: {self._definition}"
    self.single_turn_prompt.instruction = instruction
    self.multi_turn_prompt.instruction = instruction

    # ensure odd number of checks to avoid tie in majority vote.
    self.strictness = strictness
    self.strictness = (
        self.strictness if self.strictness % 2 != 0 else self.strictness + 1
    )
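A usage sketch for a custom single-turn critic; evaluator_llm is the assumed LLM wrapper from the earlier sketches:

from ragas.dataset_schema import SingleTurnSample
from ragas.metrics import AspectCritic

harmfulness = AspectCritic(
    name="harmfulness",
    definition="Does the response contain content that could cause harm?",
    llm=evaluator_llm,
    strictness=3,  # kept odd so the majority vote cannot tie
)
sample = SingleTurnSample(
    user_input="How do I reset my password?",
    response="Click 'Forgot password' on the login page.",
)
verdict = harmfulness.single_turn_score(sample)  # 0 or 1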

ContextEntityRecall dataclass

ContextEntityRecall(_required_columns: Dict[MetricType, Set[str]] = (lambda: {SINGLE_TURN: {'reference', 'retrieved_contexts'}})(), name: str = 'context_entity_recall', llm: Optional[BaseRagasLLM] = None, output_type: Optional[MetricOutputType] = None, context_entity_recall_prompt: PydanticPrompt = ExtractEntitiesPrompt(), max_retries: int = 1)

Bases: MetricWithLLM, SingleTurnMetric

Calculates recall based on the entities present in the ground truth and in the retrieved context. Let CN be the set of entities present in the context and GN the set of entities present in the ground truth.

Context entity recall is then defined as: context entity recall = |CN ∩ GN| / |GN|

If this quantity is 1, the retrieval mechanism has retrieved context that covers all entities present in the ground truth, which makes it a useful retrieval. This metric is therefore suited to evaluating retrieval in use cases where entities matter, for example a tourism help chatbot. A worked example follows the attribute list below.

Attributes

Name Type Description
name str
batch_size int

Batch size for openai completion.
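A worked example of the formula (entity extraction itself is performed by the LLM; the sets below are illustrative):

context_entities = {"Eiffel Tower", "Paris", "1889"}               # CN
reference_entities = {"Eiffel Tower", "Paris", "Gustave Eiffel"}   # GN

recall = len(context_entities & reference_entities) / len(reference_entities)
print(round(recall, 2))  # 0.67: one ground-truth entity is not covered by the context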

IDBasedContextPrecision dataclass

IDBasedContextPrecision(_required_columns: Dict[MetricType, Set[str]] = (lambda: {SINGLE_TURN: {'retrieved_context_ids', 'reference_context_ids'}})(), name: str = 'id_based_context_precision', output_type: MetricOutputType = CONTINUOUS)

Bases: SingleTurnMetric

Calculates context precision by directly comparing retrieved context IDs with reference context IDs. The score indicates how many of the retrieved context IDs are actually relevant, i.e. present in the reference; see the sketch below.

This metric works with both string and integer IDs.

Attributes

Name Type Description
name str

The name of the metric.
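An arithmetic sketch of the idea (illustrative only, not the library implementation; the retrieved_context_ids and reference_context_ids fields of SingleTurnSample carry the IDs):

retrieved_ids = ["doc-1", "doc-4", "doc-7"]
reference_ids = {"doc-1", "doc-2", "doc-7"}

precision = sum(rid in reference_ids for rid in retrieved_ids) / len(retrieved_ids)
print(round(precision, 2))  # 0.67: 2 of the 3 retrieved IDs appear in the reference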

LLMContextPrecisionWithReference dataclass

LLMContextPrecisionWithReference(_required_columns: Dict[MetricType, Set[str]] = (lambda: {SINGLE_TURN: {'user_input', 'retrieved_contexts', 'reference'}})(), name: str = 'llm_context_precision_with_reference', llm: Optional[BaseRagasLLM] = None, output_type: Optional[MetricOutputType] = None, context_precision_prompt: PydanticPrompt = ContextPrecisionPrompt(), max_retries: int = 1)

Bases: MetricWithLLM, SingleTurnMetric

Average Precision is a metric that evaluates whether all of the relevant items selected by the model are ranked highly or not.

Attributes

Name Type Description
name str
evaluation_mode EvaluationMode
context_precision_prompt Prompt

IDBasedContextRecall dataclass

IDBasedContextRecall(_required_columns: Dict[MetricType, Set[str]] = (lambda: {SINGLE_TURN: {'retrieved_context_ids', 'reference_context_ids'}})(), name: str = 'id_based_context_recall', output_type: MetricOutputType = CONTINUOUS)

Bases: SingleTurnMetric

Calculates context recall by directly comparing retrieved context IDs with reference context IDs. The score indicates how many of the reference IDs were successfully retrieved.

This metric works with both string and integer IDs.

Attributes

Name Type Description
name str

The name of the metric.

LLMContextRecall dataclass

LLMContextRecall(_required_columns: Dict[MetricType, Set[str]] = (lambda: {SINGLE_TURN: {'user_input', 'retrieved_contexts', 'reference'}})(), name: str = 'context_recall', llm: Optional[BaseRagasLLM] = None, output_type: Optional[MetricOutputType] = CONTINUOUS, context_recall_prompt: PydanticPrompt = ContextRecallClassificationPrompt(), max_retries: int = 1)

Bases: MetricWithLLM, SingleTurnMetric

Estimates context recall by estimating TP and FN using the annotated answer and the retrieved context.

Attributes

Name Type Description
name str

FactualCorrectness dataclass

FactualCorrectness(_required_columns: Dict[MetricType, Set[str]] = (lambda: {SINGLE_TURN: {'response', 'reference'}})(), name: str = 'factual_correctness', llm: Optional[BaseRagasLLM] = None, output_type: Optional[MetricOutputType] = CONTINUOUS, mode: Literal['precision', 'recall', 'f1'] = 'f1', beta: float = 1.0, atomicity: Literal['low', 'high'] = 'low', coverage: Literal['low', 'high'] = 'low', claim_decomposition_prompt: PydanticPrompt = ClaimDecompositionPrompt(), nli_prompt: PydanticPrompt = NLIStatementPrompt(), language: str = 'english')

Bases: MetricWithLLM, SingleTurnMetric

FactualCorrectness is a metric class that evaluates the factual correctness of responses generated by language models. It uses claim decomposition and natural language inference (NLI) to verify the claims made in the response against the reference text.

Attributes:
name (str): The name of the metric, defaults to "factual_correctness".
_required_columns (Dict[MetricType, Set[str]]): A dictionary specifying the required columns for each metric type. Defaults to {"SINGLE_TURN": {"response", "reference"}}.
mode (Literal["precision", "recall", "f1"]): The evaluation mode, one of "precision", "recall", or "f1". Defaults to "f1".
beta (float): The beta value used in the F1 score calculation. beta > 1 weights recall more heavily, beta < 1 favors precision. Defaults to 1.0.
atomicity (Literal["low", "high"]): The level of atomicity for claim decomposition. Defaults to "low".
coverage (Literal["low", "high"]): The level of coverage for claim decomposition. Defaults to "low".
claim_decomposition_prompt (PydanticPrompt): The prompt used for claim decomposition.
nli_prompt (PydanticPrompt): The prompt used for natural language inference (NLI).
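A usage sketch; evaluator_llm is the assumed LLM wrapper from the earlier sketches:

from ragas.dataset_schema import SingleTurnSample
from ragas.metrics import FactualCorrectness

metric = FactualCorrectness(llm=evaluator_llm, mode="recall", atomicity="high")
sample = SingleTurnSample(
    response="Einstein was born in 1879 in Germany and developed relativity.",
    reference="Albert Einstein was born in 1879. He developed the theory of relativity.",
)
score = metric.single_turn_score(sample)  # fraction of reference claims supported by the response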

Faithfulness dataclass

Faithfulness(_required_columns: Dict[MetricType, Set[str]] = (lambda: {SINGLE_TURN: {'user_input', 'response', 'retrieved_contexts'}})(), name: str = 'faithfulness', llm: Optional[BaseRagasLLM] = None, output_type: Optional[MetricOutputType] = CONTINUOUS, nli_statements_prompt: PydanticPrompt = NLIStatementPrompt(), statement_generator_prompt: PydanticPrompt = StatementGeneratorPrompt(), max_retries: int = 1)

FaithfulnesswithHHEM dataclass

FaithfulnesswithHHEM(_required_columns: Dict[MetricType, Set[str]] = (lambda: {SINGLE_TURN: {'user_input', 'response', 'retrieved_contexts'}})(), name: str = 'faithfulness_with_hhem', llm: Optional[BaseRagasLLM] = None, output_type: Optional[MetricOutputType] = CONTINUOUS, nli_statements_prompt: PydanticPrompt = NLIStatementPrompt(), statement_generator_prompt: PydanticPrompt = StatementGeneratorPrompt(), max_retries: int = 1, device: str = 'cpu', batch_size: int = 10)

Bases: Faithfulness

NoiseSensitivity dataclass

NoiseSensitivity(_required_columns: Dict[MetricType, Set[str]] = (lambda: {SINGLE_TURN: {'user_input', 'response', 'reference', 'retrieved_contexts'}})(), name: str = 'noise_sensitivity', llm: Optional[BaseRagasLLM] = None, output_type: Optional[MetricOutputType] = CONTINUOUS, mode: Literal['relevant', 'irrelevant'] = 'relevant', nli_statements_prompt: PydanticPrompt = NLIStatementPrompt(), statement_generator_prompt: PydanticPrompt = StatementGeneratorPrompt(), max_retries: int = 1)

AnswerAccuracy dataclass

AnswerAccuracy(_required_columns: Dict[MetricType, Set[str]] = (lambda: {SINGLE_TURN: {'user_input', 'response', 'reference'}})(), name: str = 'nv_accuracy', llm: Optional[BaseRagasLLM] = None, output_type: Optional[MetricOutputType] = None)

Bases: MetricWithLLM, SingleTurnMetric

Measures the accuracy of the answer compared to the ground truth for a given user_input. This metric averages two distinct judge prompts for evaluation.

Top 10 zero-shot LLM-as-a-Judge leaderboard:
1) nvidia/Llama-3_3-Nemotron-Super-49B-v1
2) mistralai/mixtral-8x22b-instruct-v0.1
3) mistralai/mixtral-8x7b-instruct-v0.1
4) meta/llama-3.1-70b-instruct
5) meta/llama-3.3-70b-instruct
6) meta/llama-3.1-405b-instruct
7) mistralai/mistral-nemo-12b-instruct
8) nvidia/llama-3.1-nemotron-70b-instruct
9) meta/llama-3.1-8b-instruct
10) google/gemma-2-2b-it
The top-1 model on this leaderboard correlates highly with human judges (~0.92).

Attributes

Name Type Description
name string

The name of the metric.

answer_accuracy

The AnswerAccuracy object.

ContextRelevance dataclass

ContextRelevance(_required_columns: Dict[MetricType, Set[str]] = (lambda: {SINGLE_TURN: {'user_input', 'retrieved_contexts'}})(), name: str = 'nv_context_relevance', llm: Optional[BaseRagasLLM] = None, output_type: Optional[MetricOutputType] = None)

Bases: MetricWithLLM, SingleTurnMetric

Parameters: Score the relevance of the retrieved contexts based on the user input.

Input: data: list of dictionaries with the keys user_input and retrieved_contexts.
Output: 0.0 if the retrieved_contexts are not relevant to the user_input, 0.5 if partially relevant, 1.0 if fully relevant.

ResponseGroundedness dataclass

ResponseGroundedness(_required_columns: Dict[MetricType, Set[str]] = (lambda: {SINGLE_TURN: {'response', 'retrieved_contexts'}})(), name: str = 'nv_response_groundedness', llm: Optional[BaseRagasLLM] = None, output_type: Optional[MetricOutputType] = None)

Bases: MetricWithLLM, SingleTurnMetric

Parameters: Score the groundedness of the response based on the retrieved contexts.

Input: data: list of dictionaries with the keys response and retrieved contexts.
Output: 0.0 if the response is not grounded in the retrieved contexts, 0.5 if partially grounded, 1.0 if fully grounded.

SimpleCriteriaScore

SimpleCriteriaScore(name: str, definition: str, llm: Optional[BaseRagasLLM] = None, required_columns: Optional[Dict[MetricType, Set[str]]] = None, output_type: Optional[MetricOutputType] = DISCRETE, single_turn_prompt: Optional[PydanticPrompt] = None, multi_turn_prompt: Optional[PydanticPrompt] = None, strictness: int = 1)

Bases: MetricWithLLM, SingleTurnMetric, MultiTurnMetric

Judges the submission against the criteria specified in the metric definition and assigns a discrete score.

Attributes

Name Type Description
name str

The name of the metric.

definition str

The criteria to score the submission against.

strictness int

The number of self-consistency checks made. The final judgement is decided by majority vote.

Source code in src/ragas/metrics/_simple_criteria.py
def __init__(
    self,
    name: str,
    definition: str,
    llm: t.Optional[BaseRagasLLM] = None,
    required_columns: t.Optional[t.Dict[MetricType, t.Set[str]]] = None,
    output_type: t.Optional[MetricOutputType] = MetricOutputType.DISCRETE,
    single_turn_prompt: t.Optional[PydanticPrompt] = None,
    multi_turn_prompt: t.Optional[PydanticPrompt] = None,
    strictness: int = 1,
):
    if required_columns is None:
        required_columns = {
            MetricType.SINGLE_TURN: {
                "user_input:optional",
                "response:optional",
                "retrieved_contexts:optional",
                "reference:optional",
                "reference_contexts:optional",
            },
            MetricType.MULTI_TURN: {
                "user_input:optional",
                "reference:optional",
            },
        }
    super().__init__(
        name=name,
        llm=llm,
        _required_columns=required_columns,
        output_type=output_type,
    )

    self._definition = definition
    self.single_turn_prompt = single_turn_prompt or SingleTurnSimpleCriteriaPrompt()
    self.multi_turn_prompt = multi_turn_prompt or MultiTurnSimpleCriteriaPrompt()

    # update the instruction for the prompts with the definition
    instruction = f"Evaluate the input based on the criteria defined.\nCriteria Definition: {self._definition}"
    self.single_turn_prompt.instruction = instruction
    self.multi_turn_prompt.instruction = instruction

    # ensure odd number of checks to avoid tie in majority vote.
    self.strictness = strictness
    self.strictness = (
        self.strictness if self.strictness % 2 != 0 else self.strictness + 1
    )

ToolCallAccuracy dataclass

ToolCallAccuracy(_required_columns: Dict[MetricType, Set[str]] = (lambda: {MULTI_TURN: {'user_input', 'reference_tool_calls'}})(), name: str = 'tool_call_accuracy', strict_order: bool = True, arg_comparison_metric: SingleTurnMetric = (lambda: ExactMatch())())

Bases: MultiTurnMetric

Tool Call Accuracy measures how accurately an LLM agent makes tool calls compared to reference tool calls.

The metric supports two evaluation modes:
1. Strict order (default): tool calls must match exactly and in sequence.
2. Flexible order: tool calls may occur in any order (evaluated as parallel calls).

The metric evaluates two aspects:
1. Sequence alignment: whether the predicted and reference tool calls match in the required order.
2. Argument accuracy: how closely the arguments of the predicted tool calls match those of the reference.

Score calculation:
- If the sequences are not aligned: the score is 0.
- If the sequences are aligned: score = (average argument accuracy) * sequence alignment factor.
- Length mismatches trigger warnings and a corresponding penalty.

Edge cases:
- No predicted tool calls: returns 0.0.
- Length mismatch: only the overlapping portion is compared and a coverage penalty is applied.
- A missing argument contributes 0 to that tool call's argument score.

The final score is always between 0.0 and 1.0.

Parameters: strict_order: If True (default), tool calls must match exactly and in order. If False, tool calls may occur in any order (evaluated as parallel calls). A usage sketch follows below.
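A usage sketch; the tool name and arguments are illustrative:

from ragas.dataset_schema import MultiTurnSample
from ragas.messages import AIMessage, HumanMessage, ToolCall
from ragas.metrics import ToolCallAccuracy

sample = MultiTurnSample(
    user_input=[
        HumanMessage(content="What's the weather in Paris?"),
        AIMessage(
            content="Checking the weather.",
            tool_calls=[ToolCall(name="get_weather", args={"city": "Paris"})],
        ),
    ],
    reference_tool_calls=[ToolCall(name="get_weather", args={"city": "Paris"})],
)
score = ToolCallAccuracy().multi_turn_score(sample)  # 1.0: name and args match in order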


BaseMetric dataclass

BaseMetric(name: str, allowed_values: AllowedValuesType = (lambda: ['pass', 'fail'])())

Bases: ABC

Base class for simple metrics that return MetricResult objects.

This class provides the foundation for metrics that evaluate inputs and return structured MetricResult objects containing scores and reasoning.

Attributes

Name Type Description
name str

The name of the metric.

allowed_values AllowedValuesType

Allowed values for the metric output. Can be a list of strings for discrete metrics, a tuple of floats for numeric metrics, or an integer for ranking metrics.

Examples

>>> from ragas.metrics import discrete_metric
>>>
>>> @discrete_metric(name="sentiment", allowed_values=["positive", "negative"])
>>> def sentiment_metric(user_input: str, response: str) -> str:
...     return "positive" if "good" in response else "negative"
>>>
>>> result = sentiment_metric(user_input="How are you?", response="I'm good!")
>>> print(result.value)  # "positive"

score abstractmethod

score(**kwargs) -> 'MetricResult'

Synchronously calculate the metric score.

Parameters

Name Type Description Default
**kwargs dict

Input parameters required by the specific metric implementation.

{}

Returns

Type Description
MetricResult

The evaluation result containing the score and reasoning.

Source code in src/ragas/metrics/base.py
@abstractmethod
def score(self, **kwargs) -> "MetricResult":
    """
    Synchronously calculate the metric score.

    Parameters
    ----------
    **kwargs : dict
        Input parameters required by the specific metric implementation.

    Returns
    -------
    MetricResult
        The evaluation result containing the score and reasoning.
    """
    pass

ascore abstractmethod async

ascore(**kwargs) -> 'MetricResult'

Asynchronously calculate the metric score.

Parameters

Name Type Description Default
**kwargs dict

Input parameters required by the specific metric implementation.

{}

Returns

Type Description
MetricResult

The evaluation result containing the score and reasoning.

Source code in src/ragas/metrics/base.py
@abstractmethod
async def ascore(self, **kwargs) -> "MetricResult":
    """
    Asynchronously calculate the metric score.

    Parameters
    ----------
    **kwargs : dict
        Input parameters required by the specific metric implementation.

    Returns
    -------
    MetricResult
        The evaluation result containing the score and reasoning.
    """
    pass

batch_score

batch_score(inputs: List[Dict[str, Any]]) -> List['MetricResult']

Synchronously calculate scores for a batch of inputs.

Parameters

Name Type Description Default
inputs List[Dict[str, Any]]

List of input dictionaries, each containing parameters for the metric.

required

Returns

Type Description
List[MetricResult]

List of evaluation results, one for each input.

Source code in src/ragas/metrics/base.py
def batch_score(
    self,
    inputs: t.List[t.Dict[str, t.Any]],
) -> t.List["MetricResult"]:
    """
    Synchronously calculate scores for a batch of inputs.

    Parameters
    ----------
    inputs : List[Dict[str, Any]]
        List of input dictionaries, each containing parameters for the metric.

    Returns
    -------
    List[MetricResult]
        List of evaluation results, one for each input.
    """
    return [self.score(**input_dict) for input_dict in inputs]

abatch_score async

abatch_score(inputs: List[Dict[str, Any]]) -> List['MetricResult']

并行异步计算一批输入的分数。

参数

名称 类型 描述 默认值
inputs List[Dict[str, Any]]

输入字典的列表,每个字典包含指标的参数。

必需

返回

类型 描述
List[MetricResult]

评估结果的列表,每个输入对应一个。

源代码位于 src/ragas/metrics/base.py
async def abatch_score(
    self,
    inputs: t.List[t.Dict[str, t.Any]],
) -> t.List["MetricResult"]:
    """
    Asynchronously calculate scores for a batch of inputs in parallel.

    Parameters
    ----------
    inputs : List[Dict[str, Any]]
        List of input dictionaries, each containing parameters for the metric.

    Returns
    -------
    List[MetricResult]
        List of evaluation results, one for each input.
    """
    async_tasks = []
    for input_dict in inputs:
        # Process input asynchronously
        async_tasks.append(self.ascore(**input_dict))

    # Run all tasks concurrently and return results
    return await asyncio.gather(*async_tasks)
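
下面是一个最小草图(仅作说明),对比同步与并发异步的批量评分;my_metric 与输入字段名均为示意,假设指标实现接受 user_input 和 response 两个参数。

import asyncio

# my_metric:假设已创建的 BaseMetric 子类实例(示意)
inputs = [
    {"user_input": "How are you?", "response": "I'm good!"},
    {"user_input": "How are you?", "response": "Not great."},
]

# 同步:逐个调用 score()
results = my_metric.batch_score(inputs)

# 异步:通过 asyncio.gather 并发调用 ascore()
results = asyncio.run(my_metric.abatch_score(inputs))
print([r.value for r in results])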

LLMMetric dataclass

LLMMetric(name: str, allowed_values: AllowedValuesType = (lambda: ['pass', 'fail'])(), prompt: Optional[Union[str, 'Prompt']] = None)

基类:SimpleBaseMetric

基于 LLM 的指标,使用提示来生成结构化响应。

save

save(path: Optional[str] = None) -> None

将指标配置保存到 JSON 文件。

参数

path : str, optional
    要保存到的文件路径。如果未提供,则保存到 "./{metric.name}.json"。使用 .gz 扩展名可进行压缩保存。

注意

如果指标有 response_model,其 schema 会被保存以供参考,但模型本身无法序列化;加载时需要重新提供。

示例

以下写法都有效:

metric.save()                      # → ./response_quality.json
metric.save("custom.json")         # → ./custom.json
metric.save("/path/to/metrics/")   # → /path/to/metrics/response_quality.json
metric.save("no_extension")        # → ./no_extension.json
metric.save("compressed.json.gz")  # → ./compressed.json.gz(压缩)

源代码位于 src/ragas/metrics/base.py
def save(self, path: t.Optional[str] = None) -> None:
    """
    Save the metric configuration to a JSON file.

    Parameters:
    -----------
    path : str, optional
        File path to save to. If not provided, saves to "./{metric.name}.json"
        Use .gz extension for compression.

    Note:
    -----
    If the metric has a response_model, its schema will be saved for reference
    but the model itself cannot be serialized. You'll need to provide it when loading.

    Examples:
    ---------
    All these work:
    >>> metric.save()                      # → ./response_quality.json
    >>> metric.save("custom.json")         # → ./custom.json
    >>> metric.save("/path/to/metrics/")   # → /path/to/metrics/response_quality.json
    >>> metric.save("no_extension")        # → ./no_extension.json
    >>> metric.save("compressed.json.gz")  # → ./compressed.json.gz (compressed)
    """
    import gzip
    import json
    import warnings
    from pathlib import Path

    # Handle default path
    if path is None:
        # Default to current directory with metric name as filename
        file_path = Path(f"./{self.name}.json")
    else:
        file_path = Path(path)

        # If path is a directory, append the metric name as filename
        if file_path.is_dir():
            file_path = file_path / f"{self.name}.json"
        # If path has no extension, add .json
        elif not file_path.suffix:
            file_path = file_path.with_suffix(".json")

    # Collect warning messages for data loss
    warning_messages = []

    if hasattr(self, "_response_model") and self._response_model:
        # Only warn for custom response models, not auto-generated ones
        if not getattr(self._response_model, "__ragas_auto_generated__", False):
            warning_messages.append(
                "- Custom response_model will be lost (set it manually after loading)"
            )

    # Serialize the prompt (may add embedding_model warning)
    prompt_data = self._serialize_prompt(warning_messages)

    # Determine the metric type
    metric_type = self.__class__.__name__

    # Get metric-specific config
    config = self._get_metric_config()

    # Emit consolidated warning if there's data loss
    if warning_messages:
        warnings.warn(
            "Some metric components cannot be saved and will be lost:\n"
            + "\n".join(warning_messages)
            + "\n\nYou'll need to provide these when loading the metric."
        )

    data = {
        "format_version": "1.0",
        "metric_type": metric_type,
        "name": self.name,
        "prompt": prompt_data,
        "config": config,
        "response_model_info": self._serialize_response_model_info(),
    }
    try:
        if file_path.suffix == ".gz":
            with gzip.open(file_path, "wt", encoding="utf-8") as f:
                json.dump(data, f, indent=2)
        else:
            with open(file_path, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2)
    except (OSError, IOError) as e:
        raise ValueError(f"Cannot save metric to {file_path}: {e}")

load classmethod

load(path: str, response_model: Optional[Type['BaseModel']] = None, embedding_model: Optional['EmbeddingModelType'] = None) -> 'SimpleLLMMetric'

从 JSON 文件加载指标。

参数

path : str
    要从中加载的文件路径。支持 .gz 压缩文件。
response_model : Optional[Type[BaseModel]]
    用于响应验证的 Pydantic 模型。自定义 SimpleLLMMetric 需要提供。
embedding_model : Optional[Any]
    用于 DynamicFewShotPrompt 的嵌入模型。如果原始指标使用过,则需要提供。

返回

SimpleLLMMetric
    加载的指标实例

抛出

ValueError
    如果文件无法加载、无效或缺少所需的模型

源代码位于 src/ragas/metrics/base.py
@classmethod
def load(
    cls,
    path: str,
    response_model: t.Optional[t.Type["BaseModel"]] = None,
    embedding_model: t.Optional["EmbeddingModelType"] = None,
) -> "SimpleLLMMetric":
    """
    Load a metric from a JSON file.

    Parameters:
    -----------
    path : str
        File path to load from. Supports .gz compressed files.
    response_model : Optional[Type[BaseModel]]
        Pydantic model to use for response validation. Required for custom SimpleLLMMetrics.
    embedding_model : Optional[Any]
        Embedding model for DynamicFewShotPrompt. Required if the original used one.

    Returns:
    --------
    SimpleLLMMetric
        Loaded metric instance

    Raises:
    -------
    ValueError
        If file cannot be loaded, is invalid, or missing required models
    """
    import gzip
    import json
    from pathlib import Path

    file_path = Path(path)

    # Load JSON data
    try:
        if file_path.suffix == ".gz":
            with gzip.open(file_path, "rt", encoding="utf-8") as f:
                data = json.load(f)
        else:
            with open(file_path, "r", encoding="utf-8") as f:
                data = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError, OSError) as e:
        raise ValueError(f"Cannot load metric from {path}: {e}")

    # Validate format
    if data.get("format_version") != "1.0":
        import warnings

        warnings.warn(
            f"Loading metric with format version {data.get('format_version')}, expected 1.0"
        )

    # Reconstruct the prompt
    prompt = cls._deserialize_prompt(data["prompt"], embedding_model)

    # Get config
    config = data.get("config", {})

    # Create the metric instance
    metric = cls(name=data["name"], prompt=prompt, **config)

    # Set response model if provided
    if response_model:
        metric._response_model = response_model

    return metric
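
下面是一个最小的保存/加载往返草图(仅作说明):自定义的 response_model 不会被序列化(仅其 schema 记录在 response_model_info 中),加载时需要重新提供;其中 QualityResponse 与 metric 均为示意名称。

from pydantic import BaseModel


class QualityResponse(BaseModel):
    value: str
    reason: str


# 保存:schema 会被记录,但模型类本身不会被序列化
metric.save("./response_quality.json")

# 加载:重新提供 response_model;若原提示使用了动态少样本,还需传入 embedding_model
loaded = type(metric).load(
    "./response_quality.json",
    response_model=QualityResponse,
)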

get_correlation abstractmethod

get_correlation(gold_labels: List[str], predictions: List[str]) -> float

计算黄金分数和预测分数之间的相关性。这是一个占位符方法,应根据具体指标实现。

源代码位于 src/ragas/metrics/base.py
@abstractmethod
def get_correlation(
    self, gold_labels: t.List[str], predictions: t.List[str]
) -> float:
    """
    Calculate the correlation between gold scores and predicted scores.
    This is a placeholder method and should be implemented based on the specific metric.
    """
    pass

align_and_validate

align_and_validate(dataset: 'Dataset', embedding_model: 'EmbeddingModelType', llm: 'BaseRagasLLM', test_size: float = 0.2, random_state: int = 42, **kwargs: Dict[str, Any])

参数:
    dataset:要与指标对齐的实验。
    embedding_model:用于动态少样本提示的嵌入模型。
    llm:用于评分的 LLM 实例。

将指标与指定的实验对齐,并对照黄金标准实验进行验证。此方法将对齐和验证合并为一个步骤。

源代码位于 src/ragas/metrics/base.py
def align_and_validate(
    self,
    dataset: "Dataset",
    embedding_model: "EmbeddingModelType",
    llm: "BaseRagasLLM",
    test_size: float = 0.2,
    random_state: int = 42,
    **kwargs: t.Dict[str, t.Any],
):
    """
    Args:
        dataset: experiment to align the metric with.
        embedding_model: The embedding model used for dynamic few-shot prompting.
        llm: The LLM instance to use for scoring.

    Align the metric with the specified experiments and validate it against a gold standard experiment.
    This method combines alignment and validation into a single step.
    """
    train_dataset, test_dataset = dataset.train_test_split(
        test_size=test_size, random_state=random_state
    )

    self.align(train_dataset, embedding_model, **kwargs)  # type: ignore
    return self.validate_alignment(llm, test_dataset)  # type: ignore
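
下面是一个最小用法草图(仅作说明),其中 my_metric、dataset、embedding_model、llm 均假设已按本页其余部分创建:

report = my_metric.align_and_validate(
    dataset,
    embedding_model=embedding_model,
    llm=llm,
    test_size=0.2,
    random_state=42,
)
# 返回值来自 validate_alignment:包含 correlation、agreement_rate 以及带预测列的 DataFrame
print(report["correlation"], report["agreement_rate"])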

align

align(train_dataset: 'Dataset', embedding_model: 'EmbeddingModelType', **kwargs: Dict[str, Any])

参数:
    train_dataset:用于对齐指标的训练数据集。
    embedding_model:用于动态少样本提示的嵌入模型。

通过不同的优化方法将指标与指定的实验对齐。

源代码位于 src/ragas/metrics/base.py
def align(
    self,
    train_dataset: "Dataset",
    embedding_model: "EmbeddingModelType",
    **kwargs: t.Dict[str, t.Any],
):
    """
    Args:
        train_dataset: train_dataset to align the metric with.
        embedding_model: The embedding model used for dynamic few-shot prompting.

    Align the metric with the specified experiments by different optimization methods.
    """

    # get prompt
    if not self.prompt:
        raise Exception("prompt not passed")
    from ragas.prompt.simple_prompt import Prompt

    self.prompt = (
        self.prompt if isinstance(self.prompt, Prompt) else Prompt(self.prompt)
    )
    # Extract specific parameters for from_prompt method
    max_similar_examples_val = kwargs.get("max_similar_examples", 3)
    similarity_threshold_val = kwargs.get("similarity_threshold", 0.7)
    max_similar_examples = (
        int(max_similar_examples_val)
        if isinstance(max_similar_examples_val, (int, str))
        else 3
    )
    similarity_threshold = (
        float(similarity_threshold_val)
        if isinstance(similarity_threshold_val, (int, float, str))
        else 0.7
    )
    # Convert BaseRagasEmbeddings to BaseRagasEmbedding if needed
    if hasattr(embedding_model, "embed_query"):
        # For legacy BaseRagasEmbeddings, we need to wrap it
        # Create a wrapper that implements BaseRagasEmbedding interface
        class EmbeddingWrapper:
            def __init__(self, legacy_embedding):
                self.legacy_embedding = legacy_embedding

            def embed_text(self, text: str, **kwargs) -> t.List[float]:
                return self.legacy_embedding.embed_query(text)

            async def aembed_text(self, text: str, **kwargs) -> t.List[float]:
                return await self.legacy_embedding.aembed_query(text)

        actual_embedding_model = EmbeddingWrapper(embedding_model)
    else:
        # Already BaseRagasEmbedding
        actual_embedding_model = embedding_model

    from ragas.prompt.dynamic_few_shot import DynamicFewShotPrompt

    self.prompt = DynamicFewShotPrompt.from_prompt(
        self.prompt,
        actual_embedding_model,  # type: ignore[arg-type]
        max_similar_examples,
        similarity_threshold,
    )
    train_dataset.reload()
    total_items = len(train_dataset)
    input_vars = self.get_variables()
    output_vars = [self.name, f"{self.name}_reason"]

    from rich.progress import Progress

    with Progress() as progress:
        task = progress.add_task("Processing examples", total=total_items)
        for row in train_dataset:
            inputs = {
                var: train_dataset.get_row_value(row, var) for var in input_vars
            }
            inputs = {k: v for k, v in inputs.items() if v is not None}
            output = {
                var: train_dataset.get_row_value(row, var) for var in output_vars
            }
            output = {k: v for k, v in output.items() if v is not None}

            if output:
                self.prompt.add_example(inputs, output)
            progress.update(task, advance=1)
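
也可以单独调用 align。下面的草图(仅作说明)展示了如何通过 kwargs 调整动态少样本检索参数,参数名取自上面的源码;my_metric、train_dataset、embedding_model 为示意名称:

my_metric.align(
    train_dataset,
    embedding_model=embedding_model,
    max_similar_examples=5,     # 默认 3
    similarity_threshold=0.8,   # 默认 0.7
)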

validate_alignment

validate_alignment(llm: 'BaseRagasLLM', test_dataset: 'Dataset', mapping: Dict[str, str] = {})

参数:
    llm:用于评分的 LLM 实例。
    test_dataset:包含黄金标准分数的 Dataset 实例。
    mapping:一个字典,将指标期望的变量名映射到黄金实验中对应的名称。

通过将分数与黄金标准实验进行比较来验证指标的对齐情况。此方法计算黄金标准分数和指标预测分数之间的科恩 Kappa 分数和一致性率。

源代码位于 src/ragas/metrics/base.py
def validate_alignment(
    self,
    llm: "BaseRagasLLM",
    test_dataset: "Dataset",
    mapping: t.Dict[str, str] = {},
):
    """
    Args:
        llm: The LLM instance to use for scoring.
        test_dataset: An Dataset instance containing the gold standard scores.
        mapping: A dictionary mapping variable names expected by metrics to their corresponding names in the gold experiment.

    Validate the alignment of the metric by comparing the scores against a gold standard experiment.
    This method computes the Cohen's Kappa score and agreement rate between the gold standard scores and
    the predicted scores from the metric.
    """

    test_dataset.reload()
    gold_scores_raw = [
        test_dataset.get_row_value(row, self.name) for row in test_dataset
    ]
    pred_scores = []
    for row in test_dataset:
        values = {
            v: (
                test_dataset.get_row_value(row, v)
                if v not in mapping
                else test_dataset.get_row_value(row, mapping.get(v, v))
            )
            for v in self.get_variables()
        }
        score = self.score(llm=llm, **values)
        pred_scores.append(score.value)

    # Convert to strings for correlation calculation, filtering out None values
    gold_scores = [str(score) for score in gold_scores_raw if score is not None]
    pred_scores_str = [str(score) for score in pred_scores if score is not None]

    df = test_dataset.to_pandas()
    df[f"{self.name}_pred"] = pred_scores
    correlation = self.get_correlation(gold_scores, pred_scores_str)
    agreement_rate = sum(
        x == y for x, y in zip(gold_scores, pred_scores_str)
    ) / len(gold_scores)
    return {
        "correlation": correlation,
        "agreement_rate": agreement_rate,
        "df": df,
    }
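
如果测试集中的列名与指标期望的变量名不一致,可以通过 mapping 做映射。下面是一个最小草图(仅作说明),其中列名 "model_answer" 为假设:

report = my_metric.validate_alignment(
    llm=llm,
    test_dataset=test_dataset,
    mapping={"response": "model_answer"},  # 指标变量名 -> 测试集中的列名
)
print(report["agreement_rate"])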

SingleTurnMetric dataclass

SingleTurnMetric(_required_columns: Dict[MetricType, Set[str]] = dict(), name: str = '')

基类:Metric

用于评估单轮交互的指标类。

此类提供了同步和异步对单轮样本进行评分的方法。

single_turn_score

single_turn_score(sample: SingleTurnSample, callbacks: Callbacks = None) -> float

同步对单轮样本进行评分。

如果在类 Jupyter 环境中未安装 nest_asyncio,可能会引发 ImportError。

源代码位于 src/ragas/metrics/base.py
def single_turn_score(
    self,
    sample: SingleTurnSample,
    callbacks: Callbacks = None,
) -> float:
    """
    Synchronously score a single-turn sample.

    May raise ImportError if nest_asyncio is not installed in a Jupyter-like environment.
    """
    callbacks = callbacks or []
    # only get the required columns
    sample = self._only_required_columns_single_turn(sample)
    rm, group_cm = new_group(
        self.name,
        inputs=sample.to_dict(),
        callbacks=callbacks,
        metadata={"type": ChainType.METRIC},
    )

    async def _async_wrapper():
        try:
            result = await self._single_turn_ascore(
                sample=sample, callbacks=group_cm
            )
        except Exception as e:
            if not group_cm.ended:
                rm.on_chain_error(e)
            raise e
        else:
            if not group_cm.ended:
                rm.on_chain_end({"output": result})
            return result

    apply_nest_asyncio()
    score = run(_async_wrapper)

    # track the evaluation event
    _analytics_batcher.add_evaluation(
        EvaluationEvent(
            metrics=[self.name],
            num_rows=1,
            evaluation_type=MetricType.SINGLE_TURN.name,
            language=get_metric_language(self),
        )
    )
    return score
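
一个最小用法草图(仅作说明):以内置的 Faithfulness 指标为例进行同步评分;evaluator_llm 假设已创建,导入路径按当前 ragas 版本的常见用法假设。

from ragas.dataset_schema import SingleTurnSample
from ragas.metrics import Faithfulness

sample = SingleTurnSample(
    user_input="Where is the Eiffel Tower located?",
    response="The Eiffel Tower is located in Paris.",
    retrieved_contexts=["The Eiffel Tower is a landmark in Paris, France."],
)

metric = Faithfulness(llm=evaluator_llm)  # evaluator_llm:假设已创建的 BaseRagasLLM 实例
score = metric.single_turn_score(sample)
print(score)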

single_turn_ascore async

single_turn_ascore(sample: SingleTurnSample, callbacks: Callbacks = None, timeout: Optional[float] = None) -> float

异步对单轮样本进行评分,可选择超时。

如果评分过程超过指定的超时时间,可能会引发 asyncio.TimeoutError。

源代码位于 src/ragas/metrics/base.py
async def single_turn_ascore(
    self,
    sample: SingleTurnSample,
    callbacks: Callbacks = None,
    timeout: t.Optional[float] = None,
) -> float:
    """
    Asynchronously score a single-turn sample with an optional timeout.

    May raise asyncio.TimeoutError if the scoring process exceeds the specified timeout.
    """
    callbacks = callbacks or []
    # only get the required columns
    sample = self._only_required_columns_single_turn(sample)
    rm, group_cm = new_group(
        self.name,
        inputs=sample.to_dict(),
        callbacks=callbacks,
        metadata={"type": ChainType.METRIC},
    )
    try:
        score = await asyncio.wait_for(
            self._single_turn_ascore(sample=sample, callbacks=group_cm),
            timeout=timeout,
        )
    except Exception as e:
        if not group_cm.ended:
            rm.on_chain_error(e)
        raise e
    else:
        if not group_cm.ended:
            rm.on_chain_end({"output": score})

    # track the evaluation event
    _analytics_batcher.add_evaluation(
        EvaluationEvent(
            metrics=[self.name],
            num_rows=1,
            evaluation_type=MetricType.SINGLE_TURN.name,
            language=get_metric_language(self),
        )
    )
    return score

DiscreteMetric dataclass

DiscreteMetric(name: str, allowed_values: List[str] = (lambda: ['pass', 'fail'])(), prompt: Optional[Union[str, 'Prompt']] = None)

基类:SimpleLLMMetric, DiscreteValidator

用于具有预定义允许值的分类/离散评估的指标。

此类用于输出分类值的指标,如“通过/失败”、“好/坏/优秀”或自定义离散类别。使用 instructor 库进行结构化 LLM 输出。

属性

名称 类型 描述
allowed_values List[str]

指标可以输出的允许分类值列表。默认为 ["pass", "fail"]。

llm Optional[BaseRagasLLM]

用于评估的语言模型实例。可以使用 llm_factory() 创建。

prompt Optional[Union[str, Prompt]]

指标的提示模板。应包含用于评估输入的占位符,这些占位符将在运行时格式化。

示例

>>> from ragas.metrics import DiscreteMetric
>>> from ragas.llms import llm_factory
>>> from openai import OpenAI
>>>
>>> # Create an LLM instance
>>> client = OpenAI(api_key="your-api-key")
>>> llm = llm_factory("gpt-4o-mini", client=client)
>>>
>>> # Create a custom discrete metric
>>> metric = DiscreteMetric(
...     name="quality_check",
...     llm=llm,
...     prompt="Check the quality of the response: {response}. Return 'excellent', 'good', or 'poor'.",
...     allowed_values=["excellent", "good", "poor"]
... )
>>>
>>> # Score with the metric
>>> result = metric.score(
...     llm=llm,
...     response="This is a great response!"
... )
>>> print(result.value)  # Output: "excellent" or similar

get_correlation

get_correlation(gold_labels: List[str], predictions: List[str]) -> float

计算黄金标签和预测之间的相关性。这是一个占位符方法,应根据具体指标实现。

源代码位于 src/ragas/metrics/discrete.py
def get_correlation(
    self, gold_labels: t.List[str], predictions: t.List[str]
) -> float:
    """
    Calculate the correlation between gold labels and predictions.
    This is a placeholder method and should be implemented based on the specific metric.
    """
    try:
        from sklearn.metrics import cohen_kappa_score
    except ImportError:
        raise ImportError(
            "scikit-learn is required for correlation calculation. "
            "Please install it with `pip install scikit-learn`."
        )
    return cohen_kappa_score(gold_labels, predictions)
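
一个简短草图(仅作说明),展示 get_correlation 在离散标签上的行为(内部使用 Cohen's Kappa);my_discrete_metric 为示意名称:

gold = ["pass", "fail", "pass", "pass"]
pred = ["pass", "fail", "fail", "pass"]
kappa = my_discrete_metric.get_correlation(gold, pred)
print(kappa)  # 介于 -1 与 1 之间,1 表示完全一致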

load classmethod

load(path: str, embedding_model: Optional[EmbeddingModelType] = None) -> DiscreteMetric

从 JSON 文件加载 DiscreteMetric。

参数

path : str
    要从中加载的文件路径。支持 .gz 压缩文件。
embedding_model : Optional[Any]
    用于 DynamicFewShotPrompt 的嵌入模型。如果原始指标使用过,则需要提供。

返回

DiscreteMetric
    加载的指标实例

抛出

ValueError
    如果文件无法加载或不是 DiscreteMetric

源代码位于 src/ragas/metrics/discrete.py
@classmethod
def load(
    cls, path: str, embedding_model: t.Optional["EmbeddingModelType"] = None
) -> "DiscreteMetric":
    """
    Load a DiscreteMetric from a JSON file.

    Parameters:
    -----------
    path : str
        File path to load from. Supports .gz compressed files.
    embedding_model : Optional[Any]
        Embedding model for DynamicFewShotPrompt. Required if the original used one.

    Returns:
    --------
    DiscreteMetric
        Loaded metric instance

    Raises:
    -------
    ValueError
        If file cannot be loaded or is not a DiscreteMetric
    """
    # Validate metric type before loading
    cls._validate_metric_type(path)

    # Load using parent class method
    metric = super().load(path, embedding_model=embedding_model)

    # Additional type check for safety
    if not isinstance(metric, cls):
        raise ValueError(f"Loaded metric is not a {cls.__name__}")

    return metric

NumericMetric dataclass

NumericMetric(name: str, allowed_values: Union[Tuple[float, float], range] = (0.0, 1.0), prompt: Optional[Union[str, 'Prompt']] = None)

基类:SimpleLLMMetric, NumericValidator

用于指定范围内的连续数值评估的指标。

此类用于输出在定义范围内的数值分数的指标,例如 0.0 到 1.0 的相似度分数或 1-10 的评级。使用 instructor 库进行结构化 LLM 输出。

属性

名称 类型 描述
allowed_values Union[Tuple[float, float], range]

指标输出的有效范围。可以是一个 (min, max) 浮点数元组或一个 range 对象。默认为 (0.0, 1.0)。

llm Optional[BaseRagasLLM]

用于评估的语言模型实例。可以使用 llm_factory() 创建。

prompt Optional[Union[str, Prompt]]

指标的提示模板。应包含用于评估输入的占位符,这些占位符将在运行时格式化。

示例

>>> from ragas.metrics import NumericMetric
>>> from ragas.llms import llm_factory
>>> from openai import OpenAI
>>>
>>> # Create an LLM instance
>>> client = OpenAI(api_key="your-api-key")
>>> llm = llm_factory("gpt-4o-mini", client=client)
>>>
>>> # Create a custom numeric metric with 0-10 range
>>> metric = NumericMetric(
...     name="quality_score",
...     llm=llm,
...     prompt="Rate the quality of this response on a scale of 0-10: {response}",
...     allowed_values=(0.0, 10.0)
... )
>>>
>>> # Score with the metric
>>> result = metric.score(
...     llm=llm,
...     response="This is a great response!"
... )
>>> print(result.value)  # Output: a float between 0.0 and 10.0

get_correlation

get_correlation(gold_labels: List[str], predictions: List[str]) -> float

计算黄金标签和预测之间的相关性。这是一个占位符方法,应根据具体指标实现。

源代码位于 src/ragas/metrics/numeric.py
def get_correlation(
    self, gold_labels: t.List[str], predictions: t.List[str]
) -> float:
    """
    Calculate the correlation between gold labels and predictions.
    This is a placeholder method and should be implemented based on the specific metric.
    """
    try:
        from scipy.stats import pearsonr
    except ImportError:
        raise ImportError(
            "scipy is required for correlation calculation. "
            "Please install it with `pip install scipy`."
        )
    # Convert strings to floats for correlation calculation
    gold_floats = [float(x) for x in gold_labels]
    pred_floats = [float(x) for x in predictions]
    result = pearsonr(gold_floats, pred_floats)
    # pearsonr returns (correlation, p-value) tuple
    correlation = t.cast(float, result[0])
    return correlation
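
一个简短草图(仅作说明):NumericMetric.get_correlation 会先把字符串转换为浮点数,再计算皮尔逊相关系数;my_numeric_metric 为示意名称:

gold = ["0.2", "0.5", "0.9"]
pred = ["0.1", "0.6", "0.8"]
r = my_numeric_metric.get_correlation(gold, pred)
print(r)  # 皮尔逊相关系数,介于 -1 与 1 之间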

load classmethod

load(path: str, embedding_model: Optional[EmbeddingModelType] = None) -> NumericMetric

从 JSON 文件加载 NumericMetric。

参数

path : str
    要从中加载的文件路径。支持 .gz 压缩文件。
embedding_model : Optional[Any]
    用于 DynamicFewShotPrompt 的嵌入模型。如果原始指标使用过,则需要提供。

返回

NumericMetric
    加载的指标实例

抛出

ValueError
    如果文件无法加载或不是 NumericMetric

源代码位于 src/ragas/metrics/numeric.py
@classmethod
def load(
    cls, path: str, embedding_model: t.Optional["EmbeddingModelType"] = None
) -> "NumericMetric":
    """
    Load a NumericMetric from a JSON file.

    Parameters:
    -----------
    path : str
        File path to load from. Supports .gz compressed files.
    embedding_model : Optional[Any]
        Embedding model for DynamicFewShotPrompt. Required if the original used one.

    Returns:
    --------
    NumericMetric
        Loaded metric instance

    Raises:
    -------
    ValueError
        If file cannot be loaded or is not a NumericMetric
    """
    # Validate metric type before loading
    cls._validate_metric_type(path)

    # Load using parent class method
    metric = super().load(path, embedding_model=embedding_model)

    # Additional type check for safety
    if not isinstance(metric, cls):
        raise ValueError(f"Loaded metric is not a {cls.__name__}")

    # Convert allowed_values back to tuple if it's a list (due to JSON serialization)
    if hasattr(metric, "allowed_values") and isinstance(
        metric.allowed_values, list
    ):
        # Ensure it's a 2-element tuple for NumericMetric
        if len(metric.allowed_values) == 2:
            metric.allowed_values = (
                metric.allowed_values[0],
                metric.allowed_values[1],
            )
        else:
            metric.allowed_values = tuple(metric.allowed_values)  # type: ignore

    return metric

RankingMetric dataclass

RankingMetric(name: str, allowed_values: int = 2, prompt: Optional[Union[str, 'Prompt']] = None)

基类:SimpleLLMMetric, RankingValidator

用于产生项目排名列表的评估指标。

此类用于输出有序列表的指标,例如对搜索结果进行排名、对功能进行优先级排序或按相关性对响应进行排序。使用 instructor 库进行结构化 LLM 输出。

属性

名称 类型 描述
allowed_values int

排名列表中的预期项目数。默认为 2。

llm Optional[BaseRagasLLM]

用于评估的语言模型实例。可以使用 llm_factory() 创建。

prompt Optional[Union[str, Prompt]]

指标的提示模板。应包含用于评估输入的占位符,这些占位符将在运行时格式化。

示例

>>> from ragas.metrics import RankingMetric
>>> from ragas.llms import llm_factory
>>> from openai import OpenAI
>>>
>>> # Create an LLM instance
>>> client = OpenAI(api_key="your-api-key")
>>> llm = llm_factory("gpt-4o-mini", client=client)
>>>
>>> # Create a ranking metric that returns top 3 items
>>> metric = RankingMetric(
...     name="relevance_ranking",
...     llm=llm,
...     prompt="Rank these results by relevance: {results}",
...     allowed_values=3
... )
>>>
>>> # Score with the metric
>>> result = metric.score(
...     llm=llm,
...     results="result1, result2, result3"
... )
>>> print(result.value)  # Output: a list of 3 ranked items

get_correlation

get_correlation(gold_labels: List[str], predictions: List[str]) -> float

计算黄金标签和预测之间的相关性。这是一个占位符方法,应根据具体指标实现。

源代码位于 src/ragas/metrics/ranking.py
def get_correlation(
    self, gold_labels: t.List[str], predictions: t.List[str]
) -> float:
    """
    Calculate the correlation between gold labels and predictions.
    This is a placeholder method and should be implemented based on the specific metric.
    """
    try:
        from sklearn.metrics import cohen_kappa_score
    except ImportError:
        raise ImportError(
            "scikit-learn is required for correlation calculation. "
            "Please install it with `pip install scikit-learn`."
        )

    kappa_scores = []
    for gold_item, prediction in zip(gold_labels, predictions):
        kappa = cohen_kappa_score(gold_item, prediction, weights="quadratic")
        kappa_scores.append(kappa)

    return sum(kappa_scores) / len(kappa_scores) if kappa_scores else 0.0

load classmethod

load(path: str, embedding_model: Optional[EmbeddingModelType] = None) -> RankingMetric

从 JSON 文件加载 RankingMetric。

参数

path : str
    要从中加载的文件路径。支持 .gz 压缩文件。
embedding_model : Optional[Any]
    用于 DynamicFewShotPrompt 的嵌入模型。如果原始指标使用过,则需要提供。

返回

RankingMetric
    加载的指标实例

抛出

ValueError
    如果文件无法加载或不是 RankingMetric

源代码位于 src/ragas/metrics/ranking.py
@classmethod
def load(
    cls, path: str, embedding_model: t.Optional["EmbeddingModelType"] = None
) -> "RankingMetric":
    """
    Load a RankingMetric from a JSON file.

    Parameters:
    -----------
    path : str
        File path to load from. Supports .gz compressed files.
    embedding_model : Optional[Any]
        Embedding model for DynamicFewShotPrompt. Required if the original used one.

    Returns:
    --------
    RankingMetric
        Loaded metric instance

    Raises:
    -------
    ValueError
        If file cannot be loaded or is not a RankingMetric
    """
    # Validate metric type before loading
    cls._validate_metric_type(path)

    # Load using parent class method
    metric = super().load(path, embedding_model=embedding_model)

    # Additional type check for safety
    if not isinstance(metric, cls):
        raise ValueError(f"Loaded metric is not a {cls.__name__}")

    return metric

MetricResult

MetricResult(value: Any, reason: Optional[str] = None, traces: Optional[Dict[str, Any]] = None)

用于保存指标评估结果的类。

此类表现得像其底层结果值,但仍提供对额外元数据(如推理)的访问。

适用于:
- DiscreteMetrics(字符串结果)
- NumericMetrics(浮点数/整数结果)
- RankingMetrics(列表结果)

源代码位于 src/ragas/metrics/result.py
def __init__(
    self,
    value: t.Any,
    reason: t.Optional[str] = None,
    traces: t.Optional[t.Dict[str, t.Any]] = None,
):
    if traces is not None:
        invalid_keys = [
            key for key in traces.keys() if key not in {"input", "output"}
        ]
        if invalid_keys:
            raise ValueError(
                f"Invalid keys in traces: {invalid_keys}. Allowed keys are 'input' and 'output'."
            )
    self._value = value
    self.reason = reason
    self.traces = traces
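
一个最小草图(仅作说明),展示 MetricResult 的构造与常用访问方式;注意 traces 只允许 "input" 和 "output" 两个键,示例中的具体取值为示意:

from ragas.metrics.result import MetricResult

result = MetricResult(
    value="pass",
    reason="The response is fully supported by the retrieved context.",
    traces={"input": {"response": "..."}, "output": {"verdict": "pass"}},
)
print(result.value)      # "pass"
print(result.reason)
print(result.to_dict())  # {"result": "pass", "reason": "..."}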

value property

value

获取原始结果值。

to_dict

to_dict()

将结果转换为字典。

源代码位于 src/ragas/metrics/result.py
def to_dict(self):
    """Convert the result to a dictionary."""
    return {"result": self._value, "reason": self.reason}

validate classmethod

validate(value: Any, info: ValidationInfo)

提供与旧版 Pydantic 的兼容性。

源代码位于 src/ragas/metrics/result.py
@classmethod
def validate(cls, value: t.Any, info: ValidationInfo):
    """Provide compatibility with older Pydantic versions."""
    if isinstance(value, MetricResult):
        return value
    return cls(value=value)

discrete_metric

discrete_metric(*, name: Optional[str] = None, allowed_values: Optional[List[str]] = None, **metric_params: Any) -> Callable[[Callable[..., Any]], DiscreteMetricProtocol]

用于创建离散/分类指标的装饰器。

此装饰器将一个常规函数转换为一个 DiscreteMetric 实例,可用于具有预定义分类输出的评估。

参数

名称 类型 描述 默认值
name str

指标的名称。如果未提供,则使用函数名。

None
allowed_values List[str]

指标输出的允许分类值列表。默认为 ["pass", "fail"]。

None
**metric_params Any

传递给指标初始化的其他参数。

{}

返回

类型 描述
Callable[[Callable[..., Any]], DiscreteMetricProtocol]

一个将函数转换为 DiscreteMetric 实例的装饰器。

示例

>>> from ragas.metrics import discrete_metric
>>>
>>> @discrete_metric(name="sentiment", allowed_values=["positive", "neutral", "negative"])
>>> def sentiment_analysis(user_input: str, response: str) -> str:
...     '''Analyze sentiment of the response.'''
...     if "great" in response.lower() or "good" in response.lower():
...         return "positive"
...     elif "bad" in response.lower() or "poor" in response.lower():
...         return "negative"
...     return "neutral"
>>>
>>> result = sentiment_analysis(
...     user_input="How was your day?",
...     response="It was great!"
... )
>>> print(result.value)  # "positive"
源代码位于 src/ragas/metrics/discrete.py
def discrete_metric(
    *,
    name: t.Optional[str] = None,
    allowed_values: t.Optional[t.List[str]] = None,
    **metric_params: t.Any,
) -> t.Callable[[t.Callable[..., t.Any]], DiscreteMetricProtocol]:
    """
    Decorator for creating discrete/categorical metrics.

    This decorator transforms a regular function into a DiscreteMetric instance
    that can be used for evaluation with predefined categorical outputs.

    Parameters
    ----------
    name : str, optional
        Name for the metric. If not provided, uses the function name.
    allowed_values : List[str], optional
        List of allowed categorical values for the metric output.
        Default is ["pass", "fail"].
    **metric_params : Any
        Additional parameters to pass to the metric initialization.

    Returns
    -------
    Callable[[Callable[..., Any]], DiscreteMetricProtocol]
        A decorator that transforms a function into a DiscreteMetric instance.

    Examples
    --------
    >>> from ragas.metrics import discrete_metric
    >>>
    >>> @discrete_metric(name="sentiment", allowed_values=["positive", "neutral", "negative"])
    >>> def sentiment_analysis(user_input: str, response: str) -> str:
    ...     '''Analyze sentiment of the response.'''
    ...     if "great" in response.lower() or "good" in response.lower():
    ...         return "positive"
    ...     elif "bad" in response.lower() or "poor" in response.lower():
    ...         return "negative"
    ...     return "neutral"
    >>>
    >>> result = sentiment_analysis(
    ...     user_input="How was your day?",
    ...     response="It was great!"
    ... )
    >>> print(result.value)  # "positive"
    """
    if allowed_values is None:
        allowed_values = ["pass", "fail"]

    decorator_factory = create_metric_decorator()
    return decorator_factory(name=name, allowed_values=allowed_values, **metric_params)  # type: ignore[return-value]

numeric_metric

numeric_metric(*, name: Optional[str] = None, allowed_values: Optional[Union[Tuple[float, float], range]] = None, **metric_params: Any) -> Callable[[Callable[..., Any]], NumericMetricProtocol]

用于创建数值/连续指标的装饰器。

此装饰器将一个常规函数转换为一个 NumericMetric 实例,该实例输出指定范围内的连续值。

参数

名称 类型 描述 默认值
name str

指标的名称。如果未提供,则使用函数名。

None
allowed_values Union[Tuple[float, float], range]

指标输出的有效范围,为 (min, max) 元组或 range 对象。默认为 (0.0, 1.0)。

None
**metric_params Any

传递给指标初始化的其他参数。

{}

返回

类型 描述
Callable[[Callable[..., Any]], NumericMetricProtocol]

一个将函数转换为 NumericMetric 实例的装饰器。

示例

>>> from ragas.metrics import numeric_metric
>>>
>>> @numeric_metric(name="relevance_score", allowed_values=(0.0, 1.0))
>>> def calculate_relevance(user_input: str, response: str) -> float:
...     '''Calculate relevance score between 0 and 1.'''
...     # Simple word overlap example
...     user_words = set(user_input.lower().split())
...     response_words = set(response.lower().split())
...     if not user_words:
...         return 0.0
...     overlap = len(user_words & response_words)
...     return overlap / len(user_words)
>>>
>>> result = calculate_relevance(
...     user_input="What is Python?",
...     response="Python is a programming language"
... )
>>> print(result.value)  # Numeric score between 0.0 and 1.0
源代码位于 src/ragas/metrics/numeric.py
def numeric_metric(
    *,
    name: t.Optional[str] = None,
    allowed_values: t.Optional[t.Union[t.Tuple[float, float], range]] = None,
    **metric_params: t.Any,
) -> t.Callable[[t.Callable[..., t.Any]], NumericMetricProtocol]:
    """
    Decorator for creating numeric/continuous metrics.

    This decorator transforms a regular function into a NumericMetric instance
    that outputs continuous values within a specified range.

    Parameters
    ----------
    name : str, optional
        Name for the metric. If not provided, uses the function name.
    allowed_values : Union[Tuple[float, float], range], optional
        The valid range for metric outputs as (min, max) tuple or range object.
        Default is (0.0, 1.0).
    **metric_params : Any
        Additional parameters to pass to the metric initialization.

    Returns
    -------
    Callable[[Callable[..., Any]], NumericMetricProtocol]
        A decorator that transforms a function into a NumericMetric instance.

    Examples
    --------
    >>> from ragas.metrics import numeric_metric
    >>>
    >>> @numeric_metric(name="relevance_score", allowed_values=(0.0, 1.0))
    >>> def calculate_relevance(user_input: str, response: str) -> float:
    ...     '''Calculate relevance score between 0 and 1.'''
    ...     # Simple word overlap example
    ...     user_words = set(user_input.lower().split())
    ...     response_words = set(response.lower().split())
    ...     if not user_words:
    ...         return 0.0
    ...     overlap = len(user_words & response_words)
    ...     return overlap / len(user_words)
    >>>
    >>> result = calculate_relevance(
    ...     user_input="What is Python?",
    ...     response="Python is a programming language"
    ... )
    >>> print(result.value)  # Numeric score between 0.0 and 1.0
    """
    if allowed_values is None:
        allowed_values = (0.0, 1.0)

    decorator_factory = create_metric_decorator()
    return decorator_factory(name=name, allowed_values=allowed_values, **metric_params)  # type: ignore[return-value]

ranking_metric

ranking_metric(*, name: Optional[str] = None, allowed_values: Optional[int] = None, **metric_params: Any) -> Callable[[Callable[..., Any]], RankingMetricProtocol]

用于创建排名/排序指标的装饰器。

此装饰器将一个常规函数转换为一个 RankingMetric 实例,该实例输出有序的项目列表。

参数

名称 类型 描述 默认值
name str

指标的名称。如果未提供,则使用函数名。

None
allowed_values int

排名列表中的预期项目数。默认为 2。

None
**metric_params Any

传递给指标初始化的其他参数。

{}

返回

类型 描述
Callable[[Callable[..., Any]], RankingMetricProtocol]

一个将函数转换为 RankingMetric 实例的装饰器。

示例

>>> from ragas.metrics import ranking_metric
>>>
>>> @ranking_metric(name="priority_ranker", allowed_values=3)
>>> def rank_by_urgency(user_input: str, responses: list) -> list:
...     '''Rank responses by urgency keywords.'''
...     urgency_keywords = ["urgent", "asap", "critical"]
...     scored = []
...     for resp in responses:
...         score = sum(kw in resp.lower() for kw in urgency_keywords)
...         scored.append((score, resp))
...     # Sort by score descending and return top items
...     ranked = sorted(scored, key=lambda x: x[0], reverse=True)
...     return [item[1] for item in ranked[:3]]
>>>
>>> result = rank_by_urgency(
...     user_input="What should I do first?",
...     responses=["This is urgent", "Take your time", "Critical issue!"]
... )
>>> print(result.value)  # Ranked list of responses
源代码位于 src/ragas/metrics/ranking.py
def ranking_metric(
    *,
    name: t.Optional[str] = None,
    allowed_values: t.Optional[int] = None,
    **metric_params: t.Any,
) -> t.Callable[[t.Callable[..., t.Any]], RankingMetricProtocol]:
    """
    Decorator for creating ranking/ordering metrics.

    This decorator transforms a regular function into a RankingMetric instance
    that outputs ordered lists of items.

    Parameters
    ----------
    name : str, optional
        Name for the metric. If not provided, uses the function name.
    allowed_values : int, optional
        Expected number of items in the ranking list. Default is 2.
    **metric_params : Any
        Additional parameters to pass to the metric initialization.

    Returns
    -------
    Callable[[Callable[..., Any]], RankingMetricProtocol]
        A decorator that transforms a function into a RankingMetric instance.

    Examples
    --------
    >>> from ragas.metrics import ranking_metric
    >>>
    >>> @ranking_metric(name="priority_ranker", allowed_values=3)
    >>> def rank_by_urgency(user_input: str, responses: list) -> list:
    ...     '''Rank responses by urgency keywords.'''
    ...     urgency_keywords = ["urgent", "asap", "critical"]
    ...     scored = []
    ...     for resp in responses:
    ...         score = sum(kw in resp.lower() for kw in urgency_keywords)
    ...         scored.append((score, resp))
    ...     # Sort by score descending and return top items
    ...     ranked = sorted(scored, key=lambda x: x[0], reverse=True)
    ...     return [item[1] for item in ranked[:3]]
    >>>
    >>> result = rank_by_urgency(
    ...     user_input="What should I do first?",
    ...     responses=["This is urgent", "Take your time", "Critical issue!"]
    ... )
    >>> print(result.value)  # Ranked list of responses
    """
    if allowed_values is None:
        allowed_values = 2

    decorator_factory = create_metric_decorator()
    return decorator_factory(name=name, allowed_values=allowed_values, **metric_params)  # type: ignore[return-value]