Skip to content

Commit

Permalink
Handle conflicting service ports when merging per MCS spec
Browse files Browse the repository at this point in the history
The spec states that if the properties of service ports between
clusters don't match, the clusterset service should expose the union
of the ports. We're doing this however we're not properly handle
conflicts as per the spec. If multiple clusters have a matching port
by name but the other properties don't match, it should be considered
a conflict and the conflict resolution policy should be applied, ie the
port from the cluster with the oldest export timestamp should be used.
When merging, we need to sort the ports by cluster timestamp and use
just the port name when computing the union rather than all the
port properties as we did before.

Also when checking for conflicts, we should also check the AppProtocol
property.

Signed-off-by: Tom Pantelis <[email protected]>
  • Loading branch information
tpantelis committed Sep 27, 2024
1 parent 3989a7b commit 6c35f1a
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 34 deletions.
3 changes: 2 additions & 1 deletion coredns/resolver/service_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"

"github.com/submariner-io/admiral/pkg/slices"
"k8s.io/utils/ptr"
mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"
)

Expand All @@ -44,7 +45,7 @@ func (si *serviceInfo) mergePorts() {
si.ports = info.endpointRecords[0].Ports
} else {
si.ports = slices.Intersect(si.ports, info.endpointRecords[0].Ports, func(p mcsv1a1.ServicePort) string {
return fmt.Sprintf("%s%s%d", p.Name, p.Protocol, p.Port)
return fmt.Sprintf("%s:%s:%d:%s", p.Name, p.Protocol, p.Port, ptr.Deref(p.AppProtocol, ""))
})
}
}
Expand Down
10 changes: 6 additions & 4 deletions pkg/agent/controller/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
validations "k8s.io/apimachinery/pkg/util/validation"
"k8s.io/utils/ptr"
logf "sigs.k8s.io/controller-runtime/pkg/log"
mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"
)
Expand Down Expand Up @@ -123,7 +124,8 @@ func New(spec *AgentSpecification, syncerConf broker.SyncerConfig, agentConfig A
agentController.endpointSliceController, err = newEndpointSliceController(spec, syncerConf, agentController.serviceExportClient,
agentController.serviceSyncer, func(serviceName string, serviceNamespace string) *mcsv1a1.ServiceImport {
obj, found, _ := agentController.serviceImportController.remoteSyncer.GetResource(
brokerAggregatedServiceImportName(serviceName, serviceNamespace), syncerConf.BrokerNamespace)
brokerAggregatedServiceImportName(serviceName, serviceNamespace),
agentController.endpointSliceController.syncer.GetBrokerNamespace())
if !found {
return nil
}
Expand Down Expand Up @@ -479,9 +481,9 @@ func (c converter) toServicePorts(from []discovery.EndpointPort) []mcsv1a1.Servi
to := make([]mcsv1a1.ServicePort, len(from))
for i := range from {
to[i] = mcsv1a1.ServicePort{
Name: *from[i].Name,
Protocol: *from[i].Protocol,
Port: *from[i].Port,
Name: ptr.Deref(from[i].Name, ""),
Protocol: ptr.Deref(from[i].Protocol, ""),
Port: ptr.Deref(from[i].Port, 0),
AppProtocol: from[i].AppProtocol,
}
}
Expand Down
16 changes: 16 additions & 0 deletions pkg/agent/controller/clusterip_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,22 @@ func testClusterIPServiceInTwoClusters() {
})
})

Context("with conflicting ports", func() {
BeforeEach(func() {
t.cluster2.service.Spec.Ports = []corev1.ServicePort{t.cluster1.service.Spec.Ports[0], toServicePort(port3)}
t.cluster2.service.Spec.Ports[0].Port++
t.aggregatedServicePorts = []mcsv1a1.ServicePort{port1, port2, port3}
})

It("should correctly set the ports in the aggregated ServiceImport and set the Conflict status condition", func() {
t.awaitNonHeadlessServiceExported(&t.cluster1, &t.cluster2)

condition := newServiceExportConflictCondition(controller.PortConflictReason)
t.cluster1.awaitServiceExportCondition(condition)
t.cluster2.awaitServiceExportCondition(condition)
})
})

Context("with differing service types", func() {
BeforeEach(func() {
t.cluster2.service.Spec.ClusterIP = corev1.ClusterIPNone
Expand Down
10 changes: 8 additions & 2 deletions pkg/agent/controller/endpoint_slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
k8slabels "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/utils/ptr"
"k8s.io/utils/set"
mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"
)
Expand Down Expand Up @@ -243,6 +244,10 @@ func (c *EndpointSliceController) checkForConflicts(_, name, namespace string) (
mcsv1a1.LabelServiceName: name,
}))

servicePortKey := func(p mcsv1a1.ServicePort) string {
return fmt.Sprintf("%s:%s:%d:%s", p.Name, p.Protocol, p.Port, ptr.Deref(p.AppProtocol, ""))
}

var prevServicePorts []mcsv1a1.ServicePort
var intersectedServicePorts []mcsv1a1.ServicePort
clusterNames := set.New[string]()
Expand Down Expand Up @@ -271,7 +276,7 @@ func (c *EndpointSliceController) checkForConflicts(_, name, namespace string) (
if conflict {
aggregatedSI := c.aggregatedServiceImportGetter(name, namespace)
if aggregatedSI == nil {
return false, nil
return true, nil
}

exposedOp := "intersection"
Expand Down Expand Up @@ -321,7 +326,8 @@ func (c *EndpointSliceController) enqueueForConflictCheck(ctx context.Context, e
func servicePortsToString(p []mcsv1a1.ServicePort) string {
s := make([]string, len(p))
for i := range p {
s[i] = fmt.Sprintf("[name: %s, protocol: %s, port: %v]", p[i].Name, p[i].Protocol, p[i].Port)
s[i] = fmt.Sprintf("[name: %s, protocol: %s, port: %v, appProtocol: %q]", p[i].Name, p[i].Protocol, p[i].Port,
ptr.Deref(p[i].AppProtocol, ""))
}

return strings.Join(s, ", ")
Expand Down
25 changes: 4 additions & 21 deletions pkg/agent/controller/service_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"net"
"reflect"
"strconv"
"strings"

"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -699,28 +698,12 @@ func (c *ServiceImportController) localServiceImportLister(transform func(si *mc
}

func findClusterWithOldestTimestamp(from map[string]string) string {
oldest := int64(math.MaxInt64)
foundCluster := ""

for k, v := range from {
cluster, found := strings.CutPrefix(k, timestampAnnotationPrefix)
if !found {
continue
}

t, err := strconv.ParseInt(v, 10, 64)
if err != nil {
logger.Warningf("Invalid timestamp annotation value %q for cluster %q", v, cluster)
continue
}

if t < oldest || (t == oldest && cluster < foundCluster) {
foundCluster = cluster
oldest = t
}
names := getClusterNamesOrderedByTimestamp(from)
if len(names) > 0 {
return names[0]
}

return foundCluster
return ""
}

func toSessionAffinityConfigString(c *corev1.SessionAffinityConfig) string {
Expand Down
64 changes: 58 additions & 6 deletions pkg/agent/controller/service_import_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ package controller

import (
"context"
"fmt"
goslices "slices"
"strconv"
"strings"

"github.com/pkg/errors"
"github.com/submariner-io/admiral/pkg/log"
Expand Down Expand Up @@ -76,14 +78,25 @@ func (a *ServiceImportAggregator) setServicePorts(ctx context.Context, si *mcsv1
serviceNamespace, serviceName)
}

si.Spec.Ports = make([]mcsv1a1.ServicePort, 0)
portsByCluster := map[string][]mcsv1a1.ServicePort{}

for i := range list.Items {
eps := a.converter.toEndpointSlice(&list.Items[i])
si.Spec.Ports = slices.Union(si.Spec.Ports, a.converter.toServicePorts(eps.Ports), servicePortKey)
portsByCluster[eps.Labels[constants.MCSLabelSourceCluster]] = a.converter.toServicePorts(eps.Ports)
}

// Sort the clusters by their ServiceExport timestamps stored in the ServiceImport annotations so conflicting ports are
// resolved by taking the oldest as per the MCS spec's conflict resolution policy.

si.Spec.Ports = make([]mcsv1a1.ServicePort, 0)
for _, clusterName := range getClusterNamesOrderedByTimestamp(si.Annotations) {
ports := portsByCluster[clusterName]
si.Spec.Ports = slices.Union(si.Spec.Ports, ports, func(p mcsv1a1.ServicePort) string {
return p.Name
})
}

logger.V(log.DEBUG).Infof("Calculated ports for aggregated ServiceImport %q: %#v", si.Name, si.Spec.Ports)
logger.V(log.DEBUG).Infof("Calculated ports for aggregated ServiceImport %q: %s", si.Name, servicePortsToString(si.Spec.Ports))

return nil
}
Expand Down Expand Up @@ -152,6 +165,45 @@ func clusterStatusKey(c mcsv1a1.ClusterStatus) string {
return c.Cluster
}

func servicePortKey(p mcsv1a1.ServicePort) string {
return fmt.Sprintf("%s%s%d", p.Name, p.Protocol, p.Port)
type clusterSortInfo struct {
name string
timestamp int64
}

func getClusterNamesOrderedByTimestamp(from map[string]string) []string {
info := make([]clusterSortInfo, 0, len(from))

for k, v := range from {
cluster, found := strings.CutPrefix(k, timestampAnnotationPrefix)
if !found {
continue
}

t, err := strconv.ParseInt(v, 10, 64)
if err != nil {
logger.Warningf("Invalid timestamp annotation value %q for cluster %q", v, cluster)
continue
}

info = append(info, clusterSortInfo{name: cluster, timestamp: t})
}

goslices.SortFunc(info, func(a, b clusterSortInfo) int {
if a.timestamp == b.timestamp {
return strings.Compare(a.name, b.name)
}

if a.timestamp < b.timestamp {
return -1
}

return 1
})

sortedNames := make([]string, len(info))
for i := range info {
sortedNames[i] = info[i].name
}

return sortedNames
}

0 comments on commit 6c35f1a

Please sign in to comment.