Skip to content

Commit b8b4647

Browse files
committed
feat: fix threaded connection
Signed-off-by: Alexandre Rulleau <[email protected]>
1 parent 36c07fd commit b8b4647

File tree

5 files changed

+248
-47
lines changed

5 files changed

+248
-47
lines changed

components-rs/ddtrace.h

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,63 @@ ddog_Configurator *ddog_library_configurator_new_dummy(bool debug_logs, ddog_Cha
4343

4444
int posix_spawn_file_actions_addchdir_np(void *file_actions, const char *path);
4545

46+
void ddog_init_span_func(void (*free_func)(struct _zend_string*),
47+
void (*addref_func)(struct _zend_string*));
48+
49+
void ddog_set_span_service_zstr(ddog_SpanBytes *ptr, struct _zend_string *str);
50+
51+
void ddog_set_span_name_zstr(ddog_SpanBytes *ptr, struct _zend_string *str);
52+
53+
void ddog_set_span_resource_zstr(ddog_SpanBytes *ptr, struct _zend_string *str);
54+
55+
void ddog_set_span_type_zstr(ddog_SpanBytes *ptr, struct _zend_string *str);
56+
57+
void ddog_add_span_meta_zstr(ddog_SpanBytes *ptr,
58+
struct _zend_string *key,
59+
struct _zend_string *val);
60+
61+
void ddog_add_CharSlice_span_meta_zstr(ddog_SpanBytes *ptr,
62+
ddog_CharSlice key,
63+
struct _zend_string *val);
64+
65+
void ddog_add_zstr_span_meta_str(ddog_SpanBytes *ptr, struct _zend_string *key, const char *val);
66+
67+
void ddog_add_str_span_meta_str(ddog_SpanBytes *ptr, const char *key, const char *val);
68+
69+
void ddog_add_str_span_meta_zstr(ddog_SpanBytes *ptr, const char *key, struct _zend_string *val);
70+
71+
void ddog_add_str_span_meta_CharSlice(ddog_SpanBytes *ptr, const char *key, ddog_CharSlice val);
72+
73+
void ddog_del_span_meta_zstr(ddog_SpanBytes *ptr, struct _zend_string *key);
74+
75+
void ddog_del_span_meta_str(ddog_SpanBytes *ptr, const char *key);
76+
77+
bool ddog_has_span_meta_zstr(ddog_SpanBytes *ptr, struct _zend_string *key);
78+
79+
bool ddog_has_span_meta_str(ddog_SpanBytes *ptr, const char *key);
80+
81+
ddog_CharSlice ddog_get_span_meta_str(ddog_SpanBytes *span, const char *key);
82+
83+
void ddog_add_span_metrics_zstr(ddog_SpanBytes *ptr, struct _zend_string *key, double val);
84+
85+
bool ddog_has_span_metrics_zstr(ddog_SpanBytes *ptr, struct _zend_string *key);
86+
87+
void ddog_del_span_metrics_zstr(ddog_SpanBytes *ptr, struct _zend_string *key);
88+
89+
void ddog_add_span_metrics_str(ddog_SpanBytes *ptr, const char *key, double val);
90+
91+
bool ddog_get_span_metrics_str(ddog_SpanBytes *ptr, const char *key, double *result);
92+
93+
void ddog_del_span_metrics_str(ddog_SpanBytes *ptr, const char *key);
94+
95+
void ddog_add_span_meta_struct_zstr(ddog_SpanBytes *ptr,
96+
struct _zend_string *key,
97+
struct _zend_string *val);
98+
99+
void ddog_add_zstr_span_meta_struct_CharSlice(ddog_SpanBytes *ptr,
100+
struct _zend_string *key,
101+
ddog_CharSlice val);
102+
46103
bool ddog_shall_log(enum ddog_Log category);
47104

48105
void ddog_set_error_log_level(bool once);

components-rs/sidecar.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,8 @@ ddog_MaybeError ddog_sidecar_connect_master(int32_t master_pid);
9393
ddog_MaybeError ddog_sidecar_connect_worker(int32_t master_pid,
9494
struct ddog_SidecarTransport **connection);
9595

96+
ddog_MaybeError ddog_sidecar_shutdown_master_listener(void);
97+
9698
ddog_MaybeError ddog_sidecar_ping(struct ddog_SidecarTransport **transport);
9799

98100
ddog_MaybeError ddog_sidecar_flush_traces(struct ddog_SidecarTransport **transport);

ext/ddtrace.c

Lines changed: 31 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -433,10 +433,6 @@ static void dd_activate_once(void) {
433433

434434
// must run before the first zai_hook_activate as ddtrace_telemetry_setup installs a global hook
435435
if (!ddtrace_disable) {
436-
bool appsec_activation = false;
437-
bool appsec_config = false;
438-
439-
#ifndef _WIN32
440436
// Only disable sidecar sender when explicitly disabled
441437
bool bgs_fallback = DD_SIDECAR_TRACE_SENDER_DEFAULT && get_global_DD_TRACE_SIDECAR_TRACE_SENDER() && zai_config_memoized_entries[DDTRACE_CONFIG_DD_TRACE_SIDECAR_TRACE_SENDER].name_index == ZAI_CONFIG_ORIGIN_DEFAULT && !get_global_DD_INSTRUMENTATION_TELEMETRY_ENABLED();
442438
zend_string *bgs_service = NULL;
@@ -452,15 +448,15 @@ static void dd_activate_once(void) {
452448
}
453449
}
454450

455-
// if we're to enable appsec, we need to enable sidecar
451+
// If we're to enable appsec, we need to enable sidecar
452+
bool appsec_activation = false;
453+
bool appsec_config = false;
456454
bool enable_sidecar = ddtrace_sidecar_maybe_enable_appsec(&appsec_activation, &appsec_config);
457455
if (!enable_sidecar) {
458456
enable_sidecar = get_global_DD_INSTRUMENTATION_TELEMETRY_ENABLED() || get_global_DD_TRACE_SIDECAR_TRACE_SENDER();
459457
}
460458

461-
if (enable_sidecar)
462-
#endif
463-
{
459+
if (enable_sidecar) {
464460
bool request_startup = PG(during_request_startup);
465461
PG(during_request_startup) = false;
466462
ddtrace_sidecar_setup(appsec_activation, appsec_config);
@@ -1552,6 +1548,10 @@ static PHP_MINIT_FUNCTION(ddtrace) {
15521548
ddtrace_signals_minit();
15531549
#endif
15541550

1551+
// Store master PID for threaded sidecar connectivity
1552+
// The actual sidecar setup happens in first RINIT via pthread_once
1553+
ddtrace_sidecar_minit(false, false);
1554+
15551555
return SUCCESS;
15561556
}
15571557

@@ -2641,19 +2641,33 @@ void dd_internal_handle_fork(void) {
26412641
ddtrace_generate_runtime_id();
26422642

26432643
#ifndef _WIN32
2644-
// If we're a child process and using sidecar, reuse the master's sidecar connection
2645-
// Don't call ddtrace_reset_sidecar() which would create a new sidecar instance
2646-
if (is_child_process && get_global_DD_TRACE_SIDECAR_TRACE_SENDER() && ddtrace_sidecar) {
2647-
// Child process reuses master's sidecar - just update instance ID
2648-
ddtrace_force_new_instance_id();
2649-
// Reconnect with fork flag
2650-
if (ddtrace_sidecar) {
2651-
ddtrace_sidecar_submit_root_span_data();
2644+
// Handle sidecar after fork with threaded connectivity
2645+
if (get_global_DD_TRACE_SIDECAR_TRACE_SENDER() && ddtrace_sidecar) {
2646+
if (is_child_process) {
2647+
// Child process connects to master's sidecar as a worker
2648+
ddtrace_force_new_instance_id();
2649+
2650+
// Connect to master as worker using threaded connectivity
2651+
if (ddtrace_sidecar_connect_worker_after_fork()) {
2652+
LOG(DEBUG, "Child process connected to master sidecar as worker (child_pid=%d, master_pid=%d)",
2653+
current_pid, ddtrace_master_pid);
2654+
2655+
// Submit root span data with the new connection
2656+
if (ddtrace_sidecar) {
2657+
ddtrace_sidecar_submit_root_span_data();
2658+
}
2659+
} else {
2660+
LOG(WARN, "Failed to connect child process as worker to master sidecar (child_pid=%d, master_pid=%d)",
2661+
current_pid, ddtrace_master_pid);
2662+
// Fallback: reset sidecar on failure
2663+
ddtrace_reset_sidecar();
2664+
}
26522665
}
2666+
// Parent process: keep existing connection, no action needed
26532667
} else
26542668
#endif
26552669
{
2656-
// Not a child process, or not using sidecar - normal reset
2670+
// Not using sidecar - normal reset for non-sidecar trace sender
26572671
ddtrace_reset_sidecar();
26582672
}
26592673
if (!get_DD_TRACE_FORKED_PROCESS()) {

ext/sidecar.c

Lines changed: 156 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,9 @@ static void dd_sidecar_on_reconnect(ddog_SidecarTransport *transport) {
153153

154154
}
155155

156+
// Forward declaration for reconnection handler
157+
static ddog_SidecarTransport *dd_sidecar_connection_factory(void);
158+
156159
static ddog_SidecarTransport *dd_sidecar_connection_factory_ex(bool is_fork) {
157160
// Should not happen, unless the agent url is malformed
158161
if (!ddtrace_endpoint) {
@@ -173,13 +176,18 @@ static ddog_SidecarTransport *dd_sidecar_connection_factory_ex(bool is_fork) {
173176
}
174177

175178
ddog_SidecarTransport *sidecar_transport;
176-
if (!ddtrace_ffi_try("Failed connecting to the sidecar", ddog_sidecar_connect_php(&sidecar_transport, logpath, dd_zend_string_to_CharSlice(get_global_DD_TRACE_LOG_LEVEL()), get_global_DD_INSTRUMENTATION_TELEMETRY_ENABLED(), dd_sidecar_on_reconnect, ddtrace_endpoint))) {
179+
// With threaded connectivity, use connect_worker to connect to master's listener
180+
if (!ddtrace_ffi_try("Failed connecting to sidecar as worker",
181+
ddog_sidecar_connect_worker((int32_t)ddtrace_master_pid, &sidecar_transport))) {
177182
dd_free_endpoints();
178183
return NULL;
179184
}
180185

181186
dd_sidecar_post_connect(&sidecar_transport, is_fork, logpath);
182187

188+
// Set up automatic reconnection handler
189+
ddtrace_sidecar_reconnect(&sidecar_transport, dd_sidecar_connection_factory);
190+
183191
return sidecar_transport;
184192
}
185193

@@ -211,27 +219,153 @@ bool ddtrace_sidecar_maybe_enable_appsec(bool *appsec_activation, bool *appsec_c
211219
#endif
212220
}
213221

214-
void ddtrace_sidecar_setup(bool appsec_activation, bool appsec_config) {
215-
ddtrace_set_non_resettable_sidecar_globals();
216-
ddtrace_set_resettable_sidecar_globals();
222+
// Called in MINIT - stores master PID for threaded connectivity
223+
void ddtrace_sidecar_minit(bool appsec_activation, bool appsec_config) {
224+
UNUSED(appsec_activation, appsec_config);
225+
226+
// Store master PID for threaded connectivity only if not already set
227+
// This ensures that after fork, child processes keep the parent's PID
228+
if (ddtrace_master_pid == 0) {
229+
#ifdef _WIN32
230+
ddtrace_master_pid = _getpid();
231+
#else
232+
ddtrace_master_pid = getpid();
233+
#endif
234+
LOG(DEBUG, "Sidecar MINIT: Stored master PID=%d", ddtrace_master_pid);
235+
} else {
236+
LOG(DEBUG, "Sidecar MINIT: Master PID already set to %d, current PID=%d",
237+
ddtrace_master_pid,
238+
#ifdef _WIN32
239+
_getpid()
240+
#else
241+
getpid()
242+
#endif
243+
);
244+
}
245+
}
217246

218-
// Store master PID for later fork detection
247+
// Setup master sidecar connection - called once on first RINIT
248+
static void ddtrace_sidecar_setup_master(bool appsec_activation, bool appsec_config) {
249+
// Check if we're in a forked child process
250+
// pthread_once state is not inherited across fork, so child processes
251+
// might try to call this function. They should NOT start a new master listener.
219252
#ifndef _WIN32
220-
ddtrace_master_pid = getpid();
253+
ddtrace_pid_t current_pid = getpid();
254+
255+
LOG(DEBUG, "ddtrace_sidecar_setup_master called: current_pid=%d, ddtrace_master_pid=%d",
256+
current_pid, ddtrace_master_pid);
257+
258+
bool is_child_process = (ddtrace_master_pid != 0 && current_pid != ddtrace_master_pid);
259+
260+
if (is_child_process) {
261+
LOG(DEBUG, "Skipping master sidecar setup in child process (child_pid=%d, master_pid=%d)",
262+
current_pid, ddtrace_master_pid);
263+
// Child processes should only connect as workers, which is handled in dd_internal_handle_fork()
264+
return;
265+
}
266+
267+
LOG(DEBUG, "Proceeding with master sidecar setup for PID=%d", current_pid);
221268
#endif
222269

270+
ddtrace_set_non_resettable_sidecar_globals();
271+
ddtrace_set_resettable_sidecar_globals();
272+
223273
ddog_init_remote_config(get_global_DD_INSTRUMENTATION_TELEMETRY_ENABLED(), appsec_activation, appsec_config);
224274

225-
ddtrace_sidecar = dd_sidecar_connection_factory();
226-
if (!ddtrace_sidecar) { // Something went wrong
275+
// Start master listener for worker processes/threads to connect to
276+
if (!ddtrace_ffi_try("Failed starting sidecar master listener", ddog_sidecar_connect_master((int32_t)ddtrace_master_pid))) {
277+
LOG(WARN, "Failed to start sidecar master listener");
278+
if (ddtrace_endpoint) {
279+
dd_free_endpoints();
280+
}
281+
return;
282+
}
283+
284+
// Master process also needs a transport - connect as first worker
285+
ddog_SidecarTransport *sidecar_transport = NULL;
286+
if (!ddtrace_ffi_try("Failed connecting master to sidecar", ddog_sidecar_connect_worker((int32_t)ddtrace_master_pid, &sidecar_transport))) {
287+
LOG(WARN, "Failed to connect master process to sidecar");
227288
if (ddtrace_endpoint) {
228289
dd_free_endpoints();
229290
}
291+
return;
292+
}
293+
294+
char logpath[MAXPATHLEN];
295+
int error_fd = atomic_load(&ddtrace_error_log_fd);
296+
if (error_fd == -1 || ddtrace_get_fd_path(error_fd, logpath) < 0) {
297+
*logpath = 0;
230298
}
231299

300+
dd_sidecar_post_connect(&sidecar_transport, false, logpath);
301+
ddtrace_sidecar = sidecar_transport;
302+
232303
if (get_global_DD_INSTRUMENTATION_TELEMETRY_ENABLED()) {
233304
ddtrace_telemetry_first_init();
234305
}
306+
307+
LOG(DEBUG, "Sidecar master setup complete, PID=%d", ddtrace_master_pid);
308+
}
309+
310+
// Called in RINIT - handles git metadata and root span data
311+
void ddtrace_sidecar_rinit(void) {
312+
if (get_DD_TRACE_GIT_METADATA_ENABLED()) {
313+
zval git_object;
314+
ZVAL_UNDEF(&git_object);
315+
ddtrace_inject_git_metadata(&git_object);
316+
if (Z_TYPE(git_object) == IS_OBJECT) {
317+
ddtrace_git_metadata *git_metadata = (ddtrace_git_metadata *) Z_OBJ(git_object);
318+
if (Z_TYPE(git_metadata->property_commit) == IS_STRING) {
319+
UNUSED(ddog_Vec_Tag_push(&DDTRACE_G(active_global_tags), DDOG_CHARSLICE_C("git.commit.sha"),
320+
dd_zend_string_to_CharSlice(Z_STR(git_metadata->property_commit))));
321+
}
322+
if (Z_TYPE(git_metadata->property_repository) == IS_STRING) {
323+
UNUSED(ddog_Vec_Tag_push(&DDTRACE_G(active_global_tags), DDOG_CHARSLICE_C("git.repository_url"),
324+
dd_zend_string_to_CharSlice(Z_STR(git_metadata->property_repository))));
325+
}
326+
OBJ_RELEASE(&git_metadata->std);
327+
}
328+
}
329+
330+
ddtrace_sidecar_submit_root_span_data_direct_defaults(&ddtrace_sidecar, NULL);
331+
}
332+
333+
// Setup function called via pthread_once on first RINIT
334+
void ddtrace_sidecar_setup(bool appsec_activation, bool appsec_config) {
335+
// This is called on first RINIT when config is available
336+
// Set up the master sidecar connection with threaded connectivity
337+
ddtrace_sidecar_setup_master(appsec_activation, appsec_config);
338+
}
339+
340+
// Connect child process to master sidecar as worker after fork
341+
bool ddtrace_sidecar_connect_worker_after_fork(void) {
342+
if (!ddtrace_endpoint || ddtrace_master_pid == 0) {
343+
return false;
344+
}
345+
346+
// Connect to master as worker using threaded connectivity
347+
ddog_SidecarTransport *sidecar_transport = NULL;
348+
if (!ddtrace_ffi_try("Failed connecting worker to sidecar after fork",
349+
ddog_sidecar_connect_worker((int32_t)ddtrace_master_pid, &sidecar_transport))) {
350+
return false;
351+
}
352+
353+
char logpath[MAXPATHLEN];
354+
int error_fd = atomic_load(&ddtrace_error_log_fd);
355+
if (error_fd == -1 || ddtrace_get_fd_path(error_fd, logpath) < 0) {
356+
*logpath = 0;
357+
}
358+
359+
// Post-connect with fork flag set to true
360+
dd_sidecar_post_connect(&sidecar_transport, true, logpath);
361+
362+
// Drop old transport and use the new worker connection
363+
if (ddtrace_sidecar) {
364+
ddog_sidecar_transport_drop(ddtrace_sidecar);
365+
}
366+
ddtrace_sidecar = sidecar_transport;
367+
368+
return true;
235369
}
236370

237371
void ddtrace_sidecar_ensure_active(void) {
@@ -261,6 +395,20 @@ void ddtrace_sidecar_finalize(bool clear_id) {
261395
}
262396

263397
void ddtrace_sidecar_shutdown(void) {
398+
// Shutdown master listener thread if this is the master process
399+
#ifndef _WIN32
400+
ddtrace_pid_t current_pid = getpid();
401+
if (ddtrace_master_pid != 0 && current_pid == ddtrace_master_pid) {
402+
LOG(DEBUG, "Shutting down master listener thread (PID=%d)", current_pid);
403+
ddtrace_ffi_try("Failed shutting down master listener", ddog_sidecar_shutdown_master_listener());
404+
}
405+
#else
406+
if (ddtrace_master_pid != 0 && _getpid() == ddtrace_master_pid) {
407+
LOG(DEBUG, "Shutting down master listener thread (PID=%d)", _getpid());
408+
ddtrace_ffi_try("Failed shutting down master listener", ddog_sidecar_shutdown_master_listener());
409+
}
410+
#endif
411+
264412
if (ddtrace_sidecar_instance_id) {
265413
ddog_sidecar_instanceId_drop(ddtrace_sidecar_instance_id);
266414
}
@@ -563,28 +711,6 @@ void ddtrace_sidecar_activate(void) {
563711
} ZEND_HASH_FOREACH_END();
564712
}
565713

566-
void ddtrace_sidecar_rinit(void) {
567-
if (get_DD_TRACE_GIT_METADATA_ENABLED()) {
568-
zval git_object;
569-
ZVAL_UNDEF(&git_object);
570-
ddtrace_inject_git_metadata(&git_object);
571-
if (Z_TYPE(git_object) == IS_OBJECT) {
572-
ddtrace_git_metadata *git_metadata = (ddtrace_git_metadata *) Z_OBJ(git_object);
573-
if (Z_TYPE(git_metadata->property_commit) == IS_STRING) {
574-
UNUSED(ddog_Vec_Tag_push(&DDTRACE_G(active_global_tags), DDOG_CHARSLICE_C("git.commit.sha"),
575-
dd_zend_string_to_CharSlice(Z_STR(git_metadata->property_commit))));
576-
}
577-
if (Z_TYPE(git_metadata->property_repository) == IS_STRING) {
578-
UNUSED(ddog_Vec_Tag_push(&DDTRACE_G(active_global_tags), DDOG_CHARSLICE_C("git.repository_url"),
579-
dd_zend_string_to_CharSlice(Z_STR(git_metadata->property_repository))));
580-
}
581-
OBJ_RELEASE(&git_metadata->std);
582-
}
583-
}
584-
585-
ddtrace_sidecar_submit_root_span_data_direct_defaults(&ddtrace_sidecar, NULL);
586-
}
587-
588714
void ddtrace_sidecar_rshutdown(void) {
589715
ddog_Vec_Tag_drop(DDTRACE_G(active_global_tags));
590716
}

0 commit comments

Comments
 (0)