跳到内容

执行器

执行器 数据类

Executor(desc: str = 'Evaluating', show_progress: bool = True, keep_progress_bar: bool = True, jobs: List[Any] = list(), raise_exceptions: bool = False, batch_size: Optional[int] = None, run_config: Optional[RunConfig] = None, _nest_asyncio_applied: bool = False, pbar: Optional[tqdm] = None)

用于运行异步任务的执行器类,具有进度跟踪和错误处理功能。

属性

名称 类型 描述
desc str

进度条的描述

show_progress bool

是否显示进度条

keep_progress_bar bool

完成后是否保留进度条

jobs 列表[任意]

要执行的任务列表

raise_exceptions bool

是否抛出异常或记录异常

batch_size int

是否对(大型)任务列表进行批处理

run_config RunConfig

运行配置

_nest_asyncio_applied bool

是否已应用 nest_asyncio

submit

submit(callable: Callable, *args, name: Optional[str] = None, **kwargs) -> None

提交一个要执行的任务,用错误处理和索引包装可调用对象,以跟踪任务索引。

源代码位于 src/ragas/executor.py
def submit(
    self,
    callable: t.Callable,
    *args,
    name: t.Optional[str] = None,
    **kwargs,
) -> None:
    """
    Submit a job to be executed, wrapping the callable with error handling and indexing to keep track of the job index.
    """
    callable_with_index = self.wrap_callable_with_index(callable, len(self.jobs))
    self.jobs.append((callable_with_index, args, kwargs, name))

results

results() -> List[Any]

执行所有提交的任务并返回结果。结果按照任务提交的顺序返回。

源代码位于 src/ragas/executor.py
def results(self) -> t.List[t.Any]:
    """
    Execute all submitted jobs and return their results. The results are returned in the order of job submission.
    """
    if is_event_loop_running():
        # an event loop is running so call nested_asyncio to fix this
        try:
            import nest_asyncio
        except ImportError as e:
            raise ImportError(
                "It seems like your running this in a jupyter-like environment. "
                "Please install nest_asyncio with `pip install nest_asyncio` to make it work."
            ) from e
        else:
            if not self._nest_asyncio_applied:
                nest_asyncio.apply()
                self._nest_asyncio_applied = True

    results = asyncio.run(self._process_jobs())
    sorted_results = sorted(results, key=lambda x: x[0])
    return [r[1] for r in sorted_results]

run_async_batch

run_async_batch(desc: str, func: Callable, kwargs_list: List[Dict], batch_size: Optional[int] = None)

提供并行运行具有不同参数的相同异步函数的功能。

源代码位于 src/ragas/executor.py
def run_async_batch(
    desc: str,
    func: t.Callable,
    kwargs_list: t.List[t.Dict],
    batch_size: t.Optional[int] = None,
):
    """
    Provide functionality to run the same async function with different arguments in parallel.
    """
    run_config = RunConfig()
    executor = Executor(
        desc=desc,
        keep_progress_bar=False,
        raise_exceptions=True,
        run_config=run_config,
        batch_size=batch_size,
    )

    for kwargs in kwargs_list:
        executor.submit(func, **kwargs)

    return executor.results()

is_event_loop_running

is_event_loop_running() -> bool

检查当前是否有事件循环正在运行。

源代码位于 src/ragas/executor.py
def is_event_loop_running() -> bool:
    """
    Check if an event loop is currently running.
    """
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        return False
    else:
        return loop.is_running()