diff --git a/cmd/otel-allocator/collector/aws_cloud_map.go b/cmd/otel-allocator/collector/aws_cloud_map.go new file mode 100644 index 0000000000..333cba9b33 --- /dev/null +++ b/cmd/otel-allocator/collector/aws_cloud_map.go @@ -0,0 +1,241 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "context" + "errors" + "fmt" + "log" + "time" + + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/servicediscovery" + "github.com/aws/aws-sdk-go-v2/service/servicediscovery/types" + "github.com/aws/aws-sdk-go/aws" + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +type AwsCloudMapWatcher struct { + svc *servicediscovery.Client + namespaceName *string + serviceName *string + watcher watcher +} + +type AwsCloudMapWatcherOption func(*AwsCloudMapWatcher) + +var ( + errNoNamespace = errors.New("no Cloud Map namespace specified to resolve the backends") + errNoServiceName = errors.New("no Cloud Map service_name specified to resolve the backends") + discoveryDuration = promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "cloudmap_discovery_duration_seconds", + Help: "Time taken to discover instances in Cloud Map", + Buckets: prometheus.ExponentialBuckets(0.1, 2, 10), + }) + + discoveryErrors = promauto.NewCounter(prometheus.CounterOpts{ + Name: "cloudmap_discovery_errors_total", + Help: "Total number of collector discovery errors", + }) + healthyInstances = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "cloudmap_healthy_instances", + Help: "Number of healthy instances in Cloud Map", + }) + unhealthyInstances = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "cloudmap_unhealthy_instances", + Help: "Number of unhealthy instances in Cloud Map", + }) + totalInstances = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "cloudmap_instances_total", + Help: "Total number of instances in Cloud Map", + }) +) + +func NewAwsCloudMapWatcher(opts ...WatcherOption) (*AwsCloudMapWatcher, error) { + cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithDefaultRegion("")) + if err != nil { + log.Fatalf("Unable to load SDK config, %v", err) + return nil, err + } + + // Using the Config value, create the DynamoDB client + svc := servicediscovery.NewFromConfig(cfg) + + w := &AwsCloudMapWatcher{ + svc: svc, + + watcher: watcher{ + close: make(chan struct{}), + minUpdateInterval: defaultMinUpdateInterval, + }, + } + + for _, opt := range opts { + if opt.awsCloudMapOption == nil { + continue + } + opt.awsCloudMapOption(w) + } + + if w.namespaceName == nil || len(*w.namespaceName) == 0 { + return nil, errNoNamespace + } + + if w.serviceName == nil || len(*w.serviceName) == 0 { + return nil, errNoServiceName + } + + return w, nil +} + +func (w *AwsCloudMapWatcher) Watch(options ...WatchOption) error { + config := WatchConfig{} + + for _, option := range options { + option(&config) + } + + if w.svc == nil { + return fmt.Errorf("AWS Cloud Map service client not initialized") + } + + // Create a separate done channel for blocking + done := make(chan struct{}) + + // Initial discovery + w.watcher.log.Info("Performing initial discovery") + if err := w.discoverAndProcess(config.fn); err != nil { + return err + } + + w.watcher.log.Info("Starting periodic discovery", "interval", w.watcher.minUpdateInterval) + // Start the periodic discovery in a goroutine + go func() { + ticker := time.NewTicker(w.watcher.minUpdateInterval) + defer ticker.Stop() + + for { + select { + case <-w.watcher.close: + w.watcher.log.Info("Stopping periodic discovery") + close(done) // Signal the main thread to unblock + return + case <-ticker.C: + if err := w.discoverAndProcess(config.fn); err != nil { + w.watcher.log.Error(err, "Error discovering instances") + continue + } + + } + } + }() + + // Block the main thread until done signal + <-done + return nil +} + +func (w *AwsCloudMapWatcher) processBatch(instances []types.HttpInstanceSummary) ([]types.HttpInstanceSummary, struct{ healthy, unhealthy int }) { + const batchSize = 50 + stats := struct{ healthy, unhealthy int }{} + result := make([]types.HttpInstanceSummary, 0, len(instances)) + + for i := 0; i < len(instances); i += batchSize { + end := i + batchSize + if end > len(instances) { + end = len(instances) + } + + for _, instance := range instances[i:end] { + if instance.HealthStatus != types.HealthStatusUnhealthy { + result = append(result, instance) + stats.healthy++ + } else { + stats.unhealthy++ + } + } + } + + return result, stats +} + +func (w *AwsCloudMapWatcher) updateMetrics(healthy, unhealthy int) { + healthyInstances.Set(float64(healthy)) + unhealthyInstances.Set(float64(unhealthy)) + totalInstances.Set(float64(healthy + unhealthy)) +} + +func (w *AwsCloudMapWatcher) discoverAndProcess(handlerFn func(map[string]*allocation.Collector)) error { + startTime := time.Now() + + discoverOutput, err := w.svc.DiscoverInstances(context.TODO(), &servicediscovery.DiscoverInstancesInput{ + NamespaceName: w.namespaceName, + ServiceName: w.serviceName, + MaxResults: aws.Int32(100), + }) + if err != nil { + discoveryErrors.Inc() + return fmt.Errorf("Failed to discover instances: %w", err) + } + + discoveryDuration.Observe(time.Since(startTime).Seconds()) + + discoveredInstances, healthStats := w.processBatch(discoverOutput.Instances) + + w.updateMetrics(healthStats.healthy, healthStats.unhealthy) + + w.watcher.log.Info("Discovered instances", + "total", len(discoverOutput.Instances), + "healthy", healthStats.healthy, + "unhealthy", healthStats.unhealthy, + "namespace", w.namespaceName, + "service", w.serviceName, + ) + instanceIds := make([]string, len(discoveredInstances)) + for i, instance := range discoveredInstances { + instanceIds[i] = *instance.InstanceId + } + + w.watcher.log.Info("Running on collectors", "instanceIds", instanceIds, "namespace", w.namespaceName, "service", w.serviceName) + + if handlerFn != nil { + w.runOnCollectors(discoveredInstances, handlerFn) + } + + return nil +} + +// runOnCollectors runs the provided function on the set of collectors from the Store. +func (w *AwsCloudMapWatcher) runOnCollectors(store []types.HttpInstanceSummary, fn func(collectors map[string]*allocation.Collector)) { + collectorMap := make(map[string]*allocation.Collector, len(store)) + var node string + for _, obj := range store { + for attr, value := range obj.Attributes { + if attr == "EC2_INSTANCE_ID" { + node = value + } + } + collectorMap[*obj.InstanceId] = allocation.NewCollector(*obj.InstanceId, node) + } + collectorsDiscovered.Set(float64(len(collectorMap))) + fn(collectorMap) +} + +func (w *AwsCloudMapWatcher) Close() { + w.watcher.Close() +} diff --git a/cmd/otel-allocator/collector/collector.go b/cmd/otel-allocator/collector/collector.go index 8814e38797..515ecf2b46 100644 --- a/cmd/otel-allocator/collector/collector.go +++ b/cmd/otel-allocator/collector/collector.go @@ -4,31 +4,29 @@ // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. - package collector import ( + "errors" + "fmt" "os" + "strings" "time" "github.com/go-logr/logr" + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" - "k8s.io/client-go/tools/cache" - - "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation" ) const ( @@ -43,102 +41,169 @@ var ( }) ) -type Watcher struct { +type CollectorWatcherType int + +const ( + K8sCollectorWatcher CollectorWatcherType = iota + AwsCloudMapCollectorWatcher +) + +var collectorWatcherTypeStrings = []string{"k8s", "aws-cloud-map"} + +func ParseCollectorWatcherType(s string) (CollectorWatcherType, error) { + for i, name := range collectorWatcherTypeStrings { + if strings.ToLower(s) == name { + return CollectorWatcherType(i), nil + } + } + return 0, errors.New("invalid collector watcher type") +} + +// Implement the Stringer interface for CollectorWatcherType +func (c CollectorWatcherType) String() string { + if int(c) < len(collectorWatcherTypeStrings) { + return collectorWatcherTypeStrings[c] + } + return "unknown" +} + +// Implement the Set method for pflag +func (c *CollectorWatcherType) Set(value string) error { + for i, name := range collectorWatcherTypeStrings { + if strings.ToLower(value) == name { + *c = CollectorWatcherType(i) + return nil + } + } + return errors.New("invalid collector watcher type") +} + +// Implement the Type method for pflag +func (c *CollectorWatcherType) Type() string { + return "CollectorWatcherType" +} + +var _ Watcher = &watcher{} + +// CollectorWatcher interface defines the common methods for watchers +type Watcher interface { + Watch(...WatchOption) error + Close() +} + +type watcher struct { log logr.Logger - k8sClient kubernetes.Interface - close chan struct{} minUpdateInterval time.Duration + close chan struct{} } -func NewCollectorWatcher(logger logr.Logger, kubeConfig *rest.Config) (*Watcher, error) { - clientset, err := kubernetes.NewForConfig(kubeConfig) - if err != nil { - return &Watcher{}, err +func (w *watcher) Watch(options ...WatchOption) error { + config := &WatchConfig{} + for _, opt := range options { + opt(config) } - return &Watcher{ - log: logger.WithValues("component", "opentelemetry-targetallocator"), - k8sClient: clientset, - close: make(chan struct{}), - minUpdateInterval: defaultMinUpdateInterval, - }, nil -} + if config.fn == nil { + return fmt.Errorf("fn is required") + } -func (k *Watcher) Watch(labelSelector *metav1.LabelSelector, fn func(collectors map[string]*allocation.Collector)) error { - selector, err := metav1.LabelSelectorAsSelector(labelSelector) - if err != nil { - return err + if config.labelSelector == nil { + config.labelSelector = &metav1.LabelSelector{} } - listOptionsFunc := func(listOptions *metav1.ListOptions) { - listOptions.LabelSelector = selector.String() + if w.minUpdateInterval == 0 { + w.minUpdateInterval = defaultMinUpdateInterval } - informerFactory := informers.NewSharedInformerFactoryWithOptions( - k.k8sClient, - time.Second*30, - informers.WithNamespace(ns), - informers.WithTweakListOptions(listOptionsFunc)) - informer := informerFactory.Core().V1().Pods().Informer() - notify := make(chan struct{}, 1) - go k.rateLimitedCollectorHandler(notify, informer.GetStore(), fn) + if w.close == nil { + w.close = make(chan struct{}) + } + return nil +} - notifyFunc := func(_ interface{}) { - select { - case notify <- struct{}{}: - default: - } +func (w *watcher) Close() { + if w.close != nil { + close(w.close) } - _, err = informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: notifyFunc, - UpdateFunc: func(oldObj, newObj interface{}) { - notifyFunc(newObj) +} + +// WatchConfig struct defines the common parameters for the watch method of Collector Watchers +type WatchConfig struct { + labelSelector *metav1.LabelSelector + fn func(collectors map[string]*allocation.Collector) +} + +type WatchOption func(wc *WatchConfig) + +func WithLabelSelector(labelSelector *metav1.LabelSelector) WatchOption { + return func(wc *WatchConfig) { + wc.labelSelector = labelSelector + } +} + +func WithFn(fn func(collectors map[string]*allocation.Collector)) WatchOption { + return func(wc *WatchConfig) { + wc.fn = fn + } +} + +type WatcherOption struct { + k8sOption K8sWatcherOption + awsCloudMapOption AwsCloudMapWatcherOption +} + +func WithMinUpdateInterval(interval time.Duration) WatcherOption { + return WatcherOption{ + k8sOption: func(w *K8sWatcher) { + w.watcher.minUpdateInterval = interval + }, + awsCloudMapOption: func(w *AwsCloudMapWatcher) { + w.watcher.minUpdateInterval = interval }, - DeleteFunc: notifyFunc, - }) - if err != nil { - return err } +} - informer.Run(k.close) - return nil +func WithLogger(logger logr.Logger) WatcherOption { + return WatcherOption{ + k8sOption: func(w *K8sWatcher) { + w.watcher.log = logger + }, + awsCloudMapOption: func(w *AwsCloudMapWatcher) { + w.watcher.log = logger + }, + } } -// rateLimitedCollectorHandler runs fn on collectors present in the store whenever it gets a notification on the notify channel, -// but not more frequently than once per k.eventPeriod. -func (k *Watcher) rateLimitedCollectorHandler(notify chan struct{}, store cache.Store, fn func(collectors map[string]*allocation.Collector)) { - ticker := time.NewTicker(k.minUpdateInterval) - defer ticker.Stop() - - for { - select { - case <-k.close: - return - case <-ticker.C: // throttle events to avoid excessive updates - select { - case <-notify: - k.runOnCollectors(store, fn) - default: +func WithKubeConfig(config *rest.Config) WatcherOption { + return WatcherOption{ + k8sOption: func(w *K8sWatcher) { + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + // Handle error, perhaps log it or panic + w.watcher.log.Error(err, "Failed to create Kubernetes client") + return } - } + w.k8sClient = clientset + }, } } -// runOnCollectors runs the provided function on the set of collectors from the Store. -func (k *Watcher) runOnCollectors(store cache.Store, fn func(collectors map[string]*allocation.Collector)) { - collectorMap := map[string]*allocation.Collector{} - objects := store.List() - for _, obj := range objects { - pod := obj.(*v1.Pod) - if pod.Spec.NodeName == "" { - continue - } - collectorMap[pod.Name] = allocation.NewCollector(pod.Name, pod.Spec.NodeName) +func WithCloudMapConfig(namespaceName, serviceName *string) WatcherOption { + return WatcherOption{ + awsCloudMapOption: func(w *AwsCloudMapWatcher) { + w.namespaceName = namespaceName + w.serviceName = serviceName + }, } - collectorsDiscovered.Set(float64(len(collectorMap))) - fn(collectorMap) } -func (k *Watcher) Close() { - close(k.close) +func NewCollectorWatcher(t CollectorWatcherType, options ...WatcherOption) (Watcher, error) { + switch t { + case K8sCollectorWatcher: + return NewK8sWatcher(options...) + case AwsCloudMapCollectorWatcher: + return NewAwsCloudMapWatcher(options...) + default: + return nil, fmt.Errorf("invalid collector watcher type: %v", t) + } } diff --git a/cmd/otel-allocator/collector/k8s.go b/cmd/otel-allocator/collector/k8s.go new file mode 100644 index 0000000000..313e01d8f0 --- /dev/null +++ b/cmd/otel-allocator/collector/k8s.go @@ -0,0 +1,141 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "errors" + "time" + + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" +) + +type K8sWatcher struct { + k8sClient kubernetes.Interface + watcher watcher +} + +type K8sWatcherOption func(*K8sWatcher) + +var ( + errNoClient = errors.New("no Kubernetes client given") +) + +func NewK8sWatcher(opts ...WatcherOption) (*K8sWatcher, error) { + c := &K8sWatcher{ + watcher: watcher{ + close: make(chan struct{}), + minUpdateInterval: defaultMinUpdateInterval, + }, + } + for _, opt := range opts { + if opt.k8sOption == nil { + continue + } + opt.k8sOption(c) + } + if c.k8sClient == nil { + return &K8sWatcher{}, errNoClient + } + return c, nil +} + +func (w *K8sWatcher) Watch(options ...WatchOption) error { // labelSelector *metav1.LabelSelector, fn func(collectors map[string]*allocation.Collector) + config := WatchConfig{} + for _, option := range options { + option(&config) + } + + selector, err := metav1.LabelSelectorAsSelector(config.labelSelector) + if err != nil { + return err + } + + listOptionsFunc := func(listOptions *metav1.ListOptions) { + listOptions.LabelSelector = selector.String() + } + informerFactory := informers.NewSharedInformerFactoryWithOptions( + w.k8sClient, + time.Second*30, + informers.WithNamespace(ns), + informers.WithTweakListOptions(listOptionsFunc)) + informer := informerFactory.Core().V1().Pods().Informer() + + notify := make(chan struct{}, 1) + go w.rateLimitedCollectorHandler(notify, informer.GetStore(), config.fn) + + notifyFunc := func(_ interface{}) { + select { + case notify <- struct{}{}: + default: + } + } + _, err = informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: notifyFunc, + UpdateFunc: func(oldObj, newObj interface{}) { + notifyFunc(newObj) + }, + DeleteFunc: notifyFunc, + }) + if err != nil { + return err + } + + informer.Run(w.watcher.close) + return nil +} + +// rateLimitedCollectorHandler runs fn on collectors present in the store whenever it gets a notification on the notify channel, +// but not more frequently than once per k.eventPeriod. +func (w *K8sWatcher) rateLimitedCollectorHandler(notify chan struct{}, store cache.Store, fn func(collectors map[string]*allocation.Collector)) { + ticker := time.NewTicker(w.watcher.minUpdateInterval) + defer ticker.Stop() + + for { + select { + case <-w.watcher.close: + return + case <-ticker.C: // throttle events to avoid excessive updates + select { + case <-notify: + w.runOnCollectors(store, fn) + default: + } + } + } +} + +// runOnCollectors runs the provided function on the set of collectors from the Store. +func (w *K8sWatcher) runOnCollectors(store cache.Store, fn func(collectors map[string]*allocation.Collector)) { + collectorMap := map[string]*allocation.Collector{} + objects := store.List() + for _, obj := range objects { + pod := obj.(*v1.Pod) + if pod.Spec.NodeName == "" { + continue + } + collectorMap[pod.Name] = allocation.NewCollector(pod.Name, pod.Spec.NodeName) + } + collectorsDiscovered.Set(float64(len(collectorMap))) + fn(collectorMap) +} + +func (w *K8sWatcher) Close() { + w.watcher.Close() +} diff --git a/cmd/otel-allocator/collector/collector_test.go b/cmd/otel-allocator/collector/k8s_test.go similarity index 81% rename from cmd/otel-allocator/collector/collector_test.go rename to cmd/otel-allocator/collector/k8s_test.go index ed5ac364fc..be52d5346e 100644 --- a/cmd/otel-allocator/collector/collector_test.go +++ b/cmd/otel-allocator/collector/k8s_test.go @@ -31,7 +31,7 @@ import ( "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation" ) -var logger = logf.Log.WithName("collector-unit-tests") +var logger = logf.Log.WithName("k8s-collector-unit-tests") var labelMap = map[string]string{ "app.kubernetes.io/instance": "default.test", "app.kubernetes.io/managed-by": "opentelemetry-operator", @@ -40,12 +40,14 @@ var labelSelector = metav1.LabelSelector{ MatchLabels: labelMap, } -func getTestPodWatcher() Watcher { - podWatcher := Watcher{ - k8sClient: fake.NewSimpleClientset(), - close: make(chan struct{}), - log: logger, - minUpdateInterval: time.Millisecond, +func getTestPodWatcher() K8sWatcher { + podWatcher := K8sWatcher{ + k8sClient: fake.NewSimpleClientset(), + watcher: watcher{ + close: make(chan struct{}), + log: logger, + minUpdateInterval: time.Millisecond, + }, } return podWatcher } @@ -63,9 +65,9 @@ func pod(name string) *v1.Pod { } } -func Test_runWatch(t *testing.T) { +func Test_k8sWatcher_runWatch(t *testing.T) { type args struct { - kubeFn func(t *testing.T, podWatcher Watcher) + kubeFn func(t *testing.T, podWatcher K8sWatcher) collectorMap map[string]*allocation.Collector } tests := []struct { @@ -76,7 +78,7 @@ func Test_runWatch(t *testing.T) { { name: "pod add", args: args{ - kubeFn: func(t *testing.T, podWatcher Watcher) { + kubeFn: func(t *testing.T, podWatcher K8sWatcher) { for _, k := range []string{"test-pod1", "test-pod2", "test-pod3"} { p := pod(k) _, err := podWatcher.k8sClient.CoreV1().Pods("test-ns").Create(context.Background(), p, metav1.CreateOptions{}) @@ -103,7 +105,7 @@ func Test_runWatch(t *testing.T) { { name: "pod delete", args: args{ - kubeFn: func(t *testing.T, podWatcher Watcher) { + kubeFn: func(t *testing.T, podWatcher K8sWatcher) { for _, k := range []string{"test-pod2", "test-pod3"} { err := podWatcher.k8sClient.CoreV1().Pods("test-ns").Delete(context.Background(), k, metav1.DeleteOptions{}) assert.NoError(t, err) @@ -136,7 +138,7 @@ func Test_runWatch(t *testing.T) { t.Run(tt.name, func(t *testing.T) { podWatcher := getTestPodWatcher() defer func() { - close(podWatcher.close) + close(podWatcher.watcher.close) }() var actual map[string]*allocation.Collector mapMutex := sync.Mutex{} @@ -145,12 +147,12 @@ func Test_runWatch(t *testing.T) { _, err := podWatcher.k8sClient.CoreV1().Pods("test-ns").Create(context.Background(), p, metav1.CreateOptions{}) assert.NoError(t, err) } - go func(podWatcher Watcher) { - err := podWatcher.Watch(&labelSelector, func(colMap map[string]*allocation.Collector) { + go func(podWatcher K8sWatcher) { + err := podWatcher.Watch(WithLabelSelector(&labelSelector), WithFn(func(colMap map[string]*allocation.Collector) { mapMutex.Lock() defer mapMutex.Unlock() actual = colMap - }) + })) require.NoError(t, err) }(podWatcher) @@ -168,15 +170,15 @@ func Test_runWatch(t *testing.T) { } // this tests runWatch in the case of watcher channel closing. -func Test_closeChannel(t *testing.T) { +func Test_k8sWatcher_closeChannel(t *testing.T) { podWatcher := getTestPodWatcher() var wg sync.WaitGroup wg.Add(1) - go func(podWatcher Watcher) { + go func(podWatcher K8sWatcher) { defer wg.Done() - err := podWatcher.Watch(&labelSelector, func(colMap map[string]*allocation.Collector) {}) + err := podWatcher.Watch(WithLabelSelector(&labelSelector), WithFn(func(colMap map[string]*allocation.Collector) {})) require.NoError(t, err) }(podWatcher) diff --git a/cmd/otel-allocator/config/config.go b/cmd/otel-allocator/config/config.go index ee55fe0a32..42ac705504 100644 --- a/cmd/otel-allocator/config/config.go +++ b/cmd/otel-allocator/config/config.go @@ -38,25 +38,30 @@ import ( ) const ( - DefaultResyncTime = 5 * time.Minute - DefaultConfigFilePath string = "/conf/targetallocator.yaml" - DefaultCRScrapeInterval model.Duration = model.Duration(time.Second * 30) - DefaultAllocationStrategy = "consistent-hashing" - DefaultFilterStrategy = "relabel-config" + DefaultResyncTime = 5 * time.Minute + DefaultConfigFilePath string = "/conf/targetallocator.yaml" + DefaultCRScrapeInterval model.Duration = model.Duration(time.Second * 30) + DefaultAllocationStrategy = "consistent-hashing" + DefaultFilterStrategy = "relabel-config" + DefaultCollectorWatcherType = "k8s" + DefaultMinUpdateInterval = 5 * time.Second ) type Config struct { - ListenAddr string `yaml:"listen_addr,omitempty"` - KubeConfigFilePath string `yaml:"kube_config_file_path,omitempty"` - ClusterConfig *rest.Config `yaml:"-"` - RootLogger logr.Logger `yaml:"-"` - CollectorSelector *metav1.LabelSelector `yaml:"collector_selector,omitempty"` - PromConfig *promconfig.Config `yaml:"config"` - AllocationStrategy string `yaml:"allocation_strategy,omitempty"` - AllocationFallbackStrategy string `yaml:"allocation_fallback_strategy,omitempty"` - FilterStrategy string `yaml:"filter_strategy,omitempty"` - PrometheusCR PrometheusCRConfig `yaml:"prometheus_cr,omitempty"` - HTTPS HTTPSServerConfig `yaml:"https,omitempty"` + ListenAddr string `yaml:"listen_addr,omitempty"` + KubeConfigFilePath string `yaml:"kube_config_file_path,omitempty"` + ClusterConfig *rest.Config `yaml:"-"` + RootLogger logr.Logger `yaml:"-"` + CollectorSelector *metav1.LabelSelector `yaml:"collector_selector,omitempty"` + PromConfig *promconfig.Config `yaml:"config"` + AllocationStrategy string `yaml:"allocation_strategy,omitempty"` + AllocationFallbackStrategy string `yaml:"allocation_fallback_strategy,omitempty"` + FilterStrategy string `yaml:"filter_strategy,omitempty"` + PrometheusCR PrometheusCRConfig `yaml:"prometheus_cr,omitempty"` + HTTPS HTTPSServerConfig `yaml:"https,omitempty"` + CollectorWatcher CollectorWatcherConfig `yaml:"collector_watcher,omitempty"` + Runtime RuntimeConfig `yaml:"runtime,omitempty"` + MinUpdateInterval time.Duration `yaml:"min_update_interval,omitempty"` } type PrometheusCRConfig struct { @@ -80,6 +85,24 @@ type HTTPSServerConfig struct { TLSKeyFilePath string `yaml:"tls_key_file_path,omitempty"` } +type CollectorWatcherConfig struct { + WatcherType string `yaml:"type,omitempty"` + AwsCloudMap AwsCloudMapConfig `yaml:"aws_cloud_map,omitempty"` +} + +type AwsCloudMapConfig struct { + Namespace string `yaml:"namespace,omitempty"` + ServiceName string `yaml:"service_name,omitempty"` +} + +type RuntimeConfig struct { + Kubernetes KubernetesRuntimeConfig `yaml:"kubernetes,omitempty"` +} + +type KubernetesRuntimeConfig struct { + Enabled bool `yaml:"enabled,omitempty"` +} + func LoadFromFile(file string, target *Config) error { return unmarshal(target, file) } @@ -95,19 +118,28 @@ func LoadFromCLI(target *Config, flagSet *pflag.FlagSet) error { if err != nil { return err } - clusterConfig, err := clientcmd.BuildConfigFromFlags("", target.KubeConfigFilePath) - if err != nil { - pathError := &fs.PathError{} - if ok := errors.As(err, &pathError); !ok { - return err - } - clusterConfig, err = rest.InClusterConfig() + + if runtimeKubernetesEnabled, changed, flagErr := getRuntimeKubernetesEnabled(flagSet); flagErr != nil { + return flagErr + } else if changed { + target.Runtime.Kubernetes.Enabled = runtimeKubernetesEnabled + } + + if target.Runtime.Kubernetes.Enabled { + clusterConfig, err := clientcmd.BuildConfigFromFlags("", target.KubeConfigFilePath) if err != nil { - return err + pathError := &fs.PathError{} + if ok := errors.As(err, &pathError); !ok { + return err + } + clusterConfig, err = rest.InClusterConfig() + if err != nil { + return err + } + target.KubeConfigFilePath = "" } - target.KubeConfigFilePath = "" + target.ClusterConfig = clusterConfig } - target.ClusterConfig = clusterConfig target.ListenAddr, err = getListenAddr(flagSet) if err != nil { @@ -150,6 +182,30 @@ func LoadFromCLI(target *Config, flagSet *pflag.FlagSet) error { target.HTTPS.TLSKeyFilePath = tlsKeyFilePath } + if cwType, changed, err := getCollectorWatcherType(flagSet); err != nil { + return err + } else if changed { + target.CollectorWatcher.WatcherType = cwType + } + + if collectorWatcherNamespace, changed, err := getAWSCloudMapNamespace(flagSet); err != nil { + return err + } else if changed { + target.CollectorWatcher.AwsCloudMap.Namespace = collectorWatcherNamespace + } + + if collectorWatcherServiceName, changed, err := getAWSCloudMapServiceName(flagSet); err != nil { + return err + } else if changed { + target.CollectorWatcher.AwsCloudMap.ServiceName = collectorWatcherServiceName + } + + if minUpdateInterval, changed, err := getMinUpdateInterval(flagSet); err != nil { + return err + } else if changed { + target.MinUpdateInterval = minUpdateInterval + } + return nil } @@ -172,6 +228,7 @@ func CreateDefaultConfig() Config { PrometheusCR: PrometheusCRConfig{ ScrapeInterval: DefaultCRScrapeInterval, }, + MinUpdateInterval: DefaultMinUpdateInterval, } } diff --git a/cmd/otel-allocator/config/flags.go b/cmd/otel-allocator/config/flags.go index 0a47c27636..a096f700ea 100644 --- a/cmd/otel-allocator/config/flags.go +++ b/cmd/otel-allocator/config/flags.go @@ -17,6 +17,7 @@ package config import ( "flag" "path/filepath" + "time" "github.com/spf13/pflag" "k8s.io/client-go/util/homedir" @@ -25,16 +26,21 @@ import ( // Flag names. const ( - targetAllocatorName = "target-allocator" - configFilePathFlagName = "config-file" - listenAddrFlagName = "listen-addr" - prometheusCREnabledFlagName = "enable-prometheus-cr-watcher" - kubeConfigPathFlagName = "kubeconfig-path" - httpsEnabledFlagName = "enable-https-server" - listenAddrHttpsFlagName = "listen-addr-https" - httpsCAFilePathFlagName = "https-ca-file" - httpsTLSCertFilePathFlagName = "https-tls-cert-file" - httpsTLSKeyFilePathFlagName = "https-tls-key-file" + targetAllocatorName = "target-allocator" + configFilePathFlagName = "config-file" + listenAddrFlagName = "listen-addr" + prometheusCREnabledFlagName = "enable-prometheus-cr-watcher" + kubeConfigPathFlagName = "kubeconfig-path" + httpsEnabledFlagName = "enable-https-server" + listenAddrHttpsFlagName = "listen-addr-https" + httpsCAFilePathFlagName = "https-ca-file" + httpsTLSCertFilePathFlagName = "https-tls-cert-file" + httpsTLSKeyFilePathFlagName = "https-tls-key-file" + collectorWatcherTypeFlagName = "collector-watcher-type" + awsCloudMapNamespaceFlagName = "aws-cloud-map-namespace" + awsCloudMapServiceNameFlagName = "aws-cloud-map-service-name" + runtimeKubernetesEnabledFlagName = "runtime-kubernetes-enabled" + minUpdateIntervalFlagName = "min-update-interval" ) // We can't bind this flag to our FlagSet, so we need to handle it separately. @@ -51,6 +57,11 @@ func getFlagSet(errorHandling pflag.ErrorHandling) *pflag.FlagSet { flagSet.String(httpsCAFilePathFlagName, "", "The path to the HTTPS server TLS CA file.") flagSet.String(httpsTLSCertFilePathFlagName, "", "The path to the HTTPS server TLS certificate file.") flagSet.String(httpsTLSKeyFilePathFlagName, "", "The path to the HTTPS server TLS key file.") + flagSet.String(collectorWatcherTypeFlagName, "k8s", "The type of collector watcher to use. (one of 'k8s', 'aws-cloud-map')") + flagSet.String(awsCloudMapNamespaceFlagName, "default", "The namespace of the AWS Cloud Map service.") + flagSet.String(awsCloudMapServiceNameFlagName, "otel-collector", "The name of the AWS Cloud Map service.") + flagSet.Bool(runtimeKubernetesEnabledFlagName, true, "Enable Kubernetes runtime.") + flagSet.Duration(minUpdateIntervalFlagName, DefaultMinUpdateInterval, "The minimum update interval.") zapFlagSet := flag.NewFlagSet("", flag.ErrorHandling(errorHandling)) zapCmdLineOpts.BindFlags(zapFlagSet) flagSet.AddGoFlagSet(zapFlagSet) @@ -122,3 +133,48 @@ func getHttpsTLSKeyFilePath(flagSet *pflag.FlagSet) (value string, changed bool, value, err = flagSet.GetString(httpsTLSKeyFilePathFlagName) return } + +func getCollectorWatcherType(flagSet *pflag.FlagSet) (value string, changed bool, err error) { + if changed = flagSet.Changed(collectorWatcherTypeFlagName); !changed { + value, err = DefaultCollectorWatcherType, nil + return + } + value, err = flagSet.GetString(collectorWatcherTypeFlagName) + return +} + +func getAWSCloudMapNamespace(flagSet *pflag.FlagSet) (value string, changed bool, err error) { + if changed = flagSet.Changed(awsCloudMapNamespaceFlagName); !changed { + value, err = "", nil + return + } + value, err = flagSet.GetString(awsCloudMapNamespaceFlagName) + return +} + +func getAWSCloudMapServiceName(flagSet *pflag.FlagSet) (value string, changed bool, err error) { + if changed = flagSet.Changed(awsCloudMapServiceNameFlagName); !changed { + value, err = "", nil + return + } + value, err = flagSet.GetString(awsCloudMapServiceNameFlagName) + return +} + +func getRuntimeKubernetesEnabled(flagSet *pflag.FlagSet) (value bool, changed bool, err error) { + if changed = flagSet.Changed(runtimeKubernetesEnabledFlagName); !changed { + value, err = true, nil + return + } + value, err = flagSet.GetBool(runtimeKubernetesEnabledFlagName) + return +} + +func getMinUpdateInterval(flagSet *pflag.FlagSet) (value time.Duration, changed bool, err error) { + if changed = flagSet.Changed(minUpdateIntervalFlagName); !changed { + value, err = 5*time.Second, nil + return + } + value, err = flagSet.GetDuration(minUpdateIntervalFlagName) + return +} diff --git a/cmd/otel-allocator/main.go b/cmd/otel-allocator/main.go index be2418902e..1a00a9b88e 100644 --- a/cmd/otel-allocator/main.go +++ b/cmd/otel-allocator/main.go @@ -53,7 +53,7 @@ func main() { allocatorPrehook prehook.Hook allocator allocation.Allocator discoveryManager *discovery.Manager - collectorWatcher *collector.Watcher + collectorWatcher collector.Watcher promWatcher allocatorWatcher.Watcher targetDiscoverer *target.Discoverer @@ -103,17 +103,31 @@ func main() { httpOptions = append(httpOptions, server.WithTLSConfig(tlsConfig, cfg.HTTPS.ListenAddr)) } srv := server.NewServer(log, allocator, cfg.ListenAddr, httpOptions...) - + promLogger := gokitlog.With(gokitlog.NewJSONLogger(gokitlog.NewSyncWriter(os.Stdout)), "logger", "prometheus-discovery") discoveryCtx, discoveryCancel := context.WithCancel(ctx) sdMetrics, err := discovery.CreateAndRegisterSDMetrics(prometheus.DefaultRegisterer) if err != nil { setupLog.Error(err, "Unable to register metrics for Prometheus service discovery") os.Exit(1) } - discoveryManager = discovery.NewManager(discoveryCtx, gokitlog.NewNopLogger(), prometheus.DefaultRegisterer, sdMetrics) + discoveryManager = discovery.NewManager(discoveryCtx, promLogger, prometheus.DefaultRegisterer, sdMetrics) targetDiscoverer = target.NewDiscoverer(log, discoveryManager, allocatorPrehook, srv) - collectorWatcher, collectorWatcherErr := collector.NewCollectorWatcher(log, cfg.ClusterConfig) + collectorWatcherType, err := collector.ParseCollectorWatcherType(cfg.CollectorWatcher.WatcherType) + if err != nil { + setupLog.Error(err, "Unable to parse collector watcher type") + os.Exit(1) + } + collectorWatcher, collectorWatcherErr := collector.NewCollectorWatcher( + collectorWatcherType, + collector.WithLogger(log.WithName("collector-watcher")), + collector.WithCloudMapConfig( + &cfg.CollectorWatcher.AwsCloudMap.Namespace, + &cfg.CollectorWatcher.AwsCloudMap.ServiceName, + ), + collector.WithMinUpdateInterval(cfg.MinUpdateInterval), + collector.WithKubeConfig(cfg.ClusterConfig), + ) if collectorWatcherErr != nil { setupLog.Error(collectorWatcherErr, "Unable to initialize collector watcher") os.Exit(1) @@ -185,7 +199,10 @@ func main() { }) runGroup.Add( func() error { - err := collectorWatcher.Watch(cfg.CollectorSelector, allocator.SetCollectors) + err := collectorWatcher.Watch( + collector.WithLabelSelector(cfg.CollectorSelector), + collector.WithFn(allocator.SetCollectors), + ) setupLog.Info("Collector watcher exited") return err }, diff --git a/cmd/otel-allocator/server/server.go b/cmd/otel-allocator/server/server.go index 2e9df9a8b0..ad2ae611c2 100644 --- a/cmd/otel-allocator/server/server.go +++ b/cmd/otel-allocator/server/server.go @@ -112,6 +112,11 @@ func (s *Server) setRouter(router *gin.Engine) { router.GET("/livez", s.LivenessProbeHandler) router.GET("/readyz", s.ReadinessProbeHandler) registerPprof(router.Group("/debug/pprof/")) + var routesPath []string + for _, route := range router.Routes() { + routesPath = append(routesPath, route.Path) + } + s.logger.Info("Register routes", "routes", routesPath) } func NewServer(log logr.Logger, allocator allocation.Allocator, listenAddr string, options ...Option) *Server { @@ -135,17 +140,17 @@ func NewServer(log logr.Logger, allocator allocation.Allocator, listenAddr strin } func (s *Server) Start() error { - s.logger.Info("Starting server...") + s.logger.Info("Starting HTTP server...", "listenAddr", s.server.Addr) return s.server.ListenAndServe() } func (s *Server) Shutdown(ctx context.Context) error { - s.logger.Info("Shutting down server...") + s.logger.Info("Shutting down HTTP server...") return s.server.Shutdown(ctx) } func (s *Server) StartHTTPS() error { - s.logger.Info("Starting HTTPS server...") + s.logger.Info("Starting HTTPS server...", "listenAddr", s.server.Addr) return s.httpsServer.ListenAndServeTLS("", "") } diff --git a/go.mod b/go.mod index 3b4edb3fdd..2afe456af9 100644 --- a/go.mod +++ b/go.mod @@ -228,3 +228,23 @@ require ( sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect ) + +require ( + github.com/aws/aws-sdk-go-v2/config v1.27.36 + github.com/aws/aws-sdk-go-v2/service/servicediscovery v1.33.5 +) + +require ( + github.com/aws/aws-sdk-go-v2 v1.32.4 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.17.34 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.14 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.23 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.23 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.5 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.20 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.23.0 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.27.0 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.31.0 // indirect + github.com/aws/smithy-go v1.22.0 // indirect +) diff --git a/go.sum b/go.sum index 612f6b92dc..8e9007f43c 100644 --- a/go.sum +++ b/go.sum @@ -85,6 +85,34 @@ github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2/go.mod h1:W github.com/aws/aws-sdk-go v1.38.35/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro= github.com/aws/aws-sdk-go v1.55.5 h1:KKUZBfBoyqy5d3swXyiC7Q76ic40rYcbqH7qjh59kzU= github.com/aws/aws-sdk-go v1.55.5/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU= +github.com/aws/aws-sdk-go-v2 v1.32.4 h1:S13INUiTxgrPueTmrm5DZ+MiAo99zYzHEFh1UNkOxNE= +github.com/aws/aws-sdk-go-v2 v1.32.4/go.mod h1:2SK5n0a2karNTv5tbP1SjsX0uhttou00v/HpXKM1ZUo= +github.com/aws/aws-sdk-go-v2/config v1.27.36 h1:4IlvHh6Olc7+61O1ktesh0jOcqmq/4WG6C2Aj5SKXy0= +github.com/aws/aws-sdk-go-v2/config v1.27.36/go.mod h1:IiBpC0HPAGq9Le0Xxb1wpAKzEfAQ3XlYgJLYKEVYcfw= +github.com/aws/aws-sdk-go-v2/credentials v1.17.34 h1:gmkk1l/cDGSowPRzkdxYi8edw+gN4HmVK151D/pqGNc= +github.com/aws/aws-sdk-go-v2/credentials v1.17.34/go.mod h1:4R9OEV3tgFMsok4ZeFpExn7zQaZRa9MRGFYnI/xC/vs= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.14 h1:C/d03NAmh8C4BZXhuRNboF/DqhBkBCeDiJDcaqIT5pA= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.14/go.mod h1:7I0Ju7p9mCIdlrfS+JCgqcYD0VXz/N4yozsox+0o078= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.23 h1:A2w6m6Tmr+BNXjDsr7M90zkWjsu4JXHwrzPg235STs4= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.23/go.mod h1:35EVp9wyeANdujZruvHiQUAo9E3vbhnIO1mTCAxMlY0= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.23 h1:pgYW9FCabt2M25MoHYCfMrVY2ghiiBKYWUVXfwZs+sU= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.23/go.mod h1:c48kLgzO19wAu3CPkDWC28JbaJ+hfQlsdl7I2+oqIbk= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 h1:VaRN3TlFdd6KxX1x3ILT5ynH6HvKgqdiXoTxAF4HQcQ= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.5 h1:QFASJGfT8wMXtuP3D5CRmMjARHv9ZmzFUMJznHDOY3w= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.5/go.mod h1:QdZ3OmoIjSX+8D1OPAzPxDfjXASbBMDsz9qvtyIhtik= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.20 h1:Xbwbmk44URTiHNx6PNo0ujDE6ERlsCKJD3u1zfnzAPg= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.20/go.mod h1:oAfOFzUB14ltPZj1rWwRc3d/6OgD76R8KlvU3EqM9Fg= +github.com/aws/aws-sdk-go-v2/service/servicediscovery v1.33.5 h1:Adq1dR6H8pI7pECxgc0S44HMjJcvKoUea0fUyHEFUZA= +github.com/aws/aws-sdk-go-v2/service/servicediscovery v1.33.5/go.mod h1:GZ6a9kkJfcGPRCMGU003Gb/VsB3qAH2xeIUt/6DLYF4= +github.com/aws/aws-sdk-go-v2/service/sso v1.23.0 h1:fHySkG0IGj2nepgGJPmmhZYL9ndnsq1Tvc6MeuVQCaQ= +github.com/aws/aws-sdk-go-v2/service/sso v1.23.0/go.mod h1:XRlMvmad0ZNL+75C5FYdMvbbLkd6qiqz6foR1nA1PXY= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.27.0 h1:cU/OeQPNReyMj1JEBgjE29aclYZYtXcsPMXbTkVGMFk= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.27.0/go.mod h1:FnvDM4sfa+isJ3kDXIzAB9GAwVSzFzSy97uZ3IsHo4E= +github.com/aws/aws-sdk-go-v2/service/sts v1.31.0 h1:GNVxIHBTi2EgwCxpNiozhNasMOK+ROUA2Z3X+cSBX58= +github.com/aws/aws-sdk-go-v2/service/sts v1.31.0/go.mod h1:yMWe0F+XG0DkRZK5ODZhG7BEFYhLXi2dqGsv6tX0cgI= +github.com/aws/smithy-go v1.22.0 h1:uunKnWlcoL3zO7q+gG2Pk53joueEOsnNB28QdMsmiMM= +github.com/aws/smithy-go v1.22.0/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3 h1:6df1vn4bBlDDo4tARvBm7l6KA9iVMnE3NWizDeWSrps= github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3/go.mod h1:CIWtjkly68+yqLPbvwwR/fjNJA/idrtULjZWh2v1ys0= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=