Skip to content

Commit 8560e71

Browse files
committed
wip: new results and optimize agent
Signed-off-by: vsoch <[email protected]>
1 parent 85ecd28 commit 8560e71

File tree

10 files changed

+379
-11
lines changed

10 files changed

+379
-11
lines changed

fractale/agent/kubernetes/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
from .job import KubernetesJobAgent
2-
assert KubernetesJobAgent
2+
3+
assert KubernetesJobAgent

fractale/agent/kubernetes/base.py

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,9 @@
11
import argparse
2+
import json
3+
import subprocess
4+
5+
from rich import print
6+
from rich.panel import Panel
27
from rich.syntax import Syntax
38

49
import fractale.agent.logger as logger
@@ -45,7 +50,6 @@ def print_result(self, job_crd):
4550
highlighted_syntax, title="Final Kubernetes Job", border_style="green", expand=True
4651
)
4752

48-
4953
def save_log(self, full_logs):
5054
"""
5155
Save logs to metadata
@@ -64,4 +68,58 @@ def save_job_manifest(self, job):
6468
self.metadata["assets"][self.result_type] = []
6569
self.metadata["assets"][self.result_type].append(
6670
{"item": job, "attempt": self.attempts}
67-
)
71+
)
72+
73+
def cluster_resources(self):
    """
    Get cluster resources - count of nodes and resources.

    Runs `kubectl get nodes -o json` and groups the nodes by allocatable
    cpu, allocatable memory, and architecture. This is intentionally not
    cached: clusters can change, and the query is cheap enough to repeat.

    Returns:
        A dict with "total_nodes" (int) and "node_specs" (list of dicts with
        keys "cpu", "memory", "arch", "count"), or None when kubectl cannot
        be run, its output cannot be parsed, or the cluster reports no nodes.
    """
    print("[yellow]Querying Kubernetes cluster for node resources...[/yellow]")
    try:
        result = subprocess.run(
            ["kubectl", "get", "nodes", "-o", "json"],
            capture_output=True,
            text=True,
            check=True,
            timeout=30,
        )
    except (OSError, subprocess.SubprocessError) as e:
        print(
            "[bold red]Error executing kubectl command. Do you have access to the cluster?[/bold red]"
        )
        # Only CalledProcessError and TimeoutExpired carry stderr. A missing
        # kubectl binary raises FileNotFoundError, which has no such attribute.
        print(f"Stderr: {getattr(e, 'stderr', None) or e}")
        return None

    try:
        nodes = json.loads(result.stdout).get("items", [])
        if not nodes:
            print("[red]Error: No nodes found in the cluster.[/red]")
            return None

        # Count nodes per unique (cpu, memory, arch) combination
        spec_counts = {}
        for node in nodes:
            status = node["status"]
            node_spec = (
                status["allocatable"]["cpu"],
                status["allocatable"]["memory"],
                status["nodeInfo"]["architecture"],
            )
            spec_counts[node_spec] = spec_counts.get(node_spec, 0) + 1
    except (ValueError, KeyError, TypeError, AttributeError) as e:
        # ValueError covers json.JSONDecodeError; the rest cover output that
        # parses but does not have the expected node structure.
        print("[bold red]Error: Unexpected output from kubectl.[/bold red]")
        print(f"Details: {e!r}")
        return None

    # Expand the tuple keys into named fields alongside the node count
    node_specs = [
        {"cpu": cpu, "memory": memory, "arch": arch, "count": count}
        for (cpu, memory, arch), count in spec_counts.items()
    ]
    cluster_info = {"total_nodes": len(nodes), "node_specs": node_specs}

    print("[green]✅ Successfully retrieved cluster information.[/green]")
    return cluster_info

fractale/agent/kubernetes/job/agent.py

Lines changed: 46 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
1-
import argparse
21
import json
32
import os
4-
import re
53
import shutil
64
import subprocess
75
import sys
@@ -11,17 +9,16 @@
119

1210
import yaml
1311
from rich import print
14-
from rich.syntax import Syntax
1512

16-
import fractale.agent.kubernetes.objects as objects
17-
from fractale.agent.kubernetes.base import KubernetesAgent
1813
import fractale.agent.kubernetes.job.prompts as prompts
14+
import fractale.agent.kubernetes.objects as objects
1915
import fractale.agent.logger as logger
2016
import fractale.utils as utils
21-
from fractale.agent.base import GeminiAgent
2217
from fractale.agent.context import get_context
2318
from fractale.agent.decorators import timed
2419
from fractale.agent.errors import DebugAgent
20+
from fractale.agent.kubernetes.base import KubernetesAgent
21+
from fractale.agent.optimize import OptimizationAgent
2522

2623

2724
class KubernetesJobAgent(KubernetesAgent):
@@ -33,6 +30,13 @@ class KubernetesJobAgent(KubernetesAgent):
3330
description = "Kubernetes Job agent"
3431
result_type = "kubernetes-job-manifest"
3532

33+
def __init__(self, *args, **kwargs):
    """
    Initialize the parent agent, then attach an optimization agent.

    The optimization agent is always created, whether or not a given
    run ends up asking for optimization.
    """
    super().__init__(*args, **kwargs)
    optimizer = OptimizationAgent()
    self.optimize_agent = optimizer
39+
3640
def get_prompt(self, context):
3741
"""
3842
Get the prompt for the LLM. We expose this so the manager can take it
@@ -139,7 +143,6 @@ def deploy(self, context):
139143
Deploy the Kubernetes Job.
140144
"""
141145
job_crd = context.result
142-
cleanup = context.get("cleanup", True)
143146

144147
# Not sure if this can happen, assume it can
145148
if not job_crd:
@@ -320,21 +323,56 @@ def deploy(self, context):
320323
# But did it succeed?
321324
if final_status.get("succeeded", 0) > 0:
322325
print("\n[green]✅ Job final status is Succeeded.[/green]")
326+
327+
# if we want to optimize, we continue to run until we are instructed not to.
328+
if context.get("optimize") is not None:
329+
330+
# TODO move into own function?
331+
# We should provide the cluster resources to the agent
332+
resources = self.cluster_resources()
333+
334+
# The agent calling the optimize agent decides what metadata to present.
335+
# This is how this agent will work for cloud vs. bare metal
336+
context.requires = prompts.get_optimize_prompt(context, resources)
337+
context = self.optimize_agent.run(context, full_logs)
338+
339+
# Go through spec and update fields that match.
340+
decision = context.optimize_result['decision']
341+
print(f"\n[green]✅ Optimization agent decided to {decision}.[/green]")
342+
if decision == "RETRY":
343+
context.result = self.update_job_crd(context.optimize_result, job_crd)
344+
print(context.result)
345+
return self.deploy(context)
346+
347+
# Agent has decided to return - no more optimize.
348+
return 0, full_logs
349+
323350
else:
324351
print("\n[red]❌ Job final status is Failed.[/red]")
325352
diagnostics = self.get_diagnostics(job, pod)
326353
job.delete()
327354
# We already have the logs, so we can pass them directly.
328355
return 1, prompts.failure_message % diagnostics
329356

330-
if cleanup and os.path.exists(deploy_dir):
357+
if context.get('cleanup') is True and os.path.exists(deploy_dir):
331358
print(f"[dim]Cleaning up temporary deploy directory: {deploy_dir}[/dim]")
332359
job.delete()
333360
shutil.rmtree(deploy_dir, ignore_errors=True)
334361

335362
# Save full logs for the step
336363
return 0, full_logs
337364

365+
def update_job_crd(self, updates, job_crd):
    """
    Update the job crd with a set of controlled fields.

    Builds the update prompt, sends it with self.ask_gemini, and returns
    the yaml code block extracted from the response.

    Arguments:
        updates: dict of requested updates. The 'decision' and 'reason'
            keys are not forwarded. The dict itself is left unmodified.
        job_crd: the current Job manifest (presumably a YAML string -
            TODO confirm against the caller).
    """
    # Filter into a new dict so the caller's result keeps its decision / reason
    fields = {k: v for k, v in updates.items() if k not in ("decision", "reason")}

    # The template lists the updates first and the manifest second
    prompt = prompts.update_prompt % (json.dumps(fields), job_crd)
    result = self.ask_gemini(prompt)
    return self.get_code_block(result, "yaml")
375+
338376
def save_job_manifest(self, job):
339377
"""
340378
Save job manifest to metadata

fractale/agent/kubernetes/job/prompts.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import fractale.agent.defaults as defaults
22
from fractale.agent.prompts import prompt_wrapper
3+
import json
34

45
# Requirements are separate to give to error helper agent
56
# This should explicitly state what the agent is capable of doing.
@@ -29,6 +30,37 @@
2930
%s
3031
"""
3132

33+
update_prompt = """You are a Kubernetes Job update agent. Your job is to take a spec of updates for a Job Manifest and apply them.
34+
You are NOT allowed to make other changes to the manifest. Ignore the 'decision' field and if you think appropriate, add context from "reason" as comments.
35+
Here are the updates:
36+
37+
%s
38+
39+
And here is the Job manifest to apply them to:
40+
%s
41+
Return ONLY the YAML with no other text or commentary.
42+
"""
43+
44+
def get_optimize_prompt(context, resources):
    """
    Get a description of cluster resources and optimization goals.

    Arguments:
        context: provides the optimize, environment, and result attributes,
            and an optional "dockerfile" entry read with context.get.
        resources: JSON-serializable description of the cluster resources,
            or None when the cluster could not be queried.

    Returns:
        The prompt string, with the Dockerfile appended when one is present.
    """
    # A bare "null" tells the model nothing, so state that resources are unknown
    if resources is None:
        available = "unknown (the cluster could not be queried)"
    else:
        available = json.dumps(resources)

    prompt = """
Your task is to optimize the running of a Kubernetes Job: %s in %s. You are allowed to request anywhere in the range of available resources, including count and type. Here are the available resources:
%s
Here is the current job manifest:
```yaml
%s
```
Please return ONLY a json structure to be loaded that includes a limited set of fields (with keys corresponding to the names that are organized the same as a Kubernetes Job, e.g., spec -> template -> spec).
The result should be provided as json. The fields should map 1:1 into a pod spec serialized as json.
Do not make requests that lead to Guaranteed pods. DO NOT CHANGE PROBLEM SIZE PARAMETERS OR COMMAND. You can change args. Remember that
to get a full node's resources you often have to ask for slightly less than what is available.
""" % (context.optimize, context.environment, available, context.result)
    dockerfile = context.get("dockerfile")
    if dockerfile:
        prompt += f" Here is the Dockerfile that helped to generate the application.\n {dockerfile}\n"
    return prompt
3264

3365
def get_regenerate_prompt(context):
3466
"""

fractale/agent/optimize/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
from .agent import OptimizationAgent

fractale/agent/optimize/agent.py

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
import argparse
2+
import textwrap
3+
import json
4+
import os
5+
from rich import print
6+
7+
import google.generativeai as genai
8+
import sys
9+
import fractale.agent.build.prompts as prompts
10+
import fractale.agent.logger as logger
11+
from fractale.agent.base import GeminiAgent
12+
from fractale.agent.context import get_context
13+
import fractale.agent.optimize.prompts as prompts
14+
import fractale.agent.defaults as defaults
15+
16+
# The result parser holds a ResultAgent
17+
from fractale.agent.results import ResultParser
18+
19+
class OptimizationAgent(GeminiAgent):
    """
    Optimization Agent

    The optimization agent receives a figure of merit request
    and a need to minimize or maximize. It will return to a job
    running agent (akin to Kubernetes Job) that has explicit
    instructions to only change a subset of the execution arguments.
    This means that the optimization agent can be general to take
    some application and parameters attempted before. The optimization
    agent should return a new parameter set to the build agent.
    """

    name = "optimization"
    description = "optimization agent"
    state_variables = ["optimize"]

    # Upper bound on re-asking the model when a response cannot be used
    max_retries = 5

    def init(self):
        """
        Configure the Gemini client and reset the optimization state.

        NOTE(review): presumably invoked by the GeminiAgent constructor - confirm.
        """
        # Check credentials first so a missing key fails before any model setup
        try:
            genai.configure(api_key=os.environ["GEMINI_API_KEY"])
        except KeyError:
            sys.exit("ERROR: GEMINI_API_KEY environment variable not set.")

        self.model = genai.GenerativeModel(defaults.gemini_model)
        self.chat = self.model.start_chat()

        self.foms = []
        # We could just take the listing of FOMs, but I am not sure
        # if there are cases where that might not reflect attempts.
        self.attempts = 0
        self.parser = ResultParser()

    def _add_arguments(self, subparser):
        """
        Add arguments for the plugin to show up in argparse
        """
        agent = subparser.add_parser(
            self.name,
            formatter_class=argparse.RawTextHelpFormatter,
            description=self.description,
        )
        agent.add_argument(
            "optimize",
            help="Optimization instruction (include application, environment, algorithm, etc.).",
        )
        return agent

    def run(self, context, log):
        """
        Run the optimization agent.

        Parses figures of merit from the log, asks the model for an update
        and a RETRY / STOP decision, and stores the parsed response on
        context.optimize_result.

        Arguments:
            context: the context (passed through get_context). Its "requires"
                entry is used as the prompt when set.
            log: the log of the run to extract figures of merit from.

        Returns:
            The context with optimize_result set. If no usable response is
            received within max_retries requests, optimize_result falls back
            to a STOP decision.
        """
        # We don't do attempts because we have no condition for success.
        context = get_context(context)

        prompt = context.get("requires")
        # If requirements not specified, we require the "optimize" context
        if not prompt:
            prompt = prompts.get_optimize_prompt(context)

        # Parser requires is the FOM and optimize directive.
        # This returns a list of foms. Record them once per run: a re-ask
        # below must not count the same log as another attempt.
        foms = self.parser.parse(context.optimize, log)
        self.foms += foms
        self.attempts += 1

        # This adds supplementary detail about how to optimize - "keep going until it's good"
        prompt += prompts.supplement_optimize_prompt % (self.attempts, json.dumps(self.foms))

        # TODO: if this agent stores memory we don't need to include dockerfile after the first...
        print("Sending optimization prompt to Gemini...")
        print(textwrap.indent(prompt[0:500], "> ", predicate=lambda _: True))

        # Get the updates. We assume that optimization updates for resources
        # need to come back and be parsed into json.
        result = None
        for _ in range(self.max_retries):
            content = self.ask_gemini(prompt, with_history=True)
            print("Received optimization from Gemini...")
            logger.custom(content, title="[green]Result Parser[/green]", border_style="green")
            try:
                candidate = json.loads(self.get_code_block(content, "json"))
            except (TypeError, ValueError):
                # Not parseable as json (ValueError covers JSONDecodeError) - ask again
                print("Optimization response was not valid json, asking again...")
                continue
            if isinstance(candidate, dict) and "decision" in candidate:
                result = candidate
                break

        if result is None:
            # Bounded fallback instead of recursing forever on unusable responses
            print("No usable optimization decision received, stopping optimization.")
            result = {
                "decision": "STOP",
                "reason": "No valid decision was returned by the optimization agent.",
            }

        # We can't be sure of the format or how to update, so return to job agent
        context.optimize_result = result
        return context

fractale/agent/optimize/prompts.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import fractale.agent.defaults as defaults
2+
from fractale.agent.prompts import prompt_wrapper
3+
4+
# TODO should this be allowed to return to a different agent?
5+
common_instructions = """
6+
- You can make changes to the application execution only.
7+
- You are not allowed to request changes to any configuration beyond the application execution command.
8+
"""
9+
10+
optimize_prompt = f"""You are an optimization agent. Your job is to receive application commands and environments, and make a suggestion for how to improve a metric of interest.
11+
Here are your instructions:
12+
13+
%s
14+
15+
- The response should ONLY contain parameters for resources cpu, memory, nodes, and environment variables, formatting as a JSON string that can be parsed.
16+
"""
17+
18+
# This is added to details from a job manager optimization prompt about the decision that should come back.
19+
supplement_optimize_prompt = """You also need to decide if the job is worth retrying again. You have made %s attempts and here are the figure of merits as described for those attempts:
20+
%s
21+
Please include in your response a "decision" field that is RETRY or STOP. You should keep retrying until you determine the application run is optimized. If you like, you can add a "reason" field that briefly summarizes the decision.
22+
"""
23+
24+
# These are currently required, but don't necessarily have to be...
25+
def get_optimize_prompt(context):
    """
    Fill the optimize prompt template with the requirements from the context.
    """
    # Wrap in a 1-tuple so a tuple-valued `requires` is formatted as one value
    return optimize_prompt % (context.requires,)

fractale/agent/results/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
from .agent import ResultAgent, ResultParser
2+
# Check both names: `assert a, b` only tests `a` and uses `b` as the failure message.
assert ResultAgent and ResultParser

0 commit comments

Comments
 (0)