Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

K8OP-295 Expose Medusa's gRPC server port configuration #1456

Merged
merged 5 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/kind_e2e_tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ jobs:
- CreateSingleDseDatacenterCluster
- CreateSingleDseSearchDatacenterCluster
- CreateSingleDseGraphDatacenterCluster
- CreateSingleHcdDatacenterCluster
# - CreateSingleHcdDatacenterCluster
- ChangeDseWorkload
- PerNodeConfig/UserDefined
- RemoveLocalDcFromCluster
Expand Down
3 changes: 2 additions & 1 deletion CHANGELOG/CHANGELOG-1.21.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ When cutting a new release, update the `unreleased` heading to the tag being gen
* [BUGFIX] [#1454](https://github.com/k8ssandra/k8ssandra-operator/issues/1454) Do not try to work out backup status if there are no pods
* [BUGFIX] [#1383](https://github.com/k8ssandra/k8ssandra-operator/issues/1383) Do not create MedusaBackup if MedusaBakupJob did not fully succeed
* [BUGFIX] [#1460](https://github.com/k8ssandra/k8ssandra-operator/issues/1460) Fix podName calculations in medusa's hostmap.go to account for unbalanced racks also
* [BUGFIX] [#1466](https://github.com/k8ssandra/k8ssandra-operator/issues/1466) Do not overwrite existing status fields or forget to write the changes. Also, add new ContextName for the Datacenter to know where it used to be.
* [BUGFIX] [#1466](https://github.com/k8ssandra/k8ssandra-operator/issues/1466) Do not overwrite existing status fields or forget to write the changes. Also, add new ContextName for the Datacenter to know where it used to be.
* [ENHANCEMENT] [#1455](https://github.com/k8ssandra/k8ssandra-operator/issues/1455) Expose configuration of Medusa's gRPC server port
10 changes: 10 additions & 0 deletions apis/medusa/v1alpha1/medusa_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,13 @@ type Storage struct {
PodStorage *PodStorageSettings `json:"podStorage,omitempty"`
}

type Service struct {
// GrpcPort to listen on when running as gRPC service
// Included grpc in the field name to avoid misunderstanding with storage.port
// +optional
GrpcPort int `json:"grpcPort,omitempty"`
}

type PodStorageSettings struct {
// Settings for the pod's storage when backups use the local storage provider.

Expand Down Expand Up @@ -160,6 +167,9 @@ type MedusaClusterTemplate struct {
// Provides all storage backend related properties for backups.
StorageProperties Storage `json:"storageProperties,omitempty"`

// Provides all service related properties for Medusa.
ServiceProperties Service `json:"serviceProperties,omitempty"`

// Certificates for Medusa if client encryption is enabled in Cassandra.
// The secret must be in the same namespace as Cassandra and must contain three keys: "rootca.crt", "client.crt_signed" and "client.key".
// See https://docs.datastax.com/en/developer/python-driver/latest/security/ for more information on the required files.
Expand Down
16 changes: 16 additions & 0 deletions apis/medusa/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions charts/k8ssandra-operator/crds/k8ssandra-operator-crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26449,6 +26449,15 @@ spec:
type: string
type: object
type: object
serviceProperties:
description: Provides all service related properties for Medusa.
properties:
grpcPort:
description: |-
GrpcPort to listen on when running as gRPC service
Included grpc in the field name to avoid misunderstanding with storage.port
type: integer
type: object
storageProperties:
description: Provides all storage backend related properties for
backups.
Expand Down
9 changes: 9 additions & 0 deletions config/crd/bases/k8ssandra.io_k8ssandraclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26387,6 +26387,15 @@ spec:
type: string
type: object
type: object
serviceProperties:
description: Provides all service related properties for Medusa.
properties:
grpcPort:
description: |-
GrpcPort to listen on when running as gRPC service
Included grpc in the field name to avoid misunderstanding with storage.port
type: integer
type: object
storageProperties:
description: Provides all storage backend related properties for
backups.
Expand Down
15 changes: 13 additions & 2 deletions controllers/medusa/medusabackupjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package medusa
import (
"context"
"fmt"
"github.com/k8ssandra/k8ssandra-operator/pkg/cassandra"
"net"
"strings"

Expand Down Expand Up @@ -315,7 +316,12 @@ func (r *MedusaBackupJobReconciler) createMedusaBackup(ctx context.Context, back
}

func doMedusaBackup(ctx context.Context, name string, backupType shared.BackupType, pod *corev1.Pod, clientFactory medusa.ClientFactory, logger logr.Logger) (string, error) {
addr := net.JoinHostPort(pod.Status.PodIP, fmt.Sprint(shared.BackupSidecarPort))
medusaPort := shared.BackupSidecarPort
explicitPort, found := cassandra.FindContainerPort(pod, "medusa", "grpc")
if found {
medusaPort = explicitPort
}
addr := net.JoinHostPort(pod.Status.PodIP, fmt.Sprint(medusaPort))
logger.Info("connecting to backup sidecar", "Pod", pod.Name, "Address", addr)
if medusaClient, err := clientFactory.NewClient(ctx, addr); err != nil {
return "", err
Expand All @@ -332,7 +338,12 @@ func doMedusaBackup(ctx context.Context, name string, backupType shared.BackupTy
}

func backupStatus(ctx context.Context, name string, pod *corev1.Pod, clientFactory medusa.ClientFactory, logger logr.Logger) (medusa.StatusType, error) {
addr := net.JoinHostPort(pod.Status.PodIP, fmt.Sprint(shared.BackupSidecarPort))
medusaPort := shared.BackupSidecarPort
explicitPort, found := cassandra.FindContainerPort(pod, "medusa", "grpc")
if found {
medusaPort = explicitPort
}
addr := net.JoinHostPort(pod.Status.PodIP, fmt.Sprint(medusaPort))
logger.Info("connecting to backup sidecar", "Pod", pod.Name, "Address", addr)
if medusaClient, err := clientFactory.NewClient(ctx, addr); err != nil {
logger.Error(err, "Could not make a new medusa client")
Expand Down
4 changes: 4 additions & 0 deletions controllers/medusa/medusabackupjob_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ func testMedusaBackupDatacenter(t *testing.T, ctx context.Context, f *framework.
Name: cassandraUserSecret,
},
},
// adding this did not actually break any assertions
ServiceProperties: api.Service{
GrpcPort: 1234,
},
CassandraUserSecretRef: corev1.LocalObjectReference{
Name: cassandraUserSecret,
},
Expand Down
8 changes: 7 additions & 1 deletion controllers/medusa/medusarestorejob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"encoding/json"
"fmt"
"k8s.io/utils/ptr"
"net"
"time"

Expand Down Expand Up @@ -274,7 +275,12 @@ func (r *MedusaRestoreJobReconciler) prepareRestore(ctx context.Context, request
}

for _, pod := range pods {
addr := net.JoinHostPort(pod.Status.PodIP, fmt.Sprint(shared.BackupSidecarPort))
medusaPort := shared.BackupSidecarPort
explicitPort, found := cassandra.FindContainerPort(ptr.To(pod), "medusa", "grpc")
if found {
medusaPort = explicitPort
}
addr := net.JoinHostPort(pod.Status.PodIP, fmt.Sprint(medusaPort))
if medusaClient, err := r.ClientFactory.NewClient(ctx, addr); err != nil {
logger.Error(err, "Failed to create Medusa client", "address", addr)
} else {
Expand Down
3 changes: 3 additions & 0 deletions controllers/medusa/medusarestorejob_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ func testMedusaRestoreDatacenter(t *testing.T, ctx context.Context, f *framework
Name: cassandraUserSecret,
},
},
ServiceProperties: api.Service{
GrpcPort: 4567,
},
CassandraUserSecretRef: corev1.LocalObjectReference{
Name: cassandraUserSecret,
},
Expand Down
26 changes: 23 additions & 3 deletions controllers/medusa/medusatask_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package medusa
import (
"context"
"fmt"
"github.com/k8ssandra/k8ssandra-operator/pkg/cassandra"
"net"
"sync"

Expand Down Expand Up @@ -413,7 +414,12 @@ func (r *MedusaTaskReconciler) scheduleSyncForPurge(task *medusav1alpha1.MedusaT
}

func doPurge(ctx context.Context, task *medusav1alpha1.MedusaTask, pod *corev1.Pod, clientFactory medusa.ClientFactory) (*medusa.PurgeBackupsResponse, error) {
addr := net.JoinHostPort(pod.Status.PodIP, fmt.Sprint(shared.BackupSidecarPort))
medusaPort := shared.BackupSidecarPort
explicitPort, found := cassandra.FindContainerPort(pod, "medusa", "grpc")
if found {
medusaPort = explicitPort
}
addr := makeMedusaAddress(pod, medusaPort)
if medusaClient, err := clientFactory.NewClient(ctx, addr); err != nil {
return nil, err
} else {
Expand All @@ -423,7 +429,12 @@ func doPurge(ctx context.Context, task *medusav1alpha1.MedusaTask, pod *corev1.P
}

func prepareRestore(ctx context.Context, task *medusav1alpha1.MedusaTask, pod *corev1.Pod, clientFactory medusa.ClientFactory) (*medusa.PurgeBackupsResponse, error) {
addr := net.JoinHostPort(pod.Status.PodIP, fmt.Sprint(shared.BackupSidecarPort))
medusaPort := shared.BackupSidecarPort
explicitPort, found := cassandra.FindContainerPort(pod, "medusa", "grpc")
if found {
medusaPort = explicitPort
}
addr := makeMedusaAddress(pod, medusaPort)
if medusaClient, err := clientFactory.NewClient(ctx, addr); err != nil {
return nil, err
} else {
Expand All @@ -434,7 +445,12 @@ func prepareRestore(ctx context.Context, task *medusav1alpha1.MedusaTask, pod *c
}

func GetBackups(ctx context.Context, pod *corev1.Pod, clientFactory medusa.ClientFactory) ([]*medusa.BackupSummary, error) {
addr := net.JoinHostPort(pod.Status.PodIP, fmt.Sprint(shared.BackupSidecarPort))
medusaPort := shared.BackupSidecarPort
explicitPort, found := cassandra.FindContainerPort(pod, "medusa", "grpc")
if found {
medusaPort = explicitPort
}
addr := makeMedusaAddress(pod, medusaPort)
if medusaClient, err := clientFactory.NewClient(ctx, addr); err != nil {
return nil, err
} else {
Expand Down Expand Up @@ -462,3 +478,7 @@ func (r *MedusaTaskReconciler) SetupWithManager(mgr ctrl.Manager) error {
For(&medusav1alpha1.MedusaTask{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Complete(r)
}

func makeMedusaAddress(medusaPod *corev1.Pod, medusaPort int) string {
return net.JoinHostPort(medusaPod.Status.PodIP, fmt.Sprint(medusaPort))
}
3 changes: 3 additions & 0 deletions controllers/medusa/medusatask_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ func testMedusaTasks(t *testing.T, ctx context.Context, f *framework.Framework,
},
MaxBackupCount: 1,
},
ServiceProperties: api.Service{
GrpcPort: 7890,
},
CassandraUserSecretRef: corev1.LocalObjectReference{
Name: cassandraUserSecret,
},
Expand Down
3 changes: 3 additions & 0 deletions docs/content/en/tasks/backup-restore/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ spec:
# accessModes:
# - ReadWriteOnce
# size: 100Mi
serviceProperties:
# which port will Medusa's gRPC server listen on
grpcPort: 50051
```

The definition above requires a secret named `medusa-bucket-key` to be present in the target namespace before the `K8ssandraCluster` object gets created. Use the following format for this secret:
Expand Down
27 changes: 27 additions & 0 deletions pkg/cassandra/datacenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,33 @@ func FindInitContainer(dcPodTemplateSpec *corev1.PodTemplateSpec, containerName
return -1, false
}

func FindPort(container *corev1.Container, portName string) (int32, bool) {
if container.Ports != nil {
for _, port := range container.Ports {
if port.Name == portName {
return port.ContainerPort, true
}
}
}
return -1, false
}
func FindContainerPort(pod *corev1.Pod, containerName, podName string) (int, bool) {
if pod.Spec.Containers != nil {
for _, container := range pod.Spec.Containers {
if container.Name == containerName {
if container.Ports != nil {
for _, port := range container.Ports {
if port.Name == podName {
return int(port.ContainerPort), true
}
}
}
}
}
}
return -1, false
}

func FindVolume(dcPodTemplateSpec *corev1.PodTemplateSpec, volumeName string) (int, bool) {
if dcPodTemplateSpec != nil {
for i, volume := range dcPodTemplateSpec.Spec.Volumes {
Expand Down
23 changes: 15 additions & 8 deletions pkg/medusa/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
const (
DefaultMedusaImageRepository = "k8ssandra"
DefaultMedusaImageName = "medusa"
DefaultMedusaVersion = "0.22.3"
DefaultMedusaVersion = "97aa0276"
DefaultMedusaPort = 50051
DefaultProbeInitialDelay = 10
DefaultProbeTimeout = 1
Expand Down Expand Up @@ -114,6 +114,9 @@ func CreateMedusaIni(kc *k8ss.K8ssandraCluster, dcConfig *cassandra.DatacenterCo

[grpc]
enabled = 1
{{- if .Spec.Medusa.ServiceProperties.GrpcPort }}
port = {{ .Spec.Medusa.ServiceProperties.GrpcPort }}
{{- end }}

[logging]
level = DEBUG
Expand Down Expand Up @@ -226,19 +229,23 @@ func CreateMedusaMainContainer(dcConfig *cassandra.DatacenterConfig, medusaSpec
setImage(medusaSpec.ContainerImage, medusaContainer)
medusaContainer.SecurityContext = medusaSpec.SecurityContext
medusaContainer.Env = medusaEnvVars(medusaSpec, k8cName, useExternalSecrets, "GRPC")
var grpcPort = DefaultMedusaPort
if medusaSpec.ServiceProperties.GrpcPort != 0 {
grpcPort = medusaSpec.ServiceProperties.GrpcPort
}
medusaContainer.Ports = []corev1.ContainerPort{
{
Name: "grpc",
ContainerPort: DefaultMedusaPort,
ContainerPort: int32(grpcPort),
Protocol: "TCP",
},
}

readinessProbe, err := generateMedusaProbe(medusaSpec.ReadinessProbe)
readinessProbe, err := generateMedusaProbe(medusaSpec.ReadinessProbe, grpcPort)
if err != nil {
return nil, err
}
livenessProbe, err := generateMedusaProbe(medusaSpec.LivenessProbe)
livenessProbe, err := generateMedusaProbe(medusaSpec.LivenessProbe, grpcPort)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -545,9 +552,9 @@ func PurgeCronJob(dcConfig *cassandra.DatacenterConfig, clusterName, namespace s
return purgeCronJob, nil
}

func generateMedusaProbe(configuredProbe *corev1.Probe) (*corev1.Probe, error) {
func generateMedusaProbe(configuredProbe *corev1.Probe, grpcPort int) (*corev1.Probe, error) {
// Goalesce the custom probe with the default probe,
defaultProbe := defaultMedusaProbe()
defaultProbe := defaultMedusaProbe(grpcPort)
if configuredProbe == nil {
return defaultProbe, nil
}
Expand All @@ -561,12 +568,12 @@ func generateMedusaProbe(configuredProbe *corev1.Probe) (*corev1.Probe, error) {
return &mergedProbe, nil
}

func defaultMedusaProbe() *corev1.Probe {
func defaultMedusaProbe(grpcPort int) *corev1.Probe {
// Goalesce the custom probe with the default probe,
probe := &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
Exec: &corev1.ExecAction{
Command: []string{"/bin/grpc_health_probe", fmt.Sprintf("--addr=:%d", DefaultMedusaPort)},
Command: []string{"/bin/grpc_health_probe", fmt.Sprintf("--addr=:%d", grpcPort)},
},
},
InitialDelaySeconds: DefaultProbeInitialDelay,
Expand Down
Loading
Loading