Skip to content

Commit

Permalink
Adjust port conflict message if using clusterset IP
Browse files Browse the repository at this point in the history
State that it will expose the union of the service ports instead of
the intersection.

Signed-off-by: Tom Pantelis <[email protected]>
  • Loading branch information
tpantelis committed Sep 24, 2024
1 parent bead0f9 commit 078f986
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 19 deletions.
10 changes: 9 additions & 1 deletion pkg/agent/controller/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,15 @@ func New(spec *AgentSpecification, syncerConf broker.SyncerConfig, agentConfig A
agentController.serviceExportClient.localSyncer = agentController.serviceExportSyncer

agentController.endpointSliceController, err = newEndpointSliceController(spec, syncerConf, agentController.serviceExportClient,
agentController.serviceSyncer)
agentController.serviceSyncer, func(serviceName string, serviceNamespace string) *mcsv1a1.ServiceImport {
obj, found, _ := agentController.serviceImportController.remoteSyncer.GetResource(
brokerAggregatedServiceImportName(serviceName, serviceNamespace), syncerConf.BrokerNamespace)
if !found {
return nil
}

return obj.(*mcsv1a1.ServiceImport)
})
if err != nil {
return nil, err
}
Expand Down
16 changes: 16 additions & 0 deletions pkg/agent/controller/clusterip_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,22 @@ func testClusterIPServiceInTwoClusters() {
t.cluster1.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict)
t.cluster2.ensureNoServiceExportCondition(mcsv1a1.ServiceExportConflict)
})

Context("with differing ports", func() {
BeforeEach(func() {
t.cluster2.service.Spec.Ports = []corev1.ServicePort{toServicePort(port1), toServicePort(port3)}
t.aggregatedServicePorts = []mcsv1a1.ServicePort{port1, port2, port3}
})

It("should correctly set the Conflict status condition", func() {
t.awaitNonHeadlessServiceExported(&t.cluster1, &t.cluster2)

t.cluster1.awaitServiceExportCondition(newServiceExportConflictCondition(controller.PortConflictReason))

Expect(ptr.Deref(t.cluster1.retrieveServiceExportCondition(
t.cluster1.serviceExport, mcsv1a1.ServiceExportConflict).Message, "")).To(ContainSubstring("expose the union"))
})
})
})
}

Expand Down
40 changes: 30 additions & 10 deletions pkg/agent/controller/endpoint_slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,20 @@ import (
k8slabels "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/utils/set"
mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"
)

//nolint:gocritic // (hugeParam) This function modifies syncerConf so we don't want to pass by pointer.
func newEndpointSliceController(spec *AgentSpecification, syncerConfig broker.SyncerConfig,
serviceExportClient *ServiceExportClient, serviceSyncer syncer.Interface,
serviceExportClient *ServiceExportClient, serviceSyncer syncer.Interface, aggregatedServiceImportGetter AggregatedServiceImportGetterFn,
) (*EndpointSliceController, error) {
c := &EndpointSliceController{
clusterID: spec.ClusterID,
serviceExportClient: serviceExportClient,
serviceSyncer: serviceSyncer,
conflictCheckWorkQueue: workqueue.New("ConflictChecker"),
clusterID: spec.ClusterID,
serviceExportClient: serviceExportClient,
serviceSyncer: serviceSyncer,
conflictCheckWorkQueue: workqueue.New("ConflictChecker"),
aggregatedServiceImportGetter: aggregatedServiceImportGetter,
}

syncerConfig.LocalNamespace = metav1.NamespaceAll
Expand Down Expand Up @@ -243,12 +245,18 @@ func (c *EndpointSliceController) checkForConflicts(_, name, namespace string) (

var prevServicePorts []mcsv1a1.ServicePort
var intersectedServicePorts []mcsv1a1.ServicePort
clusterNames := make([]string, 0, len(epsList))
clusterNames := set.New[string]()
conflict := false

for _, o := range epsList {
eps := o.(*discovery.EndpointSlice)

if clusterNames.Has(eps.Labels[constants.MCSLabelSourceCluster]) {
continue
}

clusterNames.Insert(eps.Labels[constants.MCSLabelSourceCluster])

servicePorts := c.serviceExportClient.toServicePorts(eps.Ports)
if prevServicePorts == nil {
prevServicePorts = servicePorts
Expand All @@ -258,16 +266,28 @@ func (c *EndpointSliceController) checkForConflicts(_, name, namespace string) (
}

intersectedServicePorts = slices.Intersect(intersectedServicePorts, servicePorts, servicePortKey)

clusterNames = append(clusterNames, eps.Labels[constants.MCSLabelSourceCluster])
}

if conflict {
aggregatedSI := c.aggregatedServiceImportGetter(name, namespace)
if aggregatedSI == nil {
return false, nil
}

exposedOp := "intersection"
exposedPorts := intersectedServicePorts

if len(aggregatedSI.Spec.IPs) > 0 {
exposedPorts = aggregatedSI.Spec.Ports
exposedOp = "union"
}

c.serviceExportClient.UpdateStatusConditions(ctx, name, namespace, newServiceExportCondition(
mcsv1a1.ServiceExportConflict, corev1.ConditionTrue, PortConflictReason,
fmt.Sprintf("The service ports conflict between the constituent clusters %s. "+
"The service will expose the intersection of all the ports: %s",
fmt.Sprintf("[%s]", strings.Join(clusterNames, ", ")), servicePortsToString(intersectedServicePorts))))
"The service will expose the %s of all the ports: %s",
fmt.Sprintf("[%s]", strings.Join(clusterNames.UnsortedList(), ", ")), exposedOp,
servicePortsToString(exposedPorts))))
} else if c.serviceExportClient.hasCondition(name, namespace, mcsv1a1.ServiceExportConflict, PortConflictReason) {
c.serviceExportClient.UpdateStatusConditions(ctx, name, namespace, newServiceExportCondition(
mcsv1a1.ServiceExportConflict, corev1.ConditionFalse, PortConflictReason, ""))
Expand Down
6 changes: 5 additions & 1 deletion pkg/agent/controller/service_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ func (c *ServiceImportController) Distribute(ctx context.Context, obj runtime.Ob

aggregate := &mcsv1a1.ServiceImport{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-%s", serviceName, serviceNamespace),
Name: brokerAggregatedServiceImportName(serviceName, serviceNamespace),
Annotations: map[string]string{
mcsv1a1.LabelServiceName: serviceName,
constants.LabelSourceNamespace: serviceNamespace,
Expand Down Expand Up @@ -481,6 +481,10 @@ func (c *ServiceImportController) Distribute(ctx context.Context, obj runtime.Ob
return err
}

func brokerAggregatedServiceImportName(serviceName, serviceNamespace string) string {
return fmt.Sprintf("%s-%s", serviceName, serviceNamespace)
}

func (c *ServiceImportController) Delete(ctx context.Context, obj runtime.Object) error {
localServiceImport := c.converter.toServiceImport(obj)
key, _ := cache.MetaNamespaceKeyFunc(localServiceImport)
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/controller/service_import_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (a *ServiceImportAggregator) updateOnDelete(ctx context.Context, name, name
func (a *ServiceImportAggregator) update(ctx context.Context, name, namespace string, mutate func(*mcsv1a1.ServiceImport) error) error {
aggregate := &mcsv1a1.ServiceImport{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-%s", name, namespace),
Name: brokerAggregatedServiceImportName(name, namespace),
},
}

Expand Down
15 changes: 9 additions & 6 deletions pkg/agent/controller/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ const (

type EndpointSliceListerFn func(selector k8slabels.Selector) []runtime.Object

type AggregatedServiceImportGetterFn func(serviceName string, serviceNamespace string) *mcsv1a1.ServiceImport

type converter struct {
scheme *runtime.Scheme
}
Expand Down Expand Up @@ -123,12 +125,13 @@ type ServiceEndpointSliceController struct {

// EndpointSliceController encapsulates a syncer that syncs EndpointSlices to and from that broker.
type EndpointSliceController struct {
clusterID string
syncer *broker.Syncer
serviceImportAggregator *ServiceImportAggregator
serviceExportClient *ServiceExportClient
serviceSyncer syncer.Interface
conflictCheckWorkQueue workqueue.Interface
clusterID string
syncer *broker.Syncer
aggregatedServiceImportGetter AggregatedServiceImportGetterFn
serviceImportAggregator *ServiceImportAggregator
serviceExportClient *ServiceExportClient
serviceSyncer syncer.Interface
conflictCheckWorkQueue workqueue.Interface
}

type ServiceExportClient struct {
Expand Down

0 comments on commit 078f986

Please sign in to comment.