diff --git a/src/cgw_connection_server.rs b/src/cgw_connection_server.rs index 47f3ada..371e803 100644 --- a/src/cgw_connection_server.rs +++ b/src/cgw_connection_server.rs @@ -1785,15 +1785,56 @@ impl CGWConnectionServer { } } } else { + // On first device connection, details won’t exist in the cache. + // Each shard’s local cache only contains its own devices details, + // so we fetch the device details from the DB to determine + // whether it belongs to another shard (foreign) or is unassigned. + if let Some(group_id) = self.cgw_remote_discovery.get_device_group_id(&device_mac).await { + debug!("Device {} belongs to group {}", device_mac, group_id); + if let Some(group_owner_id) = self.cgw_remote_discovery.get_infra_group_owner_id(group_id).await { + if group_owner_id != self.local_cgw_id { + if let Ok(resp) = cgw_construct_foreign_infra_connection_msg( + group_id, + device_mac, + self.local_cgw_id, + group_owner_id, + ) { + self.enqueue_mbox_message_from_cgw_to_nb_api(group_id, resp); + } else { + error!("Failed to construct foreign_infra_connection message!"); + } + + } + } + device_group_id=group_id; + } + let remains_in_db = device_group_id != 0; + if device_group_id == 0 { + if let Ok(resp) = cgw_construct_unassigned_infra_connection_msg( + device_mac, + self.local_cgw_id, + ) { + self.enqueue_mbox_message_from_cgw_to_nb_api(0, resp); + } else { + error!("Failed to construct unassigned_infra_connection message!"); + } + } let device: CGWDevice = CGWDevice::new( device_type, CGWDeviceState::CGWDeviceConnected, - 0, - false, + device_group_id, + remains_in_db, caps, ); devices_cache.add_device(&device_mac, &device); + if let Err(e) = self + .cgw_remote_discovery + .del_device_from_all_shards_redis_cache(&device_mac) + .await + { + error!("{e}"); + } match serde_json::to_string(&device) { Ok(device_json) => { if let Err(e) = self @@ -1809,19 +1850,6 @@ impl CGWConnectionServer { } } - if let Ok(resp) = cgw_construct_unassigned_infra_connection_msg( - device_mac, - self.local_cgw_id, - ) { - self.enqueue_mbox_message_from_cgw_to_nb_api(0, resp); - } else { - error!("Failed to construct unassigned_infra_connection message!"); - } - - debug!( - "Detected unassigned infra {} connection", - device_mac.to_hex_string() - ); } if self.feature_topomap_enabled { diff --git a/src/cgw_db_accessor.rs b/src/cgw_db_accessor.rs index 7762246..b4b6984 100644 --- a/src/cgw_db_accessor.rs +++ b/src/cgw_db_accessor.rs @@ -309,6 +309,34 @@ impl CGWDBAccessor { } } + pub async fn get_infra_group_id_by_mac(&self, mac: MacAddress) -> Option { + let query = match self + .cl + .prepare("SELECT infra_group_id FROM infras WHERE mac = $1") + .await + { + Ok(q) => q, + Err(e) => { + error!("Failed to prepare query for get_infra_group_id_by_mac! Error: {e}"); + return None; + } + }; + + let result = self.cl.query_opt(&query, &[&mac]).await; + + match result { + Ok(Some(row)) => Some(row.get::<_, i32>("infra_group_id")), + Ok(None) => { + debug!("No infra found in DB for mac: {}", mac.to_hex_string()); + None + } + Err(e) => { + error!("Query failed for get_infra_group_id_by_mac! Error: {e}"); + None + } + } + } + pub async fn get_all_infras(&self) -> Option> { let mut list: Vec = Vec::new(); diff --git a/src/cgw_remote_discovery.rs b/src/cgw_remote_discovery.rs index 000febf..cf811c7 100644 --- a/src/cgw_remote_discovery.rs +++ b/src/cgw_remote_discovery.rs @@ -1348,6 +1348,56 @@ impl CGWRemoteDiscovery { Ok(infras_assigned) } + pub async fn del_device_from_all_shards_redis_cache( + &self, + device_mac: &MacAddress, + ) -> Result<()> { + let mut con = self.redis_infra_cache_client.clone(); + + let pattern = format!("shard_id_*|{}", device_mac); + let keys: Vec = match redis::cmd("KEYS").arg(&pattern).query_async(&mut con).await + { + Ok(keys) => keys, + Err(e) => { + if e.is_io_error() { + Self::set_redis_health_state_not_ready(e.to_string()).await; + } + warn!( + "Failed to query Redis for device {} keys! Error: {e}", + device_mac.to_hex_string() + ); + return Err(Error::RemoteDiscovery( + "Failed to update Redis devices cache", + )); + } + }; + + if !keys.is_empty() { + let res: RedisResult<()> = redis::cmd("DEL").arg(&keys).query_async(&mut con).await; + + match res { + Ok(_) => debug!( + "Removed device {} from Redis cache (all shards)", + device_mac + ), + Err(e) => { + if e.is_io_error() { + Self::set_redis_health_state_not_ready(e.to_string()).await; + } + warn!( + "Failed to remove device {} from Redis cache! Error: {e}", + device_mac.to_hex_string() + ); + return Err(Error::RemoteDiscovery( + "Failed to update Redis devices cache", + )); + } + }; + } + + Ok(()) + } + pub async fn add_device_to_redis_cache( &self, device_mac: &MacAddress, @@ -1403,6 +1453,63 @@ impl CGWRemoteDiscovery { Ok(()) } + pub async fn get_device_group_id_from_redis(&self, device_mac: &MacAddress) -> Option { + let mut con = self.redis_infra_cache_client.clone(); + + let key = format!("shard_id_*|{}", device_mac); + let redis_keys: Vec = match redis::cmd("KEYS").arg(&key).query_async(&mut con).await + { + Err(e) => { + if e.is_io_error() { + Self::set_redis_health_state_not_ready(e.to_string()).await; + } + error!( + "Failed to get device {} from Redis cache! Error: {}", + device_mac.to_hex_string(), + e + ); + return None; + } + Ok(keys) => keys, + }; + + if let Some(first_key) = redis_keys.first() { + let device_str: String = + match redis::cmd("GET").arg(first_key).query_async(&mut con).await { + Ok(dev) => dev, + Err(e) => { + if e.is_io_error() { + Self::set_redis_health_state_not_ready(e.to_string()).await; + } + error!( + "Failed to get device cache entry {}! Error: {}", + first_key, e + ); + return None; + } + }; + + match serde_json::from_str::(&device_str) { + Ok(dev) => Some(dev.get_device_group_id()), + Err(e) => { + error!("Failed to deserialize device from Redis cache! Error: {e}"); + None + } + } + } else { + None + } + } + + + pub async fn get_device_group_id(&self, mac: &MacAddress) -> Option { + if let Some(gid) = self.get_device_group_id_from_redis(mac).await { + info!("Found group_id {} from Redis for device {}", gid, mac.to_hex_string()); + return Some(gid); + } + + self.db_accessor.get_infra_group_id_by_mac(*mac).await + } pub async fn sync_devices_cache_with_redis( &self, cache: Arc>,