Skip to content

Commit

Permalink
wsuszko/metadata-service tests (#127)
Browse files Browse the repository at this point in the history
* go metadata-service: Introduce early draft of metadata tests.

* go metadata-service: Further test experiments.

* go, metadata-service: Introduce automated integration tests.

* docker tests: Add spacing/

* go, metadata: Refactor, remove comments.

* docker tests: Readd es-certs.

* docker: Remove test script.
  • Loading branch information
Woojciech authored Dec 9, 2024
1 parent ecda78b commit 31a9488
Show file tree
Hide file tree
Showing 13 changed files with 864 additions and 109 deletions.
42 changes: 42 additions & 0 deletions docker-compose.test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,48 @@ services:
- es-certs:/usr/local/share
- cache:/go/pkg/mod/

cluster-metadata-service:
user: "0"
container_name: magpie-monitor-test-cluster-metadata-service
restart: on-failure
image: magpiemonitor/cluster-metadata-service
build:
context: ./go
dockerfile: ./docker/cluster_metadata/Dockerfile
target: tests
ports:
- ${CLUSTER_METADATA_HTTP_PORT:-8092}:${CLUSTER_METADATA_HTTP_PORT:-8092}
environment:
APP_ENV: "test"
HTTP_PORT: ${CLUSTER_METADATA_HTTP_PORT}
METADATADB_USER: ${CLUSTER_METADATA_MONGODB_USER}
METADATADB_PASSWORD: ${CLUSTER_METADATA_MONGODB_PASSWORD}
METADATADB_HOST: ${CLUSTER_METADATA_MONGODB_HOST}
METADATADB_PORT: ${CLUSTER_METADATA_MONGODB_PORT}
SWAGGER_HOST: ${CLUSTER_METADATA_SERVICE_HOST}
VIRTUAL_HOST: ${CLUSTER_METADATA_PRODUCTION_HOST}
LETSENCRYPT_HOST: ${CLUSTER_METADATA_PRODUCTION_HOST}
VIRTUAL_PORT: ${CLUSTER_METADATA_HTTP_PORT}
KAFKA_BROKER_URL: ${KAFKA_BROKER_URL}
KAFKA_CLIENT_USERNAME: ${KAFKA_CLIENT_USERNAME}
KAFKA_CLIENT_PASSWORD: ${KAFKA_CLIENT_PASSWORD}
CLUSTER_METADATA_APPLICATION_TOPIC: ${CLUSTER_METADATA_APPLICATION_TOPIC}
CLUSTER_METADATA_NODE_TOPIC: ${CLUSTER_METADATA_NODE_TOPIC}
CLUSTER_METADATA_CLUSTER_TOPIC: ${CLUSTER_METADATA_CLUSTER_TOPIC}
NODE_ACTIVITY_WINDOW_MILLIS: ${CLUSTER_METADATA_NODE_ACTIVITY_WINDOW_MILLIS}
APPLICATION_ACTIVITY_WINDOW_MILLIS: ${CLUSTER_METADATA_APPLICATION_ACTIVITY_WINDOW_MILLIS}
CLUSTER_ACTIVITY_WINDOW_MILLIS: ${CLUSTER_METADATA_CLUSTER_ACTIVITY_WINDOW_MILLIS}
CLUSTER_AGGREGATED_STATE_CHANGE_POLL_INTERVAL_SECONDS: ${CLUSTER_METADATA_CLUSTER_AGGREGATED_STATE_CHANGE_POLL_INTERVAL_SECONDS}
NODE_AGGREGATED_STATE_CHANGE_POLL_INTERVAL_SECONDS: ${CLUSTER_METADATA_NODE_AGGREGATED_STATE_CHANGE_POLL_INTERVAL_SECONDS}
APPLICATION_AGGREGATED_STATE_CHANGE_POLL_INTERVAL_SECONDS: ${CLUSTER_METADATA_APPLICATION_AGGREGATED_STATE_POLL_INTERVAL_SECONDS}
KAFKA_MAX_MESSAGE_SIZE_BYTES: ${KAFKA_MAX_MESSAGE_SIZE_BYTES}
KAFKA_BROKER_GROUP_ID: ${KAFKA_BROKER_GROUP_ID}
POD_AGENT_APPLICATION_METADATA_TOPIC: ${POD_AGENT_APPLICATION_METADATA_TOPIC}
NODE_AGENT_NODE_METADATA_TOPIC: ${NODE_AGENT_NODE_METADATA_TOPIC}
INTEGRATION_TEST_WAIT_MODIFIER: ${INTEGRATION_TEST_WAIT_MODIFIER}
volumes:
- cache:/go/pkg/mod/

reports-service-test:
user: "0"
container_name: magpie-monitor-test-reports-service
Expand Down
7 changes: 7 additions & 0 deletions go/docker/cluster_metadata/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@ COPY . .
RUN --mount=type=cache,target=/go/pkg/mod/ \
go build -o /bin/server ./services/cluster_metadata/cmd/cluster_metadata

FROM build AS tests

RUN echo "go env -w GOCACHE=/go/pkg/mod/ && go test ./services/cluster_metadata/... -v -cover" > test.sh
RUN chmod +x test.sh

ENTRYPOINT ["bash", "./test.sh"]

FROM debian:latest AS final

RUN apt-get update && apt-get install -y ca-certificates
Expand Down
2 changes: 1 addition & 1 deletion go/pkg/message-broker/json_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ func (b *KafkaJsonMessageBroker[T]) Subscribe(ctx context.Context, messages chan
}

}

}

func (b *KafkaJsonMessageBroker[T]) CloseReader() error {
return b.broker.CloseReader()

Expand Down
1 change: 1 addition & 0 deletions go/pkg/message-broker/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func (b *KafkaMessageBroker) Subscribe(ctx context.Context, messages chan<- []by
messages <- msg.Value
}
}

func (b *KafkaMessageBroker) CloseReader() error {
return b.reader.Close()
}
Expand Down
4 changes: 4 additions & 0 deletions go/pkg/repositories/mongodb_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,7 @@ func (m *MongoDbCollection[T]) ReplaceDocument(ctx context.Context, id primitive
func (m *MongoDbCollection[T]) Count(filter bson.D) (int64, error) {
return m.Client.Database(m.Db).Collection(m.Col).CountDocuments(context.TODO(), filter)
}

func (m *MongoDbCollection[T]) DeleteAll() (*mongo.DeleteResult, error) {
return m.Client.Database(m.Db).Collection(m.Col).DeleteMany(context.TODO(), bson.D{})
}
67 changes: 67 additions & 0 deletions go/pkg/tests/mock_kafka_broker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package tests

import (
"context"
"time"

messagebroker "github.com/Magpie-Monitor/magpie-monitor/pkg/message-broker"
"github.com/Magpie-Monitor/magpie-monitor/services/cluster_metadata/pkg/repositories"
"github.com/Magpie-Monitor/magpie-monitor/services/cluster_metadata/pkg/services"
"go.uber.org/zap"
)

func NewMockApplicationMetadataBroker(logger *zap.Logger) messagebroker.MessageBroker[repositories.ApplicationState] {
return NewKafkaJsonMessageBroker[repositories.ApplicationState](logger)
}

func NewMockNodeMetadataBroker(logger *zap.Logger) messagebroker.MessageBroker[repositories.NodeState] {
return NewKafkaJsonMessageBroker[repositories.NodeState](logger)
}

func NewMockNodeMetadataUpdatedBroker(logger *zap.Logger) messagebroker.MessageBroker[services.NodeMetadataUpdated] {
return NewKafkaJsonMessageBroker[services.NodeMetadataUpdated](logger)
}

func NewMockApplicationMetadataUpdatedBroker(logger *zap.Logger) messagebroker.MessageBroker[services.ApplicationMetadataUpdated] {
return NewKafkaJsonMessageBroker[services.ApplicationMetadataUpdated](logger)
}

func NewMockClusterMetadataUpdatedBroker(logger *zap.Logger) messagebroker.MessageBroker[services.ClusterMetadataUpdated] {
return NewKafkaJsonMessageBroker[services.ClusterMetadataUpdated](logger)
}

type MockKafkaJsonMessageBroker[T any] struct {
messages []T
logger *zap.Logger
}

func NewKafkaJsonMessageBroker[T any](logger *zap.Logger) messagebroker.MessageBroker[T] {
return &MockKafkaJsonMessageBroker[T]{
logger: logger,
messages: make([]T, 0),
}
}

func (b *MockKafkaJsonMessageBroker[T]) Publish(key string, message T) error {
b.messages = append(b.messages, message)
b.logger.Info("Received message", zap.Any("messages", b.messages))
return nil
}

func (b *MockKafkaJsonMessageBroker[T]) Subscribe(ctx context.Context, messages chan<- T, errors chan<- error) {
for {
for _, msg := range b.messages {
messages <- msg
b.logger.Info("Published message", zap.Any("message", msg))
}
b.messages = make([]T, 0)

time.Sleep(5 * time.Second)
}
}

func (b *MockKafkaJsonMessageBroker[T]) CloseReader() error {
return nil
}

var _ messagebroker.MessageBroker[any] = &MockKafkaJsonMessageBroker[any]{}
1 change: 0 additions & 1 deletion go/pkg/tests/tests.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package tests

import (
// "context"
"context"
"testing"

Expand Down
82 changes: 3 additions & 79 deletions go/services/cluster_metadata/cmd/cluster_metadata/main.go
Original file line number Diff line number Diff line change
@@ -1,92 +1,16 @@
package main

import (
"context"
"fmt"
"net"
"net/http"
"os"

messagebroker "github.com/Magpie-Monitor/magpie-monitor/pkg/message-broker"
"github.com/Magpie-Monitor/magpie-monitor/pkg/mongodb"
"github.com/Magpie-Monitor/magpie-monitor/pkg/routing"
"github.com/Magpie-Monitor/magpie-monitor/pkg/swagger"
"github.com/Magpie-Monitor/magpie-monitor/services/cluster_metadata/internal/database"
"github.com/Magpie-Monitor/magpie-monitor/services/cluster_metadata/internal/handlers"
"github.com/Magpie-Monitor/magpie-monitor/services/cluster_metadata/pkg/repositories"
"github.com/Magpie-Monitor/magpie-monitor/services/cluster_metadata/pkg/config"
"github.com/Magpie-Monitor/magpie-monitor/services/cluster_metadata/pkg/services"
"github.com/gorilla/mux"
"go.uber.org/fx"
"go.uber.org/fx/fxevent"
"go.uber.org/zap"
)

type ServerParams struct {
fx.In
Lc fx.Lifecycle
Logger *zap.Logger
MetadataRouter *handlers.MetadataRouter
RootRouter *mux.Router
SwaggerRouter *swagger.SwaggerRouter
}

func NewHTTPServer(ServerParams ServerParams) *http.Server {
port := os.Getenv("HTTP_PORT")

srv := &http.Server{Addr: fmt.Sprintf(":%s", port), Handler: ServerParams.RootRouter}
ServerParams.Lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
ln, err := net.Listen("tcp", srv.Addr)
if err != nil {
return err
}

ServerParams.Logger.Info("Starting HTTP server at", zap.String("addr", srv.Addr))
go srv.Serve(ln)
return nil
},
OnStop: func(ctx context.Context) error {
return srv.Shutdown(ctx)
},
})
return srv
}

func main() {
fx.New(
fx.WithLogger(func(log *zap.Logger) fxevent.Logger {
return &fxevent.ZapLogger{Logger: log}
}),
fx.Provide(
NewHTTPServer,

routing.NewRootRouter,

swagger.NewSwaggerRouter,
swagger.NewSwaggerHandler,
swagger.ProvideSwaggerConfig(),

handlers.NewMetadataRouter,
handlers.NewMetadataHandler,

mongodb.NewMongoDbClient,
database.NewMongoDbConnectionDetails,
messagebroker.NewKafkaCredentials,

services.NewMetadataService,
services.NewMetadataEventPublisher,

services.NewApplicationMetadataBroker,
services.NewNodeMetadataBroker,

repositories.NewApplicationMetadataCollection,
repositories.NewNodeMetadataCollection,
repositories.NewApplicationAggregatedMetadataCollection,
repositories.NewNodeAggregatedMetadataCollection,
repositories.NewClusterAggregatedStateCollection,

zap.NewProduction,
),
fx.Invoke(func(*http.Server) {}),
config.AppModule,
fx.Invoke(func(*http.Server, *services.MetadataService) {}),
).Run()
}
Loading

0 comments on commit 31a9488

Please sign in to comment.