Skip to content

Commit

Permalink
add analysis log
Browse files Browse the repository at this point in the history
  • Loading branch information
Damon Zhao committed Jul 12, 2017
1 parent ca71291 commit fa101b2
Show file tree
Hide file tree
Showing 10 changed files with 99 additions and 24 deletions.
1 change: 1 addition & 0 deletions conf/global.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ server:
threads: 4
errorlog: /var/log/gobeansdb/gobeansdb.log
accesslog: ""
analysislog: /var/log/gobeansdb/gobeansdb_analysis.log
hostname: 127.0.0.1 # 线上必须在local文件里改掉
staticdir: /var/lib/gobeansdb
mc:
Expand Down
3 changes: 2 additions & 1 deletion conf/local.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ server:
port: 7980
webport: 7988
errorlog: ./gobeansdb.log
accesslog: ""
accesslog: ./access.log
analysislog: ./gobeansdb_analysis.log
#zk: ["zk1:2181"]
hstore:
data:
Expand Down
3 changes: 2 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ import (
)

// `Version` can be changed in gobeansproxy.
var Version = "2.1.0.16"
var Version = "2.1.0.17"

const AccessLogVersion = "V1"
const AnalysisLogVersion = "V1"

var (
ServerConf ServerConfig = DefaultServerConfig
Expand Down
21 changes: 11 additions & 10 deletions config/server_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,17 @@ var (
)

type ServerConfig struct {
Hostname string `yaml:",omitempty"`
ZKPath string `yaml:",omitempty"` // root path in zk
ZKServers []string `yaml:",omitempty"` // e.g. "zk1:2181,zk2:2181"
Listen string `yaml:",omitempty"` // ip
Port int `yaml:",omitempty"`
WebPort int `yaml:",omitempty"`
Threads int `yaml:",omitempty"` // NumCPU
ErrorLog string `yaml:",omitempty"`
AccessLog string `yaml:",omitempty"`
StaticDir string `yaml:",omitempty"` // directory for static files, e.g. *.html
Hostname string `yaml:",omitempty"`
ZKPath string `yaml:",omitempty"` // root path in zk
ZKServers []string `yaml:",omitempty"` // e.g. "zk1:2181,zk2:2181"
Listen string `yaml:",omitempty"` // ip
Port int `yaml:",omitempty"`
WebPort int `yaml:",omitempty"`
Threads int `yaml:",omitempty"` // NumCPU
ErrorLog string `yaml:",omitempty"`
AccessLog string `yaml:",omitempty"`
AnalysisLog string `yanl:",omitempty"`
StaticDir string `yaml:",omitempty"` // directory for static files, e.g. *.html

}

Expand Down
4 changes: 2 additions & 2 deletions gobeansdb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func main() {
return
}

loghub.InitLogger(conf.ErrorLog, conf.AccessLog)
loghub.InitLogger(conf.ErrorLog, conf.AccessLog, conf.AnalysisLog)
logger.Infof("gobeansdb version %s starting at %d, config: %#v",
config.Version, conf.Port, conf)

Expand Down Expand Up @@ -83,7 +83,7 @@ func main() {
logger.Infof("mc server listen at %s", addr)
log.Println("ready")

server.HandleSignals(conf.ErrorLog, conf.AccessLog)
server.HandleSignals(conf.ErrorLog, conf.AccessLog, conf.AnalysisLog)
go storage.hstore.HintDumper(1 * time.Minute) // it may start merge go routine
go storage.hstore.Flusher()
config.AllowReload = true
Expand Down
52 changes: 52 additions & 0 deletions loghub/analysislog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package loghub

import (
"log"
"os"
"time"
)

var (
AnalysisLogFormat = "%s %15s:%4d - %s"
AnalysisLogFlag = log.LstdFlags | log.Lmicroseconds
AnalysisLogger *Logger
)

type AnalysisLogHub struct {
logger *log.Logger
logFd *os.File
BufferLog
}

func init() {
logger := openLogWithFd(os.Stderr, AnalysisLogFlag)
hub := &AnalysisLogHub{logger: logger}
hub.InitBuffer(200)
AnalysisLogger = NewLogger("", hub, DEBUG)
}

func InitAnalysisLog(path string, level int, bufferSize int) (err error) {
if analysisLog, analysisFd, err := openLog(path, AnalysisLogFlag); err == nil {
hub := &AnalysisLogHub{logger: analysisLog, logFd: analysisFd}
hub.InitBuffer(bufferSize)
AnalysisLogger.Hub = hub
AnalysisLogger.SetLevel(level)
} else {
log.Fatalf("open log error, path=[%s], err=[%s]", path, err.Error())
}
return
}

func (hub *AnalysisLogHub) Log(name string, level int, file string, line int, msg string) {
hub.logger.Printf(AnalysisLogFormat, levelString[level], file, line, msg)
bufline := &BufferLine{time.Now(), level, file, line, msg}
hub.Add(bufline)
hub.Last[level] = bufline
if level == FATAL {
os.Exit(1)
}
}

func (hub *AnalysisLogHub) Reopen(path string) (err error) {
return reopenLogger(&hub.logger, &hub.logFd, path, AnalysisLogFlag)
}
6 changes: 5 additions & 1 deletion loghub/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (l *Logger) Logf(level int, format string, v ...interface{}) {
l.Hub.Log(l.name, level, file, line, msg)
}

func InitLogger(errorlog string, accesslog string) {
func InitLogger(errorlog, accesslog, analysislog string) {
if errorlog != "" {
log.Printf("log to errorlog %s", errorlog)
InitErrorLog(errorlog, INFO, 200)
Expand All @@ -106,4 +106,8 @@ func InitLogger(errorlog string, accesslog string) {
log.Printf("open accesslog %s", accesslog)
InitAccessLog(accesslog, INFO)
}
if analysislog != "" {
log.Printf("log to analysislog %s", analysislog)
InitAnalysisLog(analysislog, INFO, 200)
}
}
24 changes: 16 additions & 8 deletions memcache/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,19 @@ import (
"syscall"
"time"

"sync/atomic"

"github.intra.douban.com/coresys/gobeansdb/config"
"github.intra.douban.com/coresys/gobeansdb/loghub"
"github.intra.douban.com/coresys/gobeansdb/utils"
"sync/atomic"
)

var (
SlowCmdTime = time.Millisecond * 100 // 100ms
RL *ReqLimiter
logger = loghub.ErrorLogger
accessLogger = loghub.AccessLogger
SlowCmdTime = time.Millisecond * 100 // 100ms
RL *ReqLimiter
logger = loghub.ErrorLogger
accessLogger = loghub.AccessLogger
analysisLogger = loghub.AnalysisLogger
)

type ServerConn struct {
Expand Down Expand Up @@ -59,7 +61,7 @@ func (c *ServerConn) Shutdown() {
}

func overdue(recvtime, now time.Time) bool {
return now.Sub(recvtime) > time.Duration(config.MCConf.TimeoutMS) * time.Millisecond
return now.Sub(recvtime) > time.Duration(config.MCConf.TimeoutMS)*time.Millisecond
}

func (c *ServerConn) ServeOnce(storageClient StorageClient, stats *Stats) (err error) {
Expand Down Expand Up @@ -154,7 +156,7 @@ func (c *ServerConn) ServeOnce(storageClient StorageClient, stats *Stats) (err e
}

if !resp.Noreply {
if !readTimeout && overdue(req.ReceiveTime, time.Now()) {
if !readTimeout && overdue(req.ReceiveTime, time.Now()) {
req.SetStat("process_timeout")
resp = new(Response)
resp.Status = "PROCESS_TIMEOUT"
Expand Down Expand Up @@ -330,7 +332,7 @@ func (s *Server) Shutdown() {
//s.Unlock()
}

func (s *Server) HandleSignals(errorlog string, accesslog string) {
func (s *Server) HandleSignals(errorlog string, accesslog string, analysislog string) {
sch := make(chan os.Signal, 10)
signal.Notify(sch, syscall.SIGTERM, syscall.SIGKILL, syscall.SIGINT,
syscall.SIGHUP, syscall.SIGSTOP, syscall.SIGQUIT, syscall.SIGUSR1)
Expand All @@ -348,6 +350,12 @@ func (s *Server) HandleSignals(errorlog string, accesslog string) {
logger.Warnf("open %s failed: %s", accesslog, err.Error())
}
}

if analysisLogger.Hub != nil {
if err := analysisLogger.Hub.Reopen(analysislog); err != nil {
logger.Warnf("open %s failed: %s", analysislog, err.Error())
}
}
} else {
logger.Infof("signal recieved " + sig.String())
s.Shutdown()
Expand Down
6 changes: 6 additions & 0 deletions store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"time"

"github.intra.douban.com/coresys/gobeansdb/cmem"
"github.intra.douban.com/coresys/gobeansdb/config"
"github.intra.douban.com/coresys/gobeansdb/loghub"
"github.intra.douban.com/coresys/gobeansdb/utils"
)

Expand All @@ -30,6 +32,8 @@ const (
BUCKET_STAT_READY
)

var analysisLogger = loghub.AnalysisLogger

type BucketStat struct {
// pre open init
State int
Expand Down Expand Up @@ -437,6 +441,8 @@ func (bkt *Bucket) get(ki *KeyInfo, memOnly bool) (payload *Payload, pos Positio
} else if bytes.Compare(rec.Key, ki.Key) == 0 {
payload = rec.Payload
payload.Ver = meta.Ver
now := time.Now().Unix()
analysisLogger.Infof("%s %d %d %d %d %s", config.AnalysisLogVersion, now, rec.Payload.TS, bkt.ID, pos.Offset, ki.StringKey)
return
}
defer func() {
Expand Down
3 changes: 2 additions & 1 deletion store/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package store
import (
"bufio"
"fmt"
"io"
"os"
"sync"
"time"
Expand Down Expand Up @@ -187,7 +188,7 @@ func GetStreamWriter(path string, isappend bool) (*DataStreamWriter, error) {
}
if isappend {
offset = uint32(stat.Size())
offset, err := fd.Seek(0, os.SEEK_END)
offset, err := fd.Seek(0, io.SeekEnd)
if err != nil {
logger.Infof(err.Error())
return nil, err
Expand Down

0 comments on commit fa101b2

Please sign in to comment.