From 078f986b4613a240beee526801efb092d889a0c9 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Mon, 23 Sep 2024 08:24:53 -0400 Subject: [PATCH] Adjust port conflict message if using clusterset IP State that it will expose the union of the service ports instead of the intersection. Signed-off-by: Tom Pantelis --- pkg/agent/controller/agent.go | 10 ++++- .../controller/clusterip_service_test.go | 16 ++++++++ pkg/agent/controller/endpoint_slice.go | 40 ++++++++++++++----- pkg/agent/controller/service_import.go | 6 ++- .../controller/service_import_aggregator.go | 2 +- pkg/agent/controller/types.go | 15 ++++--- 6 files changed, 70 insertions(+), 19 deletions(-) diff --git a/pkg/agent/controller/agent.go b/pkg/agent/controller/agent.go index 87491cb5..d00f0509 100644 --- a/pkg/agent/controller/agent.go +++ b/pkg/agent/controller/agent.go @@ -121,7 +121,15 @@ func New(spec *AgentSpecification, syncerConf broker.SyncerConfig, agentConfig A agentController.serviceExportClient.localSyncer = agentController.serviceExportSyncer agentController.endpointSliceController, err = newEndpointSliceController(spec, syncerConf, agentController.serviceExportClient, - agentController.serviceSyncer) + agentController.serviceSyncer, func(serviceName string, serviceNamespace string) *mcsv1a1.ServiceImport { + obj, found, _ := agentController.serviceImportController.remoteSyncer.GetResource( + brokerAggregatedServiceImportName(serviceName, serviceNamespace), syncerConf.BrokerNamespace) + if !found { + return nil + } + + return obj.(*mcsv1a1.ServiceImport) + }) if err != nil { return nil, err } diff --git a/pkg/agent/controller/clusterip_service_test.go b/pkg/agent/controller/clusterip_service_test.go index 300704a4..0fe57c08 100644 --- a/pkg/agent/controller/clusterip_service_test.go +++ b/pkg/agent/controller/clusterip_service_test.go @@ -837,6 +837,22 @@ func testClusterIPServiceInTwoClusters() { t.cluster1.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict) t.cluster2.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict) }) + + Context("with differing ports", func() { + BeforeEach(func() { + t.cluster2.service.Spec.Ports = []corev1.ServicePort{toServicePort(port1), toServicePort(port3)} + t.aggregatedServicePorts = []mcsv1a1.ServicePort{port1, port2, port3} + }) + + It("should correctly set the Conflict status condition", func() { + t.awaitNonHeadlessServiceExported(&t.cluster1, &t.cluster2) + + t.cluster1.awaitServiceExportCondition(newServiceExportConflictCondition(controller.PortConflictReason)) + + Expect(ptr.Deref(t.cluster1.retrieveServiceExportCondition( + t.cluster1.serviceExport, mcsv1a1.ServiceExportConflict).Message, "")).To(ContainSubstring("expose the union")) + }) + }) }) } diff --git a/pkg/agent/controller/endpoint_slice.go b/pkg/agent/controller/endpoint_slice.go index a2c40192..564c92b2 100644 --- a/pkg/agent/controller/endpoint_slice.go +++ b/pkg/agent/controller/endpoint_slice.go @@ -39,18 +39,20 @@ import ( k8slabels "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/utils/set" mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" ) //nolint:gocritic // (hugeParam) This function modifies syncerConf so we don't want to pass by pointer. func newEndpointSliceController(spec *AgentSpecification, syncerConfig broker.SyncerConfig, - serviceExportClient *ServiceExportClient, serviceSyncer syncer.Interface, + serviceExportClient *ServiceExportClient, serviceSyncer syncer.Interface, aggregatedServiceImportGetter AggregatedServiceImportGetterFn, ) (*EndpointSliceController, error) { c := &EndpointSliceController{ - clusterID: spec.ClusterID, - serviceExportClient: serviceExportClient, - serviceSyncer: serviceSyncer, - conflictCheckWorkQueue: workqueue.New("ConflictChecker"), + clusterID: spec.ClusterID, + serviceExportClient: serviceExportClient, + serviceSyncer: serviceSyncer, + conflictCheckWorkQueue: workqueue.New("ConflictChecker"), + aggregatedServiceImportGetter: aggregatedServiceImportGetter, } syncerConfig.LocalNamespace = metav1.NamespaceAll @@ -243,12 +245,18 @@ func (c *EndpointSliceController) checkForConflicts(_, name, namespace string) ( var prevServicePorts []mcsv1a1.ServicePort var intersectedServicePorts []mcsv1a1.ServicePort - clusterNames := make([]string, 0, len(epsList)) + clusterNames := set.New[string]() conflict := false for _, o := range epsList { eps := o.(*discovery.EndpointSlice) + if clusterNames.Has(eps.Labels[constants.MCSLabelSourceCluster]) { + continue + } + + clusterNames.Insert(eps.Labels[constants.MCSLabelSourceCluster]) + servicePorts := c.serviceExportClient.toServicePorts(eps.Ports) if prevServicePorts == nil { prevServicePorts = servicePorts @@ -258,16 +266,28 @@ func (c *EndpointSliceController) checkForConflicts(_, name, namespace string) ( } intersectedServicePorts = slices.Intersect(intersectedServicePorts, servicePorts, servicePortKey) - - clusterNames = append(clusterNames, eps.Labels[constants.MCSLabelSourceCluster]) } if conflict { + aggregatedSI := c.aggregatedServiceImportGetter(name, namespace) + if aggregatedSI == nil { + return false, nil + } + + exposedOp := "intersection" + exposedPorts := intersectedServicePorts + + if len(aggregatedSI.Spec.IPs) > 0 { + exposedPorts = aggregatedSI.Spec.Ports + exposedOp = "union" + } + c.serviceExportClient.UpdateStatusConditions(ctx, name, namespace, newServiceExportCondition( mcsv1a1.ServiceExportConflict, corev1.ConditionTrue, PortConflictReason, fmt.Sprintf("The service ports conflict between the constituent clusters %s. "+ - "The service will expose the intersection of all the ports: %s", - fmt.Sprintf("[%s]", strings.Join(clusterNames, ", ")), servicePortsToString(intersectedServicePorts)))) + "The service will expose the %s of all the ports: %s", + fmt.Sprintf("[%s]", strings.Join(clusterNames.UnsortedList(), ", ")), exposedOp, + servicePortsToString(exposedPorts)))) } else if c.serviceExportClient.hasCondition(name, namespace, mcsv1a1.ServiceExportConflict, PortConflictReason) { c.serviceExportClient.UpdateStatusConditions(ctx, name, namespace, newServiceExportCondition( mcsv1a1.ServiceExportConflict, corev1.ConditionFalse, PortConflictReason, "")) diff --git a/pkg/agent/controller/service_import.go b/pkg/agent/controller/service_import.go index aea5de8d..e54018f7 100644 --- a/pkg/agent/controller/service_import.go +++ b/pkg/agent/controller/service_import.go @@ -355,7 +355,7 @@ func (c *ServiceImportController) Distribute(ctx context.Context, obj runtime.Ob aggregate := &mcsv1a1.ServiceImport{ ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("%s-%s", serviceName, serviceNamespace), + Name: brokerAggregatedServiceImportName(serviceName, serviceNamespace), Annotations: map[string]string{ mcsv1a1.LabelServiceName: serviceName, constants.LabelSourceNamespace: serviceNamespace, @@ -481,6 +481,10 @@ func (c *ServiceImportController) Distribute(ctx context.Context, obj runtime.Ob return err } +func brokerAggregatedServiceImportName(serviceName, serviceNamespace string) string { + return fmt.Sprintf("%s-%s", serviceName, serviceNamespace) +} + func (c *ServiceImportController) Delete(ctx context.Context, obj runtime.Object) error { localServiceImport := c.converter.toServiceImport(obj) key, _ := cache.MetaNamespaceKeyFunc(localServiceImport) diff --git a/pkg/agent/controller/service_import_aggregator.go b/pkg/agent/controller/service_import_aggregator.go index ef1c446f..afb34b19 100644 --- a/pkg/agent/controller/service_import_aggregator.go +++ b/pkg/agent/controller/service_import_aggregator.go @@ -110,7 +110,7 @@ func (a *ServiceImportAggregator) updateOnDelete(ctx context.Context, name, name func (a *ServiceImportAggregator) update(ctx context.Context, name, namespace string, mutate func(*mcsv1a1.ServiceImport) error) error { aggregate := &mcsv1a1.ServiceImport{ ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("%s-%s", name, namespace), + Name: brokerAggregatedServiceImportName(name, namespace), }, } diff --git a/pkg/agent/controller/types.go b/pkg/agent/controller/types.go index ab4a7e79..893ee380 100644 --- a/pkg/agent/controller/types.go +++ b/pkg/agent/controller/types.go @@ -47,6 +47,8 @@ const ( type EndpointSliceListerFn func(selector k8slabels.Selector) []runtime.Object +type AggregatedServiceImportGetterFn func(serviceName string, serviceNamespace string) *mcsv1a1.ServiceImport + type converter struct { scheme *runtime.Scheme } @@ -123,12 +125,13 @@ type ServiceEndpointSliceController struct { // EndpointSliceController encapsulates a syncer that syncs EndpointSlices to and from that broker. type EndpointSliceController struct { - clusterID string - syncer *broker.Syncer - serviceImportAggregator *ServiceImportAggregator - serviceExportClient *ServiceExportClient - serviceSyncer syncer.Interface - conflictCheckWorkQueue workqueue.Interface + clusterID string + syncer *broker.Syncer + aggregatedServiceImportGetter AggregatedServiceImportGetterFn + serviceImportAggregator *ServiceImportAggregator + serviceExportClient *ServiceExportClient + serviceSyncer syncer.Interface + conflictCheckWorkQueue workqueue.Interface } type ServiceExportClient struct {