Adds the ability to create alternative backends. Alternative backends enable traffic shaping by sharing a single location but routing to different backends depending on the TrafficShapingPolicy defined by AlternativeBackends. When the list of upstreams and servers are retrieved, we then call mergeAlternativeBackends which iterates through the paths of every ingress and checks if the backend supporting the path is a AlternativeBackend. If so, we then iterate through the map of servers and find the real backend that the AlternativeBackend should fall under. Once found, the AlternativeBackend is embedded in the list of VirtualBackends for the real backend. If no matching real backend for a AlternativeBackend is found, then the AlternativeBackend is deleted as it cannot be backed by any server.
232 lines
6.3 KiB
Lua
232 lines
6.3 KiB
Lua
local ngx_balancer = require("ngx.balancer")
|
|
local json = require("cjson")
|
|
local util = require("util")
|
|
local dns_util = require("util.dns")
|
|
local configuration = require("configuration")
|
|
local round_robin = require("balancer.round_robin")
|
|
local chash = require("balancer.chash")
|
|
local sticky = require("balancer.sticky")
|
|
local ewma = require("balancer.ewma")
|
|
|
|
-- measured in seconds
|
|
-- for an Nginx worker to pick up the new list of upstream peers
|
|
-- it will take <the delay until controller POSTed the backend object to the Nginx endpoint> + BACKENDS_SYNC_INTERVAL
|
|
local BACKENDS_SYNC_INTERVAL = 1
|
|
|
|
local DEFAULT_LB_ALG = "round_robin"
|
|
local IMPLEMENTATIONS = {
|
|
round_robin = round_robin,
|
|
chash = chash,
|
|
sticky = sticky,
|
|
ewma = ewma,
|
|
}
|
|
|
|
local _M = {}
|
|
local balancers = {}
|
|
|
|
local function get_implementation(backend)
|
|
local name = backend["load-balance"] or DEFAULT_LB_ALG
|
|
|
|
if backend["sessionAffinityConfig"] and backend["sessionAffinityConfig"]["name"] == "cookie" then
|
|
name = "sticky"
|
|
elseif backend["upstream-hash-by"] then
|
|
name = "chash"
|
|
end
|
|
|
|
local implementation = IMPLEMENTATIONS[name]
|
|
if not implementation then
|
|
ngx.log(ngx.WARN, string.format("%s is not supported, falling back to %s", backend["load-balance"], DEFAULT_LB_ALG))
|
|
implementation = IMPLEMENTATIONS[DEFAULT_LB_ALG]
|
|
end
|
|
|
|
return implementation
|
|
end
|
|
|
|
local function resolve_external_names(original_backend)
|
|
local backend = util.deepcopy(original_backend)
|
|
local endpoints = {}
|
|
for _, endpoint in ipairs(backend.endpoints) do
|
|
local ips = dns_util.resolve(endpoint.address)
|
|
for _, ip in ipairs(ips) do
|
|
table.insert(endpoints, { address = ip, port = endpoint.port })
|
|
end
|
|
end
|
|
backend.endpoints = endpoints
|
|
return backend
|
|
end
|
|
|
|
local function format_ipv6_endpoints(endpoints)
|
|
local formatted_endpoints = {}
|
|
for _, endpoint in ipairs(endpoints) do
|
|
local formatted_endpoint = endpoint
|
|
if not endpoint.address:match("^%d+.%d+.%d+.%d+$") then
|
|
formatted_endpoint.address = string.format("[%s]", endpoint.address)
|
|
end
|
|
table.insert(formatted_endpoints, formatted_endpoint)
|
|
end
|
|
return formatted_endpoints
|
|
end
|
|
|
|
local function sync_backend(backend)
|
|
local implementation = get_implementation(backend)
|
|
local balancer = balancers[backend.name]
|
|
|
|
if not balancer then
|
|
balancers[backend.name] = implementation:new(backend)
|
|
return
|
|
end
|
|
|
|
-- every implementation is the metatable of its instances (see .new(...) functions)
|
|
-- here we check if `balancer` is the instance of `implementation`
|
|
-- if it is not then we deduce LB algorithm has changed for the backend
|
|
if getmetatable(balancer) ~= implementation then
|
|
ngx.log(ngx.INFO,
|
|
string.format("LB algorithm changed from %s to %s, resetting the instance", balancer.name, implementation.name))
|
|
balancers[backend.name] = implementation:new(backend)
|
|
return
|
|
end
|
|
|
|
local service_type = backend.service and backend.service.spec and backend.service.spec["type"]
|
|
if service_type == "ExternalName" then
|
|
backend = resolve_external_names(backend)
|
|
end
|
|
|
|
backend.endpoints = format_ipv6_endpoints(backend.endpoints)
|
|
|
|
balancer:sync(backend)
|
|
end
|
|
|
|
local function sync_backends()
|
|
local backends_data = configuration.get_backends_data()
|
|
if not backends_data then
|
|
balancers = {}
|
|
return
|
|
end
|
|
|
|
local ok, new_backends = pcall(json.decode, backends_data)
|
|
if not ok then
|
|
ngx.log(ngx.ERR, "could not parse backends data: " .. tostring(new_backends))
|
|
return
|
|
end
|
|
|
|
local balancers_to_keep = {}
|
|
for _, new_backend in ipairs(new_backends) do
|
|
sync_backend(new_backend)
|
|
balancers_to_keep[new_backend.name] = balancers[new_backend.name]
|
|
end
|
|
|
|
for backend_name, _ in pairs(balancers) do
|
|
if not balancers_to_keep[backend_name] then
|
|
balancers[backend_name] = nil
|
|
end
|
|
end
|
|
end
|
|
|
|
local function route_to_alternative_balancer(balancer)
|
|
if not balancer.alternative_backends then
|
|
return false
|
|
end
|
|
|
|
-- TODO: support traffic shaping for n > 1 alternative backends
|
|
local alternative_balancer = balancers[balancer.alternative_backends[1]]
|
|
|
|
local clean_target_header = util.replace_special_char(alternative_balancer.traffic_shaping_policy.header, "-", "_")
|
|
|
|
local header = ngx.var["http_" .. clean_target_header]
|
|
if header then
|
|
if header == "always" then
|
|
return true
|
|
elseif header == "never" then
|
|
return false
|
|
end
|
|
end
|
|
|
|
local clean_target_cookie = util.replace_special_char(alternative_balancer.traffic_shaping_policy.cookie, "-", "_")
|
|
|
|
local cookie = ngx.var["cookie_" .. clean_target_cookie]
|
|
if cookie then
|
|
if cookie == "always" then
|
|
return true
|
|
elseif cookie == "never" then
|
|
return false
|
|
end
|
|
end
|
|
|
|
if math.random(100) <= alternative_balancer.traffic_shaping_policy.weight then
|
|
return true
|
|
end
|
|
|
|
return false
|
|
end
|
|
|
|
local function get_balancer()
|
|
local backend_name = ngx.var.proxy_upstream_name
|
|
|
|
local balancer = balancers[backend_name]
|
|
if not balancer then
|
|
return
|
|
end
|
|
|
|
if route_to_alternative_balancer(balancer) then
|
|
local alternative_balancer = balancers[balancer.alternative_backends[1]]
|
|
return alternative_balancer
|
|
end
|
|
|
|
return balancer
|
|
end
|
|
|
|
function _M.init_worker()
|
|
sync_backends() -- when worker starts, sync backends without delay
|
|
local _, err = ngx.timer.every(BACKENDS_SYNC_INTERVAL, sync_backends)
|
|
if err then
|
|
ngx.log(ngx.ERR, string.format("error when setting up timer.every for sync_backends: %s", tostring(err)))
|
|
end
|
|
end
|
|
|
|
function _M.rewrite()
|
|
local balancer = get_balancer()
|
|
if not balancer then
|
|
ngx.status = ngx.HTTP_SERVICE_UNAVAILABLE
|
|
return ngx.exit(ngx.status)
|
|
end
|
|
end
|
|
|
|
function _M.balance()
|
|
local balancer = get_balancer()
|
|
if not balancer then
|
|
return
|
|
end
|
|
|
|
local peer = balancer:balance()
|
|
if not peer then
|
|
ngx.log(ngx.WARN, "no peer was returned, balancer: " .. balancer.name)
|
|
return
|
|
end
|
|
|
|
ngx_balancer.set_more_tries(1)
|
|
|
|
local ok, err = ngx_balancer.set_current_peer(peer)
|
|
if not ok then
|
|
ngx.log(ngx.ERR, string.format("error while setting current upstream peer %s: %s", peer, err))
|
|
end
|
|
end
|
|
|
|
function _M.log()
|
|
local balancer = get_balancer()
|
|
if not balancer then
|
|
return
|
|
end
|
|
|
|
if not balancer.after_balance then
|
|
return
|
|
end
|
|
|
|
balancer:after_balance()
|
|
end
|
|
|
|
if _TEST then
|
|
_M.get_implementation = get_implementation
|
|
_M.sync_backend = sync_backend
|
|
end
|
|
|
|
return _M
|