Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 43 additions & 15 deletions src/cgw_connection_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1785,15 +1785,56 @@ impl CGWConnectionServer {
}
}
} else {
// On first device connection, details won’t exist in the cache.
// Each shard’s local cache only contains its own devices details,
// so we fetch the device details from the DB to determine
// whether it belongs to another shard (foreign) or is unassigned.
if let Some(group_id) = self.cgw_remote_discovery.get_device_group_id(&device_mac).await {
debug!("Device {} belongs to group {}", device_mac, group_id);
if let Some(group_owner_id) = self.cgw_remote_discovery.get_infra_group_owner_id(group_id).await {
if group_owner_id != self.local_cgw_id {
if let Ok(resp) = cgw_construct_foreign_infra_connection_msg(
group_id,
device_mac,
self.local_cgw_id,
group_owner_id,
) {
self.enqueue_mbox_message_from_cgw_to_nb_api(group_id, resp);
} else {
error!("Failed to construct foreign_infra_connection message!");
}

}
}
device_group_id=group_id;
}
let remains_in_db = device_group_id != 0;
if device_group_id == 0 {
if let Ok(resp) = cgw_construct_unassigned_infra_connection_msg(
device_mac,
self.local_cgw_id,
) {
self.enqueue_mbox_message_from_cgw_to_nb_api(0, resp);
} else {
error!("Failed to construct unassigned_infra_connection message!");
}
}
let device: CGWDevice = CGWDevice::new(
device_type,
CGWDeviceState::CGWDeviceConnected,
0,
false,
device_group_id,
remains_in_db,
caps,
);
devices_cache.add_device(&device_mac, &device);

if let Err(e) = self
.cgw_remote_discovery
.del_device_from_all_shards_redis_cache(&device_mac)
.await
{
error!("{e}");
}
match serde_json::to_string(&device) {
Ok(device_json) => {
if let Err(e) = self
Expand All @@ -1809,19 +1850,6 @@ impl CGWConnectionServer {
}
}

if let Ok(resp) = cgw_construct_unassigned_infra_connection_msg(
device_mac,
self.local_cgw_id,
) {
self.enqueue_mbox_message_from_cgw_to_nb_api(0, resp);
} else {
error!("Failed to construct unassigned_infra_connection message!");
}

debug!(
"Detected unassigned infra {} connection",
device_mac.to_hex_string()
);
}

if self.feature_topomap_enabled {
Expand Down
28 changes: 28 additions & 0 deletions src/cgw_db_accessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,34 @@ impl CGWDBAccessor {
}
}

pub async fn get_infra_group_id_by_mac(&self, mac: MacAddress) -> Option<i32> {
let query = match self
.cl
.prepare("SELECT infra_group_id FROM infras WHERE mac = $1")
.await
{
Ok(q) => q,
Err(e) => {
error!("Failed to prepare query for get_infra_group_id_by_mac! Error: {e}");
return None;
}
};

let result = self.cl.query_opt(&query, &[&mac]).await;

match result {
Ok(Some(row)) => Some(row.get::<_, i32>("infra_group_id")),
Ok(None) => {
debug!("No infra found in DB for mac: {}", mac.to_hex_string());
None
}
Err(e) => {
error!("Query failed for get_infra_group_id_by_mac! Error: {e}");
None
}
}
}

pub async fn get_all_infras(&self) -> Option<Vec<CGWDBInfra>> {
let mut list: Vec<CGWDBInfra> = Vec::new();

Expand Down
107 changes: 107 additions & 0 deletions src/cgw_remote_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1348,6 +1348,56 @@ impl CGWRemoteDiscovery {
Ok(infras_assigned)
}

pub async fn del_device_from_all_shards_redis_cache(
&self,
device_mac: &MacAddress,
) -> Result<()> {
let mut con = self.redis_infra_cache_client.clone();

let pattern = format!("shard_id_*|{}", device_mac);
let keys: Vec<String> = match redis::cmd("KEYS").arg(&pattern).query_async(&mut con).await
{
Ok(keys) => keys,
Err(e) => {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
warn!(
"Failed to query Redis for device {} keys! Error: {e}",
device_mac.to_hex_string()
);
return Err(Error::RemoteDiscovery(
"Failed to update Redis devices cache",
));
}
};

if !keys.is_empty() {
let res: RedisResult<()> = redis::cmd("DEL").arg(&keys).query_async(&mut con).await;

match res {
Ok(_) => debug!(
"Removed device {} from Redis cache (all shards)",
device_mac
),
Err(e) => {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
warn!(
"Failed to remove device {} from Redis cache! Error: {e}",
device_mac.to_hex_string()
);
return Err(Error::RemoteDiscovery(
"Failed to update Redis devices cache",
));
}
};
}

Ok(())
}

pub async fn add_device_to_redis_cache(
&self,
device_mac: &MacAddress,
Expand Down Expand Up @@ -1403,6 +1453,63 @@ impl CGWRemoteDiscovery {
Ok(())
}

pub async fn get_device_group_id_from_redis(&self, device_mac: &MacAddress) -> Option<i32> {
let mut con = self.redis_infra_cache_client.clone();

let key = format!("shard_id_*|{}", device_mac);
let redis_keys: Vec<String> = match redis::cmd("KEYS").arg(&key).query_async(&mut con).await
{
Err(e) => {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
error!(
"Failed to get device {} from Redis cache! Error: {}",
device_mac.to_hex_string(),
e
);
return None;
}
Ok(keys) => keys,
};

if let Some(first_key) = redis_keys.first() {
let device_str: String =
match redis::cmd("GET").arg(first_key).query_async(&mut con).await {
Ok(dev) => dev,
Err(e) => {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
error!(
"Failed to get device cache entry {}! Error: {}",
first_key, e
);
return None;
}
};

match serde_json::from_str::<CGWDevice>(&device_str) {
Ok(dev) => Some(dev.get_device_group_id()),
Err(e) => {
error!("Failed to deserialize device from Redis cache! Error: {e}");
None
}
}
} else {
None
}
}


pub async fn get_device_group_id(&self, mac: &MacAddress) -> Option<i32> {
if let Some(gid) = self.get_device_group_id_from_redis(mac).await {
info!("Found group_id {} from Redis for device {}", gid, mac.to_hex_string());
return Some(gid);
}

self.db_accessor.get_infra_group_id_by_mac(*mac).await
}
pub async fn sync_devices_cache_with_redis(
&self,
cache: Arc<RwLock<CGWDevicesCache>>,
Expand Down