Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions sdk/servicebus/azure_messaging_servicebus/src/clients/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

//! Clients used to communicate with Azure Service Bus

mod servicebus_client;

pub use servicebus_client::{
CreateReceiverOptions, CreateSenderOptions, ServiceBusClient, ServiceBusClientBuilder,
ServiceBusClientOptions, SubQueue,
};
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub enum SubQueue {

impl SubQueue {
/// Returns the path suffix for the sub-queue.
pub fn as_path_suffix(&self) -> &'static str {
pub(crate) fn as_path_suffix(&self) -> &'static str {
match self {
SubQueue::DeadLetter => "/$DeadLetterQueue",
SubQueue::Transfer => "/$Transfer/$DeadLetterQueue",
Expand All @@ -52,7 +52,7 @@ pub struct ServiceBusClientOptions {
impl Default for ServiceBusClientOptions {
fn default() -> Self {
Self {
api_version: "2017-04".to_string(), // Default Service Bus API version
api_version: "2024-01-01".to_string(), // Default Service Bus API version
client_options: ClientOptions::default(),
application_id: None,
}
Expand Down
73 changes: 65 additions & 8 deletions sdk/servicebus/azure_messaging_servicebus/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ impl fmt::Display for ErrorKind {
}

/// A Service Bus specific error.
#[derive(SafeDebug, Clone, PartialEq, Eq)]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error type should just be Debug not SafeDebug. All the information is kinda important and you should make sure nothing inside it otherwise leaks. typespec::Error impl Debug.

#[derive(SafeDebug)]
pub struct ServiceBusError {
kind: ErrorKind,
message: String,
source: Option<Box<ServiceBusError>>,
source: Option<Box<dyn std::error::Error + Send + Sync + 'static>>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why add Send, Sync, and 'static constraints? That makes it pretty difficult to send any error since not all implement those traits.

}

impl ServiceBusError {
Expand All @@ -74,7 +74,7 @@ impl ServiceBusError {
pub fn with_source(
kind: ErrorKind,
message: impl Into<String>,
source: ServiceBusError,
source: impl std::error::Error + Send + Sync + 'static,
) -> Self {
Self {
kind,
Expand All @@ -94,8 +94,10 @@ impl ServiceBusError {
}

/// Returns the source error, if any.
pub fn source(&self) -> Option<&ServiceBusError> {
self.source.as_deref()
pub fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why have a source() function on ServiceBusError which is a different implementation from the trait version? The two appear to be basically identical.

self.source
.as_ref()
.map(|e| e.as_ref() as &(dyn std::error::Error + 'static))
}
}

Expand All @@ -107,7 +109,9 @@ impl fmt::Display for ServiceBusError {

impl std::error::Error for ServiceBusError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
self.source.as_ref().map(|e| e as &dyn std::error::Error)
self.source
.as_ref()
.map(|e| e.as_ref() as &(dyn std::error::Error + 'static))
}
}

Expand All @@ -123,12 +127,65 @@ impl From<azure_core::error::Error> for ServiceBusError {
_ => ErrorKind::Unknown,
};

ServiceBusError::new(kind, error.to_string())
ServiceBusError::with_source(kind, error.to_string(), error)
}
}

impl From<azure_core_amqp::AmqpError> for ServiceBusError {
fn from(error: azure_core_amqp::AmqpError) -> Self {
ServiceBusError::new(ErrorKind::Amqp, error.to_string())
ServiceBusError::with_source(ErrorKind::Amqp, error.to_string(), error)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you expect to provide descriptive texts for servicebus errors generated from other errors? If your general practice is to just use .to_string(), there's not a ton of value in the message parameter.

}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_servicebus_error_can_store_any_std_error() {
// Test that we can store any std::error::Error as a source
let io_error = std::io::Error::other("test error");
let service_bus_error =
ServiceBusError::with_source(ErrorKind::Unknown, "wrapper error", io_error);

assert_eq!(service_bus_error.kind(), &ErrorKind::Unknown);
assert_eq!(service_bus_error.message(), "wrapper error");
assert!(service_bus_error.source().is_some());

// Verify the source can be downcast to the original error type
let source = service_bus_error.source().unwrap();
assert!(source.downcast_ref::<std::io::Error>().is_some());
}

#[test]
fn test_servicebus_error_implements_std_error() {
let error = ServiceBusError::new(ErrorKind::InvalidRequest, "test message");

// Should implement std::error::Error
let _: &dyn std::error::Error = &error;

// Should return None for source when no source is set
assert!(error.source().is_none());
}

#[test]
fn test_servicebus_error_with_chain() {
let inner_error = std::io::Error::other("inner error");
let middle_error =
ServiceBusError::with_source(ErrorKind::Amqp, "middle error", inner_error);
let outer_error =
ServiceBusError::with_source(ErrorKind::Unknown, "outer error", middle_error);

// Check that we can traverse the error chain
assert_eq!(outer_error.kind(), &ErrorKind::Unknown);
assert_eq!(outer_error.message(), "outer error");

let source = outer_error.source().unwrap();
let middle_as_servicebus = source.downcast_ref::<ServiceBusError>().unwrap();
assert_eq!(middle_as_servicebus.kind(), &ErrorKind::Amqp);
assert_eq!(middle_as_servicebus.message(), "middle error");

let inner_source = middle_as_servicebus.source().unwrap();
assert!(inner_source.downcast_ref::<std::io::Error>().is_some());
}
}
4 changes: 2 additions & 2 deletions sdk/servicebus/azure_messaging_servicebus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#![cfg_attr(docsrs, feature(doc_auto_cfg))]

/// Service Bus client
pub mod client;
pub mod clients;
mod error;
mod message;
/// Service Bus message receiving functionality and options.
Expand All @@ -22,7 +22,7 @@ pub mod models;
/// Common types and utilities.
mod common;

pub use client::{
pub use clients::{
CreateReceiverOptions, CreateSenderOptions, ServiceBusClient, ServiceBusClientBuilder,
ServiceBusClientOptions, SubQueue,
};
Expand Down
72 changes: 51 additions & 21 deletions sdk/servicebus/azure_messaging_servicebus/src/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@
//! ```

use crate::{
client::ServiceBusClientOptions, message::SystemProperties, ErrorKind, ReceivedMessage, Result,
ServiceBusError,
clients::ServiceBusClientOptions, message::SystemProperties, ErrorKind, ReceivedMessage,
Result, ServiceBusError,
};
use async_lock::{Mutex, OnceCell};
use azure_core::{fmt::SafeDebug, time::Duration, time::OffsetDateTime, Uuid};
Expand All @@ -113,6 +113,33 @@ use futures::{select, FutureExt};
use std::{collections::HashMap, sync::Arc};
use tracing::{debug, trace, warn};

/// Property key for lock token in Service Bus management operations.
const LOCK_TOKEN_PROPERTY_KEY: &str = "lock-token";

/// Property key for sequence numbers in Service Bus management operations.
const SEQUENCE_NUMBERS_PROPERTY_KEY: &str = "sequence-numbers";

/// Property key for receiver settle mode in Service Bus management operations.
const RECEIVER_SETTLE_MODE_PROPERTY_KEY: &str = "receiver-settle-mode";

/// Property key for message count in Service Bus management operations.
const MESSAGE_COUNT_PROPERTY_KEY: &str = "message-count";

/// Property key for from sequence number in Service Bus management operations.
const FROM_SEQUENCE_NUMBER_PROPERTY_KEY: &str = "from-sequence-number";

/// Service Bus management operation name for deferring messages.
const DEFER_MESSAGE_OPERATION: &str = "com.microsoft:defer-message";

/// Service Bus management operation name for receiving messages by sequence number.
const RECEIVE_BY_SEQUENCE_NUMBER_OPERATION: &str = "com.microsoft:receive-by-sequence-number";

/// Service Bus management operation name for renewing message locks.
const RENEW_LOCK_OPERATION: &str = "com.microsoft:renew-lock";

/// Service Bus management operation name for peeking messages.
const PEEK_MESSAGE_OPERATION: &str = "com.microsoft:peek-message";

/// Represents the lock style to use for a receiver - either `PeekLock` or `ReceiveAndDelete`.
///
/// This enum controls when a message is deleted from Service Bus and determines how message
Expand Down Expand Up @@ -1291,7 +1318,10 @@ impl Receiver {
> = azure_core_amqp::AmqpOrderedMap::new();

// Add the lock token as a string representation
application_properties.insert("lock-token".to_string(), lock_token.to_string().into());
application_properties.insert(
LOCK_TOKEN_PROPERTY_KEY.to_string(),
lock_token.to_string().into(),
);

// Add any properties to modify if specified
if let Some(properties) = options
Expand All @@ -1304,10 +1334,7 @@ impl Receiver {
}

let _response = management_client
.call(
"com.microsoft:defer-message".to_string(),
application_properties,
)
.call(DEFER_MESSAGE_OPERATION.to_string(), application_properties)
.await
.map_err(|e| {
ServiceBusError::new(ErrorKind::Amqp, format!("Failed to defer message: {:?}", e))
Expand Down Expand Up @@ -1515,18 +1542,24 @@ impl Receiver {
.collect::<Vec<_>>()
.join(",");

application_properties.insert("sequence-numbers".to_string(), sequence_numbers_str.into());
application_properties.insert(
SEQUENCE_NUMBERS_PROPERTY_KEY.to_string(),
sequence_numbers_str.into(),
);

// Set receiver settle mode based on receive mode
let settle_mode = match self.receive_mode {
ReceiveMode::PeekLock => 1u32,
ReceiveMode::ReceiveAndDelete => 0u32,
};
application_properties.insert("receiver-settle-mode".to_string(), settle_mode.into());
application_properties.insert(
RECEIVER_SETTLE_MODE_PROPERTY_KEY.to_string(),
settle_mode.into(),
);

let response = management_client
.call(
"com.microsoft:receive-by-sequence-number".to_string(),
RECEIVE_BY_SEQUENCE_NUMBER_OPERATION.to_string(),
application_properties,
)
.await
Expand Down Expand Up @@ -1716,13 +1749,13 @@ impl Receiver {
> = azure_core_amqp::AmqpOrderedMap::new();

// Add the lock token as a string representation
application_properties.insert("lock-token".to_string(), lock_token.to_string().into());
application_properties.insert(
LOCK_TOKEN_PROPERTY_KEY.to_string(),
lock_token.to_string().into(),
);

let response = management_client
.call(
"com.microsoft:renew-lock".to_string(),
application_properties,
)
.call(RENEW_LOCK_OPERATION.to_string(), application_properties)
.await
.map_err(|e| {
ServiceBusError::new(
Expand Down Expand Up @@ -1922,21 +1955,18 @@ impl Receiver {
> = azure_core_amqp::AmqpOrderedMap::new();

// Set the maximum number of messages to peek
application_properties.insert("message-count".to_string(), max_count.into());
application_properties.insert(MESSAGE_COUNT_PROPERTY_KEY.to_string(), max_count.into());

// Set the starting sequence number if provided
if let Some(from_sequence_number) = options.as_ref().and_then(|o| o.from_sequence_number) {
application_properties.insert(
"from-sequence-number".to_string(),
FROM_SEQUENCE_NUMBER_PROPERTY_KEY.to_string(),
from_sequence_number.into(),
);
}

let response = management_client
.call(
"com.microsoft:peek-message".to_string(),
application_properties,
)
.call(PEEK_MESSAGE_OPERATION.to_string(), application_properties)
.await
.map_err(|e| {
ServiceBusError::new(ErrorKind::Amqp, format!("Failed to peek messages: {:?}", e))
Expand Down
2 changes: 1 addition & 1 deletion sdk/servicebus/azure_messaging_servicebus/src/sender.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) Microsoft Corporation. All Rights reserved
// Licensed under the MIT license.

use crate::{client::ServiceBusClientOptions, ErrorKind, Message, Result, ServiceBusError};
use crate::{clients::ServiceBusClientOptions, ErrorKind, Message, Result, ServiceBusError};
use azure_core::fmt::SafeDebug;
use azure_core_amqp::{
AmqpConnection, AmqpMessage, AmqpSender, AmqpSenderApis, AmqpSession, AmqpSessionApis,
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -50,44 +50,6 @@ $resourceGroup = $DeploymentOutputs['RESOURCE_GROUP']
Write-Host "Service Bus Namespace: $namespaceName"
Write-Host "Resource Group: $resourceGroup"

# Retrieve connection strings (these contain secrets so aren't in Bicep outputs)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that you've removed everything that uses az CLI, 1) you should get rid of lines 39-41 because they are unnecessary and create an additional unnecessary dependency, but then 2) why have this script at all? It does nothing useful. Just delete it.

Write-Host "Retrieving Service Bus connection strings..."

try {
$connectionString = az servicebus namespace authorization-rule keys list `
--resource-group $resourceGroup `
--namespace-name $namespaceName `
--name RootManageSharedAccessKey `
--query primaryConnectionString `
--output tsv

$listenOnlyConnectionString = az servicebus namespace authorization-rule keys list `
--resource-group $resourceGroup `
--namespace-name $namespaceName `
--name ListenOnly `
--query primaryConnectionString `
--output tsv

$sendOnlyConnectionString = az servicebus namespace authorization-rule keys list `
--resource-group $resourceGroup `
--namespace-name $namespaceName `
--name SendOnly `
--query primaryConnectionString `
--output tsv

Write-Host "✅ Connection strings retrieved successfully"

# Set additional outputs for the test pipeline
if ($CI) {
Write-Host "##vso[task.setvariable variable=SERVICEBUS_CONNECTION_STRING;issecret=true]$connectionString"
Write-Host "##vso[task.setvariable variable=SERVICEBUS_LISTEN_ONLY_CONNECTION_STRING;issecret=true]$listenOnlyConnectionString"
Write-Host "##vso[task.setvariable variable=SERVICEBUS_SEND_ONLY_CONNECTION_STRING;issecret=true]$sendOnlyConnectionString"
}
}
catch {
Write-Warning "Failed to retrieve connection strings: $($_.Exception.Message)"
}

Write-Host "##[endgroup]"

Write-Host "Service Bus post-deployment setup completed successfully."
Loading