Skip to content

Commit

Permalink
docker-compose.yml: Add proxy config for kafka-ui
Browse files Browse the repository at this point in the history
  • Loading branch information
XxRoloxX committed Oct 29, 2024
1 parent 22bd71b commit 7c8b78f
Show file tree
Hide file tree
Showing 8 changed files with 16 additions and 12 deletions.
1 change: 1 addition & 0 deletions .env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ MESSAGE_BROKER_USERNAME=username
MESSAGE_BROKER_PASSWORD=password
MESSAGE_BROKER_ADDRESS=kafka:9094
KAFKA_UI_PORT=8011
KAFKA_UI_PRODUCTION_HOST=kafka-ui.test.com

REPORT_GENERATED_BROKER_TOPIC=report_generated
REPORT_REQUEST_FAILED_BROKER_TOPIC=report_request_failed
Expand Down
1 change: 0 additions & 1 deletion docker-compose.dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ services:
volumes:
redisdata:
redisinsight:
kafkaui_data:

networks:
default:
Expand Down
3 changes: 3 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,9 @@ services:
- ${KAFKA_UI_PORT:-8011}:8080
environment:
DYNAMIC_CONFIG_ENABLED: "true"
VIRTUAL_HOST: ${KAFKA_UI_PRODUCTION_HOST}
LETSENCRYPT_HOST: ${KAFKA_UI_PRODUCTION_HOST}
VIRTUAL_PORT: 8080
volumes:
- kafkaui-data:/etc/kafkaui

Expand Down
2 changes: 1 addition & 1 deletion go/pkg/message-broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ package messagebroker

type MessageBroker[T any] interface {
Publish(key string, message T) error
Subscribe(ch chan<- T, errCh chan<- error)
Subscribe(messages chan<- T, errors chan<- error)
}
10 changes: 5 additions & 5 deletions go/pkg/message-broker/json_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,24 @@ func (b *KafkaJsonMessageBroker[T]) Publish(key string, message T) error {
return b.broker.Publish(context.Background(), []byte(key), encodedMessage)
}

func (b *KafkaJsonMessageBroker[T]) Subscribe(ch chan<- T, errChn chan<- error) {
func (b *KafkaJsonMessageBroker[T]) Subscribe(messages chan<- T, errors chan<- error) {

msgChannel := make(chan []byte)

go b.broker.Subscribe(context.Background(), msgChannel, errChn)
go b.broker.Subscribe(context.Background(), msgChannel, errors)

for {
message := <-msgChannel
var decodedMessage T

err := json.Unmarshal(message, &decodedMessage)
if err != nil {
b.logger.Error("Failed to decode a broker json message", zap.Error(err),
b.logger.Error("Failed to decode broker json message", zap.Error(err),
zap.Any("messsage", message))
errChn <- err
errors <- err
}

ch <- decodedMessage
messages <- decodedMessage
}

}
Expand Down
6 changes: 3 additions & 3 deletions go/pkg/message-broker/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,16 @@ func (b *KafkaMessageBroker) Publish(ctx context.Context, key []byte, value []by
return nil
}

func (b *KafkaMessageBroker) Subscribe(ctx context.Context, channel chan<- []byte, errChannel chan<- error) {
func (b *KafkaMessageBroker) Subscribe(ctx context.Context, messages chan<- []byte, errors chan<- error) {
for {
msg, err := b.reader.ReadMessage(ctx)

if err != nil {
b.logger.Error("Failed to read message", zap.Error(err))
errChannel <- err
errors <- err
}

channel <- msg.Value
messages <- msg.Value
}
}

Expand Down
3 changes: 2 additions & 1 deletion go/services/reports/api/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -317,10 +317,11 @@ components:
type: string
customPrompt:
type: string

reportsPostParams:
type: object
properties:
correlationId:
type: string
clusterId:
type: string
sinceMs:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const (

type ReportRequestFailed struct {
CorrelationId string `json:"correlationId"`
ErrorType ReportRequestError `json:"reportRequest"`
ErrorType ReportRequestError `json:"errorType"`
ErrorMessage string `json:"errorMessage"`
TimestampMs int64 `json:"timestampMs"`
}
Expand Down

0 comments on commit 7c8b78f

Please sign in to comment.