Revert "use pubsub instead of filenotify to follow json logs"
This reverts commit b1594c5.

Signed-off-by: Brian Goff <[email protected]>
cpuguy83 committed Feb 24, 2016
1 parent f780918 commit 91fdfdd
Showing 3 changed files with 92 additions and 81 deletions.
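
Orientation, not part of the commit itself: the revert takes the json-file log follower back to filenotify-based change detection. The sketch below shows the watch-or-fall-back-to-polling pattern that the restored read.go relies on, using only pkg/filenotify calls that appear in the diff further down; the watched path and the surrounding loop are made up for illustration.

package main

import (
    "log"

    "github.com/docker/docker/pkg/filenotify" // import path as used by this tree
)

func main() {
    const name = "/var/log/example.log" // hypothetical file to follow

    // Prefer an inotify-backed watcher; if adding the file fails, fall back to
    // the polling watcher, the same dance followLogs does in this diff.
    watcher, err := filenotify.New()
    if err != nil {
        log.Fatal(err)
    }
    defer func() { watcher.Close() }()

    if err := watcher.Add(name); err != nil {
        log.Print("falling back to file poller")
        watcher.Close()
        watcher = filenotify.NewPollingWatcher()
        if err := watcher.Add(name); err != nil {
            log.Fatalf("cannot watch %s: %v", name, err)
        }
    }

    for {
        select {
        case ev := <-watcher.Events():
            log.Printf("file changed: %v; re-read and decode the new bytes", ev)
        case err := <-watcher.Errors():
            log.Printf("watch error: %v", err)
            return
        }
    }
}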
25 changes: 10 additions & 15 deletions daemon/logger/jsonfilelog/jsonfilelog.go
@@ -14,7 +14,6 @@ import (
 	"github.com/docker/docker/daemon/logger"
 	"github.com/docker/docker/daemon/logger/loggerutils"
 	"github.com/docker/docker/pkg/jsonlog"
-	"github.com/docker/docker/pkg/pubsub"
 	"github.com/docker/go-units"
 )

@@ -23,13 +22,12 @@ const Name = "json-file"

 // JSONFileLogger is Logger implementation for default Docker logging.
 type JSONFileLogger struct {
-	buf           *bytes.Buffer
-	writer        *loggerutils.RotateFileWriter
-	mu            sync.Mutex
-	ctx           logger.Context
-	readers       map[*logger.LogWatcher]struct{} // stores the active log followers
-	extra         []byte                          // json-encoded extra attributes
-	writeNotifier *pubsub.Publisher
+	buf     *bytes.Buffer
+	writer  *loggerutils.RotateFileWriter
+	mu      sync.Mutex
+	ctx     logger.Context
+	readers map[*logger.LogWatcher]struct{} // stores the active log followers
+	extra   []byte                          // json-encoded extra attributes
 }

 func init() {
@@ -79,11 +77,10 @@ func New(ctx logger.Context) (logger.Logger, error) {
 	}

 	return &JSONFileLogger{
-		buf:           bytes.NewBuffer(nil),
-		writer:        writer,
-		readers:       make(map[*logger.LogWatcher]struct{}),
-		extra:         extra,
-		writeNotifier: pubsub.NewPublisher(0, 10),
+		buf:     bytes.NewBuffer(nil),
+		writer:  writer,
+		readers: make(map[*logger.LogWatcher]struct{}),
+		extra:   extra,
 	}, nil
 }

@@ -107,7 +104,6 @@ func (l *JSONFileLogger) Log(msg *logger.Message) error {

 	l.buf.WriteByte('\n')
 	_, err = l.writer.Write(l.buf.Bytes())
-	l.writeNotifier.Publish(struct{}{})
 	l.buf.Reset()

 	return err
@@ -141,7 +137,6 @@ func (l *JSONFileLogger) Close() error {
 		r.Close()
 		delete(l.readers, r)
 	}
-	l.writeNotifier.Close()
 	l.mu.Unlock()
 	return err
 }
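For context on what the change above removes: the reverted logger created a pubsub.Publisher and published an empty struct after every write so that followers knew to re-read the file. Below is a minimal sketch of that pattern, assuming the pkg/pubsub calls exactly as they appear in this diff (NewPublisher, Subscribe, Publish, Evict); the goroutine and the printed message are illustrative.

package main

import (
    "fmt"

    "github.com/docker/docker/pkg/pubsub" // import path as used by this tree
)

func main() {
    // Same arguments the removed code used: no publish timeout, buffer of 10.
    pub := pubsub.NewPublisher(0, 10)

    sub := pub.Subscribe() // each follower gets its own channel
    done := make(chan struct{})

    go func() {
        defer close(done)
        // Each value means "the log file grew"; a follower would decode the new bytes.
        for range sub {
            fmt.Println("write notification received")
        }
    }()

    // The removed Log() path published after every successful write.
    pub.Publish(struct{}{})
    pub.Publish(struct{}{})

    // The removed followLogs evicted its subscription on exit; Evict closes the
    // channel, which ends the range loop above.
    pub.Evict(sub)
    <-done
}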
142 changes: 80 additions & 62 deletions daemon/logger/jsonfilelog/read.go
@@ -10,11 +10,14 @@ import (
 	"github.com/Sirupsen/logrus"
 	"github.com/docker/docker/daemon/logger"
+	"github.com/docker/docker/pkg/filenotify"
 	"github.com/docker/docker/pkg/ioutils"
 	"github.com/docker/docker/pkg/jsonlog"
 	"github.com/docker/docker/pkg/tailfile"
 )

+const maxJSONDecodeRetry = 20000
+
 func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, error) {
 	l.Reset()
 	if err := dec.Decode(l); err != nil {
@@ -32,6 +35,7 @@ func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, error) {
 // created by this driver.
 func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
 	logWatcher := logger.NewLogWatcher()
+
 	go l.readLogs(logWatcher, config)
 	return logWatcher
 }
@@ -81,7 +85,7 @@ func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
 	l.mu.Unlock()

 	notifyRotate := l.writer.NotifyRotate()
-	l.followLogs(latestFile, logWatcher, notifyRotate, config.Since)
+	followLogs(latestFile, logWatcher, notifyRotate, config.Since)

 	l.mu.Lock()
 	delete(l.readers, logWatcher)
@@ -117,81 +121,95 @@ func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since time.Time) {
 	}
 }

-func (l *JSONFileLogger) followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, since time.Time) {
-	var (
-		rotated bool
-
-		dec         = json.NewDecoder(f)
-		log         = &jsonlog.JSONLog{}
-		writeNotify = l.writeNotifier.Subscribe()
-		watchClose  = logWatcher.WatchClose()
-	)
-
-	reopenLogFile := func() error {
-		f.Close()
-		f, err := os.Open(f.Name())
-		if err != nil {
-			return err
-		}
-		dec = json.NewDecoder(f)
-		rotated = true
-		return nil
-	}
-
-	readToEnd := func() error {
-		for {
-			msg, err := decodeLogLine(dec, log)
-			if err != nil {
-				return err
-			}
-			if !since.IsZero() && msg.Timestamp.Before(since) {
-				continue
-			}
-			logWatcher.Msg <- msg
-		}
-	}
-
-	defer func() {
-		l.writeNotifier.Evict(writeNotify)
-		if rotated {
-			f.Close()
-		}
-	}()
-
-	for {
-		select {
-		case <-watchClose:
-			readToEnd()
-			return
-		case <-notifyRotate:
-			readToEnd()
-			if err := reopenLogFile(); err != nil {
-				logWatcher.Err <- err
-				return
-			}
-		case _, ok := <-writeNotify:
-			if err := readToEnd(); err == io.EOF {
-				if !ok {
-					// The writer is closed, no new logs will be generated.
-					return
-				}
-
-				select {
-				case <-notifyRotate:
-					if err := reopenLogFile(); err != nil {
-						logWatcher.Err <- err
-						return
-					}
-				default:
-					dec = json.NewDecoder(f)
-				}
-			} else if err == io.ErrUnexpectedEOF {
-				dec = json.NewDecoder(io.MultiReader(dec.Buffered(), f))
-			} else {
-				logrus.Errorf("Failed to decode json log %s: %v", f.Name(), err)
-				logWatcher.Err <- err
-				return
-			}
-		}
-	}
-}
+func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, since time.Time) {
+	dec := json.NewDecoder(f)
+	l := &jsonlog.JSONLog{}
+
+	fileWatcher, err := filenotify.New()
+	if err != nil {
+		logWatcher.Err <- err
+	}
+	defer fileWatcher.Close()
+
+	var retries int
+	for {
+		msg, err := decodeLogLine(dec, l)
+		if err != nil {
+			if err != io.EOF {
+				// try again because this shouldn't happen
+				if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry {
+					dec = json.NewDecoder(f)
+					retries++
+					continue
+				}
+
+				// io.ErrUnexpectedEOF is returned from json.Decoder when there is
+				// remaining data in the parser's buffer while an io.EOF occurs.
+				// If the json logger writes a partial json log entry to the disk
+				// while at the same time the decoder tries to decode it, the race condition happens.
+				if err == io.ErrUnexpectedEOF && retries <= maxJSONDecodeRetry {
+					reader := io.MultiReader(dec.Buffered(), f)
+					dec = json.NewDecoder(reader)
+					retries++
+					continue
+				}
+
+				logWatcher.Err <- err
+				return
+			}
+
+			logrus.WithField("logger", "json-file").Debugf("waiting for events")
+			if err := fileWatcher.Add(f.Name()); err != nil {
+				logrus.WithField("logger", "json-file").Warn("falling back to file poller")
+				fileWatcher.Close()
+				fileWatcher = filenotify.NewPollingWatcher()
+				if err := fileWatcher.Add(f.Name()); err != nil {
+					logrus.Errorf("error watching log file for modifications: %v", err)
+					logWatcher.Err <- err
+				}
+			}
+
+			select {
+			case <-fileWatcher.Events():
+				dec = json.NewDecoder(f)
+				fileWatcher.Remove(f.Name())
+				continue
+			case <-fileWatcher.Errors():
+				fileWatcher.Remove(f.Name())
+				logWatcher.Err <- err
+				return
+			case <-logWatcher.WatchClose():
+				fileWatcher.Remove(f.Name())
+				return
+			case <-notifyRotate:
+				f, err = os.Open(f.Name())
+				if err != nil {
+					logWatcher.Err <- err
+					return
+				}
+
+				dec = json.NewDecoder(f)
+				fileWatcher.Remove(f.Name())
+				fileWatcher.Add(f.Name())
+				continue
+			}
+		}
+
+		retries = 0 // reset retries since we've succeeded
+		if !since.IsZero() && msg.Timestamp.Before(since) {
+			continue
+		}
+
+		select {
+		case logWatcher.Msg <- msg:
+		case <-logWatcher.WatchClose():
+			logWatcher.Msg <- msg
+			for {
+				msg, err := decodeLogLine(dec, l)
+				if err != nil {
+					return
+				}
+				if !since.IsZero() && msg.Timestamp.Before(since) {
+					continue
+				}
+				logWatcher.Msg <- msg
+			}
+		}
+	}
+}
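Both the removed and the restored follower guard against the race described in the comment above: if the logger has flushed only part of a JSON line, json.Decoder returns io.ErrUnexpectedEOF while the partial bytes are still in its internal buffer, and the follower rebuilds the decoder from dec.Buffered() plus the file so the half-read entry is not lost. A self-contained sketch of that recovery (the sample entries are made up):

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "io"
)

type entry struct {
    Log string `json:"log"`
}

func main() {
    // Pretend this is the log file, with the second entry only half flushed.
    file := bytes.NewBufferString("{\"log\":\"first\"}\n{\"log\":\"sec")

    dec := json.NewDecoder(file)
    var e entry

    if err := dec.Decode(&e); err == nil {
        fmt.Println("decoded:", e.Log) // decoded: first
    }

    // The half-written entry makes the decoder hit EOF in the middle of a value.
    err := dec.Decode(&e)
    fmt.Println(err == io.ErrUnexpectedEOF) // true

    // The writer finishes the line.
    file.WriteString("ond\"}\n")

    // Rebuild the decoder from what it had already buffered plus the rest of the
    // file, as followLogs does, so the partial entry is completed rather than lost.
    dec = json.NewDecoder(io.MultiReader(dec.Buffered(), file))
    if err := dec.Decode(&e); err == nil {
        fmt.Println("decoded:", e.Log) // decoded: second
    }
}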
6 changes: 2 additions & 4 deletions pkg/pubsub/publisher.go
@@ -56,10 +56,8 @@ func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} {
 // Evict removes the specified subscriber from receiving any more messages.
 func (p *Publisher) Evict(sub chan interface{}) {
 	p.m.Lock()
-	if _, ok := p.subscribers[sub]; ok {
-		delete(p.subscribers, sub)
-		close(sub)
-	}
+	delete(p.subscribers, sub)
+	close(sub)
 	p.m.Unlock()
 }
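
The guard removed here had made Evict idempotent; with the pre-b1594c5 version restored, evicting the same subscriber twice reaches close(sub) again and panics with "close of closed channel". A small illustration of the difference, assuming the pkg/pubsub API shown above (the recover is only there to surface the panic):

package main

import (
    "fmt"

    "github.com/docker/docker/pkg/pubsub" // import path as used by this tree
)

func main() {
    pub := pubsub.NewPublisher(0, 1)
    sub := pub.Subscribe()

    pub.Evict(sub) // removes the subscriber from the map and closes its channel

    defer func() {
        // With the unguarded Evict restored by this revert, the second call
        // reaches close(sub) again and panics: "close of closed channel".
        fmt.Println("recovered:", recover())
    }()
    pub.Evict(sub)
}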
