@@ -90,21 +90,42 @@ class PiggybackMessage:
9090def watch_new_messages (omd_root : Path ) -> Iterator [PiggybackMessage ]:
9191 """Yields piggyback messages as they come in."""
9292
93+ host_folder_mask = Masks .MOVED_TO | Masks .DELETE_SELF
94+
9395 with INotify () as inotify :
9496 watch_for_new_piggybacked_hosts = inotify .add_watch (payload_dir (omd_root ), Masks .CREATE )
9597 watch_for_deleted_status_files = inotify .add_watch (
9698 source_status_dir (omd_root ), Masks .DELETE
9799 )
98100 for folder in _get_piggybacked_host_folders (omd_root ):
99- inotify .add_watch (folder , Masks .MOVED_TO )
101+ inotify .add_watch (folder , host_folder_mask )
102+
103+ _last_processed_time : int = int (time .time ())
100104
101105 for event in inotify .read_forever ():
106+ if event .type & Masks .Q_OVERFLOW :
107+ logger .warning (
108+ "Too many messages for the piggyback-hub to progress at once, rescanning data. "
109+ "Consider raising /proc/sys/fs/inotify/max_queued_events."
110+ )
111+ # check if any data was missed when the event queue overflowed
112+ for source_file in _get_source_state_files (omd_root ):
113+ if (
114+ mtime := _get_mtime (source_file )
115+ ) is not None and mtime >= _last_processed_time :
116+ source = HostName (source_file .name )
117+ for piggybacked_host in _get_piggybacked_hosts_for_source (omd_root , source ):
118+ yield from get_messages_for (HostAddress (piggybacked_host ), omd_root )
119+ _last_processed_time = int (time .time ())
120+ continue
121+
102122 # check if a new piggybacked host folder was created
103123 if event .watchee == watch_for_new_piggybacked_hosts :
104124 if event .type & Masks .CREATE :
105- inotify .add_watch (event .watchee .path / event .name , Masks . MOVED_TO )
125+ inotify .add_watch (event .watchee .path / event .name , host_folder_mask )
106126 # Handle all files already in the folder (we rather have duplicates than missing files)
107127 yield from get_messages_for (HostAddress (event .name ), omd_root )
128+ _last_processed_time = int (time .time ())
108129 continue
109130 if event .watchee == watch_for_deleted_status_files :
110131 if event .type & Masks .DELETE :
@@ -119,9 +140,15 @@ def watch_new_messages(omd_root: Path) -> Iterator[PiggybackMessage]:
119140 ),
120141 b"" ,
121142 )
143+ _last_processed_time = int (time .time ())
144+ continue
145+
146+ if event .type & Masks .DELETE_SELF :
147+ inotify .rm_watch (event .watchee )
122148 continue
123149
124150 if message := _make_message_from_event (event , omd_root ):
151+ _last_processed_time = int (time .time ())
125152 yield message
126153
127154
0 commit comments