Merge pull request #3374 from aledbf/restore-tcp-udp

Revert removal of support for TCP and UDP services
This commit is contained in:
k8s-ci-robot 2018-11-18 08:33:29 -08:00 committed by GitHub
commit bf7ad0daca
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
36 changed files with 781 additions and 57 deletions

View file

@ -47,7 +47,7 @@ local function handle_servers()
local ok, servers = pcall(json.decode, raw_servers)
if not ok then
ngx.log(ngx.ERR, "could not parse servers: " .. tostring(servers))
ngx.log(ngx.ERR, "could not parse servers: " .. tostring(servers))
ngx.status = ngx.HTTP_BAD_REQUEST
return
end
@ -63,8 +63,7 @@ local function handle_servers()
return
end
local err_msg = string.format("error setting certificate for %s: %s\n",
server.hostname, tostring(err))
local err_msg = string.format("error setting certificate for %s: %s\n", server.hostname, tostring(err))
table.insert(err_buf, err_msg)
end
else

View file

@ -0,0 +1,177 @@
local ngx_balancer = require("ngx.balancer")
local json = require("cjson")
local util = require("util")
local dns_util = require("util.dns")
local configuration = require("tcp_udp_configuration")
local round_robin = require("balancer.round_robin")
-- measured in seconds
-- for an Nginx worker to pick up the new list of upstream peers
-- it will take <the delay until controller POSTed the backend object to the Nginx endpoint> + BACKENDS_SYNC_INTERVAL
local BACKENDS_SYNC_INTERVAL = 1
local DEFAULT_LB_ALG = "round_robin"
local IMPLEMENTATIONS = {
round_robin = round_robin
}
local _M = {}
local balancers = {}
local function get_implementation(backend)
local name = backend["load-balance"] or DEFAULT_LB_ALG
local implementation = IMPLEMENTATIONS[name]
if not implementation then
ngx.log(ngx.WARN, string.format("%s is not supported, falling back to %s", backend["load-balance"], DEFAULT_LB_ALG))
implementation = IMPLEMENTATIONS[DEFAULT_LB_ALG]
end
return implementation
end
local function resolve_external_names(original_backend)
local backend = util.deepcopy(original_backend)
local endpoints = {}
for _, endpoint in ipairs(backend.endpoints) do
local ips = dns_util.resolve(endpoint.address)
for _, ip in ipairs(ips) do
table.insert(endpoints, {address = ip, port = endpoint.port})
end
end
backend.endpoints = endpoints
return backend
end
local function format_ipv6_endpoints(endpoints)
local formatted_endpoints = {}
for _, endpoint in ipairs(endpoints) do
local formatted_endpoint = endpoint
if not endpoint.address:match("^%d+.%d+.%d+.%d+$") then
formatted_endpoint.address = string.format("[%s]", endpoint.address)
end
table.insert(formatted_endpoints, formatted_endpoint)
end
return formatted_endpoints
end
local function sync_backend(backend)
if not backend.endpoints or #backend.endpoints == 0 then
ngx.log(ngx.INFO, string.format("there is no endpoint for backend %s. Skipping...", backend.name))
return
end
ngx.log(ngx.INFO, string.format("backend ", backend.name))
local implementation = get_implementation(backend)
local balancer = balancers[backend.name]
if not balancer then
balancers[backend.name] = implementation:new(backend)
return
end
-- every implementation is the metatable of its instances (see .new(...) functions)
-- here we check if `balancer` is the instance of `implementation`
-- if it is not then we deduce LB algorithm has changed for the backend
if getmetatable(balancer) ~= implementation then
ngx.log(
ngx.INFO,
string.format("LB algorithm changed from %s to %s, resetting the instance", balancer.name, implementation.name)
)
balancers[backend.name] = implementation:new(backend)
return
end
local service_type = backend.service and backend.service.spec and backend.service.spec["type"]
if service_type == "ExternalName" then
backend = resolve_external_names(backend)
end
backend.endpoints = format_ipv6_endpoints(backend.endpoints)
balancer:sync(backend)
end
local function sync_backends()
local backends_data = configuration.get_backends_data()
if not backends_data then
balancers = {}
return
end
local ok, new_backends = pcall(json.decode, backends_data)
if not ok then
ngx.log(ngx.ERR, "could not parse backends data: " .. tostring(new_backends))
return
end
local balancers_to_keep = {}
for _, new_backend in ipairs(new_backends) do
sync_backend(new_backend)
balancers_to_keep[new_backend.name] = balancers[new_backend.name]
end
for backend_name, _ in pairs(balancers) do
if not balancers_to_keep[backend_name] then
balancers[backend_name] = nil
end
end
end
local function get_balancer()
local backend_name = ngx.var.proxy_upstream_name
local balancer = balancers[backend_name]
if not balancer then
return
end
return balancer
end
function _M.init_worker()
sync_backends() -- when worker starts, sync backends without delay
local _, err = ngx.timer.every(BACKENDS_SYNC_INTERVAL, sync_backends)
if err then
ngx.log(ngx.ERR, string.format("error when setting up timer.every for sync_backends: %s", tostring(err)))
end
end
function _M.balance()
local balancer = get_balancer()
if not balancer then
return
end
local peer = balancer:balance()
if not peer then
ngx.log(ngx.WARN, "no peer was returned, balancer: " .. balancer.name)
return
end
ngx_balancer.set_more_tries(1)
local ok, err = ngx_balancer.set_current_peer(peer)
if not ok then
ngx.log(ngx.ERR, string.format("error while setting current upstream peer %s: %s", peer, err))
end
end
function _M.log()
local balancer = get_balancer()
if not balancer then
return
end
if not balancer.after_balance then
return
end
balancer:after_balance()
end
if _TEST then
_M.get_implementation = get_implementation
_M.sync_backend = sync_backend
end
return _M

View file

@ -0,0 +1,40 @@
-- this is the Lua representation of TCP/UDP Configuration
local tcp_udp_configuration_data = ngx.shared.tcp_udp_configuration_data
local _M = {
nameservers = {}
}
function _M.get_backends_data()
return tcp_udp_configuration_data:get("backends")
end
function _M.call()
local sock, err = ngx.req.socket(true)
if not sock then
ngx.log(ngx.ERR, "failed to get raw req socket: ", err)
ngx.say("error: ", err)
return
end
local reader = sock:receiveuntil("\r\n")
local backends, err_read = reader()
if not backends then
ngx.log(ngx.ERR, "failed TCP/UDP dynamic-configuration:", err_read)
ngx.say("error: ", err_read)
return
end
if backends == nil or backends == "" then
return
end
local success, err_conf = tcp_udp_configuration_data:set("backends", backends)
if not success then
ngx.log(ngx.ERR, "dynamic-configuration: error updating configuration: " .. tostring(err_conf))
ngx.say("error: ", err_conf)
return
end
end
return _M

View file

@ -683,6 +683,40 @@ http {
}
stream {
lua_package_cpath "/usr/local/lib/lua/?.so;/usr/lib/lua-platform-path/lua/5.1/?.so;;";
lua_package_path "/etc/nginx/lua/?.lua;/etc/nginx/lua/vendor/?.lua;/usr/local/lib/lua/?.lua;;";
lua_shared_dict tcp_udp_configuration_data 5M;
init_by_lua_block {
require("resty.core")
collectgarbage("collect")
-- init modules
local ok, res
ok, res = pcall(require, "tcp_udp_configuration")
if not ok then
error("require failed: " .. tostring(res))
else
tcp_udp_configuration = res
tcp_udp_configuration.nameservers = { {{ buildResolversForLua $cfg.Resolver $cfg.DisableIpv6DNS }} }
end
ok, res = pcall(require, "tcp_udp_balancer")
if not ok then
error("require failed: " .. tostring(res))
else
tcp_udp_balancer = res
end
}
init_worker_by_lua_block {
tcp_udp_balancer.init_worker()
}
lua_add_variable $proxy_upstream_name;
log_format log_stream {{ $cfg.LogFormatStream }};
{{ if $cfg.DisableAccessLog }}
@ -692,6 +726,74 @@ stream {
{{ end }}
error_log {{ $cfg.ErrorLogPath }};
upstream upstream_balancer {
server 0.0.0.1:1234; # placeholder
balancer_by_lua_block {
tcp_udp_balancer.balance()
}
}
server {
listen unix:/tmp/ingress-stream.sock;
content_by_lua_block {
tcp_udp_configuration.call()
}
}
# TCP services
{{ range $tcpServer := .TCPBackends }}
server {
preread_by_lua_block {
ngx.var.proxy_upstream_name="tcp-{{ $tcpServer.Backend.Namespace }}-{{ $tcpServer.Backend.Name }}-{{ $tcpServer.Backend.Port }}";
}
{{ range $address := $all.Cfg.BindAddressIpv4 }}
listen {{ $address }}:{{ $tcpServer.Port }}{{ if $tcpServer.Backend.ProxyProtocol.Decode }} proxy_protocol{{ end }};
{{ else }}
listen {{ $tcpServer.Port }}{{ if $tcpServer.Backend.ProxyProtocol.Decode }} proxy_protocol{{ end }};
{{ end }}
{{ if $IsIPV6Enabled }}
{{ range $address := $all.Cfg.BindAddressIpv6 }}
listen {{ $address }}:{{ $tcpServer.Port }}{{ if $tcpServer.Backend.ProxyProtocol.Decode }} proxy_protocol{{ end }};
{{ else }}
listen [::]:{{ $tcpServer.Port }}{{ if $tcpServer.Backend.ProxyProtocol.Decode }} proxy_protocol{{ end }};
{{ end }}
{{ end }}
proxy_timeout {{ $cfg.ProxyStreamTimeout }};
proxy_pass upstream_balancer;
{{ if $tcpServer.Backend.ProxyProtocol.Encode }}
proxy_protocol on;
{{ end }}
}
{{ end }}
# UDP services
{{ range $udpServer := .UDPBackends }}
server {
preread_by_lua_block {
ngx.var.proxy_upstream_name="udp-{{ $udpServer.Backend.Namespace }}-{{ $udpServer.Backend.Name }}-{{ $udpServer.Backend.Port }}";
}
{{ range $address := $all.Cfg.BindAddressIpv4 }}
listen {{ $address }}:{{ $udpServer.Port }} udp;
{{ else }}
listen {{ $udpServer.Port }} udp;
{{ end }}
{{ if $IsIPV6Enabled }}
{{ range $address := $all.Cfg.BindAddressIpv6 }}
listen {{ $address }}:{{ $udpServer.Port }} udp;
{{ else }}
listen [::]:{{ $udpServer.Port }} udp;
{{ end }}
{{ end }}
proxy_responses {{ $cfg.ProxyStreamResponses }};
proxy_timeout {{ $cfg.ProxyStreamTimeout }};
proxy_pass upstream_balancer;
}
{{ end }}
}
{{/* definition of templates to avoid repetitions */}}
@ -926,7 +1028,7 @@ stream {
waf:set_option("mode", "{{ $location.LuaRestyWAF.Mode }}")
waf:set_option("storage_zone", "waf_storage")
{{ if $location.LuaRestyWAF.AllowUnknownContentTypes }}
{{ if $location.LuaRestyWAF.AllowUnknownContentTypes }}
waf:set_option("allow_unknown_content_types", true)
{{ else }}
waf:set_option("allowed_content_types", { "text/html", "text/json", "application/json" })