diff --git a/src/uipath/_cli/__init__.py b/src/uipath/_cli/__init__.py
index 7c24e1d6..d2b19137 100644
--- a/src/uipath/_cli/__init__.py
+++ b/src/uipath/_cli/__init__.py
@@ -13,6 +13,7 @@
 from .cli_pull import pull as pull  # type: ignore
 from .cli_push import push as push  # type: ignore
 from .cli_run import run as run  # type: ignore
+from .cli_eval import eval as eval  # type: ignore
 
 
 def _get_safe_version() -> str:
@@ -67,3 +68,4 @@ def cli(lv: bool, v: bool) -> None:
 cli.add_command(invoke)
 cli.add_command(push)
 cli.add_command(pull)
+cli.add_command(eval)
diff --git a/src/uipath/_cli/_evals/evaluation_service.py b/src/uipath/_cli/_evals/evaluation_service.py
new file mode 100644
index 00000000..86b7aa2f
--- /dev/null
+++ b/src/uipath/_cli/_evals/evaluation_service.py
@@ -0,0 +1,236 @@
+import asyncio
+import json
+from datetime import UTC, datetime
+from pathlib import Path
+from typing import Any, Dict, List
+
+from uipath._cli._utils._console import ConsoleLogger
+
+from ..cli_run import run
+from .evaluators.llm_evaluator import LLMEvaluator
+from .models import EvaluationSetResult
+
+console = ConsoleLogger()
+
+
+class EvaluationService:
+    """Service for running evaluations."""
+
+    def __init__(self, eval_set_path: str | Path):
+        """Initialize the evaluation service.
+
+        Args:
+            eval_set_path: Path to the evaluation set file (can be string or Path)
+        """
+        self.eval_set_path = Path(eval_set_path)
+        self.eval_set = self._load_eval_set()
+        self.evaluators = self._load_evaluators()
+        self.num_workers = 8
+        self.results_lock = asyncio.Lock()
+        self._initialize_results()
+
+    def _initialize_results(self) -> None:
+        """Initialize the results file and directory."""
+        # Create results directory if it doesn't exist
+        results_dir = self.eval_set_path.parent.parent / "results"
+        results_dir.mkdir(exist_ok=True)
+
+        # Create results file
+        timestamp = datetime.now(UTC).strftime("%M-%H-%d-%m-%Y")
+        eval_set_name = self.eval_set["name"]
+        self.result_file = results_dir / f"eval-{eval_set_name}-{timestamp}.json"
+
+        # Initialize with empty results
+        initial_results = EvaluationSetResult(
+            eval_set_id=self.eval_set["id"],
+            eval_set_name=self.eval_set["name"],
+            results=[],
+            average_score=0.0,
+        )
+
+        with open(self.result_file, "w", encoding="utf-8") as f:
+            f.write(initial_results.model_dump_json(indent=2))
+
+    def _load_eval_set(self) -> Dict[str, Any]:
+        """Load the evaluation set from file.
+
+        Returns:
+            The loaded evaluation set
+        """
+        with open(self.eval_set_path, "r", encoding="utf-8") as f:
+            return json.load(f)
+
+    def _load_evaluators(self) -> List[LLMEvaluator]:
+        """Load evaluators referenced by the evaluation set."""
+        evaluators = []
+        evaluators_dir = self.eval_set_path.parent.parent / "evaluators"
+
+        for evaluator_id in self.eval_set["evaluatorRefs"]:
+            # Find evaluator file
+            evaluator_file = None
+            for file in evaluators_dir.glob("*.json"):
+                with open(file) as f:
+                    data = json.load(f)
+                    if data.get("id") == evaluator_id:
+                        evaluator_file = data
+                        break
+
+            if not evaluator_file:
+                raise ValueError(f"Could not find evaluator with ID {evaluator_id}")
+
+            evaluators.append(LLMEvaluator(evaluator_file))
+
+        return evaluators
+
+    async def _write_results(self, results: List[Any]) -> None:
+        """Write evaluation results to file with async lock.
+
+        Args:
+            results: List of evaluation results to write
+        """
+        async with self.results_lock:
+            # Read current results
+            with open(self.result_file, "r", encoding="utf-8") as f:
+                current_results = EvaluationSetResult.model_validate_json(f.read())
+
+            # Add new results
+            current_results.results.extend(results)
+
+            if current_results.results:
+                current_results.average_score = sum(
+                    r.score for r in current_results.results
+                ) / len(current_results.results)
+
+            # Write updated results
+            with open(self.result_file, "w", encoding="utf-8") as f:
+                f.write(current_results.model_dump_json(indent=2))
+
+    def _run_agent(self, input_json: str) -> Dict[str, Any]:
+        """Run the agent with the given input.
+
+        Args:
+            input_json: JSON string containing input data
+
+        Returns:
+            Agent output as dictionary
+        """
+        try:
+            # Run the agent using the CLI run command
+            run.callback(
+                entrypoint=None,
+                input=input_json,
+                resume=False,
+                file=None,
+                debug=False,
+                debug_port=5678,
+            )
+
+            # Read the output file
+            output_file = Path("__uipath") / "output.json"
+            with open(output_file, "r", encoding="utf-8") as f:
+                result = json.load(f)
+
+            # Extract and parse the output content
+            output_content = result.get("output", {})
+            if isinstance(output_content, str):
+                try:
+                    return json.loads(output_content)
+                except json.JSONDecodeError as e:
+                    raise Exception(f"Error parsing output: {e}") from e
+            return output_content
+
+        except Exception as e:
+            console.error(f"Error running agent: {str(e)}")
+            return {"error": str(e)}
+
+    async def _process_evaluation(self, eval_item: Dict[str, Any]) -> None:
+        """Process a single evaluation item.
+
+        Args:
+            eval_item: The evaluation item to process
+        """
+        console.info(f"Running evaluation: {eval_item['name']}")
+
+        # Run the agent using the evaluation input
+        input_json = json.dumps(eval_item["inputs"])
+
+        # Run _run_agent in a non-async context using run_in_executor
+        loop = asyncio.get_running_loop()
+        actual_output = await loop.run_in_executor(None, self._run_agent, input_json)
+
+        # Run each evaluator
+        eval_results = []
+        for evaluator in self.evaluators:
+            result = await evaluator.evaluate(
+                evaluation_id=eval_item["id"],
+                evaluation_name=eval_item["name"],
+                input_data=eval_item["inputs"],
+                expected_output=eval_item["expectedOutput"],
+                actual_output=actual_output,
+            )
+            eval_results.append(result)
+
+        # Write results immediately
+        await self._write_results(eval_results)
+
+        # TODO: here we should send the event to the SW eval API
+        console.info(f"Evaluation {eval_item['name']} complete.")
+
+    async def _producer_task(self, task_queue: asyncio.Queue) -> None:
+        """Producer task that adds all evaluations to the queue.
+
+        Args:
+            task_queue: The asyncio queue to add tasks to
+        """
+        for eval_item in self.eval_set["evaluations"]:
+            await task_queue.put(eval_item)
+
+        # Add sentinel values to signal workers to stop
+        for _ in range(self.num_workers):
+            await task_queue.put(None)
+
+    async def _consumer_task(self, task_queue: asyncio.Queue, worker_id: int) -> None:
+        """Consumer task that processes evaluations from the queue.
+
+        Args:
+            task_queue: The asyncio queue to get tasks from
+            worker_id: ID of this worker for logging
+        """
+        while True:
+            eval_item = await task_queue.get()
+            if eval_item is None:
+                # Sentinel value - worker should stop
+                task_queue.task_done()
+                return
+
+            try:
+                await self._process_evaluation(eval_item)
+                task_queue.task_done()
+            except Exception as e:
+                # Log error and continue to next item
+                task_queue.task_done()
+                console.warning(
+                    f"Worker {worker_id} failed evaluation {eval_item.get('name', 'Unknown')}: {str(e)}"
+                )
+
+    async def run_evaluation(self) -> None:
+        """Run the evaluation set using multiple worker tasks."""
+        task_queue = asyncio.Queue()
+
+        producer = asyncio.create_task(self._producer_task(task_queue))
+
+        consumers = []
+        for worker_id in range(self.num_workers):
+            consumer = asyncio.create_task(self._consumer_task(task_queue, worker_id))
+            consumers.append(consumer)
+
+        await producer
+
+        await task_queue.join()
+
+        # Wait for all consumers to finish
+        await asyncio.gather(*consumers)
+
+        console.success(
+            f"All evaluations complete. Results saved to {self.result_file}"
+        )
diff --git a/src/uipath/_cli/_evals/evaluators/llm_evaluator.py b/src/uipath/_cli/_evals/evaluators/llm_evaluator.py
new file mode 100644
index 00000000..ad9050bc
--- /dev/null
+++ b/src/uipath/_cli/_evals/evaluators/llm_evaluator.py
@@ -0,0 +1,119 @@
+import json
+from typing import Any, Dict, Optional
+
+from ..._utils._debug import console
+from ...._config import Config
+from ...._execution_context import ExecutionContext
+from ...._services.llm_gateway_service import UiPathLlmChatService
+from ...._utils.constants import (
+    ENV_BASE_URL,
+    ENV_UIPATH_ACCESS_TOKEN,
+    ENV_UNATTENDED_USER_ACCESS_TOKEN,
+    COMMUNITY_AGENTS_SUFFIX,
+)
+from ..models import EvaluationResult, EvaluatorCategory, LLMResponse
+
+
+class LLMEvaluator:
+    """Service for evaluating outputs using LLM."""
+
+    # TODO: find a better way to structure the output
+    format_instructions: dict[str, str] = {
+        "role": "system",
+        "content": 'Extract the data from the following text and model it like this in JSON format: {"similarity_score": "", "score_justification": ""}. similarity_score is a float between 0 and 100 and score_justification is a str. The output should be plain JSON, nothing else. No markdown.',
+    }
+
+    def __init__(self, evaluator_config: Dict[str, Any]):
+        """Initialize LLM evaluator.
+
+        Args:
+            evaluator_config: Configuration for the evaluator from evaluator JSON file
+        """
+        import os
+
+        self.config = evaluator_config
+        base_url_value = os.getenv(ENV_BASE_URL)
+        secret_value = os.getenv(ENV_UNATTENDED_USER_ACCESS_TOKEN) or os.getenv(
+            ENV_UIPATH_ACCESS_TOKEN
+        )
+        config = Config(
+            base_url=base_url_value,  # type: ignore
+            secret=secret_value,  # type: ignore
+        )
+        self.llm = UiPathLlmChatService(config, ExecutionContext())
+
+        # Validate evaluator category
+        if self.config.get("category") != EvaluatorCategory.LlmAsAJudge:
+            raise ValueError("Evaluator must be of type LlmAsAJudge")
+
+    async def evaluate(
+        self,
+        evaluation_id: str,
+        evaluation_name: str,
+        input_data: Dict[str, Any],
+        expected_output: Dict[str, Any],
+        actual_output: Dict[str, Any],
+    ) -> EvaluationResult:
+        """Evaluate the actual output against expected output using LLM.
+
+        Args:
+            evaluation_id: ID of the evaluation
+            evaluation_name: Name of the evaluation
+            input_data: Input data used for the evaluation
+            expected_output: Expected output from the evaluation
+            actual_output: Actual output received
+
+        Returns:
+            EvaluationResult containing the evaluation score and details
+        """
+        # Prepare the prompt by replacing placeholders
+        prompt = self.config["prompt"]
+        prompt = prompt.replace(
+            "{{ExpectedOutput}}", json.dumps(expected_output, indent=2)
+        )
+        content = prompt.replace(
+            "{{ActualOutput}}", json.dumps(actual_output, indent=2)
+        )
+
+        model: Optional[str] = self.config.get("model", None)
+        if not model:
+            console.error("Evaluator model cannot be extracted")
+
+        # remove community-agents suffix from llm model name
+        if model.endswith(COMMUNITY_AGENTS_SUFFIX):
+            model = model.replace(COMMUNITY_AGENTS_SUFFIX, "")
+
+        response = await self.llm.chat_completions(
+            messages=[{"role": "user", "content": content}], model=model
+        )
+        structured_response = await self.llm.chat_completions(
+            messages=[
+                self.format_instructions,
+                {"role": "user", "content": response.choices[-1].message.content},
+            ],
+            model=model,
+        )
+        try:
+            llm_response = LLMResponse(
+                **json.loads(structured_response.choices[-1].message.content)
+            )
+        except Exception as e:
+            raise Exception(f"Error parsing LLM response: {e}") from e
+        # Leave those comments
+        # llm_response = LLMResponse(similarity_score=90, score_justification="test justification")
+        score = llm_response.similarity_score
+        details = llm_response.score_justification
+
+        if score < 0 or score > 100:
+            raise ValueError(f"Score {score} is outside valid range 0-100")
+
+        return EvaluationResult(
+            evaluation_id=evaluation_id,
+            evaluation_name=evaluation_name,
+            evaluator_id=self.config["id"],
+            evaluator_name=self.config["name"],
+            score=score,
+            input=input_data,
+            expected_output=expected_output,
+            actual_output=actual_output,
+            details=details,
+        )
diff --git a/src/uipath/_cli/_evals/models.py b/src/uipath/_cli/_evals/models.py
new file mode 100644
index 00000000..328f4aac
--- /dev/null
+++ b/src/uipath/_cli/_evals/models.py
@@ -0,0 +1,58 @@
+from datetime import datetime
+from enum import IntEnum
+from typing import Any, Dict, List, Optional
+
+from pydantic import BaseModel, Field
+
+
+class LLMResponse(BaseModel):
+    similarity_score: float
+    score_justification: str
+
+
+class EvaluatorCategory(IntEnum):
+    """Types of evaluators."""
+
+    Deterministic = 0
+    LlmAsAJudge = 1
+    AgentScorer = 2
+    Trajectory = 3
+
+
+class EvaluatorType(IntEnum):
+    """Subtypes of evaluators."""
+
+    Unknown = 0
+    Equals = 1
+    Contains = 2
+    Regex = 3
+    Factuality = 4
+    Custom = 5
+    JsonSimilarity = 6
+    Trajectory = 7
+    ContextPrecision = 8
+    Faithfulness = 9
+
+
+class EvaluationResult(BaseModel):
+    """Result of a single evaluation."""
+
+    evaluation_id: str
+    evaluation_name: str
+    evaluator_id: str
+    evaluator_name: str
+    score: float
+    input: Dict[str, Any]
+    expected_output: Dict[str, Any]
+    actual_output: Dict[str, Any]
+    timestamp: datetime = Field(default_factory=datetime.utcnow)
+    details: Optional[str] = None
+
+
+class EvaluationSetResult(BaseModel):
+    """Results of running an evaluation set."""
+
+    eval_set_id: str
+    eval_set_name: str
+    results: List[EvaluationResult]
+    average_score: float
+    timestamp: datetime = Field(default_factory=datetime.utcnow)
diff --git a/src/uipath/_cli/_evals/random_sampler.py b/src/uipath/_cli/_evals/random_sampler.py
new file mode 100644
index 00000000..f3e56f89
--- /dev/null
+++ b/src/uipath/_cli/_evals/random_sampler.py
@@ -0,0 +1,39 @@
+import random
+from typing import Any, Generator, Sequence
+
+
+class RandomChainSampler:
+    """Sampler that randomly chains multiple iterables together.
+
+    This class takes a sequence of generators and yields items from them
+    in random order, removing generators as they become exhausted.
+    """
+
+    def __init__(self, iterables: Sequence[Generator[Any, None, None]]):
+        """Initialize the sampler with a sequence of iterables.
+
+        Args:
+            iterables: Sequence of generators to sample from
+        """
+        self.iterables = list(iterables)
+
+    def __iter__(self):
+        """Return the iterator object."""
+        return self
+
+    def __next__(self):
+        """Get the next item from a randomly selected iterable.
+
+        Returns:
+            The next item from one of the iterables
+
+        Raises:
+            StopIteration: When all iterables are exhausted
+        """
+        while len(self.iterables) > 0:
+            current_iterable = random.choice(self.iterables)
+            try:
+                return next(current_iterable)
+            except StopIteration:
+                self.iterables.remove(current_iterable)
+        raise StopIteration
diff --git a/src/uipath/_cli/cli_eval.py b/src/uipath/_cli/cli_eval.py
new file mode 100644
index 00000000..b11f265a
--- /dev/null
+++ b/src/uipath/_cli/cli_eval.py
@@ -0,0 +1,45 @@
+# type: ignore
+import asyncio
+import os
+from pathlib import Path
+from typing import Optional
+
+import click
+from dotenv import load_dotenv
+
+from .._utils.constants import ENV_JOB_ID
+from ..telemetry import track
+from ._evals.evaluation_service import EvaluationService
+from ._utils._console import ConsoleLogger
+
+console = ConsoleLogger()
+load_dotenv(override=True)
+
+
+@click.command()
+@click.argument("eval_set", required=True, type=click.Path(exists=True))
+@track(when=lambda *_a, **_kw: os.getenv(ENV_JOB_ID) is None)
+def eval(eval_set: str) -> None:
+    """Run an evaluation set against the agent.
+
+    Args:
+        eval_set: Path to the evaluation set JSON file
+    """
+    try:
+        # Validate file path
+        eval_path = Path(eval_set)
+        if not eval_path.is_file() or eval_path.suffix != ".json":
+            console.error("Evaluation set must be a JSON file")
+            click.get_current_context().exit(1)
+
+        # Run evaluation
+        service = EvaluationService(eval_set)
+        asyncio.run(service.run_evaluation())
+
+    except Exception as e:
+        console.error(f"Error running evaluation: {str(e)}")
+        click.get_current_context().exit(1)
+
+
+if __name__ == "__main__":
+    eval()
diff --git a/src/uipath/_cli/cli_run.py b/src/uipath/_cli/cli_run.py
index 62a6bb1b..796eb936 100644
--- a/src/uipath/_cli/cli_run.py
+++ b/src/uipath/_cli/cli_run.py
@@ -145,14 +145,13 @@ def run(
         console.error("Input file extension must be '.json'.")
         with open(file) as f:
             input = f.read()
 
-    # Setup debugging if requested
+    # Setup debugging if requested
     if not setup_debugging(debug, debug_port):
         console.error(f"Failed to start debug server on port {debug_port}")
 
     # Process through middleware chain
     result = Middlewares.next("run", entrypoint, input, resume)
-
     if result.should_continue:
         result = python_run_middleware(
             entrypoint=entrypoint,
diff --git a/src/uipath/_utils/constants.py b/src/uipath/_utils/constants.py
index 6050d4e2..08fe2cd5 100644
--- a/src/uipath/_utils/constants.py
+++ b/src/uipath/_utils/constants.py
@@ -25,3 +25,6 @@
 
 # Local storage
 TEMP_ATTACHMENTS_FOLDER = "uipath_attachments"
+
+# LLM models
+COMMUNITY_AGENTS_SUFFIX = "-community-agents"
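
To exercise the new command end to end, an eval set and the evaluator it references have to be laid out on disk in the shape EvaluationService and LLMEvaluator read. The following is a minimal sketch, not part of the patch: only the sibling "evaluators" and "results" folder names are fixed by the code above, while the "eval-sets" folder name, the file names, and the model identifier are placeholder assumptions for illustration.

import json
from pathlib import Path

# Assumed layout: evals/eval-sets/<set>.json next to evals/evaluators/ and
# evals/results/ (only the last two names come from EvaluationService).
evals_dir = Path("evals")
(evals_dir / "eval-sets").mkdir(parents=True, exist_ok=True)
(evals_dir / "evaluators").mkdir(parents=True, exist_ok=True)

# Evaluator fields read by LLMEvaluator: id, name, category, prompt, model.
evaluator = {
    "id": "evaluator-1",
    "name": "output-similarity",
    "category": 1,  # EvaluatorCategory.LlmAsAJudge
    "prompt": (
        "Compare the expected output {{ExpectedOutput}} with the actual "
        "output {{ActualOutput}} and explain how similar they are."
    ),
    "model": "gpt-4o-mini",  # placeholder model identifier
}
(evals_dir / "evaluators" / "output-similarity.json").write_text(
    json.dumps(evaluator, indent=2), encoding="utf-8"
)

# Eval set fields read by EvaluationService: id, name, evaluatorRefs, evaluations.
eval_set = {
    "id": "eval-set-1",
    "name": "smoke-tests",
    "evaluatorRefs": ["evaluator-1"],
    "evaluations": [
        {
            "id": "eval-1",
            "name": "greets the user",
            "inputs": {"message": "hello"},
            "expectedOutput": {"reply": "Hello! How can I help?"},
        }
    ],
}
(evals_dir / "eval-sets" / "smoke-tests.json").write_text(
    json.dumps(eval_set, indent=2), encoding="utf-8"
)

# With those files in place:
#   uipath eval evals/eval-sets/smoke-tests.json
# runs each evaluation through the agent and writes
# evals/results/eval-smoke-tests-<timestamp>.json with the scores.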