From ffc6d2f7bf7eb7f631466fda03aeff5283f10135 Mon Sep 17 00:00:00 2001 From: Ashley Sommer Date: Tue, 12 Aug 2025 12:38:54 +1000 Subject: [PATCH 1/2] Save lock-token from `receive_deferred_message` AMQP payload, so it can be used as the message's lock_token property. This allows the deferred message to be correctly renewed or settled (abandoned, completed, deferred again). --- .../azure-servicebus/azure/servicebus/_common/message.py | 5 +++++ .../azure/servicebus/_transport/_pyamqp_transport.py | 8 +++++++- .../azure/servicebus/_transport/_uamqp_transport.py | 8 +++++++- 3 files changed, 19 insertions(+), 2 deletions(-) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py index 6b16db7fea71..58d88b7c71d9 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py @@ -775,6 +775,7 @@ def __init__( self._settled = receive_mode == ServiceBusReceiveMode.RECEIVE_AND_DELETE self._delivery_tag = self._amqp_transport.get_message_delivery_tag(message, frame) self._delivery_id = self._amqp_transport.get_message_delivery_id(message, frame) # only used by pyamqp + self._received_lock_token = kwargs.pop("lock_token", None) self._received_timestamp_utc = utc_now() self._is_deferred_message = kwargs.get("is_deferred_message", False) self._is_peeked_message = kwargs.get("is_peeked_message", False) @@ -1078,6 +1079,10 @@ def lock_token(self) -> Optional[Union[uuid.UUID, str]]: if self._settled: return None + if self._received_lock_token: + # this will already be a uuid.UUID + return self._received_lock_token + if self._delivery_tag: return uuid.UUID(bytes_le=self._delivery_tag) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_transport/_pyamqp_transport.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_transport/_pyamqp_transport.py index cab4f91647a5..c9a7fffaed48 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_transport/_pyamqp_transport.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_transport/_pyamqp_transport.py @@ -818,15 +818,21 @@ def parse_received_message( # pylint:disable=docstring-keyword-should-match-keyw :keyword ~azure.servicebus.ServiceBusReceiver receiver: Required. :keyword bool is_peeked_message: Optional. For peeked messages. :keyword bool is_deferred_message: Optional. For deferred messages. + :keyword uuid.UUID lock_token: Optional. Lock token, if it is given by the message receiver. :keyword ~azure.servicebus.ServiceBusReceiveMode receive_mode: Optional. :return: List of service bus received messages. :rtype: list[~azure.servicebus.ServiceBusReceivedMessage] """ + is_deferred_message = kwargs.get("is_deferred_message", False) parsed = [] if message.value: for m in message.value[b"messages"]: wrapped = decode_payload(memoryview(m[b"message"])) - parsed.append(message_type(wrapped, **kwargs)) + if is_deferred_message and b"lock-token" in m: + lock_token = m[b"lock-token"] + else: + lock_token = kwargs.pop("lock_token", None) + parsed.append(message_type(wrapped, lock_token=lock_token, **kwargs)) return parsed @staticmethod diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_transport/_uamqp_transport.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_transport/_uamqp_transport.py index 651b6fd4f12b..316b8eddaf48 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_transport/_uamqp_transport.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_transport/_uamqp_transport.py @@ -863,14 +863,20 @@ def parse_received_message( # pylint:disable=docstring-keyword-should-match-keyw :keyword ~azure.servicebus.ServiceBusReceiver receiver: Required. :keyword bool is_peeked_message: Optional. For peeked messages. :keyword bool is_deferred_message: Optional. For deferred messages. + :keyword uuid.UUID lock_token: Optional. Lock token, if it is given by the message receiver. :keyword ~azure.servicebus.ServiceBusReceiveMode receive_mode: Optional. :return: List of ServiceBusReceivedMessage. :rtype: list[~azure.servicebus.ServiceBusReceivedMessage] """ + is_deferred_message = kwargs.get("is_deferred_message", False) parsed = [] for m in message.get_data()[b"messages"]: wrapped = Message.decode_from_bytes(bytearray(m[b"message"])) - parsed.append(message_type(wrapped, **kwargs)) + if is_deferred_message and b"lock-token" in m: + lock_token = m[b"lock-token"] + else: + lock_token = kwargs.pop("lock_token", None) + parsed.append(message_type(wrapped, lock_token=lock_token, **kwargs)) return parsed @staticmethod From 48cc1cd8b8af68528aa99b8a5db88eb7bf3d8c93 Mon Sep 17 00:00:00 2001 From: Ashley Sommer Date: Tue, 12 Aug 2025 12:51:25 +1000 Subject: [PATCH 2/2] Use `kwargs.get()` instead of `kwargs.pop()` to get the optional received lock token in the constructor Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../azure-servicebus/azure/servicebus/_common/message.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py index 58d88b7c71d9..5ee236ad0285 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py @@ -775,7 +775,7 @@ def __init__( self._settled = receive_mode == ServiceBusReceiveMode.RECEIVE_AND_DELETE self._delivery_tag = self._amqp_transport.get_message_delivery_tag(message, frame) self._delivery_id = self._amqp_transport.get_message_delivery_id(message, frame) # only used by pyamqp - self._received_lock_token = kwargs.pop("lock_token", None) + self._received_lock_token = kwargs.get("lock_token", None) self._received_timestamp_utc = utc_now() self._is_deferred_message = kwargs.get("is_deferred_message", False) self._is_peeked_message = kwargs.get("is_peeked_message", False)