Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions vshard/storage/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2693,7 +2693,7 @@ end
-- Main applier of rebalancer routes. It manages the worker fibers
-- and logs the total results.
--
local function rebalancer_service_apply_routes_f(service, routes)
local function rebalancer_service_apply_routes_f(service, limiter, routes)
lfiber.name('vshard.rebalancer_applier')
service:set_activity('applying routes')
local worker_count = M.rebalancer_worker_count
Expand Down Expand Up @@ -2721,15 +2721,15 @@ local function rebalancer_service_apply_routes_f(service, routes)
local f = workers[i]
local ok, res = f:join()
if not ok then
log.error(service:set_status_error(
limiter:log_error(err, service:set_status_error(
'Rebalancer worker %d threw an exception: %s', i, res))
end
end
if not dispenser.error then
log.info('Rebalancer routes are applied')
service:set_status_ok()
else
log.info(service:set_status_error(
limiter:log_info(dispenser.error, service:set_status_error(
"Couldn't apply some rebalancer routes: %s", dispenser.error))
end
local throttled = {}
Expand All @@ -2749,9 +2749,12 @@ end

local function rebalancer_apply_routes_f(routes)
assert(not M.routes_applier_service)
local service = lservice_info.new('routes_applier')
local name = 'routes_applier'
local service = lservice_info.new(name)
M.routes_applier_service = service
local ok, err = pcall(rebalancer_service_apply_routes_f, service, routes)
local ratelimit = lratelimit.create{name = name}
local ok, err = pcall(rebalancer_service_apply_routes_f, service,
ratelimit, routes)
-- Delay service destruction in order to check states and errors
while M.errinj.ERRINJ_APPLY_ROUTES_STOP_DELAY do
lfiber.sleep(0.001)
Expand Down Expand Up @@ -2895,9 +2898,10 @@ local function rebalancer_service_f(service, limiter)
rs, 'vshard.storage.rebalancer_apply_routes', {src_routes},
{timeout = consts.REBALANCER_APPLY_ROUTES_TIMEOUT})
if not status then
log.error(service:set_status_error(
local err = lerror.make(err)
limiter:log_error(err, service:set_status_error(
'Error during routes appying on "%s": %s. '..
'Try rebalance later', rs, lerror.make(err)))
'Try rebalance later', rs, err))
service:set_activity('idling')
lfiber.sleep(consts.REBALANCER_WORK_INTERVAL)
goto continue
Expand Down
Loading