diff --git a/CHANGELOG.md b/CHANGELOG.md index 3e74b5e8..224cce7a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,15 @@ All notable changes to this project will be documented in this file. ### Changed +## [v3.2.11] - 2020-05-8 +### Added +- @hhromic Add Syslog TCP framing documentation to README + +### Changed +- @hhromic syslog adapter refactor +- @michaelshobbs use type assertion instead of reflection to determine connection type +- @michaelshobbs use // + space for all human readable comments + ## [v3.2.10] - 2020-05-1 ### Added - @jszwedko Add optional TCP framing to syslog adapter @@ -234,7 +243,8 @@ All notable changes to this project will be documented in this file. - Base container is now Alpine - Moved to gliderlabs organization -[unreleased]: https://github.com/gliderlabs/logspout/compare/v3.2.10...HEAD +[unreleased]: https://github.com/gliderlabs/logspout/compare/v3.2.11...HEAD +[v3.2.11]: https://github.com/gliderlabs/logspout/compare/v3.2.10...v3.2.11 [v3.2.10]: https://github.com/gliderlabs/logspout/compare/v3.2.9...v3.2.10 [v3.2.9]: https://github.com/gliderlabs/logspout/compare/v3.2.8...v3.2.9 [v3.2.8]: https://github.com/gliderlabs/logspout/compare/v3.2.7...v3.2.8 diff --git a/README.md b/README.md index d4e08e5d..6238f0c7 100644 --- a/README.md +++ b/README.md @@ -194,6 +194,7 @@ If you use multiline logging with raw, it's recommended to json encode the Data * `SYSLOG_PRIORITY` - datum for priority field (default `{{.Priority}}`) * `SYSLOG_STRUCTURED_DATA` - datum for structured data field * `SYSLOG_TAG` - datum for tag field (default `{{.ContainerName}}+route.Options["append_tag"]`) +* `SYSLOG_TCP_FRAMING` - for TCP or TLS transports, whether to use `octet-counted` framing in emitted messages or `traditional` LF framing (default `traditional`) * `SYSLOG_TIMESTAMP` - datum for timestamp field (default `{{.Timestamp}}`) * `MULTILINE_ENABLE_DEFAULT` - enable multiline logging for all containers when using the multiline adapter (default `true`) * `MULTILINE_MATCH` - determines which lines the pattern should match, one of first|last|nonfirst|nonlast, for details see: [MULTILINE_MATCH](#multiline_match) (default `nonfirst`) @@ -250,6 +251,22 @@ Use examples: ``` +#### Syslog TCP Framing + +When using a TCP or TLS transport with the Syslog adapter, it is possible to add octet-counting to the emitted frames as described in [RFC6587 (Syslog over TCP) 3.4.1](https://tools.ietf.org/html/rfc6587#section-3.4.1) and [RFC5424 (Syslog over TLS)](https://tools.ietf.org/html/rfc5424). + +This prefixes each message with the length of the message to allow consumers to easily determine where the message ends (rather than traditional LF framing). This also enables multiline Syslog messages without escaping. + +To enable octet-counted framing for Syslog over TCP or TLS, use the `SYSLOG_TCP_FRAMING` environment variable: + + $ docker run --name="logspout" \ + -e SYSLOG_TCP_FRAMING=octet-counted \ + --volume=/var/run/docker.sock:/var/run/docker.sock \ + gliderlabs/logspout \ + syslog+tcp://logs.papertrailapp.com:55555 + +> NOTE: The default is to use traditional LF framing for backwards compatibility though octet-counted framing is preferred when it is known the downstream consumer can handle it. + #### Using Logspout in a swarm In a swarm, logspout is best deployed as a global service. When running logspout with 'docker run', you can change the value of the hostname field using the `SYSLOG_HOSTNAME` environment variable as explained above. However, this does not work in a compose file because the value for `SYSLOG_HOSTNAME` will be the same for all logspout "tasks", regardless of the docker host on which they run. To support this mode of deployment, the syslog adapter will look for the file `/etc/host_hostname` and, if the file exists and it is not empty, will configure the hostname field with the content of this file. You can then use a volume mount to map a file on the docker hosts with the file `/etc/host_hostname` in the container. The sample compose file below illustrates how this can be done diff --git a/VERSION b/VERSION index 9e90abd9..f53f86ae 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -v3.2.10 +v3.2.11 diff --git a/adapters/raw/raw.go b/adapters/raw/raw.go index bb483a50..5ec15b90 100644 --- a/adapters/raw/raw.go +++ b/adapters/raw/raw.go @@ -7,7 +7,6 @@ import ( "log" "net" "os" - "reflect" "text/template" "github.com/gliderlabs/logspout/router" @@ -69,11 +68,10 @@ func (a *Adapter) Stream(logstream chan *router.Message) { log.Println("raw:", err) return } - //log.Println("debug:", buf.String()) _, err = a.conn.Write(buf.Bytes()) if err != nil { log.Println("raw:", err) - if reflect.TypeOf(a.conn).String() != "*net.UDPConn" { + if _, ok := a.conn.(*net.UDPConn); !ok { return } } diff --git a/adapters/syslog/syslog.go b/adapters/syslog/syslog.go index 1454022a..b5e4e080 100644 --- a/adapters/syslog/syslog.go +++ b/adapters/syslog/syslog.go @@ -21,38 +21,34 @@ import ( ) const ( + // Rfc5424Format is the modern syslog protocol format. https://tools.ietf.org/html/rfc5424 + Rfc5424Format Format = "rfc5424" + // Rfc3164Format is the legacy BSD syslog protocol format. https://tools.ietf.org/html/rfc3164 + Rfc3164Format Format = "rfc3164" + // TraditionalTCPFraming is the traditional LF framing of syslog messages on the wire TraditionalTCPFraming TCPFraming = "traditional" // OctetCountedTCPFraming prepends the size of each message before the message. https://tools.ietf.org/html/rfc6587#section-3.4.1 OctetCountedTCPFraming TCPFraming = "octet-counted" + defaultFormat = Rfc5424Format + defaultTCPFraming = TraditionalTCPFraming defaultRetryCount = 10 ) var ( - hostname string - retryCount uint - tcpFraming TCPFraming - econnResetErrStr string + hostname string ) +// Format represents the RFC spec to use for syslog messages +type Format string + // TCPFraming represents the type of framing to use for syslog messages type TCPFraming string func init() { hostname, _ = os.Hostname() - econnResetErrStr = fmt.Sprintf("write: %s", syscall.ECONNRESET.Error()) router.AdapterFactories.Register(NewSyslogAdapter, "syslog") - setRetryCount() -} - -func setRetryCount() { - if count, err := strconv.Atoi(cfg.GetEnvDefault("RETRY_COUNT", strconv.Itoa(defaultRetryCount))); err != nil { - retryCount = uint(defaultRetryCount) - } else { - retryCount = uint(count) - } - debug("setting retryCount to:", retryCount) } func debug(v ...interface{}) { @@ -61,6 +57,17 @@ func debug(v ...interface{}) { } } +func getFormat() (Format, error) { + switch s := cfg.GetEnvDefault("SYSLOG_FORMAT", string(defaultFormat)); s { + case string(Rfc5424Format): + return Rfc5424Format, nil + case string(Rfc3164Format): + return Rfc3164Format, nil + default: + return defaultFormat, fmt.Errorf("unknown SYSLOG_FORMAT value: %s", s) + } +} + func getHostname() string { content, err := ioutil.ReadFile("/etc/host_hostname") if err == nil && len(content) > 0 { @@ -71,6 +78,95 @@ func getHostname() string { return hostname } +func getFieldTemplates(route *router.Route) (*FieldTemplates, error) { + var err error + var s string + var tmpl FieldTemplates + + s = cfg.GetEnvDefault("SYSLOG_PRIORITY", "{{.Priority}}") + if tmpl.priority, err = template.New("priority").Parse(s); err != nil { + return nil, err + } + debug("setting priority to:", s) + + s = cfg.GetEnvDefault("SYSLOG_TIMESTAMP", "{{.Timestamp}}") + if tmpl.timestamp, err = template.New("timestamp").Parse(s); err != nil { + return nil, err + } + debug("setting timestamp to:", s) + + s = getHostname() + if tmpl.hostname, err = template.New("hostname").Parse(s); err != nil { + return nil, err + } + debug("setting hostname to:", s) + + s = cfg.GetEnvDefault("SYSLOG_TAG", "{{.ContainerName}}"+route.Options["append_tag"]) + if tmpl.tag, err = template.New("tag").Parse(s); err != nil { + return nil, err + } + debug("setting tag to:", s) + + s = cfg.GetEnvDefault("SYSLOG_PID", "{{.Container.State.Pid}}") + if tmpl.pid, err = template.New("pid").Parse(s); err != nil { + return nil, err + } + debug("setting pid to:", s) + + s = cfg.GetEnvDefault("SYSLOG_STRUCTURED_DATA", "") + if route.Options["structured_data"] != "" { + s = route.Options["structured_data"] + } + if s == "" { + s = "-" + } else { + s = fmt.Sprintf("[%s]", s) + } + if tmpl.structuredData, err = template.New("structuredData").Parse(s); err != nil { + return nil, err + } + debug("setting structuredData to:", s) + + s = cfg.GetEnvDefault("SYSLOG_DATA", "{{.Data}}") + if tmpl.data, err = template.New("data").Parse(s); err != nil { + return nil, err + } + debug("setting data to:", s) + + return &tmpl, nil +} + +func getTCPFraming() (TCPFraming, error) { + switch s := cfg.GetEnvDefault("SYSLOG_TCP_FRAMING", string(defaultTCPFraming)); s { + case string(TraditionalTCPFraming): + return TraditionalTCPFraming, nil + case string(OctetCountedTCPFraming): + return OctetCountedTCPFraming, nil + default: + return defaultTCPFraming, fmt.Errorf("unknown SYSLOG_TCP_FRAMING value: %s", s) + } +} + +func getRetryCount() uint { + retryCountStr := cfg.GetEnvDefault("RETRY_COUNT", "") + if retryCountStr != "" { + retryCount, _ := strconv.Atoi(retryCountStr) + return uint(retryCount) + } + return defaultRetryCount +} + +func isTCPConnection(conn net.Conn) bool { + switch conn.(type) { + case *net.TCPConn: + return true + case *tls.Conn: + return true + default: + return false + } +} + // NewSyslogAdapter returnas a configured syslog.Adapter func NewSyslogAdapter(route *router.Route) (router.LogAdapter, error) { transport, found := router.AdapterTransports.Lookup(route.AdapterTransport("udp")) @@ -82,111 +178,83 @@ func NewSyslogAdapter(route *router.Route) (router.LogAdapter, error) { return nil, err } - format := cfg.GetEnvDefault("SYSLOG_FORMAT", "rfc5424") - priority := cfg.GetEnvDefault("SYSLOG_PRIORITY", "{{.Priority}}") - pid := cfg.GetEnvDefault("SYSLOG_PID", "{{.Container.State.Pid}}") - hostname = getHostname() - - tag := cfg.GetEnvDefault("SYSLOG_TAG", "{{.ContainerName}}"+route.Options["append_tag"]) - structuredData := cfg.GetEnvDefault("SYSLOG_STRUCTURED_DATA", "") - if route.Options["structured_data"] != "" { - structuredData = route.Options["structured_data"] + format, err := getFormat() + if err != nil { + return nil, err } - data := cfg.GetEnvDefault("SYSLOG_DATA", "{{.Data}}") - timestamp := cfg.GetEnvDefault("SYSLOG_TIMESTAMP", "{{.Timestamp}}") + debug("setting format to:", format) - if structuredData == "" { - structuredData = "-" - } else { - structuredData = fmt.Sprintf("[%s]", structuredData) + tmpl, err := getFieldTemplates(route) + if err != nil { + return nil, err } - if isTCPConnecion(conn) { - if err = setTCPFraming(); err != nil { + connIsTCP := isTCPConnection(conn) + debug("setting connIsTCP to:", connIsTCP) + + var tcpFraming TCPFraming + if connIsTCP { + if tcpFraming, err = getTCPFraming(); err != nil { return nil, err } + debug("setting tcpFraming to:", tcpFraming) } - var tmplStr string - switch format { - case "rfc5424": - // notes from RFC: - // - there is no upper limit for the entire message and depends on the transport in use - // - the HOSTNAME field must not exceed 255 characters - // - the TAG field must not exceed 48 characters - // - the PROCID field must not exceed 128 characters - tmplStr = fmt.Sprintf("<%s>1 %s %.255s %.48s %.128s - %s %s\n", - priority, timestamp, hostname, tag, pid, structuredData, data) - case "rfc3164": - // notes from RFC: - // - the entire message must be <= 1024 bytes - // - the TAG field must not exceed 32 characters - tmplStr = fmt.Sprintf("<%s>%s %s %.32s[%s]: %s\n", - priority, timestamp, hostname, tag, pid, data) - default: - return nil, errors.New("unsupported syslog format: " + format) - } - tmpl, err := template.New("syslog").Parse(tmplStr) - if err != nil { - return nil, err - } + retryCount := getRetryCount() + debug("setting retryCount to:", retryCount) + return &Adapter{ - route: route, - conn: conn, - tmpl: tmpl, - transport: transport, + route: route, + conn: conn, + connIsTCP: connIsTCP, + format: format, + tmpl: tmpl, + transport: transport, + tcpFraming: tcpFraming, + retryCount: retryCount, }, nil } -func setTCPFraming() error { - switch s := cfg.GetEnvDefault("SYSLOG_TCP_FRAMING", "traditional"); s { - case "traditional": - tcpFraming = TraditionalTCPFraming - return nil - case "octet-counted": - tcpFraming = OctetCountedTCPFraming - return nil - default: - return fmt.Errorf("unknown SYSLOG_TCP_FRAMING value: %s", s) - } +// FieldTemplates for rendering Syslog messages +type FieldTemplates struct { + priority *template.Template + timestamp *template.Template + hostname *template.Template + tag *template.Template + pid *template.Template + structuredData *template.Template + data *template.Template } // Adapter streams log output to a connection in the Syslog format type Adapter struct { - conn net.Conn - route *router.Route - tmpl *template.Template - transport router.AdapterTransport + conn net.Conn + connIsTCP bool + route *router.Route + format Format + tmpl *FieldTemplates + transport router.AdapterTransport + tcpFraming TCPFraming + retryCount uint } // Stream sends log data to a connection func (a *Adapter) Stream(logstream chan *router.Message) { for message := range logstream { m := &Message{message} - buf, err := m.Render(a.tmpl) + buf, err := m.Render(a.format, a.tmpl) if err != nil { log.Println("syslog:", err) return } - if isTCPConnecion(a.conn) { - switch tcpFraming { - case OctetCountedTCPFraming: - buf = append([]byte(fmt.Sprintf("%d ", len(buf))), buf...) - case TraditionalTCPFraming: - // leave as-is - default: - // should never get here, validated above - panic("unknown framing format: " + tcpFraming) - } + if a.connIsTCP && a.tcpFraming == OctetCountedTCPFraming { + buf = append([]byte(fmt.Sprintf("%d ", len(buf))), buf...) } if _, err = a.conn.Write(buf); err != nil { log.Println("syslog:", err) - switch a.conn.(type) { - case *net.UDPConn: - continue - default: + if a.connIsTCP { if err = a.retry(buf, err); err != nil { log.Panicf("syslog retry err: %+v", err) return @@ -198,7 +266,7 @@ func (a *Adapter) Stream(logstream chan *router.Message) { func (a *Adapter) retry(buf []byte, err error) error { if opError, ok := err.(*net.OpError); ok { - if (opError.Temporary() && opError.Err.Error() != econnResetErrStr) || opError.Timeout() { + if (opError.Temporary() && !errors.Is(opError, syscall.ECONNRESET)) || opError.Timeout() { retryErr := a.retryTemporary(buf) if retryErr == nil { return nil @@ -217,7 +285,7 @@ func (a *Adapter) retry(buf []byte, err error) error { } func (a *Adapter) retryTemporary(buf []byte) error { - log.Printf("syslog: retrying tcp up to %v times\n", retryCount) + log.Printf("syslog: retrying tcp up to %v times\n", a.retryCount) err := retryExp(func() error { _, err := a.conn.Write(buf) if err == nil { @@ -226,7 +294,7 @@ func (a *Adapter) retryTemporary(buf []byte) error { } return err - }, retryCount) + }, a.retryCount) if err != nil { log.Println("syslog: retry failed") @@ -237,7 +305,7 @@ func (a *Adapter) retryTemporary(buf []byte) error { } func (a *Adapter) reconnect() error { - log.Printf("syslog: reconnecting up to %v times\n", retryCount) + log.Printf("syslog: reconnecting up to %v times\n", a.retryCount) err := retryExp(func() error { conn, err := a.transport.Dial(a.route.Address, a.route.Options) if err != nil { @@ -245,7 +313,7 @@ func (a *Adapter) reconnect() error { } a.conn = conn return nil - }, retryCount) + }, a.retryCount) if err != nil { return err @@ -254,7 +322,7 @@ func (a *Adapter) reconnect() error { } func retryExp(fun func() error, tries uint) error { - try := uint(0) + var try uint for { err := fun() if err == nil { @@ -270,29 +338,68 @@ func retryExp(fun func() error, tries uint) error { } } -func isTCPConnecion(conn net.Conn) bool { - switch conn.(type) { - case *net.TCPConn: - return true - case *tls.Conn: - return true - default: - return false - } -} - // Message extends router.Message for the syslog standard type Message struct { *router.Message } // Render transforms the log message using the Syslog template -func (m *Message) Render(tmpl *template.Template) ([]byte, error) { - buf := new(bytes.Buffer) - err := tmpl.Execute(buf, m) - if err != nil { +func (m *Message) Render(format Format, tmpl *FieldTemplates) ([]byte, error) { + priority := new(bytes.Buffer) + if err := tmpl.priority.Execute(priority, m); err != nil { + return nil, err + } + + timestamp := new(bytes.Buffer) + if err := tmpl.timestamp.Execute(timestamp, m); err != nil { + return nil, err + } + + hostname := new(bytes.Buffer) + if err := tmpl.hostname.Execute(hostname, m); err != nil { + return nil, err + } + + tag := new(bytes.Buffer) + if err := tmpl.tag.Execute(tag, m); err != nil { + return nil, err + } + + pid := new(bytes.Buffer) + if err := tmpl.pid.Execute(pid, m); err != nil { + return nil, err + } + + structuredData := new(bytes.Buffer) + if err := tmpl.structuredData.Execute(structuredData, m); err != nil { return nil, err } + + data := new(bytes.Buffer) + if err := tmpl.data.Execute(data, m); err != nil { + return nil, err + } + + buf := new(bytes.Buffer) + switch format { + case Rfc5424Format: + // notes from RFC: + // - there is no upper limit for the entire message and depends on the transport in use + // - the HOSTNAME field must not exceed 255 characters + // - the TAG field must not exceed 48 characters + // - the PROCID field must not exceed 128 characters + fmt.Fprintf(buf, "<%s>1 %s %.255s %.48s %.128s - %s %s\n", + priority, timestamp, hostname, tag, pid, structuredData, data, + ) + case Rfc3164Format: + // notes from RFC: + // - the entire message must be <= 1024 bytes + // - the TAG field must not exceed 32 characters + fmt.Fprintf(buf, "<%s>%s %s %.32s[%s]: %s\n", + priority, timestamp, hostname, tag, pid, data, + ) + } + return buf.Bytes(), nil } diff --git a/adapters/syslog/syslog_test.go b/adapters/syslog/syslog_test.go index c032a27f..d3a06504 100644 --- a/adapters/syslog/syslog_test.go +++ b/adapters/syslog/syslog_test.go @@ -12,7 +12,6 @@ import ( "strings" "sync" "testing" - "text/template" "time" _ "github.com/gliderlabs/logspout/transports/tcp" @@ -36,7 +35,7 @@ var ( Hostname: "8dfafdbc3a40", }, } - hostHostnameFilename = "/etc/host_hostname" + hostHostnameFilename = "/tmp/host_hostname" hostnameContent = "hostname" badHostnameContent = "hostname\r\n" ) @@ -91,16 +90,74 @@ func TestSyslogOctetFraming(t *testing.T) { } } +func TestSysLogFormat(t *testing.T) { + defer os.Unsetenv("SYSLOG_FORMAT") + + newFormat := Rfc3164Format + os.Setenv("SYSLOG_FORMAT", string(newFormat)) + format, err := getFormat() + if err != nil { + t.Fatal("unexpected error: ", err) + } + if format != newFormat { + t.Errorf("expected %v got %v", newFormat, format) + } + + os.Unsetenv("SYSLOG_FORMAT") + format, err = getFormat() + if err != nil { + t.Fatal("unexpected error: ", err) + } + if format != defaultFormat { + t.Errorf("expected %v got %v", defaultFormat, format) + } + + os.Setenv("SYSLOG_FORMAT", "invalid-option") + _, err = getFormat() + if err == nil { + t.Fatal("expected error, got nil") + } +} + +func TestSysLogTCPFraming(t *testing.T) { + defer os.Unsetenv("SYSLOG_TCP_FRAMING") + + newTCPFraming := OctetCountedTCPFraming + os.Setenv("SYSLOG_TCP_FRAMING", string(newTCPFraming)) + tcpFraming, err := getTCPFraming() + if err != nil { + t.Fatal("unexpected error: ", err) + } + if tcpFraming != newTCPFraming { + t.Errorf("expected %v got %v", newTCPFraming, tcpFraming) + } + + os.Unsetenv("SYSLOG_TCP_FRAMING") + tcpFraming, err = getTCPFraming() + if err != nil { + t.Fatal("unexpected error: ", err) + } + if tcpFraming != defaultTCPFraming { + t.Errorf("expected %v got %v", defaultTCPFraming, tcpFraming) + } + + os.Setenv("SYSLOG_TCP_FRAMING", "invalid-option") + _, err = getTCPFraming() + if err == nil { + t.Fatal("expected error, got nil") + } +} + func TestSyslogRetryCount(t *testing.T) { newRetryCount := uint(20) os.Setenv("RETRY_COUNT", strconv.Itoa(int(newRetryCount))) - setRetryCount() + retryCount := getRetryCount() if retryCount != newRetryCount { t.Errorf("expected %v got %v", newRetryCount, retryCount) } os.Unsetenv("RETRY_COUNT") - setRetryCount() + retryCount = getRetryCount() if retryCount != defaultRetryCount { t.Errorf("expected %v got %v", defaultRetryCount, retryCount) } @@ -135,7 +192,7 @@ func TestSyslogReconnectOnClose(t *testing.T) { <-messages msgnum++ } - check(t, adapter.(*Adapter).tmpl, <-messages, msg) + check(t, <-messages, msg) msgnum++ case <-timeout: adapter.(*Adapter).conn.Close() @@ -220,13 +277,13 @@ func sendLogstream(stream chan *router.Message, messages chan string, adapter ro }, } stream <- msg.Message - b, _ := msg.Render(adapter.(*Adapter).tmpl) + b, _ := msg.Render(adapter.(*Adapter).format, adapter.(*Adapter).tmpl) messages <- string(b) time.Sleep(10 * time.Millisecond) } } -func check(t *testing.T, tmpl *template.Template, in string, out string) { +func check(t *testing.T, in string, out string) { if in != out { t.Errorf("expected: %s\ngot: %s\n", in, out) } diff --git a/router/routes.go b/router/routes.go index 8812243b..156fed8f 100644 --- a/router/routes.go +++ b/router/routes.go @@ -140,7 +140,7 @@ func (rm *RouteManager) Add(route *Route) error { } route.closer = make(chan struct{}) route.adapter = adapter - //Stop any existing route with this ID: + // Stop any existing route with this ID: if rm.routes[route.ID] != nil { rm.routes[route.ID].closer <- struct{}{} } diff --git a/router/routes_test.go b/router/routes_test.go index e94a6c33..0b9140c8 100644 --- a/router/routes_test.go +++ b/router/routes_test.go @@ -32,10 +32,10 @@ func TestRouterGetAll(t *testing.T) { func TestRouterNoDuplicateIds(t *testing.T) { AdapterFactories.Register(newDummyAdapter, "syslog") - //Mock "running" so routes actually start running when added. + // Mock "running" so routes actually start running when added. Routes.routing = true - //Start the first route. + // Start the first route. route1 := &Route{ ID: "abc", Address: "someUrl", @@ -45,7 +45,7 @@ func TestRouterNoDuplicateIds(t *testing.T) { t.Error("Error adding route:", err) } - //Start a second route with the same ID. + // Start a second route with the same ID. var route2 = &Route{ ID: "abc", Address: "someUrl2",