Skip to content

Commit 49e227c

Browse files
committed
fix: initialize mqtt version from the create options
Signed-off-by: Simon Berger <[email protected]>
1 parent 4c39d4a commit 49e227c

File tree

1 file changed

+17
-1
lines changed

1 file changed

+17
-1
lines changed

src/async_client.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ impl AsyncClient {
170170

171171
let mut cli = InnerAsyncClient {
172172
handle: ptr::null_mut(),
173-
mqtt_version: AtomicU32::new(MQTT_VERSION_DEFAULT),
173+
mqtt_version: AtomicU32::new(opts.mqtt_version_raw()),
174174
opts: Mutex::new(ConnectOptions::new()),
175175
callback_context: Mutex::new(CallbackContext::default()),
176176
server_uri: CString::new(opts.server_uri)?,
@@ -1285,6 +1285,7 @@ impl Drop for InnerAsyncClient {
12851285
mod tests {
12861286
use super::*;
12871287
use crate::create_options::CreateOptionsBuilder;
1288+
use crate::MessageBuilder;
12881289
use std::sync::{Arc, Mutex, RwLock};
12891290
use std::thread;
12901291

@@ -1470,4 +1471,19 @@ mod tests {
14701471
let retrieved = client.unwrap().server_uri();
14711472
assert_eq!(retrieved, server_uri.to_string());
14721473
}
1474+
1475+
// See: <https://github.com/eclipse-paho/paho.mqtt.rust/pull/257>
1476+
#[test]
1477+
fn test_publish_before_connect() {
1478+
let server_uri = "tcp://localhost:1883";
1479+
let client = CreateOptionsBuilder::new()
1480+
.server_uri(server_uri)
1481+
.mqtt_version(MqttVersion::V5)
1482+
.send_while_disconnected(true)
1483+
.allow_disconnected_send_at_anytime(true)
1484+
.create_client()
1485+
.unwrap();
1486+
let res = client.try_publish(MessageBuilder::new().topic("test").payload(&[]).finalize());
1487+
assert!(res.is_ok(), "Failed to publish message before connecting");
1488+
}
14731489
}

0 commit comments

Comments
 (0)