Skip to content

Commit

Permalink
Merge pull request #679 from douyu/fix/event
Browse files Browse the repository at this point in the history
refactor: add event log
  • Loading branch information
hjxp authored Jan 14, 2025
2 parents 5bfe804 + 2d1022e commit e0626b3
Showing 1 changed file with 23 additions and 27 deletions.
50 changes: 23 additions & 27 deletions internal/pkg/service/appevent/appevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package appevent
import (
"context"
"encoding/json"
"github.com/douyu/jupiter/pkg/store/gorm"
"github.com/douyu/jupiter/pkg/xlog"
"go.uber.org/zap"
"strings"
"time"

Expand All @@ -27,15 +30,14 @@ type appEvent struct {
topic string
}

func InitAppEvent(eventProducer *rocketmq.Producer, topic string) *appEvent {
func InitAppEvent(eventProducer *rocketmq.Producer, topic string) {
obj := &appEvent{
eventChan: make(chan db.AppEvent, 10000),
eventProducer: eventProducer,
topic: topic,
}
go obj.ConsumeEvent()
AppEvent = obj
return obj
}

func (a *appEvent) PutEvent(event db.AppEvent) {
Expand All @@ -57,33 +59,27 @@ func (a *appEvent) ConsumeEvent() {
}

func (a *appEvent) insert(event db.AppEvent) error {
tx := invoker.JunoMysql.Begin()
if err := tx.Create(&event).Error; err != nil {
tx.Rollback()
return err
}

if cfg.Cfg.JunoEvent.Rocketmq.Enable {
event.HandleOperationName()
event.HandleSourceName()
msg := &eventMessage{AppEvent: event, HostName: strings.Split(event.HostName, ",")}
eventMsg, _ := json.Marshal(&msg)
ctx, cancelFn := context.WithTimeout(context.Background(), time.Second)
_, err := a.eventProducer.SendSync(ctx, primitive.NewMessage(a.topic, eventMsg))
cancelFn()
if err != nil {
tx.Rollback()
err := invoker.JunoMysql.Transaction(func(tx *gorm.DB) error {
if err := tx.Create(&event).Error; err != nil {
xlog.Error("app event insert err", zap.Error(err))
return err
}
}

err := tx.Commit().Error
if err != nil {
return err
}

//invoker.AppStatic.WithLabelValues(event.App, event.Source, event.Operation).Inc()
return nil
if cfg.Cfg.JunoEvent.Rocketmq.Enable {
event.HandleOperationName()
event.HandleSourceName()
msg := &eventMessage{AppEvent: event, HostName: strings.Split(event.HostName, ",")}
eventMsg, _ := json.Marshal(&msg)
ctx, cancelFn := context.WithTimeout(context.Background(), time.Second)
_, err := a.eventProducer.SendSync(ctx, primitive.NewMessage(a.topic, eventMsg))
cancelFn()
if err != nil {
xlog.Error("app event eventProducer err", zap.Error(err))
return err
}
}
return nil
})
return err
}

func (a *appEvent) List(param view.ReqEventList) (res []db.AppEvent, page *view.Pagination, err error) {
Expand Down

0 comments on commit e0626b3

Please sign in to comment.