Skip to content

Commit

Permalink
go reports-service: Refactor report repository
Browse files Browse the repository at this point in the history
  • Loading branch information
XxRoloxX committed Oct 14, 2024
1 parent 969c023 commit 1f30553
Show file tree
Hide file tree
Showing 18 changed files with 508 additions and 595 deletions.
2 changes: 2 additions & 0 deletions go/pkg/jsonl/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"strings"
)

// JSONL is a encoding format which is an equivalent of list of JSONs
// where each object is separated by a newline. This format is used in the OpenAI Batch API.
type JsonLinesDecoder struct {
reader io.Reader
}
Expand Down
1 change: 0 additions & 1 deletion go/pkg/jsonl/decoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ type decodingTestCase struct {
}

func TestDecode(t *testing.T) {

testsCases := []struct {
description string
rawString string
Expand Down
2 changes: 2 additions & 0 deletions go/pkg/jsonl/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"reflect"
)

// JSONL is a encoding format which is an equivalent of list of JSONs
// where each object is separated by a newline. This format is used in the OpenAI Batch API.
type JsonLinesEncoder struct {
writer io.Writer
}
Expand Down
1 change: 0 additions & 1 deletion go/pkg/jsonl/encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ type encodingTestCase struct {
}

func TestEncode(t *testing.T) {

testsCases := []struct {
description string
encoded string
Expand Down
102 changes: 102 additions & 0 deletions go/pkg/repositories/mongodb_repository.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package repositories

import (
"context"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.uber.org/zap"
)

type MongoDbCollection[T any] struct {
Log *zap.Logger
Db string
Col string
Client *mongo.Client
}

func (m *MongoDbCollection[T]) GetDocuments(filter primitive.D, sort primitive.D) ([]T, error) {
opts := options.Find().SetSort(sort)

col := m.Client.Database(m.Db).Collection(m.Col)

cursor, err := col.Find(context.TODO(), filter, opts)
if err != nil {
m.Log.Error("Error fetching documents:", zap.String("database", m.Db), zap.String("collection", m.Col), zap.Error(err))
return nil, err
}

results := []T{}
if err = cursor.All(context.TODO(), &results); err != nil {
m.Log.Error("Error parsing filtered documents:", zap.String("database", m.Db), zap.String("collection", m.Col), zap.Error(err))
return nil, err
}

return results, nil
}

func (m *MongoDbCollection[T]) GetDocument(filter primitive.D, sort primitive.D) (T, error) {
opts := options.FindOne().SetSort(sort)

col := m.Client.Database(m.Db).Collection(m.Col)

var result T
err := col.FindOne(context.TODO(), filter, opts).Decode(&result)

if err != nil {
m.Log.Error("Error parsing filtered document:", zap.String("database", m.Db), zap.String("collection", m.Col), zap.Error(err))
return result, err
}

return result, nil
}

func (m *MongoDbCollection[T]) GetDistinctDocumentFieldValues(fieldName string, filter bson.D) ([]interface{}, error) {
col := m.Client.Database(m.Db).Collection(m.Col)
return col.Distinct(context.TODO(), fieldName, filter)
}

func (m *MongoDbCollection[T]) InsertDocuments(docs []interface{}) ([]primitive.ObjectID, error) {
col := m.Client.Database(m.Db).Collection(m.Col)

ids, err := col.InsertMany(context.TODO(), docs)
if err != nil {
m.Log.Error("Error inserting documents:", zap.String("database", m.Db), zap.String("collection", m.Col), zap.Error(err))
return nil, err
}

createdIds := make([]primitive.ObjectID, 0, len(ids.InsertedIDs))
for _, id := range ids.InsertedIDs {
createdIds = append(createdIds, id.(primitive.ObjectID))
}

return createdIds, nil
}

func (m *MongoDbCollection[T]) InsertDocument(doc interface{}) (*primitive.ObjectID, error) {
col := m.Client.Database(m.Db).Collection(m.Col)

id, err := col.InsertOne(context.TODO(), doc)
if err != nil {
m.Log.Error("Error inserting document:", zap.String("database", m.Db), zap.String("collection", m.Col), zap.Error(err))
return nil, err
}
objectId := id.InsertedID.(primitive.ObjectID)

return &objectId, nil
}

func (m *MongoDbCollection[T]) ReplaceDocument(ctx context.Context, id primitive.ObjectID, document T) error {
coll := m.Client.Database(m.Db).Collection(m.Col)
opts := options.Update().SetUpsert(true)

_, err := coll.UpdateOne(ctx, bson.M{"_id": id}, bson.M{"$set": document}, opts)

if err != nil {
m.Log.Error("Error inserting documents:", zap.String("database", m.Db), zap.String("collection", m.Col), zap.Error(err))
return err
}

return nil
}
29 changes: 29 additions & 0 deletions go/pkg/routing/error_handling.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// func write_error()
package routing

import (
"encoding/json"
"io"
)

type HttpErrorBody struct {
Err string `json:"error"`
}

func WriteHttpError(writer io.Writer, errorMessage string) error {
body := HttpErrorBody{
Err: errorMessage,
}

serializedError, err := json.Marshal(body)
if err != nil {
return err
}

_, err = writer.Write(serializedError)
if err != nil {
return err
}

return nil
}
Original file line number Diff line number Diff line change
@@ -1,21 +1,17 @@
package repositories

import (
"context"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"github.com/Magpie-Monitor/magpie-monitor/pkg/repositories"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.uber.org/zap"
)

func NewClusterMetadataCollection(log *zap.Logger, client *mongo.Client) *MongoDbCollection[ClusterState] {
return &MongoDbCollection[ClusterState]{log: log, db: "METADATA", col: "CLUSTER_STATE", client: client}
func NewClusterMetadataCollection(log *zap.Logger, client *mongo.Client) *repositories.MongoDbCollection[ClusterState] {
return &repositories.MongoDbCollection[ClusterState]{Log: log, Db: "METADATA", Col: "CLUSTER_STATE", Client: client}
}

func NewNodeMetadataCollection(log *zap.Logger, client *mongo.Client) *MongoDbCollection[NodeState] {
return &MongoDbCollection[NodeState]{log: log, db: "METADATA", col: "NODE_STATE", client: client}
func NewNodeMetadataCollection(log *zap.Logger, client *mongo.Client) *repositories.MongoDbCollection[NodeState] {
return &repositories.MongoDbCollection[NodeState]{Log: log, Db: "METADATA", Col: "NODE_STATE", Client: client}
}

type ClusterState struct {
Expand All @@ -35,63 +31,3 @@ type NodeState struct {
CollectedAtMs int64 `json:"collectedAtMs" bson:"collectedAtMs"`
WatchedFiles []string `json:"watchedFiles" bson:"watchedFiles"`
}

type MongoDbCollection[T any] struct {
log *zap.Logger
db string
col string
client *mongo.Client
}

func (m *MongoDbCollection[T]) GetDocuments(filter primitive.D, sort primitive.D) ([]T, error) {
opts := options.Find().SetSort(sort)

col := m.client.Database(m.db).Collection(m.col)

cursor, err := col.Find(context.TODO(), filter, opts)
if err != nil {
m.log.Error("Error fetching documents:", zap.String("database", m.db), zap.String("collection", m.col), zap.Error(err))
return nil, err
}

var results []T
if err = cursor.All(context.TODO(), &results); err != nil {
m.log.Error("Error parsing filtered documents:", zap.String("database", m.db), zap.String("collection", m.col), zap.Error(err))
return nil, err
}

return results, nil
}

func (m *MongoDbCollection[T]) GetDocument(filter primitive.D, sort primitive.D) (T, error) {
opts := options.FindOne().SetSort(sort)

col := m.client.Database(m.db).Collection(m.col)

var result T
err := col.FindOne(context.TODO(), filter, opts).Decode(&result)

if err != nil {
m.log.Error("Error parsing filtered document:", zap.String("database", m.db), zap.String("collection", m.col), zap.Error(err))
return result, err
}

return result, nil
}

func (m *MongoDbCollection[T]) GetDistinctDocumentFieldValues(fieldName string, filter bson.D) ([]interface{}, error) {
col := m.client.Database(m.db).Collection(m.col)
return col.Distinct(context.TODO(), fieldName, filter)
}

func (m *MongoDbCollection[T]) InsertDocuments(docs []interface{}) error {
col := m.client.Database(m.db).Collection(m.col)

_, err := col.InsertMany(context.TODO(), docs)
if err != nil {
m.log.Error("Error inserting documents:", zap.String("database", m.db), zap.String("collection", m.col), zap.Error(err))
return err
}

return nil
}
13 changes: 8 additions & 5 deletions go/services/cluster_metadata/pkg/services/metadata_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ import (
"slices"
"time"

sharedrepo "github.com/Magpie-Monitor/magpie-monitor/pkg/repositories"
"github.com/Magpie-Monitor/magpie-monitor/services/cluster_metadata/pkg/repositories"
"go.mongodb.org/mongo-driver/bson"
"go.uber.org/zap"
)

func NewMetadataService(log *zap.Logger, clusterRepo *repositories.MongoDbCollection[repositories.ClusterState], nodeRepo *repositories.MongoDbCollection[repositories.NodeState]) *MetadataService {
func NewMetadataService(log *zap.Logger, clusterRepo *sharedrepo.MongoDbCollection[repositories.ClusterState], nodeRepo *sharedrepo.MongoDbCollection[repositories.NodeState]) *MetadataService {
return &MetadataService{log: log, clusterRepo: clusterRepo, nodeRepo: nodeRepo}
}

Expand All @@ -29,8 +30,8 @@ type NodeMetadata struct {

type MetadataService struct {
log *zap.Logger
clusterRepo *repositories.MongoDbCollection[repositories.ClusterState]
nodeRepo *repositories.MongoDbCollection[repositories.NodeState]
clusterRepo *sharedrepo.MongoDbCollection[repositories.ClusterState]
nodeRepo *sharedrepo.MongoDbCollection[repositories.NodeState]
}

func (m *MetadataService) GetClusterMetadataForTimerange(clusterName string, sinceMillis int, toMillis int) ([]ApplicationMetadata, error) {
Expand Down Expand Up @@ -122,9 +123,11 @@ func (m *MetadataService) GetNodeMetadataForTimerange(clusterName string, sinceM
}

func (m *MetadataService) InsertClusterMetadata(metadata repositories.ClusterState) error {
return m.clusterRepo.InsertDocuments([]interface{}{metadata})
_, err := m.clusterRepo.InsertDocuments([]interface{}{metadata})
return err
}

func (m *MetadataService) InsertNodeMetadata(metadata repositories.NodeState) error {
return m.nodeRepo.InsertDocuments([]interface{}{metadata})
_, err := m.nodeRepo.InsertDocuments([]interface{}{metadata})
return err
}
Loading

0 comments on commit 1f30553

Please sign in to comment.