@@ -22,6 +22,27 @@ local function get_fiber_id(fiber)
2222 return fid
2323end
2424
25+ -- This method adds an issue to the cluster instance that is displayed in the admin panel
26+ -- and metrics. The method calls a global function that must be declared in the role module.
27+ -- If the function is not declared, then instead of an issue, a warning is written to the log.
28+ local function add_issue (task_name , message , ...)
29+ if rawget (_G , ' expirationd_enable_issue' ) ~= nil then
30+ _G .expirationd_enable_issue (task_name , message , ... )
31+ else
32+ local message_tpl = ' Expirationd warning: task "%s", ' .. message
33+ log .warn (message_tpl :format (task_name , ... ))
34+ end
35+ end
36+
37+ -- This method removes a previously added issue from the cluster instance, which is displayed in
38+ -- the admin panel and metrics. The method calls a global function that must be declared in the
39+ -- role module. If the function is not declared, then nothing is called.
40+ local function remove_issue (task_name )
41+ if rawget (_G , ' expirationd_disable_issue' ) ~= nil then
42+ _G .expirationd_disable_issue (task_name )
43+ end
44+ end
45+
2546local stash_names = {
2647 cfg = ' __expirationd_cfg' ,
2748 metrics_stats = ' __expirationd_metrics_stats' ,
@@ -186,6 +207,67 @@ local function check_space_and_index_exist(task)
186207 return true
187208end
188209
210+ local function load_function (func_name )
211+ if func_name == nil or type (func_name ) ~= ' string' then
212+ return nil
213+ end
214+
215+ local func = rawget (_G , func_name )
216+ if func ~= nil then
217+ if type (func ) ~= ' function' then
218+ return nil
219+ end
220+
221+ return func
222+ elseif box .schema .func .exists (func_name ) then
223+ return function (...)
224+ return box .func [func_name ]:call ({... })
225+ end
226+ else
227+ return nil
228+ end
229+ end
230+
231+ local function load_functions (task )
232+ local func_list = {
233+ ' is_tuple_expired' ,
234+ ' iterate_with' ,
235+ ' on_full_scan_complete' ,
236+ ' on_full_scan_error' ,
237+ ' on_full_scan_start' ,
238+ ' on_full_scan_success' ,
239+ ' process_expired_tuple' ,
240+ ' process_while' ,
241+ }
242+
243+ local result = {}
244+ for _ , func_name in ipairs (func_list ) do
245+ if task [func_name ] and type (task [func_name ]) == ' string' then
246+ result [func_name ] = load_function (task [func_name ])
247+ if result [func_name ] == nil then
248+ add_issue (
249+ task .name ,
250+ ' Function "%s" (for option "%s") -- not loaded' ,
251+ task [func_name ],
252+ func_name
253+ )
254+ return false
255+ end
256+ end
257+ end
258+
259+ for func_name , func_ptr in pairs (result ) do
260+ task [func_name ] = func_ptr
261+ end
262+
263+ if task .is_tuple_expired == nil then
264+ add_issue (task .name , ' Function "is_tuple_expired" must be declare' )
265+ return false
266+ end
267+
268+ return true
269+ end
270+
189271local function check_space_and_index (task )
190272 local ok , err = check_space_and_index_exist (task )
191273 if not ok then
@@ -374,11 +456,14 @@ local function worker_loop(task)
374456 fiber .self ():cancel ()
375457 end
376458 -- Full scan iteration is complete, yield
459+ remove_issue (task .name )
377460 fiber .sleep (task .full_scan_delay )
378461 end
379462end
380463
381464local function guardian_loop (task )
465+ add_issue (task .name , ' Task is not running' )
466+
382467 -- detach the guardian from the creator and attach it to sched
383468 fiber .name (string.format (" guardian of %q" , task .name ), { truncate = true })
384469
@@ -389,6 +474,13 @@ local function guardian_loop(task)
389474 fiber .sleep (constants .check_interval )
390475 end
391476
477+ while true do
478+ if load_functions (task ) then
479+ break
480+ end
481+ fiber .sleep (constants .check_interval )
482+ end
483+
392484 while true do
393485 -- if fiber doesn't exist
394486 if get_fiber_id (task .worker_fiber ) == 0 then
@@ -447,6 +539,8 @@ local Task_methods = {
447539 --
448540 -- @function task.stop
449541 stop = function (self )
542+ remove_issue (self .name )
543+
450544 if (get_fiber_id (self .guardian_fiber ) ~= 0 ) then
451545 self .guardian_fiber :cancel ()
452546 while self .guardian_fiber :status () ~= " dead" do
@@ -991,23 +1085,23 @@ end
9911085--
9921086-- @function expirationd.start
9931087local function expirationd_run_task (name , space , is_tuple_expired , options )
994- checks (' string' , ' number|string' , ' function' , {
1088+ checks (' string' , ' number|string' , ' string| function' , {
9951089 args = ' ?' ,
9961090 atomic_iteration = ' ?boolean' ,
9971091 force = ' ?boolean' ,
9981092 force_allow_functional_index = ' ?boolean' ,
9991093 full_scan_delay = ' ?number|cdata' ,
10001094 full_scan_time = ' ?number|cdata' ,
10011095 index = ' ?number|string' ,
1002- iterate_with = ' ?function' ,
1096+ iterate_with = ' ?string| function' ,
10031097 iteration_delay = ' ?number|cdata' ,
10041098 iterator_type = ' ?number|string' ,
1005- on_full_scan_complete = ' ?function' ,
1006- on_full_scan_error = ' ?function' ,
1007- on_full_scan_start = ' ?function' ,
1008- on_full_scan_success = ' ?function' ,
1009- process_expired_tuple = ' ?function' ,
1010- process_while = ' ?function' ,
1099+ on_full_scan_complete = ' ?string| function' ,
1100+ on_full_scan_error = ' ?string| function' ,
1101+ on_full_scan_start = ' ?string| function' ,
1102+ on_full_scan_success = ' ?string| function' ,
1103+ process_expired_tuple = ' ?string| function' ,
1104+ process_while = ' ?string| function' ,
10111105 start_key = ' ?' ,
10121106 tuples_per_iteration = ' ?number|cdata' ,
10131107 vinyl_assumed_space_len_factor = ' ?number|cdata' ,
0 commit comments