From 2b21d034d2ca3d571f4a79d485ba2619128ace10 Mon Sep 17 00:00:00 2001 From: Mike Terhar Date: Fri, 13 Dec 2024 04:35:35 +0000 Subject: [PATCH 1/5] add the libhoney log parser --- receiver/libhoneyreceiver/encoder.go | 126 ++++++++ receiver/libhoneyreceiver/go.mod | 32 ++- receiver/libhoneyreceiver/go.sum | 16 +- .../internal/eventtime/eventtime.go | 65 +++++ receiver/libhoneyreceiver/libhoney.go | 127 -------- receiver/libhoneyreceiver/libhoneyparser.go | 272 ++++++++++++++++++ receiver/libhoneyreceiver/receiver.go | 238 +++++++++++++++ 7 files changed, 735 insertions(+), 141 deletions(-) create mode 100644 receiver/libhoneyreceiver/encoder.go create mode 100644 receiver/libhoneyreceiver/internal/eventtime/eventtime.go delete mode 100644 receiver/libhoneyreceiver/libhoney.go create mode 100644 receiver/libhoneyreceiver/libhoneyparser.go create mode 100644 receiver/libhoneyreceiver/receiver.go diff --git a/receiver/libhoneyreceiver/encoder.go b/receiver/libhoneyreceiver/encoder.go new file mode 100644 index 000000000000..126ca86779f7 --- /dev/null +++ b/receiver/libhoneyreceiver/encoder.go @@ -0,0 +1,126 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package libhoneyreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver" + +import ( + "bytes" + + "github.com/gogo/protobuf/jsonpb" + spb "google.golang.org/genproto/googleapis/rpc/status" + + "go.opentelemetry.io/collector/pdata/plog/plogotlp" + "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" + "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" +) + +const ( + pbContentType = "application/x-protobuf" + jsonContentType = "application/json" + msgpackContentType = "application/x-msgpack" +) + +var ( + jsEncoder = &jsonEncoder{} + jsonPbMarshaler = &jsonpb.Marshaler{} + mpEncoder = &msgpackEncoder{} +) + +type encoder interface { + unmarshalTracesRequest(buf []byte) (ptraceotlp.ExportRequest, error) + unmarshalMetricsRequest(buf []byte) (pmetricotlp.ExportRequest, error) + unmarshalLogsRequest(buf []byte) (plogotlp.ExportRequest, error) + + marshalTracesResponse(ptraceotlp.ExportResponse) ([]byte, error) + marshalMetricsResponse(pmetricotlp.ExportResponse) ([]byte, error) + marshalLogsResponse(plogotlp.ExportResponse) ([]byte, error) + + marshalStatus(rsp *spb.Status) ([]byte, error) + + contentType() string +} + +type jsonEncoder struct{} + +func (jsonEncoder) unmarshalTracesRequest(buf []byte) (ptraceotlp.ExportRequest, error) { + req := ptraceotlp.NewExportRequest() + err := req.UnmarshalJSON(buf) + return req, err +} + +func (jsonEncoder) unmarshalMetricsRequest(buf []byte) (pmetricotlp.ExportRequest, error) { + req := pmetricotlp.NewExportRequest() + err := req.UnmarshalJSON(buf) + return req, err +} + +func (jsonEncoder) unmarshalLogsRequest(buf []byte) (plogotlp.ExportRequest, error) { + req := plogotlp.NewExportRequest() + err := req.UnmarshalJSON(buf) + return req, err +} + +func (jsonEncoder) marshalTracesResponse(resp ptraceotlp.ExportResponse) ([]byte, error) { + return resp.MarshalJSON() +} + +func (jsonEncoder) marshalMetricsResponse(resp pmetricotlp.ExportResponse) ([]byte, error) { + return resp.MarshalJSON() +} + +func (jsonEncoder) marshalLogsResponse(resp plogotlp.ExportResponse) ([]byte, error) { + return resp.MarshalJSON() +} + +func (jsonEncoder) marshalStatus(resp *spb.Status) ([]byte, error) { + buf := new(bytes.Buffer) + err := jsonPbMarshaler.Marshal(buf, resp) + return buf.Bytes(), err +} + +func (jsonEncoder) contentType() string { + return jsonContentType +} + +// messagepack responses seem to work in JSON so leaving this alone for now. +type msgpackEncoder struct{} + +func (msgpackEncoder) unmarshalTracesRequest(buf []byte) (ptraceotlp.ExportRequest, error) { + req := ptraceotlp.NewExportRequest() + err := req.UnmarshalJSON(buf) + return req, err +} + +func (msgpackEncoder) unmarshalMetricsRequest(buf []byte) (pmetricotlp.ExportRequest, error) { + req := pmetricotlp.NewExportRequest() + err := req.UnmarshalJSON(buf) + return req, err +} + +func (msgpackEncoder) unmarshalLogsRequest(buf []byte) (plogotlp.ExportRequest, error) { + req := plogotlp.NewExportRequest() + err := req.UnmarshalJSON(buf) + return req, err +} + +func (msgpackEncoder) marshalTracesResponse(resp ptraceotlp.ExportResponse) ([]byte, error) { + return resp.MarshalJSON() +} + +func (msgpackEncoder) marshalMetricsResponse(resp pmetricotlp.ExportResponse) ([]byte, error) { + return resp.MarshalJSON() +} + +func (msgpackEncoder) marshalLogsResponse(resp plogotlp.ExportResponse) ([]byte, error) { + return resp.MarshalJSON() +} + +func (msgpackEncoder) marshalStatus(resp *spb.Status) ([]byte, error) { + buf := new(bytes.Buffer) + err := jsonPbMarshaler.Marshal(buf, resp) + return buf.Bytes(), err +} + +func (msgpackEncoder) contentType() string { + return msgpackContentType +} diff --git a/receiver/libhoneyreceiver/go.mod b/receiver/libhoneyreceiver/go.mod index 58ea39a586a9..c0d34fb0c7be 100644 --- a/receiver/libhoneyreceiver/go.mod +++ b/receiver/libhoneyreceiver/go.mod @@ -3,14 +3,25 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhon go 1.22.0 require ( + github.com/gogo/protobuf v1.3.2 + github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.115.0 + github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.115.0 github.com/stretchr/testify v1.10.0 + github.com/vmihailenco/msgpack/v5 v5.4.1 + go.opentelemetry.io/collector/component v0.115.1-0.20241206185113-3f3e208e71b8 + go.opentelemetry.io/collector/component/componentstatus v0.115.1-0.20241206185113-3f3e208e71b8 go.opentelemetry.io/collector/component/componenttest v0.115.1-0.20241206185113-3f3e208e71b8 go.opentelemetry.io/collector/config/confighttp v0.115.1-0.20241206185113-3f3e208e71b8 go.opentelemetry.io/collector/confmap v1.21.1-0.20241206185113-3f3e208e71b8 go.opentelemetry.io/collector/consumer v1.21.1-0.20241206185113-3f3e208e71b8 go.opentelemetry.io/collector/consumer/consumertest v0.115.1-0.20241206185113-3f3e208e71b8 + go.opentelemetry.io/collector/pdata v1.21.1-0.20241206185113-3f3e208e71b8 + go.opentelemetry.io/collector/receiver v0.115.1-0.20241206185113-3f3e208e71b8 go.opentelemetry.io/collector/receiver/receivertest v0.115.1-0.20241206185113-3f3e208e71b8 + go.opentelemetry.io/collector/semconv v0.115.1-0.20241206185113-3f3e208e71b8 go.uber.org/goleak v1.3.0 + go.uber.org/zap v1.27.0 + google.golang.org/genproto/googleapis/rpc v0.0.0-20241007155032-5fefd90f89a9 ) require ( @@ -20,7 +31,6 @@ require ( github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-viper/mapstructure/v2 v2.2.1 // indirect - github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/uuid v1.6.0 // indirect github.com/json-iterator/go v1.1.12 // indirect @@ -32,27 +42,23 @@ require ( github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect - github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.115.0 github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rs/cors v1.11.1 // indirect + github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect go.opentelemetry.io/collector/client v1.21.1-0.20241206185113-3f3e208e71b8 // indirect - go.opentelemetry.io/collector/component v0.115.1-0.20241206185113-3f3e208e71b8 - go.opentelemetry.io/collector/component/componentstatus v0.115.1-0.20241206185113-3f3e208e71b8 go.opentelemetry.io/collector/config/configauth v0.115.1-0.20241206185113-3f3e208e71b8 // indirect go.opentelemetry.io/collector/config/configcompression v1.21.1-0.20241206185113-3f3e208e71b8 // indirect go.opentelemetry.io/collector/config/configopaque v1.21.1-0.20241206185113-3f3e208e71b8 // indirect go.opentelemetry.io/collector/config/configtelemetry v0.115.1-0.20241206185113-3f3e208e71b8 // indirect go.opentelemetry.io/collector/config/configtls v1.21.1-0.20241206185113-3f3e208e71b8 // indirect go.opentelemetry.io/collector/config/internal v0.115.0 // indirect - go.opentelemetry.io/collector/consumer/consumererror v0.115.0 // indirect + go.opentelemetry.io/collector/consumer/consumererror v0.115.1-0.20241206185113-3f3e208e71b8 // indirect go.opentelemetry.io/collector/consumer/consumerprofiles v0.115.1-0.20241206185113-3f3e208e71b8 // indirect go.opentelemetry.io/collector/extension v0.115.1-0.20241206185113-3f3e208e71b8 // indirect go.opentelemetry.io/collector/extension/auth v0.115.1-0.20241206185113-3f3e208e71b8 // indirect - go.opentelemetry.io/collector/pdata v1.21.1-0.20241206185113-3f3e208e71b8 // indirect go.opentelemetry.io/collector/pdata/pprofile v0.115.1-0.20241206185113-3f3e208e71b8 // indirect go.opentelemetry.io/collector/pipeline v0.115.1-0.20241206185113-3f3e208e71b8 // indirect - go.opentelemetry.io/collector/receiver v0.115.1-0.20241206185113-3f3e208e71b8 go.opentelemetry.io/collector/receiver/receiverprofiles v0.115.1-0.20241206185113-3f3e208e71b8 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 // indirect go.opentelemetry.io/otel v1.32.0 // indirect @@ -61,12 +67,10 @@ require ( go.opentelemetry.io/otel/sdk/metric v1.32.0 // indirect go.opentelemetry.io/otel/trace v1.32.0 // indirect go.uber.org/multierr v1.11.0 // indirect - go.uber.org/zap v1.27.0 golang.org/x/net v0.31.0 // indirect golang.org/x/sys v0.27.0 // indirect golang.org/x/text v0.20.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20241007155032-5fefd90f89a9 // indirect - google.golang.org/grpc v1.67.1 // indirect + google.golang.org/grpc v1.68.1 // indirect google.golang.org/protobuf v1.35.2 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) @@ -74,3 +78,11 @@ require ( replace google.golang.org/genproto => google.golang.org/genproto v0.0.0-20240701130421-f6361c86f094 replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent => ../../internal/sharedcomponent + +replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal => ../../internal/coreinternal + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden diff --git a/receiver/libhoneyreceiver/go.sum b/receiver/libhoneyreceiver/go.sum index 5cb28e5c29a3..9297f106264d 100644 --- a/receiver/libhoneyreceiver/go.sum +++ b/receiver/libhoneyreceiver/go.sum @@ -14,6 +14,8 @@ github.com/go-viper/mapstructure/v2 v2.2.1 h1:ZAaOCxANMuZx5RCeg0mBdEZk7DZasvvZIx github.com/go-viper/mapstructure/v2 v2.2.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= @@ -58,6 +60,10 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8= +github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok= +github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= +github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.opentelemetry.io/collector/client v1.21.1-0.20241206185113-3f3e208e71b8 h1:VfdixIcglr5IZhu6ogj8/uEMnf9Oi798V0td47/9jHg= @@ -86,8 +92,8 @@ go.opentelemetry.io/collector/confmap v1.21.1-0.20241206185113-3f3e208e71b8 h1:C go.opentelemetry.io/collector/confmap v1.21.1-0.20241206185113-3f3e208e71b8/go.mod h1:Rrhs+MWoaP6AswZp+ReQ2VO9dfOfcUjdjiSHBsG+nec= go.opentelemetry.io/collector/consumer v1.21.1-0.20241206185113-3f3e208e71b8 h1:GYE8iqLaknLjnrOM8QP+PBi7FpJGzCktMg1A9kgBbWg= go.opentelemetry.io/collector/consumer v1.21.1-0.20241206185113-3f3e208e71b8/go.mod h1:FQcC4ThMtRYY41dv+IPNK8POLLhAFY3r1YR5fuP7iiY= -go.opentelemetry.io/collector/consumer/consumererror v0.115.0 h1:yli//xBCQMPZKXNgNlXemo4dvqhnFrAmCZ11DvQgmcY= -go.opentelemetry.io/collector/consumer/consumererror v0.115.0/go.mod h1:LwVzAvQ6ZVNG7mbOvurbAo+W/rKws0IcjOwriuZXqPE= +go.opentelemetry.io/collector/consumer/consumererror v0.115.1-0.20241206185113-3f3e208e71b8 h1:2plpL/XAUMrA7/bq84vmLPo2zzxARrewKOti+erAed0= +go.opentelemetry.io/collector/consumer/consumererror v0.115.1-0.20241206185113-3f3e208e71b8/go.mod h1:LwVzAvQ6ZVNG7mbOvurbAo+W/rKws0IcjOwriuZXqPE= go.opentelemetry.io/collector/consumer/consumerprofiles v0.115.1-0.20241206185113-3f3e208e71b8 h1:ysXU7y4ltc7p1h3gQFtA7Cr3Qxn/10An8adNYPOeVUQ= go.opentelemetry.io/collector/consumer/consumerprofiles v0.115.1-0.20241206185113-3f3e208e71b8/go.mod h1:IzEmZ91Tp7TBxVDq8Cc9xvLsmO7H08njr6Pu9P5d9ns= go.opentelemetry.io/collector/consumer/consumertest v0.115.1-0.20241206185113-3f3e208e71b8 h1:zinrZujQGjMJhWo926FIwcIy4nMgwoYXnMe99nn0xDQ= @@ -112,6 +118,8 @@ go.opentelemetry.io/collector/receiver/receiverprofiles v0.115.1-0.2024120618511 go.opentelemetry.io/collector/receiver/receiverprofiles v0.115.1-0.20241206185113-3f3e208e71b8/go.mod h1:05E5hGujWeeXJmzKZwTdHyZ/+rRyrQlQB5p5Q2XY39M= go.opentelemetry.io/collector/receiver/receivertest v0.115.1-0.20241206185113-3f3e208e71b8 h1:cOsmTAvpuiDHh5ggc/JnsF3nBFC9dQaswFvTDpujJqs= go.opentelemetry.io/collector/receiver/receivertest v0.115.1-0.20241206185113-3f3e208e71b8/go.mod h1:Y8Z9U/bz9Xpyt8GI8DxZZgryw3mnnIw+AeKVLTD2cP8= +go.opentelemetry.io/collector/semconv v0.115.1-0.20241206185113-3f3e208e71b8 h1:+vUVC+FHqapool6OqgMQgc4oEjXrHyvDsvi4hEpBVLE= +go.opentelemetry.io/collector/semconv v0.115.1-0.20241206185113-3f3e208e71b8/go.mod h1:N6XE8Q0JKgBN2fAhkUQtqK9LT7rEGR6+Wu/Rtbal1iI= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 h1:UP6IpuHFkUgOQL9FFQFrZ+5LiwhhYRbi7VZSIx6Nj5s= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0/go.mod h1:qxuZLtbq5QDtdeSHsS7bcf6EH6uO6jUAgk764zd3rhM= go.opentelemetry.io/otel v1.32.0 h1:WnBN+Xjcteh0zdk01SVqV55d/m62NJLJdIyb4y/WO5U= @@ -163,8 +171,8 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/genproto/googleapis/rpc v0.0.0-20241007155032-5fefd90f89a9 h1:QCqS/PdaHTSWGvupk2F/ehwHtGc0/GYkT+3GAcR1CCc= google.golang.org/genproto/googleapis/rpc v0.0.0-20241007155032-5fefd90f89a9/go.mod h1:GX3210XPVPUjJbTUbvwI8f2IpZDMZuPJWDzDuebbviI= -google.golang.org/grpc v1.67.1 h1:zWnc1Vrcno+lHZCOofnIMvycFcc0QRGIzm9dhnDX68E= -google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA= +google.golang.org/grpc v1.68.1 h1:oI5oTa11+ng8r8XMMN7jAOmWfPZWbYpCFaMUTACxkM0= +google.golang.org/grpc v1.68.1/go.mod h1:+q1XYFJjShcqn0QZHvCyeR4CXPA+llXIeUIfIe00waw= google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io= google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/receiver/libhoneyreceiver/internal/eventtime/eventtime.go b/receiver/libhoneyreceiver/internal/eventtime/eventtime.go new file mode 100644 index 000000000000..38f227165887 --- /dev/null +++ b/receiver/libhoneyreceiver/internal/eventtime/eventtime.go @@ -0,0 +1,65 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package eventtime // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/eventtime" + +import ( + "math" + "strconv" + "time" +) + +func GetNowTime() time.Time { + return time.Now() +} + +func GetEventTime(etHeader string) time.Time { + var eventTime time.Time + if etHeader != "" { + // Great, they sent us a time header. let's try and parse it. + // RFC3339Nano is the default that we send from all our SDKs + eventTime, _ = time.Parse(time.RFC3339Nano, etHeader) + if eventTime.IsZero() { + // the default didn't catch it, let's try a few other things + // is it all numeric? then try unix epoch times + epochInt, err := strconv.ParseInt(etHeader, 0, 64) + if err == nil { + // it might be seconds or it might be milliseconds! Who can know! + // 10-digit numbers are seconds, 13-digit milliseconds, 16 microseconds + if len(etHeader) == 10 { + eventTime = time.Unix(epochInt, 0) + } else if len(etHeader) > 10 { + // turn it into seconds and fractional seconds + fractionalTime := etHeader[:10] + "." + etHeader[10:] + // then chop it into the int part and the fractional part + if epochFloat, err := strconv.ParseFloat(fractionalTime, 64); err == nil { + sec, dec := math.Modf(epochFloat) + eventTime = time.Unix(int64(sec), int64(dec*(1e9))) + } + + } + } else { + epochFloat, err := strconv.ParseFloat(etHeader, 64) + if err == nil { + sec, dec := math.Modf(epochFloat) + eventTime = time.Unix(int64(sec), int64(dec*(1e9))) + } + } + } + } + return eventTime.UTC() +} + +func GetEventTimeSec(etHeader string) int64 { + eventTime := GetEventTime(etHeader) + return eventTime.Unix() +} + +func GetEventTimeNano(etHeader string) int64 { + eventTime := GetEventTime(etHeader) + return eventTime.UnixNano() +} + +func GetEventTimeDefaultString() string { + return time.Now().Format(time.RFC3339Nano) +} diff --git a/receiver/libhoneyreceiver/libhoney.go b/receiver/libhoneyreceiver/libhoney.go deleted file mode 100644 index 4ad1faab8fbb..000000000000 --- a/receiver/libhoneyreceiver/libhoney.go +++ /dev/null @@ -1,127 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package libhoneyreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver" - -import ( - "context" - "errors" - "net" - "net/http" - "sync" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/component/componentstatus" - "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/receiver" - "go.opentelemetry.io/collector/receiver/receiverhelper" - "go.uber.org/zap" -) - -type libhoneyReceiver struct { - cfg *Config - serverHTTP *http.Server - - nextTraces consumer.Traces - nextLogs consumer.Logs - shutdownWG sync.WaitGroup - - obsrepHTTP *receiverhelper.ObsReport - - settings *receiver.Settings -} - -type TeamInfo struct { - Slug string `json:"slug"` -} - -type EnvironmentInfo struct { - Slug string `json:"slug"` - Name string `json:"name"` -} - -type AuthInfo struct { - APIKeyAccess map[string]bool `json:"api_key_access"` - Team TeamInfo `json:"team"` - Environment EnvironmentInfo `json:"environment"` -} - -func newLibhoneyReceiver(cfg *Config, set *receiver.Settings) (*libhoneyReceiver, error) { - r := &libhoneyReceiver{ - cfg: cfg, - nextTraces: nil, - settings: set, - } - - var err error - r.obsrepHTTP, err = receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ - ReceiverID: set.ID, - Transport: "http", - ReceiverCreateSettings: *set, - }) - if err != nil { - return nil, err - } - - return r, nil -} - -func (r *libhoneyReceiver) startHTTPServer(ctx context.Context, host component.Host) error { - // If HTTP is not enabled, nothing to start. - if r.cfg.HTTP == nil { - return nil - } - - if r.nextTraces != nil { - // initialize routes - r.settings.Logger.Debug("r.nextTraces found and ready to go") - } else { - r.settings.Logger.Debug("r.nextTraces is nil for some reason") - } - - // start server - var err error - r.settings.Logger.Info("Starting HTTP server", zap.String("endpoint", r.cfg.HTTP.ServerConfig.Endpoint)) - var hln net.Listener - if hln, err = r.cfg.HTTP.ServerConfig.ToListener(ctx); err != nil { - return err - } - - r.shutdownWG.Add(1) - go func() { - defer r.shutdownWG.Done() - - if errHTTP := r.serverHTTP.Serve(hln); errHTTP != nil && !errors.Is(errHTTP, http.ErrServerClosed) { - componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(errHTTP)) - } - }() - return nil -} - -func (r *libhoneyReceiver) Start(ctx context.Context, host component.Host) error { - if err := r.startHTTPServer(ctx, host); err != nil { - return errors.Join(err, r.Shutdown(ctx)) - } - - return nil -} - -// Shutdown is a method to turn off receiving. -func (r *libhoneyReceiver) Shutdown(ctx context.Context) error { - var err error - - if r.serverHTTP != nil { - err = r.serverHTTP.Shutdown(ctx) - } - - r.shutdownWG.Wait() - return err -} - -func (r *libhoneyReceiver) registerTraceConsumer(tc consumer.Traces) { - r.nextTraces = tc -} - -func (r *libhoneyReceiver) registerLogConsumer(tc consumer.Logs) { - r.nextLogs = tc -} diff --git a/receiver/libhoneyreceiver/libhoneyparser.go b/receiver/libhoneyreceiver/libhoneyparser.go new file mode 100644 index 000000000000..d58cac623c60 --- /dev/null +++ b/receiver/libhoneyreceiver/libhoneyparser.go @@ -0,0 +1,272 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package libhoneyreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver" + +import ( + "encoding/json" + "errors" + "fmt" + "mime" + "net/http" + "net/url" + "slices" + "time" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/ptrace" + semconv "go.opentelemetry.io/collector/semconv/v1.16.0" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/eventtime" +) + +func readContentType(resp http.ResponseWriter, req *http.Request) (encoder, bool) { + if req.Method != http.MethodPost { + handleUnmatchedMethod(resp) + return nil, false + } + + switch getMimeTypeFromContentType(req.Header.Get("Content-Type")) { + case jsonContentType: + return jsEncoder, true + case "application/x-msgpack", "application/msgpack": + return mpEncoder, true + default: + handleUnmatchedContentType(resp) + return nil, false + } +} + +func writeResponse(w http.ResponseWriter, contentType string, statusCode int, msg []byte) { + w.Header().Set("Content-Type", contentType) + w.WriteHeader(statusCode) + _, _ = w.Write(msg) +} + +func getMimeTypeFromContentType(contentType string) string { + mediatype, _, err := mime.ParseMediaType(contentType) + if err != nil { + return "" + } + return mediatype +} + +func handleUnmatchedMethod(resp http.ResponseWriter) { + status := http.StatusMethodNotAllowed + writeResponse(resp, "text/plain", status, []byte(fmt.Sprintf("%v method not allowed, supported: [POST]", status))) +} + +func handleUnmatchedContentType(resp http.ResponseWriter) { + status := http.StatusUnsupportedMediaType + writeResponse(resp, "text/plain", status, []byte(fmt.Sprintf("%v unsupported media type, supported: [%s, %s]", status, jsonContentType, pbContentType))) +} + +// taken from refinery https://github.com/honeycombio/refinery/blob/v2.6.1/route/route.go#L964-L974 +func getDatasetFromRequest(path string) (string, error) { + if path == "" { + return "", fmt.Errorf("missing dataset name") + } + dataset, err := url.PathUnescape(path) + if err != nil { + return "", err + } + return dataset, nil +} + +type simpleSpan struct { + Samplerate int `json:"samplerate" msgpack:"samplerate"` + MsgPackTimestamp *time.Time `msgpack:"time"` + Time string `json:"time"` // should not be trusted. use MsgPackTimestamp + Data map[string]interface{} `json:"data" msgpack:"data"` +} + +// Overrides unmarshall to make sure the MsgPackTimestamp is set +func (s *simpleSpan) UnmarshalJSON(j []byte) error { + type _simpleSpan simpleSpan + tstr := eventtime.GetEventTimeDefaultString() + tzero := time.Time{} + tmp := _simpleSpan{Time: "none", MsgPackTimestamp: &tzero, Samplerate: 1} + + err := json.Unmarshal(j, &tmp) + if err != nil { + return err + } + if tmp.MsgPackTimestamp.IsZero() && tmp.Time == "none" { + // neither timestamp was set. give it right now. + tmp.Time = tstr + tnow := time.Now() + tmp.MsgPackTimestamp = &tnow + } + if tmp.MsgPackTimestamp.IsZero() { + propertime := eventtime.GetEventTime(tmp.Time) + tmp.MsgPackTimestamp = &propertime + } + + *s = simpleSpan(tmp) + return nil +} + +func (s *simpleSpan) DebugString() string { + return fmt.Sprintf("%#v", s) +} + +// returns log until we add the trace parser +func (s *simpleSpan) SignalType() (string, error) { + return "log", nil +} + +func (s *simpleSpan) GetService(cfg Config, seen *serviceHistory, dataset string) (string, error) { + if serviceName, ok := s.Data[cfg.Resources.ServiceName]; ok { + seen.NameCount[serviceName.(string)] += 1 + return serviceName.(string), nil + } + return dataset, errors.New("no service.name found in event") +} + +func (s *simpleSpan) GetScope(cfg Config, seen *scopeHistory, serviceName string) (string, error) { + if scopeLibraryName, ok := s.Data[cfg.Scopes.LibraryName]; ok { + scopeKey := serviceName + scopeLibraryName.(string) + if _, ok := seen.Scope[scopeKey]; ok { + // if we've seen it, we don't expect it to be different right away so we'll just return it. + return scopeKey, nil + } + // otherwise, we need to make a new found scope + scopeLibraryVersion := "unset" + if scopeLibVer, ok := s.Data[cfg.Scopes.LibraryVersion]; ok { + scopeLibraryVersion = scopeLibVer.(string) + } + newScope := simpleScope{ + ServiceName: serviceName, // we only set the service name once. If the same library comes from multiple services in the same batch, we're in trouble. + LibraryName: scopeLibraryName.(string), + LibraryVersion: scopeLibraryVersion, + ScopeSpans: ptrace.NewSpanSlice(), + ScopeLogs: plog.NewLogRecordSlice(), + } + seen.Scope[scopeKey] = newScope + return scopeKey, nil + } + return "libhoney.receiver", errors.New("library name not found") +} + +type simpleScope struct { + ServiceName string + LibraryName string + LibraryVersion string + ScopeSpans ptrace.SpanSlice + ScopeLogs plog.LogRecordSlice +} + +type scopeHistory struct { + Scope map[string]simpleScope // key here is service.name+library.name +} +type serviceHistory struct { + NameCount map[string]int +} + +func (s *simpleSpan) ToPLogRecord(newLog *plog.LogRecord, already_used_fields *[]string, cfg Config, logger zap.Logger) error { + time_ns := s.MsgPackTimestamp.UnixNano() + logger.Debug("processing log with", zap.Int64("timestamp", time_ns)) + newLog.SetTimestamp(pcommon.Timestamp(time_ns)) + + if logSevCode, ok := s.Data["severity_code"]; ok { + logSevInt := int32(logSevCode.(int64)) + newLog.SetSeverityNumber(plog.SeverityNumber(logSevInt)) + } + + if logSevText, ok := s.Data["severity_text"]; ok { + newLog.SetSeverityText(logSevText.(string)) + } + + if logFlags, ok := s.Data["flags"]; ok { + logFlagsUint := uint32(logFlags.(uint64)) + newLog.SetFlags(plog.LogRecordFlags(logFlagsUint)) + } + + // undoing this is gonna be complicated: https://github.com/honeycombio/husky/blob/91c0498333cd9f5eed1fdb8544ca486db7dea565/otlp/logs.go#L61 + if logBody, ok := s.Data["body"]; ok { + newLog.Body().SetStr(logBody.(string)) + } + + newLog.Attributes().PutInt("SampleRate", int64(s.Samplerate)) + + logFieldsAlready := []string{"severity_text", "severity_code", "flags", "body"} + for k, v := range s.Data { + if slices.Contains(*already_used_fields, k) { + continue + } + if slices.Contains(logFieldsAlready, k) { + continue + } + switch v := v.(type) { + case string: + newLog.Attributes().PutStr(k, v) + case int: + newLog.Attributes().PutInt(k, int64(v)) + case int64, int16, int32: + intv := v.(int64) + newLog.Attributes().PutInt(k, intv) + case float64: + newLog.Attributes().PutDouble(k, v) + case bool: + newLog.Attributes().PutBool(k, v) + default: + logger.Warn("Span data type issue", zap.Int64("timestamp", time_ns), zap.String("key", k)) + } + } + return nil +} + +func toPsomething(dataset string, ss []simpleSpan, cfg Config, logger zap.Logger) (plog.Logs, error) { + foundServices := serviceHistory{} + foundServices.NameCount = make(map[string]int) + foundScopes := scopeHistory{} + foundScopes.Scope = make(map[string]simpleScope) + + foundScopes.Scope = make(map[string]simpleScope) // a list of already seen scopes + foundScopes.Scope["libhoney.receiver"] = simpleScope{dataset, "libhoney.receiver", "1.0.0", ptrace.NewSpanSlice(), plog.NewLogRecordSlice()} // seed a default + + already_used_fields := []string{cfg.Resources.ServiceName, cfg.Scopes.LibraryName, cfg.Scopes.LibraryVersion} + already_used_fields = append(already_used_fields, cfg.Attributes.Name, + cfg.Attributes.TraceID, cfg.Attributes.ParentID, cfg.Attributes.SpanID, + cfg.Attributes.Error, cfg.Attributes.SpanKind, + ) + already_used_fields = append(already_used_fields, cfg.Attributes.DurationFields...) + + for _, span := range ss { + action, err := span.SignalType() + if err != nil { + logger.Warn("signal type unclear") + } + switch action { + case "span": + // not implemented + case "log": + logService, _ := span.GetService(cfg, &foundServices, dataset) + logScopeKey, _ := span.GetScope(cfg, &foundScopes, logService) // adds a new found scope if needed + newLog := foundScopes.Scope[logScopeKey].ScopeLogs.AppendEmpty() + span.ToPLogRecord(&newLog, &already_used_fields, cfg, logger) + if err != nil { + logger.Warn("log could not be converted from libhoney to plog", zap.String("span.object", span.DebugString())) + } + } + } + + resultLogs := plog.NewLogs() + + for scopeName, ss := range foundScopes.Scope { + if ss.ScopeLogs.Len() > 0 { + lr := resultLogs.ResourceLogs().AppendEmpty() + lr.SetSchemaUrl(semconv.SchemaURL) + lr.Resource().Attributes().PutStr(semconv.AttributeServiceName, ss.ServiceName) + + ls := lr.ScopeLogs().AppendEmpty() + ls.Scope().SetName(ss.LibraryName) + ls.Scope().SetVersion(ss.LibraryVersion) + foundScopes.Scope[scopeName].ScopeLogs.MoveAndAppendTo(ls.LogRecords()) + } + } + + return resultLogs, nil +} diff --git a/receiver/libhoneyreceiver/receiver.go b/receiver/libhoneyreceiver/receiver.go new file mode 100644 index 000000000000..8fc0b37dda7f --- /dev/null +++ b/receiver/libhoneyreceiver/receiver.go @@ -0,0 +1,238 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package libhoneyreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver" + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net" + "net/http" + "strings" + "sync" + + "github.com/vmihailenco/msgpack/v5" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componentstatus" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/receiver/receiverhelper" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/errorutil" +) + +type libhoneyReceiver struct { + cfg *Config + server *http.Server + nextTraces consumer.Traces + nextLogs consumer.Logs + shutdownWG sync.WaitGroup + obsreport *receiverhelper.ObsReport + settings *receiver.Settings +} + +type TeamInfo struct { + Slug string `json:"slug"` +} + +type EnvironmentInfo struct { + Slug string `json:"slug"` + Name string `json:"name"` +} + +type AuthInfo struct { + APIKeyAccess map[string]bool `json:"api_key_access"` + Team TeamInfo `json:"team"` + Environment EnvironmentInfo `json:"environment"` +} + +func newLibhoneyReceiver(cfg *Config, set *receiver.Settings) (*libhoneyReceiver, error) { + r := &libhoneyReceiver{ + cfg: cfg, + nextTraces: nil, + settings: set, + } + + var err error + r.obsreport, err = receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ + ReceiverID: set.ID, + Transport: "http", + ReceiverCreateSettings: *set, + }) + if err != nil { + return nil, err + } + + return r, nil +} + +func (r *libhoneyReceiver) startHTTPServer(ctx context.Context, host component.Host) error { + // If HTTP is not enabled, nothing to start. + if r.cfg.HTTP == nil { + return nil + } + + httpMux := http.NewServeMux() + + r.settings.Logger.Info("r.nextTraces is not null so httpTracesReciever was added", zap.Int("paths", len(r.cfg.HTTP.TracesURLPaths))) + for _, path := range r.cfg.HTTP.TracesURLPaths { + httpMux.HandleFunc(path, func(resp http.ResponseWriter, req *http.Request) { + r.handleSomething(resp, req) + }) + r.settings.Logger.Debug("Added path to HTTP server", zap.String("path", path)) + } + + if r.cfg.AuthAPI != "" { + httpMux.HandleFunc("/1/auth", func(resp http.ResponseWriter, req *http.Request) { + authURL := fmt.Sprintf("%s/1/auth", r.cfg.AuthAPI) + authReq, err := http.NewRequest(http.MethodGet, authURL, nil) + if err != nil { + errJson, _ := json.Marshal(`{"error": "failed to create AuthInfo request"}`) + writeResponse(resp, "json", http.StatusBadRequest, errJson) + return + } + authReq.Header.Set("x-honeycomb-team", req.Header.Get("x-honeycomb-team")) + var authClient http.Client + authResp, err := authClient.Do(authReq) + if err != nil { + errJson, _ := json.Marshal(fmt.Sprintf(`"error": "failed to send request to auth api endpoint", "message", "%s"}`, err.Error())) + writeResponse(resp, "json", http.StatusBadRequest, errJson) + return + } + defer authResp.Body.Close() + + switch { + case authResp.StatusCode == http.StatusUnauthorized: + errJson, _ := json.Marshal(`"error": "received 401 response for AuthInfo request from Honeycomb API - check your API key"}`) + writeResponse(resp, "json", http.StatusBadRequest, errJson) + return + case authResp.StatusCode > 299: + errJson, _ := json.Marshal(fmt.Sprintf(`"error": "bad response code from API", "status_code", %d}`, authResp.StatusCode)) + writeResponse(resp, "json", http.StatusBadRequest, errJson) + return + } + authRawBody, _ := io.ReadAll(authResp.Body) + resp.Write(authRawBody) + }) + } + + var err error + if r.server, err = r.cfg.HTTP.ToServer(ctx, host, r.settings.TelemetrySettings, httpMux); err != nil { + return err + } + + r.settings.Logger.Info("Starting HTTP server", zap.String("endpoint", r.cfg.HTTP.ServerConfig.Endpoint)) + var hln net.Listener + if hln, err = r.cfg.HTTP.ServerConfig.ToListener(ctx); err != nil { + return err + } + + r.shutdownWG.Add(1) + go func() { + defer r.shutdownWG.Done() + + if err := r.server.Serve(hln); err != nil && !errors.Is(err, http.ErrServerClosed) { + componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err)) + } + }() + return nil +} + +func (r *libhoneyReceiver) Start(ctx context.Context, host component.Host) error { + if err := r.startHTTPServer(ctx, host); err != nil { + return errors.Join(err, r.Shutdown(ctx)) + } + + return nil +} + +// Shutdown is a method to turn off receiving. +func (r *libhoneyReceiver) Shutdown(ctx context.Context) error { + var err error + + if r.server != nil { + err = r.server.Shutdown(ctx) + } + + r.shutdownWG.Wait() + return err +} + +func (r *libhoneyReceiver) registerTraceConsumer(tc consumer.Traces) { + r.nextTraces = tc +} + +func (r *libhoneyReceiver) registerLogConsumer(tc consumer.Logs) { + r.nextLogs = tc +} + +func (r *libhoneyReceiver) handleSomething(resp http.ResponseWriter, req *http.Request) { + enc, ok := readContentType(resp, req) + if !ok { + return + } + + dataset, err := getDatasetFromRequest(req.RequestURI) + if err != nil { + r.settings.Logger.Info("No dataset found in URL", zap.String("req.RequstURI", req.RequestURI)) + } + + for _, p := range r.cfg.HTTP.TracesURLPaths { + dataset = strings.Replace(dataset, p, "", 1) + r.settings.Logger.Debug("dataset parsed", zap.String("dataset.parsed", dataset)) + } + + body, err := io.ReadAll(req.Body) + if err != nil { + errorutil.HTTPError(resp, err) + } + if err = req.Body.Close(); err != nil { + errorutil.HTTPError(resp, err) + } + + simpleSpans := make([]simpleSpan, 0) + switch req.Header.Get("Content-Type") { + case "application/x-msgpack", "application/msgpack": + decoder := msgpack.NewDecoder(bytes.NewReader(body)) + decoder.UseLooseInterfaceDecoding(true) + decoder.Decode(&simpleSpans) + if len(simpleSpans) > 0 { + r.settings.Logger.Debug("Decoding with msgpack worked", zap.Time("timestamp.first.msgpacktimestamp", *simpleSpans[0].MsgPackTimestamp), zap.String("timestamp.first.time", simpleSpans[0].Time)) + r.settings.Logger.Debug("span zero", zap.String("span.data", simpleSpans[0].DebugString())) + } + case jsonContentType: + err = json.Unmarshal(body, &simpleSpans) + if err != nil { + errorutil.HTTPError(resp, err) + } + if len(simpleSpans) > 0 { + r.settings.Logger.Debug("Decoding with json worked", zap.Time("timestamp.first.msgpacktimestamp", *simpleSpans[0].MsgPackTimestamp), zap.String("timestamp.first.time", simpleSpans[0].Time)) + } + } + + otlpLogs, err := toPsomething(dataset, simpleSpans, *r.cfg, *r.settings.Logger) + if err != nil { + errorutil.HTTPError(resp, err) + return + } + + numLogs := otlpLogs.LogRecordCount() + if numLogs > 0 { + ctx := r.obsreport.StartLogsOp(context.Background()) + err = r.nextLogs.ConsumeLogs(ctx, otlpLogs) + r.obsreport.EndLogsOp(ctx, "protobuf", numLogs, err) + } + + if err != nil { + errorutil.HTTPError(resp, err) + return + } + + noErrors := []byte(`{"errors":[]}`) + writeResponse(resp, enc.contentType(), http.StatusAccepted, noErrors) +} From 83724607d9084b5a073e392a16d7b680cbc5c74f Mon Sep 17 00:00:00 2001 From: Mike Terhar Date: Fri, 13 Dec 2024 19:44:28 +0000 Subject: [PATCH 2/5] Refactored simplespans to a separate package --- .chloggen/logs-for-libhoneyreceiver.yaml | 27 +++ receiver/libhoneyreceiver/README.md | 29 +-- receiver/libhoneyreceiver/config.go | 31 +-- receiver/libhoneyreceiver/factory.go | 33 +-- .../internal/simplespan/simplespan.go | 187 +++++++++++++++++ receiver/libhoneyreceiver/libhoneyparser.go | 189 +++--------------- receiver/libhoneyreceiver/receiver.go | 3 +- 7 files changed, 278 insertions(+), 221 deletions(-) create mode 100644 .chloggen/logs-for-libhoneyreceiver.yaml create mode 100644 receiver/libhoneyreceiver/internal/simplespan/simplespan.go diff --git a/.chloggen/logs-for-libhoneyreceiver.yaml b/.chloggen/logs-for-libhoneyreceiver.yaml new file mode 100644 index 000000000000..dc90a4fbe50b --- /dev/null +++ b/.chloggen/logs-for-libhoneyreceiver.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: libhoneyreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Implement log signal for libhoney receiver + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [36693] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] \ No newline at end of file diff --git a/receiver/libhoneyreceiver/README.md b/receiver/libhoneyreceiver/README.md index a87c8735d5d0..a765c45383f4 100644 --- a/receiver/libhoneyreceiver/README.md +++ b/receiver/libhoneyreceiver/README.md @@ -45,20 +45,21 @@ The following setting is required for refinery traffic since: - "/1/batch" include_metadata: true auth_api: https://api.honeycomb.io - resources: - service_name: service_name - scopes: - library_name: library.name - library_version: library.version - attributes: - trace_id: trace_id - parent_id: parent_id - span_id: span_id - name: name - error: error - spankind: span.kind - durationFields: - - duration_ms + fields: + resources: + service_name: service_name + scopes: + library_name: library.name + library_version: library.version + attributes: + trace_id: trace_id + parent_id: parent_id + span_id: span_id + name: name + error: error + spankind: span.kind + durationFields: + - duration_ms ``` ### Telemetry data types supported diff --git a/receiver/libhoneyreceiver/config.go b/receiver/libhoneyreceiver/config.go index abfd6476dbd1..8290df733f84 100644 --- a/receiver/libhoneyreceiver/config.go +++ b/receiver/libhoneyreceiver/config.go @@ -11,16 +11,16 @@ import ( "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/confmap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/simplespan" ) // Config represents the receiver config settings within the collector's config.yaml type Config struct { - HTTP *HTTPConfig `mapstructure:"http"` - AuthAPI string `mapstructure:"auth_api"` - Wrapper string `mapstructure:"wrapper"` - Resources ResourcesConfig `mapstructure:"resources"` - Scopes ScopesConfig `mapstructure:"scopes"` - Attributes AttributesConfig `mapstructure:"attributes"` + HTTP *HTTPConfig `mapstructure:"http"` + AuthAPI string `mapstructure:"auth_api"` + Wrapper string `mapstructure:"wrapper"` + FieldMapConfig simplespan.FieldMapConfig `mapstructure:"fields"` } type HTTPConfig struct { @@ -30,25 +30,6 @@ type HTTPConfig struct { TracesURLPaths []string `mapstructure:"traces_url_paths,omitempty"` } -type ResourcesConfig struct { - ServiceName string `mapstructure:"service_name"` -} - -type ScopesConfig struct { - LibraryName string `mapstructure:"library_name"` - LibraryVersion string `mapstructure:"library_version"` -} - -type AttributesConfig struct { - TraceID string `mapstructure:"trace_id"` - ParentID string `mapstructure:"parent_id"` - SpanID string `mapstructure:"span_id"` - Name string `mapstructure:"name"` - Error string `mapstructure:"error"` - SpanKind string `mapstructure:"spankind"` - DurationFields []string `mapstructure:"durationFields"` -} - func (cfg *Config) Validate() error { if cfg.HTTP == nil { return errors.New("must specify at least one protocol when using the arbitrary JSON receiver") diff --git a/receiver/libhoneyreceiver/factory.go b/receiver/libhoneyreceiver/factory.go index 4d0d0fa25cfa..d90eef5a532e 100644 --- a/receiver/libhoneyreceiver/factory.go +++ b/receiver/libhoneyreceiver/factory.go @@ -14,6 +14,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/metadata" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/simplespan" ) const ( @@ -44,21 +45,23 @@ func createDefaultConfig() component.Config { TracesURLPaths: defaultTracesURLPaths, }, AuthAPI: "", - Resources: ResourcesConfig{ - ServiceName: "service.name", - }, - Scopes: ScopesConfig{ - LibraryName: "library.name", - LibraryVersion: "library.version", - }, - Attributes: AttributesConfig{ - TraceID: "trace.trace_id", - SpanID: "trace.span_id", - ParentID: "trace.parent_id", - Name: "name", - Error: "error", - SpanKind: "span.kind", - DurationFields: durationFieldsArr, + FieldMapConfig: simplespan.FieldMapConfig{ + Resources: simplespan.ResourcesConfig{ + ServiceName: "service.name", + }, + Scopes: simplespan.ScopesConfig{ + LibraryName: "library.name", + LibraryVersion: "library.version", + }, + Attributes: simplespan.AttributesConfig{ + TraceID: "trace.trace_id", + SpanID: "trace.span_id", + ParentID: "trace.parent_id", + Name: "name", + Error: "error", + SpanKind: "span.kind", + DurationFields: durationFieldsArr, + }, }, } } diff --git a/receiver/libhoneyreceiver/internal/simplespan/simplespan.go b/receiver/libhoneyreceiver/internal/simplespan/simplespan.go new file mode 100644 index 000000000000..a8355391763f --- /dev/null +++ b/receiver/libhoneyreceiver/internal/simplespan/simplespan.go @@ -0,0 +1,187 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package simplespan // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/simplespan" + +import ( + "encoding/json" + "errors" + "fmt" + "slices" + "time" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/eventtime" +) + +type FieldMapConfig struct { + Resources ResourcesConfig `mapstructure:"resources"` + Scopes ScopesConfig `mapstructure:"scopes"` + Attributes AttributesConfig `mapstructure:"attributes"` +} + +type ResourcesConfig struct { + ServiceName string `mapstructure:"service_name"` +} + +type ScopesConfig struct { + LibraryName string `mapstructure:"library_name"` + LibraryVersion string `mapstructure:"library_version"` +} + +type AttributesConfig struct { + TraceID string `mapstructure:"trace_id"` + ParentID string `mapstructure:"parent_id"` + SpanID string `mapstructure:"span_id"` + Name string `mapstructure:"name"` + Error string `mapstructure:"error"` + SpanKind string `mapstructure:"spankind"` + DurationFields []string `mapstructure:"durationFields"` +} + +type SimpleSpan struct { + Samplerate int `json:"samplerate" msgpack:"samplerate"` + MsgPackTimestamp *time.Time `msgpack:"time"` + Time string `json:"time"` // should not be trusted. use MsgPackTimestamp + Data map[string]interface{} `json:"data" msgpack:"data"` +} + +// Overrides unmarshall to make sure the MsgPackTimestamp is set +func (s *SimpleSpan) UnmarshalJSON(j []byte) error { + type _simpleSpan SimpleSpan + tstr := eventtime.GetEventTimeDefaultString() + tzero := time.Time{} + tmp := _simpleSpan{Time: "none", MsgPackTimestamp: &tzero, Samplerate: 1} + + err := json.Unmarshal(j, &tmp) + if err != nil { + return err + } + if tmp.MsgPackTimestamp.IsZero() && tmp.Time == "none" { + // neither timestamp was set. give it right now. + tmp.Time = tstr + tnow := time.Now() + tmp.MsgPackTimestamp = &tnow + } + if tmp.MsgPackTimestamp.IsZero() { + propertime := eventtime.GetEventTime(tmp.Time) + tmp.MsgPackTimestamp = &propertime + } + + *s = SimpleSpan(tmp) + return nil +} + +func (s *SimpleSpan) DebugString() string { + return fmt.Sprintf("%#v", s) +} + +// returns log until we add the trace parser +func (s *SimpleSpan) SignalType() (string, error) { + return "log", nil +} + +func (s *SimpleSpan) GetService(fields FieldMapConfig, seen *ServiceHistory, dataset string) (string, error) { + if serviceName, ok := s.Data[fields.Resources.ServiceName]; ok { + seen.NameCount[serviceName.(string)] += 1 + return serviceName.(string), nil + } + return dataset, errors.New("no service.name found in event") +} + +func (s *SimpleSpan) GetScope(fields FieldMapConfig, seen *ScopeHistory, serviceName string) (string, error) { + if scopeLibraryName, ok := s.Data[fields.Scopes.LibraryName]; ok { + scopeKey := serviceName + scopeLibraryName.(string) + if _, ok := seen.Scope[scopeKey]; ok { + // if we've seen it, we don't expect it to be different right away so we'll just return it. + return scopeKey, nil + } + // otherwise, we need to make a new found scope + scopeLibraryVersion := "unset" + if scopeLibVer, ok := s.Data[fields.Scopes.LibraryVersion]; ok { + scopeLibraryVersion = scopeLibVer.(string) + } + newScope := SimpleScope{ + ServiceName: serviceName, // we only set the service name once. If the same library comes from multiple services in the same batch, we're in trouble. + LibraryName: scopeLibraryName.(string), + LibraryVersion: scopeLibraryVersion, + ScopeSpans: ptrace.NewSpanSlice(), + ScopeLogs: plog.NewLogRecordSlice(), + } + seen.Scope[scopeKey] = newScope + return scopeKey, nil + } + return "libhoney.receiver", errors.New("library name not found") +} + +type SimpleScope struct { + ServiceName string + LibraryName string + LibraryVersion string + ScopeSpans ptrace.SpanSlice + ScopeLogs plog.LogRecordSlice +} + +type ScopeHistory struct { + Scope map[string]SimpleScope // key here is service.name+library.name +} +type ServiceHistory struct { + NameCount map[string]int +} + +func (s *SimpleSpan) ToPLogRecord(newLog *plog.LogRecord, already_used_fields *[]string, logger zap.Logger) error { + time_ns := s.MsgPackTimestamp.UnixNano() + logger.Debug("processing log with", zap.Int64("timestamp", time_ns)) + newLog.SetTimestamp(pcommon.Timestamp(time_ns)) + + if logSevCode, ok := s.Data["severity_code"]; ok { + logSevInt := int32(logSevCode.(int64)) + newLog.SetSeverityNumber(plog.SeverityNumber(logSevInt)) + } + + if logSevText, ok := s.Data["severity_text"]; ok { + newLog.SetSeverityText(logSevText.(string)) + } + + if logFlags, ok := s.Data["flags"]; ok { + logFlagsUint := uint32(logFlags.(uint64)) + newLog.SetFlags(plog.LogRecordFlags(logFlagsUint)) + } + + // undoing this is gonna be complicated: https://github.com/honeycombio/husky/blob/91c0498333cd9f5eed1fdb8544ca486db7dea565/otlp/logs.go#L61 + if logBody, ok := s.Data["body"]; ok { + newLog.Body().SetStr(logBody.(string)) + } + + newLog.Attributes().PutInt("SampleRate", int64(s.Samplerate)) + + logFieldsAlready := []string{"severity_text", "severity_code", "flags", "body"} + for k, v := range s.Data { + if slices.Contains(*already_used_fields, k) { + continue + } + if slices.Contains(logFieldsAlready, k) { + continue + } + switch v := v.(type) { + case string: + newLog.Attributes().PutStr(k, v) + case int: + newLog.Attributes().PutInt(k, int64(v)) + case int64, int16, int32: + intv := v.(int64) + newLog.Attributes().PutInt(k, intv) + case float64: + newLog.Attributes().PutDouble(k, v) + case bool: + newLog.Attributes().PutBool(k, v) + default: + logger.Warn("Span data type issue", zap.Int64("timestamp", time_ns), zap.String("key", k)) + } + } + return nil +} diff --git a/receiver/libhoneyreceiver/libhoneyparser.go b/receiver/libhoneyreceiver/libhoneyparser.go index d58cac623c60..35ba4c2eeaeb 100644 --- a/receiver/libhoneyreceiver/libhoneyparser.go +++ b/receiver/libhoneyreceiver/libhoneyparser.go @@ -4,22 +4,17 @@ package libhoneyreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver" import ( - "encoding/json" - "errors" "fmt" "mime" "net/http" "net/url" - "slices" - "time" - "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/ptrace" semconv "go.opentelemetry.io/collector/semconv/v1.16.0" "go.uber.org/zap" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/eventtime" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/simplespan" ) func readContentType(resp http.ResponseWriter, req *http.Request) (encoder, bool) { @@ -63,7 +58,6 @@ func handleUnmatchedContentType(resp http.ResponseWriter) { writeResponse(resp, "text/plain", status, []byte(fmt.Sprintf("%v unsupported media type, supported: [%s, %s]", status, jsonContentType, pbContentType))) } -// taken from refinery https://github.com/honeycombio/refinery/blob/v2.6.1/route/route.go#L964-L974 func getDatasetFromRequest(path string) (string, error) { if path == "" { return "", fmt.Errorf("missing dataset name") @@ -75,164 +69,27 @@ func getDatasetFromRequest(path string) (string, error) { return dataset, nil } -type simpleSpan struct { - Samplerate int `json:"samplerate" msgpack:"samplerate"` - MsgPackTimestamp *time.Time `msgpack:"time"` - Time string `json:"time"` // should not be trusted. use MsgPackTimestamp - Data map[string]interface{} `json:"data" msgpack:"data"` -} - -// Overrides unmarshall to make sure the MsgPackTimestamp is set -func (s *simpleSpan) UnmarshalJSON(j []byte) error { - type _simpleSpan simpleSpan - tstr := eventtime.GetEventTimeDefaultString() - tzero := time.Time{} - tmp := _simpleSpan{Time: "none", MsgPackTimestamp: &tzero, Samplerate: 1} - - err := json.Unmarshal(j, &tmp) - if err != nil { - return err - } - if tmp.MsgPackTimestamp.IsZero() && tmp.Time == "none" { - // neither timestamp was set. give it right now. - tmp.Time = tstr - tnow := time.Now() - tmp.MsgPackTimestamp = &tnow - } - if tmp.MsgPackTimestamp.IsZero() { - propertime := eventtime.GetEventTime(tmp.Time) - tmp.MsgPackTimestamp = &propertime - } - - *s = simpleSpan(tmp) - return nil -} - -func (s *simpleSpan) DebugString() string { - return fmt.Sprintf("%#v", s) -} - -// returns log until we add the trace parser -func (s *simpleSpan) SignalType() (string, error) { - return "log", nil -} - -func (s *simpleSpan) GetService(cfg Config, seen *serviceHistory, dataset string) (string, error) { - if serviceName, ok := s.Data[cfg.Resources.ServiceName]; ok { - seen.NameCount[serviceName.(string)] += 1 - return serviceName.(string), nil - } - return dataset, errors.New("no service.name found in event") -} - -func (s *simpleSpan) GetScope(cfg Config, seen *scopeHistory, serviceName string) (string, error) { - if scopeLibraryName, ok := s.Data[cfg.Scopes.LibraryName]; ok { - scopeKey := serviceName + scopeLibraryName.(string) - if _, ok := seen.Scope[scopeKey]; ok { - // if we've seen it, we don't expect it to be different right away so we'll just return it. - return scopeKey, nil - } - // otherwise, we need to make a new found scope - scopeLibraryVersion := "unset" - if scopeLibVer, ok := s.Data[cfg.Scopes.LibraryVersion]; ok { - scopeLibraryVersion = scopeLibVer.(string) - } - newScope := simpleScope{ - ServiceName: serviceName, // we only set the service name once. If the same library comes from multiple services in the same batch, we're in trouble. - LibraryName: scopeLibraryName.(string), - LibraryVersion: scopeLibraryVersion, - ScopeSpans: ptrace.NewSpanSlice(), - ScopeLogs: plog.NewLogRecordSlice(), - } - seen.Scope[scopeKey] = newScope - return scopeKey, nil - } - return "libhoney.receiver", errors.New("library name not found") -} - -type simpleScope struct { - ServiceName string - LibraryName string - LibraryVersion string - ScopeSpans ptrace.SpanSlice - ScopeLogs plog.LogRecordSlice -} - -type scopeHistory struct { - Scope map[string]simpleScope // key here is service.name+library.name -} -type serviceHistory struct { - NameCount map[string]int -} - -func (s *simpleSpan) ToPLogRecord(newLog *plog.LogRecord, already_used_fields *[]string, cfg Config, logger zap.Logger) error { - time_ns := s.MsgPackTimestamp.UnixNano() - logger.Debug("processing log with", zap.Int64("timestamp", time_ns)) - newLog.SetTimestamp(pcommon.Timestamp(time_ns)) - - if logSevCode, ok := s.Data["severity_code"]; ok { - logSevInt := int32(logSevCode.(int64)) - newLog.SetSeverityNumber(plog.SeverityNumber(logSevInt)) - } - - if logSevText, ok := s.Data["severity_text"]; ok { - newLog.SetSeverityText(logSevText.(string)) - } - - if logFlags, ok := s.Data["flags"]; ok { - logFlagsUint := uint32(logFlags.(uint64)) - newLog.SetFlags(plog.LogRecordFlags(logFlagsUint)) - } - - // undoing this is gonna be complicated: https://github.com/honeycombio/husky/blob/91c0498333cd9f5eed1fdb8544ca486db7dea565/otlp/logs.go#L61 - if logBody, ok := s.Data["body"]; ok { - newLog.Body().SetStr(logBody.(string)) - } - - newLog.Attributes().PutInt("SampleRate", int64(s.Samplerate)) - - logFieldsAlready := []string{"severity_text", "severity_code", "flags", "body"} - for k, v := range s.Data { - if slices.Contains(*already_used_fields, k) { - continue - } - if slices.Contains(logFieldsAlready, k) { - continue - } - switch v := v.(type) { - case string: - newLog.Attributes().PutStr(k, v) - case int: - newLog.Attributes().PutInt(k, int64(v)) - case int64, int16, int32: - intv := v.(int64) - newLog.Attributes().PutInt(k, intv) - case float64: - newLog.Attributes().PutDouble(k, v) - case bool: - newLog.Attributes().PutBool(k, v) - default: - logger.Warn("Span data type issue", zap.Int64("timestamp", time_ns), zap.String("key", k)) - } - } - return nil -} - -func toPsomething(dataset string, ss []simpleSpan, cfg Config, logger zap.Logger) (plog.Logs, error) { - foundServices := serviceHistory{} +func toPsomething(dataset string, ss []simplespan.SimpleSpan, cfg Config, logger zap.Logger) (plog.Logs, error) { + foundServices := simplespan.ServiceHistory{} foundServices.NameCount = make(map[string]int) - foundScopes := scopeHistory{} - foundScopes.Scope = make(map[string]simpleScope) - - foundScopes.Scope = make(map[string]simpleScope) // a list of already seen scopes - foundScopes.Scope["libhoney.receiver"] = simpleScope{dataset, "libhoney.receiver", "1.0.0", ptrace.NewSpanSlice(), plog.NewLogRecordSlice()} // seed a default - - already_used_fields := []string{cfg.Resources.ServiceName, cfg.Scopes.LibraryName, cfg.Scopes.LibraryVersion} - already_used_fields = append(already_used_fields, cfg.Attributes.Name, - cfg.Attributes.TraceID, cfg.Attributes.ParentID, cfg.Attributes.SpanID, - cfg.Attributes.Error, cfg.Attributes.SpanKind, + foundScopes := simplespan.ScopeHistory{} + foundScopes.Scope = make(map[string]simplespan.SimpleScope) + + foundScopes.Scope = make(map[string]simplespan.SimpleScope) // a list of already seen scopes + foundScopes.Scope["libhoney.receiver"] = simplespan.SimpleScope{ + ServiceName: dataset, + LibraryName: "libhoney.receiver", + LibraryVersion: "1.0.0", + ScopeSpans: ptrace.NewSpanSlice(), + ScopeLogs: plog.NewLogRecordSlice(), + } // seed a default + + already_used_fields := []string{cfg.FieldMapConfig.Resources.ServiceName, cfg.FieldMapConfig.Scopes.LibraryName, cfg.FieldMapConfig.Scopes.LibraryVersion} + already_used_fields = append(already_used_fields, cfg.FieldMapConfig.Attributes.Name, + cfg.FieldMapConfig.Attributes.TraceID, cfg.FieldMapConfig.Attributes.ParentID, cfg.FieldMapConfig.Attributes.SpanID, + cfg.FieldMapConfig.Attributes.Error, cfg.FieldMapConfig.Attributes.SpanKind, ) - already_used_fields = append(already_used_fields, cfg.Attributes.DurationFields...) + already_used_fields = append(already_used_fields, cfg.FieldMapConfig.Attributes.DurationFields...) for _, span := range ss { action, err := span.SignalType() @@ -243,10 +100,10 @@ func toPsomething(dataset string, ss []simpleSpan, cfg Config, logger zap.Logger case "span": // not implemented case "log": - logService, _ := span.GetService(cfg, &foundServices, dataset) - logScopeKey, _ := span.GetScope(cfg, &foundScopes, logService) // adds a new found scope if needed + logService, _ := span.GetService(cfg.FieldMapConfig, &foundServices, dataset) + logScopeKey, _ := span.GetScope(cfg.FieldMapConfig, &foundScopes, logService) // adds a new found scope if needed newLog := foundScopes.Scope[logScopeKey].ScopeLogs.AppendEmpty() - span.ToPLogRecord(&newLog, &already_used_fields, cfg, logger) + span.ToPLogRecord(&newLog, &already_used_fields, logger) if err != nil { logger.Warn("log could not be converted from libhoney to plog", zap.String("span.object", span.DebugString())) } diff --git a/receiver/libhoneyreceiver/receiver.go b/receiver/libhoneyreceiver/receiver.go index 8fc0b37dda7f..bde46bca8364 100644 --- a/receiver/libhoneyreceiver/receiver.go +++ b/receiver/libhoneyreceiver/receiver.go @@ -24,6 +24,7 @@ import ( "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/errorutil" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/simplespan" ) type libhoneyReceiver struct { @@ -195,7 +196,7 @@ func (r *libhoneyReceiver) handleSomething(resp http.ResponseWriter, req *http.R errorutil.HTTPError(resp, err) } - simpleSpans := make([]simpleSpan, 0) + simpleSpans := make([]simplespan.SimpleSpan, 0) switch req.Header.Get("Content-Type") { case "application/x-msgpack", "application/msgpack": decoder := msgpack.NewDecoder(bytes.NewReader(body)) From 6230172137b36607d40f9ead449687d78a2bedb9 Mon Sep 17 00:00:00 2001 From: Mike Terhar Date: Sat, 14 Dec 2024 01:28:52 +0000 Subject: [PATCH 3/5] obey linter --- receiver/libhoneyreceiver/encoder.go | 3 +-- .../internal/eventtime/eventtime.go | 1 - .../internal/simplespan/simplespan.go | 8 ++++---- receiver/libhoneyreceiver/libhoneyparser.go | 6 +++--- receiver/libhoneyreceiver/receiver.go | 16 +++++++++------- 5 files changed, 17 insertions(+), 17 deletions(-) diff --git a/receiver/libhoneyreceiver/encoder.go b/receiver/libhoneyreceiver/encoder.go index 126ca86779f7..89223c26139d 100644 --- a/receiver/libhoneyreceiver/encoder.go +++ b/receiver/libhoneyreceiver/encoder.go @@ -7,11 +7,10 @@ import ( "bytes" "github.com/gogo/protobuf/jsonpb" - spb "google.golang.org/genproto/googleapis/rpc/status" - "go.opentelemetry.io/collector/pdata/plog/plogotlp" "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" + spb "google.golang.org/genproto/googleapis/rpc/status" ) const ( diff --git a/receiver/libhoneyreceiver/internal/eventtime/eventtime.go b/receiver/libhoneyreceiver/internal/eventtime/eventtime.go index 38f227165887..b7317b9f06f8 100644 --- a/receiver/libhoneyreceiver/internal/eventtime/eventtime.go +++ b/receiver/libhoneyreceiver/internal/eventtime/eventtime.go @@ -36,7 +36,6 @@ func GetEventTime(etHeader string) time.Time { sec, dec := math.Modf(epochFloat) eventTime = time.Unix(int64(sec), int64(dec*(1e9))) } - } } else { epochFloat, err := strconv.ParseFloat(etHeader, 64) diff --git a/receiver/libhoneyreceiver/internal/simplespan/simplespan.go b/receiver/libhoneyreceiver/internal/simplespan/simplespan.go index a8355391763f..53ae51246790 100644 --- a/receiver/libhoneyreceiver/internal/simplespan/simplespan.go +++ b/receiver/libhoneyreceiver/internal/simplespan/simplespan.go @@ -44,10 +44,10 @@ type AttributesConfig struct { } type SimpleSpan struct { - Samplerate int `json:"samplerate" msgpack:"samplerate"` - MsgPackTimestamp *time.Time `msgpack:"time"` - Time string `json:"time"` // should not be trusted. use MsgPackTimestamp - Data map[string]interface{} `json:"data" msgpack:"data"` + Samplerate int `json:"samplerate" msgpack:"samplerate"` + MsgPackTimestamp *time.Time `msgpack:"time"` + Time string `json:"time"` // should not be trusted. use MsgPackTimestamp + Data map[string]any `json:"data" msgpack:"data"` } // Overrides unmarshall to make sure the MsgPackTimestamp is set diff --git a/receiver/libhoneyreceiver/libhoneyparser.go b/receiver/libhoneyreceiver/libhoneyparser.go index 35ba4c2eeaeb..a4ab828f38ab 100644 --- a/receiver/libhoneyreceiver/libhoneyparser.go +++ b/receiver/libhoneyreceiver/libhoneyparser.go @@ -69,7 +69,7 @@ func getDatasetFromRequest(path string) (string, error) { return dataset, nil } -func toPsomething(dataset string, ss []simplespan.SimpleSpan, cfg Config, logger zap.Logger) (plog.Logs, error) { +func toPsomething(dataset string, ss []simplespan.SimpleSpan, cfg Config, logger zap.Logger) plog.Logs { foundServices := simplespan.ServiceHistory{} foundServices.NameCount = make(map[string]int) foundScopes := simplespan.ScopeHistory{} @@ -103,7 +103,7 @@ func toPsomething(dataset string, ss []simplespan.SimpleSpan, cfg Config, logger logService, _ := span.GetService(cfg.FieldMapConfig, &foundServices, dataset) logScopeKey, _ := span.GetScope(cfg.FieldMapConfig, &foundScopes, logService) // adds a new found scope if needed newLog := foundScopes.Scope[logScopeKey].ScopeLogs.AppendEmpty() - span.ToPLogRecord(&newLog, &already_used_fields, logger) + err := span.ToPLogRecord(&newLog, &already_used_fields, logger) if err != nil { logger.Warn("log could not be converted from libhoney to plog", zap.String("span.object", span.DebugString())) } @@ -125,5 +125,5 @@ func toPsomething(dataset string, ss []simplespan.SimpleSpan, cfg Config, logger } } - return resultLogs, nil + return resultLogs } diff --git a/receiver/libhoneyreceiver/receiver.go b/receiver/libhoneyreceiver/receiver.go index bde46bca8364..e235cacd3a97 100644 --- a/receiver/libhoneyreceiver/receiver.go +++ b/receiver/libhoneyreceiver/receiver.go @@ -118,7 +118,10 @@ func (r *libhoneyReceiver) startHTTPServer(ctx context.Context, host component.H return } authRawBody, _ := io.ReadAll(authResp.Body) - resp.Write(authRawBody) + _, err = resp.Write(authRawBody) + if err != nil { + r.settings.Logger.Info("couldn't write http response") + } }) } @@ -201,7 +204,10 @@ func (r *libhoneyReceiver) handleSomething(resp http.ResponseWriter, req *http.R case "application/x-msgpack", "application/msgpack": decoder := msgpack.NewDecoder(bytes.NewReader(body)) decoder.UseLooseInterfaceDecoding(true) - decoder.Decode(&simpleSpans) + err = decoder.Decode(&simpleSpans) + if err != nil { + r.settings.Logger.Info("messagepack decoding failed") + } if len(simpleSpans) > 0 { r.settings.Logger.Debug("Decoding with msgpack worked", zap.Time("timestamp.first.msgpacktimestamp", *simpleSpans[0].MsgPackTimestamp), zap.String("timestamp.first.time", simpleSpans[0].Time)) r.settings.Logger.Debug("span zero", zap.String("span.data", simpleSpans[0].DebugString())) @@ -216,11 +222,7 @@ func (r *libhoneyReceiver) handleSomething(resp http.ResponseWriter, req *http.R } } - otlpLogs, err := toPsomething(dataset, simpleSpans, *r.cfg, *r.settings.Logger) - if err != nil { - errorutil.HTTPError(resp, err) - return - } + otlpLogs := toPsomething(dataset, simpleSpans, *r.cfg, *r.settings.Logger) numLogs := otlpLogs.LogRecordCount() if numLogs > 0 { From d25e619c94b510c238af2c89d863df8d1e5f6b8a Mon Sep 17 00:00:00 2001 From: Mike Terhar Date: Tue, 17 Dec 2024 14:54:12 +0000 Subject: [PATCH 4/5] renamed simplespan and moved parser --- receiver/libhoneyreceiver/config.go | 13 +- receiver/libhoneyreceiver/factory.go | 10 +- .../libhoneyevent.go} | 60 ++++---- .../internal/parser/parser.go | 88 ++++++++++++ receiver/libhoneyreceiver/libhoneyparser.go | 129 ------------------ receiver/libhoneyreceiver/receiver.go | 89 +++++++++--- 6 files changed, 202 insertions(+), 187 deletions(-) rename receiver/libhoneyreceiver/internal/{simplespan/simplespan.go => libhoneyevent/libhoneyevent.go} (68%) create mode 100644 receiver/libhoneyreceiver/internal/parser/parser.go delete mode 100644 receiver/libhoneyreceiver/libhoneyparser.go diff --git a/receiver/libhoneyreceiver/config.go b/receiver/libhoneyreceiver/config.go index 8290df733f84..49602fcfa9d3 100644 --- a/receiver/libhoneyreceiver/config.go +++ b/receiver/libhoneyreceiver/config.go @@ -12,17 +12,18 @@ import ( "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/confmap" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/simplespan" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/libhoneyevent" ) // Config represents the receiver config settings within the collector's config.yaml type Config struct { - HTTP *HTTPConfig `mapstructure:"http"` - AuthAPI string `mapstructure:"auth_api"` - Wrapper string `mapstructure:"wrapper"` - FieldMapConfig simplespan.FieldMapConfig `mapstructure:"fields"` + HTTP *HTTPConfig `mapstructure:"http"` + AuthAPI string `mapstructure:"auth_api"` + Wrapper string `mapstructure:"wrapper"` + FieldMapConfig libhoneyevent.FieldMapConfig `mapstructure:"fields"` } +// HTTPConfig defines the configuration for the HTTP server receiving traces. type HTTPConfig struct { *confighttp.ServerConfig `mapstructure:",squash"` @@ -30,6 +31,7 @@ type HTTPConfig struct { TracesURLPaths []string `mapstructure:"traces_url_paths,omitempty"` } +// Validate ensures the HTTP configuration is set. func (cfg *Config) Validate() error { if cfg.HTTP == nil { return errors.New("must specify at least one protocol when using the arbitrary JSON receiver") @@ -37,6 +39,7 @@ func (cfg *Config) Validate() error { return nil } +// Unmarshal unmarshals the configuration from the given configuration and then checks for errors. func (cfg *Config) Unmarshal(conf *confmap.Conf) error { // first load the config normally err := conf.Unmarshal(cfg) diff --git a/receiver/libhoneyreceiver/factory.go b/receiver/libhoneyreceiver/factory.go index d90eef5a532e..02ab9dcf1855 100644 --- a/receiver/libhoneyreceiver/factory.go +++ b/receiver/libhoneyreceiver/factory.go @@ -13,8 +13,8 @@ import ( "go.opentelemetry.io/collector/receiver" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/libhoneyevent" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/metadata" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/simplespan" ) const ( @@ -45,15 +45,15 @@ func createDefaultConfig() component.Config { TracesURLPaths: defaultTracesURLPaths, }, AuthAPI: "", - FieldMapConfig: simplespan.FieldMapConfig{ - Resources: simplespan.ResourcesConfig{ + FieldMapConfig: libhoneyevent.FieldMapConfig{ + Resources: libhoneyevent.ResourcesConfig{ ServiceName: "service.name", }, - Scopes: simplespan.ScopesConfig{ + Scopes: libhoneyevent.ScopesConfig{ LibraryName: "library.name", LibraryVersion: "library.version", }, - Attributes: simplespan.AttributesConfig{ + Attributes: libhoneyevent.AttributesConfig{ TraceID: "trace.trace_id", SpanID: "trace.span_id", ParentID: "trace.parent_id", diff --git a/receiver/libhoneyreceiver/internal/simplespan/simplespan.go b/receiver/libhoneyreceiver/internal/libhoneyevent/libhoneyevent.go similarity index 68% rename from receiver/libhoneyreceiver/internal/simplespan/simplespan.go rename to receiver/libhoneyreceiver/internal/libhoneyevent/libhoneyevent.go index 53ae51246790..820fb47f6da2 100644 --- a/receiver/libhoneyreceiver/internal/simplespan/simplespan.go +++ b/receiver/libhoneyreceiver/internal/libhoneyevent/libhoneyevent.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package simplespan // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/simplespan" +package libhoneyevent // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/libhoneyevent" import ( "encoding/json" @@ -18,21 +18,25 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/eventtime" ) +// FieldMapConfig is used to map the fields from the LibhoneyEvent to PData formats type FieldMapConfig struct { Resources ResourcesConfig `mapstructure:"resources"` Scopes ScopesConfig `mapstructure:"scopes"` Attributes AttributesConfig `mapstructure:"attributes"` } +// ResourcesConfig is used to map the fields from the LibhoneyEvent to PData formats type ResourcesConfig struct { ServiceName string `mapstructure:"service_name"` } +// ScopesConfig is used to map the fields from the LibhoneyEvent to PData formats type ScopesConfig struct { LibraryName string `mapstructure:"library_name"` LibraryVersion string `mapstructure:"library_version"` } +// AttributesConfig is used to map the fields from the LibhoneyEvent to PData formats type AttributesConfig struct { TraceID string `mapstructure:"trace_id"` ParentID string `mapstructure:"parent_id"` @@ -43,7 +47,7 @@ type AttributesConfig struct { DurationFields []string `mapstructure:"durationFields"` } -type SimpleSpan struct { +type LibhoneyEvent struct { Samplerate int `json:"samplerate" msgpack:"samplerate"` MsgPackTimestamp *time.Time `msgpack:"time"` Time string `json:"time"` // should not be trusted. use MsgPackTimestamp @@ -51,11 +55,11 @@ type SimpleSpan struct { } // Overrides unmarshall to make sure the MsgPackTimestamp is set -func (s *SimpleSpan) UnmarshalJSON(j []byte) error { - type _simpleSpan SimpleSpan +func (l *LibhoneyEvent) UnmarshalJSON(j []byte) error { + type _libhoneyEvent LibhoneyEvent tstr := eventtime.GetEventTimeDefaultString() tzero := time.Time{} - tmp := _simpleSpan{Time: "none", MsgPackTimestamp: &tzero, Samplerate: 1} + tmp := _libhoneyEvent{Time: "none", MsgPackTimestamp: &tzero, Samplerate: 1} err := json.Unmarshal(j, &tmp) if err != nil { @@ -72,29 +76,29 @@ func (s *SimpleSpan) UnmarshalJSON(j []byte) error { tmp.MsgPackTimestamp = &propertime } - *s = SimpleSpan(tmp) + *l = LibhoneyEvent(tmp) return nil } -func (s *SimpleSpan) DebugString() string { - return fmt.Sprintf("%#v", s) +func (l *LibhoneyEvent) DebugString() string { + return fmt.Sprintf("%#v", l) } // returns log until we add the trace parser -func (s *SimpleSpan) SignalType() (string, error) { +func (l *LibhoneyEvent) SignalType() (string, error) { return "log", nil } -func (s *SimpleSpan) GetService(fields FieldMapConfig, seen *ServiceHistory, dataset string) (string, error) { - if serviceName, ok := s.Data[fields.Resources.ServiceName]; ok { +func (l *LibhoneyEvent) GetService(fields FieldMapConfig, seen *ServiceHistory, dataset string) (string, error) { + if serviceName, ok := l.Data[fields.Resources.ServiceName]; ok { seen.NameCount[serviceName.(string)] += 1 return serviceName.(string), nil } return dataset, errors.New("no service.name found in event") } -func (s *SimpleSpan) GetScope(fields FieldMapConfig, seen *ScopeHistory, serviceName string) (string, error) { - if scopeLibraryName, ok := s.Data[fields.Scopes.LibraryName]; ok { +func (l *LibhoneyEvent) GetScope(fields FieldMapConfig, seen *ScopeHistory, serviceName string) (string, error) { + if scopeLibraryName, ok := l.Data[fields.Scopes.LibraryName]; ok { scopeKey := serviceName + scopeLibraryName.(string) if _, ok := seen.Scope[scopeKey]; ok { // if we've seen it, we don't expect it to be different right away so we'll just return it. @@ -102,7 +106,7 @@ func (s *SimpleSpan) GetScope(fields FieldMapConfig, seen *ScopeHistory, service } // otherwise, we need to make a new found scope scopeLibraryVersion := "unset" - if scopeLibVer, ok := s.Data[fields.Scopes.LibraryVersion]; ok { + if scopeLibVer, ok := l.Data[fields.Scopes.LibraryVersion]; ok { scopeLibraryVersion = scopeLibVer.(string) } newScope := SimpleScope{ @@ -126,42 +130,46 @@ type SimpleScope struct { ScopeLogs plog.LogRecordSlice } +// ScopeHistory is a map of scope keys to the SimpleScope object type ScopeHistory struct { Scope map[string]SimpleScope // key here is service.name+library.name } + +// ServiceHistory is a map of service names to the number of times they've been seen type ServiceHistory struct { NameCount map[string]int } -func (s *SimpleSpan) ToPLogRecord(newLog *plog.LogRecord, already_used_fields *[]string, logger zap.Logger) error { - time_ns := s.MsgPackTimestamp.UnixNano() - logger.Debug("processing log with", zap.Int64("timestamp", time_ns)) - newLog.SetTimestamp(pcommon.Timestamp(time_ns)) +// ToPLogRecord converts a LibhoneyEvent to a Pdata LogRecord +func (l *LibhoneyEvent) ToPLogRecord(newLog *plog.LogRecord, alreadyUsedFields *[]string, logger zap.Logger) error { + timeNs := l.MsgPackTimestamp.UnixNano() + logger.Debug("processing log with", zap.Int64("timestamp", timeNs)) + newLog.SetTimestamp(pcommon.Timestamp(timeNs)) - if logSevCode, ok := s.Data["severity_code"]; ok { + if logSevCode, ok := l.Data["severity_code"]; ok { logSevInt := int32(logSevCode.(int64)) newLog.SetSeverityNumber(plog.SeverityNumber(logSevInt)) } - if logSevText, ok := s.Data["severity_text"]; ok { + if logSevText, ok := l.Data["severity_text"]; ok { newLog.SetSeverityText(logSevText.(string)) } - if logFlags, ok := s.Data["flags"]; ok { + if logFlags, ok := l.Data["flags"]; ok { logFlagsUint := uint32(logFlags.(uint64)) newLog.SetFlags(plog.LogRecordFlags(logFlagsUint)) } // undoing this is gonna be complicated: https://github.com/honeycombio/husky/blob/91c0498333cd9f5eed1fdb8544ca486db7dea565/otlp/logs.go#L61 - if logBody, ok := s.Data["body"]; ok { + if logBody, ok := l.Data["body"]; ok { newLog.Body().SetStr(logBody.(string)) } - newLog.Attributes().PutInt("SampleRate", int64(s.Samplerate)) + newLog.Attributes().PutInt("SampleRate", int64(l.Samplerate)) logFieldsAlready := []string{"severity_text", "severity_code", "flags", "body"} - for k, v := range s.Data { - if slices.Contains(*already_used_fields, k) { + for k, v := range l.Data { + if slices.Contains(*alreadyUsedFields, k) { continue } if slices.Contains(logFieldsAlready, k) { @@ -180,7 +188,7 @@ func (s *SimpleSpan) ToPLogRecord(newLog *plog.LogRecord, already_used_fields *[ case bool: newLog.Attributes().PutBool(k, v) default: - logger.Warn("Span data type issue", zap.Int64("timestamp", time_ns), zap.String("key", k)) + logger.Warn("Span data type issue", zap.Int64("timestamp", timeNs), zap.String("key", k)) } } return nil diff --git a/receiver/libhoneyreceiver/internal/parser/parser.go b/receiver/libhoneyreceiver/internal/parser/parser.go new file mode 100644 index 000000000000..d2818dadd80a --- /dev/null +++ b/receiver/libhoneyreceiver/internal/parser/parser.go @@ -0,0 +1,88 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package parser // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/parser" + +import ( + "fmt" + "net/url" + + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/ptrace" + semconv "go.opentelemetry.io/collector/semconv/v1.16.0" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/libhoneyevent" +) + +// GetDatasetFromRequest extracts the dataset name from the request path +func GetDatasetFromRequest(path string) (string, error) { + if path == "" { + return "", fmt.Errorf("missing dataset name") + } + dataset, err := url.PathUnescape(path) + if err != nil { + return "", err + } + return dataset, nil +} + +// ToPdata converts a list of LibhoneyEvents to a Pdata Logs object +func ToPdata(dataset string, lhes []libhoneyevent.LibhoneyEvent, cfg libhoneyevent.FieldMapConfig, logger zap.Logger) plog.Logs { + foundServices := libhoneyevent.ServiceHistory{} + foundServices.NameCount = make(map[string]int) + foundScopes := libhoneyevent.ScopeHistory{} + foundScopes.Scope = make(map[string]libhoneyevent.SimpleScope) + + foundScopes.Scope = make(map[string]libhoneyevent.SimpleScope) // a list of already seen scopes + foundScopes.Scope["libhoney.receiver"] = libhoneyevent.SimpleScope{ + ServiceName: dataset, + LibraryName: "libhoney.receiver", + LibraryVersion: "1.0.0", + ScopeSpans: ptrace.NewSpanSlice(), + ScopeLogs: plog.NewLogRecordSlice(), + } // seed a default + + alreadyUsedFields := []string{cfg.Resources.ServiceName, cfg.Scopes.LibraryName, cfg.Scopes.LibraryVersion} + alreadyUsedFields = append(alreadyUsedFields, cfg.Attributes.Name, + cfg.Attributes.TraceID, cfg.Attributes.ParentID, cfg.Attributes.SpanID, + cfg.Attributes.Error, cfg.Attributes.SpanKind, + ) + alreadyUsedFields = append(alreadyUsedFields, cfg.Attributes.DurationFields...) + + for _, lhe := range lhes { + action, err := lhe.SignalType() + if err != nil { + logger.Warn("signal type unclear") + } + switch action { + case "span": + // not implemented + case "log": + logService, _ := lhe.GetService(cfg, &foundServices, dataset) + logScopeKey, _ := lhe.GetScope(cfg, &foundScopes, logService) // adds a new found scope if needed + newLog := foundScopes.Scope[logScopeKey].ScopeLogs.AppendEmpty() + err := lhe.ToPLogRecord(&newLog, &alreadyUsedFields, logger) + if err != nil { + logger.Warn("log could not be converted from libhoney to plog", zap.String("span.object", lhe.DebugString())) + } + } + } + + resultLogs := plog.NewLogs() + + for scopeName, ss := range foundScopes.Scope { + if ss.ScopeLogs.Len() > 0 { + lr := resultLogs.ResourceLogs().AppendEmpty() + lr.SetSchemaUrl(semconv.SchemaURL) + lr.Resource().Attributes().PutStr(semconv.AttributeServiceName, ss.ServiceName) + + ls := lr.ScopeLogs().AppendEmpty() + ls.Scope().SetName(ss.LibraryName) + ls.Scope().SetVersion(ss.LibraryVersion) + foundScopes.Scope[scopeName].ScopeLogs.MoveAndAppendTo(ls.LogRecords()) + } + } + + return resultLogs +} diff --git a/receiver/libhoneyreceiver/libhoneyparser.go b/receiver/libhoneyreceiver/libhoneyparser.go deleted file mode 100644 index a4ab828f38ab..000000000000 --- a/receiver/libhoneyreceiver/libhoneyparser.go +++ /dev/null @@ -1,129 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package libhoneyreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver" - -import ( - "fmt" - "mime" - "net/http" - "net/url" - - "go.opentelemetry.io/collector/pdata/plog" - "go.opentelemetry.io/collector/pdata/ptrace" - semconv "go.opentelemetry.io/collector/semconv/v1.16.0" - "go.uber.org/zap" - - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/simplespan" -) - -func readContentType(resp http.ResponseWriter, req *http.Request) (encoder, bool) { - if req.Method != http.MethodPost { - handleUnmatchedMethod(resp) - return nil, false - } - - switch getMimeTypeFromContentType(req.Header.Get("Content-Type")) { - case jsonContentType: - return jsEncoder, true - case "application/x-msgpack", "application/msgpack": - return mpEncoder, true - default: - handleUnmatchedContentType(resp) - return nil, false - } -} - -func writeResponse(w http.ResponseWriter, contentType string, statusCode int, msg []byte) { - w.Header().Set("Content-Type", contentType) - w.WriteHeader(statusCode) - _, _ = w.Write(msg) -} - -func getMimeTypeFromContentType(contentType string) string { - mediatype, _, err := mime.ParseMediaType(contentType) - if err != nil { - return "" - } - return mediatype -} - -func handleUnmatchedMethod(resp http.ResponseWriter) { - status := http.StatusMethodNotAllowed - writeResponse(resp, "text/plain", status, []byte(fmt.Sprintf("%v method not allowed, supported: [POST]", status))) -} - -func handleUnmatchedContentType(resp http.ResponseWriter) { - status := http.StatusUnsupportedMediaType - writeResponse(resp, "text/plain", status, []byte(fmt.Sprintf("%v unsupported media type, supported: [%s, %s]", status, jsonContentType, pbContentType))) -} - -func getDatasetFromRequest(path string) (string, error) { - if path == "" { - return "", fmt.Errorf("missing dataset name") - } - dataset, err := url.PathUnescape(path) - if err != nil { - return "", err - } - return dataset, nil -} - -func toPsomething(dataset string, ss []simplespan.SimpleSpan, cfg Config, logger zap.Logger) plog.Logs { - foundServices := simplespan.ServiceHistory{} - foundServices.NameCount = make(map[string]int) - foundScopes := simplespan.ScopeHistory{} - foundScopes.Scope = make(map[string]simplespan.SimpleScope) - - foundScopes.Scope = make(map[string]simplespan.SimpleScope) // a list of already seen scopes - foundScopes.Scope["libhoney.receiver"] = simplespan.SimpleScope{ - ServiceName: dataset, - LibraryName: "libhoney.receiver", - LibraryVersion: "1.0.0", - ScopeSpans: ptrace.NewSpanSlice(), - ScopeLogs: plog.NewLogRecordSlice(), - } // seed a default - - already_used_fields := []string{cfg.FieldMapConfig.Resources.ServiceName, cfg.FieldMapConfig.Scopes.LibraryName, cfg.FieldMapConfig.Scopes.LibraryVersion} - already_used_fields = append(already_used_fields, cfg.FieldMapConfig.Attributes.Name, - cfg.FieldMapConfig.Attributes.TraceID, cfg.FieldMapConfig.Attributes.ParentID, cfg.FieldMapConfig.Attributes.SpanID, - cfg.FieldMapConfig.Attributes.Error, cfg.FieldMapConfig.Attributes.SpanKind, - ) - already_used_fields = append(already_used_fields, cfg.FieldMapConfig.Attributes.DurationFields...) - - for _, span := range ss { - action, err := span.SignalType() - if err != nil { - logger.Warn("signal type unclear") - } - switch action { - case "span": - // not implemented - case "log": - logService, _ := span.GetService(cfg.FieldMapConfig, &foundServices, dataset) - logScopeKey, _ := span.GetScope(cfg.FieldMapConfig, &foundScopes, logService) // adds a new found scope if needed - newLog := foundScopes.Scope[logScopeKey].ScopeLogs.AppendEmpty() - err := span.ToPLogRecord(&newLog, &already_used_fields, logger) - if err != nil { - logger.Warn("log could not be converted from libhoney to plog", zap.String("span.object", span.DebugString())) - } - } - } - - resultLogs := plog.NewLogs() - - for scopeName, ss := range foundScopes.Scope { - if ss.ScopeLogs.Len() > 0 { - lr := resultLogs.ResourceLogs().AppendEmpty() - lr.SetSchemaUrl(semconv.SchemaURL) - lr.Resource().Attributes().PutStr(semconv.AttributeServiceName, ss.ServiceName) - - ls := lr.ScopeLogs().AppendEmpty() - ls.Scope().SetName(ss.LibraryName) - ls.Scope().SetVersion(ss.LibraryVersion) - foundScopes.Scope[scopeName].ScopeLogs.MoveAndAppendTo(ls.LogRecords()) - } - } - - return resultLogs -} diff --git a/receiver/libhoneyreceiver/receiver.go b/receiver/libhoneyreceiver/receiver.go index e235cacd3a97..293c99b739b3 100644 --- a/receiver/libhoneyreceiver/receiver.go +++ b/receiver/libhoneyreceiver/receiver.go @@ -10,6 +10,7 @@ import ( "errors" "fmt" "io" + "mime" "net" "net/http" "strings" @@ -24,7 +25,8 @@ import ( "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/errorutil" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/simplespan" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/libhoneyevent" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/parser" ) type libhoneyReceiver struct { @@ -37,15 +39,18 @@ type libhoneyReceiver struct { settings *receiver.Settings } +// TeamInfo is part of the AuthInfo struct that stores the team slug type TeamInfo struct { Slug string `json:"slug"` } +// EnvironmentInfo is part of the AuthInfo struct that stores the environment slug and name type EnvironmentInfo struct { Slug string `json:"slug"` Name string `json:"name"` } +// AuthInfo is used by Libhoney to validate team and environment information against Honeycomb's Auth API type AuthInfo struct { APIKeyAccess map[string]bool `json:"api_key_access"` Team TeamInfo `json:"team"` @@ -83,7 +88,7 @@ func (r *libhoneyReceiver) startHTTPServer(ctx context.Context, host component.H r.settings.Logger.Info("r.nextTraces is not null so httpTracesReciever was added", zap.Int("paths", len(r.cfg.HTTP.TracesURLPaths))) for _, path := range r.cfg.HTTP.TracesURLPaths { httpMux.HandleFunc(path, func(resp http.ResponseWriter, req *http.Request) { - r.handleSomething(resp, req) + r.handleEvent(resp, req) }) r.settings.Logger.Debug("Added path to HTTP server", zap.String("path", path)) } @@ -93,28 +98,28 @@ func (r *libhoneyReceiver) startHTTPServer(ctx context.Context, host component.H authURL := fmt.Sprintf("%s/1/auth", r.cfg.AuthAPI) authReq, err := http.NewRequest(http.MethodGet, authURL, nil) if err != nil { - errJson, _ := json.Marshal(`{"error": "failed to create AuthInfo request"}`) - writeResponse(resp, "json", http.StatusBadRequest, errJson) + errJSON, _ := json.Marshal(`{"error": "failed to create AuthInfo request"}`) + writeResponse(resp, "json", http.StatusBadRequest, errJSON) return } authReq.Header.Set("x-honeycomb-team", req.Header.Get("x-honeycomb-team")) var authClient http.Client authResp, err := authClient.Do(authReq) if err != nil { - errJson, _ := json.Marshal(fmt.Sprintf(`"error": "failed to send request to auth api endpoint", "message", "%s"}`, err.Error())) - writeResponse(resp, "json", http.StatusBadRequest, errJson) + errJSON, _ := json.Marshal(fmt.Sprintf(`"error": "failed to send request to auth api endpoint", "message", "%s"}`, err.Error())) + writeResponse(resp, "json", http.StatusBadRequest, errJSON) return } defer authResp.Body.Close() switch { case authResp.StatusCode == http.StatusUnauthorized: - errJson, _ := json.Marshal(`"error": "received 401 response for AuthInfo request from Honeycomb API - check your API key"}`) - writeResponse(resp, "json", http.StatusBadRequest, errJson) + errJSON, _ := json.Marshal(`"error": "received 401 response for AuthInfo request from Honeycomb API - check your API key"}`) + writeResponse(resp, "json", http.StatusBadRequest, errJSON) return case authResp.StatusCode > 299: - errJson, _ := json.Marshal(fmt.Sprintf(`"error": "bad response code from API", "status_code", %d}`, authResp.StatusCode)) - writeResponse(resp, "json", http.StatusBadRequest, errJson) + errJSON, _ := json.Marshal(fmt.Sprintf(`"error": "bad response code from API", "status_code", %d}`, authResp.StatusCode)) + writeResponse(resp, "json", http.StatusBadRequest, errJSON) return } authRawBody, _ := io.ReadAll(authResp.Body) @@ -175,13 +180,13 @@ func (r *libhoneyReceiver) registerLogConsumer(tc consumer.Logs) { r.nextLogs = tc } -func (r *libhoneyReceiver) handleSomething(resp http.ResponseWriter, req *http.Request) { +func (r *libhoneyReceiver) handleEvent(resp http.ResponseWriter, req *http.Request) { enc, ok := readContentType(resp, req) if !ok { return } - dataset, err := getDatasetFromRequest(req.RequestURI) + dataset, err := parser.GetDatasetFromRequest(req.RequestURI) if err != nil { r.settings.Logger.Info("No dataset found in URL", zap.String("req.RequstURI", req.RequestURI)) } @@ -198,31 +203,30 @@ func (r *libhoneyReceiver) handleSomething(resp http.ResponseWriter, req *http.R if err = req.Body.Close(); err != nil { errorutil.HTTPError(resp, err) } - - simpleSpans := make([]simplespan.SimpleSpan, 0) + libhoneyevents := make([]libhoneyevent.LibhoneyEvent, 0) switch req.Header.Get("Content-Type") { case "application/x-msgpack", "application/msgpack": decoder := msgpack.NewDecoder(bytes.NewReader(body)) decoder.UseLooseInterfaceDecoding(true) - err = decoder.Decode(&simpleSpans) + err = decoder.Decode(&libhoneyevents) if err != nil { r.settings.Logger.Info("messagepack decoding failed") } - if len(simpleSpans) > 0 { - r.settings.Logger.Debug("Decoding with msgpack worked", zap.Time("timestamp.first.msgpacktimestamp", *simpleSpans[0].MsgPackTimestamp), zap.String("timestamp.first.time", simpleSpans[0].Time)) - r.settings.Logger.Debug("span zero", zap.String("span.data", simpleSpans[0].DebugString())) + if len(libhoneyevents) > 0 { + r.settings.Logger.Debug("Decoding with msgpack worked", zap.Time("timestamp.first.msgpacktimestamp", *libhoneyevents[0].MsgPackTimestamp), zap.String("timestamp.first.time", libhoneyevents[0].Time)) + r.settings.Logger.Debug("event zero", zap.String("event.data", libhoneyevents[0].DebugString())) } case jsonContentType: - err = json.Unmarshal(body, &simpleSpans) + err = json.Unmarshal(body, &libhoneyevents) if err != nil { errorutil.HTTPError(resp, err) } - if len(simpleSpans) > 0 { - r.settings.Logger.Debug("Decoding with json worked", zap.Time("timestamp.first.msgpacktimestamp", *simpleSpans[0].MsgPackTimestamp), zap.String("timestamp.first.time", simpleSpans[0].Time)) + if len(libhoneyevents) > 0 { + r.settings.Logger.Debug("Decoding with json worked", zap.Time("timestamp.first.msgpacktimestamp", *libhoneyevents[0].MsgPackTimestamp), zap.String("timestamp.first.time", libhoneyevents[0].Time)) } } - otlpLogs := toPsomething(dataset, simpleSpans, *r.cfg, *r.settings.Logger) + otlpLogs := parser.ToPdata(dataset, libhoneyevents, r.cfg.FieldMapConfig, *r.settings.Logger) numLogs := otlpLogs.LogRecordCount() if numLogs > 0 { @@ -239,3 +243,44 @@ func (r *libhoneyReceiver) handleSomething(resp http.ResponseWriter, req *http.R noErrors := []byte(`{"errors":[]}`) writeResponse(resp, enc.contentType(), http.StatusAccepted, noErrors) } + +func readContentType(resp http.ResponseWriter, req *http.Request) (encoder, bool) { + if req.Method != http.MethodPost { + handleUnmatchedMethod(resp) + return nil, false + } + + switch getMimeTypeFromContentType(req.Header.Get("Content-Type")) { + case jsonContentType: + return jsEncoder, true + case "application/x-msgpack", "application/msgpack": + return mpEncoder, true + default: + handleUnmatchedContentType(resp) + return nil, false + } +} + +func writeResponse(w http.ResponseWriter, contentType string, statusCode int, msg []byte) { + w.Header().Set("Content-Type", contentType) + w.WriteHeader(statusCode) + _, _ = w.Write(msg) +} + +func getMimeTypeFromContentType(contentType string) string { + mediatype, _, err := mime.ParseMediaType(contentType) + if err != nil { + return "" + } + return mediatype +} + +func handleUnmatchedMethod(resp http.ResponseWriter) { + status := http.StatusMethodNotAllowed + writeResponse(resp, "text/plain", status, []byte(fmt.Sprintf("%v method not allowed, supported: [POST]", status))) +} + +func handleUnmatchedContentType(resp http.ResponseWriter) { + status := http.StatusUnsupportedMediaType + writeResponse(resp, "text/plain", status, []byte(fmt.Sprintf("%v unsupported media type, supported: [%s, %s]", status, jsonContentType, pbContentType))) +} From beeac276a0b05317fe015954a8f48bb896648520 Mon Sep 17 00:00:00 2001 From: Mike Terhar Date: Tue, 17 Dec 2024 19:14:31 +0000 Subject: [PATCH 5/5] move encoder into internal package --- receiver/libhoneyreceiver/encoder.go | 125 ------------------ receiver/libhoneyreceiver/encoder/encoder.go | 125 ++++++++++++++++++ .../internal/libhoneyevent/libhoneyevent.go | 11 +- receiver/libhoneyreceiver/receiver.go | 15 ++- 4 files changed, 141 insertions(+), 135 deletions(-) delete mode 100644 receiver/libhoneyreceiver/encoder.go create mode 100644 receiver/libhoneyreceiver/encoder/encoder.go diff --git a/receiver/libhoneyreceiver/encoder.go b/receiver/libhoneyreceiver/encoder.go deleted file mode 100644 index 89223c26139d..000000000000 --- a/receiver/libhoneyreceiver/encoder.go +++ /dev/null @@ -1,125 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package libhoneyreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver" - -import ( - "bytes" - - "github.com/gogo/protobuf/jsonpb" - "go.opentelemetry.io/collector/pdata/plog/plogotlp" - "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" - "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" - spb "google.golang.org/genproto/googleapis/rpc/status" -) - -const ( - pbContentType = "application/x-protobuf" - jsonContentType = "application/json" - msgpackContentType = "application/x-msgpack" -) - -var ( - jsEncoder = &jsonEncoder{} - jsonPbMarshaler = &jsonpb.Marshaler{} - mpEncoder = &msgpackEncoder{} -) - -type encoder interface { - unmarshalTracesRequest(buf []byte) (ptraceotlp.ExportRequest, error) - unmarshalMetricsRequest(buf []byte) (pmetricotlp.ExportRequest, error) - unmarshalLogsRequest(buf []byte) (plogotlp.ExportRequest, error) - - marshalTracesResponse(ptraceotlp.ExportResponse) ([]byte, error) - marshalMetricsResponse(pmetricotlp.ExportResponse) ([]byte, error) - marshalLogsResponse(plogotlp.ExportResponse) ([]byte, error) - - marshalStatus(rsp *spb.Status) ([]byte, error) - - contentType() string -} - -type jsonEncoder struct{} - -func (jsonEncoder) unmarshalTracesRequest(buf []byte) (ptraceotlp.ExportRequest, error) { - req := ptraceotlp.NewExportRequest() - err := req.UnmarshalJSON(buf) - return req, err -} - -func (jsonEncoder) unmarshalMetricsRequest(buf []byte) (pmetricotlp.ExportRequest, error) { - req := pmetricotlp.NewExportRequest() - err := req.UnmarshalJSON(buf) - return req, err -} - -func (jsonEncoder) unmarshalLogsRequest(buf []byte) (plogotlp.ExportRequest, error) { - req := plogotlp.NewExportRequest() - err := req.UnmarshalJSON(buf) - return req, err -} - -func (jsonEncoder) marshalTracesResponse(resp ptraceotlp.ExportResponse) ([]byte, error) { - return resp.MarshalJSON() -} - -func (jsonEncoder) marshalMetricsResponse(resp pmetricotlp.ExportResponse) ([]byte, error) { - return resp.MarshalJSON() -} - -func (jsonEncoder) marshalLogsResponse(resp plogotlp.ExportResponse) ([]byte, error) { - return resp.MarshalJSON() -} - -func (jsonEncoder) marshalStatus(resp *spb.Status) ([]byte, error) { - buf := new(bytes.Buffer) - err := jsonPbMarshaler.Marshal(buf, resp) - return buf.Bytes(), err -} - -func (jsonEncoder) contentType() string { - return jsonContentType -} - -// messagepack responses seem to work in JSON so leaving this alone for now. -type msgpackEncoder struct{} - -func (msgpackEncoder) unmarshalTracesRequest(buf []byte) (ptraceotlp.ExportRequest, error) { - req := ptraceotlp.NewExportRequest() - err := req.UnmarshalJSON(buf) - return req, err -} - -func (msgpackEncoder) unmarshalMetricsRequest(buf []byte) (pmetricotlp.ExportRequest, error) { - req := pmetricotlp.NewExportRequest() - err := req.UnmarshalJSON(buf) - return req, err -} - -func (msgpackEncoder) unmarshalLogsRequest(buf []byte) (plogotlp.ExportRequest, error) { - req := plogotlp.NewExportRequest() - err := req.UnmarshalJSON(buf) - return req, err -} - -func (msgpackEncoder) marshalTracesResponse(resp ptraceotlp.ExportResponse) ([]byte, error) { - return resp.MarshalJSON() -} - -func (msgpackEncoder) marshalMetricsResponse(resp pmetricotlp.ExportResponse) ([]byte, error) { - return resp.MarshalJSON() -} - -func (msgpackEncoder) marshalLogsResponse(resp plogotlp.ExportResponse) ([]byte, error) { - return resp.MarshalJSON() -} - -func (msgpackEncoder) marshalStatus(resp *spb.Status) ([]byte, error) { - buf := new(bytes.Buffer) - err := jsonPbMarshaler.Marshal(buf, resp) - return buf.Bytes(), err -} - -func (msgpackEncoder) contentType() string { - return msgpackContentType -} diff --git a/receiver/libhoneyreceiver/encoder/encoder.go b/receiver/libhoneyreceiver/encoder/encoder.go new file mode 100644 index 000000000000..b0a998ef310c --- /dev/null +++ b/receiver/libhoneyreceiver/encoder/encoder.go @@ -0,0 +1,125 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package encoder // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/encoder" + +import ( + "bytes" + + "github.com/gogo/protobuf/jsonpb" + "go.opentelemetry.io/collector/pdata/plog/plogotlp" + "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" + "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" + spb "google.golang.org/genproto/googleapis/rpc/status" +) + +const ( + PbContentType = "application/x-protobuf" + JsonContentType = "application/json" + MsgpackContentType = "application/x-msgpack" +) + +var ( + JsEncoder = &JsonEncoder{} + JsonPbMarshaler = &jsonpb.Marshaler{} + MpEncoder = &msgpackEncoder{} +) + +type Encoder interface { + UnmarshalTracesRequest(buf []byte) (ptraceotlp.ExportRequest, error) + UnmarshalMetricsRequest(buf []byte) (pmetricotlp.ExportRequest, error) + UnmarshalLogsRequest(buf []byte) (plogotlp.ExportRequest, error) + + MarshalTracesResponse(ptraceotlp.ExportResponse) ([]byte, error) + MarshalMetricsResponse(pmetricotlp.ExportResponse) ([]byte, error) + MarshalLogsResponse(plogotlp.ExportResponse) ([]byte, error) + + MarshalStatus(rsp *spb.Status) ([]byte, error) + + ContentType() string +} + +type JsonEncoder struct{} + +func (JsonEncoder) UnmarshalTracesRequest(buf []byte) (ptraceotlp.ExportRequest, error) { + req := ptraceotlp.NewExportRequest() + err := req.UnmarshalJSON(buf) + return req, err +} + +func (JsonEncoder) UnmarshalMetricsRequest(buf []byte) (pmetricotlp.ExportRequest, error) { + req := pmetricotlp.NewExportRequest() + err := req.UnmarshalJSON(buf) + return req, err +} + +func (JsonEncoder) UnmarshalLogsRequest(buf []byte) (plogotlp.ExportRequest, error) { + req := plogotlp.NewExportRequest() + err := req.UnmarshalJSON(buf) + return req, err +} + +func (JsonEncoder) MarshalTracesResponse(resp ptraceotlp.ExportResponse) ([]byte, error) { + return resp.MarshalJSON() +} + +func (JsonEncoder) MarshalMetricsResponse(resp pmetricotlp.ExportResponse) ([]byte, error) { + return resp.MarshalJSON() +} + +func (JsonEncoder) MarshalLogsResponse(resp plogotlp.ExportResponse) ([]byte, error) { + return resp.MarshalJSON() +} + +func (JsonEncoder) MarshalStatus(resp *spb.Status) ([]byte, error) { + buf := new(bytes.Buffer) + err := JsonPbMarshaler.Marshal(buf, resp) + return buf.Bytes(), err +} + +func (JsonEncoder) ContentType() string { + return JsonContentType +} + +// messagepack responses seem to work in JSON so leaving this alone for now. +type msgpackEncoder struct{} + +func (msgpackEncoder) UnmarshalTracesRequest(buf []byte) (ptraceotlp.ExportRequest, error) { + req := ptraceotlp.NewExportRequest() + err := req.UnmarshalJSON(buf) + return req, err +} + +func (msgpackEncoder) UnmarshalMetricsRequest(buf []byte) (pmetricotlp.ExportRequest, error) { + req := pmetricotlp.NewExportRequest() + err := req.UnmarshalJSON(buf) + return req, err +} + +func (msgpackEncoder) UnmarshalLogsRequest(buf []byte) (plogotlp.ExportRequest, error) { + req := plogotlp.NewExportRequest() + err := req.UnmarshalJSON(buf) + return req, err +} + +func (msgpackEncoder) MarshalTracesResponse(resp ptraceotlp.ExportResponse) ([]byte, error) { + return resp.MarshalJSON() +} + +func (msgpackEncoder) MarshalMetricsResponse(resp pmetricotlp.ExportResponse) ([]byte, error) { + return resp.MarshalJSON() +} + +func (msgpackEncoder) MarshalLogsResponse(resp plogotlp.ExportResponse) ([]byte, error) { + return resp.MarshalJSON() +} + +func (msgpackEncoder) MarshalStatus(resp *spb.Status) ([]byte, error) { + buf := new(bytes.Buffer) + err := JsonPbMarshaler.Marshal(buf, resp) + return buf.Bytes(), err +} + +func (msgpackEncoder) ContentType() string { + return MsgpackContentType +} diff --git a/receiver/libhoneyreceiver/internal/libhoneyevent/libhoneyevent.go b/receiver/libhoneyreceiver/internal/libhoneyevent/libhoneyevent.go index 820fb47f6da2..5519e304f138 100644 --- a/receiver/libhoneyreceiver/internal/libhoneyevent/libhoneyevent.go +++ b/receiver/libhoneyreceiver/internal/libhoneyevent/libhoneyevent.go @@ -47,6 +47,7 @@ type AttributesConfig struct { DurationFields []string `mapstructure:"durationFields"` } +// LibhoneyEvent is the event structure from libhoney type LibhoneyEvent struct { Samplerate int `json:"samplerate" msgpack:"samplerate"` MsgPackTimestamp *time.Time `msgpack:"time"` @@ -54,7 +55,7 @@ type LibhoneyEvent struct { Data map[string]any `json:"data" msgpack:"data"` } -// Overrides unmarshall to make sure the MsgPackTimestamp is set +// UnmarshalJSON overrides the unmarshall to make sure the MsgPackTimestamp is set func (l *LibhoneyEvent) UnmarshalJSON(j []byte) error { type _libhoneyEvent LibhoneyEvent tstr := eventtime.GetEventTimeDefaultString() @@ -80,23 +81,26 @@ func (l *LibhoneyEvent) UnmarshalJSON(j []byte) error { return nil } +// DebugString returns a string representation of the LibhoneyEvent func (l *LibhoneyEvent) DebugString() string { return fmt.Sprintf("%#v", l) } -// returns log until we add the trace parser +// SignalType returns the type of signal this event represents. Only log is implemented for now. func (l *LibhoneyEvent) SignalType() (string, error) { return "log", nil } +// GetService returns the service name from the event or the dataset name if no service name is found. func (l *LibhoneyEvent) GetService(fields FieldMapConfig, seen *ServiceHistory, dataset string) (string, error) { if serviceName, ok := l.Data[fields.Resources.ServiceName]; ok { - seen.NameCount[serviceName.(string)] += 1 + seen.NameCount[serviceName.(string)]++ return serviceName.(string), nil } return dataset, errors.New("no service.name found in event") } +// GetScope returns the scope key for the event. If the scope has not been seen before, it creates a new one. func (l *LibhoneyEvent) GetScope(fields FieldMapConfig, seen *ScopeHistory, serviceName string) (string, error) { if scopeLibraryName, ok := l.Data[fields.Scopes.LibraryName]; ok { scopeKey := serviceName + scopeLibraryName.(string) @@ -122,6 +126,7 @@ func (l *LibhoneyEvent) GetScope(fields FieldMapConfig, seen *ScopeHistory, serv return "libhoney.receiver", errors.New("library name not found") } +// SimpleScope is a simple struct to hold the scope data type SimpleScope struct { ServiceName string LibraryName string diff --git a/receiver/libhoneyreceiver/receiver.go b/receiver/libhoneyreceiver/receiver.go index 293c99b739b3..ba6f28690d44 100644 --- a/receiver/libhoneyreceiver/receiver.go +++ b/receiver/libhoneyreceiver/receiver.go @@ -25,6 +25,7 @@ import ( "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/errorutil" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/encoder" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/libhoneyevent" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/parser" ) @@ -216,7 +217,7 @@ func (r *libhoneyReceiver) handleEvent(resp http.ResponseWriter, req *http.Reque r.settings.Logger.Debug("Decoding with msgpack worked", zap.Time("timestamp.first.msgpacktimestamp", *libhoneyevents[0].MsgPackTimestamp), zap.String("timestamp.first.time", libhoneyevents[0].Time)) r.settings.Logger.Debug("event zero", zap.String("event.data", libhoneyevents[0].DebugString())) } - case jsonContentType: + case encoder.JsonContentType: err = json.Unmarshal(body, &libhoneyevents) if err != nil { errorutil.HTTPError(resp, err) @@ -241,20 +242,20 @@ func (r *libhoneyReceiver) handleEvent(resp http.ResponseWriter, req *http.Reque } noErrors := []byte(`{"errors":[]}`) - writeResponse(resp, enc.contentType(), http.StatusAccepted, noErrors) + writeResponse(resp, enc.ContentType(), http.StatusAccepted, noErrors) } -func readContentType(resp http.ResponseWriter, req *http.Request) (encoder, bool) { +func readContentType(resp http.ResponseWriter, req *http.Request) (encoder.Encoder, bool) { if req.Method != http.MethodPost { handleUnmatchedMethod(resp) return nil, false } switch getMimeTypeFromContentType(req.Header.Get("Content-Type")) { - case jsonContentType: - return jsEncoder, true + case encoder.JsonContentType: + return encoder.JsEncoder, true case "application/x-msgpack", "application/msgpack": - return mpEncoder, true + return encoder.MpEncoder, true default: handleUnmatchedContentType(resp) return nil, false @@ -282,5 +283,5 @@ func handleUnmatchedMethod(resp http.ResponseWriter) { func handleUnmatchedContentType(resp http.ResponseWriter) { status := http.StatusUnsupportedMediaType - writeResponse(resp, "text/plain", status, []byte(fmt.Sprintf("%v unsupported media type, supported: [%s, %s]", status, jsonContentType, pbContentType))) + writeResponse(resp, "text/plain", status, []byte(fmt.Sprintf("%v unsupported media type, supported: [%s, %s]", status, encoder.JsonContentType, encoder.PbContentType))) }