diff --git a/core/sdk/src/utils/channel_utils.rs b/core/sdk/src/utils/channel_utils.rs index 3b193cfeb..3fb41b1f6 100644 --- a/core/sdk/src/utils/channel_utils.rs +++ b/core/sdk/src/utils/channel_utils.rs @@ -23,6 +23,7 @@ pub async fn mpsc_send_and_log( message: T, channel_id: &str, ) { + println!("@@@NNA....mpsc send fn"); match tx.send(message).await { Ok(_) => { trace!("Successfully sent message through mpsc channel") diff --git a/device/thunder/src/bootstrap/boot_thunder_channel.rs b/device/thunder/src/bootstrap/boot_thunder_channel.rs index fb753c9a7..f0bae2283 100644 --- a/device/thunder/src/bootstrap/boot_thunder_channel.rs +++ b/device/thunder/src/bootstrap/boot_thunder_channel.rs @@ -17,7 +17,6 @@ use thunder_ripple_sdk::{ bootstrap::boot_thunder::boot_thunder, - client::plugin_manager::{ThunderPluginBootParam, ThunderPluginParam}, ripple_sdk::{extn::client::extn_client::ExtnClient, log::info}, }; @@ -25,10 +24,6 @@ pub async fn boot_thunder_channel(state: ExtnClient) { info!("Booting thunder"); let _ = boot_thunder( state, - ThunderPluginBootParam { - activate_on_boot: ThunderPluginParam::Default, - expected: ThunderPluginParam::Default, - }, ) .await; } diff --git a/device/thunder_ripple_sdk/src/bootstrap/boot_thunder.rs b/device/thunder_ripple_sdk/src/bootstrap/boot_thunder.rs index 6e8837ae0..313e4fe2c 100644 --- a/device/thunder_ripple_sdk/src/bootstrap/boot_thunder.rs +++ b/device/thunder_ripple_sdk/src/bootstrap/boot_thunder.rs @@ -17,7 +17,7 @@ use crate::{ bootstrap::setup_thunder_processors::SetupThunderProcessor, - client::plugin_manager::ThunderPluginBootParam, thunder_state::ThunderBootstrapStateWithClient, + thunder_state::ThunderBootstrapStateWithClient, }; use ripple_sdk::{ api::manifest::device_manifest::DeviceManifest, @@ -45,7 +45,6 @@ fn gateway_default() -> String { pub async fn boot_thunder( ext_client: ExtnClient, - _plugin_param: ThunderPluginBootParam, device_manifest: &DeviceManifest, ) -> Option { info!("Booting thunder initiated"); @@ -94,16 +93,8 @@ pub async fn boot_thunder( gateway_url.set_host(Some(&host_override)).ok(); } - if let Ok(thndr_client) = ThunderClientBuilder::start_thunder_client( - gateway_url.clone(), - None, - None, - None, - None, - true, - status_check, - ) - .await + if let Ok(thndr_client) = + ThunderClientBuilder::start_thunder_client(gateway_url.clone(), status_check).await { let thunder_state = ThunderState::new(ext_client.clone(), thndr_client); diff --git a/device/thunder_ripple_sdk/src/bootstrap/get_config_step.rs b/device/thunder_ripple_sdk/src/bootstrap/get_config_step.rs deleted file mode 100644 index d0e28207d..000000000 --- a/device/thunder_ripple_sdk/src/bootstrap/get_config_step.rs +++ /dev/null @@ -1,105 +0,0 @@ -// Copyright 2023 Comcast Cable Communications Management, LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -// SPDX-License-Identifier: Apache-2.0 -// - -use crate::{ - client::plugin_manager::ThunderPluginBootParam, - thunder_state::{ThunderBootstrapStateWithConfig, ThunderConnectionState}, -}; -use std::sync::Arc; - -use ripple_sdk::extn::{ - client::extn_client::ExtnClient, - extn_client_message::{ExtnMessage, ExtnResponse}, -}; -use ripple_sdk::{ - api::config::Config, - log::{debug, warn}, - serde_json::{self, Error}, - utils::error::RippleError, -}; -use serde::Deserialize; -pub struct ThunderGetConfigStep; - -const GATEWAY_DEFAULT: &str = "ws://127.0.0.1:9998/jsonrpc"; -const POOL_SIZE_DEFAULT: u32 = 5; - -#[derive(Deserialize, Clone)] -pub struct ThunderPlatformParameters { - #[serde(default = "gateway_default")] - gateway: String, - #[serde(default = "pool_size_default")] - pool_size: u32, -} - -fn gateway_default() -> String { - String::from(GATEWAY_DEFAULT) -} - -fn pool_size_default() -> u32 { - POOL_SIZE_DEFAULT -} - -impl ThunderGetConfigStep { - pub fn get_name() -> String { - "ThunderGetConfigStep".into() - } - - pub async fn setup( - mut state: ExtnClient, - expected_plugins: ThunderPluginBootParam, - ) -> Result { - debug!("Requesting Platform parameters"); - let extn_message_response: Result = - state.request(Config::PlatformParameters).await; - if let Ok(message) = extn_message_response { - if let Some(ExtnResponse::Value(v)) = message.payload.extract() { - let mut pool_size = POOL_SIZE_DEFAULT; - let tp_res: Result = serde_json::from_value(v); - let mut gateway_url = url::Url::parse(GATEWAY_DEFAULT).unwrap(); - if let Ok(thunder_parameters) = tp_res { - pool_size = thunder_parameters.pool_size; - if let Ok(gurl) = url::Url::parse(&thunder_parameters.gateway) { - debug!("Got url from device manifest"); - gateway_url = gurl - } else { - warn!( - "Could not parse thunder gateway '{}', using default {}", - thunder_parameters.gateway, GATEWAY_DEFAULT - ); - } - } else { - warn!( - "Could not read thunder platform parameters, using default {}", - GATEWAY_DEFAULT - ); - } - if let Ok(host_override) = std::env::var("DEVICE_HOST") { - gateway_url.set_host(Some(&host_override)).ok(); - } - return Ok(ThunderBootstrapStateWithConfig { - extn_client: state, - url: gateway_url, - pool_size: Some(pool_size), - plugin_param: Some(expected_plugins), - thunder_connection_state: Some(Arc::new(ThunderConnectionState::new())), - }); - } - } - - Err(RippleError::BootstrapError) - } -} diff --git a/device/thunder_ripple_sdk/src/bootstrap/setup_thunder_pool_step.rs b/device/thunder_ripple_sdk/src/bootstrap/setup_thunder_pool_step.rs deleted file mode 100644 index cb94cd49b..000000000 --- a/device/thunder_ripple_sdk/src/bootstrap/setup_thunder_pool_step.rs +++ /dev/null @@ -1,146 +0,0 @@ -// Copyright 2023 Comcast Cable Communications Management, LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -// SPDX-License-Identifier: Apache-2.0 -// - -use std::time::Duration; - -use ripple_sdk::{ - api::status_update::ExtnStatus, - log::{error, info, warn}, - utils::error::RippleError, -}; - -use crate::{ - client::{plugin_manager::PluginManager, thunder_client_pool::ThunderClientPool}, - thunder_state::{ - ThunderBootstrapStateWithClient, ThunderBootstrapStateWithConfig, ThunderState, - }, -}; - -pub struct ThunderPoolStep; - -impl ThunderPoolStep { - pub fn get_name() -> String { - "ThunderPoolStep".into() - } - - pub async fn setup( - state: ThunderBootstrapStateWithConfig, - status_check: bool, - ) -> Result { - let url = state.url.clone(); - let thunder_connection_state = state.thunder_connection_state.clone(); - let pool_size = match state.pool_size { - Some(s) => s, - None => { - warn!("Pool size of 1 is not recommended, there will be no dedicated connection for Controller events"); - return Err(RippleError::BootstrapError); - } - }; - - let controller_pool = ripple_sdk::tokio::time::timeout( - Duration::from_secs(10), - ThunderClientPool::start( - url.clone(), - None, - thunder_connection_state.clone(), - 1, - status_check, - ), - ) - .await; - - let controller_pool = match controller_pool { - Ok(Ok(thunder_client)) => thunder_client, - Ok(Err(e)) => { - error!("Fatal Thunder Unavailability Error: Ripple connection with Thunder is intermittent causing bootstrap errors."); - let _ = state.extn_client.event(ExtnStatus::Error); - return Err(e); - } - Err(_) => { - error!("Timed out waiting for starting ThunderClientPool."); - let _ = state.extn_client.event(ExtnStatus::Error); - return Err(RippleError::BootstrapError); - } - }; - - info!("Received Controller pool"); - let expected_plugins = match state.plugin_param.clone() { - Some(plugins) => plugins, - None => { - error!("Expected plugins are not provided."); - return Err(RippleError::BootstrapError); - } - }; - let tc = Box::new(controller_pool); - let (plugin_manager_tx, failed_plugins) = - PluginManager::start(tc, expected_plugins.clone()).await; - - if !failed_plugins.is_empty() { - error!( - "Mandatory Plugin activation for {:?} failed. Thunder Bootstrap delayed...", - failed_plugins - ); - loop { - let failed_plugins = PluginManager::activate_mandatory_plugins( - expected_plugins.clone(), - plugin_manager_tx.clone(), - ) - .await; - if !failed_plugins.is_empty() { - error!( - "Mandatory Plugin activation for {:?} failed. Thunder Bootstrap delayed...", - failed_plugins - ); - let _ = state.extn_client.event(ExtnStatus::Interrupted); - continue; - } else { - break; - } - } - } - - let client = ThunderClientPool::start( - url.clone(), - Some(plugin_manager_tx), - thunder_connection_state.clone(), - pool_size - 1, - status_check, - ) - .await; - - let client = match client { - Ok(client) => client, - Err(e) => { - error!("Fatal Thunder Unavailability Error: Ripple connection with Thunder is intermittent causing bootstrap errors."); - let _ = state.extn_client.event(ExtnStatus::Error); - return Err(e); - } - }; - - info!("Thunder client connected successfully"); - - let extn_client = state.extn_client.clone(); - let thunder_boot_strap_state_with_client = ThunderBootstrapStateWithClient { - prev: state, - state: ThunderState::new(extn_client, client), - }; - thunder_boot_strap_state_with_client - .state - .start_event_thread(); - Ok(thunder_boot_strap_state_with_client) - } -} diff --git a/device/thunder_ripple_sdk/src/client/device_operator.rs b/device/thunder_ripple_sdk/src/client/device_operator.rs index 91196f29c..9431d2461 100644 --- a/device/thunder_ripple_sdk/src/client/device_operator.rs +++ b/device/thunder_ripple_sdk/src/client/device_operator.rs @@ -36,7 +36,7 @@ pub trait DeviceOperator: Clone { handler: mpsc::Sender, ) -> Result; - async fn unsubscribe(&self, request: DeviceUnsubscribeRequest); + //async fn unsubscribe(&self, request: DeviceUnsubscribeRequest); //not used anywhere commented out for now. } #[derive(Debug, Clone)] diff --git a/device/thunder_ripple_sdk/src/client/plugin_manager.rs b/device/thunder_ripple_sdk/src/client/plugin_manager.rs index 11e12616e..692d6157c 100644 --- a/device/thunder_ripple_sdk/src/client/plugin_manager.rs +++ b/device/thunder_ripple_sdk/src/client/plugin_manager.rs @@ -472,102 +472,102 @@ impl PluginManager { #[cfg(test)] mod tests { use super::*; - use crate::client::thunder_client_pool::ThunderClientPool; - use crate::tests::thunder_client_pool_test_utility::{ - CustomMethodHandler, MockWebSocketServer, - }; - use crate::thunder_state::ThunderConnectionState; - use ripple_sdk::tokio::time::{sleep, Duration}; - use std::sync::Arc; - use url::Url; - - #[tokio::test] - async fn test_plugin_manager_start_and_message_handling() { - // Using the default method handler from tests::thunder_client_pool_test_utility - // This can be replaced with a custom method handler, if needed - let custom_method_handler = Arc::new(CustomMethodHandler); - let custom_method_handler_c = custom_method_handler.clone(); - - let server_task = tokio::spawn(async { - let mock_server = MockWebSocketServer::new("127.0.0.1:8081", custom_method_handler_c); - mock_server.start().await; - }); - - // Wait for the server to start - sleep(Duration::from_secs(1)).await; - - let url = Url::parse("ws://127.0.0.1:8081/jsonrpc").unwrap(); - - let controller_pool = ThunderClientPool::start( - url.clone(), - None, - Some(Arc::new(ThunderConnectionState::new())), - 1, - false, - ) - .await; - assert!(controller_pool.is_ok()); - - let controller_pool = controller_pool.unwrap(); - - let expected_plugins = ThunderPluginBootParam { - expected: ThunderPluginParam::None, - activate_on_boot: ThunderPluginParam::None, - }; + // use crate::tests::thunder_client_pool_test_utility::{ + // CustomMethodHandler, MockWebSocketServer, + // }; + // use crate::thunder_state::ThunderConnectionState; + // use ripple_sdk::tokio::time::{sleep, Duration}; + // use std::sync::Arc; + // use url::Url; + + // #[tokio::test] + // async fn test_plugin_manager_start_and_message_handling() { + // // Using the default method handler from tests::thunder_client_pool_test_utility + // // This can be replaced with a custom method handler, if needed + // let custom_method_handler = Arc::new(CustomMethodHandler); + // let custom_method_handler_c = custom_method_handler.clone(); + + // let server_task = tokio::spawn(async { + // let mock_server = MockWebSocketServer::new("127.0.0.1:8081", custom_method_handler_c); + // mock_server.start().await; + // }); + + // // Wait for the server to start + // sleep(Duration::from_secs(1)).await; + + // let url = Url::parse("ws://127.0.0.1:8081/jsonrpc").unwrap(); + + // let controller_pool = ThunderClientPool::start( + // url.clone(), + // None, + // Some(Arc::new(ThunderConnectionState::new())), + // 1, + // false, + // ) + // .await; + // assert!(controller_pool.is_ok()); + + // let controller_pool = controller_pool.unwrap(); + + // let expected_plugins = ThunderPluginBootParam { + // expected: ThunderPluginParam::None, + // activate_on_boot: ThunderPluginParam::None, + // }; + + // // Start the plugin manager + // let plugin_manager_tx = + // PluginManager::start(Box::new(controller_pool), expected_plugins).await; + + // let (plugin_manager_tx_clone, _) = plugin_manager_tx.clone(); + + // // Start the ThunderClientPool + // let client = ThunderClientPool::start( + // url, + // Some(plugin_manager_tx_clone), + // Some(Arc::new(ThunderConnectionState::new())), + // 4, + // false, + // ) + // .await; + // assert!(client.is_ok()); + + // // 1. test PluginManagerCommand::StateChangeEvent command + // let (plugin_manager_tx_clone, _) = plugin_manager_tx.clone(); + // let msg = PluginManagerCommand::StateChangeEvent(PluginStateChangeEvent { + // callsign: "org.rdk.Controller".to_string(), + // state: PluginState::Activated, + // }); + // mpsc_send_and_log(&plugin_manager_tx_clone, msg, "StateChangeEvent").await; + + // // 2. test PluginManagerCommand::ActivatePluginIfNeeded command + // let (tx, _rx) = oneshot::channel::(); + // let (plugin_manager_tx_clone, _) = plugin_manager_tx.clone(); + // let msg = PluginManagerCommand::ActivatePluginIfNeeded { + // callsign: "org.rdk.Controller".to_string(), + // tx, + // }; + // mpsc_send_and_log(&plugin_manager_tx_clone, msg, "ActivatePluginIfNeeded").await; + + // // 3. test PluginManagerCommand::WaitForActivation command + // let (tx, _rx) = oneshot::channel::(); + // let (plugin_manager_tx_clone, _) = plugin_manager_tx.clone(); + // let msg = PluginManagerCommand::WaitForActivation { + // callsign: "org.rdk.Controller".to_string(), + // tx, + // }; + // mpsc_send_and_log(&plugin_manager_tx_clone, msg, "WaitForActivation").await; + + // // 4. test PluginManagerCommand::ReactivatePluginState command + // let (tx, _rx) = oneshot::channel::(); + // let (plugin_manager_tx_clone, _) = plugin_manager_tx.clone(); + // let msg = PluginManagerCommand::ReactivatePluginState { tx }; + // mpsc_send_and_log(&plugin_manager_tx_clone, msg, "ReactivatePluginState").await; + + // // Wait for a moment and stop the server + // sleep(Duration::from_secs(1)).await; + // server_task.abort(); + // } - // Start the plugin manager - let plugin_manager_tx = - PluginManager::start(Box::new(controller_pool), expected_plugins).await; - - let (plugin_manager_tx_clone, _) = plugin_manager_tx.clone(); - - // Start the ThunderClientPool - let client = ThunderClientPool::start( - url, - Some(plugin_manager_tx_clone), - Some(Arc::new(ThunderConnectionState::new())), - 4, - false, - ) - .await; - assert!(client.is_ok()); - - // 1. test PluginManagerCommand::StateChangeEvent command - let (plugin_manager_tx_clone, _) = plugin_manager_tx.clone(); - let msg = PluginManagerCommand::StateChangeEvent(PluginStateChangeEvent { - callsign: "org.rdk.Controller".to_string(), - state: PluginState::Activated, - }); - mpsc_send_and_log(&plugin_manager_tx_clone, msg, "StateChangeEvent").await; - - // 2. test PluginManagerCommand::ActivatePluginIfNeeded command - let (tx, _rx) = oneshot::channel::(); - let (plugin_manager_tx_clone, _) = plugin_manager_tx.clone(); - let msg = PluginManagerCommand::ActivatePluginIfNeeded { - callsign: "org.rdk.Controller".to_string(), - tx, - }; - mpsc_send_and_log(&plugin_manager_tx_clone, msg, "ActivatePluginIfNeeded").await; - - // 3. test PluginManagerCommand::WaitForActivation command - let (tx, _rx) = oneshot::channel::(); - let (plugin_manager_tx_clone, _) = plugin_manager_tx.clone(); - let msg = PluginManagerCommand::WaitForActivation { - callsign: "org.rdk.Controller".to_string(), - tx, - }; - mpsc_send_and_log(&plugin_manager_tx_clone, msg, "WaitForActivation").await; - - // 4. test PluginManagerCommand::ReactivatePluginState command - let (tx, _rx) = oneshot::channel::(); - let (plugin_manager_tx_clone, _) = plugin_manager_tx.clone(); - let msg = PluginManagerCommand::ReactivatePluginState { tx }; - mpsc_send_and_log(&plugin_manager_tx_clone, msg, "ReactivatePluginState").await; - - // Wait for a moment and stop the server - sleep(Duration::from_secs(1)).await; - server_task.abort(); - } // test PluginStatus #[test] fn test_plugin_status() { diff --git a/device/thunder_ripple_sdk/src/client/thunder_async_client.rs b/device/thunder_ripple_sdk/src/client/thunder_async_client.rs index 7de1761fd..bafa68207 100644 --- a/device/thunder_ripple_sdk/src/client/thunder_async_client.rs +++ b/device/thunder_ripple_sdk/src/client/thunder_async_client.rs @@ -43,7 +43,10 @@ pub struct ThunderAsyncClient { #[derive(Clone, Debug)] pub struct ThunderAsyncRequest { pub id: u64, - request: DeviceChannelRequest, + // + //request: DeviceChannelRequest, + pub request: DeviceChannelRequest, + // } impl std::fmt::Display for ThunderAsyncRequest { @@ -275,6 +278,7 @@ impl ThunderAsyncClient { mut thunder_async_request_rx: Receiver, status_check: bool, ) { + println!("@@@NNA.... we reached thundersyncclient start fn.."); loop { info!("start: (re)establishing websocket connection: url={}", url); let resp = WebSocketUtils::get_ws_stream(url, None).await; @@ -319,63 +323,67 @@ impl ThunderAsyncClient { loop { tokio::select! { - Some(value) = &mut subscriptions_socket => { - match value { - Ok(message) => { - self.handle_response(message).await; + Some(value) = &mut subscriptions_socket => { + match value { + Ok(message) => { + self.handle_response(message).await; + }, + Err(e) => { + error!("Thunder_async_client Websocket error on read {:?}", e); + break; + } + } }, - Err(e) => { - error!("Thunder_async_client Websocket error on read {:?}", e); - break; - } - } - }, - Some(request) = thunder_async_request_rx.recv() => { - match self.check_plugin_status_n_prepare_request(&request) { - Ok(updated_request) => { - if let Ok(jsonrpc_request) = serde_json::from_str::(&updated_request) { - if jsonrpc_request.method.ends_with(".register") { - if let Some(Value::Object(ref params)) = jsonrpc_request.params { - if let Some(Value::String(event)) = params.get("event") { - debug!("thunder_async_request_rx: Rerouting subscription request for {}", event); - - // Store the subscription request in the subscriptions list in case we need to - // resubscribe later due to a socket disconnect. - self.subscriptions.insert(event.to_string(), jsonrpc_request.clone()); - debug!("thunder_async_request_rx: subscription request={}", updated_request); - // Reroute subsubscription requests through the persistent websocket so all notifications - // are sent to the same websocket connection. + Some(request) = thunder_async_request_rx.recv() => { + println!( + "*** _DEBUG: thunderasyncclient::start: thunderasync request{}", + request + ); + match self.check_plugin_status_n_prepare_request(&request) { + Ok(updated_request) => { + if let Ok(jsonrpc_request) = serde_json::from_str::(&updated_request) { + if jsonrpc_request.method.ends_with(".register") { + if let Some(Value::Object(ref params)) = jsonrpc_request.params { + if let Some(Value::String(event)) = params.get("event") { + debug!("thunder_async_request_rx: Rerouting subscription request for {}", event); + + // Store the subscription request in the subscriptions list in case we need to + // resubscribe later due to a socket disconnect. + self.subscriptions.insert(event.to_string(), jsonrpc_request.clone()); + debug!("thunder_async_request_rx: subscription request={}", updated_request); + // Reroute subsubscription requests through the persistent websocket so all notifications + // are sent to the same websocket connection. + let _feed = thunder_tx.feed(Message::Text(updated_request)).await; + let _flush = thunder_tx.flush().await; + } else { + error!("thunder_async_request_rx: Missing 'event' parameter"); + } + } else { + error!("thunder_async_request_rx: Missing 'params' object"); + } + } + else { + debug!("thunder_async_request_rx: call request={}", updated_request); let _feed = thunder_tx.feed(Message::Text(updated_request)).await; let _flush = thunder_tx.flush().await; - } else { - error!("thunder_async_request_rx: Missing 'event' parameter"); } - } else { - error!("thunder_async_request_rx: Missing 'params' object"); } } - else { - debug!("thunder_async_request_rx: call request={}", updated_request); - let _feed = thunder_tx.feed(Message::Text(updated_request)).await; - let _flush = thunder_tx.flush().await; - } - } - } - Err(e) => { - match e { - RippleError::ServiceNotReady => { - info!("Thunder Service not ready, request is now in pending list {:?}", request); - }, - _ => { - error!("error preparing request {:?}", e); - let response = ThunderAsyncResponse::new_error(request.id,e.clone()); - self.callback.send(response).await; + Err(e) => { + match e { + RippleError::ServiceNotReady => { + info!("Thunder Service not ready, request is now in pending list {:?}", request); + }, + _ => { + error!("error preparing request {:?}", e); + let response = ThunderAsyncResponse::new_error(request.id,e.clone()); + self.callback.send(response).await; + } + } } } } } - } - } } } } @@ -391,6 +399,7 @@ impl ThunderAsyncClient { } pub async fn send(&self, request: ThunderAsyncRequest) { + println!("@@@NNA....at thunderasync send fn"); if let Err(e) = self.sender.send(request).await { error!("Failed to send thunder Async Request: {:?}", e); } diff --git a/device/thunder_ripple_sdk/src/client/thunder_client.rs b/device/thunder_ripple_sdk/src/client/thunder_client.rs index 23d0ca64f..5678f3b99 100644 --- a/device/thunder_ripple_sdk/src/client/thunder_client.rs +++ b/device/thunder_ripple_sdk/src/client/thunder_client.rs @@ -15,45 +15,32 @@ // SPDX-License-Identifier: Apache-2.0 // +use super::device_operator::{ + DeviceCallRequest, DeviceChannelRequest, DeviceOperator, DeviceResponseMessage, + DeviceResponseSubscription, DeviceSubscribeRequest, +}; use super::thunder_async_client::{ThunderAsyncClient, ThunderAsyncRequest, ThunderAsyncResponse}; use super::thunder_async_client_plugins_status_mgr::{AsyncCallback, AsyncSender}; -use super::thunder_client_pool::ThunderPoolCommand; -use super::{ - device_operator::{ - DeviceCallRequest, DeviceChannelParams, DeviceChannelRequest, DeviceOperator, - DeviceResponseMessage, DeviceResponseSubscription, DeviceSubscribeRequest, - DeviceUnsubscribeRequest, - }, - jsonrpc_method_locator::JsonRpcMethodLocator, - plugin_manager::{PluginActivatedResult, PluginManagerCommand}, -}; -use crate::thunder_state::ThunderConnectionState; -use crate::utils::get_error_value; -use jsonrpsee::core::client::{Client, ClientT, SubscriptionClientT}; -use jsonrpsee::core::params::{ArrayParams, ObjectParams}; -use jsonrpsee::core::{async_trait, error::Error as JsonRpcError}; -use jsonrpsee::ws_client::WsClientBuilder; -use regex::Regex; +use jsonrpsee::core::async_trait; use ripple_sdk::{ - log::{error, info, warn}, - serde_json::{self, json, Value}, + log::error, + serde_json::Value, tokio, tokio::sync::mpsc::{self, Receiver, Sender as MpscSender}, tokio::sync::oneshot::{self, error::RecvError, Sender as OneShotSender}, - tokio::{sync::Mutex, task::JoinHandle, time::sleep}, utils::channel_utils::{mpsc_send_and_log, oneshot_send_and_log}, utils::error::RippleError, uuid::Uuid, + Mockable, }; -use serde::{Deserialize, Serialize}; -use std::collections::{BTreeMap, HashMap}; -use std::str::FromStr; +use serde::Deserialize; +use std::collections::HashMap; use std::sync::Arc; use std::sync::RwLock; -use std::{env, process::Command}; use tokio::sync::oneshot::Sender; use url::Url; +const GATEWAY_DEFAULT: &str = "ws://127.0.0.1:9998/jsonrpc"; pub type BrokerSubMap = HashMap; pub type BrokerCallbackMap = HashMap>>; @@ -71,7 +58,14 @@ impl ThunderClientManager { ) { if let Some(ref thunder_async_client) = client.thunder_async_client { let mut tac = thunder_async_client.clone(); + + println!("*** @@@@_DEBUG: ThunderClientManager: ThunderAsyncRequest received"); + tokio::spawn(async move { + println!( + "@@@NNA...transfer to thunderasync client...with req_tr:{:?}", + request_tr + ); tac.start(&thndr_endpoint_url, request_tr, status_check) .await; }); @@ -80,6 +74,10 @@ impl ThunderClientManager { /*thunder async response will get here */ tokio::spawn(async move { while let Some(response) = response_tr.recv().await { + println!( + "*** _DEBUG: ThunderClientManager: ThunderAsyncResponse received : {:?}", + response + ); if let Some(id) = response.get_id() { if let Some(thunder_async_callbacks) = client.clone().thunder_async_callbacks { let mut callbacks = thunder_async_callbacks.write().unwrap(); @@ -115,154 +113,64 @@ impl ThunderClientManager { pub struct ThunderClientBuilder; -#[derive(Debug)] -pub struct ThunderCallMessage { - pub method: String, - pub params: Option, - pub callback: OneShotSender, -} - -impl ThunderCallMessage { - pub fn callsign(&self) -> String { - JsonRpcMethodLocator::from_str(&self.method) - .unwrap() - .module - .unwrap() - } - - pub fn method_name(&self) -> String { - JsonRpcMethodLocator::from_str(&self.method) - .unwrap() - .method_name - } -} - -#[derive(Debug, Serialize)] -pub struct ThunderRegisterParams { - pub event: String, - pub id: String, -} - -#[derive(Debug)] -pub struct ThunderSubscribeMessage { - pub module: String, - pub event_name: String, - pub params: Option, - pub handler: MpscSender, - pub callback: Option>, - pub sub_id: Option, -} - -impl ThunderSubscribeMessage { - pub fn resubscribe(&self) -> ThunderSubscribeMessage { - ThunderSubscribeMessage { - module: self.module.clone(), - event_name: self.event_name.clone(), - params: self.params.clone(), - handler: self.handler.clone(), - callback: None, - sub_id: self.sub_id.clone(), - } - } -} - -#[derive(Debug, Clone)] -pub struct ThunderUnsubscribeMessage { - pub module: String, - pub event_name: String, - pub subscription_id: Option, -} - -#[derive(Debug)] -pub enum ThunderMessage { - ThunderCallMessage(ThunderCallMessage), - ThunderSubscribeMessage(ThunderSubscribeMessage), - ThunderUnsubscribeMessage(ThunderUnsubscribeMessage), -} - -impl ThunderMessage { - pub fn clone(&self, intercept_tx: OneShotSender) -> ThunderMessage { - match self { - ThunderMessage::ThunderCallMessage(m) => { - ThunderMessage::ThunderCallMessage(ThunderCallMessage { - method: m.method.clone(), - params: m.params.clone(), - callback: intercept_tx, - }) - } - ThunderMessage::ThunderSubscribeMessage(m) => { - ThunderMessage::ThunderSubscribeMessage(ThunderSubscribeMessage { - params: m.params.clone(), - callback: Some(intercept_tx), - module: m.module.clone(), - event_name: m.event_name.clone(), - handler: m.handler.clone(), - sub_id: m.sub_id.clone(), - }) - } - ThunderMessage::ThunderUnsubscribeMessage(m) => { - ThunderMessage::ThunderUnsubscribeMessage(m.clone()) - } - } - } -} - #[derive(Debug, Clone)] pub struct ThunderClient { - pub sender: Option>, - pub pooled_sender: Option>, pub id: Uuid, - pub plugin_manager_tx: Option>, - pub subscriptions: Option>>>, pub thunder_async_client: Option, pub thunder_async_subscriptions: Option>>, pub thunder_async_callbacks: Option>>, - pub use_thunder_async: bool, } -#[derive(Debug, Deserialize)] -pub struct DefaultThunderResult { - pub success: bool, -} +impl Mockable for ThunderClient { + fn mock() -> Self { + let (resp_tx, _resp_rx) = mpsc::channel(32); + let callback = AsyncCallback { sender: resp_tx }; + let (broker_tx, _broker_rx) = mpsc::channel(32); + let broker_sender = AsyncSender { sender: broker_tx }; + let client = ThunderAsyncClient::new(callback, broker_sender); -impl ThunderClient { - /// Sends a message to thunder. If this client is pooled - /// then it will wrap the message in a pool command before sending - pub async fn send_message(&self, message: ThunderMessage) { - if let Some(s) = &self.pooled_sender { - mpsc_send_and_log( - s, - ThunderPoolCommand::ThunderMessage(message), - "ThunderMessageToPool", - ) - .await; - } else if let Some(s) = &self.sender { - mpsc_send_and_log(s, message, "ThunderMessage").await; + ThunderClient { + id: Uuid::new_v4(), + thunder_async_client: Some(client), + thunder_async_subscriptions: Some(Arc::new(RwLock::new(HashMap::new()))), + thunder_async_callbacks: Some(Arc::new(RwLock::new(HashMap::new()))), } } } +#[derive(Debug, Deserialize)] +pub struct DefaultThunderResult { + pub success: bool, +} + #[async_trait] impl DeviceOperator for ThunderClient { async fn call(&self, request: DeviceCallRequest) -> DeviceResponseMessage { - if !self.use_thunder_async { - let (tx, rx) = oneshot::channel::(); - let message = ThunderMessage::ThunderCallMessage(ThunderCallMessage { - method: request.method, - params: request.params, - callback: tx, - }); - self.send_message(message).await; + let (tx, rx) = oneshot::channel::(); + let async_request = ThunderAsyncRequest::new(DeviceChannelRequest::Call(request)); + self.add_callback(&async_request, tx); + if let Some(async_client) = &self.thunder_async_client { + println!( + "*** _DEBUG: ThunderClient: call: request= {:?}", + async_request + ); + async_client.send(async_request).await; + } - rx.await.unwrap() - } else { - let (tx, rx) = oneshot::channel::(); - let async_request = ThunderAsyncRequest::new(DeviceChannelRequest::Call(request)); - self.add_callback(&async_request, tx); - if let Some(async_client) = &self.thunder_async_client { - async_client.send(async_request).await; + // Naveen: I think you need something to execute the callback (tx), otherwise you'll never get a response here. + + match rx.await { + Ok(response) => { + println!("*** _DEBUG: ThunderClient: call: response= {:?}", response); + response + } + Err(e) => { + error!("ThunderClient Failed to receive response: {:?}", e); + DeviceResponseMessage { + message: Value::Null, + sub_id: None, + } } - rx.await.unwrap() } } @@ -271,26 +179,7 @@ impl DeviceOperator for ThunderClient { request: DeviceSubscribeRequest, handler: mpsc::Sender, ) -> Result { - if !self.use_thunder_async { - let (tx, rx) = oneshot::channel::(); - let message = ThunderSubscribeMessage { - module: request.module, - event_name: request.event_name, - params: request.params, - handler, - callback: Some(tx), - sub_id: request.sub_id, - }; - let msg = ThunderMessage::ThunderSubscribeMessage(message); - self.send_message(msg).await; - let result = rx.await; - if let Err(ref e) = result { - error!("subscribe: e={:?}", e); - } - result - } else if let Some(subscribe_request) = - self.add_subscription_handler(&request, handler.clone()) - { + if let Some(subscribe_request) = self.add_subscription_handler(&request, handler.clone()) { let (tx, rx) = oneshot::channel::(); self.add_callback(&subscribe_request, tx); if let Some(async_client) = &self.thunder_async_client { @@ -308,273 +197,24 @@ impl DeviceOperator for ThunderClient { }) } } - - async fn unsubscribe(&self, request: DeviceUnsubscribeRequest) { - if !self.use_thunder_async { - let message = ThunderUnsubscribeMessage { - module: request.module, - event_name: request.event_name, - subscription_id: None, - }; - let msg = ThunderMessage::ThunderUnsubscribeMessage(message); - self.send_message(msg).await; - } else { - // unsubscribe() deprecate - } - } -} - -#[derive(Debug)] -pub struct ThunderSubscription { - handle: JoinHandle<()>, - params: Option, - listeners: HashMap>, - rpc_response: DeviceResponseMessage, } impl ThunderClient { - async fn subscribe( - client_id: Uuid, - client: &Client, - subscriptions_map: &Arc>>, - thunder_message: ThunderSubscribeMessage, - plugin_manager_tx: Option>, - pool_tx: Option>, - ) { - let subscribe_method = format!( - "client.{}.events.{}", - thunder_message.module, thunder_message.event_name - ); - - let sub_id = match &thunder_message.sub_id { - Some(sid) => sid.clone(), - None => Uuid::new_v4().to_string(), - }; - let mut subscriptions = subscriptions_map.lock().await; - if let Some(sub) = subscriptions.get_mut(&subscribe_method) { - // rpc subscription already exists, just add a listener - sub.listeners - .insert(sub_id.clone(), thunder_message.handler); - if let Some(cb) = thunder_message.callback { - let resp = DeviceResponseMessage::sub(sub.rpc_response.message.clone(), sub_id); - oneshot_send_and_log(cb, resp, "ThunderRegisterResponse"); - } - return; - } - // rpc subscription does not exist, set it up - let subscription_res = client - .subscribe_to_method::(subscribe_method.as_str()) - .await; - - let mut subscription = match subscription_res { - Ok(subscription) => subscription, - Err(e) => { - error!("Failed to setup subscriber in jsonrpsee client, {}", e); - // Maybe this method signature should change to propagate the error up - return; - } - }; - let params = ThunderRegisterParams { - event: thunder_message.event_name.clone(), - id: format!("client.{}.events", thunder_message.module.clone()), - }; - let json = serde_json::to_string(¶ms).unwrap(); - let method = format!("{}.register", thunder_message.module); - if let Some(callsign) = Self::extract_callsign_from_register_method(&method) { - if Self::check_and_activate_plugin(&callsign, &plugin_manager_tx) - .await - .is_err() - { - error!("{} Thunder plugin couldnt be activated", callsign) - } - } - - let response = Box::new(ThunderParamRequest { - method: method.as_str(), - params: &json, - json_based: true, - }) - .send_request(client) - .await; - let handler_channel = thunder_message.handler.clone(); - let sub_id_c = sub_id.clone(); - let handle = ripple_sdk::tokio::spawn(async move { - while let Some(ev_res) = subscription.next().await { - match ev_res { - Ok(ev) => { - let msg = DeviceResponseMessage::sub(ev, sub_id_c.clone()); - mpsc_send_and_log(&thunder_message.handler, msg, "ThunderSubscribeEvent") - .await; - } - Err(e) => error!("Thunder event error {e:?}"), - } - } - if let Some(ptx) = pool_tx { - warn!( - "Client {} became disconnected, resubscribing to events", - client_id - ); - // ResetThunderClient. Resubscribe would happen automatically when the client resets. - let pool_msg = ThunderPoolCommand::ResetThunderClient(client_id); - mpsc_send_and_log(&ptx, pool_msg, "ResetThunderClient").await; - } - }); - - let msg = DeviceResponseMessage::sub(response, sub_id.clone()); - let mut tsub = ThunderSubscription { - handle, - params: thunder_message.params.clone(), - listeners: HashMap::default(), - rpc_response: msg.clone(), - }; - tsub.listeners.insert(sub_id, handler_channel); - subscriptions.insert(subscribe_method, tsub); - if let Some(cb) = thunder_message.callback { - oneshot_send_and_log(cb, msg, "ThunderRegisterResponse"); - } - } - - async fn unsubscribe( - client: &Client, - subscriptions_map: &Arc>>, - thunder_message: ThunderUnsubscribeMessage, - ) { - let subscribe_method = format!( - "client.{}.events.{}", - thunder_message.module, thunder_message.event_name - ); - let mut unregister = false; - match thunder_message.subscription_id { - Some(sub_id) => { - // Remove the listener for the given sub_id, if there are no more listeners then - // unsubscribe through rpc - let mut subscriptions = subscriptions_map.lock().await; - if let Some(sub) = subscriptions.get_mut(&subscribe_method) { - sub.listeners.remove(&sub_id); - if sub.listeners.is_empty() { - unregister = true; - if let Some(s) = subscriptions.remove(&subscribe_method) { - s.handle.abort(); - } - } - } - } - None => { - // removing all subscriptions for a method - unregister = true; - let mut subscriptions = subscriptions_map.lock().await; - if let Some(sub) = subscriptions.remove(&subscribe_method) { - sub.handle.abort(); - } - } - } - if unregister { - let params = ThunderRegisterParams { - event: thunder_message.event_name, - id: format!("client.{}.events", thunder_message.module), - }; - let json = serde_json::to_string(¶ms).unwrap(); - Box::new(ThunderParamRequest { - method: format!("{}.unregister", thunder_message.module).as_str(), - params: &json, - json_based: true, - }) - .send_request(client) - .await; - } - } - - async fn call( - client: &Client, - thunder_message: ThunderCallMessage, - plugin_manager_tx: Option>, - ) { - // First check if the plugin is activated and ready to use - if Self::check_and_activate_plugin(&thunder_message.callsign(), &plugin_manager_tx) - .await - .is_err() - { - return_message(thunder_message.callback, json!({"error": "pre send error"})); - return; - } - let params = thunder_message.params; - match params { - Some(p) => match p { - DeviceChannelParams::Bool(b) => { - let r = Box::new(ThunderRawBoolRequest { - method: thunder_message.method.clone(), - v: b, - }) - .send_request() - .await; - return_message(thunder_message.callback, r); - } - _ => { - let response = Box::new(ThunderParamRequest { - method: &thunder_message.method.clone(), - params: &p.as_params(), - json_based: p.is_json(), - }) - .send_request(client) - .await; - return_message(thunder_message.callback, response); - } - }, - None => { - let response: Value = Box::new(ThunderNoParamRequest { - method: thunder_message.method.clone(), - }) - .send_request(client) - .await; - return_message(thunder_message.callback, response); - } - } - } - - async fn check_and_activate_plugin( - call_sign: &str, - plugin_manager_tx: &Option>, - ) -> Result<(), PluginActivatedResult> { - let (plugin_rdy_tx, plugin_rdy_rx) = oneshot::channel::(); - if let Some(tx) = plugin_manager_tx { - let msg = PluginManagerCommand::ActivatePluginIfNeeded { - callsign: call_sign.to_string(), - tx: plugin_rdy_tx, - }; - mpsc_send_and_log(tx, msg, "ActivatePluginIfNeeded").await; - if let Ok(res) = plugin_rdy_rx.await { - if !res.ready().await { - return Err(PluginActivatedResult::Error); - } - } - } - - Ok(()) - } - fn extract_callsign_from_register_method(method: &str) -> Option { - // capture the initial string before an optional version number, followed by ".register" - let re = Regex::new(r"^(.*?)(?:\.\d+)?\.register$").unwrap(); - - if let Some(cap) = re.captures(method) { - if let Some(string) = cap.get(1) { - return Some(string.as_str().to_string()); - } - } - None - } - fn add_callback( &self, request: &ThunderAsyncRequest, dev_resp_callback: Sender, ) { - let mut callbacks = self - .thunder_async_callbacks - .as_ref() - .unwrap() - .write() - .unwrap(); - callbacks.insert(request.id, Some(dev_resp_callback)); + println!( + "*** _DEBUG: add_callback invoked for : ThunderAsyncRequest: {}", + request + ); + if let Some(callbacks_arc) = &self.thunder_async_callbacks { + let mut callbacks = callbacks_arc.write().unwrap(); + callbacks.insert(request.id, Some(dev_resp_callback)); + } else { + error!("thunder_async_callbacks found None"); + } } // if already subscribed updated handlers @@ -614,477 +254,205 @@ impl ThunderClient { Some(async_request) } } -} - -impl ThunderClientBuilder { - fn parse_subscribe_method(subscribe_method: &str) -> Option<(String, String)> { - if let Some(client_start) = subscribe_method.find("client.") { - if let Some(events_start) = subscribe_method[client_start..].find(".events.") { - let module = subscribe_method - [client_start + "client.".len()..client_start + events_start] - .to_string(); - let event_name = - subscribe_method[client_start + events_start + ".events.".len()..].to_string(); - return Some((module, event_name)); - } - } - None - } - - async fn start_thunderpool_client( - url: Url, - plugin_manager_tx: Option>, - pool_tx: Option>, - thunder_connection_state: Option>, - existing_client: Option, - ) -> Result { - let uid = Uuid::new_v4(); - info!("initiating thunder connection URL:{} ", url); - let subscriptions = Arc::new(Mutex::new(HashMap::::default())); - let (s, mut r) = mpsc::channel::(32); - let pmtx_c = plugin_manager_tx.clone(); - let client = - Self::create_client(url.clone(), thunder_connection_state.clone().unwrap()).await; + // TODO: Move to MockThunderClient or similar o it's only used in tests + // pub fn start_mock(device_channel_request_tx: MpscSender) -> Self { + // let (device_response_message_tx, mut device_response_message_rx) = + // mpsc::channel::(32); + // let (thunder_async_response_tx, mut thunder_async_response_rx) = mpsc::channel(32); + // let callback = AsyncCallback { + // sender: thunder_async_response_tx, + // }; + + // let (thunder_async_request_tx, mut thunder_async_request_rx) = mpsc::channel(32); + // let broker_sender = AsyncSender { + // sender: thunder_async_request_tx, + // }; + // let client = ThunderAsyncClient::new(callback, broker_sender); + + // // Handle requests from ThunderAsyncClient and forward to device_channel_request_tx + // tokio::spawn(async move { + // while let Some(thunder_async_request) = thunder_async_request_rx.recv().await { + // println!( + // "*** _DEBUG: ThunderClient: mock: thunder_async_request= {:?}", + // thunder_async_request + // ); + + // mpsc_send_and_log( + // &device_channel_request_tx, + // thunder_async_request.request, + // "DeviceChannelRequest", + // ) + // .await; + // } + // }); + + // // Optionally, you can process responses here if needed + // tokio::spawn(async move { + // while let Some(thunder_async_response) = thunder_async_response_rx.recv().await { + // println!( + // "*** _DEBUG: ThunderClient: mock: thunder_async_response= {:?}", + // thunder_async_response + // ); + // // oneshot_send_and_log( + // // thunder_async_response_tx, + // // thunder_async_response, + // // "thunderasyncresponse", + // // ); + // // You can add logic here to handle the response if required + // } + // }); + + // // Optionally, you can process responses here if needed + // tokio::spawn(async move { + // while let Some(thunder_async_response) = device_response_message_rx.recv().await { + // println!( + // "*** _DEBUG: ThunderClient: mock: thunder_async_response= {:?}", + // thunder_async_response + // ); + // // oneshot_send_and_log( + // // thunder_async_response_tx, + // // thunder_async_response, + // // "thunderasyncresponse", + // // ); + // // You can add logic here to handle the response if required + // } + // }); + + // ThunderClient { + // id: Uuid::new_v4(), + // thunder_async_client: Some(client), + // thunder_async_subscriptions: Some(Arc::new(RwLock::new(HashMap::new()))), + // thunder_async_callbacks: Some(Arc::new(RwLock::new(HashMap::new()))), + // } + // } + + pub fn start_mock( + device_channel_request_tx: MpscSender, + mut thunderasync_resp_rx: mpsc::Receiver, + ) -> Self { + let (thunder_async_response_tx, mut thunder_async_response_rx) = mpsc::channel(32); + let callback = AsyncCallback { + sender: thunder_async_response_tx, + }; - // add error handling here - if client.is_err() { - error!("Unable to connect to thunder: {client:?}"); - return Err(RippleError::BootstrapError); - } + let (thunder_async_request_tx, mut thunder_async_request_rx) = mpsc::channel(32); + let broker_sender = AsyncSender { + sender: thunder_async_request_tx, + }; + let client = ThunderAsyncClient::new(callback, broker_sender); - let client = client.unwrap(); - let subscriptions_c = subscriptions.clone(); + // Handle requests from ThunderAsyncClient and forward to device_channel_request_tx tokio::spawn(async move { - while let Some(message) = r.recv().await { - if !client.is_connected() { - if let Some(ptx) = pool_tx { - warn!( - "Client {} became disconnected, removing from pool message {:?}", - uid, message - ); - // Remove the client and then try the message again with a new client - let pool_msg = ThunderPoolCommand::ResetThunderClient(uid); - mpsc_send_and_log(&ptx, pool_msg, "ResetThunderClient").await; - let pool_msg = ThunderPoolCommand::ThunderMessage(message); - mpsc_send_and_log(&ptx, pool_msg, "RetryThunderMessage").await; - return; - } - } - info!("Client {} sending thunder message {:?}", uid, message); - match message { - ThunderMessage::ThunderCallMessage(thunder_message) => { - ThunderClient::call(&client, thunder_message, plugin_manager_tx.clone()) - .await; - } - ThunderMessage::ThunderSubscribeMessage(thunder_message) => { - ThunderClient::subscribe( - uid, - &client, - &subscriptions_c, - thunder_message, - plugin_manager_tx.clone(), - pool_tx.clone(), - ) - .await; - } - ThunderMessage::ThunderUnsubscribeMessage(thunder_message) => { - ThunderClient::unsubscribe(&client, &subscriptions_c, thunder_message) - .await; - } - } - } - }); + while let Some(thunder_async_request) = thunder_async_request_rx.recv().await { + println!( + "*** _DEBUG: ThunderClient: mock: thunder_async_request= {:?}", + thunder_async_request + ); - if let Some(old_client) = existing_client { - // Re-subscribe for each subscription that was active on the old client - if let Some(subscriptions) = old_client.subscriptions { - // Reactivate the plugin state - let (plugin_rdy_tx, plugin_rdy_rx) = oneshot::channel::(); - if let Some(tx) = pmtx_c.clone() { - let msg = PluginManagerCommand::ReactivatePluginState { tx: plugin_rdy_tx }; - mpsc_send_and_log(&tx, msg, "ResetPluginState").await; - if let Ok(res) = plugin_rdy_rx.await { - res.ready().await; - } - } - let mut subs = subscriptions.lock().await; - for (subscribe_method, tsub) in subs.iter_mut() { - let mut listeners = - HashMap::>::default(); - std::mem::swap(&mut listeners, &mut tsub.listeners); - for (sub_id, listener) in listeners { - let thunder_message: ThunderSubscribeMessage = { - Self::parse_subscribe_method(subscribe_method) - .map(|(module, event_name)| ThunderSubscribeMessage { - module, - event_name, - params: tsub.params.clone(), - handler: listener, - callback: None, - sub_id: Some(sub_id), - }) - .unwrap() - }; - let resp = s - .send(ThunderMessage::ThunderSubscribeMessage(thunder_message)) - .await; - if resp.is_err() { - if let Some((module, _)) = - Self::parse_subscribe_method(subscribe_method) - { - error!("Failed to send re-subscribe message for {}", module); - } - } - } - } + mpsc_send_and_log( + &device_channel_request_tx, + thunder_async_request.request, + "DeviceChannelRequest", + ) + .await; } - } + }); - Ok(ThunderClient { - sender: Some(s), - pooled_sender: None, - id: uid, - plugin_manager_tx: pmtx_c, - subscriptions: Some(subscriptions), - thunder_async_client: None, - thunder_async_subscriptions: None, - thunder_async_callbacks: None, - use_thunder_async: false, - }) - } + // Process responses and forward to device_response_message_tx + tokio::spawn(async move { + while let Some(thunder_async_response) = thunder_async_response_rx.recv().await { + println!( + "*** _DEBUG: ThunderClient: mock: thunder_async_response= {:?}", + thunder_async_response + ); + let (device_response_message_tx, _device_response_message_rx) = + oneshot::channel::(); - async fn create_client( - url: Url, - thunder_connection_state: Arc, - ) -> Result { - // Ensure that only one connection attempt is made at a time - { - let mut is_connecting = thunder_connection_state.conn_status_mutex.lock().await; - // check if we are already reconnecting - if *is_connecting { - drop(is_connecting); - // wait for the connection to be ready - thunder_connection_state.conn_status_notify.notified().await; - } else { - //Mark the connection as reconnecting - *is_connecting = true; + if let Some(resp) = thunder_async_response.get_device_resp_msg(None) { + let _ = device_response_message_tx.send(resp); + }; } - } // Lock is released here + }); - let mut client: Result; - let mut delay_duration = tokio::time::Duration::from_millis(50); - loop { - // get the token from the environment anew each time - let url_with_token = if let Ok(token) = env::var("THUNDER_TOKEN") { - Url::parse_with_params(url.as_str(), &[("token", token)]).unwrap() - } else { - url.clone() - }; - client = WsClientBuilder::default() - .build(url_with_token.to_string()) - .await; - if client.is_err() { - error!( - "Thunder Websocket is not available. Attempt to connect to thunder, retrying" + // Process responses and forward to device_response_message_tx + tokio::spawn(async move { + while let Some(thunder_async_response) = thunderasync_resp_rx.recv().await { + println!( + "*** @@@_DEBUG: ThunderClient: mock: thunder_async_response= {:?}", + thunder_async_response ); - sleep(delay_duration).await; - if delay_duration < tokio::time::Duration::from_secs(3) { - delay_duration *= 2; - } - continue; + let (device_response_message_tx, _device_response_message_rx) = + oneshot::channel::(); + + if let Some(resp) = thunder_async_response.get_device_resp_msg(None) { + println!( + "*** @@@_DEBUG: ThunderClient:mock sending DeviceResponseMessage: {:?}", + resp + ); + let _ = device_response_message_tx.send(resp); + }; } - //break from the loop after signalling that we are no longer reconnecting - let mut is_connecting = thunder_connection_state.conn_status_mutex.lock().await; - *is_connecting = false; - thunder_connection_state.conn_status_notify.notify_waiters(); - break; - } - client - } - - pub async fn start_thunder_client( - url: Url, - plugin_manager_tx: Option>, - pool_tx: Option>, - thunder_connection_state: Option>, - existing_client: Option, - use_thunderasync_client: bool, - status_check: bool, - ) -> Result { - if !use_thunderasync_client { - Self::start_thunderpool_client( - url, - plugin_manager_tx, - pool_tx, - thunder_connection_state, - existing_client, - ) - .await - } else { - let (resp_tx, resp_rx) = mpsc::channel(32); - let callback = AsyncCallback { sender: resp_tx }; - let (broker_tx, broker_rx) = mpsc::channel(32); - let broker_sender = AsyncSender { sender: broker_tx }; - let client = ThunderAsyncClient::new(callback, broker_sender); - - let thunder_client = ThunderClient { - sender: None, - pooled_sender: None, - id: Uuid::new_v4(), - plugin_manager_tx: None, - subscriptions: None, - thunder_async_client: Some(client), - thunder_async_subscriptions: Some(Arc::new(RwLock::new(HashMap::new()))), - thunder_async_callbacks: Some(Arc::new(RwLock::new(HashMap::new()))), - use_thunder_async: true, - }; - - ThunderClientManager::start( - thunder_client.clone(), - broker_rx, - resp_rx, - url.to_string(), - status_check, - ); - Ok(thunder_client) - } - } + }); - #[cfg(test)] - pub fn mock(sender: MpscSender) -> ThunderClient { ThunderClient { - sender: Some(sender), - pooled_sender: None, id: Uuid::new_v4(), - plugin_manager_tx: None, - subscriptions: None, - thunder_async_client: None, - thunder_async_subscriptions: None, - thunder_async_callbacks: None, - use_thunder_async: false, - } - } -} - -pub struct ThunderRawBoolRequest { - method: String, - v: bool, -} - -impl ThunderRawBoolRequest { - async fn send_request(self: Box) -> Value { - let host = { - if cfg!(feature = "local_dev") { - match env::var("DEVICE_HOST") { - Ok(h) => h, - Err(_) => String::from("127.0.0.1"), - } - } else { - String::from("127.0.0.1") - } - }; - - if let Ok(t) = env::var("THUNDER_TOKEN") { - let command = format!( - r#"/usr/bin/curl -H "Authorization: Bearer {}" -d '{{"jsonrpc":"2.0","id":"1","method":"{}","params":{}}}' http://{}:9998/jsonrpc"#, - t, self.method, self.v, host - ); - let mut start_ref_app_command = Command::new("sh"); - start_ref_app_command.arg("-c").arg(command); - if start_ref_app_command.output().is_ok() { - Value::Bool(true) - } else { - Value::Bool(false) - } - } else { - let command = format!( - r#"/usr/bin/curl -d '{{"jsonrpc":"2.0","id":"1","method":"{}","params":{}}}' http://{}:9998/jsonrpc"#, - self.method, self.v, host - ); - let mut start_ref_app_command = Command::new("sh"); - start_ref_app_command.arg("-c").arg(command); - if start_ref_app_command.output().is_ok() { - Value::Bool(true) - } else { - Value::Bool(false) - } - } - } -} - -pub struct ThunderNoParamRequest { - method: String, -} - -impl ThunderNoParamRequest { - async fn send_request(self: Box, client: &Client) -> Value { - let result = client.request(&self.method, ObjectParams::new()).await; - if let Err(e) = result { - error!("send_request: Error: e={}", e); - return get_error_value(&e); + thunder_async_client: Some(client), + thunder_async_subscriptions: Some(Arc::new(RwLock::new(HashMap::new()))), + thunder_async_callbacks: Some(Arc::new(RwLock::new(HashMap::new()))), } - result.unwrap() } -} -pub struct ThunderParamRequest<'a> { - method: &'a str, - params: &'a str, - json_based: bool, -} -/* -Polymorph wrapper needed to be able to properly call `client.requse` */ -enum ParamWrapper { - Object(ObjectParams), - Array(ArrayParams), - None, -} -impl<'a> ThunderParamRequest<'a> { - async fn send_request(self: Box, client: &Client) -> Value { - let method = self.method; - - let result = match &self.get_params() { - ParamWrapper::Object(object_params) => { - client.request(method, object_params.clone()).await - } - ParamWrapper::Array(array_params) => client.request(method, array_params.clone()).await, - ParamWrapper::None => client.request(method, ArrayParams::new()).await, - }; - - if let Err(e) = result { - error!("send_request: Error: e={}", e); - return get_error_value(&e); - } - result.unwrap() - } - - fn get_params(self) -> ParamWrapper { - /* - Map in jsonrpsee is "ObjectParams" and Array is "ArrayParams" - */ - match self.json_based { - true => { - /*map */ - match serde_json::from_str::>(self.params) { - Ok(v_tree_map) => { - let mut params = ObjectParams::new(); - for kvp in v_tree_map { - let _ = params.insert(kvp.0, kvp.1); - } - ParamWrapper::Object(params) - } - Err(_e) => ParamWrapper::None, - } - } - /*array */ - false => { - let mut arrayparams = ArrayParams::new(); - match arrayparams.insert(Value::String(String::from(self.params))) { - Ok(_) => ParamWrapper::Array(arrayparams), - Err(_e) => ParamWrapper::None, - } - } - } + #[cfg(test)] + pub async fn thunderclient_mock() -> ThunderClient { + let gateway_url = url::Url::parse(GATEWAY_DEFAULT).unwrap(); + ThunderClientBuilder::start_thunder_client(gateway_url, true) + .await + .unwrap() } } -fn return_message(callback: OneShotSender, response: Value) { - let msg = DeviceResponseMessage::call(response); - oneshot_send_and_log(callback, msg, "returning message"); -} - -#[cfg(test)] -mod tests { - use jsonrpsee::core::traits::ToRpcParams as _; - - use super::*; +impl ThunderClientBuilder { + pub async fn start_thunder_client( + url: Url, + status_check: bool, + ) -> Result { + let (resp_tx, resp_rx) = mpsc::channel(32); + let callback = AsyncCallback { sender: resp_tx }; + let (broker_tx, broker_rx) = mpsc::channel(32); + let broker_sender = AsyncSender { sender: broker_tx }; + let client = ThunderAsyncClient::new(callback, broker_sender); - #[tokio::test] - async fn test_thunder_call_message() { - let thunder_call_message = ThunderCallMessage { - method: "org.rdk.RDKShell.1.createDisplay".to_string(), - params: Some(DeviceChannelParams::Json("test".to_string())), - callback: oneshot::channel::().0, + let thunder_client = ThunderClient { + id: Uuid::new_v4(), + thunder_async_client: Some(client), + thunder_async_subscriptions: Some(Arc::new(RwLock::new(HashMap::new()))), + thunder_async_callbacks: Some(Arc::new(RwLock::new(HashMap::new()))), }; - assert_eq!(thunder_call_message.callsign(), "org.rdk.RDKShell"); - assert_eq!(thunder_call_message.method_name(), "createDisplay"); - } - - #[test] - fn test_extract_callsign_from_register_method() { - let method = "org.rdk.RDKShell.1.register"; - let callsign = ThunderClient::extract_callsign_from_register_method(method); - assert_eq!(callsign, Some("org.rdk.RDKShell".to_string())); - let method = "org.rdk.RDKShell.register"; - let callsign = ThunderClient::extract_callsign_from_register_method(method); - assert_eq!(callsign, Some("org.rdk.RDKShell".to_string())); - - // test method abcd. 1.register - let method = "abcd .1.register"; - let callsign = ThunderClient::extract_callsign_from_register_method(method); - assert_eq!(callsign, Some("abcd ".to_string())); - } - - #[test] - fn test_extract_callsign_from_register_method_invalid_pattern() { - let method = "abcd.1"; - let callsign = ThunderClient::extract_callsign_from_register_method(method); - assert_eq!(callsign, None); - - let method = "abcd.1.register.2"; - let callsign = ThunderClient::extract_callsign_from_register_method(method); - assert_eq!(callsign, None); - } - #[test] - fn test_get_params_object_params() { - let request = ThunderParamRequest { - method: "test.method", - params: r#"{"key1": "value1", "key2": "value2"}"#, - json_based: true, - }; - match request.get_params() { - ParamWrapper::Object(params) => { - let r = params.to_rpc_params(); - let r = r.unwrap(); - let r = r.unwrap(); - let r = r.get(); - assert_eq!(r, r#"{"key1":"value1","key2":"value2"}"#); - } - _ => panic!("Expected ObjectParams"), - } + ThunderClientManager::start( + thunder_client.clone(), + broker_rx, + resp_rx, + url.to_string(), + status_check, + ); + Ok(thunder_client) } - #[test] - fn test_get_params_array_param_non_json_based() { - let request = ThunderParamRequest { - method: "test.method", - params: "value1", - json_based: false, - }; - match request.get_params() { - ParamWrapper::Array(params) => { - let r = params.to_rpc_params(); - let r = r.unwrap(); - let r = r.unwrap(); - let r = r.get(); - assert_eq!(r, r#"["value1"]"#); - } - _ => panic!("Expected ArrayParams"), - } - } + #[cfg(test)] + pub fn mock() -> ThunderClient { + let (resp_tx, _resp_rx) = mpsc::channel(32); + let callback = AsyncCallback { sender: resp_tx }; + let (broker_tx, _broker_rx) = mpsc::channel(32); + let broker_sender = AsyncSender { sender: broker_tx }; + let client = ThunderAsyncClient::new(callback, broker_sender); - #[test] - fn test_get_params_no_params() { - let request = ThunderParamRequest { - method: "test.method", - params: "", - json_based: true, - }; - match request.get_params() { - ParamWrapper::None => {} - _ => panic!("Expected NoParams"), + ThunderClient { + id: Uuid::new_v4(), + thunder_async_client: Some(client), + thunder_async_subscriptions: Some(Arc::new(RwLock::new(HashMap::new()))), + thunder_async_callbacks: Some(Arc::new(RwLock::new(HashMap::new()))), } } } diff --git a/device/thunder_ripple_sdk/src/client/thunder_client_pool.rs b/device/thunder_ripple_sdk/src/client/thunder_client_pool.rs deleted file mode 100644 index c688eab2b..000000000 --- a/device/thunder_ripple_sdk/src/client/thunder_client_pool.rs +++ /dev/null @@ -1,349 +0,0 @@ -// Copyright 2023 Comcast Cable Communications Management, LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -// SPDX-License-Identifier: Apache-2.0 -// - -use std::sync::{ - atomic::{AtomicBool, Ordering}, - Arc, -}; - -use crate::{ - client::{device_operator::DeviceResponseMessage, thunder_client::ThunderClientBuilder}, - thunder_state::ThunderConnectionState, -}; -use ripple_sdk::{ - log::{debug, error}, - tokio::sync::{mpsc, oneshot}, - utils::channel_utils::oneshot_send_and_log, - uuid::Uuid, -}; -use ripple_sdk::{tokio, utils::error::RippleError}; -use url::Url; - -use super::{ - plugin_manager::PluginManagerCommand, - thunder_client::{ThunderClient, ThunderMessage}, -}; - -#[derive(Debug)] -pub struct ThunderClientPool { - clients: Vec, -} - -#[derive(Debug)] -struct PooledThunderClient { - in_use: Arc, - client: ThunderClient, -} - -#[derive(Debug)] -pub enum ThunderPoolCommand { - ThunderMessage(ThunderMessage), - ResetThunderClient(Uuid), -} - -impl ThunderClientPool { - pub async fn start( - url: Url, - plugin_manager_tx: Option>, - thunder_connection_state: Option>, - size: u32, - status_check: bool, - ) -> Result { - debug!("Starting a Thunder connection pool of size {}", size); - let (s, mut r) = mpsc::channel::(32); - let thunder_connection_state = thunder_connection_state.clone(); - let mut clients = Vec::default(); - for _ in 0..size { - let client = ThunderClientBuilder::start_thunder_client( - url.clone(), - plugin_manager_tx.clone(), - Some(s.clone()), - thunder_connection_state.clone(), - None, - false, - status_check, - ) - .await; - if let Ok(c) = client { - clients.push(PooledThunderClient { - in_use: Arc::new(AtomicBool::new(false)), - client: c, - }); - } - } - if clients.is_empty() { - return Err(RippleError::BootstrapError); - } - let sender_for_thread = s.clone(); - let pmtx_c = plugin_manager_tx.clone(); - tokio::spawn(async move { - let mut pool = ThunderClientPool { clients }; - while let Some(cmd) = r.recv().await { - match cmd { - ThunderPoolCommand::ThunderMessage(msg) => { - let c = match pool.get_client(&msg) { - Some(client) => client, - None => { - error!("Thunder pool had no clients!"); - return; - } - }; - - let (resp_tx, resp_rx) = oneshot::channel::(); - c.in_use.to_owned().store(true, Ordering::Relaxed); - let msg_with_intercept = msg.clone(resp_tx); - let in_use = c.in_use.clone(); - tokio::spawn(async move { - let resp = resp_rx.await; - in_use.store(false, Ordering::Relaxed); - // Intercept the response back from thunder here - // so that we can mark the client as not in use - match msg { - ThunderMessage::ThunderCallMessage(m) => { - if let Ok(r) = resp { - oneshot_send_and_log(m.callback, r, "ThunderReturn") - } - } - ThunderMessage::ThunderSubscribeMessage(m) => { - if let Ok(r) = resp { - if let Some(cb) = m.callback { - oneshot_send_and_log(cb, r, "ThunderReturn") - } - } - } - _ => {} - } - }); - c.client.send_message(msg_with_intercept).await; - } - ThunderPoolCommand::ResetThunderClient(client_id) => { - // Remove the given client and then start a new one to replace it - let mut itr = pool.clients.iter(); - let i = itr.position(|x| x.client.id == client_id); - if let Some(index) = i { - let client = ThunderClientBuilder::start_thunder_client( - url.clone(), - plugin_manager_tx.clone(), - Some(sender_for_thread.clone()), - thunder_connection_state.clone(), - pool.clients.get(index).map(|x| x.client.clone()), - false, - status_check, - ) - .await; - if let Ok(client) = client { - pool.clients.remove(index); - pool.clients.insert( - index, - PooledThunderClient { - in_use: Arc::new(AtomicBool::new(false)), - client, - }, - ); - } - } - } - } - } - }); - Ok(ThunderClient { - sender: None, - pooled_sender: Some(s), - id: Uuid::new_v4(), - plugin_manager_tx: pmtx_c, - subscriptions: None, - thunder_async_client: None, - thunder_async_subscriptions: None, - thunder_async_callbacks: None, - use_thunder_async: false, - }) - } - - fn get_client(&mut self, msg: &ThunderMessage) -> Option<&mut PooledThunderClient> { - // For subscribe and Un subscribe use the same client - match msg { - ThunderMessage::ThunderSubscribeMessage(_) - | ThunderMessage::ThunderUnsubscribeMessage(_) => self.clients.get_mut(0), - _ => { - // First use an unused client, if there are none just use - // any client and it will queue in the thread - let len = self.clients.len(); - let itr = self.clients.iter_mut(); - itr.enumerate() - .find(|(i, client)| *i == (len - 1) || !client.in_use.load(Ordering::Relaxed)) - .map(|x| x.1) - } - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::{ - client::{ - device_operator::{ - DeviceCallRequest, DeviceOperator, DeviceSubscribeRequest, DeviceUnsubscribeRequest, - }, - plugin_manager::{PluginActivatedResult, PluginManagerCommand}, - }, - tests::thunder_client_pool_test_utility::{ - CustomMethodHandler, MethodHandler, MockWebSocketServer, - }, - }; - use ripple_sdk::{ - tokio::time::{sleep, Duration}, - utils::channel_utils::oneshot_send_and_log, - }; - use url::Url; - - #[tokio::test] - async fn test_thunder_client_pool_start_and_reset() { - // Using the default method handler from tests::thunder_client_pool_test_utility - // This can be replaced with a custom method handler, if needed - let custom_method_handler = Arc::new(CustomMethodHandler); - let callsign = custom_method_handler.get_callsign(); - - // clone to pass to the server - let custom_method_handler_c = custom_method_handler.clone(); - - let server_task = tokio::spawn(async { - let mock_server = MockWebSocketServer::new("127.0.0.1:8080", custom_method_handler_c); - mock_server.start().await; - }); - - // Wait for server to start - sleep(Duration::from_secs(1)).await; - - let url = Url::parse("ws://127.0.0.1:8080/jsonrpc").unwrap(); - let (tx, mut rx) = mpsc::channel(32); - - // Spawn command thread - tokio::spawn(async move { - while let Some(command) = rx.recv().await { - match command { - PluginManagerCommand::StateChangeEvent(_ev) => {} - PluginManagerCommand::ActivatePluginIfNeeded { callsign: _, tx } => { - oneshot_send_and_log( - tx, - PluginActivatedResult::Ready, - "ActivatePluginIfNeededResponse", - ); - } - PluginManagerCommand::WaitForActivation { callsign: _, tx } => { - oneshot_send_and_log(tx, PluginActivatedResult::Ready, "WaitForActivation"); - } - PluginManagerCommand::ReactivatePluginState { tx } => { - oneshot_send_and_log( - tx, - PluginActivatedResult::Ready, - "ReactivatePluginState", - ); - } - PluginManagerCommand::WaitForActivationForDynamicPlugin { callsign: _, tx } => { - oneshot_send_and_log( - tx, - PluginActivatedResult::Ready, - "WaitForActivationForDynamic", - ); - } - } - } - }); - - // Test cases - // 1. create a client pool of size 4 - let client = ThunderClientPool::start( - url, - Some(tx), - Some(Arc::new(ThunderConnectionState::new())), - 4, - false, - ) - .await; - assert!(client.is_ok()); - let client = client.unwrap(); - - // 2. invoke client.call functions 20 times from multple spawned threads - for _ in 0..20 { - let client = client.clone(); - let callsign = callsign.clone(); - tokio::spawn(async move { - let resp = client - .call(DeviceCallRequest { - method: format!("{}.1.testMethod", callsign), - params: None, - }) - .await; - assert_eq!(resp.message, "testMethod Request Response".to_string()); - }); - } - - // 3. test subscribe. Call this 3 times from a loop - for _ in 0..3 { - let (sub_tx, _sub_rx) = mpsc::channel::(32); - let resp = client - .subscribe( - DeviceSubscribeRequest { - module: format!("{}.1", callsign), - event_name: "testEvent".into(), - params: None, - sub_id: None, - }, - sub_tx, - ) - .await; - match resp { - Ok(resp) => { - assert_eq!(resp.message, "Subscribed".to_string()); - } - Err(e) => { - panic!("Error subscribing: {:?}", e); - } - } - } - - // 4. Re-start server to test Thunder client reset - server_task.abort(); - sleep(Duration::from_secs(1)).await; - let custom_method_handler_c = custom_method_handler.clone(); - let server_task = tokio::spawn(async { - let mock_server = MockWebSocketServer::new("127.0.0.1:8080", custom_method_handler_c); - mock_server.start().await; - }); - // Wait for server to start - sleep(Duration::from_secs(1)).await; - // issue client call again - let resp = client - .call(DeviceCallRequest { - method: format!("{}.1.testMethod", callsign), - params: None, - }) - .await; - assert_eq!(resp.message, "testMethod Request Response".to_string()); - - // 5. test unsubscribe - client - .unsubscribe(DeviceUnsubscribeRequest { - module: format!("{}.1", callsign), - event_name: "testEvent".into(), - }) - .await; - sleep(Duration::from_secs(1)).await; - server_task.abort(); - } -} diff --git a/device/thunder_ripple_sdk/src/lib.rs b/device/thunder_ripple_sdk/src/lib.rs index 2d606f4a1..00908acd5 100644 --- a/device/thunder_ripple_sdk/src/lib.rs +++ b/device/thunder_ripple_sdk/src/lib.rs @@ -22,14 +22,11 @@ pub mod client { pub mod thunder_async_client; pub mod thunder_async_client_plugins_status_mgr; pub mod thunder_client; - pub mod thunder_client_pool; pub mod thunder_plugin; } pub mod bootstrap { pub mod boot_thunder; - pub mod get_config_step; - pub mod setup_thunder_pool_step; pub mod setup_thunder_processors; } diff --git a/device/thunder_ripple_sdk/src/processors/thunder_device_info.rs b/device/thunder_ripple_sdk/src/processors/thunder_device_info.rs index 7092f3692..565639211 100644 --- a/device/thunder_ripple_sdk/src/processors/thunder_device_info.rs +++ b/device/thunder_ripple_sdk/src/processors/thunder_device_info.rs @@ -804,6 +804,7 @@ impl ThunderDeviceInfoRequestProcessor { } async fn platform_build_info(state: CachedState, msg: ExtnMessage) -> bool { + println!("*** _DEBUG: platform_build_info: entry"); let resp = state .get_thunder_client() .call(DeviceCallRequest { @@ -811,6 +812,7 @@ impl ThunderDeviceInfoRequestProcessor { params: None, }) .await; + println!("*** _DEBUG: platform_build_info: resp= {:?}", resp); if let Ok(tsv) = serde_json::from_value::(resp.message) { let release_regex = Regex::new(r"([^_]*)_(.*)_(VBN|PROD[^_]*)_(.*)").unwrap(); let non_release_regex = @@ -942,6 +944,7 @@ impl ExtnRequestProcessor for ThunderDeviceInfoRequestProcessor { msg: ExtnMessage, extracted_message: Self::VALUE, ) -> bool { + println!("*** _DEBUG: process_request: {:?}", extracted_message); match extracted_message { DeviceInfoRequest::Model => Self::model(state.clone(), msg).await, DeviceInfoRequest::Audio => Self::audio(state.clone(), msg).await, @@ -985,8 +988,6 @@ fn round_to_nearest_quarter_hour(offset_seconds: i64) -> i64 { #[cfg(test)] pub mod tests { - use std::sync::Arc; - use ripple_sdk::{ api::device::{ device_info_request::{DeviceInfoRequest, PlatformBuildInfo}, @@ -1002,28 +1003,115 @@ pub mod tests { utils::channel_utils::oneshot_send_and_log, }; use serde::{Deserialize, Serialize}; + use std::sync::Arc; + use tokio::sync::oneshot; + use crate::client::thunder_async_client::ThunderAsyncResponse; use crate::{ client::{ - device_operator::DeviceResponseMessage, thunder_client::ThunderCallMessage, + device_operator::{DeviceCallRequest, DeviceResponseMessage}, + //thunder_client::ThunderCallMessage, thunder_plugin::ThunderPlugin, }, processors::thunder_device_info::ThunderDeviceInfoRequestProcessor, tests::mock_thunder_controller::{CustomHandler, MockThunderController, ThunderHandlerFn}, }; - + use ripple_sdk::api::gateway::rpc_gateway_api::JsonRpcApiResponse; + + // + // macro_rules! run_platform_info_test { + // ($build_name:expr) => { + // test_platform_build_info_with_build_name($build_name, Arc::new(|_msg: DeviceCallRequest, device_response_message_tx: Sender| { + + // oneshot_send_and_log( + // device_response_message_tx, + // DeviceResponseMessage::call(json!({"success" : true, "stbVersion": $build_name, "receiverVersion": $build_name, "stbTimestamp": "".to_owned() })), + // "", + // ); + // })).await; + // }; + // } + // macro_rules! run_platform_info_test { + // ($build_name:expr) => { + // test_platform_build_info_with_build_name($build_name, Arc::new(|_msg: DeviceCallRequest, device_response_message_tx: oneshot::Sender| { + + // println!("*** _DEBUG: run_platform_info_test: Custom handler: build_name={}", $build_name); + // oneshot_send_and_log( + // device_response_message_tx, + // DeviceResponseMessage::call(json!({"success" : true, "stbVersion": $build_name, "receiverVersion": $build_name, "stbTimestamp": "".to_owned() })), + // "", + // ); + // })).await; + // }; + // } + + // macro_rules! run_platform_info_test { + // ($build_name:expr) => { + // test_platform_build_info_with_build_name($build_name, Arc::new(|_msg: DeviceCallRequest, async_resp_message_tx: oneshot::Sender| { + + // println!("*** _DEBUG: run_platform_info_test: Custom handler: build_name={}", $build_name); + // let thunderasyncresp = ThunderAsyncResponse { + // id: None, + // result: Ok(JsonRpcApiResponse { + // jsonrpc: "2.0".to_owned(), + // id: None, + // result: Some(json!({"success" : true, "stbVersion": $build_name, "receiverVersion": $build_name, "stbTimestamp": "".to_owned() })), + // error: None, + // method: None, + // params: None + // }), + // }; + + // oneshot_send_and_log( + // async_resp_message_tx, + // thunderasyncresp, + // "", + // ); + // })).await; + // }; + // } + + #[macro_export] macro_rules! run_platform_info_test { ($build_name:expr) => { - test_platform_build_info_with_build_name($build_name, Arc::new(|msg: ThunderCallMessage| { - oneshot_send_and_log( - msg.callback, - DeviceResponseMessage::call(json!({"success" : true, "stbVersion": $build_name, "receiverVersion": $build_name, "stbTimestamp": "".to_owned() })), - "", - ); - })).await; + test_platform_build_info_with_build_name( + $build_name, + Arc::new( + move | _msg: DeviceCallRequest, async_resp_message_tx: oneshot::Sender | { + println!( + "*** _DEBUG: run_platform_info_test: Custom handler: build_name={}", + $build_name + ); + let thunderasyncresp = ThunderAsyncResponse { + id: None, + result: Ok(JsonRpcApiResponse { + jsonrpc: "2.0".to_owned(), + id: None, + result: Some(json!({ + "success": true, + "stbVersion": $build_name, + "receiverVersion": $build_name, + "stbTimestamp": "".to_owned() + })), + error: None, + method: None, + params: None, + }), + }; + + oneshot_send_and_log( + async_resp_message_tx, + thunderasyncresp, + "", + ); + } + ) + ).await; }; } + // + #[derive(Debug, Serialize, Deserialize)] struct BuildInfoTest { build_name: String, @@ -1055,16 +1143,64 @@ pub mod tests { run_platform_info_test!("SCXI11BEI_someVBNbuild"); } + // + // async fn test_platform_build_info_with_build_name( + // _build_name: &'static str, + // handler: Arc, + // ) { + // let mut ch = CustomHandler::default(); + // ch.custom_request_handler.insert( + // ThunderPlugin::System.unversioned_method("getSystemVersions"), + // handler, + // ); + + // let state = MockThunderController::state_with_mock(Some(ch)); + + // let msg = MockExtnClient::req( + // RippleContract::DeviceInfo, + // ExtnRequest::Device(DeviceRequest::DeviceInfo( + // DeviceInfoRequest::PlatformBuildInfo, + // )), + // ); + + // ThunderDeviceInfoRequestProcessor::process_request( + // state, + // msg, + // DeviceInfoRequest::PlatformBuildInfo, + // ) + // .await; + + // println!("*** _DEBUG: test_platform_build_info_with_build_name: Mark 1"); + + // // let msg: ExtnMessage = r.recv().await.unwrap().try_into().unwrap(); + // // let resp_opt = msg.payload.extract::(); + // // if let Some(DeviceResponse::PlatformBuildInfo(info)) = resp_opt { + // // let exp = tests.iter().find(|x| x.build_name == build_name).unwrap(); + // // assert_eq!(info, exp.info); + // // } else { + // // panic!("Did not get the expected PlatformBuildInfo from extension call"); + // // } + // } async fn test_platform_build_info_with_build_name( _build_name: &'static str, handler: Arc, ) { + println!( + "*** _DEBUG: test_platform_build_info_with_build_name: {}", + _build_name + ); + let mut ch = CustomHandler::default(); ch.custom_request_handler.insert( ThunderPlugin::System.unversioned_method("getSystemVersions"), handler, ); - let state = MockThunderController::state_with_mock(Some(ch)); + + // let (state, mut device_response_message_rx) = + // MockThunderController::state_with_mock(Some(ch)); + + let state = MockThunderController::state_with_mock(Some(ch)).await; + let msg = MockExtnClient::req( RippleContract::DeviceInfo, ExtnRequest::Device(DeviceRequest::DeviceInfo( @@ -1072,12 +1208,28 @@ pub mod tests { )), ); + println!( + "*** _DEBUG: test_platform_build_info_with_build_name: Calling ThunderDeviceInfoRequestProcessor::process_request" + ); + + // tokio::spawn(async move { + // while let Some(r) = device_response_message_rx.recv().await { + // println!( + // "*** _DEBUG: test_platform_build_info_with_build_name: Received DeviceResponseMessage: {:?}", + // r + // ); + // } + // }); + ThunderDeviceInfoRequestProcessor::process_request( state, msg, DeviceInfoRequest::PlatformBuildInfo, ) .await; + + println!("*** _DEBUG: test_platform_build_info_with_build_name: Mark 1"); + // let msg: ExtnMessage = r.recv().await.unwrap().try_into().unwrap(); // let resp_opt = msg.payload.extract::(); // if let Some(DeviceResponse::PlatformBuildInfo(info)) = resp_opt { @@ -1087,6 +1239,7 @@ pub mod tests { // panic!("Did not get the expected PlatformBuildInfo from extension call"); // } } + // macro_rules! check_offset { ($mock_response:expr, $timezone:expr, $expected_offset:expr, $expected_rounded_offset:expr) => {{ diff --git a/device/thunder_ripple_sdk/src/tests/mock_thunder_controller.rs b/device/thunder_ripple_sdk/src/tests/mock_thunder_controller.rs index d2cd63578..936d01034 100644 --- a/device/thunder_ripple_sdk/src/tests/mock_thunder_controller.rs +++ b/device/thunder_ripple_sdk/src/tests/mock_thunder_controller.rs @@ -1,33 +1,45 @@ use std::{collections::HashMap, future::Future, pin::Pin, str::FromStr, sync::Arc}; use ripple_sdk::{ - extn::mock_extension_client::MockExtnClient, + extn::{client::extn_client::ExtnClient, mock_extension_client::MockExtnClient}, serde_json, tokio::{ self, - sync::mpsc::{self, Sender}, + sync::{ + mpsc::{self, Sender}, + oneshot, + }, }, - utils::channel_utils::{mpsc_send_and_log, oneshot_send_and_log}, - uuid::Uuid, + utils::channel_utils::mpsc_send_and_log, + Mockable, }; use serde_json::Value; use crate::{ client::{ - device_operator::DeviceResponseMessage, + device_operator::{ + DeviceCallRequest, DeviceChannelRequest, DeviceResponseMessage, + DeviceUnsubscribeRequest, + }, jsonrpc_method_locator::JsonRpcMethodLocator, plugin_manager::{PluginState, PluginStateChangeEvent, PluginStatus}, - thunder_client::{ - ThunderCallMessage, ThunderClient, ThunderMessage, ThunderSubscribeMessage, - ThunderUnsubscribeMessage, - }, + thunder_async_client::ThunderAsyncResponse, + thunder_client::ThunderClient, thunder_plugin::ThunderPlugin, }, processors::thunder_device_info::CachedState, thunder_state::ThunderState, }; +use ripple_sdk::api::gateway::rpc_gateway_api::JsonRpcApiResponse; +// +//pub type ThunderHandlerFn = dyn Fn(DeviceCallRequest) + Send + Sync; +// pub type ThunderHandlerFn = +// dyn Fn(DeviceCallRequest, oneshot::Sender) + Send + Sync; + +pub type ThunderHandlerFn = + dyn Fn(DeviceCallRequest, oneshot::Sender) + Send + Sync; -pub type ThunderHandlerFn = dyn Fn(ThunderCallMessage) + Send + Sync; +// pub type ThunderSubscriberFn = dyn Fn( Sender, ) -> Pin> + Send>> @@ -72,10 +84,17 @@ pub struct MockThunderController { custom_handlers: CustomHandler, } -const EMPTY_RESPONSE: DeviceResponseMessage = DeviceResponseMessage { - message: Value::Null, - sub_id: None, -}; +// const EMPTY_RESPONSE: DeviceResponseMessage = DeviceResponseMessage { +// message: Value::Null, +// sub_id: None, +// }; + +fn empty_response() -> ThunderAsyncResponse { + ThunderAsyncResponse { + id: None, + result: Ok(JsonRpcApiResponse::default()), + } +} impl MockThunderController { pub async fn activate(&mut self, callsign: String) { @@ -104,28 +123,106 @@ impl MockThunderController { self.state_subscribers.push(callback); } - pub async fn handle_thunder_call(&mut self, msg: ThunderCallMessage) { + // + // pub async fn handle_thunder_call(&mut self, msg: DeviceCallRequest,) { + // let locator = JsonRpcMethodLocator::from_str(&msg.method).unwrap(); + // let module = locator.module.unwrap(); + + // let (tx, _rx) = oneshot::channel::(); + + // if module == ThunderPlugin::Controller.callsign() { + // if locator.method_name == "activate" { + // let ps = msg.params.unwrap().as_params(); + // let psv: Value = serde_json::from_str(ps.as_str()).expect("Message should be JSON"); + // let cs = psv.get("callsign").unwrap(); + // self.activate(String::from(cs.as_str().unwrap())).await; + // oneshot_send_and_log(tx, EMPTY_RESPONSE, "ActivateAck"); + // } else if msg.method.starts_with("status") { + // let status = self.status(locator.qualifier.unwrap()).await; + // let val = serde_json::to_value(status).unwrap_or_default(); + // let m = DeviceResponseMessage::call(val); + // oneshot_send_and_log(tx, m, "StatusReturn"); + // } + // } else if let Some(handler) = self + // .custom_handlers + // .custom_request_handler + // .get(&format!("{}.{}", module, locator.method_name)) + // { + // (handler)(msg.clone()); + // } else { + // println!( + // "No mock thunder response found for {}.{}", + // module, locator.method_name + // ); + // return; + // } + // } + pub async fn handle_thunder_call( + &mut self, + msg: DeviceCallRequest, + //device_response_message_tx: mpsc::Sender, + device_response_message_tx: mpsc::Sender, + ) { + println!( + "*** _DEBUG: handle_thunder_call: DeviceChannelRequest::Call req received : {:?}", + msg + ); let locator = JsonRpcMethodLocator::from_str(&msg.method).unwrap(); let module = locator.module.unwrap(); + if module == ThunderPlugin::Controller.callsign() { if locator.method_name == "activate" { + println!("*** _DEBUG: in activate block"); let ps = msg.params.unwrap().as_params(); let psv: Value = serde_json::from_str(ps.as_str()).expect("Message should be JSON"); let cs = psv.get("callsign").unwrap(); + self.activate(String::from(cs.as_str().unwrap())).await; - oneshot_send_and_log(msg.callback, EMPTY_RESPONSE, "ActivateAck"); - } else if msg.method_name().starts_with("status") { + mpsc_send_and_log(&device_response_message_tx, empty_response(), "ActivateAck") + .await; + } else if msg.method.starts_with("status") { + println!("*** _DEBUG: in status block"); let status = self.status(locator.qualifier.unwrap()).await; let val = serde_json::to_value(status).unwrap_or_default(); - let m = DeviceResponseMessage::call(val); - oneshot_send_and_log(msg.callback, m, "StatusReturn"); + //let m = DeviceResponseMessage::call(val); + let thunderasyncresp = ThunderAsyncResponse { + id: None, + result: Ok(JsonRpcApiResponse { + jsonrpc: "2.0".to_owned(), + id: None, + result: Some(val), + error: None, + method: None, + params: None, + }), + }; + mpsc_send_and_log( + &device_response_message_tx, + thunderasyncresp, + "StatusReturn", + ) + .await; } } else if let Some(handler) = self .custom_handlers .custom_request_handler .get(&format!("{}.{}", module, locator.method_name)) { - (handler)(msg); + println!("*** _DEBUG: MockThunderController: handle_thunder_call: calling custom handler for {}.{}", module, locator.method_name); + // let (handler_response_tx, handler_response_rx) = + // oneshot::channel::(); + // (handler)(msg.clone(), handler_response_tx); + + let (handler_response_tx, handler_response_rx) = + oneshot::channel::(); + (handler)(msg.clone(), handler_response_tx); + + if let Ok(response) = handler_response_rx.await { + println!("*** _DEBUG: MockThunderController: handle_thunder_call: received response from custom handler for {}.{}", module, locator.method_name); + mpsc_send_and_log(&device_response_message_tx, response, "CustomResponse").await; + } else { + println!("*** _DEBUG: MockThunderController: handle_thunder_call: no response received from custom handler for {}.{}", module, locator.method_name); + } } else { println!( "No mock thunder response found for {}.{}", @@ -134,61 +231,199 @@ impl MockThunderController { return; } } + // - pub async fn handle_thunder_unsub(&mut self, _msg: ThunderUnsubscribeMessage) {} - pub async fn handle_thunder_sub(&mut self, msg: ThunderSubscribeMessage) { - if msg.module == "Controller.1" && msg.event_name == "statechange" { - self.on_state_change(msg.handler).await; - } else if let Some(handler) = self - .custom_handlers - .custom_subscription_handler - .get(&format!("{}.{}", msg.module, msg.event_name)) - { - let response = handler.call(msg.handler.clone()).await; - if response.is_some() { - mpsc_send_and_log(&msg.handler, response.unwrap(), "OnStatusChange").await; - } - } else { - println!( - "No mock subscription found for {}.{}", - msg.module, &msg.event_name - ); - } - if let Some(cb) = msg.callback { - oneshot_send_and_log(cb, EMPTY_RESPONSE, "SubscribeAck"); - } - } + pub async fn handle_thunder_unsub(&mut self, _msg: DeviceUnsubscribeRequest) {} - pub fn start() -> mpsc::Sender { - MockThunderController::start_with_custom_handlers(None) - } - pub fn start_with_custom_handlers( - custom_handlers: Option, - ) -> mpsc::Sender { - let (client_tx, mut client_rx) = mpsc::channel(32); - tokio::spawn(async move { - let mut mock_controller = MockThunderController::default(); - if let Some(ch) = custom_handlers { - mock_controller.custom_handlers = ch; - } - while let Some(tm) = client_rx.recv().await { - match tm { - ThunderMessage::ThunderCallMessage(msg) => { - mock_controller.handle_thunder_call(msg).await; - } - ThunderMessage::ThunderSubscribeMessage(msg) => { - mock_controller.handle_thunder_sub(msg).await; - } - crate::client::thunder_client::ThunderMessage::ThunderUnsubscribeMessage( - msg, - ) => { - mock_controller.handle_thunder_unsub(msg).await; - } - } - } - }); - client_tx - } + // + // pub async fn handle_thunder_sub( + // &mut self, + // msg: DeviceSubscribeRequest, + // handler: mpsc::Sender, + // ) { + // let (tx, _rx) = oneshot::channel::(); + // if msg.module == "Controller.1" && msg.event_name == "statechange" { + // self.on_state_change(handler).await; + // } else if let Some(handler_fn) = self + // .custom_handlers + // .custom_subscription_handler + // .get(&format!("{}.{}", msg.module, msg.event_name)) + // { + // let response = handler_fn.call(handler.clone()).await; + // if let Some(resp) = response { + // mpsc_send_and_log(&handler, resp, "OnStatusChange").await; + // } + // } else { + // println!( + // "No mock subscription found for {}.{}", + // msg.module, &msg.event_name + // ); + // } + // oneshot_send_and_log(tx, EMPTY_RESPONSE, "SubscribeAck"); + // } + //-------------------------------------------------------------------------------------------------- + // pub async fn handle_thunder_sub( + // &mut self, + // msg: DeviceSubscribeRequest, + // handler: mpsc::Sender, + // ) { + // if msg.module == "Controller.1" && msg.event_name == "statechange" { + // self.on_state_change(handler.clone()).await; + // } else if let Some(handler_fn) = self + // .custom_handlers + // .custom_subscription_handler + // .get(&format!("{}.{}", msg.module, msg.event_name)) + // { + // let response = handler_fn.call(handler.clone()).await; + // if let Some(resp) = response { + // mpsc_send_and_log(&handler, resp, "OnStatusChange").await; + // } + // } else { + // println!( + // "No mock subscription found for {}.{}", + // msg.module, &msg.event_name + // ); + // mpsc_send_and_log(&handler, empty_response(), "SubscribeAck").await; + // } + // mpsc_send_and_log(&handler, EMPTY_RESPONSE, "SubscribeAck").await; + // } + //-------------------------------------------------------------------------------------------------- + // + + // + //pub fn start() -> mpsc::Sender { + // pub fn start() -> ( + // mpsc::Sender, + // mpsc::Receiver, + // ) { + // // + // //MockThunderController::start_with_custom_handlers(None) + // } + + // + // pub fn start_with_custom_handlers( + // custom_handlers: Option, + // ) -> mpsc::Sender { + // let (client_tx, mut client_rx) = mpsc::channel(32); + // let (resp_tx, _resp_rx): ( + // mpsc::Sender, + // mpsc::Receiver, + // ) = mpsc::channel(32); + // tokio::spawn(async move { + // let mut mock_controller = MockThunderController::default(); + // if let Some(ch) = custom_handlers { + // mock_controller.custom_handlers = ch; + // } + // while let Some(tm) = client_rx.recv().await { + // match tm { + // DeviceChannelRequest::Call(msg) => { + // mock_controller.handle_thunder_call(msg).await; + // } + // DeviceChannelRequest::Subscribe(msg) => { + // mock_controller + // .handle_thunder_sub(msg, resp_tx.clone()) + // .await; + // } + // DeviceChannelRequest::Unsubscribe(msg) => { + // mock_controller.handle_thunder_unsub(msg).await; + // } + // } + // } + // }); + // client_tx + // } + + // pub fn start_with_custom_handlers( + // custom_handlers: Option, + // ) -> ( + // mpsc::Sender, + // mpsc::Receiver, + // ) { + // println!("*** _DEBUG: start_with_custom_handlers: invoked"); + // let (device_channel_request_tx, mut device_channel_request_rx) = mpsc::channel(32); + // //let (device_response_message_tx, mut device_response_message_rx) = mpsc::channel(32); + + // let (device_response_message_tx, mut device_response_message_rx) = + // mpsc::channel::(32); + + // tokio::spawn(async move { + // let mut mock_controller = MockThunderController::default(); + // if let Some(ch) = custom_handlers { + // mock_controller.custom_handlers = ch; + // } + // while let Some(device_channel_request) = device_channel_request_rx.recv().await { + // println!("*** _DEBUG: MockThunderController: start_with_custom_handlers: received request: {:?}", device_channel_request); + + // match device_channel_request { + // DeviceChannelRequest::Call(msg) => { + // println!("*** _DEBUG: start_with_custom_handlers: DeviceChannelRequest::Call req received : {:?}", msg); + // mock_controller + // .handle_thunder_call(msg, device_response_message_tx.clone()) + // .await; + // } + // DeviceChannelRequest::Subscribe(msg) => { + // println!("*** _DEBUG: start_with_custom_handlers: DeviceChannelRequest::Subscribe req received : {:?}", msg); + // // mock_controller + // // .handle_thunder_sub(msg, device_response_message_tx.clone()) + // // .await; + // } + // DeviceChannelRequest::Unsubscribe(msg) => { + // //dmock_controller.handle_thunder_unsub(msg).await; + // } + // } + // } + // }); + + // (device_channel_request_tx, device_response_message_rx) + // } + + // + // pub fn start_with_custom_handlers( + // custom_handlers: Option, + // ) -> ( + // mpsc::Sender, + // mpsc::Receiver, + // ) { + // println!("*** _DEBUG: start_with_custom_handlers: invoked"); + // let (device_channel_request_tx, mut device_channel_request_rx) = mpsc::channel(32); + // //let (device_response_message_tx, mut device_response_message_rx) = mpsc::channel(32); + + // let (device_response_message_tx, mut device_response_message_rx) = + // mpsc::channel::(32); + + // tokio::spawn(async move { + // let mut mock_controller = MockThunderController::default(); + // if let Some(ch) = custom_handlers { + // mock_controller.custom_handlers = ch; + // } + // while let Some(device_channel_request) = device_channel_request_rx.recv().await { + // println!("*** _DEBUG: MockThunderController: start_with_custom_handlers: received request: {:?}", device_channel_request); + + // match device_channel_request { + // DeviceChannelRequest::Call(msg) => { + // println!("*** _DEBUG: start_with_custom_handlers: DeviceChannelRequest::Call req received : {:?}", msg); + // mock_controller + // .handle_thunder_call(msg, device_response_message_tx.clone()) + // .await; + // } + // DeviceChannelRequest::Subscribe(msg) => { + // println!("*** _DEBUG: start_with_custom_handlers: DeviceChannelRequest::Subscribe req received : {:?}", msg); + // // mock_controller + // // .handle_thunder_sub(msg, device_response_message_tx.clone()) + // // .await; + // } + // DeviceChannelRequest::Unsubscribe(msg) => { + // //dmock_controller.handle_thunder_unsub(msg).await; + // } + // } + // } + // }); + + // (device_channel_request_tx, device_response_message_rx) + // } + // + + // /** * Creates state object that points to a mock thunder controller. @@ -196,38 +431,34 @@ impl MockThunderController { * Returns the state and a receiver which can be used to listen to responses that * come back from the extension */ - pub fn state_with_mock(custom_thunder: Option) -> CachedState { - let s_thunder = MockThunderController::start_with_custom_handlers(custom_thunder); - let thunder_client = ThunderClient { - sender: Some(s_thunder), - pooled_sender: None, - id: Uuid::new_v4(), - plugin_manager_tx: None, - subscriptions: None, - thunder_async_client: None, - thunder_async_subscriptions: None, - thunder_async_callbacks: None, - use_thunder_async: false, - }; + // + // pub fn state_with_mock(custom_thunder: Option) -> CachedState { + // let _s_thunder = MockThunderController::start_with_custom_handlers(custom_thunder); + // let thunder_client = ThunderClient::mock(); + // let extn_client = MockExtnClient::client(); + // let thunder_state = ThunderState::new(extn_client, thunder_client); + // CachedState::new(thunder_state) + // } - let extn_client = MockExtnClient::client(); - let thunder_state = ThunderState::new(extn_client, thunder_client); - CachedState::new(thunder_state) - } + // + // pub fn state_with_mock(custom_thunder: Option) -> CachedState { + // println!("*** _DEBUG: state_with_mock: invoked"); + // let (device_channel_request_tx, device_response_message_rx) = + // MockThunderController::start_with_custom_handlers(custom_thunder); - pub fn get_thunder_state_mock_with_handler(handler: Option) -> ThunderState { - let s_thunder = MockThunderController::start_with_custom_handlers(handler); - let thunder_client = ThunderClient { - sender: Some(s_thunder), - pooled_sender: None, - id: Uuid::new_v4(), - plugin_manager_tx: None, - subscriptions: None, - thunder_async_client: None, - thunder_async_subscriptions: None, - thunder_async_callbacks: None, - use_thunder_async: false, - }; + // let thunder_client = + // ThunderClient::start_mock(device_channel_request_tx, device_response_message_rx); + // let extn_client = ExtnClient::new_main(); + // let thunder_state = ThunderState::new(extn_client, thunder_client); + // //(CachedState::new(thunder_state), device_response_message_rx) + // CachedState::new(thunder_state) + // } + // + + // + + pub fn get_thunder_state_mock_with_handler(_handler: Option) -> ThunderState { + let thunder_client = ThunderClient::mock(); let extn_client = MockExtnClient::client(); ThunderState::new(extn_client, thunder_client) @@ -242,4 +473,21 @@ impl MockThunderController { pub fn get_thunder_state_mock() -> ThunderState { Self::get_thunder_state_mock_with_handler(None) } + + //new changes: + pub async fn state_with_mock(_custom_thunder: Option) -> CachedState { + println!("*** _DEBUG: state_with_mock: invoked"); + // let (device_channel_request_tx, device_response_message_rx) = + // MockThunderController::start_with_custom_handlers(custom_thunder); + + // let thunder_client = + // ThunderClient::start_mock(device_channel_request_tx, device_response_message_rx); + + let thunder_client = ThunderClient::thunderclient_mock().await; + + let extn_client = ExtnClient::new_main(); + let thunder_state = ThunderState::new(extn_client, thunder_client); + //(CachedState::new(thunder_state), device_response_message_rx) + CachedState::new(thunder_state) + } } diff --git a/device/thunder_ripple_sdk/src/thunder_state.rs b/device/thunder_ripple_sdk/src/thunder_state.rs index 23bc57912..81775f731 100644 --- a/device/thunder_ripple_sdk/src/thunder_state.rs +++ b/device/thunder_ripple_sdk/src/thunder_state.rs @@ -32,7 +32,7 @@ use url::Url; use crate::{ client::{ - device_operator::{DeviceOperator, DeviceResponseMessage, DeviceUnsubscribeRequest}, + device_operator::{DeviceOperator, DeviceResponseMessage}, plugin_manager::ThunderPluginBootParam, thunder_client::ThunderClient, }, @@ -119,12 +119,9 @@ impl ThunderState { if self .event_processor .handle_listener(listen, app_id.clone(), handler.clone()) + && listen { - if listen { - self.subscribe(handler).await - } else { - self.unsubscribe(handler).await - } + self.subscribe(handler).await } } @@ -134,15 +131,6 @@ impl ThunderState { let _ = client.subscribe(handler.request, sender).await; } - async fn unsubscribe(&self, handler: ThunderEventHandler) { - let client = self.get_thunder_client(); - let request = DeviceUnsubscribeRequest { - module: handler.request.module, - event_name: handler.request.event_name, - }; - client.unsubscribe(request).await; - } - pub fn start_event_thread(&self) { let mut rx = self.receiver.write().unwrap(); let rx = rx.take();