feat: add eval cli command #464

Open: wants to merge 1 commit into base: release/sw-integration
2 changes: 2 additions & 0 deletions src/uipath/_cli/__init__.py
@@ -13,6 +13,7 @@
from .cli_pull import pull as pull # type: ignore
from .cli_push import push as push # type: ignore
from .cli_run import run as run # type: ignore
from .cli_eval import eval as eval # type: ignore


def _get_safe_version() -> str:
@@ -67,3 +68,4 @@ def cli(lv: bool, v: bool) -> None:
cli.add_command(invoke)
cli.add_command(push)
cli.add_command(pull)
cli.add_command(eval)
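
The cli_eval module itself is not part of this diff. A minimal sketch of what the wiring above assumes, namely a Click command named eval that accepts the path to an evaluation set and drives the EvaluationService introduced below, might look like the following; the eval_set_path argument and the command body are assumptions, not the actual implementation:

# Hypothetical sketch of src/uipath/_cli/cli_eval.py (not shown in this diff).
# Only the command name `eval` is confirmed by the import above; the argument
# name and behavior are assumptions.
import asyncio

import click

from ._evals.evaluation_service import EvaluationService


@click.command(name="eval")
@click.argument("eval_set_path", type=click.Path(exists=True))
def eval(eval_set_path: str) -> None:
    """Run an evaluation set against the local agent."""
    service = EvaluationService(eval_set_path)
    asyncio.run(service.run_evaluation())
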
236 changes: 236 additions & 0 deletions src/uipath/_cli/_evals/evaluation_service.py
@@ -0,0 +1,236 @@
import asyncio
import json
from datetime import UTC, datetime
from pathlib import Path
from typing import Any, Dict, List

from uipath._cli._utils._console import ConsoleLogger

from ..cli_run import run
from .evaluators.llm_evaluator import LLMEvaluator
from .models import EvaluationSetResult

console = ConsoleLogger()


class EvaluationService:
"""Service for running evaluations."""

def __init__(self, eval_set_path: str | Path):
"""Initialize the evaluation service.

Args:
eval_set_path: Path to the evaluation set file (can be string or Path)
"""
self.eval_set_path = Path(eval_set_path)
self.eval_set = self._load_eval_set()
self.evaluators = self._load_evaluators()
self.num_workers = 8
self.results_lock = asyncio.Lock()
self._initialize_results()

def _initialize_results(self) -> None:
"""Initialize the results file and directory."""
# Create results directory if it doesn't exist
results_dir = self.eval_set_path.parent.parent / "results"
results_dir.mkdir(exist_ok=True)

# Create results file named with a sortable timestamp
timestamp = datetime.now(UTC).strftime("%Y-%m-%d-%H-%M")
eval_set_name = self.eval_set["name"]
self.result_file = results_dir / f"eval-{eval_set_name}-{timestamp}.json"

# Initialize with empty results
initial_results = EvaluationSetResult(
eval_set_id=self.eval_set["id"],
eval_set_name=self.eval_set["name"],
results=[],
average_score=0.0,
)

with open(self.result_file, "w", encoding="utf-8") as f:
f.write(initial_results.model_dump_json(indent=2))

def _load_eval_set(self) -> Dict[str, Any]:
"""Load the evaluation set from file.

Returns:
The loaded evaluation set
"""
with open(self.eval_set_path, "r", encoding="utf-8") as f:
return json.load(f)

def _load_evaluators(self) -> List[LLMEvaluator]:
"""Load evaluators referenced by the evaluation set."""
evaluators = []
evaluators_dir = self.eval_set_path.parent.parent / "evaluators"

for evaluator_id in self.eval_set["evaluatorRefs"]:
# Find evaluator file
evaluator_file = None
for file in evaluators_dir.glob("*.json"):
with open(file) as f:
data = json.load(f)
if data.get("id") == evaluator_id:
evaluator_file = data
break

if not evaluator_file:
raise ValueError(f"Could not find evaluator with ID {evaluator_id}")

evaluators.append(LLMEvaluator(evaluator_file))

return evaluators

async def _write_results(self, results: List[Any]) -> None:
"""Write evaluation results to file with async lock.

Args:
results: List of evaluation results to write
"""
async with self.results_lock:
# Read current results
with open(self.result_file, "r", encoding="utf-8") as f:
current_results = EvaluationSetResult.model_validate_json(f.read())

# Add new results
current_results.results.extend(results)

if current_results.results:
current_results.average_score = sum(
r.score for r in current_results.results
) / len(current_results.results)

# Write updated results
with open(self.result_file, "w", encoding="utf-8") as f:
f.write(current_results.model_dump_json(indent=2))

def _run_agent(self, input_json: str) -> Dict[str, Any]:
"""Run the agent with the given input.

Args:
input_json: JSON string containing input data

Returns:
Agent output as dictionary
"""
try:
# Run the agent using the CLI run command
run.callback(
entrypoint=None,
input=input_json,
resume=False,
file=None,
debug=False,
debug_port=5678,
)

# Read the output file
output_file = Path("__uipath") / "output.json"
with open(output_file, "r", encoding="utf-8") as f:
result = json.load(f)

# Extract and parse the output content
output_content = result.get("output", {})
if isinstance(output_content, str):
try:
return json.loads(output_content)
except json.JSONDecodeError as e:
raise Exception(f"Error parsing output: {e}") from e
return output_content

except Exception as e:
console.error(f"Error running agent: {str(e)}")
return {"error": str(e)}

async def _process_evaluation(self, eval_item: Dict[str, Any]) -> None:
"""Process a single evaluation item.

Args:
eval_item: The evaluation item to process
"""
console.info(f"Running evaluation: {eval_item['name']}")

# Run the agent using the evaluation input
input_json = json.dumps(eval_item["inputs"])

# Run _run_agent in a non-async context using run_in_executor
loop = asyncio.get_running_loop()
actual_output = await loop.run_in_executor(None, self._run_agent, input_json)

# Run each evaluator
eval_results = []
for evaluator in self.evaluators:
result = await evaluator.evaluate(
evaluation_id=eval_item["id"],
evaluation_name=eval_item["name"],
input_data=eval_item["inputs"],
expected_output=eval_item["expectedOutput"],
actual_output=actual_output,
)
eval_results.append(result)

# Write results immediately
await self._write_results(eval_results)

# TODO: here we should send the event to the SW eval API
console.info(f"Evaluation {eval_item['name']} complete.")

async def _producer_task(self, task_queue: asyncio.Queue) -> None:
"""Producer task that adds all evaluations to the queue.

Args:
task_queue: The asyncio queue to add tasks to
"""
for eval_item in self.eval_set["evaluations"]:
await task_queue.put(eval_item)

# Add sentinel values to signal workers to stop
for _ in range(self.num_workers):
await task_queue.put(None)

async def _consumer_task(self, task_queue: asyncio.Queue, worker_id: int) -> None:
"""Consumer task that processes evaluations from the queue.

Args:
task_queue: The asyncio queue to get tasks from
worker_id: ID of this worker for logging
"""
while True:
eval_item = await task_queue.get()
if eval_item is None:
# Sentinel value - worker should stop
task_queue.task_done()
return

try:
await self._process_evaluation(eval_item)
task_queue.task_done()
except Exception as e:
# Log error and continue to next item
task_queue.task_done()
console.warning(
f"Worker {worker_id} failed evaluation {eval_item.get('name', 'Unknown')}: {str(e)}"
)

async def run_evaluation(self) -> None:
"""Run the evaluation set using multiple worker tasks."""
task_queue = asyncio.Queue()

producer = asyncio.create_task(self._producer_task(task_queue))

consumers = []
for worker_id in range(self.num_workers):
consumer = asyncio.create_task(self._consumer_task(task_queue, worker_id))
consumers.append(consumer)

await producer

await task_queue.join()

# Wait for all consumers to finish
await asyncio.gather(*consumers)

console.success(
f"All evaluations complete. Results saved to {self.result_file}"
)
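
For orientation, here is a minimal sketch of an evaluation set file shaped the way this service reads it (top-level id, name, evaluatorRefs, and evaluations whose items carry id, name, inputs, and expectedOutput), plus how it could be driven programmatically. All file names, example values, and directory names other than the evaluators/results siblings derived from eval_set_path.parent.parent are assumptions:

# Illustrative only: the key names mirror what EvaluationService reads above;
# paths and values are made up for the sketch.
import asyncio
import json
from pathlib import Path

from uipath._cli._evals.evaluation_service import EvaluationService

eval_set = {
    "id": "eval-set-001",
    "name": "smoke",
    "evaluatorRefs": ["evaluator-001"],
    "evaluations": [
        {
            "id": "case-001",
            "name": "basic greeting",
            "inputs": {"message": "hello"},
            "expectedOutput": {"reply": "Hello!"},
        }
    ],
}

# The service resolves evaluators/ and results/ under the eval set's
# grandparent directory (eval_set_path.parent.parent), so a layout such as
# evals/eval-sets/smoke.json with a sibling evals/evaluators/ is assumed here.
eval_set_path = Path("evals/eval-sets/smoke.json")
eval_set_path.parent.mkdir(parents=True, exist_ok=True)
eval_set_path.write_text(json.dumps(eval_set, indent=2), encoding="utf-8")

# Actually running this requires a local agent project, UiPath credentials,
# and the referenced evaluator JSON under evals/evaluators/.
asyncio.run(EvaluationService(eval_set_path).run_evaluation())
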
119 changes: 119 additions & 0 deletions src/uipath/_cli/_evals/evaluators/llm_evaluator.py
@@ -0,0 +1,119 @@
import json
import os
from typing import Any, Dict, Optional

from ..._utils._debug import console
from ...._config import Config
from ...._execution_context import ExecutionContext
from ...._services.llm_gateway_service import UiPathLlmChatService
from ...._utils.constants import (
COMMUNITY_AGENTS_SUFFIX,
ENV_BASE_URL,
ENV_UIPATH_ACCESS_TOKEN,
ENV_UNATTENDED_USER_ACCESS_TOKEN,
)
from ..models import EvaluationResult, EvaluatorCategory, LLMResponse


class LLMEvaluator:
"""Service for evaluating outputs using LLM."""

# TODO: find a better way to structure the output
format_instructions: dict[str, str] = {
"role": "system",
"content": 'Extract the data from the following text and model it like this in JSON format: {"similarity_score" = "", "score_justification" = "" . Similarity_score is a float between 0 and 100 and score_justification is a str. The output should be a plain json, nothing else. No markdown.',
}

def __init__(self, evaluator_config: Dict[str, Any]):
"""Initialize LLM evaluator.

Args:
evaluator_config: Configuration for the evaluator from evaluator JSON file
"""
self.config = evaluator_config
base_url_value = os.getenv(ENV_BASE_URL)
secret_value = os.getenv(ENV_UNATTENDED_USER_ACCESS_TOKEN) or os.getenv(
ENV_UIPATH_ACCESS_TOKEN
)
config = Config(
base_url=base_url_value, # type: ignore
secret=secret_value, # type: ignore
)
self.llm = UiPathLlmChatService(config, ExecutionContext())

# Validate evaluator category
if self.config.get("category") != EvaluatorCategory.LlmAsAJudge:
raise ValueError("Evaluator must be of type LlmAsAJudge")

async def evaluate(
self,
evaluation_id: str,
evaluation_name: str,
input_data: Dict[str, Any],
expected_output: Dict[str, Any],
actual_output: Dict[str, Any],
) -> EvaluationResult:
"""Evaluate the actual output against expected output using LLM.

Args:
evaluation_id: ID of the evaluation
evaluation_name: Name of the evaluation
input_data: Input data used for the evaluation
expected_output: Expected output from the evaluation
actual_output: Actual output received

Returns:
EvaluationResult containing the evaluation score and details
"""
# Prepare the prompt by replacing placeholders
prompt = self.config["prompt"]
prompt = prompt.replace(
"{{ExpectedOutput}}", json.dumps(expected_output, indent=2)
)
content = prompt.replace(
"{{ActualOutput}}", json.dumps(actual_output, indent=2)
)

model: Optional[str] = self.config.get("model", None)
if not model:
console.error("Evaluator model cannot be extracted")
raise ValueError("Evaluator model is missing from the evaluator config")

# remove community-agents suffix from llm model name
if model.endswith(COMMUNITY_AGENTS_SUFFIX):
model = model.replace(COMMUNITY_AGENTS_SUFFIX, "")

response = await self.llm.chat_completions(
messages=[{"role": "user", "content": content}], model=model
)
structured_response = await self.llm.chat_completions(
messages=[
self.format_instructions,
{"role": "user", "content": response.choices[-1].message.content},
],
model=model,
)
try:
llm_response = LLMResponse(
**json.loads(structured_response.choices[-1].message.content)
)
except Exception as e:
raise Exception(f"Error parsing LLM response: {e}") from e
# Leave those comments
# llm_response = LLMResponse(similarity_score=90, score_justification="test justification")
score = llm_response.similarity_score
details = llm_response.score_justification

if score < 0 or score > 100:
raise ValueError(f"Score {score} is outside valid range 0-100")

return EvaluationResult(
evaluation_id=evaluation_id,
evaluation_name=evaluation_name,
evaluator_id=self.config["id"],
evaluator_name=self.config["name"],
score=score,
input=input_data,
expected_output=expected_output,
actual_output=actual_output,
details=details,
)
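
And a sketch of the evaluator config that _load_evaluators matches by id, using only the fields LLMEvaluator reads (id, name, category, prompt with the {{ExpectedOutput}}/{{ActualOutput}} placeholders, and model). The example values, the prompt text, and the model name are assumptions:

# Illustrative evaluator config: the category must deserialize to
# EvaluatorCategory.LlmAsAJudge, and constructing LLMEvaluator additionally
# requires the LLM gateway environment variables (base URL and access token).
from uipath._cli._evals.evaluators.llm_evaluator import LLMEvaluator
from uipath._cli._evals.models import EvaluatorCategory

evaluator_config = {
    "id": "evaluator-001",
    "name": "output-similarity",
    "category": EvaluatorCategory.LlmAsAJudge,  # stored as its serialized value in the JSON file
    "prompt": (
        "Compare the expected output:\n{{ExpectedOutput}}\n"
        "with the actual output:\n{{ActualOutput}}\n"
        "and explain how similar they are."
    ),
    "model": "example-model",  # placeholder; use a model the LLM gateway accepts
}

evaluator = LLMEvaluator(evaluator_config)
# evaluator.evaluate(...) is awaited per test case by EvaluationService._process_evaluation.
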