
Executor

Executor dataclass

Executor(desc: str = 'Evaluating', show_progress: bool = True, keep_progress_bar: bool = True, jobs: List[Any] = list(), raise_exceptions: bool = False, batch_size: Optional[int] = None, run_config: Optional[RunConfig] = None, pbar: Optional[tqdm] = None, _jobs_processed: int = 0, _cancel_event: Event = Event())

Executor class for running asynchronous jobs with progress tracking and error handling.

Attributes

Name Type Description

desc str

Description for the progress bar

show_progress bool

Whether to show the progress bar

keep_progress_bar bool

Whether to keep the progress bar after completion

jobs List[Any]

List of jobs to execute

raise_exceptions bool

Whether to raise exceptions or log them

batch_size int

Whether to batch (large) lists of tasks

run_config RunConfig

Configuration for the run

_nest_asyncio_applied bool

Whether nest_asyncio has been applied

_cancel_event Event

Event used to signal cancellation

cancel

cancel() -> None

Cancel the execution of all jobs.

Source code in src/ragas/executor.py
def cancel(self) -> None:
    """Cancel the execution of all jobs."""
    self._cancel_event.set()

is_cancelled

is_cancelled() -> bool

Check if the execution has been cancelled.

Source code in src/ragas/executor.py
def is_cancelled(self) -> bool:
    """Check if the execution has been cancelled."""
    return self._cancel_event.is_set()
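The two methods above implement cooperative cancellation: `cancel()` sets a shared event, and running jobs poll `is_cancelled()` to stop early. The following is a minimal self-contained sketch of that pattern (the class and worker names are illustrative, not part of ragas; the sketch uses `asyncio.Event`, whereas ragas' actual `Event` type may differ):

```python
import asyncio

# Minimal sketch (not the ragas implementation) of the cancel() /
# is_cancelled() pattern: a shared event that jobs poll cooperatively.
class MiniExecutor:
    def __init__(self) -> None:
        self._cancel_event = asyncio.Event()

    def cancel(self) -> None:
        """Signal all jobs to stop."""
        self._cancel_event.set()

    def is_cancelled(self) -> bool:
        """Check whether cancellation has been requested."""
        return self._cancel_event.is_set()

async def worker(executor: MiniExecutor, n: int) -> int:
    done = 0
    for _ in range(n):
        if executor.is_cancelled():  # jobs check the event and stop early
            break
        await asyncio.sleep(0)       # yield control to the event loop
        done += 1
    return done

async def main() -> int:
    ex = MiniExecutor()
    task = asyncio.create_task(worker(ex, 1_000_000))
    await asyncio.sleep(0)  # let the worker start
    ex.cancel()             # request cancellation
    return await task

completed = asyncio.run(main())
print(completed)  # far fewer than 1_000_000 iterations completed
```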

submit

submit(callable: Callable, *args, name: Optional[str] = None, **kwargs) -> None

Submit a job to be executed, wrapping the callable with error handling and indexing to keep track of the job index.

Source code in src/ragas/executor.py
def submit(
    self,
    callable: t.Callable,
    *args,
    name: t.Optional[str] = None,
    **kwargs,
) -> None:
    """
    Submit a job to be executed, wrapping the callable with error handling and indexing to keep track of the job index.
    """
    # Use _jobs_processed for consistent indexing across multiple runs
    callable_with_index = self.wrap_callable_with_index(
        callable, self._jobs_processed
    )
    self.jobs.append((callable_with_index, args, kwargs, name))
    self._jobs_processed += 1
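The index wrapping done by `submit` is what lets results be returned in submission order even when jobs finish out of order. A self-contained sketch of that idea, assuming a simplified module-level job list and a hypothetical `wrap_with_index` helper (ragas' own wrapper also adds error handling, omitted here):

```python
import asyncio
import typing as t

# Sketch of the indexing pattern behind submit(): each callable is wrapped
# so its result carries the submission index it was given.
def wrap_with_index(callable_: t.Callable, index: int) -> t.Callable:
    async def wrapped(*args, **kwargs):
        result = await callable_(*args, **kwargs)
        return index, result
    return wrapped

jobs: list = []
_jobs_processed = 0

def submit(callable_: t.Callable, *args, **kwargs) -> None:
    global _jobs_processed
    jobs.append((wrap_with_index(callable_, _jobs_processed), args, kwargs))
    _jobs_processed += 1

async def double(x: int) -> int:
    return 2 * x

for x in [1, 2, 3]:
    submit(double, x)

async def run_all():
    return await asyncio.gather(*(fn(*a, **kw) for fn, a, kw in jobs))

# Sort by submission index, then strip the index off.
indexed = asyncio.run(run_all())
results = [r for _, r in sorted(indexed, key=lambda p: p[0])]
print(results)  # [2, 4, 6]
```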

clear_jobs

clear_jobs() -> None

Clear all submitted jobs and reset the counter.

Source code in src/ragas/executor.py
def clear_jobs(self) -> None:
    """Clear all submitted jobs and reset counter."""
    self.jobs.clear()
    self._jobs_processed = 0

aresults async

aresults() -> List[Any]

Execute all submitted jobs asynchronously and return their results. The results are returned in the order of job submission.

This is the async entry point for executing async jobs when already in an async context.

Source code in src/ragas/executor.py
async def aresults(self) -> t.List[t.Any]:
    """
    Execute all submitted jobs and return their results asynchronously.
    The results are returned in the order of job submission.

    This is the async entry point for executing async jobs when already in an async context.
    """
    results = await self._process_jobs()
    sorted_results = sorted(results, key=lambda x: x[0])
    return [r[1] for r in sorted_results]
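The reason a separate async entry point exists: `asyncio.run()` (used by the sync `results()`) raises a `RuntimeError` when called from inside an already-running event loop, so async callers must `await` the API instead. A self-contained demonstration with a stand-in coroutine (`aresults_demo` is illustrative, not the real method):

```python
import asyncio

# Inside a running event loop, asyncio.run() is illegal; an async caller
# must await the async entry point (aresults) directly.
async def aresults_demo() -> list:
    return ["ok"]

async def caller():
    coro = aresults_demo()
    try:
        asyncio.run(coro)  # raises: cannot be called from a running loop
    except RuntimeError:
        coro.close()       # discard the never-awaited coroutine
    return await aresults_demo()  # the correct async entry point

res = asyncio.run(caller())
print(res)  # ['ok']
```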

results

results() -> List[Any]

Execute all submitted jobs and return their results. The results are returned in the order of job submission.

This is the main sync entry point for executing async jobs.

Source code in src/ragas/executor.py
def results(self) -> t.List[t.Any]:
    """
    Execute all submitted jobs and return their results. The results are returned in the order of job submission.

    This is the main sync entry point for executing async jobs.
    """

    async def _async_wrapper():
        return await self.aresults()

    apply_nest_asyncio()
    return run(_async_wrapper)
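The structure of `results()` above is a common sync-over-async bridge: wrap the async API in a local coroutine and hand it to the event loop runner (ragas additionally applies `nest_asyncio` so this also works inside notebooks that already have a running loop). A minimal self-contained sketch of the same shape, using stand-in names and plain `asyncio.run`:

```python
import asyncio

# Sketch of the sync entry-point pattern used by results(): a local async
# wrapper delegates to the async API, and the sync method runs it to completion.
async def aresults_stub() -> list:
    await asyncio.sleep(0)  # stand-in for actually executing the jobs
    return [1, 2, 3]

def results_stub() -> list:
    async def _async_wrapper():
        return await aresults_stub()

    return asyncio.run(_async_wrapper())

out = results_stub()
print(out)  # [1, 2, 3]
```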

run_async_batch

run_async_batch(desc: str, func: Callable, kwargs_list: List[Dict], batch_size: Optional[int] = None)

Provide functionality to run the same async function with different arguments in parallel.

Source code in src/ragas/executor.py
def run_async_batch(
    desc: str,
    func: t.Callable,
    kwargs_list: t.List[t.Dict],
    batch_size: t.Optional[int] = None,
):
    """
    Provide functionality to run the same async function with different arguments in parallel.
    """
    run_config = RunConfig()
    executor = Executor(
        desc=desc,
        keep_progress_bar=False,
        raise_exceptions=True,
        run_config=run_config,
        batch_size=batch_size,
    )

    for kwargs in kwargs_list:
        executor.submit(func, **kwargs)

    return executor.results()
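The fan-out pattern in `run_async_batch` can be shown with a self-contained sketch: one async function is applied to a list of kwargs dicts and the results are gathered in order (`score` and `run_batch` are illustrative stand-ins, not ragas APIs; the real helper also wires in a `RunConfig`, progress bar, and batching):

```python
import asyncio
import typing as t

# Self-contained sketch of the run_async_batch pattern: fan the same async
# function out over a list of kwargs and gather the results in order.
async def score(question: str, weight: float = 1.0) -> float:
    await asyncio.sleep(0)  # stand-in for real async work (e.g. an LLM call)
    return weight * len(question)

def run_batch(func: t.Callable, kwargs_list: t.List[t.Dict]) -> list:
    async def _run():
        return await asyncio.gather(*(func(**kw) for kw in kwargs_list))

    return asyncio.run(_run())

scores = run_batch(
    score,
    [{"question": "hi"}, {"question": "hey", "weight": 2.0}],
)
print(scores)  # [2.0, 6.0]
```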