From 31a94887a61ff6b0e0e76a0c008e2368826fb08b Mon Sep 17 00:00:00 2001 From: Wojciech Suszko <53919286+Woojciech@users.noreply.github.com> Date: Mon, 9 Dec 2024 18:54:09 +0100 Subject: [PATCH] wsuszko/metadata-service tests (#127) * go metadata-service: Introduce early draft of metadata tests. * go metadata-service: Further test experiments. * go, metadata-service: Introduce automated integration tests. * docker tests: Add spacing/ * go, metadata: Refactor, remove comments. * docker tests: Readd es-certs. * docker: Remove test script. --- docker-compose.test.yml | 42 ++ go/docker/cluster_metadata/Dockerfile | 7 + go/pkg/message-broker/json_kafka.go | 2 +- go/pkg/message-broker/kafka.go | 1 + go/pkg/repositories/mongodb_repository.go | 4 + go/pkg/tests/mock_kafka_broker.go | 67 +++ go/pkg/tests/tests.go | 1 - .../cmd/cluster_metadata/main.go | 82 +-- .../cluster_metadata/pkg/config/modules.go | 139 +++++ .../pkg/services/event_service.go | 25 +- .../pkg/services/metadata_service.go | 43 +- .../pkg/services/metadata_service_test.go | 557 ++++++++++++++++++ .../logs_ingestion/pkg/config/modules.go | 3 +- 13 files changed, 864 insertions(+), 109 deletions(-) create mode 100644 go/pkg/tests/mock_kafka_broker.go create mode 100644 go/services/cluster_metadata/pkg/config/modules.go create mode 100644 go/services/cluster_metadata/pkg/services/metadata_service_test.go diff --git a/docker-compose.test.yml b/docker-compose.test.yml index 58d4bd3d..dc2a4a61 100644 --- a/docker-compose.test.yml +++ b/docker-compose.test.yml @@ -25,6 +25,48 @@ services: - es-certs:/usr/local/share - cache:/go/pkg/mod/ + cluster-metadata-service: + user: "0" + container_name: magpie-monitor-test-cluster-metadata-service + restart: on-failure + image: magpiemonitor/cluster-metadata-service + build: + context: ./go + dockerfile: ./docker/cluster_metadata/Dockerfile + target: tests + ports: + - ${CLUSTER_METADATA_HTTP_PORT:-8092}:${CLUSTER_METADATA_HTTP_PORT:-8092} + environment: + APP_ENV: "test" + HTTP_PORT: ${CLUSTER_METADATA_HTTP_PORT} + METADATADB_USER: ${CLUSTER_METADATA_MONGODB_USER} + METADATADB_PASSWORD: ${CLUSTER_METADATA_MONGODB_PASSWORD} + METADATADB_HOST: ${CLUSTER_METADATA_MONGODB_HOST} + METADATADB_PORT: ${CLUSTER_METADATA_MONGODB_PORT} + SWAGGER_HOST: ${CLUSTER_METADATA_SERVICE_HOST} + VIRTUAL_HOST: ${CLUSTER_METADATA_PRODUCTION_HOST} + LETSENCRYPT_HOST: ${CLUSTER_METADATA_PRODUCTION_HOST} + VIRTUAL_PORT: ${CLUSTER_METADATA_HTTP_PORT} + KAFKA_BROKER_URL: ${KAFKA_BROKER_URL} + KAFKA_CLIENT_USERNAME: ${KAFKA_CLIENT_USERNAME} + KAFKA_CLIENT_PASSWORD: ${KAFKA_CLIENT_PASSWORD} + CLUSTER_METADATA_APPLICATION_TOPIC: ${CLUSTER_METADATA_APPLICATION_TOPIC} + CLUSTER_METADATA_NODE_TOPIC: ${CLUSTER_METADATA_NODE_TOPIC} + CLUSTER_METADATA_CLUSTER_TOPIC: ${CLUSTER_METADATA_CLUSTER_TOPIC} + NODE_ACTIVITY_WINDOW_MILLIS: ${CLUSTER_METADATA_NODE_ACTIVITY_WINDOW_MILLIS} + APPLICATION_ACTIVITY_WINDOW_MILLIS: ${CLUSTER_METADATA_APPLICATION_ACTIVITY_WINDOW_MILLIS} + CLUSTER_ACTIVITY_WINDOW_MILLIS: ${CLUSTER_METADATA_CLUSTER_ACTIVITY_WINDOW_MILLIS} + CLUSTER_AGGREGATED_STATE_CHANGE_POLL_INTERVAL_SECONDS: ${CLUSTER_METADATA_CLUSTER_AGGREGATED_STATE_CHANGE_POLL_INTERVAL_SECONDS} + NODE_AGGREGATED_STATE_CHANGE_POLL_INTERVAL_SECONDS: ${CLUSTER_METADATA_NODE_AGGREGATED_STATE_CHANGE_POLL_INTERVAL_SECONDS} + APPLICATION_AGGREGATED_STATE_CHANGE_POLL_INTERVAL_SECONDS: ${CLUSTER_METADATA_APPLICATION_AGGREGATED_STATE_POLL_INTERVAL_SECONDS} + KAFKA_MAX_MESSAGE_SIZE_BYTES: ${KAFKA_MAX_MESSAGE_SIZE_BYTES} + KAFKA_BROKER_GROUP_ID: ${KAFKA_BROKER_GROUP_ID} + POD_AGENT_APPLICATION_METADATA_TOPIC: ${POD_AGENT_APPLICATION_METADATA_TOPIC} + NODE_AGENT_NODE_METADATA_TOPIC: ${NODE_AGENT_NODE_METADATA_TOPIC} + INTEGRATION_TEST_WAIT_MODIFIER: ${INTEGRATION_TEST_WAIT_MODIFIER} + volumes: + - cache:/go/pkg/mod/ + reports-service-test: user: "0" container_name: magpie-monitor-test-reports-service diff --git a/go/docker/cluster_metadata/Dockerfile b/go/docker/cluster_metadata/Dockerfile index 834f6676..53f5c54d 100644 --- a/go/docker/cluster_metadata/Dockerfile +++ b/go/docker/cluster_metadata/Dockerfile @@ -14,6 +14,13 @@ COPY . . RUN --mount=type=cache,target=/go/pkg/mod/ \ go build -o /bin/server ./services/cluster_metadata/cmd/cluster_metadata +FROM build AS tests + +RUN echo "go env -w GOCACHE=/go/pkg/mod/ && go test ./services/cluster_metadata/... -v -cover" > test.sh +RUN chmod +x test.sh + +ENTRYPOINT ["bash", "./test.sh"] + FROM debian:latest AS final RUN apt-get update && apt-get install -y ca-certificates diff --git a/go/pkg/message-broker/json_kafka.go b/go/pkg/message-broker/json_kafka.go index 83734e1d..ecdee652 100644 --- a/go/pkg/message-broker/json_kafka.go +++ b/go/pkg/message-broker/json_kafka.go @@ -56,8 +56,8 @@ func (b *KafkaJsonMessageBroker[T]) Subscribe(ctx context.Context, messages chan } } - } + func (b *KafkaJsonMessageBroker[T]) CloseReader() error { return b.broker.CloseReader() diff --git a/go/pkg/message-broker/kafka.go b/go/pkg/message-broker/kafka.go index b252e147..cc917504 100644 --- a/go/pkg/message-broker/kafka.go +++ b/go/pkg/message-broker/kafka.go @@ -125,6 +125,7 @@ func (b *KafkaMessageBroker) Subscribe(ctx context.Context, messages chan<- []by messages <- msg.Value } } + func (b *KafkaMessageBroker) CloseReader() error { return b.reader.Close() } diff --git a/go/pkg/repositories/mongodb_repository.go b/go/pkg/repositories/mongodb_repository.go index 6346318e..0235dab6 100644 --- a/go/pkg/repositories/mongodb_repository.go +++ b/go/pkg/repositories/mongodb_repository.go @@ -114,3 +114,7 @@ func (m *MongoDbCollection[T]) ReplaceDocument(ctx context.Context, id primitive func (m *MongoDbCollection[T]) Count(filter bson.D) (int64, error) { return m.Client.Database(m.Db).Collection(m.Col).CountDocuments(context.TODO(), filter) } + +func (m *MongoDbCollection[T]) DeleteAll() (*mongo.DeleteResult, error) { + return m.Client.Database(m.Db).Collection(m.Col).DeleteMany(context.TODO(), bson.D{}) +} diff --git a/go/pkg/tests/mock_kafka_broker.go b/go/pkg/tests/mock_kafka_broker.go new file mode 100644 index 00000000..daa94d86 --- /dev/null +++ b/go/pkg/tests/mock_kafka_broker.go @@ -0,0 +1,67 @@ +package tests + +import ( + "context" + "time" + + messagebroker "github.com/Magpie-Monitor/magpie-monitor/pkg/message-broker" + "github.com/Magpie-Monitor/magpie-monitor/services/cluster_metadata/pkg/repositories" + "github.com/Magpie-Monitor/magpie-monitor/services/cluster_metadata/pkg/services" + "go.uber.org/zap" +) + +func NewMockApplicationMetadataBroker(logger *zap.Logger) messagebroker.MessageBroker[repositories.ApplicationState] { + return NewKafkaJsonMessageBroker[repositories.ApplicationState](logger) +} + +func NewMockNodeMetadataBroker(logger *zap.Logger) messagebroker.MessageBroker[repositories.NodeState] { + return NewKafkaJsonMessageBroker[repositories.NodeState](logger) +} + +func NewMockNodeMetadataUpdatedBroker(logger *zap.Logger) messagebroker.MessageBroker[services.NodeMetadataUpdated] { + return NewKafkaJsonMessageBroker[services.NodeMetadataUpdated](logger) +} + +func NewMockApplicationMetadataUpdatedBroker(logger *zap.Logger) messagebroker.MessageBroker[services.ApplicationMetadataUpdated] { + return NewKafkaJsonMessageBroker[services.ApplicationMetadataUpdated](logger) +} + +func NewMockClusterMetadataUpdatedBroker(logger *zap.Logger) messagebroker.MessageBroker[services.ClusterMetadataUpdated] { + return NewKafkaJsonMessageBroker[services.ClusterMetadataUpdated](logger) +} + +type MockKafkaJsonMessageBroker[T any] struct { + messages []T + logger *zap.Logger +} + +func NewKafkaJsonMessageBroker[T any](logger *zap.Logger) messagebroker.MessageBroker[T] { + return &MockKafkaJsonMessageBroker[T]{ + logger: logger, + messages: make([]T, 0), + } +} + +func (b *MockKafkaJsonMessageBroker[T]) Publish(key string, message T) error { + b.messages = append(b.messages, message) + b.logger.Info("Received message", zap.Any("messages", b.messages)) + return nil +} + +func (b *MockKafkaJsonMessageBroker[T]) Subscribe(ctx context.Context, messages chan<- T, errors chan<- error) { + for { + for _, msg := range b.messages { + messages <- msg + b.logger.Info("Published message", zap.Any("message", msg)) + } + b.messages = make([]T, 0) + + time.Sleep(5 * time.Second) + } +} + +func (b *MockKafkaJsonMessageBroker[T]) CloseReader() error { + return nil +} + +var _ messagebroker.MessageBroker[any] = &MockKafkaJsonMessageBroker[any]{} diff --git a/go/pkg/tests/tests.go b/go/pkg/tests/tests.go index 4d06fd1c..3a42eff0 100644 --- a/go/pkg/tests/tests.go +++ b/go/pkg/tests/tests.go @@ -1,7 +1,6 @@ package tests import ( - // "context" "context" "testing" diff --git a/go/services/cluster_metadata/cmd/cluster_metadata/main.go b/go/services/cluster_metadata/cmd/cluster_metadata/main.go index 1e9d7f07..63088c2d 100644 --- a/go/services/cluster_metadata/cmd/cluster_metadata/main.go +++ b/go/services/cluster_metadata/cmd/cluster_metadata/main.go @@ -1,92 +1,16 @@ package main import ( - "context" - "fmt" - "net" "net/http" - "os" - messagebroker "github.com/Magpie-Monitor/magpie-monitor/pkg/message-broker" - "github.com/Magpie-Monitor/magpie-monitor/pkg/mongodb" - "github.com/Magpie-Monitor/magpie-monitor/pkg/routing" - "github.com/Magpie-Monitor/magpie-monitor/pkg/swagger" - "github.com/Magpie-Monitor/magpie-monitor/services/cluster_metadata/internal/database" - "github.com/Magpie-Monitor/magpie-monitor/services/cluster_metadata/internal/handlers" - "github.com/Magpie-Monitor/magpie-monitor/services/cluster_metadata/pkg/repositories" + "github.com/Magpie-Monitor/magpie-monitor/services/cluster_metadata/pkg/config" "github.com/Magpie-Monitor/magpie-monitor/services/cluster_metadata/pkg/services" - "github.com/gorilla/mux" "go.uber.org/fx" - "go.uber.org/fx/fxevent" - "go.uber.org/zap" ) -type ServerParams struct { - fx.In - Lc fx.Lifecycle - Logger *zap.Logger - MetadataRouter *handlers.MetadataRouter - RootRouter *mux.Router - SwaggerRouter *swagger.SwaggerRouter -} - -func NewHTTPServer(ServerParams ServerParams) *http.Server { - port := os.Getenv("HTTP_PORT") - - srv := &http.Server{Addr: fmt.Sprintf(":%s", port), Handler: ServerParams.RootRouter} - ServerParams.Lc.Append(fx.Hook{ - OnStart: func(ctx context.Context) error { - ln, err := net.Listen("tcp", srv.Addr) - if err != nil { - return err - } - - ServerParams.Logger.Info("Starting HTTP server at", zap.String("addr", srv.Addr)) - go srv.Serve(ln) - return nil - }, - OnStop: func(ctx context.Context) error { - return srv.Shutdown(ctx) - }, - }) - return srv -} - func main() { fx.New( - fx.WithLogger(func(log *zap.Logger) fxevent.Logger { - return &fxevent.ZapLogger{Logger: log} - }), - fx.Provide( - NewHTTPServer, - - routing.NewRootRouter, - - swagger.NewSwaggerRouter, - swagger.NewSwaggerHandler, - swagger.ProvideSwaggerConfig(), - - handlers.NewMetadataRouter, - handlers.NewMetadataHandler, - - mongodb.NewMongoDbClient, - database.NewMongoDbConnectionDetails, - messagebroker.NewKafkaCredentials, - - services.NewMetadataService, - services.NewMetadataEventPublisher, - - services.NewApplicationMetadataBroker, - services.NewNodeMetadataBroker, - - repositories.NewApplicationMetadataCollection, - repositories.NewNodeMetadataCollection, - repositories.NewApplicationAggregatedMetadataCollection, - repositories.NewNodeAggregatedMetadataCollection, - repositories.NewClusterAggregatedStateCollection, - - zap.NewProduction, - ), - fx.Invoke(func(*http.Server) {}), + config.AppModule, + fx.Invoke(func(*http.Server, *services.MetadataService) {}), ).Run() } diff --git a/go/services/cluster_metadata/pkg/config/modules.go b/go/services/cluster_metadata/pkg/config/modules.go new file mode 100644 index 00000000..d8371508 --- /dev/null +++ b/go/services/cluster_metadata/pkg/config/modules.go @@ -0,0 +1,139 @@ +package config + +import ( + "context" + "fmt" + "net" + "net/http" + "os" + + messagebroker "github.com/Magpie-Monitor/magpie-monitor/pkg/message-broker" + "github.com/Magpie-Monitor/magpie-monitor/pkg/mongodb" + "github.com/Magpie-Monitor/magpie-monitor/pkg/routing" + "github.com/Magpie-Monitor/magpie-monitor/pkg/swagger" + "github.com/Magpie-Monitor/magpie-monitor/pkg/tests" + "github.com/Magpie-Monitor/magpie-monitor/services/cluster_metadata/internal/database" + "github.com/Magpie-Monitor/magpie-monitor/services/cluster_metadata/internal/handlers" + "github.com/Magpie-Monitor/magpie-monitor/services/cluster_metadata/pkg/repositories" + "github.com/Magpie-Monitor/magpie-monitor/services/cluster_metadata/pkg/services" + "github.com/gorilla/mux" + "go.uber.org/fx" + "go.uber.org/fx/fxevent" + "go.uber.org/zap" +) + +type ServerParams struct { + fx.In + Lc fx.Lifecycle + Logger *zap.Logger + MetadataRouter *handlers.MetadataRouter + RootRouter *mux.Router + SwaggerRouter *swagger.SwaggerRouter +} + +func NewHTTPServer(ServerParams ServerParams) *http.Server { + port := os.Getenv("HTTP_PORT") + + srv := &http.Server{Addr: fmt.Sprintf(":%s", port), Handler: ServerParams.RootRouter} + ServerParams.Lc.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + ln, err := net.Listen("tcp", srv.Addr) + if err != nil { + return err + } + + ServerParams.Logger.Info("Starting HTTP server at", zap.String("addr", srv.Addr)) + go srv.Serve(ln) + return nil + }, + OnStop: func(ctx context.Context) error { + return srv.Shutdown(ctx) + }, + }) + return srv +} + +var AppModule fx.Option + +func init() { + env := os.Getenv("APP_ENV") + fmt.Printf("Starting the app in %s environment", env) + + if env == tests.TEST_ENVIRONMENT { + AppModule = fx.Options( + fx.Provide( + NewHTTPServer, + + routing.NewRootRouter, + + swagger.NewSwaggerRouter, + swagger.NewSwaggerHandler, + swagger.ProvideSwaggerConfig(), + + handlers.NewMetadataRouter, + handlers.NewMetadataHandler, + + mongodb.NewMongoDbClient, + database.NewMongoDbConnectionDetails, + messagebroker.NewKafkaCredentials, + + services.NewMetadataService, + services.NewMetadataEventPublisher, + + tests.NewMockApplicationMetadataBroker, + tests.NewMockNodeMetadataBroker, + tests.NewMockNodeMetadataUpdatedBroker, + tests.NewMockApplicationMetadataUpdatedBroker, + tests.NewMockClusterMetadataUpdatedBroker, + + repositories.NewApplicationMetadataCollection, + repositories.NewNodeMetadataCollection, + repositories.NewApplicationAggregatedMetadataCollection, + repositories.NewNodeAggregatedMetadataCollection, + repositories.NewClusterAggregatedStateCollection, + + zap.NewProduction, + ), + ) + } else { + AppModule = fx.Options( + fx.WithLogger(func(log *zap.Logger) fxevent.Logger { + return &fxevent.ZapLogger{Logger: log} + }), + fx.Provide( + NewHTTPServer, + + routing.NewRootRouter, + + swagger.NewSwaggerRouter, + swagger.NewSwaggerHandler, + swagger.ProvideSwaggerConfig(), + + handlers.NewMetadataRouter, + handlers.NewMetadataHandler, + + mongodb.NewMongoDbClient, + database.NewMongoDbConnectionDetails, + messagebroker.NewKafkaCredentials, + + services.NewMetadataService, + services.NewMetadataEventPublisher, + + services.NewApplicationMetadataBroker, + services.NewNodeMetadataBroker, + + services.NewApplicationMetadataUpdatedBroker, + services.NewNodeMetadataUpdatedBroker, + services.NewClusterMetadataUpdatedBroker, + + repositories.NewApplicationMetadataCollection, + repositories.NewNodeMetadataCollection, + repositories.NewApplicationAggregatedMetadataCollection, + repositories.NewNodeAggregatedMetadataCollection, + repositories.NewClusterAggregatedStateCollection, + + zap.NewProduction, + ), + ) + } +} diff --git a/go/services/cluster_metadata/pkg/services/event_service.go b/go/services/cluster_metadata/pkg/services/event_service.go index 29695a24..a9629237 100644 --- a/go/services/cluster_metadata/pkg/services/event_service.go +++ b/go/services/cluster_metadata/pkg/services/event_service.go @@ -16,16 +16,21 @@ const ( CLUSTER_METADATA_CLUSTER_TOPIC_ENV_NAME = "CLUSTER_METADATA_CLUSTER_TOPIC" ) -func NewMetadataEventPublisher(log *zap.Logger, credentials *messagebroker.KafkaCredentials) *MetadataEventPublisher { +func NewMetadataEventPublisher( + log *zap.Logger, + appMetadataBroker messagebroker.MessageBroker[ApplicationMetadataUpdated], + nodeMetadataBroker messagebroker.MessageBroker[NodeMetadataUpdated], + clusterMetadataBroker messagebroker.MessageBroker[ClusterMetadataUpdated], +) *MetadataEventPublisher { return &MetadataEventPublisher{ log: log, - applicationMetadataBroker: NewApplicationMetadataUpdatedBroker(log, credentials), - nodeMetadataBroker: NewNodeMetadataUpdatedBroker(log, credentials), - clusterMetadataBroker: NewClusterMetadataUpdatedBroker(log, credentials), + applicationMetadataBroker: appMetadataBroker, + nodeMetadataBroker: nodeMetadataBroker, + clusterMetadataBroker: clusterMetadataBroker, } } -func NewApplicationMetadataUpdatedBroker(logger *zap.Logger, credentials *messagebroker.KafkaCredentials) *messagebroker.KafkaJsonMessageBroker[ApplicationMetadataUpdated] { +func NewApplicationMetadataUpdatedBroker(logger *zap.Logger, credentials *messagebroker.KafkaCredentials) messagebroker.MessageBroker[ApplicationMetadataUpdated] { envs.ValidateEnvs("%s env variable not set", []string{ CLUSTER_METADATA_APPLICATION_TOPIC_ENV_NAME, }) @@ -37,7 +42,7 @@ func NewApplicationMetadataUpdatedBroker(logger *zap.Logger, credentials *messag ) } -func NewNodeMetadataUpdatedBroker(logger *zap.Logger, credentials *messagebroker.KafkaCredentials) *messagebroker.KafkaJsonMessageBroker[NodeMetadataUpdated] { +func NewNodeMetadataUpdatedBroker(logger *zap.Logger, credentials *messagebroker.KafkaCredentials) messagebroker.MessageBroker[NodeMetadataUpdated] { envs.ValidateEnvs("%s env variable not set", []string{ CLUSTER_METADATA_NODE_TOPIC_ENV_NAME, }) @@ -50,7 +55,7 @@ func NewNodeMetadataUpdatedBroker(logger *zap.Logger, credentials *messagebroker ) } -func NewClusterMetadataUpdatedBroker(logger *zap.Logger, credentials *messagebroker.KafkaCredentials) *messagebroker.KafkaJsonMessageBroker[ClusterMetadataUpdated] { +func NewClusterMetadataUpdatedBroker(logger *zap.Logger, credentials *messagebroker.KafkaCredentials) messagebroker.MessageBroker[ClusterMetadataUpdated] { envs.ValidateEnvs("%s env variable not set", []string{ CLUSTER_METADATA_CLUSTER_TOPIC_ENV_NAME, }) @@ -80,9 +85,9 @@ type ClusterMetadataUpdated struct { type MetadataEventPublisher struct { log *zap.Logger - applicationMetadataBroker *messagebroker.KafkaJsonMessageBroker[ApplicationMetadataUpdated] - nodeMetadataBroker *messagebroker.KafkaJsonMessageBroker[NodeMetadataUpdated] - clusterMetadataBroker *messagebroker.KafkaJsonMessageBroker[ClusterMetadataUpdated] + applicationMetadataBroker messagebroker.MessageBroker[ApplicationMetadataUpdated] + nodeMetadataBroker messagebroker.MessageBroker[NodeMetadataUpdated] + clusterMetadataBroker messagebroker.MessageBroker[ClusterMetadataUpdated] } func (e *MetadataEventPublisher) PublishApplicationMetadataUpdatedEvent(metadata repositories.AggregatedApplicationMetadata) error { diff --git a/go/services/cluster_metadata/pkg/services/metadata_service.go b/go/services/cluster_metadata/pkg/services/metadata_service.go index 16d55240..88319675 100644 --- a/go/services/cluster_metadata/pkg/services/metadata_service.go +++ b/go/services/cluster_metadata/pkg/services/metadata_service.go @@ -63,8 +63,8 @@ type MetadataServiceParams struct { NodeAggregatedRepo *sharedrepo.MongoDbCollection[repositories.AggregatedNodeMetadata] ClusterAggregatedRepo *sharedrepo.MongoDbCollection[repositories.AggregatedClusterMetadata] EventEmitter *MetadataEventPublisher - ApplicationMetadataBroker *messagebroker.KafkaJsonMessageBroker[repositories.ApplicationState] - NodeMetadataBroker *messagebroker.KafkaJsonMessageBroker[repositories.NodeState] + ApplicationMetadataBroker messagebroker.MessageBroker[repositories.ApplicationState] + NodeMetadataBroker messagebroker.MessageBroker[repositories.NodeState] } func NewMetadataService(params MetadataServiceParams) *MetadataService { @@ -88,11 +88,7 @@ func NewMetadataService(params MetadataServiceParams) *MetadataService { params.Lc.Append(fx.Hook{ OnStart: func(ctx context.Context) error { - go metadataService.pollForClusterStateChange() - go metadataService.pollForApplicationStateChange() - go metadataService.pollForNodeStateChange() - go metadataService.consumeApplicationMetadata() - go metadataService.consumeNodeMetadata() + metadataService.Init() return nil }, }) @@ -115,8 +111,16 @@ type MetadataService struct { clusterAggregatedStateChangePollIntervalSeconds int nodeAggregatedStateChangePollIntervalSeconds int applicationAggregatedStateChangePollIntervalSeconds int - applicationMetadataBroker *messagebroker.KafkaJsonMessageBroker[repositories.ApplicationState] - nodeMetadataBroker *messagebroker.KafkaJsonMessageBroker[repositories.NodeState] + applicationMetadataBroker messagebroker.MessageBroker[repositories.ApplicationState] + nodeMetadataBroker messagebroker.MessageBroker[repositories.NodeState] +} + +func (m *MetadataService) Init() { + go m.pollForClusterStateChange() + go m.pollForApplicationStateChange() + go m.pollForNodeStateChange() + go m.consumeApplicationMetadata() + go m.consumeNodeMetadata() } func (m *MetadataService) consumeApplicationMetadata() { @@ -125,15 +129,18 @@ func (m *MetadataService) consumeApplicationMetadata() { err = make(chan error) ) - defer close(msg) - defer close(err) + ctx := context.Background() + defer ctx.Done() - go m.applicationMetadataBroker.Subscribe(context.TODO(), msg, err) + go m.applicationMetadataBroker.Subscribe(ctx, msg, err) for { select { case metadata := <-msg: - m.clusterRepo.InsertDocuments([]interface{}{metadata}) + _, err := m.clusterRepo.InsertDocuments([]interface{}{metadata}) + if err != nil { + m.log.Error("Error inserting consumed application metadata", zap.Error(err)) + } case error := <-err: m.log.Error("Error consuming application metadata", zap.Error(error)) } @@ -146,15 +153,17 @@ func (m *MetadataService) consumeNodeMetadata() { err = make(chan error) ) - defer close(msg) - defer close(err) + ctx := context.Background() - go m.nodeMetadataBroker.Subscribe(context.TODO(), msg, err) + go m.nodeMetadataBroker.Subscribe(ctx, msg, err) for { select { case metadata := <-msg: - m.nodeRepo.InsertDocuments([]interface{}{metadata}) + _, err := m.nodeRepo.InsertDocuments([]interface{}{metadata}) + if err != nil { + m.log.Error("Error inserting consumed node metadata", zap.Error(err)) + } case error := <-err: m.log.Error("Error consuming node metadata", zap.Error(error)) } diff --git a/go/services/cluster_metadata/pkg/services/metadata_service_test.go b/go/services/cluster_metadata/pkg/services/metadata_service_test.go new file mode 100644 index 00000000..ddb3a38b --- /dev/null +++ b/go/services/cluster_metadata/pkg/services/metadata_service_test.go @@ -0,0 +1,557 @@ +package services_test + +import ( + "context" + "testing" + "time" + + messagebroker "github.com/Magpie-Monitor/magpie-monitor/pkg/message-broker" + sharedrepo "github.com/Magpie-Monitor/magpie-monitor/pkg/repositories" + "github.com/Magpie-Monitor/magpie-monitor/pkg/tests" + "github.com/Magpie-Monitor/magpie-monitor/services/cluster_metadata/pkg/config" + "github.com/Magpie-Monitor/magpie-monitor/services/cluster_metadata/pkg/repositories" + "github.com/Magpie-Monitor/magpie-monitor/services/cluster_metadata/pkg/services" + + "github.com/stretchr/testify/assert" + "go.mongodb.org/mongo-driver/bson" + "go.uber.org/fx" + "go.uber.org/zap" +) + +func TestNodeMetadataIngestion(t *testing.T) { + + testCases := []struct { + name string + metadata repositories.NodeState + }{ + { + name: "Ingest metadata with multiple watched files", + metadata: repositories.NodeState{ + ClusterId: "cluster", + NodeName: "node", + CollectedAtMs: time.Now().UnixMilli(), + WatchedFiles: []string{ + "file1", + "file2", + "file3", + "file4", + }, + }, + }, + { + name: "Ingest metadata without watched files", + metadata: repositories.NodeState{ + ClusterId: "cluster", + NodeName: "node", + CollectedAtMs: time.Now().UnixMilli(), + WatchedFiles: []string{}, + }, + }, + } + + type TestDependencies struct { + fx.In + Logger *zap.Logger + + MetadataService *services.MetadataService + + NodeMetadataBroker messagebroker.MessageBroker[repositories.NodeState] + + NodeMetadataRepository *sharedrepo.MongoDbCollection[repositories.NodeState] + NodeAggregatedRepo *sharedrepo.MongoDbCollection[repositories.AggregatedNodeMetadata] + } + + test := func(dependencies TestDependencies) { + log := dependencies.Logger + + for _, test := range testCases { + dependencies.MetadataService.Init() + + var metadata = test.metadata + + log.Info("Executing", zap.String("test", test.name)) + + dependencies.NodeMetadataRepository.DeleteAll() + dependencies.NodeAggregatedRepo.DeleteAll() + + dependencies.NodeMetadataBroker.Publish("", metadata) + + time.Sleep(15 * time.Second) + + result, err := dependencies.NodeMetadataRepository.GetDocument( + bson.D{ + {Key: "clusterId", Value: metadata.ClusterId}, + {Key: "collectedAtMs", Value: metadata.CollectedAtMs}}, + bson.D{}, + ) + + if err != nil { + log.Error("Error reading node metadata", zap.Error(err)) + t.Error("Error reading node metadata") + } + + assert.Len(t, result.WatchedFiles, len(metadata.WatchedFiles), "Invalid number of watched files") + assert.ElementsMatch(t, result.WatchedFiles, metadata.WatchedFiles, "Watched files don't match") + } + } + + tests.RunTest(test, t, config.AppModule) +} + +func TestApplicationMetadataIngestion(t *testing.T) { + + testCases := []struct { + name string + metadata repositories.ApplicationState + }{ + { + name: "Ingest metadata with multiple applications", + metadata: repositories.ApplicationState{ + CollectedAtMs: time.Now().UnixMilli(), + ClusterId: "cluster", + Applications: []repositories.Application{ + { + Kind: "Deployment", + Name: "test-dp", + }, + { + Kind: "StatefulSet", + Name: "test-sts", + }, + }, + }, + }, + { + name: "Ingest metadata without applications", + metadata: repositories.ApplicationState{ + CollectedAtMs: time.Now().UnixMilli(), + ClusterId: "cluster", + Applications: []repositories.Application{}, + }, + }, + } + + type TestDependencies struct { + fx.In + Logger *zap.Logger + MetadataService *services.MetadataService + ApplicationMetadataBroker messagebroker.MessageBroker[repositories.ApplicationState] + ApplicationMetadataRepository *sharedrepo.MongoDbCollection[repositories.ApplicationState] + } + + test := func(dependencies TestDependencies) { + + for _, test := range testCases { + dependencies.MetadataService.Init() + + var ( + log = dependencies.Logger + metadata = test.metadata + ) + log.Info("Executing", zap.String("test", test.name)) + + dependencies.ApplicationMetadataRepository.DeleteAll() + dependencies.ApplicationMetadataBroker.Publish("", metadata) + + time.Sleep(15 * time.Second) + + result, err := dependencies.ApplicationMetadataRepository.GetDocument( + bson.D{ + {Key: "clusterId", Value: metadata.ClusterId}, + {Key: "collectedAtMs", Value: metadata.CollectedAtMs}}, + bson.D{}, + ) + if err != nil { + log.Error("Error reading application metadata", zap.Error(err)) + t.Error("Error reading application metadata") + } + + assert.Len(t, result.Applications, len(metadata.Applications), "Invalid number of applications") + assert.ElementsMatch(t, result.Applications, metadata.Applications, "Applications don't match") + } + } + + tests.RunTest(test, t, config.AppModule) +} + +func TestNodeMetadataStateUpdate(t *testing.T) { + + testCases := []struct { + name string + metadata []repositories.NodeState + result repositories.AggregatedNodeMetadata + }{ + { + name: "Update for two nodes with fixed fileset", + metadata: []repositories.NodeState{ + { + ClusterId: "cluster", + NodeName: "node1", + CollectedAtMs: time.Now().UnixMilli(), + WatchedFiles: []string{"file1", "file2", "file3"}, + }, + { + ClusterId: "cluster", + NodeName: "node2", + CollectedAtMs: time.Now().UnixMilli(), + WatchedFiles: []string{"file1", "file2", "file3"}, + }, + }, + result: repositories.AggregatedNodeMetadata{ + ClusterId: "cluster", + Metadata: []repositories.NodeMetadata{ + { + Name: "node1", + Files: []interface{}{"file1", "file2", "file3"}, + }, + { + Name: "node2", + Files: []interface{}{"file1", "file2", "file3"}, + }, + }, + }, + }, + { + name: "Update for a single node with varied fileset", + metadata: []repositories.NodeState{ + { + ClusterId: "cluster", + NodeName: "node", + CollectedAtMs: time.Now().UnixMilli(), + WatchedFiles: []string{"file1", "file2"}, + }, + { + ClusterId: "cluster", + NodeName: "node", + CollectedAtMs: time.Now().UnixMilli(), + WatchedFiles: []string{"file1", "file2", "file3"}, + }, + { + ClusterId: "cluster", + NodeName: "node", + CollectedAtMs: time.Now().UnixMilli(), + WatchedFiles: []string{"file4"}, + }, + }, + result: repositories.AggregatedNodeMetadata{ + ClusterId: "cluster", + Metadata: []repositories.NodeMetadata{ + { + Name: "node", + Files: []interface{}{"file1", "file2", "file3", "file4"}, + }, + }, + }, + }, + } + + type TestDependencies struct { + fx.In + Logger *zap.Logger + + MetadataService *services.MetadataService + + NodeMetadataBroker messagebroker.MessageBroker[repositories.NodeState] + NodeMetadataUpdatedBroker messagebroker.MessageBroker[services.NodeMetadataUpdated] + + NodeRepo *sharedrepo.MongoDbCollection[repositories.NodeState] + NodeAggregatedRepo *sharedrepo.MongoDbCollection[repositories.AggregatedNodeMetadata] + } + + for _, test := range testCases { + tests.RunTest(func(dependencies TestDependencies) { + + dependencies.MetadataService.Init() + + var ( + metadata = test.metadata + result = test.result + log = dependencies.Logger + ) + + log.Info("Executing", zap.String("test", test.name)) + + dependencies.NodeRepo.DeleteAll() + dependencies.NodeAggregatedRepo.DeleteAll() + + for _, state := range metadata { + dependencies.NodeMetadataBroker.Publish("", state) + } + + msg := make(chan services.NodeMetadataUpdated) + go dependencies.NodeMetadataUpdatedBroker.Subscribe(context.Background(), msg, make(chan<- error)) + + updatedStateEvent := <-msg + updatedState := updatedStateEvent.Metadata + + assert.Equal(t, result.ClusterId, updatedState.ClusterId, "Invalid clusterId") + + for idx, metadata := range updatedState.Metadata { + assert.Equal(t, result.Metadata[idx].Name, metadata.Name, "Invalid node name") + assert.ElementsMatch(t, result.Metadata[idx].Files, metadata.Files, "Invalid node files") + } + + assert.Len(t, updatedState.Metadata, len(result.Metadata), "Invalid number of nodes") + }, + t, + config.AppModule, + ) + } +} + +func TestApplicationMetadataStateUpdate(t *testing.T) { + + testCases := []struct { + name string + metadata []repositories.ApplicationState + result repositories.AggregatedApplicationMetadata + }{ + { + name: "Application aggregated state with additional applications", + metadata: []repositories.ApplicationState{ + { + CollectedAtMs: time.Now().UnixMilli(), + ClusterId: "cluster", + Applications: []repositories.Application{ + { + Kind: "Deployment", + Name: "dp", + }, + { + Kind: "StatefulSet", + Name: "sts", + }, + }, + }, + { + CollectedAtMs: time.Now().UnixMilli(), + ClusterId: "cluster", + Applications: []repositories.Application{ + { + Kind: "Deployment", + Name: "dp", + }, + { + Kind: "StatefulSet", + Name: "sts-2", + }, + { + Kind: "DaemonSet", + Name: "dp-3", + }, + }, + }, + }, + result: repositories.AggregatedApplicationMetadata{ + ClusterId: "cluster", + Metadata: []repositories.ApplicationMetadata{ + { + Kind: "Deployment", + Name: "dp", + }, + { + Kind: "StatefulSet", + Name: "sts", + }, + { + Kind: "StatefulSet", + Name: "sts-2", + }, + { + Kind: "DaemonSet", + Name: "dp-3", + }, + }, + }, + }, + { + name: "Application aggregated state without additional applications", + metadata: []repositories.ApplicationState{ + { + CollectedAtMs: time.Now().UnixMilli(), + ClusterId: "cluster", + Applications: []repositories.Application{ + { + Kind: "Deployment", + Name: "dp", + }, + { + Kind: "StatefulSet", + Name: "sts", + }, + }, + }, + { + CollectedAtMs: time.Now().UnixMilli(), + ClusterId: "cluster", + Applications: []repositories.Application{ + { + Kind: "Deployment", + Name: "dp", + }, + { + Kind: "StatefulSet", + Name: "sts", + }, + }, + }, + }, + result: repositories.AggregatedApplicationMetadata{ + ClusterId: "cluster", + Metadata: []repositories.ApplicationMetadata{ + { + Kind: "Deployment", + Name: "dp", + }, + { + Kind: "StatefulSet", + Name: "sts", + }, + }, + }, + }, + { + name: "Application aggregated state with empty applications", + metadata: []repositories.ApplicationState{ + { + CollectedAtMs: time.Now().UnixMilli(), + ClusterId: "cluster", + Applications: []repositories.Application{}, + }, + { + CollectedAtMs: time.Now().UnixMilli(), + ClusterId: "cluster", + Applications: []repositories.Application{}, + }, + }, + result: repositories.AggregatedApplicationMetadata{ + ClusterId: "cluster", + Metadata: []repositories.ApplicationMetadata{}, + }, + }, + } + + type TestDependencies struct { + fx.In + Logger *zap.Logger + + MetadataService *services.MetadataService + + ApplicationMetadataRepository *sharedrepo.MongoDbCollection[repositories.ApplicationState] + ApplicationAggregatedRepo *sharedrepo.MongoDbCollection[repositories.AggregatedApplicationMetadata] + AppRepo *sharedrepo.MongoDbCollection[repositories.ApplicationState] + + ApplicationMetadataBroker messagebroker.MessageBroker[repositories.ApplicationState] + ApplicationMetadataUpdatedBroker messagebroker.MessageBroker[services.ApplicationMetadataUpdated] + } + + for _, test := range testCases { + tests.RunTest(func(dependencies TestDependencies) { + + dependencies.MetadataService.Init() + + var ( + log = dependencies.Logger + result = test.result + ) + + log.Info("Executing", zap.String("test", test.name)) + + dependencies.AppRepo.DeleteAll() + dependencies.ApplicationAggregatedRepo.DeleteAll() + dependencies.ApplicationMetadataRepository.DeleteAll() + + for _, app := range test.metadata { + dependencies.ApplicationMetadataBroker.Publish("", app) + } + + msg := make(chan services.ApplicationMetadataUpdated) + go dependencies.ApplicationMetadataUpdatedBroker.Subscribe(context.Background(), msg, make(chan<- error)) + + updatedStateEvent := <-msg + + updatedState := updatedStateEvent.Metadata + + assert.Equal(t, result.ClusterId, updatedState.ClusterId, "Invalid clusterId") + assert.ElementsMatch(t, result.Metadata, updatedState.Metadata, "Invalid application metadata") + }, t, config.AppModule) + } +} + +func TestClusterMetadataStateUpdate(t *testing.T) { + + testCases := []struct { + name string + metadata []repositories.NodeState + result repositories.AggregatedClusterMetadata + }{ + { + name: "test", + metadata: []repositories.NodeState{ + { + ClusterId: "cluster1", + NodeName: "node", + CollectedAtMs: time.Now().UnixMilli(), + WatchedFiles: []string{"file1", "file2"}, + }, + { + ClusterId: "cluster2", + NodeName: "node", + CollectedAtMs: time.Now().UnixMilli(), + WatchedFiles: []string{"file1", "file2"}, + }, + { + ClusterId: "cluster3", + NodeName: "node", + CollectedAtMs: time.Now().UnixMilli(), + WatchedFiles: []string{"file1", "file2"}, + }, + }, + result: repositories.AggregatedClusterMetadata{ + Metadata: []repositories.ClusterMetadata{ + { + ClusterId: "cluster1", + }, + { + ClusterId: "cluster2", + }, + { + ClusterId: "cluster3", + }, + }, + }, + }, + } + + type TestDependencies struct { + fx.In + MetadataService *services.MetadataService + AppRepo *sharedrepo.MongoDbCollection[repositories.ApplicationState] + NodeRepo *sharedrepo.MongoDbCollection[repositories.NodeState] + ClusterAggregatedRepo *sharedrepo.MongoDbCollection[repositories.AggregatedClusterMetadata] + NodeMetadataBroker messagebroker.MessageBroker[repositories.NodeState] + ClusterMetadataUpdatedBroker messagebroker.MessageBroker[services.ClusterMetadataUpdated] + } + + for _, test := range testCases { + tests.RunTest(func(dependencies TestDependencies) { + + dependencies.MetadataService.Init() + + dependencies.ClusterAggregatedRepo.DeleteAll() + dependencies.NodeRepo.DeleteAll() + dependencies.AppRepo.DeleteAll() + + for _, state := range test.metadata { + dependencies.NodeMetadataBroker.Publish("", state) + } + + msg := make(chan services.ClusterMetadataUpdated) + go dependencies.ClusterMetadataUpdatedBroker.Subscribe(context.Background(), msg, make(chan<- error)) + + updatedState := <-msg + + assert.ElementsMatch(t, test.result.Metadata, updatedState.Metadata.Metadata, "Invalid cluster metadata") + }, t, config.AppModule) + } +} diff --git a/go/services/logs_ingestion/pkg/config/modules.go b/go/services/logs_ingestion/pkg/config/modules.go index 3b33dbd3..01a5293b 100644 --- a/go/services/logs_ingestion/pkg/config/modules.go +++ b/go/services/logs_ingestion/pkg/config/modules.go @@ -2,6 +2,8 @@ package config import ( "fmt" + "os" + "github.com/Magpie-Monitor/magpie-monitor/pkg/elasticsearch" "github.com/Magpie-Monitor/magpie-monitor/pkg/repositories" "github.com/Magpie-Monitor/magpie-monitor/pkg/tests" @@ -9,7 +11,6 @@ import ( "go.uber.org/fx" "go.uber.org/fx/fxevent" "go.uber.org/zap" - "os" ) var AppModule fx.Option