Fix HSTS
This commit is contained in:
parent
102c2eeaa4
commit
a86a682429
9 changed files with 126 additions and 110 deletions
|
|
@ -44,9 +44,10 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
defUpstreamName = "upstream-default-backend"
|
||||
defServerName = "_"
|
||||
namedPortAnnotation = "kubernetes.io/ingress-named-ports"
|
||||
defUpstreamName = "upstream-default-backend"
|
||||
defServerName = "_"
|
||||
namedPortAnnotation = "kubernetes.io/ingress-named-ports"
|
||||
podStoreSyncedPollPeriod = 1 * time.Second
|
||||
)
|
||||
|
||||
var (
|
||||
|
|
@ -127,7 +128,9 @@ func newLoadBalancerController(kubeClient *client.Client, resyncPeriod time.Dura
|
|||
tcpConfigMap: tcpConfigMapName,
|
||||
udpConfigMap: udpConfigMapName,
|
||||
defaultSvc: defaultSvc,
|
||||
recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "loadbalancer-controller"}),
|
||||
recorder: eventBroadcaster.NewRecorder(api.EventSource{
|
||||
Component: "nginx-ingress-controller",
|
||||
}),
|
||||
}
|
||||
|
||||
lbc.syncQueue = NewTaskQueue(lbc.sync)
|
||||
|
|
@ -250,6 +253,7 @@ func (lbc *loadBalancerController) getUDPConfigMap(ns, name string) (*api.Config
|
|||
|
||||
func (lbc *loadBalancerController) updateEpNamedPorts(key string) {
|
||||
if !lbc.controllersInSync() {
|
||||
time.Sleep(podStoreSyncedPollPeriod)
|
||||
lbc.svcEpQueue.requeue(key, fmt.Errorf("deferring sync till endpoints controller has synced"))
|
||||
return
|
||||
}
|
||||
|
|
@ -291,6 +295,12 @@ func (lbc *loadBalancerController) updateEpNamedPorts(key string) {
|
|||
return
|
||||
}
|
||||
|
||||
// check to avoid a call to checkSvcForUpdate if the port is not a string
|
||||
_, err = strconv.Atoi(path.Backend.ServicePort.StrVal)
|
||||
if err == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
err = lbc.checkSvcForUpdate(svc)
|
||||
if err != nil {
|
||||
lbc.svcEpQueue.requeue(key, err)
|
||||
|
|
@ -306,6 +316,7 @@ func (lbc *loadBalancerController) updateEpNamedPorts(key string) {
|
|||
// the current state
|
||||
func (lbc *loadBalancerController) checkSvcForUpdate(svc *api.Service) error {
|
||||
// get the pods associated with the service
|
||||
// TODO: switch this to a watch
|
||||
pods, err := lbc.client.Pods(svc.Namespace).List(api.ListOptions{
|
||||
LabelSelector: labels.Set(svc.Spec.Selector).AsSelector(),
|
||||
})
|
||||
|
|
@ -345,7 +356,7 @@ func (lbc *loadBalancerController) checkSvcForUpdate(svc *api.Service) error {
|
|||
}
|
||||
|
||||
curNamedPort := svc.ObjectMeta.Annotations[namedPortAnnotation]
|
||||
if !reflect.DeepEqual(curNamedPort, namedPorts) {
|
||||
if len(namedPorts) > 0 && !reflect.DeepEqual(curNamedPort, namedPorts) {
|
||||
data, _ := json.Marshal(namedPorts)
|
||||
|
||||
newSvc, err := lbc.client.Services(svc.Namespace).Get(svc.Name)
|
||||
|
|
@ -353,8 +364,8 @@ func (lbc *loadBalancerController) checkSvcForUpdate(svc *api.Service) error {
|
|||
return fmt.Errorf("error getting service %v/%v: %v", svc.Namespace, svc.Name, err)
|
||||
}
|
||||
|
||||
if svc.ObjectMeta.Annotations == nil {
|
||||
svc.ObjectMeta.Annotations = map[string]string{}
|
||||
if newSvc.ObjectMeta.Annotations == nil {
|
||||
newSvc.ObjectMeta.Annotations = map[string]string{}
|
||||
}
|
||||
|
||||
newSvc.ObjectMeta.Annotations[namedPortAnnotation] = string(data)
|
||||
|
|
@ -370,6 +381,7 @@ func (lbc *loadBalancerController) checkSvcForUpdate(svc *api.Service) error {
|
|||
|
||||
func (lbc *loadBalancerController) sync(key string) {
|
||||
if !lbc.controllersInSync() {
|
||||
time.Sleep(podStoreSyncedPollPeriod)
|
||||
lbc.syncQueue.requeue(key, fmt.Errorf("deferring sync till endpoints controller has synced"))
|
||||
return
|
||||
}
|
||||
|
|
@ -396,6 +408,7 @@ func (lbc *loadBalancerController) sync(key string) {
|
|||
|
||||
func (lbc *loadBalancerController) updateIngressStatus(key string) {
|
||||
if !lbc.controllersInSync() {
|
||||
time.Sleep(podStoreSyncedPollPeriod)
|
||||
lbc.ingQueue.requeue(key, fmt.Errorf("deferring sync till endpoints controller has synced"))
|
||||
return
|
||||
}
|
||||
|
|
@ -462,7 +475,7 @@ func (lbc *loadBalancerController) getTCPServices() []*nginx.Location {
|
|||
return []*nginx.Location{}
|
||||
}
|
||||
|
||||
return lbc.getServices(tcpMap.Data, api.ProtocolTCP)
|
||||
return lbc.getStreamServices(tcpMap.Data, api.ProtocolTCP)
|
||||
}
|
||||
|
||||
func (lbc *loadBalancerController) getUDPServices() []*nginx.Location {
|
||||
|
|
@ -482,10 +495,10 @@ func (lbc *loadBalancerController) getUDPServices() []*nginx.Location {
|
|||
return []*nginx.Location{}
|
||||
}
|
||||
|
||||
return lbc.getServices(tcpMap.Data, api.ProtocolUDP)
|
||||
return lbc.getStreamServices(tcpMap.Data, api.ProtocolUDP)
|
||||
}
|
||||
|
||||
func (lbc *loadBalancerController) getServices(data map[string]string, proto api.Protocol) []*nginx.Location {
|
||||
func (lbc *loadBalancerController) getStreamServices(data map[string]string, proto api.Protocol) []*nginx.Location {
|
||||
var svcs []*nginx.Location
|
||||
// k -> port to expose in nginx
|
||||
// v -> <namespace>/<service name>:<port from service to be used>
|
||||
|
|
@ -496,36 +509,45 @@ func (lbc *loadBalancerController) getServices(data map[string]string, proto api
|
|||
continue
|
||||
}
|
||||
|
||||
svcPort := strings.Split(v, ":")
|
||||
if len(svcPort) != 2 {
|
||||
// this ports are required for NGINX
|
||||
if k == "80" || k == "443" || k == "8181" {
|
||||
glog.Warningf("port %v cannot be used for TCP or UDP services. Is reserved for NGINX", k)
|
||||
continue
|
||||
}
|
||||
|
||||
nsSvcPort := strings.Split(v, ":")
|
||||
if len(nsSvcPort) != 2 {
|
||||
glog.Warningf("invalid format (namespace/name:port) '%v'", k)
|
||||
continue
|
||||
}
|
||||
|
||||
svcNs, svcName, err := parseNsName(svcPort[0])
|
||||
nsName := nsSvcPort[0]
|
||||
svcPort := nsSvcPort[1]
|
||||
|
||||
svcNs, svcName, err := parseNsName(nsName)
|
||||
if err != nil {
|
||||
glog.Warningf("%v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
svcObj, svcExists, err := lbc.svcLister.Store.GetByKey(svcPort[0])
|
||||
svcObj, svcExists, err := lbc.svcLister.Store.GetByKey(nsName)
|
||||
if err != nil {
|
||||
glog.Warningf("error getting service %v: %v", svcPort[0], err)
|
||||
glog.Warningf("error getting service %v: %v", nsName, err)
|
||||
continue
|
||||
}
|
||||
|
||||
if !svcExists {
|
||||
glog.Warningf("service %v was not found", svcPort[0])
|
||||
glog.Warningf("service %v was not found", nsName)
|
||||
continue
|
||||
}
|
||||
|
||||
svc := svcObj.(*api.Service)
|
||||
|
||||
var endps []nginx.UpstreamServer
|
||||
targetPort, err := strconv.Atoi(svcPort[1])
|
||||
targetPort, err := strconv.Atoi(svcPort)
|
||||
if err != nil {
|
||||
for _, sp := range svc.Spec.Ports {
|
||||
if sp.Name == svcPort[1] {
|
||||
if sp.Name == svcPort {
|
||||
endps = lbc.getEndpoints(svc, sp.TargetPort, proto)
|
||||
break
|
||||
}
|
||||
|
|
@ -616,55 +638,32 @@ func (lbc *loadBalancerController) getUpstreamServers(data []interface{}) ([]*ng
|
|||
server := servers[rule.Host]
|
||||
|
||||
for _, path := range rule.HTTP.Paths {
|
||||
|
||||
upsName := fmt.Sprintf("%v-%v-%v", ing.GetNamespace(), path.Backend.ServiceName, path.Backend.ServicePort.StrVal)
|
||||
upsName := fmt.Sprintf("%v-%v-%v", ing.GetNamespace(), path.Backend.ServiceName, path.Backend.ServicePort.String())
|
||||
ups := upstreams[upsName]
|
||||
|
||||
svcKey := fmt.Sprintf("%v/%v", ing.GetNamespace(), path.Backend.ServiceName)
|
||||
svcObj, svcExists, err := lbc.svcLister.Store.GetByKey(svcKey)
|
||||
if err != nil {
|
||||
glog.Infof("error getting service %v from the cache: %v", svcKey, err)
|
||||
continue
|
||||
}
|
||||
|
||||
if !svcExists {
|
||||
glog.Warningf("service %v does no exists", svcKey)
|
||||
continue
|
||||
}
|
||||
|
||||
svc := svcObj.(*api.Service)
|
||||
|
||||
for _, servicePort := range svc.Spec.Ports {
|
||||
port := servicePort.TargetPort
|
||||
if servicePort.Name != "" {
|
||||
port = intstr.FromString(servicePort.Name)
|
||||
}
|
||||
|
||||
if port == path.Backend.ServicePort {
|
||||
endps := lbc.getEndpoints(svc, port, api.ProtocolTCP)
|
||||
if len(endps) == 0 {
|
||||
glog.Warningf("service %v does no have any active endpoints", svcKey)
|
||||
}
|
||||
|
||||
ups.Backends = append(ups.Backends, endps...)
|
||||
break
|
||||
}
|
||||
nginxPath := path.Path
|
||||
// if there's no path defined we assume /
|
||||
if nginxPath == "" {
|
||||
lbc.recorder.Eventf(ing, api.EventTypeWarning, "MAPPING",
|
||||
"Ingress rule '%v/%v' contains no path definition. Assuming /", ing.GetNamespace(), ing.GetName())
|
||||
nginxPath = "/"
|
||||
}
|
||||
|
||||
// Validate that there is no another previuous rule
|
||||
// for the same host and path.
|
||||
skipLoc := false
|
||||
addLoc := true
|
||||
for _, loc := range server.Locations {
|
||||
if loc.Path == path.Path {
|
||||
lbc.recorder.Eventf(ing, api.EventTypeWarning, "MAPPING", "Path '%v' already defined in another Ingress rule", path)
|
||||
skipLoc = true
|
||||
if loc.Path == nginxPath {
|
||||
lbc.recorder.Eventf(ing, api.EventTypeWarning, "MAPPING",
|
||||
"Path '%v' already defined in another Ingress rule", nginxPath)
|
||||
addLoc = false
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if skipLoc == false {
|
||||
if addLoc {
|
||||
server.Locations = append(server.Locations, &nginx.Location{
|
||||
Path: path.Path,
|
||||
Path: nginxPath,
|
||||
Upstream: *ups,
|
||||
})
|
||||
}
|
||||
|
|
@ -707,9 +706,39 @@ func (lbc *loadBalancerController) createUpstreams(data []interface{}) map[strin
|
|||
}
|
||||
|
||||
for _, path := range rule.HTTP.Paths {
|
||||
name := fmt.Sprintf("%v-%v-%v", ing.GetNamespace(), path.Backend.ServiceName, path.Backend.ServicePort.StrVal)
|
||||
name := fmt.Sprintf("%v-%v-%v", ing.GetNamespace(), path.Backend.ServiceName, path.Backend.ServicePort.String())
|
||||
if _, ok := upstreams[name]; !ok {
|
||||
upstreams[name] = nginx.NewUpstream(name)
|
||||
|
||||
svcKey := fmt.Sprintf("%v/%v", ing.GetNamespace(), path.Backend.ServiceName)
|
||||
svcObj, svcExists, err := lbc.svcLister.Store.GetByKey(svcKey)
|
||||
if err != nil {
|
||||
glog.Infof("error getting service %v from the cache: %v", svcKey, err)
|
||||
continue
|
||||
}
|
||||
|
||||
if !svcExists {
|
||||
glog.Warningf("service %v does no exists", svcKey)
|
||||
continue
|
||||
}
|
||||
|
||||
svc := svcObj.(*api.Service)
|
||||
for _, servicePort := range svc.Spec.Ports {
|
||||
port := servicePort.TargetPort
|
||||
if servicePort.Name != "" {
|
||||
port = intstr.FromString(servicePort.Name)
|
||||
}
|
||||
|
||||
if port == path.Backend.ServicePort {
|
||||
endps := lbc.getEndpoints(svc, port, api.ProtocolTCP)
|
||||
if len(endps) == 0 {
|
||||
glog.Warningf("service %v does no have any active endpoints", svcKey)
|
||||
}
|
||||
|
||||
upstreams[name].Backends = append(upstreams[name].Backends, endps...)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -877,6 +906,9 @@ func (lbc *loadBalancerController) Stop() error {
|
|||
return fmt.Errorf("shutdown already in progress")
|
||||
}
|
||||
|
||||
// removeFromIngress removes the IP address of the node where the Ingres
|
||||
// controller is running before shutdown to avoid incorrect status
|
||||
// information in Ingress rules
|
||||
func (lbc *loadBalancerController) removeFromIngress() {
|
||||
ings := lbc.ingLister.Store.List()
|
||||
glog.Infof("updating %v Ingress rule/s", len(ings))
|
||||
|
|
@ -921,8 +953,6 @@ func (lbc *loadBalancerController) Run() {
|
|||
go lbc.endpController.Run(lbc.stopCh)
|
||||
go lbc.svcController.Run(lbc.stopCh)
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
go lbc.syncQueue.run(time.Second, lbc.stopCh)
|
||||
go lbc.ingQueue.run(time.Second, lbc.stopCh)
|
||||
go lbc.svcEpQueue.run(time.Second, lbc.stopCh)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue