@@ -43,13 +43,15 @@ def initialize
4343 @system_config = SystemConfig . new
4444
4545 @supervisor_mode = false
46+
47+ @root_agent_mutex = Mutex . new
4648 end
4749
4850 MAINLOOP_SLEEP_INTERVAL = 0.3
4951
5052 attr_reader :root_agent , :system_config , :supervisor_mode
5153
52- def init ( system_config , supervisor_mode : false )
54+ def init ( system_config , supervisor_mode : false , start_in_parallel : false )
5355 @system_config = system_config
5456 @supervisor_mode = supervisor_mode
5557
@@ -58,7 +60,7 @@ def init(system_config, supervisor_mode: false)
5860
5961 @log_event_verbose = system_config . log_event_verbose unless system_config . log_event_verbose . nil?
6062
61- @root_agent = RootAgent . new ( log : log , system_config : @system_config )
63+ @root_agent = RootAgent . new ( log : log , system_config : @system_config , start_in_parallel : start_in_parallel )
6264
6365 self
6466 end
@@ -133,7 +135,15 @@ def emit_stream(tag, es)
133135 end
134136
135137 def flush!
136- @root_agent . flush!
138+ @root_agent_mutex . synchronize do
139+ @root_agent . flush!
140+ end
141+ end
142+
143+ def cancel_source_only!
144+ @root_agent_mutex . synchronize do
145+ @root_agent . cancel_source_only!
146+ end
137147 end
138148
139149 def now
@@ -144,7 +154,9 @@ def now
144154 def run
145155 begin
146156 $log. info "starting fluentd worker" , pid : Process . pid , ppid : Process . ppid , worker : worker_id
147- start
157+ @root_agent_mutex . synchronize do
158+ start
159+ end
148160
149161 @fluent_log_event_router . start
150162
@@ -158,47 +170,51 @@ def run
158170 raise
159171 end
160172
161- stop_phase ( @root_agent )
173+ @root_agent_mutex . synchronize do
174+ stop_phase ( @root_agent )
175+ end
162176 end
163177
164178 # @param conf [Fluent::Config]
165179 # @param supervisor [Bool]
166180 # @reutrn nil
167181 def reload_config ( conf , supervisor : false )
168- # configure first to reduce down time while restarting
169- new_agent = RootAgent . new ( log : log , system_config : @system_config )
170- ret = Fluent ::StaticConfigAnalysis . call ( conf , workers : system_config . workers )
171-
172- ret . all_plugins . each do |plugin |
173- if plugin . respond_to? ( :reloadable_plugin? ) && !plugin . reloadable_plugin?
174- raise Fluent ::ConfigError , "Unreloadable plugin plugin: #{ Fluent ::Plugin . lookup_type_from_class ( plugin . class ) } , plugin_id: #{ plugin . plugin_id } , class_name: #{ plugin . class } )"
182+ @root_agent_mutex . synchronize do
183+ # configure first to reduce down time while restarting
184+ new_agent = RootAgent . new ( log : log , system_config : @system_config )
185+ ret = Fluent ::StaticConfigAnalysis . call ( conf , workers : system_config . workers )
186+
187+ ret . all_plugins . each do |plugin |
188+ if plugin . respond_to? ( :reloadable_plugin? ) && !plugin . reloadable_plugin?
189+ raise Fluent ::ConfigError , "Unreloadable plugin plugin: #{ Fluent ::Plugin . lookup_type_from_class ( plugin . class ) } , plugin_id: #{ plugin . plugin_id } , class_name: #{ plugin . class } )"
190+ end
175191 end
176- end
177192
178- # Assign @root_agent to new root_agent
179- # for https://github.com/fluent/fluentd/blob/fcef949ce40472547fde295ddd2cfe297e1eddd6/lib/fluent/plugin_helper/event_emitter.rb#L50
180- old_agent , @root_agent = @root_agent , new_agent
181- begin
182- @root_agent . configure ( conf )
183- rescue
184- @root_agent = old_agent
185- raise
186- end
193+ # Assign @root_agent to new root_agent
194+ # for https://github.com/fluent/fluentd/blob/fcef949ce40472547fde295ddd2cfe297e1eddd6/lib/fluent/plugin_helper/event_emitter.rb#L50
195+ old_agent , @root_agent = @root_agent , new_agent
196+ begin
197+ @root_agent . configure ( conf )
198+ rescue
199+ @root_agent = old_agent
200+ raise
201+ end
187202
188- unless @suppress_config_dump
189- $log. info :supervisor , "using configuration file: #{ conf . to_s . rstrip } "
190- end
203+ unless @suppress_config_dump
204+ $log. info :supervisor , "using configuration file: #{ conf . to_s . rstrip } "
205+ end
191206
192- # supervisor doesn't handle actual data. so the following code is unnecessary.
193- if supervisor
194- old_agent . shutdown # to close thread created in #configure
195- return
196- end
207+ # supervisor doesn't handle actual data. so the following code is unnecessary.
208+ if supervisor
209+ old_agent . shutdown # to close thread created in #configure
210+ return
211+ end
197212
198- stop_phase ( old_agent )
213+ stop_phase ( old_agent )
199214
200- $log. info 'restart fluentd worker' , worker : worker_id
201- start_phase ( new_agent )
215+ $log. info 'restart fluentd worker' , worker : worker_id
216+ start_phase ( new_agent )
217+ end
202218 end
203219
204220 def stop
0 commit comments