Skip to content

Commit

Permalink
add the libhoney log parser
Browse files Browse the repository at this point in the history
  • Loading branch information
mterhar committed Dec 13, 2024
1 parent 783c00a commit 2b21d03
Show file tree
Hide file tree
Showing 7 changed files with 735 additions and 141 deletions.
126 changes: 126 additions & 0 deletions receiver/libhoneyreceiver/encoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package libhoneyreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver"

import (
"bytes"

"github.com/gogo/protobuf/jsonpb"
spb "google.golang.org/genproto/googleapis/rpc/status"

"go.opentelemetry.io/collector/pdata/plog/plogotlp"
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
"go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp"
)

const (
pbContentType = "application/x-protobuf"
jsonContentType = "application/json"
msgpackContentType = "application/x-msgpack"
)

var (
jsEncoder = &jsonEncoder{}
jsonPbMarshaler = &jsonpb.Marshaler{}
mpEncoder = &msgpackEncoder{}
)

type encoder interface {
unmarshalTracesRequest(buf []byte) (ptraceotlp.ExportRequest, error)
unmarshalMetricsRequest(buf []byte) (pmetricotlp.ExportRequest, error)
unmarshalLogsRequest(buf []byte) (plogotlp.ExportRequest, error)

marshalTracesResponse(ptraceotlp.ExportResponse) ([]byte, error)
marshalMetricsResponse(pmetricotlp.ExportResponse) ([]byte, error)
marshalLogsResponse(plogotlp.ExportResponse) ([]byte, error)

marshalStatus(rsp *spb.Status) ([]byte, error)

contentType() string
}

type jsonEncoder struct{}

func (jsonEncoder) unmarshalTracesRequest(buf []byte) (ptraceotlp.ExportRequest, error) {
req := ptraceotlp.NewExportRequest()
err := req.UnmarshalJSON(buf)
return req, err
}

func (jsonEncoder) unmarshalMetricsRequest(buf []byte) (pmetricotlp.ExportRequest, error) {
req := pmetricotlp.NewExportRequest()
err := req.UnmarshalJSON(buf)
return req, err
}

func (jsonEncoder) unmarshalLogsRequest(buf []byte) (plogotlp.ExportRequest, error) {
req := plogotlp.NewExportRequest()
err := req.UnmarshalJSON(buf)
return req, err
}

func (jsonEncoder) marshalTracesResponse(resp ptraceotlp.ExportResponse) ([]byte, error) {
return resp.MarshalJSON()
}

func (jsonEncoder) marshalMetricsResponse(resp pmetricotlp.ExportResponse) ([]byte, error) {
return resp.MarshalJSON()
}

func (jsonEncoder) marshalLogsResponse(resp plogotlp.ExportResponse) ([]byte, error) {
return resp.MarshalJSON()
}

func (jsonEncoder) marshalStatus(resp *spb.Status) ([]byte, error) {
buf := new(bytes.Buffer)
err := jsonPbMarshaler.Marshal(buf, resp)
return buf.Bytes(), err
}

func (jsonEncoder) contentType() string {
return jsonContentType
}

// messagepack responses seem to work in JSON so leaving this alone for now.
type msgpackEncoder struct{}

func (msgpackEncoder) unmarshalTracesRequest(buf []byte) (ptraceotlp.ExportRequest, error) {
req := ptraceotlp.NewExportRequest()
err := req.UnmarshalJSON(buf)
return req, err
}

func (msgpackEncoder) unmarshalMetricsRequest(buf []byte) (pmetricotlp.ExportRequest, error) {
req := pmetricotlp.NewExportRequest()
err := req.UnmarshalJSON(buf)
return req, err
}

func (msgpackEncoder) unmarshalLogsRequest(buf []byte) (plogotlp.ExportRequest, error) {
req := plogotlp.NewExportRequest()
err := req.UnmarshalJSON(buf)
return req, err
}

func (msgpackEncoder) marshalTracesResponse(resp ptraceotlp.ExportResponse) ([]byte, error) {
return resp.MarshalJSON()
}

func (msgpackEncoder) marshalMetricsResponse(resp pmetricotlp.ExportResponse) ([]byte, error) {
return resp.MarshalJSON()
}

func (msgpackEncoder) marshalLogsResponse(resp plogotlp.ExportResponse) ([]byte, error) {
return resp.MarshalJSON()
}

func (msgpackEncoder) marshalStatus(resp *spb.Status) ([]byte, error) {
buf := new(bytes.Buffer)
err := jsonPbMarshaler.Marshal(buf, resp)
return buf.Bytes(), err
}

func (msgpackEncoder) contentType() string {
return msgpackContentType
}
32 changes: 22 additions & 10 deletions receiver/libhoneyreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,25 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhon
go 1.22.0

require (
github.com/gogo/protobuf v1.3.2
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.115.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.115.0
github.com/stretchr/testify v1.10.0
github.com/vmihailenco/msgpack/v5 v5.4.1
go.opentelemetry.io/collector/component v0.115.1-0.20241206185113-3f3e208e71b8
go.opentelemetry.io/collector/component/componentstatus v0.115.1-0.20241206185113-3f3e208e71b8
go.opentelemetry.io/collector/component/componenttest v0.115.1-0.20241206185113-3f3e208e71b8
go.opentelemetry.io/collector/config/confighttp v0.115.1-0.20241206185113-3f3e208e71b8
go.opentelemetry.io/collector/confmap v1.21.1-0.20241206185113-3f3e208e71b8
go.opentelemetry.io/collector/consumer v1.21.1-0.20241206185113-3f3e208e71b8
go.opentelemetry.io/collector/consumer/consumertest v0.115.1-0.20241206185113-3f3e208e71b8
go.opentelemetry.io/collector/pdata v1.21.1-0.20241206185113-3f3e208e71b8
go.opentelemetry.io/collector/receiver v0.115.1-0.20241206185113-3f3e208e71b8
go.opentelemetry.io/collector/receiver/receivertest v0.115.1-0.20241206185113-3f3e208e71b8
go.opentelemetry.io/collector/semconv v0.115.1-0.20241206185113-3f3e208e71b8
go.uber.org/goleak v1.3.0
go.uber.org/zap v1.27.0
google.golang.org/genproto/googleapis/rpc v0.0.0-20241007155032-5fefd90f89a9
)

require (
Expand All @@ -20,7 +31,6 @@ require (
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-viper/mapstructure/v2 v2.2.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
Expand All @@ -32,27 +42,23 @@ require (
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.115.0
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rs/cors v1.11.1 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
go.opentelemetry.io/collector/client v1.21.1-0.20241206185113-3f3e208e71b8 // indirect
go.opentelemetry.io/collector/component v0.115.1-0.20241206185113-3f3e208e71b8
go.opentelemetry.io/collector/component/componentstatus v0.115.1-0.20241206185113-3f3e208e71b8
go.opentelemetry.io/collector/config/configauth v0.115.1-0.20241206185113-3f3e208e71b8 // indirect
go.opentelemetry.io/collector/config/configcompression v1.21.1-0.20241206185113-3f3e208e71b8 // indirect
go.opentelemetry.io/collector/config/configopaque v1.21.1-0.20241206185113-3f3e208e71b8 // indirect
go.opentelemetry.io/collector/config/configtelemetry v0.115.1-0.20241206185113-3f3e208e71b8 // indirect
go.opentelemetry.io/collector/config/configtls v1.21.1-0.20241206185113-3f3e208e71b8 // indirect
go.opentelemetry.io/collector/config/internal v0.115.0 // indirect
go.opentelemetry.io/collector/consumer/consumererror v0.115.0 // indirect
go.opentelemetry.io/collector/consumer/consumererror v0.115.1-0.20241206185113-3f3e208e71b8 // indirect
go.opentelemetry.io/collector/consumer/consumerprofiles v0.115.1-0.20241206185113-3f3e208e71b8 // indirect
go.opentelemetry.io/collector/extension v0.115.1-0.20241206185113-3f3e208e71b8 // indirect
go.opentelemetry.io/collector/extension/auth v0.115.1-0.20241206185113-3f3e208e71b8 // indirect
go.opentelemetry.io/collector/pdata v1.21.1-0.20241206185113-3f3e208e71b8 // indirect
go.opentelemetry.io/collector/pdata/pprofile v0.115.1-0.20241206185113-3f3e208e71b8 // indirect
go.opentelemetry.io/collector/pipeline v0.115.1-0.20241206185113-3f3e208e71b8 // indirect
go.opentelemetry.io/collector/receiver v0.115.1-0.20241206185113-3f3e208e71b8
go.opentelemetry.io/collector/receiver/receiverprofiles v0.115.1-0.20241206185113-3f3e208e71b8 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 // indirect
go.opentelemetry.io/otel v1.32.0 // indirect
Expand All @@ -61,16 +67,22 @@ require (
go.opentelemetry.io/otel/sdk/metric v1.32.0 // indirect
go.opentelemetry.io/otel/trace v1.32.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0
golang.org/x/net v0.31.0 // indirect
golang.org/x/sys v0.27.0 // indirect
golang.org/x/text v0.20.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241007155032-5fefd90f89a9 // indirect
google.golang.org/grpc v1.67.1 // indirect
google.golang.org/grpc v1.68.1 // indirect
google.golang.org/protobuf v1.35.2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace google.golang.org/genproto => google.golang.org/genproto v0.0.0-20240701130421-f6361c86f094

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent => ../../internal/sharedcomponent

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal => ../../internal/coreinternal

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden
16 changes: 12 additions & 4 deletions receiver/libhoneyreceiver/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

65 changes: 65 additions & 0 deletions receiver/libhoneyreceiver/internal/eventtime/eventtime.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package eventtime // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/eventtime"

import (
"math"
"strconv"
"time"
)

func GetNowTime() time.Time {
return time.Now()
}

func GetEventTime(etHeader string) time.Time {
var eventTime time.Time
if etHeader != "" {
// Great, they sent us a time header. let's try and parse it.
// RFC3339Nano is the default that we send from all our SDKs
eventTime, _ = time.Parse(time.RFC3339Nano, etHeader)
if eventTime.IsZero() {
// the default didn't catch it, let's try a few other things
// is it all numeric? then try unix epoch times
epochInt, err := strconv.ParseInt(etHeader, 0, 64)
if err == nil {
// it might be seconds or it might be milliseconds! Who can know!
// 10-digit numbers are seconds, 13-digit milliseconds, 16 microseconds
if len(etHeader) == 10 {
eventTime = time.Unix(epochInt, 0)
} else if len(etHeader) > 10 {
// turn it into seconds and fractional seconds
fractionalTime := etHeader[:10] + "." + etHeader[10:]
// then chop it into the int part and the fractional part
if epochFloat, err := strconv.ParseFloat(fractionalTime, 64); err == nil {
sec, dec := math.Modf(epochFloat)
eventTime = time.Unix(int64(sec), int64(dec*(1e9)))
}

}
} else {
epochFloat, err := strconv.ParseFloat(etHeader, 64)
if err == nil {
sec, dec := math.Modf(epochFloat)
eventTime = time.Unix(int64(sec), int64(dec*(1e9)))
}
}
}
}
return eventTime.UTC()
}

func GetEventTimeSec(etHeader string) int64 {
eventTime := GetEventTime(etHeader)
return eventTime.Unix()
}

func GetEventTimeNano(etHeader string) int64 {
eventTime := GetEventTime(etHeader)
return eventTime.UnixNano()
}

func GetEventTimeDefaultString() string {
return time.Now().Format(time.RFC3339Nano)
}
Loading

0 comments on commit 2b21d03

Please sign in to comment.