feat: enhanced support for results directory and more metadata #12

Merged
merged 4 commits on Aug 23, 2025
14 changes: 14 additions & 0 deletions examples/agent/README.md
@@ -57,6 +57,12 @@ fractale agent --plan ./plans/run-lammps.yaml

# or try using with the cache
fractale agent --plan ./plans/run-lammps.yaml --use-cache

# Save metadata
fractale agent --plan ./plans/run-lammps.yaml --results ./results

# Save metadata and include incremental results
fractale agent --plan ./plans/run-lammps.yaml --results ./results --incremental
```
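
With `--results`, the agent writes its collected metadata (timings, assets, retries, failures, and per-call model records) under the given directory; `--incremental` additionally saves a record for each Gemini call as it happens. As a rough sketch of the shape (field names follow `fractale/agent/base.py`; the exact files written are an assumption):

```python
# Shape of one agent's saved metadata (illustrative values, not real output)
metadata = {
    "times": {"build": 42.7},  # timed durations, in seconds
    "assets": {},              # artifacts saved along the way
    "retries": 1,              # retry count for this step
    "failures": [],            # metadata archived from failed attempts
    "ask_gemini": [            # per-call records (saved with --incremental)
        {
            "conversation_history": True,
            "prompt_token_count": 1532,
            "candidates_token_count": 410,
            "total_token_count": 1942,
            "time_seconds": 3.8,
        }
    ],
}
```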

We haven't hit the case yet where the manager needs to take over - that needs further development, along with being goal-oriented (e.g., parsing a log and getting an output).
@@ -66,11 +72,19 @@
#### To do items

- Figure out optimization agent (with some goal)
- The LLM absolutely needs detail about the data, and what to run.
- Error messages from programs are immensely important now since the LLM makes decisions entirely from it.
- Right now when we restart, we do so with a fresh slate (no log memory) - should there be one?
- We likely want some way to quantify the amount of change between prompts, and the difficulty of the task (see the sketch after this list).
- When we return to the manager, the last response (which might say why it is returning) should inform step selection - and not just step selection, but also the updated prompt for the step that was missing something.
- Right now we rely on random sampling of the space to avoid whatever the issue might be.
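
One simple way to quantify the amount of change between consecutive prompts (an illustration only, not something implemented here) is an edit-similarity distance over the prompt text:

```python
import difflib

# Illustrative sketch (not in this repo): score prompt-to-prompt change as
# 1 - text similarity, so 0.0 means identical prompts and 1.0 means disjoint.
def prompt_change(previous, current):
    return 1 - difflib.SequenceMatcher(None, previous, current).ratio()

print(prompt_change("build lammps with gcc", "build lammps with gcc and ca-certificates"))
```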

#### Research Questions

**And experiment ideas**

- Why does it make the same mistakes? E.g., always forgetting ca-certificates. Did it learn from data where that was OK, such that errors result from inconsistencies between the way things used to work and the way they do now?
- Insight: if I don't know how to run an app, it's unlikely the LLM can do it, because I can't give any guidance (and it guesses).
- How do we define stability?
- What are the increments of change (e.g., "adding a library")? We should be able to keep track of times for each stage and what changed, and an analyzer LLM can look at the result and understand (categorize) the most salient contributions to change.
- We can also measure the time it takes to make subsequent changes, when relevant. For example, if we are building, we should be able to use cached layers (so build times speed up) if the LLM is changing content later in the Dockerfile.
2 changes: 1 addition & 1 deletion fractale/agent/__init__.py
@@ -1,5 +1,5 @@
from fractale.agent.build import BuildAgent
from fractale.agent.kubernetes_job import KubernetesJobAgent
from fractale.agent.kubernetes import KubernetesJobAgent
from fractale.agent.manager import ManagerAgent


104 changes: 79 additions & 25 deletions fractale/agent/base.py
@@ -1,13 +1,16 @@
import json
import copy
import os
import re
import sys
import time

import google.generativeai as genai

import fractale.agent.defaults as defaults
import fractale.agent.logger as logger
import fractale.utils as utils
from fractale.agent.context import get_context
from fractale.agent.decorators import save_result, timed


class Agent:
@@ -22,28 +25,37 @@ class Agent:
"""

# name and description should be on the class
state_variables = ["result", "error_message"]

def __init__(self, use_cache=False):
def __init__(
self, use_cache=False, results_dir=None, save_incremental=False, max_attempts=None
):
self.attempts = 0
self.max_attempts = max_attempts

# Max attempts defaults to unlimited
# We start counting at 1 for the user to see.
# Eat your heart out, Matlab.
self.attempts = 1
self.max_attempts = None
# For now, assume these are for the manager.
# They get added to other agents via the step creation
# We can optionally save incremental result objects
self.results_dir = results_dir or os.getcwd()
self.save_incremental = save_incremental

# The user can save if desired - caching the context to skip steps that already run.
self.setup_cache(use_cache)

# This supports saving custom logs and step (attempt) metadata
self.init_metadata()

# Custom initialization functions
self.init()

def init_metadata(self):
self.metadata = {"times": {}, "assets": {}, "retries": 0, "failures": []}

@save_result
def run(self, context):
"""
Run the agent - a wrapper around the internal run_step function that prepares it.
"""
# Init attempts. Each agent has an internal counter for total attempts
self.attempts = self.attempts or 1

# Load cached context. This is assumed to override user provided args
# If we have a saved context, we assume we want to use it, return early
cached_context = self.load_cache()
@@ -57,7 +69,8 @@ def run(self, context):
context = get_context(context)

# Run, wrapping with a load and save of cache
context = self._run(context)
# This will return here when the internal loop is done
context = self.run_step(context)
self.save_cache(context)
return context

@@ -70,6 +83,32 @@ def print_result(self, result):
"""
pass

def reset_context(self, context):
"""
Remove output and any stateful variables. This is assuming we
are starting again.
"""
for key in self.state_variables:
if key in context:
del context[key]

# Since we will try again, let's move current metadata into a subsection
metadata = copy.deepcopy(self.metadata)

# We don't want this to recurse forever
failures = metadata.get("failures") or []
if "failures" in metadata:
del metadata["failures"]
failures.append(metadata)

# Reset metadata, save retries
self.init_metadata()
self.metadata["failures"] = failures
self.metadata["retries"] = metadata["retries"]

# We don't need a return here, but let's be explicit
return context
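
To make the archive-then-reset flow concrete, here is a minimal standalone sketch of the same pattern (a sketch, not the class method itself; field names follow `init_metadata`):

```python
import copy

# Archive the current attempt's metadata into a flat top-level "failures"
# list, then start fresh. Popping "failures" from the snapshot first is what
# keeps the history one level deep instead of nesting recursively.
def archive_failure(metadata):
    snapshot = copy.deepcopy(metadata)
    failures = snapshot.pop("failures", [])
    failures.append(snapshot)
    return {"times": {}, "assets": {}, "retries": snapshot["retries"], "failures": failures}

meta = {"times": {"build": 3.1}, "assets": {}, "retries": 1, "failures": []}
meta = archive_failure(meta)
print(len(meta["failures"]))  # 1 archived attempt, with no nested failures
```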

def setup_cache(self, use_cache=False):
"""
Setup (or load) a cache.
@@ -123,10 +162,7 @@ def reached_max_attempts(self):
# Unset (None) or 1.
if not self.max_attempts:
return False
return self.attempts >= self.max_attempts

def set_max_attempts(self, max_attempts):
self.max_attempts = max_attempts
return self.attempts > self.max_attempts

def add_shared_arguments(self, agent):
"""
@@ -190,29 +226,25 @@ def get_code_block(self, content, code_type):
"""
Parse a code block from the response
"""
pattern = f"```(?:{code_type})?\n(.*?)```"
match = re.search(pattern, content, re.DOTALL)
if match:
return match.group(1).strip()
if content.startswith(f"```{code_type}"):
content = content[len(f"```{code_type}") :]
if content.startswith("```"):
content = content[len("```") :]
if content.endswith("```"):
content = content[: -len("```")]
return content
return content.strip()
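
For reference, the extraction strategy above (try a fenced-block regex first, then fall back to stripping bare fences) can be exercised standalone; a minimal sketch with an assumed response string:

````python
import re

# Standalone sketch of the fenced-block extraction: prefer a regex match on
# ```<type> ... ```, otherwise trim any stray leading/trailing fences.
def extract_code_block(content, code_type):
    match = re.search(f"```(?:{code_type})?\n(.*?)```", content, re.DOTALL)
    if match:
        return match.group(1).strip()
    for fence in (f"```{code_type}", "```"):
        if content.startswith(fence):
            content = content[len(fence):]
    if content.endswith("```"):
        content = content[: -len("```")]
    return content.strip()

response = "```dockerfile\nFROM ubuntu:22.04\n```"
print(extract_code_block(response, "dockerfile"))  # FROM ubuntu:22.04
````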

def _run(self, context):
def run_step(self, context):
"""
Run the agent. This expects to be called with a loaded context.
"""
assert context
raise NotImplementedError(f"The {self.name} agent is missing internal 'run' function")

def get_initial_prompt(self, context):
"""
Get the initial prompt (with details) to provide context to the manager.

If we don't do this, the manager can provide a bad instruction for how to fix the error.
"""
return self.get_prompt(context)

def get_prompt(self, context):
"""
This function should take the same context as run and return the parsed prompt that
@@ -235,19 +267,41 @@ def init(self):
except KeyError:
sys.exit("ERROR: GEMINI_API_KEY environment variable not set.")

# We don't add timed here because we do it custom
def ask_gemini(self, prompt, with_history=True):
"""
Ask gemini adds a wrapper with some error handling.
"""
try:
start = time.perf_counter()
if with_history:
response = self.chat.send_message(prompt)
else:
response = self.model.generate_content(prompt)
end = time.perf_counter()

if self.save_incremental:
self.save_gemini_metadata(end - start, response, with_history)

# This line can fail. If it succeeds, return entire response
return response.text.strip()

except ValueError as e:
print(f"[Error] The API response was blocked and contained no text: {str(e)}")
return "GEMINI ERROR: The API returned an error (or stop) and we need to try again."

def save_gemini_metadata(self, elapsed_time, response, with_history):
"""
Save gemini response metadata and elapsed time
"""
if "ask_gemini" not in self.metadata:
self.metadata["ask_gemini"] = []
self.metadata["ask_gemini"].append(
{
"conversation_history": with_history,
"prompt_token_count": response.usage_metadata.prompt_token_count,
"candidates_token_count": response.usage_metadata.candidates_token_count,
"total_token_count": response.usage_metadata.total_token_count,
"time_seconds": elapsed_time,
}
)
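
Since each call appends one record, the saved list supports simple aggregation afterwards; a hypothetical helper (not part of this PR) that sums tokens and latency:

```python
# Hypothetical helper, not in this PR: summarize the per-call records that
# save_gemini_metadata appends under metadata["ask_gemini"].
def summarize_gemini_calls(metadata):
    calls = metadata.get("ask_gemini", [])
    return {
        "calls": len(calls),
        "total_tokens": sum(c["total_token_count"] for c in calls),
        "total_seconds": sum(c["time_seconds"] for c in calls),
    }
```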