From 73ec50d54add6cef73c6758b019c575e384eb9c4 Mon Sep 17 00:00:00 2001 From: liuhao1024 Date: Tue, 17 Sep 2019 14:36:03 +0800 Subject: [PATCH] add slowlog file rotate (#107) * - add slowlog file rotate based on size - add slowlog file total limit * - increasing node connection inputs size - update changelog - update version * - update slowlog config to command line flag --- cmd/proxy/main.go | 22 +++++--- proxy/CHANGELOG.md | 5 ++ proxy/proto/pipe.go | 2 +- proxy/slowlog/file.go | 118 ++++++++++++++++++++++++++++----------- proxy/slowlog/slowlog.go | 13 +++-- version/version.go | 6 +- 6 files changed, 115 insertions(+), 51 deletions(-) diff --git a/cmd/proxy/main.go b/cmd/proxy/main.go index feee48f0..1276fba6 100644 --- a/cmd/proxy/main.go +++ b/cmd/proxy/main.go @@ -18,14 +18,16 @@ import ( ) var ( - check bool - stat string - metrics bool - confFile string - clusterConfFile string - reload bool - slowlogFile string - slowlogSlowerThan int + check bool + stat string + metrics bool + confFile string + clusterConfFile string + reload bool + slowlogFile string + slowlogSlowerThan int + slowlogMaxBytes int + slowlogBackupCount int ) type clustersFlag []string @@ -54,6 +56,8 @@ func init() { flag.BoolVar(&reload, "reload", false, "reloading the servers in cluster config file.") flag.StringVar(&slowlogFile, "slowlog", "", "slowlog is the file where slowlog output") flag.IntVar(&slowlogSlowerThan, "slower-than", 0, "slower-than is the microseconds which slowlog must slower than.") + flag.IntVar(&slowlogMaxBytes, "slower-max-bytes", 500000000, "slower-max-bytes is maximum size of slow log file.") + flag.IntVar(&slowlogBackupCount, "slower-backup-count", 7, "slower-backup-count is maximum backup count of slow log file.") } func main() { @@ -71,7 +75,7 @@ func main() { defer log.Close() } // init slowlog if need - err := slowlog.Init(slowlogFile) + err := slowlog.Init(slowlogFile, slowlogMaxBytes, slowlogBackupCount) if err != nil { log.Errorf("fail to init slowlog due %s", err) } diff --git a/proxy/CHANGELOG.md b/proxy/CHANGELOG.md index a6fbe57b..07d8297f 100644 --- a/proxy/CHANGELOG.md +++ b/proxy/CHANGELOG.md @@ -1,5 +1,10 @@ # Overlord-proxy +## Version 1.8.5 +1. add slowlog file rotate based on size +2. add slowlog file total limit +3. increasing node connection inputs size + ## Version 1.8.4 1. fix time record MarkEndInput, which reduce memory consumption by 60% 2. try fetch cluster nodes when key moved diff --git a/proxy/proto/pipe.go b/proxy/proto/pipe.go index 1a2594e5..bd69ae32 100644 --- a/proxy/proto/pipe.go +++ b/proxy/proto/pipe.go @@ -45,7 +45,7 @@ func NewNodeConnPipe(conns int32, newNc func() NodeConn) (ncp *NodeConnPipe) { errCh: make(chan error, 1), } for i := int32(0); i < ncp.conns; i++ { - ncp.inputs[i] = make(chan *Message, pipeMaxCount*pipeMaxCount) + ncp.inputs[i] = make(chan *Message, pipeMaxCount*pipeMaxCount*16) ncp.mps[i] = newMsgPipe(ncp.inputs[i], newNc, ncp.errCh) } return diff --git a/proxy/slowlog/file.go b/proxy/slowlog/file.go index 4e7d3745..dc4e7ae8 100644 --- a/proxy/slowlog/file.go +++ b/proxy/slowlog/file.go @@ -2,10 +2,11 @@ package slowlog import ( "bufio" + "encoding/json" + "fmt" "os" "time" - "encoding/json" "overlord/pkg/log" "overlord/proxy/proto" ) @@ -19,6 +20,11 @@ type fileHandler struct { encoder *json.Encoder exchange chan *proto.SlowlogEntry flushInterval time.Duration + + fileName string + curBytes int + maxBytes int + backupCount int } func (f *fileHandler) save(cluster string, entry *proto.SlowlogEntry) { @@ -29,63 +35,111 @@ func (f *fileHandler) save(cluster string, entry *proto.SlowlogEntry) { } } -func (f *fileHandler) openFile(file string) error { - var ( - fd *os.File - err error - ) - if _, err = os.Stat(file); os.IsNotExist(err) { +func (f *fileHandler) openFile() error { + if _, err := os.Stat(f.fileName); os.IsNotExist(err) { // path/to/whatever does not exist - fd, err = os.Create(file) + f.fd, err = os.Create(f.fileName) if err != nil { return err } } else { - fd, err = os.OpenFile(file, os.O_APPEND|os.O_WRONLY, 0755) + f.fd, err = os.OpenFile(f.fileName, os.O_APPEND|os.O_WRONLY, 0755) if err != nil { return err } } + fdStat, err := f.fd.Stat() + if err != nil { + return err + } + f.curBytes = int(fdStat.Size()) - f.fd = fd - f.wr = bufio.NewWriter(f.fd) + f.wr = bufio.NewWriterSize(f.fd, 40960) f.encoder = json.NewEncoder(f.wr) - go func() { - defer f.fd.Close() - var ticker = time.NewTicker(f.flushInterval) + return nil +} + +func (f *fileHandler) rotate() { + fdStat, err := f.fd.Stat() + if err != nil { + return + } + + if f.maxBytes <= 0 { + return + } else if fdStat.Size() < int64(f.maxBytes) { + f.curBytes = int(fdStat.Size()) + return + } + + if f.backupCount > 0 { + _ = f.fd.Close() + + for i := f.backupCount - 1; i > 0; i-- { + sfn := fmt.Sprintf("%s.%d", f.fileName, i) + dfn := fmt.Sprintf("%s.%d", f.fileName, i+1) + _ = os.Rename(sfn, dfn) + } + + dfn := fmt.Sprintf("%s.1", f.fileName) + _ = os.Rename(f.fileName, dfn) + + err := f.openFile() + if err != nil { + return + } + } +} + +func (f *fileHandler) close() error { + if f.fd != nil { + return f.fd.Close() + } + return nil +} - for { - select { - case entry := <-f.exchange: - err := f.encoder.Encode(entry) +func (f *fileHandler) run() { + defer f.close() + ticker := time.NewTicker(f.flushInterval) + + for { + select { + case entry := <-f.exchange: + err := f.encoder.Encode(entry) + if err != nil { + log.Errorf("fail to write slowlog into file due %s", err) + return + } + case <-ticker.C: + f.rotate() // check slowlog file size and rotate slowlog file + if f.wr.Buffered() > 0 { + err := f.wr.Flush() if err != nil { - log.Errorf("fail to write slowlog into file due %s", err) + log.Errorf("fail to flush slowlog due %s", err) return } - case <-ticker.C: - if f.wr.Size() > 0 { - err := f.wr.Flush() - if err != nil { - log.Errorf("fail to flush slowlog due %s", err) - return - } - } } } - }() - - return nil + } } var fh *fileHandler // initFileHandler will init the file handler to the given file -func initFileHandler(file string) error { +func initFileHandler(fileName string, maxBytes int, backupCount int) error { fh = &fileHandler{ exchange: make(chan *proto.SlowlogEntry, 2048), flushInterval: time.Second * 5, + maxBytes: maxBytes, + backupCount: backupCount, + fileName: fileName, + } + err := fh.openFile() + if err != nil { + return err } - return fh.openFile(file) + go fh.run() + return err } diff --git a/proxy/slowlog/slowlog.go b/proxy/slowlog/slowlog.go index a9148180..0d6d916c 100644 --- a/proxy/slowlog/slowlog.go +++ b/proxy/slowlog/slowlog.go @@ -1,10 +1,11 @@ package slowlog import ( - "overlord/pkg/log" - "overlord/proxy/proto" "sync" "sync/atomic" + + "overlord/pkg/log" + "overlord/proxy/proto" ) const slowlogMaxCount = 1024 @@ -89,11 +90,11 @@ func Get(name string) Handler { } // Init slowlog with file and http -func Init(file string) error { +func Init(fileName string, maxBytes int, backupCount int) error { registerSlowlogHTTP() - if file == "" { + if fileName == "" { return nil } - log.Infof("setup slowlog for file [%s]", file) - return initFileHandler(file) + log.Infof("setup slowlog for file [%s]", fileName) + return initFileHandler(fileName, maxBytes, backupCount) } diff --git a/version/version.go b/version/version.go index 630712f3..da45b8e5 100644 --- a/version/version.go +++ b/version/version.go @@ -10,13 +10,13 @@ import ( const ( OverlordMajor = 1 OverlordMinor = 8 - OverlordPatch = 4 + OverlordPatch = 5 ) var ( showVersion bool - vstr string - vbytes []byte + vstr string + vbytes []byte ) func init() {