Skip to content

Commit 5ae4a34

Browse files
committed
maybe leak
1 parent 9dd1405 commit 5ae4a34

File tree

7 files changed

+68
-14
lines changed

7 files changed

+68
-14
lines changed

ddtrace/_trace/context.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,9 +91,6 @@ def __init__(
9191
if lock is not None:
9292
self._lock = lock
9393
else:
94-
# DEV: A `forksafe.RLock` is not necessary here since Contexts
95-
# are recreated by the tracer after fork
96-
# https://github.com/DataDog/dd-trace-py/blob/a1932e8ddb704d259ea8a3188d30bf542f59fd8d/ddtrace/tracer.py#L489-L508
9794
self._lock = RLock()
9895

9996
def __getstate__(self) -> _ContextState:

ddtrace/internal/_encoding.pyx

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ from libc cimport stdint
44
from libc.string cimport strlen
55

66
from json import dumps as json_dumps
7-
import threading
87
from json import dumps as json_dumps
98

109
from ._utils cimport PyBytesLike_Check

ddtrace/internal/_threads.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -516,6 +516,8 @@ static PyTypeObject PeriodicThreadType = {
516516
// ----------------------------------------------------------------------------
517517
static PyMethodDef _threads_methods[] = {
518518
{ "reset_locks", (PyCFunction)lock_reset_locks, METH_NOARGS, "Reset all locks (generally after a fork)" },
519+
{ "begin_reset_locks", (PyCFunction)lock_begin_reset_locks, METH_NOARGS, "Begin resetting locks (before a fork)" },
520+
{ "end_reset_locks", (PyCFunction)lock_end_reset_locks, METH_NOARGS, "End resetting locks (after a fork)" },
519521
{ NULL, NULL, 0, NULL } /* Sentinel */
520522
};
521523

ddtrace/internal/_threads.pyi

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ class Lock(_BaseLock): ...
1212
class RLock(_BaseLock): ...
1313

1414
def reset_locks() -> None: ...
15+
def begin_reset_locks() -> None: ...
16+
def end_reset_locks() -> None: ...
1517

1618
class PeriodicThread:
1719
name: str

ddtrace/internal/_threads/lock.hpp

Lines changed: 59 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
#include <mutex>
88
#include <set>
99

10-
std::unique_ptr<std::mutex> _lock_set_mutex = std::make_unique<std::mutex>();
10+
std::mutex _lock_set_mutex;
1111

1212
// ----------------------------------------------------------------------------
1313
// Lock class
@@ -35,14 +35,25 @@ Lock_init(Lock* self, PyObject* args, PyObject* kwargs)
3535
{
3636
AllowThreads _;
3737

38-
std::lock_guard<std::mutex> guard(*_lock_set_mutex);
38+
std::lock_guard<std::mutex> guard(_lock_set_mutex);
3939

4040
lock_set.insert(self);
4141
}
4242

4343
return 0;
4444
}
4545

46+
// ----------------------------------------------------------------------------
47+
static inline void
48+
_Lock_maybe_leak(Lock* self)
49+
{
50+
// This function is used to ensure that the mutex is not leaked if it is
51+
// still locked when the lock object is deallocated.
52+
if (self->_locked) {
53+
self->_mutex.release(); // DEV: This releases the unique_ptr, not the mutex!
54+
}
55+
}
56+
4657
// ----------------------------------------------------------------------------
4758
static void
4859
Lock_dealloc(Lock* self)
@@ -51,11 +62,13 @@ Lock_dealloc(Lock* self)
5162
{
5263
AllowThreads _;
5364

54-
std::lock_guard<std::mutex> guard(*_lock_set_mutex);
65+
std::lock_guard<std::mutex> guard(_lock_set_mutex);
5566

5667
lock_set.erase(self);
5768
}
5869

70+
_Lock_maybe_leak(self);
71+
5972
self->_mutex = nullptr;
6073

6174
Py_TYPE(self)->tp_free((PyObject*)self);
@@ -153,6 +166,7 @@ Lock_exit(Lock* self, PyObject* args, PyObject* kwargs)
153166
static inline void
154167
Lock_reset(Lock* self)
155168
{
169+
_Lock_maybe_leak(self);
156170
self->_mutex = std::make_unique<std::timed_mutex>();
157171
self->_locked = 0;
158172
}
@@ -212,26 +226,38 @@ RLock_init(RLock* self, PyObject* args, PyObject* kwargs)
212226
{
213227
AllowThreads _;
214228

215-
std::lock_guard<std::mutex> guard(*_lock_set_mutex);
229+
std::lock_guard<std::mutex> guard(_lock_set_mutex);
216230

217231
rlock_set.insert(self);
218232
}
219233

220234
return 0;
221235
}
222236

237+
// ----------------------------------------------------------------------------
238+
static inline void
239+
_RLock_maybe_leak(RLock* self)
240+
{
241+
// This function is used to ensure that the mutex is not leaked if it is
242+
// still locked when the re-entrant lock object is deallocated.
243+
if (self->_locked) {
244+
self->_mutex.release(); // DEV: This releases the unique_ptr, not the mutex!
245+
}
246+
}
247+
223248
// ----------------------------------------------------------------------------
224249
static void
225250
RLock_dealloc(RLock* self)
226251
{
227252
{
228253
AllowThreads _;
229254

230-
std::lock_guard<std::mutex> guard(*_lock_set_mutex);
255+
std::lock_guard<std::mutex> guard(_lock_set_mutex);
231256

232257
rlock_set.erase(self);
233258
}
234259

260+
_RLock_maybe_leak(self);
235261
self->_mutex = nullptr;
236262

237263
Py_TYPE(self)->tp_free((PyObject*)self);
@@ -329,6 +355,7 @@ RLock_exit(RLock* self, PyObject* args, PyObject* kwargs)
329355
static inline void
330356
RLock_reset(RLock* self)
331357
{
358+
_RLock_maybe_leak(self);
332359
self->_mutex = std::make_unique<std::recursive_timed_mutex>();
333360
self->_locked = 0;
334361
}
@@ -380,8 +407,33 @@ lock_reset_locks(PyObject* Py_UNUSED(self), PyObject* Py_UNUSED(args))
380407
RLock_reset(rlock);
381408
}
382409

383-
// Reset the lock set mutex too!
384-
_lock_set_mutex = std::make_unique<std::mutex>();
410+
_lock_set_mutex.unlock();
411+
412+
Py_RETURN_NONE;
413+
}
414+
415+
// ----------------------------------------------------------------------------
416+
static PyObject*
417+
lock_begin_reset_locks(PyObject* Py_UNUSED(self), PyObject* Py_UNUSED(args))
418+
{
419+
// This function is called before a fork to ensure that the lock set mutex
420+
// is not held by any thread.
421+
{
422+
AllowThreads _;
423+
424+
_lock_set_mutex.lock();
425+
}
426+
427+
Py_RETURN_NONE;
428+
}
429+
430+
// ----------------------------------------------------------------------------
431+
static PyObject*
432+
lock_end_reset_locks(PyObject* Py_UNUSED(self), PyObject* Py_UNUSED(args))
433+
{
434+
// This function is called after a fork to ensure that the lock set mutex
435+
// is released and can be used by the new process.
436+
_lock_set_mutex.unlock();
385437

386438
Py_RETURN_NONE;
387439
}

ddtrace/internal/rate_limiter.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
from dataclasses import dataclass
44
from dataclasses import field
55
import random
6-
import threading
76
import time
87
from typing import Any # noqa:F401
98
from typing import Callable # noqa:F401
@@ -204,7 +203,7 @@ class BudgetRateLimiterWithJitter:
204203
budget: float = field(init=False)
205204
max_budget: float = field(init=False)
206205
last_time: float = field(init=False, default_factory=time.monotonic)
207-
_lock: threading.Lock = field(init=False, default_factory=threading.Lock)
206+
_lock: Lock = field(init=False, default_factory=Lock)
208207

209208
def __post_init__(self):
210209
if self.limit_rate == float("inf"):

ddtrace/internal/threads.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,15 @@
44
from ddtrace.internal._threads import Lock
55
from ddtrace.internal._threads import PeriodicThread
66
from ddtrace.internal._threads import RLock
7+
from ddtrace.internal._threads import begin_reset_locks
8+
from ddtrace.internal._threads import end_reset_locks
79
from ddtrace.internal._threads import periodic_threads
810
from ddtrace.internal._threads import reset_locks
911

1012

1113
__all__ = [
1214
"Lock",
1315
"PeriodicThread",
14-
"periodic_threads",
1516
"RLock",
1617
]
1718

@@ -36,4 +37,6 @@ def _() -> None:
3637
periodic_threads.clear()
3738

3839

40+
forksafe.register_before_fork(begin_reset_locks)
3941
forksafe.register(reset_locks)
42+
forksafe.register_after_parent(end_reset_locks)

0 commit comments

Comments
 (0)