Refactor EWMA to not use shared dictionaries
This commit is contained in:
parent
bc6f2e7016
commit
3cbfd63992
3 changed files with 8 additions and 69 deletions
|
|
@ -5,38 +5,17 @@
|
|||
-- /finagle-core/src/main/scala/com/twitter/finagle/loadbalancer/PeakEwma.scala
|
||||
|
||||
|
||||
local resty_lock = require("resty.lock")
|
||||
local util = require("util")
|
||||
local split = require("util.split")
|
||||
|
||||
local DECAY_TIME = 10 -- this value is in seconds
|
||||
local LOCK_KEY = ":ewma_key"
|
||||
local PICK_SET_SIZE = 2
|
||||
|
||||
local ewma_lock = resty_lock:new("locks", {timeout = 0, exptime = 0.1})
|
||||
local balancer_ewma = {}
|
||||
local balancer_ewma_last_touched_at = {}
|
||||
|
||||
local _M = { name = "ewma" }
|
||||
|
||||
local function lock(upstream)
|
||||
local _, err = ewma_lock:lock(upstream .. LOCK_KEY)
|
||||
if err then
|
||||
if err ~= "timeout" then
|
||||
ngx.log(ngx.ERR, string.format("EWMA Balancer failed to lock: %s", tostring(err)))
|
||||
end
|
||||
end
|
||||
|
||||
return err
|
||||
end
|
||||
|
||||
local function unlock()
|
||||
local ok, err = ewma_lock:unlock()
|
||||
if not ok then
|
||||
ngx.log(ngx.ERR, string.format("EWMA Balancer failed to unlock: %s", tostring(err)))
|
||||
end
|
||||
|
||||
return err
|
||||
end
|
||||
|
||||
local function decay_ewma(ewma, last_touched_at, rtt, now)
|
||||
local td = now - last_touched_at
|
||||
td = (td > 0) and td or 0
|
||||
|
|
@ -47,40 +26,18 @@ local function decay_ewma(ewma, last_touched_at, rtt, now)
|
|||
end
|
||||
|
||||
local function get_or_update_ewma(upstream, rtt, update)
|
||||
local lock_err = nil
|
||||
if update then
|
||||
lock_err = lock(upstream)
|
||||
end
|
||||
local ewma = ngx.shared.balancer_ewma:get(upstream) or 0
|
||||
if lock_err ~= nil then
|
||||
return ewma, lock_err
|
||||
end
|
||||
local ewma = balancer_ewma[upstream] or 0
|
||||
|
||||
local now = ngx.now()
|
||||
local last_touched_at = ngx.shared.balancer_ewma_last_touched_at:get(upstream) or 0
|
||||
local last_touched_at = balancer_ewma_last_touched_at[upstream] or 0
|
||||
ewma = decay_ewma(ewma, last_touched_at, rtt, now)
|
||||
|
||||
if not update then
|
||||
return ewma, nil
|
||||
end
|
||||
|
||||
local success, err, forcible = ngx.shared.balancer_ewma_last_touched_at:set(upstream, now)
|
||||
if not success then
|
||||
ngx.log(ngx.WARN, "balancer_ewma_last_touched_at:set failed " .. err)
|
||||
end
|
||||
if forcible then
|
||||
ngx.log(ngx.WARN, "balancer_ewma_last_touched_at:set valid items forcibly overwritten")
|
||||
end
|
||||
|
||||
success, err, forcible = ngx.shared.balancer_ewma:set(upstream, ewma)
|
||||
if not success then
|
||||
ngx.log(ngx.WARN, "balancer_ewma:set failed " .. err)
|
||||
end
|
||||
if forcible then
|
||||
ngx.log(ngx.WARN, "balancer_ewma:set valid items forcibly overwritten")
|
||||
end
|
||||
|
||||
unlock()
|
||||
balancer_ewma_last_touched_at[upstream] = now
|
||||
balancer_ewma[upstream] = ewma
|
||||
return ewma, nil
|
||||
end
|
||||
|
||||
|
|
@ -153,8 +110,8 @@ function _M.sync(self, backend)
|
|||
self.peers = backend.endpoints
|
||||
|
||||
-- TODO: Reset state of EWMA per backend
|
||||
ngx.shared.balancer_ewma:flush_all()
|
||||
ngx.shared.balancer_ewma_last_touched_at:flush_all()
|
||||
balancer_ewma = {}
|
||||
balancer_ewma_last_touched_at = {}
|
||||
end
|
||||
|
||||
function _M.new(self, backend)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue