Skip to content

Commit e7abe60

Browse files
authored
Remove debug traces (#62572)
Part of larger effort to clean up OTEL in the codebase. May be added back later.
1 parent cb26515 commit e7abe60

File tree

8 files changed

+204
-416
lines changed

8 files changed

+204
-416
lines changed

airflow-core/src/airflow/dag_processing/manager.py

Lines changed: 3 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -62,7 +62,6 @@
6262
from airflow.models.db_callback_request import DbCallbackRequest
6363
from airflow.models.errors import ParseImportError
6464
from airflow.observability.metrics import stats_utils
65-
from airflow.observability.trace import DebugTrace
6665
from airflow.sdk import SecretCache
6766
from airflow.sdk.log import init_log_file, logging_processors
6867
from airflow.typing_compat import assert_never
@@ -1222,17 +1221,9 @@ def emit_metrics(*, parse_time: float, stats: Sequence[DagFileStat]):
12221221
This is called once every time around the parsing "loop" - i.e. after
12231222
all files have been parsed.
12241223
"""
1225-
with DebugTrace.start_span(span_name="emit_metrics", component="DagFileProcessorManager") as span:
1226-
Stats.gauge("dag_processing.total_parse_time", parse_time)
1227-
Stats.gauge("dagbag_size", sum(stat.num_dags for stat in stats))
1228-
Stats.gauge("dag_processing.import_errors", sum(stat.import_errors for stat in stats))
1229-
span.set_attributes(
1230-
{
1231-
"total_parse_time": parse_time,
1232-
"dag_bag_size": sum(stat.num_dags for stat in stats),
1233-
"import_errors": sum(stat.import_errors for stat in stats),
1234-
}
1235-
)
1224+
Stats.gauge("dag_processing.total_parse_time", parse_time)
1225+
Stats.gauge("dagbag_size", sum(stat.num_dags for stat in stats))
1226+
Stats.gauge("dag_processing.import_errors", sum(stat.import_errors for stat in stats))
12361227

12371228

12381229
def process_parse_results(

airflow-core/src/airflow/executors/base_executor.py

Lines changed: 1 addition & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,13 @@
2828
import pendulum
2929

3030
from airflow._shared.observability.metrics.stats import Stats
31-
from airflow._shared.observability.traces import NO_TRACE_ID
3231
from airflow.cli.cli_config import DefaultHelpParser
3332
from airflow.configuration import conf
3433
from airflow.executors import workloads
3534
from airflow.executors.executor_loader import ExecutorLoader
3635
from airflow.models import Log
3736
from airflow.observability.metrics import stats_utils
38-
from airflow.observability.trace import DebugTrace, Trace, add_debug_span
37+
from airflow.observability.trace import Trace
3938
from airflow.utils.log.logging_mixin import LoggingMixin
4039
from airflow.utils.state import TaskInstanceState
4140
from airflow.utils.thread_safe_dict import ThreadSafeDict
@@ -263,7 +262,6 @@ def sync(self) -> None:
263262
Executors should override this to perform gather statuses.
264263
"""
265264

266-
@add_debug_span
267265
def heartbeat(self) -> None:
268266
"""Heartbeat sent to trigger new jobs."""
269267
open_slots = self.parallelism - len(self.running)
@@ -350,7 +348,6 @@ def order_queued_tasks_by_priority(self) -> list[tuple[TaskInstanceKey, workload
350348
reverse=False,
351349
)
352350

353-
@add_debug_span
354351
def trigger_tasks(self, open_slots: int) -> None:
355352
"""
356353
Initiate async execution of the queued tasks, up to the number of available slots.
@@ -433,22 +430,6 @@ def fail(self, key: TaskInstanceKey, info=None) -> None:
433430
:param info: Executor information for the task instance
434431
:param key: Unique key for the task instance
435432
"""
436-
trace_id = Trace.get_current_span().get_span_context().trace_id
437-
if trace_id != NO_TRACE_ID:
438-
with DebugTrace.start_child_span(
439-
span_name="fail",
440-
component="BaseExecutor",
441-
) as span:
442-
span.set_attributes(
443-
{
444-
"dag_id": key.dag_id,
445-
"run_id": key.run_id,
446-
"task_id": key.task_id,
447-
"try_number": key.try_number,
448-
"error": True,
449-
}
450-
)
451-
452433
self.change_state(key, TaskInstanceState.FAILED, info)
453434

454435
def success(self, key: TaskInstanceKey, info=None) -> None:
@@ -458,21 +439,6 @@ def success(self, key: TaskInstanceKey, info=None) -> None:
458439
:param info: Executor information for the task instance
459440
:param key: Unique key for the task instance
460441
"""
461-
trace_id = Trace.get_current_span().get_span_context().trace_id
462-
if trace_id != NO_TRACE_ID:
463-
with DebugTrace.start_child_span(
464-
span_name="success",
465-
component="BaseExecutor",
466-
) as span:
467-
span.set_attributes(
468-
{
469-
"dag_id": key.dag_id,
470-
"run_id": key.run_id,
471-
"task_id": key.task_id,
472-
"try_number": key.try_number,
473-
}
474-
)
475-
476442
self.change_state(key, TaskInstanceState.SUCCESS, info)
477443

478444
def queued(self, key: TaskInstanceKey, info=None) -> None:

airflow-core/src/airflow/jobs/job.py

Lines changed: 53 additions & 62 deletions
Original file line number | Diff line number | Diff line change
@@ -35,7 +35,6 @@
3535
from airflow.exceptions import AirflowException
3636
from airflow.listeners.listener import get_listener_manager
3737
from airflow.models.base import ID_LEN, Base
38-
from airflow.observability.trace import DebugTrace, add_debug_span
3938
from airflow.utils.helpers import convert_camel_to_snake
4039
from airflow.utils.log.logging_mixin import LoggingMixin
4140
from airflow.utils.net import get_hostname
@@ -210,68 +209,61 @@ def heartbeat(
210209
:param session to use for saving the job
211210
"""
212211
previous_heartbeat = self.latest_heartbeat
213-
with DebugTrace.start_span(span_name="heartbeat", component="Job") as span:
214-
try:
215-
span.set_attribute("heartbeat", str(self.latest_heartbeat))
216-
# This will cause it to load from the db
212+
try:
213+
# This will cause it to load from the db
214+
session.merge(self)
215+
previous_heartbeat = self.latest_heartbeat
216+
217+
if self.state == JobState.RESTARTING:
218+
self.kill()
219+
220+
# Figure out how long to sleep for
221+
sleep_for: float = 0
222+
if self.latest_heartbeat:
223+
seconds_remaining = (
224+
self.heartrate - (timezone.utcnow() - self.latest_heartbeat).total_seconds()
225+
)
226+
sleep_for = max(0, seconds_remaining)
227+
sleep(sleep_for)
228+
# Update last heartbeat time
229+
with create_session() as session:
230+
# Make the session aware of this object
217231
session.merge(self)
232+
self.latest_heartbeat = timezone.utcnow()
233+
session.commit()
234+
time_since_last_heartbeat: float = (
235+
0
236+
if previous_heartbeat is None
237+
else (timezone.utcnow() - previous_heartbeat).total_seconds()
238+
)
239+
health_check_threshold_value = health_check_threshold(self.job_type, self.heartrate)
240+
if time_since_last_heartbeat > health_check_threshold_value:
241+
self.log.info("Heartbeat recovered after %.2f seconds", time_since_last_heartbeat)
242+
# At this point, the DB has updated.
218243
previous_heartbeat = self.latest_heartbeat
219-
220-
if self.state == JobState.RESTARTING:
221-
self.kill()
222-
223-
# Figure out how long to sleep for
224-
sleep_for: float = 0
225-
if self.latest_heartbeat:
226-
seconds_remaining = (
227-
self.heartrate - (timezone.utcnow() - self.latest_heartbeat).total_seconds()
228-
)
229-
sleep_for = max(0, seconds_remaining)
230-
if span.is_recording():
231-
span.add_event(name="sleep", attributes={"sleep_for": sleep_for})
232-
sleep(sleep_for)
233-
# Update last heartbeat time
234-
with create_session() as session:
235-
# Make the session aware of this object
236-
session.merge(self)
237-
self.latest_heartbeat = timezone.utcnow()
238-
session.commit()
239-
time_since_last_heartbeat: float = (
240-
0
241-
if previous_heartbeat is None
242-
else (timezone.utcnow() - previous_heartbeat).total_seconds()
243-
)
244-
health_check_threshold_value = health_check_threshold(self.job_type, self.heartrate)
245-
if time_since_last_heartbeat > health_check_threshold_value:
246-
self.log.info("Heartbeat recovered after %.2f seconds", time_since_last_heartbeat)
247-
# At this point, the DB has updated.
248-
previous_heartbeat = self.latest_heartbeat
249-
heartbeat_callback(session)
250-
self.log.debug("[heartbeat]")
251-
self.heartbeat_failed = False
252-
except OperationalError:
253-
Stats.incr(convert_camel_to_snake(self.__class__.__name__) + "_heartbeat_failure", 1, 1)
254-
if not self.heartbeat_failed:
255-
self.log.exception("%s heartbeat failed with error", self.__class__.__name__)
256-
self.heartbeat_failed = True
257-
msg = f"{self.__class__.__name__} heartbeat got an exception"
258-
if span.is_recording():
259-
span.add_event(name="error", attributes={"message": msg})
260-
if self.is_alive():
261-
self.log.error(
262-
"%s heartbeat failed with error. Scheduler may go into unhealthy state",
263-
self.__class__.__name__,
264-
)
265-
msg = f"{self.__class__.__name__} heartbeat failed with error. Scheduler may go into unhealthy state"
266-
if span.is_recording():
267-
span.add_event(name="error", attributes={"message": msg})
268-
else:
269-
msg = f"{self.__class__.__name__} heartbeat failed with error. Scheduler is in unhealthy state"
270-
self.log.error(msg)
271-
if span.is_recording():
272-
span.add_event(name="error", attributes={"message": msg})
273-
# We didn't manage to heartbeat, so make sure that the timestamp isn't updated
274-
self.latest_heartbeat = previous_heartbeat
244+
heartbeat_callback(session)
245+
self.log.debug("[heartbeat]")
246+
self.heartbeat_failed = False
247+
except OperationalError:
248+
Stats.incr(convert_camel_to_snake(self.__class__.__name__) + "_heartbeat_failure", 1, 1)
249+
if not self.heartbeat_failed:
250+
self.log.exception("%s heartbeat failed with error", self.__class__.__name__)
251+
self.heartbeat_failed = True
252+
msg = f"{self.__class__.__name__} heartbeat got an exception"
253+
self.log.error(msg)
254+
if self.is_alive():
255+
self.log.error(
256+
"%s heartbeat failed with error. Scheduler may go into unhealthy state",
257+
self.__class__.__name__,
258+
)
259+
msg = f"{self.__class__.__name__} heartbeat failed with error. Scheduler may go into unhealthy state"
260+
else:
261+
msg = (
262+
f"{self.__class__.__name__} heartbeat failed with error. Scheduler is in unhealthy state"
263+
)
264+
self.log.error(msg)
265+
# We didn't manage to heartbeat, so make sure that the timestamp isn't updated
266+
self.latest_heartbeat = previous_heartbeat
275267

276268
@provide_session
277269
def prepare_for_execution(self, session: Session = NEW_SESSION):
@@ -401,7 +393,6 @@ def execute_job(job: Job, execute_callable: Callable[[], int | None]) -> int | N
401393
return ret
402394

403395

404-
@add_debug_span
405396
def perform_heartbeat(
406397
job: Job, heartbeat_callback: Callable[[Session], None], only_if_necessary: bool
407398
) -> None:

0 commit comments

Comments (0)