跳转到内容

取消长时间运行的任务

在处理大型数据集或复杂评估时,一些 Ragas 操作可能需要很长时间才能完成。取消功能允许您在需要时优雅地终止这些长时间运行的任务,这在生产环境中尤为重要。

概述

Ragas 为以下操作提供取消支持: - evaluate() - 使用指标评估数据集 - generate_with_langchain_docs() - 从文档生成测试集

取消机制是线程安全的,并允许在可能的情况下优雅地终止并返回部分结果。

基本用法

可取消的评估

您可以获取一个允许取消的执行器,而不是直接运行评估

from ragas import evaluate
from ragas.dataset_schema import EvaluationDataset

# Your dataset and metrics
dataset = EvaluationDataset(...)
metrics = [...]

# Get executor instead of running evaluation immediately
executor = evaluate(
    dataset=dataset,
    metrics=metrics,
    return_executor=True  # Key parameter
)

# Now you can:
# - Cancel: executor.cancel()
# - Check status: executor.is_cancelled()
# - Get results: executor.results()  # This blocks until completion

可取消的测试集生成

测试集生成也采用类似的方法

from ragas.testset.synthesizers.generate import TestsetGenerator

generator = TestsetGenerator(...)

# Get executor for cancellable generation
executor = generator.generate_with_langchain_docs(
    documents=documents,
    testset_size=100,
    return_executor=True  # Allow access to Executor to cancel
)

# Use the same cancellation interface
executor.cancel()

生产环境模式

1. 超时模式

自动取消超过时间限制的操作

import threading
import time

def evaluate_with_timeout(dataset, metrics, timeout_seconds=300):
    """Run evaluation with automatic timeout."""
    # Get cancellable executor
    executor = evaluate(dataset=dataset, metrics=metrics, return_executor=True)

    results = None
    exception = None

    def run_evaluation():
        nonlocal results, exception
        try:
            results = executor.results()
        except Exception as e:
            exception = e

    # Start evaluation in background thread
    thread = threading.Thread(target=run_evaluation)
    thread.start()

    # Wait for completion or timeout
    thread.join(timeout=timeout_seconds)

    if thread.is_alive():
        print(f"Evaluation exceeded {timeout_seconds}s timeout, cancelling...")
        executor.cancel()
        thread.join(timeout=10)  # Custom timeout as per need
        return None, "timeout"

    return results, exception

# Usage
results, error = evaluate_with_timeout(dataset, metrics, timeout_seconds=600)
if error == "timeout":
    print("Evaluation was cancelled due to timeout")
else:
    print(f"Evaluation completed: {results}")

2. 信号处理模式 (Ctrl+C)

允许用户通过键盘中断来取消

import signal
import sys

def setup_cancellation_handler():
    """Set up graceful cancellation on Ctrl+C."""
    executor = None

    def signal_handler(signum, frame):
        if executor and not executor.is_cancelled():
            print("\nReceived interrupt signal, cancelling evaluation...")
            executor.cancel()
            print("Cancellation requested. Waiting for graceful shutdown...")
        sys.exit(0)

    # Register signal handler
    signal.signal(signal.SIGINT, signal_handler)

    return lambda exec: setattr(signal_handler, 'executor', exec)

# Usage
set_executor = setup_cancellation_handler()

executor = evaluate(dataset=dataset, metrics=metrics, return_executor=True)
set_executor(executor)

print("Running evaluation... Press Ctrl+C to cancel gracefully")
try:
    results = executor.results()
    print("Evaluation completed successfully")
except KeyboardInterrupt:
    print("Evaluation was cancelled")

3. Web 应用模式

对于 Web 应用程序,在请求中止时取消操作

from flask import Flask, request
import threading
import uuid

app = Flask(__name__)
active_evaluations = {}

@app.route('/evaluate', methods=['POST'])
def start_evaluation():
    # Create unique evaluation ID
    eval_id = str(uuid.uuid4())

    # Get dataset and metrics from request
    dataset = get_dataset_from_request(request)
    metrics = get_metrics_from_request(request)

    # Start cancellable evaluation
    executor = evaluate(dataset=dataset, metrics=metrics, return_executor=True)
    active_evaluations[eval_id] = executor

    # Start evaluation in background
    def run_eval():
        try:
            results = executor.results()
            # Store results somewhere
            store_results(eval_id, results)
        except Exception as e:
            store_error(eval_id, str(e))
        finally:
            active_evaluations.pop(eval_id, None)

    threading.Thread(target=run_eval).start()

    return {"evaluation_id": eval_id, "status": "started"}

@app.route('/evaluate/<eval_id>/cancel', methods=['POST'])
def cancel_evaluation(eval_id):
    executor = active_evaluations.get(eval_id)
    if executor:
        executor.cancel()
        return {"status": "cancelled"}
    return {"error": "Evaluation not found"}, 404

高级用法

检查取消状态

executor = evaluate(dataset=dataset, metrics=metrics, return_executor=True)

# Start in background
def monitor_evaluation():
    while not executor.is_cancelled():
        print("Evaluation still running...")
        time.sleep(5)
    print("Evaluation was cancelled")

threading.Thread(target=monitor_evaluation).start()

# Cancel after some condition
if some_condition():
    executor.cancel()

部分结果

当在执行期间发生取消时,您可能会得到部分结果

executor = evaluate(dataset=dataset, metrics=metrics, return_executor=True)

try:
    results = executor.results()
    print(f"Completed {len(results)} evaluations")
except Exception as e:
    if executor.is_cancelled():
        print("Evaluation was cancelled - may have partial results")
    else:
        print(f"Evaluation failed: {e}")

自定义取消逻辑

class EvaluationManager:
    def __init__(self):
        self.executors = []

    def start_evaluation(self, dataset, metrics):
        executor = evaluate(dataset=dataset, metrics=metrics, return_executor=True)
        self.executors.append(executor)
        return executor

    def cancel_all(self):
        """Cancel all running evaluations."""
        for executor in self.executors:
            if not executor.is_cancelled():
                executor.cancel()
        print(f"Cancelled {len(self.executors)} evaluations")

    def cleanup_completed(self):
        """Remove completed executors."""
        self.executors = [ex for ex in self.executors if not ex.is_cancelled()]

# Usage
manager = EvaluationManager()

# Start multiple evaluations
exec1 = manager.start_evaluation(dataset1, metrics)
exec2 = manager.start_evaluation(dataset2, metrics)

# Cancel all if needed
manager.cancel_all()

最佳实践

1. 在生产环境中始终使用超时

# Good: Always set reasonable timeouts
results, error = evaluate_with_timeout(dataset, metrics, timeout_seconds=1800)  # 30 minutes

# Avoid: Indefinite blocking
results = executor.results()  # Could block forever

2. 优雅地处理取消

try:
    results = executor.results()
    process_results(results)
except Exception as e:
    if executor.is_cancelled():
        log_cancellation()
        cleanup_partial_work()
    else:
        log_error(e)
        handle_failure()

3. 提供用户反馈

def run_with_progress_and_cancellation(executor):
    print("Starting evaluation... Press Ctrl+C to cancel")

    # Monitor progress in background
    def show_progress():
        while not executor.is_cancelled():
            # Show some progress indication
            print(".", end="", flush=True)
            time.sleep(1)

    progress_thread = threading.Thread(target=show_progress)
    progress_thread.daemon = True
    progress_thread.start()

    try:
        return executor.results()
    except KeyboardInterrupt:
        print("\nCancelling...")
        executor.cancel()
        return None

4. 清理资源

def managed_evaluation(dataset, metrics):
    executor = None
    try:
        executor = evaluate(dataset=dataset, metrics=metrics, return_executor=True)
        return executor.results()
    except Exception as e:
        if executor:
            executor.cancel()
        raise
    finally:
        # Clean up any temporary resources
        cleanup_temp_files()

限制

  • 异步操作:取消在任务级别生效,而不是在单个 LLM 调用内部
  • 部分状态:被取消的操作可能会留下部分结果或临时文件
  • 时机:取消是协作式的——任务需要定期检查取消状态
  • 依赖项:一些外部服务可能不会立即响应取消请求

问题排查

取消不生效

# Check if cancellation is set
if executor.is_cancelled():
    print("Cancellation was requested")
else:
    print("Cancellation not requested yet")

# Ensure you're calling cancel()
executor.cancel()
assert executor.is_cancelled()

取消后任务仍在运行

# Give time for graceful shutdown
executor.cancel()
time.sleep(2)  # Allow tasks to detect cancellation

# Force cleanup if needed
import asyncio
try:
    loop = asyncio.get_running_loop()
    for task in asyncio.all_tasks(loop):
        task.cancel()
except RuntimeError:
    pass  # No event loop running

取消功能为长时间运行的 Ragas 操作提供了强大的控制,通过适当的资源管理和用户体验,使其能够进行生产就绪的部署。