refactor balancer into more testable and extensible interface
This commit is contained in:
parent
1b5db4b3b0
commit
e9dc275b81
20 changed files with 368 additions and 467 deletions
|
|
@ -1,10 +1,10 @@
|
|||
local ngx_balancer = require("ngx.balancer")
|
||||
local json = require("cjson")
|
||||
local configuration = require("configuration")
|
||||
local util = require("util")
|
||||
local lrucache = require("resty.lrucache")
|
||||
local round_robin = require("balancer.round_robin")
|
||||
local chash = require("balancer.chash")
|
||||
local sticky = require("balancer.sticky")
|
||||
local ewma = require("balancer.ewma")
|
||||
local resty_balancer = require("balancer.resty")
|
||||
|
||||
-- measured in seconds
|
||||
-- for an Nginx worker to pick up the new list of upstream peers
|
||||
|
|
@ -12,70 +12,51 @@ local resty_balancer = require("balancer.resty")
|
|||
local BACKENDS_SYNC_INTERVAL = 1
|
||||
|
||||
local DEFAULT_LB_ALG = "round_robin"
|
||||
local IMPLEMENTATIONS = {
|
||||
round_robin = round_robin,
|
||||
chash = chash,
|
||||
sticky = sticky,
|
||||
ewma = ewma,
|
||||
}
|
||||
|
||||
local _M = {}
|
||||
local balancers = {}
|
||||
|
||||
-- TODO(elvinefendi) we can probably avoid storing all backends here. We already store them in their respective
|
||||
-- load balancer implementations
|
||||
local backends, backends_err = lrucache.new(1024)
|
||||
if not backends then
|
||||
return error("failed to create the cache for backends: " .. (backends_err or "unknown"))
|
||||
end
|
||||
local function get_implementation(backend)
|
||||
local name = backend["load-balance"] or DEFAULT_LB_ALG
|
||||
|
||||
local function get_current_backend()
|
||||
local backend_name = ngx.var.proxy_upstream_name
|
||||
local backend = backends:get(backend_name)
|
||||
|
||||
if not backend then
|
||||
-- TODO(elvinefendi) maybe force backend sync here?
|
||||
ngx.log(ngx.WARN, "no backend configuration found for " .. tostring(backend_name))
|
||||
if backend["sessionAffinityConfig"] and backend["sessionAffinityConfig"]["name"] == "cookie" then
|
||||
name = "sticky"
|
||||
elseif backend["upstream-hash-by"] then
|
||||
name = "chash"
|
||||
end
|
||||
|
||||
return backend
|
||||
end
|
||||
|
||||
local function get_balancer(backend)
|
||||
if not backend then
|
||||
return nil
|
||||
local implementation = IMPLEMENTATIONS[name]
|
||||
if not implementation then
|
||||
ngx.log(ngx.WARN, string.format("%s is not supported, falling back to %s", backend["load-balance"], DEFAULT_LB_ALG))
|
||||
implementation = IMPLEMENTATIONS[DEFAULT_LB_ALG]
|
||||
end
|
||||
|
||||
local lb_alg = backend["load-balance"] or DEFAULT_LB_ALG
|
||||
if resty_balancer.is_applicable(backend) then
|
||||
return resty_balancer
|
||||
elseif lb_alg ~= "ewma" then
|
||||
if lb_alg ~= DEFAULT_LB_ALG then
|
||||
ngx.log(ngx.WARN,
|
||||
string.format("%s is not supported, falling back to %s", backend["load-balance"], DEFAULT_LB_ALG))
|
||||
end
|
||||
return resty_balancer
|
||||
end
|
||||
|
||||
return ewma
|
||||
end
|
||||
|
||||
local function balance()
|
||||
local backend = get_current_backend()
|
||||
local balancer = get_balancer(backend)
|
||||
if not balancer then
|
||||
return nil, nil
|
||||
end
|
||||
|
||||
local endpoint = balancer.balance(backend)
|
||||
if not endpoint then
|
||||
return nil, nil
|
||||
end
|
||||
|
||||
return endpoint.address, endpoint.port
|
||||
return implementation
|
||||
end
|
||||
|
||||
local function sync_backend(backend)
|
||||
backends:set(backend.name, backend)
|
||||
local implementation = get_implementation(backend)
|
||||
local balancer = balancers[backend.name]
|
||||
|
||||
local balancer = get_balancer(backend)
|
||||
if not balancer then
|
||||
balancers[backend.name] = implementation:new(backend)
|
||||
return
|
||||
end
|
||||
balancer.sync(backend)
|
||||
|
||||
if getmetatable(balancer) ~= implementation then
|
||||
ngx.log(ngx.INFO,
|
||||
string.format("LB algorithm changed from %s to %s, resetting the instance", balancer.name, implementation.name))
|
||||
balancers[backend.name] = implementation:new(backend)
|
||||
return
|
||||
end
|
||||
|
||||
balancer:sync(backend)
|
||||
end
|
||||
|
||||
local function sync_backends()
|
||||
|
|
@ -91,29 +72,10 @@ local function sync_backends()
|
|||
end
|
||||
|
||||
for _, new_backend in pairs(new_backends) do
|
||||
local backend = backends:get(new_backend.name)
|
||||
local backend_changed = true
|
||||
|
||||
if backend then
|
||||
backend_changed = not util.deep_compare(backend, new_backend)
|
||||
end
|
||||
|
||||
if backend_changed then
|
||||
sync_backend(new_backend)
|
||||
end
|
||||
sync_backend(new_backend)
|
||||
end
|
||||
end
|
||||
|
||||
local function after_balance()
|
||||
local backend = get_current_backend()
|
||||
local balancer = get_balancer(backend)
|
||||
if not balancer then
|
||||
return
|
||||
end
|
||||
|
||||
balancer.after_balance()
|
||||
end
|
||||
|
||||
function _M.init_worker()
|
||||
sync_backends() -- when worker starts, sync backends without delay
|
||||
local _, err = ngx.timer.every(BACKENDS_SYNC_INTERVAL, sync_backends)
|
||||
|
|
@ -124,15 +86,24 @@ end
|
|||
|
||||
function _M.call()
|
||||
local phase = ngx.get_phase()
|
||||
if phase == "log" then
|
||||
after_balance()
|
||||
if phase ~= "log" and phase ~= "balancer" then
|
||||
ngx.log(ngx.ERR, "must be called in balancer or log, but was called in: " .. phase)
|
||||
return
|
||||
end
|
||||
if phase ~= "balancer" then
|
||||
return error("must be called in balancer or log, but was called in: " .. phase)
|
||||
|
||||
local backend_name = ngx.var.proxy_upstream_name
|
||||
local balancer = balancers[backend_name]
|
||||
if not balancer then
|
||||
ngx.status = ngx.HTTP_SERVICE_UNAVAILABLE
|
||||
return ngx.exit(ngx.status)
|
||||
end
|
||||
|
||||
local host, port = balance()
|
||||
if phase == "log" then
|
||||
balancer:after_balance()
|
||||
return
|
||||
end
|
||||
|
||||
local host, port = balancer:balance()
|
||||
if not host then
|
||||
ngx.status = ngx.HTTP_SERVICE_UNAVAILABLE
|
||||
return ngx.exit(ngx.status)
|
||||
|
|
@ -142,8 +113,13 @@ function _M.call()
|
|||
|
||||
local ok, err = ngx_balancer.set_current_peer(host, port)
|
||||
if not ok then
|
||||
ngx.log(ngx.ERR, string.format("error while setting current upstream peer to %s", tostring(err)))
|
||||
ngx.log(ngx.ERR, "error while setting current upstream peer to " .. tostring(err))
|
||||
end
|
||||
end
|
||||
|
||||
if _TEST then
|
||||
_M.get_implementation = get_implementation
|
||||
_M.sync_backend = sync_backend
|
||||
end
|
||||
|
||||
return _M
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue