Skip to content

Commit 973209e

Browse files
committed
Add new logic for logs tracking via a thread with jsonl format
1 parent 3876c98 commit 973209e

File tree

5 files changed

+147
-43
lines changed

5 files changed

+147
-43
lines changed

traceml/traceml/events/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
get_event_assets_path,
44
get_event_path,
55
get_resource_path,
6+
get_logs_path,
67
)
78
from traceml.events.schemas import (
89
LoggedEventListSpec,

traceml/traceml/events/paths.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,18 @@
33
from clipped.utils.enums import get_enum_value
44

55

6+
def get_logs_path(
7+
run_path: str,
8+
filename: Optional[str] = None,
9+
full_path: bool = True,
10+
) -> str:
11+
_path = "{}/plxlogs".format(run_path)
12+
if full_path:
13+
filename = filename or "logs"
14+
_path = "{}/{}.jsonl".format(_path, filename)
15+
return _path
16+
17+
618
def get_resource_path(
719
run_path: str, kind: Optional[str] = None, name: Optional[str] = None
820
) -> str:

traceml/traceml/serialization/base.py

Lines changed: 74 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,51 @@
11
import os
22

3-
from typing import List
3+
from typing import List, Union
44

55
from clipped.utils.enums import get_enum_value
66
from clipped.utils.paths import check_or_create_path, set_permissions
77

88
from traceml.artifacts import V1ArtifactKind
9+
from traceml.logging import V1Log, V1Logs
910
from traceml.events import (
1011
LoggedEventListSpec,
1112
LoggedEventSpec,
1213
get_event_path,
14+
get_logs_path,
1315
get_resource_path,
1416
)
1517

1618

17-
class EventWriter:
19+
class BaseWriter:
20+
"""Base class for writing events and resources to files."""
21+
22+
def __init__(self, run_path: str):
23+
self._run_path = run_path
24+
self._closed = False
25+
26+
def flush(self):
27+
raise NotImplementedError("Subclasses must implement this method.")
28+
29+
def close(self):
30+
self.flush()
31+
self._closed = True
32+
33+
@property
34+
def closed(self):
35+
return self._closed
36+
37+
def write(self, events: List[LoggedEventSpec]):
38+
raise NotImplementedError("Subclasses must implement this method.")
39+
40+
41+
class EventWriter(BaseWriter):
1842
EVENTS_BACKEND = "events"
1943
RESOURCES_BACKEND = "resources"
2044

2145
def __init__(self, run_path: str, backend: str):
46+
super().__init__(run_path)
2247
self._events_backend = backend
23-
self._run_path = run_path
2448
self._files = {} # type: dict[str, LoggedEventListSpec]
25-
self._closed = False
2649

2750
def _get_event_path(self, kind: str, name: str) -> str:
2851
if self._events_backend == self.EVENTS_BACKEND:
@@ -86,13 +109,50 @@ def flush(self):
86109
self._append_events(events_spec)
87110
self._files[file_name].empty_events()
88111

89-
def close(self):
90-
self.flush()
91-
self._closed = True
92112

93-
@property
94-
def closed(self):
95-
return self._closed
113+
class LogWriter(BaseWriter):
114+
def __init__(self, run_path: str):
115+
super().__init__(run_path)
116+
self._file = None # type: V1Logs | None
117+
118+
def _get_event_path(self) -> str:
119+
return get_logs_path(
120+
run_path=self._run_path,
121+
)
122+
123+
def _init_events(self, events_spec: V1Logs):
124+
event_path = self._get_event_path()
125+
# Check if the file exists otherwise initialize
126+
if not os.path.exists(event_path):
127+
check_or_create_path(event_path, is_dir=False)
128+
with open(event_path, "w") as event_file:
129+
event_file.write("")
130+
set_permissions(event_path)
131+
132+
def _append_events(self, events_spec: V1Logs):
133+
event_path = self._get_event_path()
134+
with open(event_path, "a") as event_file:
135+
event_file.write(events_spec.get_jsonl_events())
136+
137+
def _events_to_files(self, logs: List[V1Log]):
138+
if self._file:
139+
self._file.logs += logs
140+
else:
141+
self._file = V1Logs(logs=logs)
142+
self._init_events(self._file)
143+
144+
def write(self, events: List[V1Log]):
145+
if not events:
146+
return
147+
if isinstance(events, V1Log):
148+
events = [events]
149+
self._events_to_files(events)
150+
151+
def flush(self):
152+
events_spec = self._file
153+
if events_spec.logs:
154+
self._append_events(events_spec)
155+
self._file = V1Logs(logs=[])
96156

97157

98158
class BaseFileWriter:
@@ -110,14 +170,14 @@ def __init__(self, run_path: str):
110170
def run_path(self):
111171
return self._run_path
112172

113-
def add_event(self, event: LoggedEventSpec):
114-
if not isinstance(event, LoggedEventSpec):
173+
def add_event(self, event: Union[LoggedEventSpec, V1Log]):
174+
if not isinstance(event, LoggedEventSpec) and not isinstance(event, V1Log):
115175
raise TypeError("Expected an LoggedEventSpec, " " but got %s" % type(event))
116176
self._async_writer.write(event)
117177

118-
def add_events(self, events: List[LoggedEventSpec]):
178+
def add_events(self, events: List[Union[LoggedEventSpec, V1Log]]):
119179
for e in events:
120-
if not isinstance(e, LoggedEventSpec):
180+
if not isinstance(e, LoggedEventSpec) and not isinstance(e, V1Log):
121181
raise TypeError("Expected an LoggedEventSpec, " " but got %s" % type(e))
122182
self._async_writer.write(events)
123183

traceml/traceml/serialization/writer.py

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,13 @@
77
from clipped.utils.paths import check_or_create_path
88

99
from traceml.events import LoggedEventSpec, get_asset_path, get_event_path
10-
from traceml.events.paths import get_resource_path
10+
from traceml.events.paths import get_resource_path, get_logs_path
1111
from traceml.processors.gpu_processor import can_log_gpu_resources, get_gpu_metrics
1212
from traceml.processors.psutil_processor import (
1313
can_log_psutil_resources,
1414
get_psutils_metrics,
1515
)
16-
from traceml.serialization.base import BaseFileWriter, EventWriter
16+
from traceml.serialization.base import BaseFileWriter, EventWriter, LogWriter
1717

1818

1919
class EventFileWriter(BaseFileWriter):
@@ -59,10 +59,33 @@ def __init__(self, run_path: str, max_queue_size: int = 20, flush_secs: int = 10
5959
)
6060

6161

62+
class LogsFileWriter(BaseFileWriter):
63+
def __init__(self, run_path: str, max_queue_size: int = 20, flush_secs: int = 10):
64+
"""Creates a `LogsFileWriter`.
65+
66+
Args:
67+
run_path: A string. Directory where events files will be written.
68+
max_queue_size: Integer. Size of the queue for pending logs.
69+
flush_secs: Number. How often, in seconds, to flush the
70+
pending logs to disk.
71+
"""
72+
super().__init__(run_path=run_path)
73+
74+
check_or_create_path(get_logs_path(run_path, False), is_dir=True)
75+
76+
self._async_writer = LogAsyncManager(
77+
LogWriter(self._run_path),
78+
max_queue_size,
79+
flush_secs,
80+
)
81+
82+
6283
class BaseAsyncManager:
6384
"""Base manager for writing events to files by name by event kind."""
6485

65-
def __init__(self, event_writer: EventWriter, max_queue_size: int = 20):
86+
def __init__(
87+
self, event_writer: Union[EventWriter, LogWriter], max_queue_size: int = 20
88+
):
6689
"""Writes events json spec to files asynchronously. An instance of this class
6790
holds a queue to keep the incoming data temporarily. Data passed to the
6891
`write` function will be put to the queue and the function returns
@@ -248,3 +271,18 @@ def run(self):
248271
self._event_writer.write(data)
249272
self._event_writer.flush()
250273
self._next_flush_time = now + self._flush_secs
274+
275+
276+
class LogAsyncManager(BaseAsyncManager):
277+
"""Writes logs to local file."""
278+
279+
def __init__(
280+
self, event_writer: LogWriter, max_queue_size: int = 20, flush_secs: int = 10
281+
):
282+
super().__init__(event_writer=event_writer, max_queue_size=max_queue_size)
283+
self._worker = EventWriterThread(
284+
self._event_queue,
285+
self._event_writer,
286+
flush_secs,
287+
)
288+
self._worker.start()

traceml/traceml/tracking/run.py

Lines changed: 19 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,13 @@
3737
from traceml.artifacts import V1ArtifactKind
3838
from traceml.events import LoggedEventSpec, V1Event, V1EventSpan, get_asset_path
3939
from traceml.logger import logger
40-
from traceml.logging import V1Log, V1Logs
4140
from traceml.processors import events_processors
4241
from traceml.processors.logs_processor import end_log_processor, start_log_processor
43-
from traceml.serialization.writer import EventFileWriter, ResourceFileWriter
42+
from traceml.serialization.writer import (
43+
EventFileWriter,
44+
ResourceFileWriter,
45+
LogsFileWriter,
46+
)
4447

4548

4649
class Run(RunClient):
@@ -139,11 +142,11 @@ def __init__(
139142
no_op=no_op,
140143
)
141144
track_logs = track_logs if track_logs is not None else self._is_offline
142-
self._logs_history = V1Logs.construct(logs=[])
143145
self._artifacts_path = None
144146
self._outputs_path = None
145147
self._event_logger = None
146148
self._resource_logger = None
149+
self._logs_logger = None
147150
self._sidecar = None
148151
self._exit_handler = None
149152
self._store_path = None
@@ -180,7 +183,8 @@ def __init__(
180183
self.log_code_ref()
181184

182185
if (is_new or has_process_sidecar) and self._artifacts_path and track_logs:
183-
start_log_processor(add_logs=self._add_logs)
186+
self.set_run_logs_logger()
187+
start_log_processor(add_logs=self._logs_logger.add_event)
184188

185189
self._set_exit_handler(force=is_new or has_process_sidecar)
186190

@@ -235,27 +239,6 @@ def _add_events(self, events: List[LoggedEventSpec]):
235239
"the event logger was not configured properly".format(len(events))
236240
)
237241

238-
def _persist_logs_history(self):
239-
if self._logs_history.logs and len(self._logs_history.logs) > 0:
240-
logs_path = os.path.join(
241-
self._artifacts_path,
242-
"plxlogs",
243-
"{}".format(datetime.timestamp(self._logs_history.logs[-1].timestamp)),
244-
)
245-
check_or_create_path(logs_path, is_dir=False)
246-
with open(logs_path, "w") as logs_file:
247-
logs_file.write(self._logs_history.to_json())
248-
set_permissions(logs_path)
249-
250-
def _add_logs(self, log: V1Log):
251-
if not log:
252-
return
253-
self._logs_history.logs.append(log)
254-
if V1Logs.should_chunk(self._logs_history.logs):
255-
self._persist_logs_history()
256-
# Reset
257-
self._logs_history = V1Logs.construct(logs=[])
258-
259242
def create(self, **kwargs):
260243
raise NotImplementedError(
261244
"The tracking `Run` subclass does not allow to call "
@@ -459,6 +442,15 @@ def set_run_resource_logger(self):
459442
"""
460443
self._resource_logger = ResourceFileWriter(run_path=self._artifacts_path)
461444

445+
@client_handler(check_no_op=True)
446+
def set_run_logs_logger(self):
447+
"""Sets a logs logger.
448+
449+
> **Note**: This is only used during manual tracking, and it's not used by the `
450+
> in-cluster` runs.
451+
"""
452+
self._logs_logger = LogsFileWriter(run_path=self._artifacts_path)
453+
462454
@client_handler(check_no_op=True)
463455
def set_run_process_sidecar(self):
464456
"""Sets a sidecar process to sync artifacts.
@@ -1730,7 +1722,6 @@ def excepthook(exception, value, tb):
17301722
def _end(self):
17311723
self.log_succeeded()
17321724
end_log_processor()
1733-
self._persist_logs_history()
17341725
self._wait(sync_artifacts=True)
17351726
if self._is_offline:
17361727
self.persist_run(path=self._artifacts_path)
@@ -1754,6 +1745,8 @@ def _wait(self, sync_artifacts: bool = False):
17541745
self._event_logger.close()
17551746
if self._resource_logger:
17561747
self._resource_logger.close()
1748+
if self._logs_logger:
1749+
self._logs_logger.close()
17571750
if self._sidecar:
17581751
self._sidecar.close()
17591752
if self._results:

0 commit comments

Comments
 (0)