Skip to content

Commit 26e44e8

Browse files
committed
feat: fix threaded connection
Signed-off-by: Alexandre Rulleau <[email protected]>
1 parent 7b77497 commit 26e44e8

File tree

4 files changed

+176
-48
lines changed

4 files changed

+176
-48
lines changed

ext/ddtrace.c

Lines changed: 31 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -433,10 +433,6 @@ static void dd_activate_once(void) {
433433

434434
// must run before the first zai_hook_activate as ddtrace_telemetry_setup installs a global hook
435435
if (!ddtrace_disable) {
436-
bool appsec_activation = false;
437-
bool appsec_config = false;
438-
439-
#ifndef _WIN32
440436
// Only disable sidecar sender when explicitly disabled
441437
bool bgs_fallback = DD_SIDECAR_TRACE_SENDER_DEFAULT && get_global_DD_TRACE_SIDECAR_TRACE_SENDER() && zai_config_memoized_entries[DDTRACE_CONFIG_DD_TRACE_SIDECAR_TRACE_SENDER].name_index == ZAI_CONFIG_ORIGIN_DEFAULT && !get_global_DD_INSTRUMENTATION_TELEMETRY_ENABLED();
442438
zend_string *bgs_service = NULL;
@@ -452,15 +448,15 @@ static void dd_activate_once(void) {
452448
}
453449
}
454450

455-
// if we're to enable appsec, we need to enable sidecar
451+
// If we're to enable appsec, we need to enable sidecar
452+
bool appsec_activation = false;
453+
bool appsec_config = false;
456454
bool enable_sidecar = ddtrace_sidecar_maybe_enable_appsec(&appsec_activation, &appsec_config);
457455
if (!enable_sidecar) {
458456
enable_sidecar = get_global_DD_INSTRUMENTATION_TELEMETRY_ENABLED() || get_global_DD_TRACE_SIDECAR_TRACE_SENDER();
459457
}
460458

461-
if (enable_sidecar)
462-
#endif
463-
{
459+
if (enable_sidecar) {
464460
bool request_startup = PG(during_request_startup);
465461
PG(during_request_startup) = false;
466462
ddtrace_sidecar_setup(appsec_activation, appsec_config);
@@ -1550,6 +1546,10 @@ static PHP_MINIT_FUNCTION(ddtrace) {
15501546
ddtrace_signals_minit();
15511547
#endif
15521548

1549+
// Store master PID for threaded sidecar connectivity
1550+
// The actual sidecar setup happens in first RINIT via pthread_once
1551+
ddtrace_sidecar_minit(false, false);
1552+
15531553
return SUCCESS;
15541554
}
15551555

@@ -2639,19 +2639,33 @@ void dd_internal_handle_fork(void) {
26392639
ddtrace_generate_runtime_id();
26402640

26412641
#ifndef _WIN32
2642-
// If we're a child process and using sidecar, reuse the master's sidecar connection
2643-
// Don't call ddtrace_reset_sidecar() which would create a new sidecar instance
2644-
if (is_child_process && get_global_DD_TRACE_SIDECAR_TRACE_SENDER() && ddtrace_sidecar) {
2645-
// Child process reuses master's sidecar - just update instance ID
2646-
ddtrace_force_new_instance_id();
2647-
// Reconnect with fork flag
2648-
if (ddtrace_sidecar) {
2649-
ddtrace_sidecar_submit_root_span_data();
2642+
// Handle sidecar after fork with threaded connectivity
2643+
if (get_global_DD_TRACE_SIDECAR_TRACE_SENDER() && ddtrace_sidecar) {
2644+
if (is_child_process) {
2645+
// Child process connects to master's sidecar as a worker
2646+
ddtrace_force_new_instance_id();
2647+
2648+
// Connect to master as worker using threaded connectivity
2649+
if (ddtrace_sidecar_connect_worker_after_fork()) {
2650+
LOG(DEBUG, "Child process connected to master sidecar as worker (child_pid=%d, master_pid=%d)",
2651+
current_pid, ddtrace_master_pid);
2652+
2653+
// Submit root span data with the new connection
2654+
if (ddtrace_sidecar) {
2655+
ddtrace_sidecar_submit_root_span_data();
2656+
}
2657+
} else {
2658+
LOG(WARN, "Failed to connect child process as worker to master sidecar (child_pid=%d, master_pid=%d)",
2659+
current_pid, ddtrace_master_pid);
2660+
// Fallback: reset sidecar on failure
2661+
ddtrace_reset_sidecar();
2662+
}
26502663
}
2664+
// Parent process: keep existing connection, no action needed
26512665
} else
26522666
#endif
26532667
{
2654-
// Not a child process, or not using sidecar - normal reset
2668+
// Not using sidecar - normal reset for non-sidecar trace sender
26552669
ddtrace_reset_sidecar();
26562670
}
26572671
if (!get_DD_TRACE_FORKED_PROCESS()) {

ext/sidecar.c

Lines changed: 142 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,9 @@ static void dd_sidecar_on_reconnect(ddog_SidecarTransport *transport) {
152152

153153
}
154154

155+
// Forward declaration for reconnection handler
156+
static ddog_SidecarTransport *dd_sidecar_connection_factory(void);
157+
155158
static ddog_SidecarTransport *dd_sidecar_connection_factory_ex(bool is_fork) {
156159
// Should not happen, unless the agent url is malformed
157160
if (!ddtrace_endpoint) {
@@ -172,13 +175,18 @@ static ddog_SidecarTransport *dd_sidecar_connection_factory_ex(bool is_fork) {
172175
}
173176

174177
ddog_SidecarTransport *sidecar_transport;
175-
if (!ddtrace_ffi_try("Failed connecting to the sidecar", ddog_sidecar_connect_php(&sidecar_transport, logpath, dd_zend_string_to_CharSlice(get_global_DD_TRACE_LOG_LEVEL()), get_global_DD_INSTRUMENTATION_TELEMETRY_ENABLED(), dd_sidecar_on_reconnect))) {
178+
// With threaded connectivity, use connect_worker to connect to master's listener
179+
if (!ddtrace_ffi_try("Failed connecting to sidecar as worker",
180+
ddog_sidecar_connect_worker((int32_t)ddtrace_master_pid, &sidecar_transport))) {
176181
dd_free_endpoints();
177182
return NULL;
178183
}
179184

180185
dd_sidecar_post_connect(&sidecar_transport, is_fork, logpath);
181186

187+
// Set up automatic reconnection handler
188+
ddtrace_sidecar_reconnect(&sidecar_transport, dd_sidecar_connection_factory);
189+
182190
return sidecar_transport;
183191
}
184192

@@ -210,27 +218,153 @@ bool ddtrace_sidecar_maybe_enable_appsec(bool *appsec_activation, bool *appsec_c
210218
#endif
211219
}
212220

213-
void ddtrace_sidecar_setup(bool appsec_activation, bool appsec_config) {
214-
ddtrace_set_non_resettable_sidecar_globals();
215-
ddtrace_set_resettable_sidecar_globals();
221+
// Called in MINIT - stores master PID for threaded connectivity
222+
void ddtrace_sidecar_minit(bool appsec_activation, bool appsec_config) {
223+
UNUSED(appsec_activation, appsec_config);
216224

217-
// Store master PID for later fork detection
225+
// Store master PID for threaded connectivity only if not already set
226+
// This ensures that after fork, child processes keep the parent's PID
227+
if (ddtrace_master_pid == 0) {
228+
#ifdef _WIN32
229+
ddtrace_master_pid = _getpid();
230+
#else
231+
ddtrace_master_pid = getpid();
232+
#endif
233+
LOG(DEBUG, "Sidecar MINIT: Stored master PID=%d", ddtrace_master_pid);
234+
} else {
235+
LOG(DEBUG, "Sidecar MINIT: Master PID already set to %d, current PID=%d",
236+
ddtrace_master_pid,
237+
#ifdef _WIN32
238+
_getpid()
239+
#else
240+
getpid()
241+
#endif
242+
);
243+
}
244+
}
245+
246+
// Setup master sidecar connection - called once on first RINIT
247+
static void ddtrace_sidecar_setup_master(bool appsec_activation, bool appsec_config) {
248+
// Check if we're in a forked child process
249+
// pthread_once state is not inherited across fork, so child processes
250+
// might try to call this function. They should NOT start a new master listener.
218251
#ifndef _WIN32
219-
ddtrace_master_pid = getpid();
252+
ddtrace_pid_t current_pid = getpid();
253+
254+
LOG(DEBUG, "ddtrace_sidecar_setup_master called: current_pid=%d, ddtrace_master_pid=%d",
255+
current_pid, ddtrace_master_pid);
256+
257+
bool is_child_process = (ddtrace_master_pid != 0 && current_pid != ddtrace_master_pid);
258+
259+
if (is_child_process) {
260+
LOG(DEBUG, "Skipping master sidecar setup in child process (child_pid=%d, master_pid=%d)",
261+
current_pid, ddtrace_master_pid);
262+
// Child processes should only connect as workers, which is handled in dd_internal_handle_fork()
263+
return;
264+
}
265+
266+
LOG(DEBUG, "Proceeding with master sidecar setup for PID=%d", current_pid);
220267
#endif
221268

269+
ddtrace_set_non_resettable_sidecar_globals();
270+
ddtrace_set_resettable_sidecar_globals();
271+
222272
ddog_init_remote_config(get_global_DD_INSTRUMENTATION_TELEMETRY_ENABLED(), appsec_activation, appsec_config);
223273

224-
ddtrace_sidecar = dd_sidecar_connection_factory();
225-
if (!ddtrace_sidecar) { // Something went wrong
274+
// Start master listener for worker processes/threads to connect to
275+
if (!ddtrace_ffi_try("Failed starting sidecar master listener", ddog_sidecar_connect_master((int32_t)ddtrace_master_pid))) {
276+
LOG(WARN, "Failed to start sidecar master listener");
226277
if (ddtrace_endpoint) {
227278
dd_free_endpoints();
228279
}
280+
return;
229281
}
230282

283+
// Master process also needs a transport - connect as first worker
284+
ddog_SidecarTransport *sidecar_transport = NULL;
285+
if (!ddtrace_ffi_try("Failed connecting master to sidecar", ddog_sidecar_connect_worker((int32_t)ddtrace_master_pid, &sidecar_transport))) {
286+
LOG(WARN, "Failed to connect master process to sidecar");
287+
if (ddtrace_endpoint) {
288+
dd_free_endpoints();
289+
}
290+
return;
291+
}
292+
293+
char logpath[MAXPATHLEN];
294+
int error_fd = atomic_load(&ddtrace_error_log_fd);
295+
if (error_fd == -1 || ddtrace_get_fd_path(error_fd, logpath) < 0) {
296+
*logpath = 0;
297+
}
298+
299+
dd_sidecar_post_connect(&sidecar_transport, false, logpath);
300+
ddtrace_sidecar = sidecar_transport;
301+
231302
if (get_global_DD_INSTRUMENTATION_TELEMETRY_ENABLED()) {
232303
ddtrace_telemetry_first_init();
233304
}
305+
306+
LOG(DEBUG, "Sidecar master setup complete, PID=%d", ddtrace_master_pid);
307+
}
308+
309+
// Called in RINIT - handles git metadata and root span data
310+
void ddtrace_sidecar_rinit(void) {
311+
if (get_DD_TRACE_GIT_METADATA_ENABLED()) {
312+
zval git_object;
313+
ZVAL_UNDEF(&git_object);
314+
ddtrace_inject_git_metadata(&git_object);
315+
if (Z_TYPE(git_object) == IS_OBJECT) {
316+
ddtrace_git_metadata *git_metadata = (ddtrace_git_metadata *) Z_OBJ(git_object);
317+
if (Z_TYPE(git_metadata->property_commit) == IS_STRING) {
318+
UNUSED(ddog_Vec_Tag_push(&DDTRACE_G(active_global_tags), DDOG_CHARSLICE_C("git.commit.sha"),
319+
dd_zend_string_to_CharSlice(Z_STR(git_metadata->property_commit))));
320+
}
321+
if (Z_TYPE(git_metadata->property_repository) == IS_STRING) {
322+
UNUSED(ddog_Vec_Tag_push(&DDTRACE_G(active_global_tags), DDOG_CHARSLICE_C("git.repository_url"),
323+
dd_zend_string_to_CharSlice(Z_STR(git_metadata->property_repository))));
324+
}
325+
OBJ_RELEASE(&git_metadata->std);
326+
}
327+
}
328+
329+
ddtrace_sidecar_submit_root_span_data_direct_defaults(&ddtrace_sidecar, NULL);
330+
}
331+
332+
// Setup function called via pthread_once on first RINIT
333+
void ddtrace_sidecar_setup(bool appsec_activation, bool appsec_config) {
334+
// This is called on first RINIT when config is available
335+
// Set up the master sidecar connection with threaded connectivity
336+
ddtrace_sidecar_setup_master(appsec_activation, appsec_config);
337+
}
338+
339+
// Connect child process to master sidecar as worker after fork
340+
bool ddtrace_sidecar_connect_worker_after_fork(void) {
341+
if (!ddtrace_endpoint || ddtrace_master_pid == 0) {
342+
return false;
343+
}
344+
345+
// Connect to master as worker using threaded connectivity
346+
ddog_SidecarTransport *sidecar_transport = NULL;
347+
if (!ddtrace_ffi_try("Failed connecting worker to sidecar after fork",
348+
ddog_sidecar_connect_worker((int32_t)ddtrace_master_pid, &sidecar_transport))) {
349+
return false;
350+
}
351+
352+
char logpath[MAXPATHLEN];
353+
int error_fd = atomic_load(&ddtrace_error_log_fd);
354+
if (error_fd == -1 || ddtrace_get_fd_path(error_fd, logpath) < 0) {
355+
*logpath = 0;
356+
}
357+
358+
// Post-connect with fork flag set to true
359+
dd_sidecar_post_connect(&sidecar_transport, true, logpath);
360+
361+
// Drop old transport and use the new worker connection
362+
if (ddtrace_sidecar) {
363+
ddog_sidecar_transport_drop(ddtrace_sidecar);
364+
}
365+
ddtrace_sidecar = sidecar_transport;
366+
367+
return true;
234368
}
235369

236370
void ddtrace_sidecar_ensure_active(void) {
@@ -562,28 +696,6 @@ void ddtrace_sidecar_activate(void) {
562696
} ZEND_HASH_FOREACH_END();
563697
}
564698

565-
void ddtrace_sidecar_rinit(void) {
566-
if (get_DD_TRACE_GIT_METADATA_ENABLED()) {
567-
zval git_object;
568-
ZVAL_UNDEF(&git_object);
569-
ddtrace_inject_git_metadata(&git_object);
570-
if (Z_TYPE(git_object) == IS_OBJECT) {
571-
ddtrace_git_metadata *git_metadata = (ddtrace_git_metadata *) Z_OBJ(git_object);
572-
if (Z_TYPE(git_metadata->property_commit) == IS_STRING) {
573-
UNUSED(ddog_Vec_Tag_push(&DDTRACE_G(active_global_tags), DDOG_CHARSLICE_C("git.commit.sha"),
574-
dd_zend_string_to_CharSlice(Z_STR(git_metadata->property_commit))));
575-
}
576-
if (Z_TYPE(git_metadata->property_repository) == IS_STRING) {
577-
UNUSED(ddog_Vec_Tag_push(&DDTRACE_G(active_global_tags), DDOG_CHARSLICE_C("git.repository_url"),
578-
dd_zend_string_to_CharSlice(Z_STR(git_metadata->property_repository))));
579-
}
580-
OBJ_RELEASE(&git_metadata->std);
581-
}
582-
}
583-
584-
ddtrace_sidecar_submit_root_span_data_direct_defaults(&ddtrace_sidecar, NULL);
585-
}
586-
587699
void ddtrace_sidecar_rshutdown(void) {
588700
ddog_Vec_Tag_drop(DDTRACE_G(active_global_tags));
589701
}

ext/sidecar.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@ struct telemetry_rc_info {
3030
};
3131
DDTRACE_PUBLIC struct telemetry_rc_info ddtrace_get_telemetry_rc_info(void);
3232

33+
void ddtrace_sidecar_minit(bool appsec_activation, bool appsec_config);
3334
void ddtrace_sidecar_setup(bool appsec_activation, bool appsec_config);
35+
bool ddtrace_sidecar_connect_worker_after_fork(void);
3436
bool ddtrace_sidecar_maybe_enable_appsec(bool *appsec_activation, bool *appsec_config);
3537
void ddtrace_sidecar_ensure_active(void);
3638
void ddtrace_sidecar_finalize(bool clear_id);

0 commit comments

Comments
 (0)