Merge pull request #3341 from Shopify/canary_upstream

Add canary annotation and alternative backends for traffic shaping
This commit is contained in:
k8s-ci-robot 2018-11-06 12:22:16 -08:00 committed by GitHub
commit 17cad51e47
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 859 additions and 23 deletions

View file

@ -268,6 +268,7 @@ func (n *NGINXController) getBackendServers(ingresses []*extensions.Ingress) ([]
if host == "" {
host = defServerName
}
server := servers[host]
if server == nil {
server = servers[defServerName]
@ -300,13 +301,15 @@ func (n *NGINXController) getBackendServers(ingresses []*extensions.Ingress) ([]
}
for _, path := range rule.HTTP.Paths {
upsName := fmt.Sprintf("%v-%v-%v",
ing.Namespace,
path.Backend.ServiceName,
path.Backend.ServicePort.String())
upsName := upstreamName(ing.Namespace, path.Backend.ServiceName, path.Backend.ServicePort)
ups := upstreams[upsName]
// Backend is not referenced to by a server
if ups.NoServer {
continue
}
nginxPath := rootLocation
if path.Path != "" {
nginxPath = path.Path
@ -420,6 +423,11 @@ func (n *NGINXController) getBackendServers(ingresses []*extensions.Ingress) ([]
}
}
}
if anns.Canary.Enabled {
glog.Infof("Canary ingress %v detected. Finding eligible backends to merge into.", ing.Name)
mergeAlternativeBackends(ing, upstreams, servers)
}
}
aUpstreams := make([]*ingress.Backend, 0, len(upstreams))
@ -508,10 +516,7 @@ func (n *NGINXController) createUpstreams(data []*extensions.Ingress, du *ingres
var defBackend string
if ing.Spec.Backend != nil {
defBackend = fmt.Sprintf("%v-%v-%v",
ing.Namespace,
ing.Spec.Backend.ServiceName,
ing.Spec.Backend.ServicePort.String())
defBackend = upstreamName(ing.Namespace, ing.Spec.Backend.ServiceName, ing.Spec.Backend.ServicePort)
glog.V(3).Infof("Creating upstream %q", defBackend)
upstreams[defBackend] = newUpstream(defBackend)
@ -537,6 +542,16 @@ func (n *NGINXController) createUpstreams(data []*extensions.Ingress, du *ingres
}
}
// configure traffic shaping for canary
if anns.Canary.Enabled {
upstreams[defBackend].NoServer = true
upstreams[defBackend].TrafficShapingPolicy = ingress.TrafficShapingPolicy{
Weight: anns.Canary.Weight,
Header: anns.Canary.Header,
Cookie: anns.Canary.Cookie,
}
}
if len(upstreams[defBackend].Endpoints) == 0 {
endps, err := n.serviceEndpoints(svcKey, ing.Spec.Backend.ServicePort.String())
upstreams[defBackend].Endpoints = append(upstreams[defBackend].Endpoints, endps...)
@ -558,10 +573,7 @@ func (n *NGINXController) createUpstreams(data []*extensions.Ingress, du *ingres
}
for _, path := range rule.HTTP.Paths {
name := fmt.Sprintf("%v-%v-%v",
ing.Namespace,
path.Backend.ServiceName,
path.Backend.ServicePort.String())
name := upstreamName(ing.Namespace, path.Backend.ServiceName, path.Backend.ServicePort)
if _, ok := upstreams[name]; ok {
continue
@ -595,6 +607,16 @@ func (n *NGINXController) createUpstreams(data []*extensions.Ingress, du *ingres
}
}
// configure traffic shaping for canary
if anns.Canary.Enabled {
upstreams[name].NoServer = true
upstreams[name].TrafficShapingPolicy = ingress.TrafficShapingPolicy{
Weight: anns.Canary.Weight,
Header: anns.Canary.Header,
Cookie: anns.Canary.Cookie,
}
}
if len(upstreams[name].Endpoints) == 0 {
endp, err := n.serviceEndpoints(svcKey, path.Backend.ServicePort.String())
if err != nil {
@ -970,6 +992,63 @@ func (n *NGINXController) createServers(data []*extensions.Ingress,
return servers
}
// Compares an Ingress of a potential alternative backend's rules with each existing server and finds matching host + path pairs.
// If a match is found, we know that this server should back the alternative backend and add the alternative backend
// to a backend's alternative list.
// If no match is found, then the serverless backend is deleted.
func mergeAlternativeBackends(ing *extensions.Ingress, upstreams map[string]*ingress.Backend,
servers map[string]*ingress.Server) {
// merge catch-all alternative backends
if ing.Spec.Backend != nil {
upsName := upstreamName(ing.Namespace, ing.Spec.Backend.ServiceName, ing.Spec.Backend.ServicePort)
ups := upstreams[upsName]
defLoc := servers[defServerName].Locations[0]
glog.Infof("matching backend %v found for alternative backend %v",
upstreams[defLoc.Backend].Name, ups.Name)
upstreams[defLoc.Backend].AlternativeBackends =
append(upstreams[defLoc.Backend].AlternativeBackends, ups.Name)
}
for _, rule := range ing.Spec.Rules {
for _, path := range rule.HTTP.Paths {
upsName := upstreamName(ing.Namespace, path.Backend.ServiceName, path.Backend.ServicePort)
ups := upstreams[upsName]
merged := false
server := servers[rule.Host]
// find matching paths
for _, location := range server.Locations {
if location.Backend == defUpstreamName {
continue
}
if location.Path == path.Path && !upstreams[location.Backend].NoServer {
glog.Infof("matching backend %v found for alternative backend %v",
upstreams[location.Backend].Name, ups.Name)
upstreams[location.Backend].AlternativeBackends =
append(upstreams[location.Backend].AlternativeBackends, ups.Name)
merged = true
}
}
if !merged {
glog.Warningf("unable to find real backend for alternative backend %v. Deleting.", ups.Name)
delete(upstreams, ups.Name)
}
}
}
}
// extractTLSSecretName returns the name of the Secret containing a SSL
// certificate for the given host name, or an empty string.
func extractTLSSecretName(host string, ing *extensions.Ingress,

View file

@ -20,6 +20,7 @@ import (
"crypto/x509"
"crypto/x509/pkix"
"encoding/asn1"
"k8s.io/apimachinery/pkg/util/intstr"
"testing"
extensions "k8s.io/api/extensions/v1beta1"
@ -27,6 +28,184 @@ import (
"k8s.io/ingress-nginx/internal/ingress"
)
func TestMergeAlternativeBackends(t *testing.T) {
testCases := map[string]struct {
ingress *extensions.Ingress
upstreams map[string]*ingress.Backend
servers map[string]*ingress.Server
expNumAlternativeBackends int
expNumLocations int
}{
"alternative backend has no server and embeds into matching real backend": {
&extensions.Ingress{
ObjectMeta: metav1.ObjectMeta{
Namespace: "example",
},
Spec: extensions.IngressSpec{
Rules: []extensions.IngressRule{
{
Host: "example.com",
IngressRuleValue: extensions.IngressRuleValue{
HTTP: &extensions.HTTPIngressRuleValue{
Paths: []extensions.HTTPIngressPath{
{
Path: "/",
Backend: extensions.IngressBackend{
ServiceName: "http-svc-canary",
ServicePort: intstr.IntOrString{
Type: intstr.Int,
IntVal: 80,
},
},
},
},
},
},
},
},
},
},
map[string]*ingress.Backend{
"example-http-svc-80": {
Name: "example-http-svc-80",
NoServer: false,
},
"example-http-svc-canary-80": {
Name: "example-http-svc-canary-80",
NoServer: true,
TrafficShapingPolicy: ingress.TrafficShapingPolicy{
Weight: 20,
},
},
},
map[string]*ingress.Server{
"example.com": {
Hostname: "example.com",
Locations: []*ingress.Location{
{
Path: "/",
Backend: "example-http-svc-80",
},
},
},
},
1,
1,
},
"merging a alternative backend matches with the correct host": {
&extensions.Ingress{
ObjectMeta: metav1.ObjectMeta{
Namespace: "example",
},
Spec: extensions.IngressSpec{
Rules: []extensions.IngressRule{
{
Host: "foo.bar",
IngressRuleValue: extensions.IngressRuleValue{
HTTP: &extensions.HTTPIngressRuleValue{
Paths: []extensions.HTTPIngressPath{
{
Path: "/",
Backend: extensions.IngressBackend{
ServiceName: "foo-http-svc-canary",
ServicePort: intstr.IntOrString{
Type: intstr.Int,
IntVal: 80,
},
},
},
},
},
},
},
{
Host: "example.com",
IngressRuleValue: extensions.IngressRuleValue{
HTTP: &extensions.HTTPIngressRuleValue{
Paths: []extensions.HTTPIngressPath{
{
Path: "/",
Backend: extensions.IngressBackend{
ServiceName: "http-svc-canary",
ServicePort: intstr.IntOrString{
Type: intstr.Int,
IntVal: 80,
},
},
},
},
},
},
},
},
},
},
map[string]*ingress.Backend{
"example-foo-http-svc-80": {
Name: "example-foo-http-svc-80",
NoServer: false,
},
"example-foo-http-svc-canary-80": {
Name: "example-foo-http-svc-canary-80",
NoServer: true,
TrafficShapingPolicy: ingress.TrafficShapingPolicy{
Weight: 20,
},
},
"example-http-svc-80": {
Name: "example-http-svc-80",
NoServer: false,
},
"example-http-svc-canary-80": {
Name: "example-http-svc-canary-80",
NoServer: true,
TrafficShapingPolicy: ingress.TrafficShapingPolicy{
Weight: 20,
},
},
},
map[string]*ingress.Server{
"foo.bar": {
Hostname: "foo.bar",
Locations: []*ingress.Location{
{
Path: "/",
Backend: "example-foo-http-svc-80",
},
},
},
"example.com": {
Hostname: "example.com",
Locations: []*ingress.Location{
{
Path: "/",
Backend: "example-http-svc-80",
},
},
},
},
1,
1,
},
}
for title, tc := range testCases {
t.Run(title, func(t *testing.T) {
mergeAlternativeBackends(tc.ingress, tc.upstreams, tc.servers)
numAlternativeBackends := len(tc.upstreams["example-http-svc-80"].AlternativeBackends)
if numAlternativeBackends != tc.expNumAlternativeBackends {
t.Errorf("expected %d alternative backends (got %d)", tc.expNumAlternativeBackends, numAlternativeBackends)
}
numLocations := len(tc.servers["example.com"].Locations)
if numLocations != tc.expNumLocations {
t.Errorf("expected %d locations (got %d)", tc.expNumLocations, numLocations)
}
})
}
}
func TestExtractTLSSecretName(t *testing.T) {
testCases := map[string]struct {
host string

View file

@ -759,13 +759,16 @@ func configureDynamically(pcfg *ingress.Configuration, port int, isDynamicCertif
service = &apiv1.Service{Spec: backend.Service.Spec}
}
luaBackend := &ingress.Backend{
Name: backend.Name,
Port: backend.Port,
SSLPassthrough: backend.SSLPassthrough,
SessionAffinity: backend.SessionAffinity,
UpstreamHashBy: backend.UpstreamHashBy,
LoadBalancing: backend.LoadBalancing,
Service: service,
Name: backend.Name,
Port: backend.Port,
SSLPassthrough: backend.SSLPassthrough,
SessionAffinity: backend.SessionAffinity,
UpstreamHashBy: backend.UpstreamHashBy,
LoadBalancing: backend.LoadBalancing,
Service: service,
NoServer: backend.NoServer,
TrafficShapingPolicy: backend.TrafficShapingPolicy,
AlternativeBackends: backend.AlternativeBackends,
}
var endpoints []ingress.Endpoint

View file

@ -17,10 +17,13 @@ limitations under the License.
package controller
import (
"k8s.io/apimachinery/pkg/util/intstr"
"os"
"os/exec"
"syscall"
"fmt"
"github.com/golang/glog"
api "k8s.io/api/core/v1"
@ -43,6 +46,11 @@ func newUpstream(name string) *ingress.Backend {
}
}
// upstreamName returns a formatted upstream name based on namespace, service, and port
func upstreamName(namespace string, service string, port intstr.IntOrString) string {
return fmt.Sprintf("%v-%v-%v", namespace, service, port.String())
}
// sysctlSomaxconn returns the maximum number of connections that can be queued
// for acceptance (value of net.core.somaxconn)
// http://nginx.org/en/docs/http/ngx_http_core_module.html#listen