
evaluate()

Perform the evaluation on the dataset with different metrics.

Parameters

Name Type Description Default
dataset (Dataset, EvaluationDataset)

The dataset used by the metrics to evaluate the RAG pipeline.

required
metrics list[Metric]

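List of metrics to use for evaluation. If not provided, ragas runs a default set of metrics: answer_relevancy, context_precision, faithfulness, and context_recall.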

None
llm BaseRagasLLM

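The language model (LLM) used to generate the scores for calculating the metrics. If not provided, ragas uses the default language model for metrics that require an LLM. This can be overridden per metric by setting metric.llm.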

None
embeddings BaseRagasEmbeddings

The embeddings model used by the metrics. If not provided, ragas uses the default embeddings for metrics that require embeddings. This can be overridden per metric by setting metric.embeddings.

None
experiment_name str

The name of the experiment to track. This is used to track the evaluation in the tracing tool.

None
callbacks Callbacks

Lifecycle Langchain Callbacks to run during evaluation. See the Langchain documentation for more information.

None
run_config RunConfig

Configuration for runtime settings such as timeout and retries. If not provided, default values are used.

None
token_usage_parser TokenUsageParser

Parser to get the token usage from the LLM result. If not provided, the cost and total token count are not calculated. Default is None.

None
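raise_exceptions bool

Whether to raise exceptions. If set to True, the evaluation raises an exception if any of the metrics fail. If set to False, the evaluation returns np.nan for the row that failed. Default is False.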

False
column_map dict[str, str]

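The column names of the dataset to use for evaluation. If the dataset's column names differ from the default ones, provide the mapping here as a dictionary. Example: if the dataset column is named contexts_v1, pass column_map as {"contexts": "contexts_v1"}.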

None
show_progress bool

Whether to show the progress bar during evaluation. If set to False, the progress bar is disabled. Default is True.

True
batch_size int

How large the batches should be. If set to None (default), no batching is done.

None
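
The `column_map` parameter above can be pictured as a key rename applied to each row before validation. The sketch below is a stdlib-only illustration of that idea, not the ragas implementation of `remap_column_names`; the helper name `remap_row` and the sample row are hypothetical.

```python
from typing import Any, Dict


def remap_row(row: Dict[str, Any], column_map: Dict[str, str]) -> Dict[str, Any]:
    """Rename custom column names back to the default names.

    column_map maps default name -> name used in the dataset,
    e.g. {"contexts": "contexts_v1"}.
    """
    # Invert the mapping so the default name can be looked up by custom name.
    inverse = {custom: default for default, custom in column_map.items()}
    # Columns that are not mentioned in column_map keep their name.
    return {inverse.get(key, key): value for key, value in row.items()}


# Hypothetical row whose contexts column uses a non-default name.
row = {
    "question": "What is RAG?",
    "contexts_v1": ["RAG combines retrieval and generation."],
}
remapped = remap_row(row, {"contexts": "contexts_v1"})
print(sorted(remapped))  # ['contexts', 'question']
```

An empty `column_map` leaves the row unchanged, which matches the documented default of `None` being treated as "no remapping".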

Returns

Type Description
EvaluationResult

EvaluationResult object containing the scores of each metric. You can use this object for analysis later.
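
With `raise_exceptions=False`, a failed row contributes `np.nan` instead of a score, so any aggregate computed over a metric has to skip those entries. Below is a minimal sketch of a NaN-aware mean over per-row scores using only the standard library. The function name `nan_mean` and the sample scores are hypothetical, and this is not a claim about how `EvaluationResult` aggregates internally.

```python
import math
from typing import Dict, List


def nan_mean(values: List[float]) -> float:
    """Mean of the values that are not NaN; NaN if none remain."""
    valid = [v for v in values if not math.isnan(v)]
    return sum(valid) / len(valid) if valid else math.nan


# Hypothetical per-row scores; the second row failed for faithfulness.
scores: List[Dict[str, float]] = [
    {"faithfulness": 1.0, "answer_relevancy": 0.8},
    {"faithfulness": math.nan, "answer_relevancy": 0.6},
    {"faithfulness": 0.5, "answer_relevancy": 1.0},
]

# One aggregate per metric, ignoring failed rows.
summary = {
    name: nan_mean([row[name] for row in scores])
    for name in scores[0]
}
print(summary)
```

A plain `sum(...) / len(...)` would turn the whole faithfulness aggregate into NaN as soon as one row failed, which is why the filter comes first.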

Raises

Type Description
ValueError

If validation fails because the columns required by the metrics are missing or are of the wrong format.
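
The `ValueError` above comes from checking that every column a metric needs is present in the dataset. The idea can be sketched as follows. `REQUIRED_COLUMNS` and `check_required_columns` are hypothetical stand-ins rather than the ragas `validate_required_columns` implementation, and the column sets are illustrative only.

```python
from typing import Dict, Iterable, Set

# Hypothetical mapping from metric name to the columns it needs.
REQUIRED_COLUMNS: Dict[str, Set[str]] = {
    "faithfulness": {"question", "answer", "contexts"},
    "context_recall": {"question", "contexts", "ground_truth"},
}


def check_required_columns(
    features: Iterable[str], metric_names: Iterable[str]
) -> None:
    """Raise ValueError if any metric is missing a column it requires."""
    available = set(features)
    for name in metric_names:
        missing = REQUIRED_COLUMNS[name] - available
        if missing:
            raise ValueError(
                f"The metric [{name}] requires the following "
                f"missing columns: {sorted(missing)}"
            )


try:
    # The dataset lacks 'ground_truth', which context_recall needs here.
    check_required_columns(
        ["question", "answer", "contexts"],
        ["faithfulness", "context_recall"],
    )
except ValueError as err:
    print(err)
```

Running the check before any metric is scored means a schema problem surfaces immediately instead of after LLM calls have already been made.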

Examples

Basic usage is as follows:

from ragas import evaluate

>>> dataset
Dataset({
    features: ['question', 'ground_truth', 'answer', 'contexts'],
    num_rows: 30
})

>>> result = evaluate(dataset)
>>> print(result)
{'context_precision': 0.817,
'faithfulness': 0.892,
'answer_relevancy': 0.874}

Source code in src/ragas/evaluation.py
@track_was_completed
def evaluate(
    dataset: t.Union[Dataset, EvaluationDataset],
    metrics: t.Optional[t.Sequence[Metric]] = None,
    llm: t.Optional[BaseRagasLLM | LangchainLLM] = None,
    embeddings: t.Optional[BaseRagasEmbeddings | LangchainEmbeddings] = None,
    experiment_name: t.Optional[str] = None,
    callbacks: Callbacks = None,
    run_config: t.Optional[RunConfig] = None,
    token_usage_parser: t.Optional[TokenUsageParser] = None,
    raise_exceptions: bool = False,
    column_map: t.Optional[t.Dict[str, str]] = None,
    show_progress: bool = True,
    batch_size: t.Optional[int] = None,
    _run_id: t.Optional[UUID] = None,
    _pbar: t.Optional[tqdm] = None,
) -> EvaluationResult:
    """
    Perform the evaluation on the dataset with different metrics

    Parameters
    ----------
    dataset : Dataset, EvaluationDataset
        The dataset used by the metrics to evaluate the RAG pipeline.
    metrics : list[Metric], optional
        List of metrics to use for evaluation. If not provided, ragas will run
        the evaluation on the best set of metrics to give a complete view.
    llm : BaseRagasLLM, optional
        The language model (LLM) to use to generate the score for calculating the metrics.
        If not provided, ragas will use the default
        language model for metrics that require an LLM. This can be overridden by the LLM
        specified in the metric level with `metric.llm`.
    embeddings : BaseRagasEmbeddings, optional
        The embeddings model to use for the metrics.
        If not provided, ragas will use the default embeddings for metrics that require embeddings.
        This can be overridden by the embeddings specified in the metric level with `metric.embeddings`.
    experiment_name : str, optional
        The name of the experiment to track. This is used to track the evaluation in the tracing tool.
    callbacks : Callbacks, optional
        Lifecycle Langchain Callbacks to run during evaluation.
        Check the [Langchain documentation](https://python.langchain.ac.cn/docs/modules/callbacks/) for more information.
    run_config : RunConfig, optional
        Configuration for runtime settings like timeout and retries. If not provided, default values are used.
    token_usage_parser : TokenUsageParser, optional
        Parser to get the token usage from the LLM result.
        If not provided, the cost and total token count will not be calculated. Default is None.
    raise_exceptions : bool, optional
        Whether to raise exceptions or not. If set to True, the evaluation will raise an exception
        if any of the metrics fail. If set to False, the evaluation will return `np.nan` for the row that failed. Default is False.
    column_map : dict[str, str], optional
        The column names of the dataset to use for evaluation. If the column names of the dataset are different from the default ones,
        it is possible to provide the mapping as a dictionary here. Example: If the dataset column name is `contexts_v1`, it is possible to pass column_map as `{"contexts": "contexts_v1"}`.
    show_progress : bool, optional
        Whether to show the progress bar during evaluation. If set to False, the progress bar will be disabled. The default is True.
    batch_size : int, optional
        How large the batches should be. If set to None (default), no batching is done.

    Returns
    -------
    EvaluationResult
        EvaluationResult object containing the scores of each metric.
        You can use this to do analysis later.

    Raises
    ------
    ValueError
        if validation fails because the columns required for the metrics are missing or
        if the columns are of the wrong format.

    Examples
    --------
    the basic usage is as follows:
    ```
    from ragas import evaluate

    >>> dataset
    Dataset({
        features: ['question', 'ground_truth', 'answer', 'contexts'],
        num_rows: 30
    })

    >>> result = evaluate(dataset)
    >>> print(result)
    {'context_precision': 0.817,
    'faithfulness': 0.892,
    'answer_relevancy': 0.874}
    ```
    """
    column_map = column_map or {}
    callbacks = callbacks or []
    run_config = run_config or RunConfig()

    if helicone_config.is_enabled:
        import uuid

        helicone_config.session_name = "ragas-evaluation"
        helicone_config.session_id = str(uuid.uuid4())

    if dataset is None:
        raise ValueError("Provide dataset!")

    # default metrics
    if metrics is None:
        from ragas.metrics import (
            answer_relevancy,
            context_precision,
            context_recall,
            faithfulness,
        )

        metrics = [answer_relevancy, context_precision, faithfulness, context_recall]

    if isinstance(dataset, Dataset):
        # remap column names from the dataset
        dataset = remap_column_names(dataset, column_map)
        dataset = convert_v1_to_v2_dataset(dataset)
        # validation
        dataset = EvaluationDataset.from_list(dataset.to_list())

    if isinstance(dataset, EvaluationDataset):
        validate_required_columns(dataset, metrics)
        validate_supported_metrics(dataset, metrics)

    # set the llm and embeddings
    if isinstance(llm, LangchainLLM):
        llm = LangchainLLMWrapper(llm, run_config=run_config)
    if isinstance(embeddings, LangchainEmbeddings):
        embeddings = LangchainEmbeddingsWrapper(embeddings)

    # init llms and embeddings
    binary_metrics = []
    llm_changed: t.List[int] = []
    embeddings_changed: t.List[int] = []
    answer_correctness_is_set = -1

    # loop through the metrics and perform initializations
    for i, metric in enumerate(metrics):
        # set llm and embeddings if not set
        if isinstance(metric, AspectCritic):
            binary_metrics.append(metric.name)
        if isinstance(metric, MetricWithLLM) and metric.llm is None:
            if llm is None:
                llm = llm_factory()
            metric.llm = llm
            llm_changed.append(i)
        if isinstance(metric, MetricWithEmbeddings) and metric.embeddings is None:
            if embeddings is None:
                embeddings = embedding_factory()
            metric.embeddings = embeddings
            embeddings_changed.append(i)
        if isinstance(metric, AnswerCorrectness):
            if metric.answer_similarity is None:
                answer_correctness_is_set = i

        # init all the models
        metric.init(run_config)

    executor = Executor(
        desc="Evaluating",
        keep_progress_bar=True,
        raise_exceptions=raise_exceptions,
        run_config=run_config,
        show_progress=show_progress,
        batch_size=batch_size,
        pbar=_pbar,
    )

    # Ragas Callbacks
    # init the callbacks we need for various tasks
    ragas_callbacks: t.Dict[str, BaseCallbackHandler] = {}

    # Ragas Tracer which traces the run
    tracer = RagasTracer()
    ragas_callbacks["tracer"] = tracer

    # check if cost needs to be calculated
    if token_usage_parser is not None:
        from ragas.cost import CostCallbackHandler

        cost_cb = CostCallbackHandler(token_usage_parser=token_usage_parser)
        ragas_callbacks["cost_cb"] = cost_cb

    # append all the ragas_callbacks to the callbacks
    for cb in ragas_callbacks.values():
        if isinstance(callbacks, BaseCallbackManager):
            callbacks.add_handler(cb)
        else:
            callbacks.append(cb)

    # new evaluation chain
    row_run_managers = []
    evaluation_rm, evaluation_group_cm = new_group(
        name=experiment_name or RAGAS_EVALUATION_CHAIN_NAME,
        inputs={},
        callbacks=callbacks,
        metadata={"type": ChainType.EVALUATION},
    )

    sample_type = dataset.get_sample_type()
    for i, sample in enumerate(dataset):
        row = t.cast(t.Dict[str, t.Any], sample.model_dump())
        row_rm, row_group_cm = new_group(
            name=f"row {i}",
            inputs=row,
            callbacks=evaluation_group_cm,
            metadata={"type": ChainType.ROW, "row_index": i},
        )
        row_run_managers.append((row_rm, row_group_cm))
        if sample_type == SingleTurnSample:
            _ = [
                executor.submit(
                    metric.single_turn_ascore,
                    sample,
                    row_group_cm,
                    name=f"{metric.name}-{i}",
                    timeout=run_config.timeout,
                )
                for metric in metrics
                if isinstance(metric, SingleTurnMetric)
            ]
        elif sample_type == MultiTurnSample:
            _ = [
                executor.submit(
                    metric.multi_turn_ascore,
                    sample,
                    row_group_cm,
                    name=f"{metric.name}-{i}",
                    timeout=run_config.timeout,
                )
                for metric in metrics
                if isinstance(metric, MultiTurnMetric)
            ]
        else:
            raise ValueError(f"Unsupported sample type {sample_type}")

    scores: t.List[t.Dict[str, t.Any]] = []
    try:
        # get the results
        results = executor.results()
        if results == []:
            raise ExceptionInRunner()

        # convert results to dataset_like
        for i, _ in enumerate(dataset):
            s = {}
            for j, m in enumerate(metrics):
                if isinstance(m, ModeMetric):  # type: ignore
                    key = f"{m.name}(mode={m.mode})"
                else:
                    key = m.name
                s[key] = results[len(metrics) * i + j]
            scores.append(s)
            # close the row chain
            row_rm, row_group_cm = row_run_managers[i]
            if not row_group_cm.ended:
                row_rm.on_chain_end(s)

    # run evaluation task
    except Exception as e:
        if not evaluation_group_cm.ended:
            evaluation_rm.on_chain_error(e)

        raise e
    else:
        # evaluation run was successful
        # now let's process the results
        cost_cb = ragas_callbacks["cost_cb"] if "cost_cb" in ragas_callbacks else None
        result = EvaluationResult(
            scores=scores,
            dataset=dataset,
            binary_columns=binary_metrics,
            cost_cb=t.cast(
                t.Union["CostCallbackHandler", None],
                cost_cb,
            ),
            ragas_traces=tracer.traces,
            run_id=_run_id,
        )
        if not evaluation_group_cm.ended:
            evaluation_rm.on_chain_end({"scores": result.scores})
    finally:
        # reset llms and embeddings if changed
        for i in llm_changed:
            t.cast(MetricWithLLM, metrics[i]).llm = None
        for i in embeddings_changed:
            t.cast(MetricWithEmbeddings, metrics[i]).embeddings = None
        if answer_correctness_is_set != -1:
            t.cast(
                AnswerCorrectness, metrics[answer_correctness_is_set]
            ).answer_similarity = None

        # flush the analytics batcher
        from ragas._analytics import _analytics_batcher

        _analytics_batcher.flush()

    return result
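
The metric-initialization loop and the `finally` block above work as a pair: a metric that has no LLM of its own borrows the one passed to `evaluate()` for the duration of the run and is reset afterward, while a metric-level LLM is left untouched. Below is a stripped-down sketch of that pattern. `FakeMetric`, `run_with_fallback`, and the plain strings standing in for LLM objects are all hypothetical.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class FakeMetric:
    """Hypothetical stand-in for a metric that may carry its own LLM."""

    name: str
    llm: Optional[str] = None


def run_with_fallback(metrics: List[FakeMetric], llm: Optional[str]) -> List[str]:
    """Report which LLM each metric used; restore borrowed LLMs afterward."""
    llm_changed: List[int] = []
    for i, metric in enumerate(metrics):
        if metric.llm is None:
            if llm is None:
                llm = "default-llm"  # placeholder for a factory-built default
            metric.llm = llm
            llm_changed.append(i)
    try:
        return [f"{m.name} -> {m.llm}" for m in metrics]
    finally:
        # Undo only the assignments made above, so the caller's metric
        # objects look the same after the run as before it.
        for i in llm_changed:
            metrics[i].llm = None


metrics = [
    FakeMetric("faithfulness"),
    FakeMetric("answer_relevancy", llm="metric-llm"),
]
print(run_with_fallback(metrics, llm="run-llm"))
# ['faithfulness -> run-llm', 'answer_relevancy -> metric-llm']
print(metrics[0].llm)  # None
```

Tracking the changed indices, rather than resetting every metric, is what keeps a metric-level LLM from being wiped by the cleanup step.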