Skip to content

Commit 4baebd8

Browse files
committed
chore: implement native lock objects
We implement native lock objects to avoid using the ones from the standard library. This is yet another step to isolate the library from interacting with potentially critical stdlib modules that could get patched by the target application (e.g. gevent).
1 parent ddb188a commit 4baebd8

File tree

32 files changed

+304
-85
lines changed

32 files changed

+304
-85
lines changed

ddtrace/_monkey.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import importlib
22
import os
3-
import threading
43
from types import ModuleType
54
from typing import TYPE_CHECKING # noqa:F401
65
from typing import Union
@@ -9,6 +8,7 @@
98

109
from ddtrace.appsec._listeners import load_common_appsec_modules
1110
from ddtrace.internal.telemetry.constants import TELEMETRY_NAMESPACE
11+
from ddtrace.internal.threads import Lock
1212
from ddtrace.settings._config import config
1313
from ddtrace.settings.asm import config as asm_config
1414
from ddtrace.vendor.debtcollector import deprecate
@@ -130,7 +130,7 @@
130130
}
131131

132132

133-
_LOCK = threading.Lock()
133+
_LOCK = Lock()
134134
_PATCHED_MODULES = set()
135135

136136
# Module names that need to be patched for a given integration. If the module

ddtrace/_trace/context.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from ddtrace.internal.constants import W3C_TRACEPARENT_KEY
2020
from ddtrace.internal.constants import W3C_TRACESTATE_KEY
2121
from ddtrace.internal.logger import get_logger
22+
from ddtrace.internal.threads import RLock
2223
from ddtrace.internal.utils.http import w3c_get_dd_list_member as _w3c_get_dd_list_member
2324

2425

@@ -94,7 +95,7 @@ def __init__(
9495
# DEV: A `forksafe.RLock` is not necessary here since Contexts
9596
# are recreated by the tracer after fork
9697
# https://github.com/DataDog/dd-trace-py/blob/a1932e8ddb704d259ea8a3188d30bf542f59fd8d/ddtrace/tracer.py#L489-L508
97-
self._lock = threading.RLock()
98+
self._lock = RLock()
9899

99100
def __getstate__(self) -> _ContextState:
100101
return (
@@ -121,7 +122,7 @@ def __setstate__(self, state: _ContextState) -> None:
121122
self._reactivate,
122123
) = state
123124
# We cannot serialize and lock, so we must recreate it unless we already have one
124-
self._lock = threading.RLock()
125+
self._lock = RLock()
125126

126127
def __enter__(self) -> "Context":
127128
self._lock.acquire()

ddtrace/_trace/processor/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import abc
22
from collections import defaultdict
33
from itertools import chain
4-
from threading import RLock
54
from typing import Any
65
from typing import Dict
76
from typing import List
@@ -28,6 +27,7 @@
2827
from ddtrace.internal.service import ServiceStatusError
2928
from ddtrace.internal.telemetry.constants import TELEMETRY_LOG_LEVEL
3029
from ddtrace.internal.telemetry.constants import TELEMETRY_NAMESPACE
30+
from ddtrace.internal.threads import RLock
3131
from ddtrace.internal.writer import AgentResponse
3232
from ddtrace.internal.writer import create_trace_writer
3333
from ddtrace.settings._config import config
@@ -284,7 +284,7 @@ def __init__(
284284
self.writer = create_trace_writer(response_callback=self._agent_response_callback)
285285
# Initialize the trace buffer and lock
286286
self._traces: DefaultDict[int, _Trace] = defaultdict(lambda: _Trace())
287-
self._lock: RLock = RLock()
287+
self._lock = RLock()
288288
# Track telemetry span metrics by span api
289289
# ex: otel api, opentracing api, datadog api
290290
self._span_metrics: Dict[str, DefaultDict] = {

ddtrace/_trace/tracer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import logging
77
import os
88
from os import getpid
9-
from threading import RLock
109
from typing import TYPE_CHECKING
1110
from typing import Callable
1211
from typing import Dict
@@ -54,6 +53,7 @@
5453
from ddtrace.internal.processor.endpoint_call_counter import EndpointCallCounterProcessor
5554
from ddtrace.internal.runtime import get_runtime_id
5655
from ddtrace.internal.schema.processor import BaseServiceProcessor
56+
from ddtrace.internal.threads import RLock
5757
from ddtrace.internal.utils import _get_metas_to_propagate
5858
from ddtrace.internal.utils.formats import format_trace_id
5959
from ddtrace.internal.writer import AgentWriterInterface

ddtrace/appsec/_iast/_overhead_control_engine.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,12 @@
33
limit. It will measure operations being executed in a request and it will deactivate detection
44
(and therefore reduce the overhead to nearly 0) if a certain threshold is reached.
55
"""
6+
67
from ddtrace._trace.sampler import RateSampler
78
from ddtrace._trace.span import Span
89
from ddtrace.appsec._iast._utils import _is_iast_debug_enabled
9-
from ddtrace.internal._unpatched import _threading as threading
1010
from ddtrace.internal.logger import get_logger
11+
from ddtrace.internal.threads import Lock
1112
from ddtrace.settings.asm import config as asm_config
1213

1314

@@ -24,7 +25,7 @@ class OverheadControl(object):
2425
The goal is to do sampling at different levels of the IAST analysis (per process, per request, etc)
2526
"""
2627

27-
_lock = threading.Lock()
28+
_lock = Lock()
2829
_request_quota = asm_config._iast_max_concurrent_requests
2930
_sampler = RateSampler(sample_rate=get_request_sampling_value() / 100.0)
3031

ddtrace/contrib/internal/subprocess/patch.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919
from ddtrace.contrib.internal.subprocess.constants import COMMANDS
2020
from ddtrace.ext import SpanTypes
2121
from ddtrace.internal import core
22-
from ddtrace.internal.forksafe import RLock
2322
from ddtrace.internal.logger import get_logger
23+
from ddtrace.internal.threads import RLock
2424
from ddtrace.settings._config import config
2525
from ddtrace.settings.asm import config as asm_config
2626

ddtrace/debugging/_encoding.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@
1818
from ddtrace.debugging._config import di_config
1919
from ddtrace.debugging._signal.log import LogSignal
2020
from ddtrace.debugging._signal.snapshot import Snapshot
21-
from ddtrace.internal import forksafe
2221
from ddtrace.internal._encoding import BufferFull
2322
from ddtrace.internal.logger import get_logger
23+
from ddtrace.internal.threads import Lock
2424
from ddtrace.internal.utils.formats import format_trace_id
2525

2626

@@ -310,7 +310,7 @@ def __init__(
310310
) -> None:
311311
self._encoder = encoder
312312
self._buffer = JsonBuffer(buffer_size)
313-
self._lock = forksafe.Lock()
313+
self._lock = Lock()
314314
self._on_full = on_full
315315
self.count = 0
316316
self.max_size = buffer_size - self._buffer.size

ddtrace/debugging/_probe/registry.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88
from ddtrace.debugging._probe.model import Probe
99
from ddtrace.debugging._probe.model import ProbeLocationMixin
1010
from ddtrace.debugging._probe.status import ProbeStatusLogger
11-
from ddtrace.internal import forksafe
1211
from ddtrace.internal.logger import get_logger
12+
from ddtrace.internal.threads import RLock
1313

1414

1515
logger = get_logger(__name__)
@@ -68,7 +68,7 @@ def __init__(self, status_logger: ProbeStatusLogger, *args: Any, **kwargs: Any)
6868
# Used to keep track of probes pending installation
6969
self._pending: Dict[str, List[Probe]] = defaultdict(list)
7070

71-
self._lock = forksafe.RLock()
71+
self._lock = RLock()
7272

7373
def register(self, *probes: Probe) -> None:
7474
"""Register a probe."""

ddtrace/internal/_encoding.pyx

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ from .constants import MAX_UINT_64BITS
2626
from .._trace._limits import MAX_SPAN_META_VALUE_LEN
2727
from .._trace._limits import TRUNCATED_SPAN_ATTRIBUTE_LEN
2828
from ..settings._agent import config as agent_config
29+
from ddtrace.internal.threads import Lock
30+
from ddtrace.internal.threads import RLock
2931

3032

3133
DEF MSGPACK_ARRAY_LENGTH_PREFIX_SIZE = 5
@@ -252,7 +254,7 @@ cdef class MsgpackStringTable(StringTable):
252254
self.max_size = max_size
253255
self.pk.length = MSGPACK_STRING_TABLE_LENGTH_PREFIX_SIZE
254256
self._sp_len = 0
255-
self._lock = threading.RLock()
257+
self._lock = RLock()
256258
super(MsgpackStringTable, self).__init__()
257259

258260
self.index(ORIGIN_KEY)
@@ -367,7 +369,7 @@ cdef class BufferedEncoder(object):
367369
def __cinit__(self, size_t max_size, size_t max_item_size):
368370
self.max_size = max_size
369371
self.max_item_size = max_item_size
370-
self._lock = threading.Lock()
372+
self._lock = Lock()
371373

372374
# ---- Abstract methods ----
373375

@@ -439,7 +441,7 @@ cdef class MsgpackEncoderBase(BufferedEncoder):
439441
self.max_size = max_size
440442
self.pk.buf_size = buf_size
441443
self.max_item_size = max_item_size if max_item_size < max_size else max_size
442-
self._lock = threading.RLock()
444+
self._lock = RLock()
443445
self._reset_buffer()
444446

445447
def __dealloc__(self):

ddtrace/internal/_threads.cpp

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,10 @@ class PyRef
8585
PyObject* _obj;
8686
};
8787

88+
// ----------------------------------------------------------------------------
89+
90+
#include "_threads/lock.hpp"
91+
8892
// ----------------------------------------------------------------------------
8993
class Event
9094
{
@@ -533,6 +537,9 @@ PyInit__threads(void)
533537
if (PyType_Ready(&PeriodicThreadType) < 0)
534538
return NULL;
535539

540+
if (PyType_Ready(&LockType) < 0)
541+
return NULL;
542+
536543
_periodic_threads = PyDict_New();
537544
if (_periodic_threads == NULL)
538545
return NULL;
@@ -541,6 +548,7 @@ PyInit__threads(void)
541548
if (m == NULL)
542549
goto error;
543550

551+
// Periodic thread
544552
Py_INCREF(&PeriodicThreadType);
545553
if (PyModule_AddObject(m, "PeriodicThread", (PyObject*)&PeriodicThreadType) < 0) {
546554
Py_DECREF(&PeriodicThreadType);
@@ -550,6 +558,13 @@ PyInit__threads(void)
550558
if (PyModule_AddObject(m, "periodic_threads", _periodic_threads) < 0)
551559
goto error;
552560

561+
// Lock
562+
Py_INCREF(&LockType);
563+
if (PyModule_AddObject(m, "Lock", (PyObject*)&LockType) < 0) {
564+
Py_DECREF(&LockType);
565+
goto error;
566+
}
567+
553568
return m;
554569

555570
error:

0 commit comments

Comments
 (0)