Skip to content

Commit 75a7b4f

Browse files
committed
acquire-release
1 parent 9dd1405 commit 75a7b4f

File tree

4 files changed

+62
-43
lines changed

4 files changed

+62
-43
lines changed

ddtrace/internal/_threads.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -515,7 +515,8 @@ static PyTypeObject PeriodicThreadType = {
515515

516516
// ----------------------------------------------------------------------------
517517
static PyMethodDef _threads_methods[] = {
518-
{ "reset_locks", (PyCFunction)lock_reset_locks, METH_NOARGS, "Reset all locks (generally after a fork)" },
518+
{ "acquire_all", (PyCFunction)lock_acquire_all, METH_NOARGS, "Acquire all locks (generally before a fork)" },
519+
{ "release_all", (PyCFunction)lock_release_all, METH_NOARGS, "Release all locks (generally after a fork)" },
519520
{ NULL, NULL, 0, NULL } /* Sentinel */
520521
};
521522

ddtrace/internal/_threads.pyi

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@ class _BaseLock:
1111
class Lock(_BaseLock): ...
1212
class RLock(_BaseLock): ...
1313

14-
def reset_locks() -> None: ...
14+
def acquire_all() -> None: ...
15+
def release_all() -> None: ...
1516

1617
class PeriodicThread:
1718
name: str

ddtrace/internal/_threads/lock.hpp

Lines changed: 51 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
#include <mutex>
88
#include <set>
99

10-
std::unique_ptr<std::mutex> _lock_set_mutex = std::make_unique<std::mutex>();
10+
std::mutex _lock_set_mutex;
1111

1212
// ----------------------------------------------------------------------------
1313
// Lock class
@@ -23,19 +23,18 @@ typedef struct lock
2323
std::unique_ptr<std::timed_mutex> _mutex = nullptr;
2424
} Lock;
2525

26-
std::set<Lock*> lock_set; // Global set of locks for reset after fork
26+
std::set<Lock*> lock_set;
2727

2828
// ----------------------------------------------------------------------------
2929
static int
30-
Lock_init(Lock* self, PyObject* args, PyObject* kwargs)
30+
Lock_init(Lock* self, PyObject* Py_UNUSED(args), PyObject* Py_UNUSED(kwargs))
3131
{
3232
self->_mutex = std::make_unique<std::timed_mutex>();
3333

34-
// Register the lock for reset after fork
3534
{
3635
AllowThreads _;
3736

38-
std::lock_guard<std::mutex> guard(*_lock_set_mutex);
37+
std::lock_guard<std::mutex> guard(_lock_set_mutex);
3938

4039
lock_set.insert(self);
4140
}
@@ -51,11 +50,16 @@ Lock_dealloc(Lock* self)
5150
{
5251
AllowThreads _;
5352

54-
std::lock_guard<std::mutex> guard(*_lock_set_mutex);
53+
std::lock_guard<std::mutex> guard(_lock_set_mutex);
5554

5655
lock_set.erase(self);
5756
}
5857

58+
if (self->_locked > 0) {
59+
self->_mutex->unlock();
60+
self->_locked = 0;
61+
}
62+
5963
self->_mutex = nullptr;
6064

6165
Py_TYPE(self)->tp_free((PyObject*)self);
@@ -109,7 +113,7 @@ Lock_release(Lock* self)
109113
}
110114

111115
self->_mutex->unlock();
112-
self->_locked = 0; // Reset the lock state
116+
self->_locked = 0;
113117

114118
Py_RETURN_NONE;
115119
}
@@ -127,7 +131,7 @@ Lock_locked(Lock* self)
127131

128132
// ----------------------------------------------------------------------------
129133
static PyObject*
130-
Lock_enter(Lock* self, PyObject* args, PyObject* kwargs)
134+
Lock_enter(Lock* self)
131135
{
132136
AllowThreads _;
133137

@@ -140,7 +144,7 @@ Lock_enter(Lock* self, PyObject* args, PyObject* kwargs)
140144

141145
// ----------------------------------------------------------------------------
142146
static PyObject*
143-
Lock_exit(Lock* self, PyObject* args, PyObject* kwargs)
147+
Lock_exit(Lock* self, PyObject* Py_UNUSED(args), PyObject* Py_UNUSED(kwargs))
144148
{
145149
// This method is called when the lock is used in a "with" statement
146150
if (Lock_release(self) == NULL) {
@@ -150,13 +154,6 @@ Lock_exit(Lock* self, PyObject* args, PyObject* kwargs)
150154
Py_RETURN_FALSE;
151155
}
152156

153-
static inline void
154-
Lock_reset(Lock* self)
155-
{
156-
self->_mutex = std::make_unique<std::timed_mutex>();
157-
self->_locked = 0;
158-
}
159-
160157
// ----------------------------------------------------------------------------
161158
static PyMethodDef Lock_methods[] = {
162159
{ "acquire", (PyCFunction)Lock_acquire, METH_VARARGS | METH_KEYWORDS, "Acquire the lock with an optional timeout" },
@@ -200,19 +197,18 @@ typedef struct rlock
200197
std::unique_ptr<std::recursive_timed_mutex> _mutex = nullptr;
201198
} RLock;
202199

203-
std::set<RLock*> rlock_set; // Global set of re-entrant locks for reset after fork
200+
std::set<RLock*> rlock_set;
204201

205202
// ----------------------------------------------------------------------------
206203
static int
207-
RLock_init(RLock* self, PyObject* args, PyObject* kwargs)
204+
RLock_init(RLock* self, PyObject* Py_UNUSED(args), PyObject* Py_UNUSED(kwargs))
208205
{
209206
self->_mutex = std::make_unique<std::recursive_timed_mutex>();
210207

211-
// Register the re-entrant lock for reset after fork
212208
{
213209
AllowThreads _;
214210

215-
std::lock_guard<std::mutex> guard(*_lock_set_mutex);
211+
std::lock_guard<std::mutex> guard(_lock_set_mutex);
216212

217213
rlock_set.insert(self);
218214
}
@@ -227,11 +223,15 @@ RLock_dealloc(RLock* self)
227223
{
228224
AllowThreads _;
229225

230-
std::lock_guard<std::mutex> guard(*_lock_set_mutex);
226+
std::lock_guard<std::mutex> guard(_lock_set_mutex);
231227

232228
rlock_set.erase(self);
233229
}
234230

231+
for (; self->_locked > 0; self->_locked--) {
232+
self->_mutex->unlock();
233+
}
234+
235235
self->_mutex = nullptr;
236236

237237
Py_TYPE(self)->tp_free((PyObject*)self);
@@ -303,7 +303,7 @@ RLock_locked(RLock* self)
303303

304304
// ----------------------------------------------------------------------------
305305
static PyObject*
306-
RLock_enter(RLock* self, PyObject* args, PyObject* kwargs)
306+
RLock_enter(RLock* self)
307307
{
308308
AllowThreads _;
309309

@@ -316,7 +316,7 @@ RLock_enter(RLock* self, PyObject* args, PyObject* kwargs)
316316

317317
// ----------------------------------------------------------------------------
318318
static PyObject*
319-
RLock_exit(RLock* self, PyObject* args, PyObject* kwargs)
319+
RLock_exit(RLock* self, PyObject* Py_UNUSED(args), PyObject* Py_UNUSED(kwargs))
320320
{
321321
// This method is called when the lock is used in a "with" statement
322322
if (RLock_release(self) == NULL) {
@@ -326,13 +326,6 @@ RLock_exit(RLock* self, PyObject* args, PyObject* kwargs)
326326
Py_RETURN_FALSE;
327327
}
328328

329-
static inline void
330-
RLock_reset(RLock* self)
331-
{
332-
self->_mutex = std::make_unique<std::recursive_timed_mutex>();
333-
self->_locked = 0;
334-
}
335-
336329
// ----------------------------------------------------------------------------
337330
static PyMethodDef RLock_methods[] = {
338331
{ "acquire",
@@ -367,21 +360,41 @@ static PyTypeObject RLockType = {
367360

368361
// ----------------------------------------------------------------------------
369362
static PyObject*
370-
lock_reset_locks(PyObject* Py_UNUSED(self), PyObject* Py_UNUSED(args))
363+
lock_acquire_all(PyObject* Py_UNUSED(self), PyObject* Py_UNUSED(args))
371364
{
372-
// Reset all locks that have been registered for reset after a fork. This
373-
// MUST be called in a single-thread scenario only, e.g. soon after the
374-
// fork.
365+
{
366+
AllowThreads _;
367+
368+
_lock_set_mutex.lock();
369+
370+
for (Lock* lock : lock_set) {
371+
lock->_mutex->lock();
372+
lock->_locked = 1;
373+
}
374+
375+
for (RLock* rlock : rlock_set) {
376+
rlock->_mutex->lock();
377+
rlock->_locked++;
378+
}
379+
}
380+
381+
Py_RETURN_NONE;
382+
}
383+
384+
// ----------------------------------------------------------------------------
385+
static PyObject*
386+
lock_release_all(PyObject* Py_UNUSED(self), PyObject* Py_UNUSED(args))
387+
{
388+
375389
for (Lock* lock : lock_set) {
376-
Lock_reset(lock);
390+
Lock_exit(lock, NULL, NULL);
377391
}
378392

379393
for (RLock* rlock : rlock_set) {
380-
RLock_reset(rlock);
394+
RLock_exit(rlock, NULL, NULL);
381395
}
382396

383-
// Reset the lock set mutex too!
384-
_lock_set_mutex = std::make_unique<std::mutex>();
397+
_lock_set_mutex.unlock();
385398

386399
Py_RETURN_NONE;
387400
}

ddtrace/internal/threads.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@
44
from ddtrace.internal._threads import Lock
55
from ddtrace.internal._threads import PeriodicThread
66
from ddtrace.internal._threads import RLock
7+
from ddtrace.internal._threads import acquire_all
78
from ddtrace.internal._threads import periodic_threads
8-
from ddtrace.internal._threads import reset_locks
9+
from ddtrace.internal._threads import release_all
910

1011

1112
__all__ = [
1213
"Lock",
1314
"PeriodicThread",
14-
"periodic_threads",
1515
"RLock",
1616
]
1717

@@ -36,4 +36,8 @@ def _() -> None:
3636
periodic_threads.clear()
3737

3838

39-
forksafe.register(reset_locks)
39+
# Acquire all locks before a fork in the thread that is forking. We then
40+
# release them after the fork in both the parent and child processes.
41+
forksafe.register_before_fork(acquire_all)
42+
forksafe.register(release_all)
43+
forksafe.register_after_parent(release_all)

0 commit comments

Comments
 (0)