From 09fa62b4af241e0cdf73598976b841a3613d5e34 Mon Sep 17 00:00:00 2001 From: Pablo Garrido Date: Mon, 15 Nov 2021 14:28:28 +0100 Subject: [PATCH 1/6] Add uxr_run_session_until_confirm_delivery_one_stream API --- include/uxr/client/core/session/session.h | 16 ++++++++++++ src/c/core/session/session.c | 25 +++++++++++++++++++ src/c/core/session/stream/stream_storage.c | 10 ++++++++ .../session/stream/stream_storage_internal.h | 3 +++ 4 files changed, 54 insertions(+) diff --git a/include/uxr/client/core/session/session.h b/include/uxr/client/core/session/session.h index dfffe6d7..fb33d8cc 100644 --- a/include/uxr/client/core/session/session.h +++ b/include/uxr/client/core/session/session.h @@ -472,6 +472,22 @@ UXRDLLAPI bool uxr_run_session_until_timeout( UXRDLLAPI bool uxr_run_session_until_confirm_delivery( uxrSession* session, int timeout); +/** + * @brief Keeps communication between the Client and the Agent. + * This function involves the following actions: + * 1. flushing one output streams sending the data through the transport, + * 2. listening messages from the Agent calling the associated callback (topic and status). + * The aforementioned actions will be performed in a loop until a the `timeout` is exceeded + * or the output reliable streams confirm the delivery of all their messages. + * @param session A uxrSession structure previously initialized. + * @param stream A uxrOutputReliableStream structure previously initialized. + * @param timeout_ms The waiting time in milliseconds. + * @return `true` if all output reliable streams confirm the delivery of their messages. `false` in other case. + */ +UXRDLLAPI bool uxr_run_session_until_confirm_delivery_one_stream( + uxrSession* session, + const uxrOutputReliableStream* stream, + int timeout_ms); /** * @brief Keeps communication between the Client and the Agent. diff --git a/src/c/core/session/session.c b/src/c/core/session/session.c index 1df93894..a72b3211 100644 --- a/src/c/core/session/session.c +++ b/src/c/core/session/session.c @@ -401,6 +401,31 @@ bool uxr_run_session_until_confirm_delivery( return ret; } +bool uxr_run_session_until_confirm_delivery_one_stream( + uxrSession* session, + const uxrOutputReliableStream* stream, + int timeout_ms) +{ + UXR_LOCK_SESSION(session); + + uxr_flash_output_streams(session); + + int64_t start_timestamp = uxr_millis(); + int remaining_time = timeout_ms; + + do + { + listen_message_reliably(session, remaining_time); + remaining_time = timeout_ms - (int)(uxr_millis() - start_timestamp); + } + while (remaining_time > 0 && !uxr_output_one_stream_confirmed(stream)); + + bool ret = uxr_output_one_stream_confirmed(stream); + + UXR_UNLOCK_SESSION(session); + return ret; +} + bool uxr_run_session_until_all_status( uxrSession* session, int timeout_ms, diff --git a/src/c/core/session/stream/stream_storage.c b/src/c/core/session/stream/stream_storage.c index 1d9bcdc3..5d44fbbb 100644 --- a/src/c/core/session/stream/stream_storage.c +++ b/src/c/core/session/stream/stream_storage.c @@ -148,3 +148,13 @@ bool uxr_output_streams_confirmed( } return up_to_date; } + +bool uxr_output_one_stream_confirmed( + const uxrOutputReliableStream* stream) +{ + bool up_to_date = true; + UXR_LOCK((uxrMutex*) &stream->mutex); + up_to_date = uxr_is_output_up_to_date(stream); + UXR_UNLOCK((uxrMutex*) &stream->mutex); + return up_to_date; +} diff --git a/src/c/core/session/stream/stream_storage_internal.h b/src/c/core/session/stream/stream_storage_internal.h index 647b1266..96e05ca9 100644 --- a/src/c/core/session/stream/stream_storage_internal.h +++ b/src/c/core/session/stream/stream_storage_internal.h @@ -69,6 +69,9 @@ uxrInputReliableStream* uxr_get_input_reliable_stream( bool uxr_output_streams_confirmed( const uxrStreamStorage* storage); +bool uxr_output_one_stream_confirmed( + const uxrOutputReliableStream* storage); + #ifdef __cplusplus } #endif // ifdef __cplusplus From 95420109b69ab67dfcfd010ed2c81070a088e4af Mon Sep 17 00:00:00 2001 From: Pablo Garrido Date: Mon, 15 Nov 2021 14:30:00 +0100 Subject: [PATCH 2/6] Fix timing in uxr_run_session_until_confirm_delivery --- include/uxr/client/core/session/session.h | 5 +++-- src/c/core/session/session.c | 10 +++++++--- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/include/uxr/client/core/session/session.h b/include/uxr/client/core/session/session.h index fb33d8cc..0541939f 100644 --- a/include/uxr/client/core/session/session.h +++ b/include/uxr/client/core/session/session.h @@ -466,12 +466,13 @@ UXRDLLAPI bool uxr_run_session_until_timeout( * The aforementioned actions will be performed in a loop until a the `timeout` is exceeded * or the output reliable streams confirm the delivery of all their messages. * @param session A uxrSession structure previously initialized. - * @param timeout The waiting time in milliseconds. + * @param timeout_ms The waiting time in milliseconds. * @return `true` if all output reliable streams confirm the delivery of their messages. `false` in other case. */ UXRDLLAPI bool uxr_run_session_until_confirm_delivery( uxrSession* session, - int timeout); + int timeout_ms); + /** * @brief Keeps communication between the Client and the Agent. * This function involves the following actions: diff --git a/src/c/core/session/session.c b/src/c/core/session/session.c index a72b3211..05f9516c 100644 --- a/src/c/core/session/session.c +++ b/src/c/core/session/session.c @@ -389,11 +389,15 @@ bool uxr_run_session_until_confirm_delivery( uxr_flash_output_streams(session); - bool timeout = false; - while (!uxr_output_streams_confirmed(&session->streams) && !timeout) + int64_t start_timestamp = uxr_millis(); + int remaining_time = timeout_ms; + + do { - timeout = !listen_message_reliably(session, timeout_ms); + listen_message_reliably(session, remaining_time); + remaining_time = timeout_ms - (int)(uxr_millis() - start_timestamp); } + while (remaining_time > 0 && !uxr_output_streams_confirmed(&session->streams)); bool ret = uxr_output_streams_confirmed(&session->streams); From 62590d55ac053bcf7e74d26b13ab832f12a5205b Mon Sep 17 00:00:00 2001 From: Pablo Garrido Date: Mon, 15 Nov 2021 14:36:24 +0100 Subject: [PATCH 3/6] Use streamid insteam of raw pointer --- include/uxr/client/core/session/session.h | 4 ++-- src/c/core/session/session.c | 12 +++++++++++- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/include/uxr/client/core/session/session.h b/include/uxr/client/core/session/session.h index 0541939f..f1493991 100644 --- a/include/uxr/client/core/session/session.h +++ b/include/uxr/client/core/session/session.h @@ -481,13 +481,13 @@ UXRDLLAPI bool uxr_run_session_until_confirm_delivery( * The aforementioned actions will be performed in a loop until a the `timeout` is exceeded * or the output reliable streams confirm the delivery of all their messages. * @param session A uxrSession structure previously initialized. - * @param stream A uxrOutputReliableStream structure previously initialized. + * @param stream A uxrStreamId previously initialized. * @param timeout_ms The waiting time in milliseconds. * @return `true` if all output reliable streams confirm the delivery of their messages. `false` in other case. */ UXRDLLAPI bool uxr_run_session_until_confirm_delivery_one_stream( uxrSession* session, - const uxrOutputReliableStream* stream, + const uxrStreamId stream, int timeout_ms); /** diff --git a/src/c/core/session/session.c b/src/c/core/session/session.c index 05f9516c..94987352 100644 --- a/src/c/core/session/session.c +++ b/src/c/core/session/session.c @@ -407,11 +407,21 @@ bool uxr_run_session_until_confirm_delivery( bool uxr_run_session_until_confirm_delivery_one_stream( uxrSession* session, - const uxrOutputReliableStream* stream, + const uxrStreamId streamid, int timeout_ms) { + if (streamid.direction != UXR_OUTPUT_STREAM || + streamid.type != UXR_RELIABLE_STREAM || + streamid.index >= session->streams.output_reliable_size) + { + return false; + } + UXR_LOCK_SESSION(session); + const uxrOutputReliableStream * stream = + &session->streams.output_reliable[streamid.index]; + uxr_flash_output_streams(session); int64_t start_timestamp = uxr_millis(); From 1e2194ef5b4f5f7f240a2aefcb3de95d46fd609d Mon Sep 17 00:00:00 2001 From: Pablo Garrido Date: Mon, 15 Nov 2021 14:50:46 +0100 Subject: [PATCH 4/6] Flush only one stream --- include/uxr/client/core/session/session.h | 9 ++++ src/c/core/session/session.c | 57 ++++++++++++++++++++--- 2 files changed, 59 insertions(+), 7 deletions(-) diff --git a/include/uxr/client/core/session/session.h b/include/uxr/client/core/session/session.h index f1493991..873c3d18 100644 --- a/include/uxr/client/core/session/session.h +++ b/include/uxr/client/core/session/session.h @@ -399,6 +399,15 @@ UXRDLLAPI uxrStreamId uxr_create_input_reliable_stream( UXRDLLAPI void uxr_flash_output_streams( uxrSession* session); +/** + * @brief Flashes one output stream seding the data through the transport. + * @param session A uxrSession structure previously initialized. + * @param stream_id A uxrStreamId structure previously initialized. + */ +UXRDLLAPI void uxr_flash_one_output_stream( + const uxrSession* session, + const uxrStreamId stream_id); + /** * @brief Keeps communication between the Client and the Agent. * This function involves the following actions: diff --git a/src/c/core/session/session.c b/src/c/core/session/session.c index 94987352..0d5a3282 100644 --- a/src/c/core/session/session.c +++ b/src/c/core/session/session.c @@ -407,22 +407,22 @@ bool uxr_run_session_until_confirm_delivery( bool uxr_run_session_until_confirm_delivery_one_stream( uxrSession* session, - const uxrStreamId streamid, + const uxrStreamId stream_id, int timeout_ms) { - if (streamid.direction != UXR_OUTPUT_STREAM || - streamid.type != UXR_RELIABLE_STREAM || - streamid.index >= session->streams.output_reliable_size) + if (stream_id.direction != UXR_OUTPUT_STREAM || + stream_id.type != UXR_RELIABLE_STREAM || + stream_id.index >= session->streams.output_reliable_size) { return false; } UXR_LOCK_SESSION(session); - const uxrOutputReliableStream * stream = - &session->streams.output_reliable[streamid.index]; + const uxrOutputReliableStream* stream = + &session->streams.output_reliable[stream_id.index]; - uxr_flash_output_streams(session); + uxr_flash_one_output_stream(session, stream_id); int64_t start_timestamp = uxr_millis(); int remaining_time = timeout_ms; @@ -643,6 +643,49 @@ void uxr_flash_output_streams( } } +void uxr_flash_one_output_stream( + const uxrSession* session, + const uxrStreamId stream_id) +{ + UXR_HANDLE_SHARED_MEMORY(); + + if (stream_id.direction == UXR_OUTPUT_STREAM) + { + if (stream_id.type == UXR_BEST_EFFORT_STREAM) + { + uxrOutputBestEffortStream* stream = &session->streams.output_best_effort[stream_id.index]; + + uint8_t* buffer; size_t length; uxrSeqNum seq_num; + + UXR_LOCK_STREAM_ID(session, stream_id); + + if (uxr_prepare_best_effort_buffer_to_send(stream, &buffer, &length, &seq_num)) + { + uxr_stamp_session_header(&session->info, stream_id.raw, seq_num, buffer); + send_message(session, buffer, length); + } + + UXR_UNLOCK_STREAM_ID(session, stream_id); + } + else if (stream_id.type == UXR_RELIABLE_STREAM) + { + uxrOutputReliableStream* stream = &session->streams.output_reliable[stream_id.index]; + + uint8_t* buffer; size_t length; uxrSeqNum seq_num; + + UXR_LOCK_STREAM_ID(session, stream_id); + + while (uxr_prepare_next_reliable_buffer_to_send(stream, &buffer, &length, &seq_num)) + { + uxr_stamp_session_header(&session->info, stream_id.raw, seq_num, buffer); + send_message(session, buffer, length); + } + + UXR_UNLOCK_STREAM_ID(session, stream_id); + } + } +} + //================================================================== // PRIVATE //================================================================== From 393f55a215bd8a085219bc72cc4824c03c6e4b7b Mon Sep 17 00:00:00 2001 From: Pablo Garrido Date: Mon, 15 Nov 2021 15:01:04 +0100 Subject: [PATCH 5/6] Fix wranings --- include/uxr/client/core/session/session.h | 2 +- src/c/core/session/session.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/include/uxr/client/core/session/session.h b/include/uxr/client/core/session/session.h index 873c3d18..5af608f2 100644 --- a/include/uxr/client/core/session/session.h +++ b/include/uxr/client/core/session/session.h @@ -405,7 +405,7 @@ UXRDLLAPI void uxr_flash_output_streams( * @param stream_id A uxrStreamId structure previously initialized. */ UXRDLLAPI void uxr_flash_one_output_stream( - const uxrSession* session, + uxrSession* session, const uxrStreamId stream_id); /** diff --git a/src/c/core/session/session.c b/src/c/core/session/session.c index 0d5a3282..20572c28 100644 --- a/src/c/core/session/session.c +++ b/src/c/core/session/session.c @@ -644,7 +644,7 @@ void uxr_flash_output_streams( } void uxr_flash_one_output_stream( - const uxrSession* session, + uxrSession* session, const uxrStreamId stream_id) { UXR_HANDLE_SHARED_MEMORY(); From 6054faa1bddbbfac0e07689be9a359de28ec7dd1 Mon Sep 17 00:00:00 2001 From: Pablo Garrido Date: Mon, 15 Nov 2021 15:10:59 +0100 Subject: [PATCH 6/6] Update include/uxr/client/core/session/session.h Co-authored-by: Antonio Cuadros <49162117+Acuadros95@users.noreply.github.com> --- include/uxr/client/core/session/session.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/uxr/client/core/session/session.h b/include/uxr/client/core/session/session.h index 5af608f2..4112c5c4 100644 --- a/include/uxr/client/core/session/session.h +++ b/include/uxr/client/core/session/session.h @@ -492,7 +492,7 @@ UXRDLLAPI bool uxr_run_session_until_confirm_delivery( * @param session A uxrSession structure previously initialized. * @param stream A uxrStreamId previously initialized. * @param timeout_ms The waiting time in milliseconds. - * @return `true` if all output reliable streams confirm the delivery of their messages. `false` in other case. + * @return `true` if given output reliable stream confirms the delivery of his messages. `false` in other case. */ UXRDLLAPI bool uxr_run_session_until_confirm_delivery_one_stream( uxrSession* session,