Distributed Tracing with Async Tool Workers

Demonstrates how to maintain a single unified trace when Claude's tool calls execute across multiple processes (client → server → worker) by exporting span context and passing it via HTTP headers.

Quick Start

# Terminal 1: Start tool worker
python distributed_worker.py

# Terminal 2: Start conversation server
python distributed_server.py

# Terminal 3: Run client
python distributed_main.py "I have a 45-year-old female patient, 5'4\" tall, 180 lbs, presenting with fatigue and increased thirst. What conditions should I evaluate for, and what are the treatment options?"

View traces: https://www.braintrust.dev/app


Core Concepts

1. Tool Call Flow

Claude returns tool call requests (not results). You execute tools and send results back to Claude for the final response.

import json
from anthropic import Anthropic

client = Anthropic()

# 1. Claude returns tool requests
response = client.messages.create(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Calculate BMI for 5'4\", 180lbs"}],
    tools=[calculate_clinical_metrics_tool]
)

# 2. Execute the requested tool locally
tool_use = next(block for block in response.content if block.type == "tool_use")
result = calculate_clinical_metrics(**tool_use.input)

# 3. Send the tool result back to Claude for the final response
final_response = client.messages.create(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    tools=[calculate_clinical_metrics_tool],
    messages=[
        {"role": "user", "content": "Calculate BMI for 5'4\", 180lbs"},
        {"role": "assistant", "content": response.content},
        {"role": "user", "content": [{
            "type": "tool_result",
            "tool_use_id": tool_use.id,
            "content": json.dumps(result)
        }]}
    ]
)

2. Distributed Tracing

Pass span context via HTTP headers to link spans across processes.

# CLIENT: export the current span and send it via a header
import braintrust
import requests

turn_span = braintrust.start_span(name="client_turn")
turn_export = turn_span.export()  # opaque string identifying this span

response = requests.post(
    "http://server/api/chat",
    headers={"x-bt-parent-span": turn_export},
    json={"message": "Hello"}
)
turn_span.end()

# SERVER: continue the trace under the exported parent span
from fastapi import FastAPI, Request

app = FastAPI()

@app.post("/api/chat")
def chat(request: Request):
    parent_span = request.headers.get("x-bt-parent-span")

    with braintrust.start_span(
        name="server_process_turn",
        parent=parent_span  # links this span to the client span
    ) as span:
        result = do_work()
        span.log(output=result)

3. Root Span Logging for Custom Views

Braintrust custom views show complete data only for the selected span. Log aggregated data to the root span for queryability.

conversation_span = braintrust.start_span(name="client_conversation")

# Collect turn data
turns_data = []
for query in queries:
    result = make_request(query)
    turns_data.append({"query": query, "response": result["response"]})

# Log to root span
conversation_span.log(
    output={
        "conversation_id": conv_id,
        "total_turns": len(queries),
        "turns": turns_data,
        "patient_query": queries[0],
        "final_response": turns_data[-1]["response"]
    }
)

BTQL query:

SELECT output.patient_query, output.final_response, output.turns
FROM project_logs('project-id')
WHERE span_attributes.name = 'client_conversation'

4. Online Scoring

Add scores to spans for real-time quality evaluation.

from braintrust import current_span

def calculate_bmi(height, weight, age, gender):
    """Imperial BMI: height in inches, weight in lbs (hence the 703 factor)."""
    span = current_span()
    bmi = (weight / (height ** 2)) * 703

    if bmi < 18.5:
        category, score = "underweight", 0.5
    elif bmi < 25:
        category, score = "normal", 1.0
    elif bmi < 30:
        category, score = "overweight", 0.7
    else:
        category, score = "obese", 0.3

    span.log(
        output={"bmi": round(bmi, 1), "category": category},
        scores={"health_risk": score}
    )

    return {"bmi": round(bmi, 1), "category": category}

Architecture

Client (Python/TypeScript)
  └─ client_conversation (root span - logs aggregated data)
     └─ client_turn (exports via x-bt-parent-span header)
        └─ server_process_turn (FastAPI:8001)
           ├─ claude_messages_create (returns tool requests)
           ├─ worker_execute_tool_1 (FastAPI:8002)
           │  └─ calculate_clinical_metrics
           ├─ worker_execute_tool_2 (FastAPI:8002)
           │  └─ search_medical_knowledge
           └─ claude_messages_create (final response)
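
The server-to-worker hop uses the same export/parent pattern shown in section 2: the server forwards its span export in the x-bt-parent-span header, and the worker opens its tool span under it. A sketch (the /execute endpoint, payload shape, and tool_input are illustrative; the actual wiring lives in distributed_server.py and distributed_worker.py):

# SERVER: forward the current span's export alongside the tool request
worker_response = requests.post(
    "http://localhost:8002/execute",
    headers={"x-bt-parent-span": span.export()},  # span = server_process_turn span
    json={"tool": "calculate_clinical_metrics", "input": tool_input}
)

# WORKER: continue the trace exactly as the server does for the client
@app.post("/execute")
def execute_tool(request: Request):
    parent = request.headers.get("x-bt-parent-span")
    with braintrust.start_span(name="worker_execute_tool_1", parent=parent) as tool_span:
        result = calculate_clinical_metrics(**tool_input)
        tool_span.log(output=result)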

Files

File                      Purpose
----                      -------
distributed_worker.py     Tool execution worker (Port 8002)
distributed_server.py     Conversation server (Port 8001)
distributed_main.py       Python client with CLI args
distributed_client.ts     TypeScript client alternative
conversation_agent.py     Handles Claude's agentic loop
claude_client.py          Claude API wrapper
tools.py                  Tool implementations (BMI, medication, RAG)
rag_service.py            ChromaDB vector search
knowledge_base.py         Medical knowledge data

Setup

# Install Python dependencies
uv sync

# Install TypeScript dependencies (optional)
npm install

# Configure .env
ANTHROPIC_API_KEY=sk-ant-...
BRAINTRUST_API_KEY=sk-...
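
If you load configuration inside your own scripts, python-dotenv is a common way to pick up these keys from .env (an assumption about your setup, not a requirement of the repo):

# Optional: load .env before creating the Anthropic/Braintrust clients
from dotenv import load_dotenv

load_dotenv()  # exposes ANTHROPIC_API_KEY and BRAINTRUST_API_KEY via os.environ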

Running

# Terminal 1
python distributed_worker.py

# Terminal 2
python distributed_server.py

# Terminal 3 - Python client
python distributed_main.py "Patient query here"

# OR - TypeScript client
npx tsx distributed_client.ts "Patient query here"

Troubleshooting

Worker not reachable:

  • Start distributed_worker.py before running client

Spans not linking:

  • Verify the x-bt-parent-span header is sent and received (see the debug sketch after this list)

Custom views missing data:

  • Ensure root span logs aggregated data (see section 3)
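
To check whether the header actually arrives, a throwaway debug print inside the server handler is usually enough (a sketch; remove it once spans link):

# Temporary debug check in the FastAPI handler
@app.post("/api/chat")
def chat(request: Request):
    parent = request.headers.get("x-bt-parent-span")
    print("x-bt-parent-span:", repr(parent))  # None means the client never sent the header
    ...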

Querying Traces with BTQL

Query traces programmatically by POSTing BTQL (Braintrust's SQL-like query language) to the REST API:

import os

import requests

BRAINTRUST_API_KEY = os.environ["BRAINTRUST_API_KEY"]

def query_btql(query: str) -> dict:
    """Execute a BTQL query against the Braintrust API."""
    response = requests.post(
        "https://api.braintrust.dev/btql",
        headers={
            "Authorization": f"Bearer {BRAINTRUST_API_KEY}",
            "Content-Type": "application/json"
        },
        json={"query": query}
    )
    response.raise_for_status()
    return response.json()

# Get recent conversations
result = query_btql("""
    SELECT
        span_id,
        output.conversation_id,
        output.patient_query,
        output.final_response
    FROM project_logs('project-id')
    WHERE span_attributes.name = 'client_conversation'
    ORDER BY created DESC
    LIMIT 10
""")

for row in result["data"]:
    print(row["conversation_id"], row["patient_query"][:60])

See query_traces.py for complete examples including:

  • Search by patient demographics/symptoms
  • Tool execution statistics
  • Time-based queries
  • Low score detection
  • Conversation trace analysis

Production Logs to Eval Datasets

Convert your production traces into evaluation datasets to validate quality over time.

Overview

The logs-to-evals workflow enables you to:

  1. Query production logs using BTQL
  2. Transform traces into structured eval datasets
  3. Run automated quality assessments with custom scoring functions
  4. Track quality metrics across prompt iterations

Files

File                        Purpose
----                        -------
logs_to_dataset.py          Convert production logs to eval datasets
run_eval_from_dataset.py    Run evaluations with custom scorers
query_traces.py             BTQL query examples for accessing logs

Creating Datasets from Logs

Run logs_to_dataset.py to create datasets from your production traces:

python logs_to_dataset.py

This creates three types of datasets:

1. All Recent Production Cases

create_dataset_from_recent_logs(
    limit=10,
    dataset_name="Production Healthcare Cases"
)

2. Filtered by Criteria

create_dataset_by_criteria(
    search_term="diabetes",
    dataset_name="Diabetes Cases",
    limit=5
)

3. Complex Multi-Tool Cases

create_complex_cases_dataset(
    dataset_name="Complex Multi-Tool Cases",
    limit=10
)

Dataset structure:

dataset.insert(
    input={"patient_query": "..."},
    expected={"final_response": "..."},
    metadata={
        "conversation_id": "...",
        "total_turns": 1,
        "source": "production_logs"
    },
    tags=["production", "healthcare"]
)
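
Internally these helpers follow a query-then-insert pattern: pull rows with BTQL, then write them into a Braintrust dataset. A minimal sketch reusing the query_btql helper above (the real implementation lives in logs_to_dataset.py; the project name is a placeholder):

import braintrust

def create_dataset_from_recent_logs(limit=10, dataset_name="Production Healthcare Cases"):
    """Sketch: query recent root conversation spans and insert them as dataset rows."""
    rows = query_btql(f"""
        SELECT output.conversation_id, output.patient_query,
               output.final_response, output.total_turns
        FROM project_logs('project-id')
        WHERE span_attributes.name = 'client_conversation'
        ORDER BY created DESC
        LIMIT {limit}
    """)["data"]

    dataset = braintrust.init_dataset(project="your-project", name=dataset_name)
    for row in rows:
        dataset.insert(
            input={"patient_query": row["patient_query"]},
            expected={"final_response": row["final_response"]},
            metadata={
                "conversation_id": row["conversation_id"],
                "total_turns": row["total_turns"],
                "source": "production_logs"
            },
            tags=["production", "healthcare"]
        )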

Running Evaluations

Execute run_eval_from_dataset.py to score production responses:

python run_eval_from_dataset.py

This validates the stored production responses using custom scoring functions, without re-running the model:

Scoring Functions (Healthcare Example)

  1. length_score: Response length appropriateness (100-500 words optimal; a sketch follows this list)
  2. contains_clinical_metrics: Checks for BMI, blood pressure, risk factors
  3. mentions_medications: Medication and treatment discussion depth
  4. follows_medical_structure: Assessment → Recommendations → Follow-up
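
For illustration, the length scorer might look roughly like this; it is a sketch of the idea, not the exact implementation in run_eval_from_dataset.py:

def length_score(output: str, expected=None) -> float:
    """Return 1.0 for responses in the 100-500 word range, tapering to 0 outside it."""
    words = len(output.split())
    if 100 <= words <= 500:
        return 1.0
    if words < 100:
        return words / 100                           # too short: scale toward 1.0
    return max(1.0 - (words - 500) / 500, 0.0)       # too long: penalize past 500 words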

Example results:

90% - Response length (comprehensive but concise)
97% - Medical structure (proper format)
91% - Medication coverage (appropriate treatments)
73% - Clinical metrics (BMI, BP calculations)

Custom Scoring Functions

Define domain-specific scorers in run_eval_from_dataset.py:

from typing import Any, Dict

from braintrust import Eval

def custom_score(output: str, expected: Dict[str, Any] = None) -> float:
    """
    Score based on your criteria.

    Args:
        output: The model's response text
        expected: Optional expected output for comparison

    Returns:
        Score between 0.0 and 1.0
    """
    # Your scoring logic
    score = calculate_quality(output)
    return score

# Add to eval
results = Eval(
    PROJECT_NAME,
    data=dataset,
    task=validation_task,
    scores=[
        length_score,
        contains_clinical_metrics,
        custom_score  # Your custom scorer
    ]
)

LLM-as-Judge Scoring

Use Claude to evaluate response quality:

from typing import Dict

from anthropic import Anthropic

def llm_judge_score(output: str, expected: Dict = None) -> float:
    """Use Claude to score response quality."""
    client = Anthropic()

    prompt = f"""Rate this medical response from 0-1 based on:
    - Clinical accuracy
    - Completeness of assessment
    - Appropriate recommendations

    Response: {output}

    Return only a number between 0 and 1."""

    response = client.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=10,
        messages=[{"role": "user", "content": prompt}]
    )

    return float(response.content[0].text.strip())

Querying Logs with BTQL

Use query_traces.py functions to access production data:

from query_traces import (
    get_recent_conversations,
    find_by_patient_query,
    get_tool_statistics,
    get_conversations_by_date_range
)

# Get recent conversations
conversations = get_recent_conversations(limit=10)

# Search by medical condition
diabetes_cases = find_by_patient_query("diabetes", limit=5)

# Get tool usage stats
stats = get_tool_statistics()

# Time-based filtering
recent = get_conversations_by_date_range(hours_ago=24)
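
These helpers are thin wrappers over BTQL. For example, get_recent_conversations might look roughly like the sketch below (built on the query_btql helper from earlier; the actual implementation is in query_traces.py):

def get_recent_conversations(limit: int = 10) -> list:
    """Sketch: fetch the most recent root conversation spans."""
    result = query_btql(f"""
        SELECT output.conversation_id, output.patient_query, output.final_response
        FROM project_logs('project-id')
        WHERE span_attributes.name = 'client_conversation'
        ORDER BY created DESC
        LIMIT {limit}
    """)
    return result.get("data", [])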

Best Practices

1. Regular Dataset Updates

  • Run logs_to_dataset.py weekly to capture new patterns
  • Include edge cases and failure modes
  • Maintain diverse examples across conditions

2. Evolving Scorers

  • Update scoring functions as requirements change
  • Add domain-specific quality metrics
  • Combine rule-based and LLM-as-judge scoring

3. Baseline Tracking

  • Compare new eval runs against previous baselines
  • Monitor score trends over time
  • Investigate regressions immediately

4. CI/CD Integration

  • Run evals before deploying prompt changes
  • Set score thresholds for deployment gates (see the sketch at the end of this list)
  • Alert on quality degradation

5. Feedback Loops

  • Incorporate low-scoring examples into training data
  • Use user feedback to refine scoring criteria
  • Create targeted datasets for specific improvements
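
As a concrete sketch of the deployment-gate idea in item 4, a CI step can rerun the rule-based scorers over the dataset's stored responses and fail the build when any average drops below a threshold (helper names and threshold values here are illustrative assumptions, not part of the repo):

import sys
from statistics import mean

THRESHOLDS = {"length_score": 0.80, "follows_medical_structure": 0.90}

def run_gate(rows, scorers) -> None:
    """rows: dataset records with expected.final_response; scorers: callables (output) -> float."""
    for scorer in scorers:
        avg = mean(scorer(row["expected"]["final_response"]) for row in rows)
        print(f"{scorer.__name__}: {avg:.2f} (threshold {THRESHOLDS.get(scorer.__name__, 0):.2f})")
        if avg < THRESHOLDS.get(scorer.__name__, 0.0):
            sys.exit(1)  # non-zero exit fails the CI job and blocks deployment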

Workflow

Production System
    |
    v
Production Logs (Braintrust)
    |
    v (BTQL queries)
logs_to_dataset.py
    |
    v (dataset.insert())
Braintrust Dataset
    |
    v (Eval() with scorers)
run_eval_from_dataset.py
    |
    v (quality metrics)
Eval Results Dashboard
    |
    v (iterate on prompts)
Production System (improved)

Advanced Usage

Filter by Quality Signals

Create datasets from specific scenarios:

# Get conversations with low scores
def get_failing_cases():
    query = """
    SELECT span_id, output.conversation_id, output.patient_query
    FROM project_logs('project-id')
    WHERE span_attributes.name = 'client_conversation'
      AND scores.clinical_completeness < 0.5
    """
    return query_btql(query)

Automated Dataset Generation

Schedule regular dataset updates:

# crontab -e
0 0 * * 0 cd /path/to/project && python logs_to_dataset.py

Compare Model Versions

Run evals on the same dataset across different models:

for model in ["claude-3-5-sonnet-20241022", "claude-opus-4-5"]:
    results = Eval(
        f"{PROJECT_NAME} - {model}",
        data=dataset,
        task=lambda x: run_with_model(x, model),
        scores=[...]
    )

