Skip to content

Commit 4b78cc6

Browse files
authored
Merge pull request #566 from NERSC/rm-vector-agg
remove centralized vector aggregator
2 parents 9a85a97 + e7ef705 commit 4b78cc6

File tree

6 files changed

+132
-139
lines changed

6 files changed

+132
-139
lines changed

backend/agent/.env.example

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,3 @@ AGENT_NETWORKS='["perlmutter"]'
1212
AGENT_NAME="SecretAgentMan"
1313
NATS_STREAM_STORAGE_TYPE=file
1414
OPERATOR_EXTRA_ENV='{"CUDA_VISIBLE_DEVICES":"0","MY_FLAG":"enabled"}'
15-
16-
# Vector log aggregation
17-
VECTOR_AGGREGATOR_ADDR=host.containers.internal:6000

backend/agent/interactem/agent/agent.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,11 @@ async def run(self):
278278
await self.agent_kv.start()
279279
# Start the container monitor task at the agent level
280280
self._monitor_task = asyncio.create_task(self.monitor_containers())
281-
self._vector_container = await self._start_vector_container()
281+
self._vector_container = None
282+
if cfg.vector_enabled:
283+
self._vector_container = await self._start_vector_container()
284+
else:
285+
logger.info("Vector logging disabled; skipping vector container startup.")
282286

283287
async def _start_vector_container(self) -> Container:
284288
logger.info("Starting Vector container for log aggregation...")

backend/agent/interactem/agent/config.py

Lines changed: 39 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import uuid
2+
from importlib import resources
23
from pathlib import Path
34

45
import netifaces
@@ -12,11 +13,14 @@
1213
PodmanMount,
1314
PodmanMountType,
1415
)
15-
16-
from ._vector_template import VECTOR_CONFIG_TEMPLATE
16+
from interactem.core.nats.config import NatsMode, get_nats_config
1717

1818
logger = get_logger()
19+
VECTOR_NATS_CREDS_TARGET = "/nats.creds"
20+
1921

22+
def _load_vector_template() -> str:
23+
return resources.files(__package__).joinpath("templates/vector.yaml.j2").read_text(encoding="utf-8")
2024

2125
class Settings(BaseSettings):
2226
model_config = SettingsConfigDict(env_file=".env", extra="ignore")
@@ -43,7 +47,7 @@ class Settings(BaseSettings):
4347
ALWAYS_PULL_IMAGES: bool = False
4448

4549
# Vector configuration
46-
VECTOR_AGGREGATOR_ADDR: str | None = None
50+
VECTOR_ENABLED: bool = True
4751
LOG_DIR: Path = Path("~/.interactem/logs").expanduser().resolve()
4852
VECTOR_CONFIG_PATH: Path | None = None
4953
OPERATOR_EXTRA_ENV: dict[str, str] = Field(default_factory=dict, exclude=True)
@@ -125,26 +129,52 @@ def vector_mounts(self) -> list[PodmanMount]:
125129
target="/etc/vector/vector.yaml",
126130
)
127131
log_mount = self.log_mount
132+
mounts = [config_mount]
128133
if log_mount:
129-
return [config_mount, log_mount]
130-
return [config_mount]
134+
mounts.append(log_mount)
135+
creds_mount = self.vector_creds_mount
136+
if creds_mount:
137+
mounts.append(creds_mount)
138+
return mounts
139+
140+
@property
141+
def vector_creds_mount(self) -> PodmanMount | None:
142+
creds_path = self.nats_creds_file
143+
if not creds_path:
144+
return None
145+
return PodmanMount(
146+
type=PodmanMountType.bind,
147+
source=str(creds_path),
148+
target=VECTOR_NATS_CREDS_TARGET,
149+
)
150+
151+
@property
152+
def nats_creds_file(self) -> Path | None:
153+
try:
154+
nats_cfg = get_nats_config()
155+
except ValueError:
156+
return None
157+
if nats_cfg.NATS_SECURITY_MODE != NatsMode.CREDS:
158+
return None
159+
return nats_cfg.NATS_CREDS_FILE
131160

132161
def generate_vector_config(self) -> Path | None:
133162
"""Generates a vector config file and returns path to it"""
134163

135-
if not self.VECTOR_AGGREGATOR_ADDR:
136-
logger.warning("VECTOR_AGGREGATOR_ADDR not set, skipping log aggregation.")
164+
if not self.VECTOR_ENABLED:
165+
logger.warning("VECTOR_ENABLED is false, skipping log aggregation.")
137166
return None
138167

139168
if not self.LOG_DIR.exists():
140169
raise RuntimeError(
141170
f"Log directory {self.LOG_DIR} does not exist. Should not happen."
142171
)
143-
templ: Template = Template(VECTOR_CONFIG_TEMPLATE)
172+
templ: Template = Template(_load_vector_template())
144173
vector_yaml = templ.render(
145174
logs_dir=LOGS_DIR_IN_CONTAINER,
146175
agent_id=self.ID,
147-
vector_addr=self.VECTOR_AGGREGATOR_ADDR,
176+
nats_url=str(self.NATS_SERVER_URL_IN_CONTAINER),
177+
nats_creds_path=VECTOR_NATS_CREDS_TARGET,
148178
)
149179
output_path = self.LOG_DIR / "vector.yaml"
150180
with open(output_path, "w") as f:
@@ -155,4 +185,3 @@ def generate_vector_config(self) -> Path | None:
155185

156186

157187
cfg = Settings() # pyright: ignore[reportCallIssue]
158-

backend/agent/interactem/agent/_vector_template.py renamed to backend/agent/interactem/agent/templates/vector.yaml.j2

Lines changed: 84 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
VECTOR_CONFIG_TEMPLATE = """sources:
1+
sources:
22
operator_logs:
33
type: file
44
include:
@@ -34,7 +34,7 @@
3434

3535
# Extract deployment_id and operator_id from the file path
3636
# Format: {{ logs_dir }}/<deployment_id>/op-<operator_id>.log
37-
file_match = parse_regex!(string!(.file), r'/(?P<deployment_id>[^/]+)/op-(?P<operator_id>[^.]+)\\.log$')
37+
file_match = parse_regex!(string!(.file), r'/(?P<deployment_id>[^/]+)/op-(?P<operator_id>[^.]+)\.log$')
3838
.deployment_id = file_match.deployment_id
3939
.operator_id = file_match.operator_id
4040
{% endraw %}
@@ -93,7 +93,7 @@
9393
.log_type = "agent"
9494

9595
# Strip ANSI color codes from the message
96-
.log = replace(string!(.message), r'\\x1b\\[[0-9;]*m', "")
96+
.log = replace(string!(.message), r'\x1b\[[0-9;]*m', "")
9797

9898
# Parse the Python logging format: ISO8601_TIMESTAMP - module.name - LEVEL - message
9999
# Example: 2025-10-08T13:02:06.547Z - module.name - INFO - message
@@ -119,13 +119,88 @@
119119
del(.message)
120120
{% endraw %}
121121

122-
sinks:
123-
vector_aggregator:
124-
type: vector
122+
clean_logs:
123+
type: remap
125124
inputs:
126125
- parse_operator_logs
127126
- parse_vector_logs
128127
- parse_agent_logs
129-
address: {{ vector_addr }}
130-
version: '2'
131-
"""
128+
source: |
129+
{% raw %}
130+
# Remove fields we don't want to forward to NATS
131+
if .log_type == "agent" {
132+
del(.file)
133+
del(.source_type)
134+
}
135+
if .log_type == "operator" {
136+
del(.file)
137+
del(.source_type)
138+
del(.stream)
139+
}
140+
.
141+
{% endraw %}
142+
143+
route_by_type:
144+
type: route
145+
inputs:
146+
- clean_logs
147+
route:
148+
vector_logs: '.log_type == "vector"'
149+
agent_logs: '.log_type == "agent"'
150+
operator_logs: '.log_type == "operator"'
151+
152+
sinks:
153+
console:
154+
type: console
155+
inputs:
156+
- route_by_type.vector_logs
157+
- route_by_type.agent_logs
158+
- route_by_type.operator_logs
159+
target: stdout
160+
encoding:
161+
codec: json
162+
163+
nats_vector:
164+
type: nats
165+
inputs:
166+
- route_by_type.vector_logs
167+
url: "{{ nats_url }}"
168+
{% raw %}
169+
subject: "log.vector.{{ agent_id }}"
170+
encoding:
171+
codec: json
172+
{% endraw %}
173+
auth:
174+
strategy: credentials_file
175+
credentials_file:
176+
path: "{{ nats_creds_path }}"
177+
178+
nats_agents:
179+
type: nats
180+
inputs:
181+
- route_by_type.agent_logs
182+
url: "{{ nats_url }}"
183+
{% raw %}
184+
subject: "log.agent.{{ agent_id }}"
185+
encoding:
186+
codec: json
187+
{% endraw %}
188+
auth:
189+
strategy: credentials_file
190+
credentials_file:
191+
path: "{{ nats_creds_path }}"
192+
193+
nats_operators:
194+
type: nats
195+
inputs:
196+
- route_by_type.operator_logs
197+
url: "{{ nats_url }}"
198+
{% raw %}
199+
subject: "log.depl.{{ deployment_id }}.op.{{ operator_id }}"
200+
encoding:
201+
codec: json
202+
{% endraw %}
203+
auth:
204+
strategy: credentials_file
205+
credentials_file:
206+
path: "{{ nats_creds_path }}"

backend/agent/pyproject.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ interactem-core = { path = "../core", editable = true }
3434

3535
[tool.poetry]
3636
packages = [{ include = "interactem" }]
37+
include = ["interactem/agent/templates/*.j2"]
38+
39+
[tool.uv.build-backend]
40+
source-include = ["interactem/agent/templates/*.j2"]
3741

3842
[tool.poetry.dependencies]
3943
interactem-core = {path = "../core", develop = true}

docker-compose.yml

Lines changed: 0 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -282,126 +282,10 @@ services:
282282
- ./backend/metrics/monitoring-conf/grafana/provisioning:/etc/grafana/provisioning:ro
283283
- ./backend/metrics/monitoring-conf/grafana/dashboards:/var/lib/grafana/dashboards:ro
284284

285-
vector:
286-
image: timberio/vector:0.50.0-alpine
287-
container_name: vector-aggregator
288-
ports:
289-
- "6000:6000"
290-
- "8686:8686"
291-
depends_on:
292-
nats-healthcheck:
293-
condition: service_healthy
294-
volumes:
295-
- vector_data:/vector-data-dir
296-
- "./conf/nats-conf/out_jwt/backend.creds:/backend.creds"
297-
entrypoint: ["/bin/sh", "-c"]
298-
command:
299-
- |
300-
cat > /tmp/vector.yaml <<EOF
301-
data_dir: /vector-data-dir
302-
303-
api:
304-
enabled: true
305-
address: 0.0.0.0:8686
306-
playground: false
307-
308-
sources:
309-
vector:
310-
type: vector
311-
address: 0.0.0.0:6000
312-
version: "2"
313-
314-
transforms:
315-
# Clean up all logs by removing unnecessary fields based on log type
316-
clean_logs:
317-
type: remap
318-
inputs:
319-
- vector
320-
source: |
321-
# Remove fields from agent logs
322-
if .log_type == "agent" {
323-
del(.file)
324-
del(.source_type)
325-
}
326-
# Remove fields from operator logs
327-
if .log_type == "operator" {
328-
del(.file)
329-
del(.source_type)
330-
del(.stream)
331-
}
332-
.
333-
334-
# Route logs based on their type
335-
route_by_type:
336-
type: route
337-
inputs:
338-
- clean_logs
339-
route:
340-
vector_logs: '.log_type == "vector"'
341-
agent_logs: '.log_type == "agent"'
342-
operator_logs: '.log_type == "operator"'
343-
344-
sinks:
345-
console:
346-
type: console
347-
inputs:
348-
- route_by_type.vector_logs
349-
- route_by_type.agent_logs
350-
- route_by_type.operator_logs
351-
target: stdout
352-
encoding:
353-
codec: json
354-
355-
# NATS sink for vector logs
356-
nats_vector:
357-
type: nats
358-
inputs:
359-
- route_by_type.vector_logs
360-
url: nats://nats1:4222
361-
subject: "log.vector.{{ agent_id }}"
362-
encoding:
363-
codec: json
364-
auth:
365-
strategy: credentials_file
366-
credentials_file:
367-
path: /backend.creds
368-
369-
# NATS sink for agent logs
370-
nats_agents:
371-
type: nats
372-
inputs:
373-
- route_by_type.agent_logs
374-
url: nats://nats1:4222
375-
subject: "log.agent.{{ agent_id }}"
376-
encoding:
377-
codec: json
378-
auth:
379-
strategy: credentials_file
380-
credentials_file:
381-
path: /backend.creds
382-
383-
# NATS sink for operator logs
384-
nats_operators:
385-
type: nats
386-
inputs:
387-
- route_by_type.operator_logs
388-
url: nats://nats1:4222
389-
subject: "log.depl.{{ deployment_id }}.op.{{ operator_id }}"
390-
encoding:
391-
codec: json
392-
auth:
393-
strategy: credentials_file
394-
credentials_file:
395-
path: /backend.creds
396-
EOF
397-
vector --config /tmp/vector.yaml
398-
399-
400285
volumes:
401286
app-db-data:
402287
prometheus_data:
403288
grafana_data:
404-
vector_data:
405289
nats_data1:
406290
nats_data2:
407291
nats_data3:

0 commit comments

Comments
 (0)