From 98caab4c77d34a0328fac76b31ac0c7c26f76b0a Mon Sep 17 00:00:00 2001 From: maura fortino Date: Mon, 26 Aug 2024 15:17:13 -0400 Subject: [PATCH 1/2] added logic for kafka send and update funcs --- go.mod | 26 +++- go.sum | 45 ++++-- internal/sink/sink.go | 267 ++++++++++++++++++++++++++++++++++- internal/sink/sinkWrapper.go | 3 +- 4 files changed, 323 insertions(+), 18 deletions(-) diff --git a/go.mod b/go.mod index d6d21203..1755aa1d 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/xmidt-org/caduceus go 1.21 require ( + github.com/IBM/sarama v1.43.3 github.com/alecthomas/kong v0.8.1 github.com/go-chi/chi/v5 v5.0.12 github.com/goschtalt/goschtalt v0.25.0 @@ -37,6 +38,9 @@ require ( github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect + github.com/eapache/go-resiliency v1.7.0 // indirect + github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect + github.com/eapache/queue v1.1.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/go-kit/kit v0.13.0 // indirect github.com/go-kit/log v0.2.1 // indirect @@ -46,12 +50,22 @@ require ( github.com/goccy/go-json v0.10.2 // indirect github.com/golang-jwt/jwt v3.2.2+incompatible // indirect github.com/golang/protobuf v1.5.3 // indirect + github.com/golang/snappy v0.0.4 // indirect github.com/google/uuid v1.5.0 // indirect github.com/gorilla/mux v1.8.1 // indirect github.com/goschtalt/approx v1.0.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 // indirect + github.com/hashicorp/errwrap v1.1.0 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect + github.com/hashicorp/go-uuid v1.0.3 // indirect github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c // indirect + github.com/jcmturner/aescts/v2 v2.0.0 // indirect + github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect + github.com/jcmturner/gofork v1.7.6 // indirect + github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect + github.com/jcmturner/rpc/v2 v2.0.3 // indirect github.com/jtacoma/uritemplates v1.0.0 // indirect + github.com/klauspost/compress v1.17.9 // indirect github.com/leodido/go-urn v1.4.0 // indirect github.com/lestrrat-go/blackmagic v1.0.2 // indirect github.com/lestrrat-go/httpcc v1.0.1 // indirect @@ -61,10 +75,12 @@ require ( github.com/lestrrat-go/option v1.0.1 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/openzipkin/zipkin-go v0.4.2 // indirect + github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.48.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect + github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect github.com/segmentio/asm v1.2.0 // indirect github.com/spf13/cast v1.6.0 // indirect github.com/stretchr/objx v0.5.2 // indirect @@ -84,10 +100,10 @@ require ( go.opentelemetry.io/proto/otlp v1.0.0 // indirect go.uber.org/dig v1.17.1 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/crypto v0.21.0 // indirect - golang.org/x/net v0.23.0 // indirect - golang.org/x/sys v0.18.0 // indirect - golang.org/x/text v0.14.0 // indirect + golang.org/x/crypto v0.26.0 // indirect + golang.org/x/net v0.28.0 // indirect + golang.org/x/sys v0.23.0 // indirect + golang.org/x/text v0.17.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240108191215-35c7eff3a6b1 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240108191215-35c7eff3a6b1 // indirect google.golang.org/grpc v1.60.1 // indirect @@ -95,3 +111,5 @@ require ( gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) + +replace github.com/xmidt-org/ancla => /Users/mforti446/Documents/work/xmidt-repos/ancla diff --git a/go.sum b/go.sum index 5d6c690f..8837fa55 100644 --- a/go.sum +++ b/go.sum @@ -613,6 +613,8 @@ github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3 github.com/GaryBoone/GoStats v0.0.0-20130122001700-1993eafbef57/go.mod h1:5zDl2HgTb/k5i9op9y6IUSiuVkZFpUrWGQbZc9tNR40= github.com/HdrHistogram/hdrhistogram-go v1.1.0/go.mod h1:yDgFjdqOqDEKOvasDdhWNXYg9BVp4O+o5f6V/ehm6Oo= github.com/HdrHistogram/hdrhistogram-go v1.1.2/go.mod h1:yDgFjdqOqDEKOvasDdhWNXYg9BVp4O+o5f6V/ehm6Oo= +github.com/IBM/sarama v1.43.3 h1:Yj6L2IaNvb2mRBop39N7mmJAHBVY3dTPncr3qGVkxPA= +github.com/IBM/sarama v1.43.3/go.mod h1:FVIRaLrhK3Cla/9FfRF5X9Zua2KpS3SYIXxhac1H+FQ= github.com/InVisionApp/go-health v2.1.0+incompatible/go.mod h1:/+Gv1o8JUsrjC6pi6MN6/CgKJo4OqZ6x77XAnImrzhg= github.com/InVisionApp/go-logger v1.0.1/go.mod h1:+cGTDSn+P8105aZkeOfIhdd7vFO5X1afUHcjvanY0L8= github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c/go.mod h1:X0CRv0ky0k6m906ixxpzmDRLvX58TFUKS2eePweuyxk= @@ -781,7 +783,12 @@ github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:Htrtb github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= github.com/eapache/go-resiliency v1.3.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= +github.com/eapache/go-resiliency v1.7.0 h1:n3NRTnBn5N0Cbi/IeOHuQn9s2UwVUH7Ga0ZWcP+9JTA= +github.com/eapache/go-resiliency v1.7.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0= +github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M= github.com/elazarl/go-bindata-assetfs v0.0.0-20160803192304-e1a2a7ec64b0/go.mod h1:v+YaWX3bdea5J/mo8dSETolEo7R71Vk1u8bnjau5yw4= @@ -816,6 +823,7 @@ github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2 github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4= github.com/franela/goblin v0.0.0-20210519012713-85d372ac71e2/go.mod h1:VzmDKDJVZI3aJmnRI9VjAn9nJ8qPPsN1fqzr9dqInIo= @@ -945,6 +953,7 @@ github.com/golang/snappy v0.0.0-20170215233205-553a64147049/go.mod h1:/XxbfmMg8l github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= @@ -1082,6 +1091,7 @@ github.com/hashicorp/consul/sdk v0.13.0/go.mod h1:0hs/l5fOVhJy/VdcoaNqUSi2AUs95e github.com/hashicorp/consul/sdk v0.13.1/go.mod h1:SW/mM4LbKfqmMvcFu8v+eiQQ7oitXEFeiBe9StxERb0= github.com/hashicorp/consul/sdk v0.14.1/go.mod h1:vFt03juSzocLRFo59NkeQHHmQa6+g7oU0pfzdI1mUhg= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-bexpr v0.1.2/go.mod h1:ANbpTX1oAql27TZkKVeW8p1w8NTdnyzPe/0qqPCKohU= github.com/hashicorp/go-checkpoint v0.0.0-20171009173528-1545e56e46de/go.mod h1:xIwEieBHERyEvaeKF/TcHh1Hu+lxPM+n2vT1+g9I4m4= @@ -1106,6 +1116,7 @@ github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iP github.com/hashicorp/go-msgpack v0.5.5/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM= github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk= github.com/hashicorp/go-multierror v1.1.0/go.mod h1:spPvp8C1qA32ftKqdAHm4hHTbPw+vmowP0z+KUhOZdA= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/hashicorp/go-plugin v1.0.1/go.mod h1:++UyYGoz3o5w9ZzAdZxtQKrWWP+iqPBn3cQptSMzBuY= github.com/hashicorp/go-raftchunking v0.6.1/go.mod h1:cGlg3JtDy7qy6c/3Bu660Mic1JF+7lWqIwCFSb08fX0= @@ -1121,6 +1132,7 @@ github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdv github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8= github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/go-version v1.1.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/go-version v1.2.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= @@ -1176,11 +1188,18 @@ github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab/go.mod github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c h1:qSHzRbhzK8RdXOsAdfDgO49TtqC1oZ+acxPrkfTxcCs= github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo= github.com/jarcoal/httpmock v0.0.0-20180424175123-9c70cfe4a1da/go.mod h1:ks+b9deReOc7jgqp+e7LuFiCBH6Rm5hL32cLcEAArb4= +github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= +github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= +github.com/jcmturner/gofork v1.7.6 h1:QH0l3hzAU1tfT3rZCnW5zXl+orbkNMMRGJfdJjHVETg= github.com/jcmturner/gofork v1.7.6/go.mod h1:1622LH6i/EZqLloHfE7IeZ0uEJwMSUyQ/nDd82IeqRo= +github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o= github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= github.com/jcmturner/gokrb5/v8 v8.4.3/go.mod h1:dqRwJGXznQrzw6cWmyo6kH+E7jksEQG/CyVWsJEsJO0= +github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh687T8= +github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs= +github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik= @@ -1219,6 +1238,8 @@ github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8 github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= github.com/klauspost/compress v1.15.11/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM= +github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -1416,6 +1437,8 @@ github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0 github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= +github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1-0.20181008045315-2233dee583dc/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -1499,6 +1522,7 @@ github.com/psanford/memfs v0.0.0-20210214183328-a001468d78ef h1:NKxTG6GVGbfMXc2m github.com/psanford/memfs v0.0.0-20210214183328-a001468d78ef/go.mod h1:tcaRap0jS3eifrEEllL6ZMd9dg8IlDpi2S1oARrQ+NI= github.com/rabbitmq/amqp091-go v1.5.0/go.mod h1:JsV0ofX5f1nwOGafb8L5rBItt9GyhfQfcJj+oyz0dGg= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/renier/xmlrpc v0.0.0-20170708154548-ce4a1a486c03/go.mod h1:gRAiPF5C5Nd0eyyRdqIu9qTiFSoZzpTq727b5B8fkkU= @@ -1636,8 +1660,6 @@ github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23n github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8= github.com/xhit/go-str2duration v1.2.0/go.mod h1:3cPSlfZlUHVlneIVfePFWcJZsuwf+P1v2SRTV4cUmp4= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= -github.com/xmidt-org/ancla v0.3.13-0.20240813173230-82d231708d47 h1:H0Ek3dCIxuzz8zhVe6evdYrMR4xdrE8R/W7uQHahr1g= -github.com/xmidt-org/ancla v0.3.13-0.20240813173230-82d231708d47/go.mod h1:MRBmecVt4ECxqHU5FDzGNyBPeW4F2mWz6jQ3hpEIoA8= github.com/xmidt-org/argus v0.3.9/go.mod h1:mDFS44R704gl9Fif3gkfAyvnZa53SvMepmXjYWABPvk= github.com/xmidt-org/argus v0.3.10-0.20201105190057-402fede05764/go.mod h1:lnMCVB/i0gOlUOOd2WbzDDgzTEqP5TipzQ8xKIw+N/I= github.com/xmidt-org/argus v0.3.10-0.20201217204602-66f69b12c498/go.mod h1:lnMCVB/i0gOlUOOd2WbzDDgzTEqP5TipzQ8xKIw+N/I= @@ -1916,11 +1938,12 @@ golang.org/x/crypto v0.0.0-20221010152910-d6f0a8c073c2/go.mod h1:IxCIyHEi3zRg3s0 golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw= golang.org/x/crypto v0.3.0/go.mod h1:hebNnKkNXi2UzZN1eVRvBB7co0a+JxK6XbPiWVs/3J4= golang.org/x/crypto v0.5.0/go.mod h1:NK/OQwhpMQP3MwtdjgLlYHnH9ebylxKWv3e0fK+mkQU= +golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU= golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0= golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio= -golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= -golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= +golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw= +golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -2064,8 +2087,8 @@ golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.13.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA= -golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= -golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= +golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= +golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= golang.org/x/oauth2 v0.0.0-20170807180024-9a379c6b3e95/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -2118,6 +2141,8 @@ golang.org/x/sync v0.0.0-20220923202941-7f9b1623fab7/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.2.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -2246,8 +2271,8 @@ golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= -golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM= +golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.0.0-20220526004731-065cf7ba2467/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= @@ -2278,8 +2303,8 @@ golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= -golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= -golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= +golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= diff --git a/internal/sink/sink.go b/internal/sink/sink.go index 7743fdeb..3edec35a 100644 --- a/internal/sink/sink.go +++ b/internal/sink/sink.go @@ -16,6 +16,7 @@ import ( "strings" "time" + "github.com/IBM/sarama" "github.com/xmidt-org/ancla" "github.com/xmidt-org/caduceus/internal/metrics" "github.com/xmidt-org/retry" @@ -40,16 +41,46 @@ type WebhookV1 struct { // clientMiddleware func(http.Client) http.Client } +type Kafkas []*Kafka +type Kafka struct { + id string + logger *zap.Logger + brokerAddr []string + topic string + config *sarama.Config +} + func NewSink(c Config, logger *zap.Logger, listener ancla.Register) Sink { var sink Sink switch l := listener.(type) { case *ancla.RegistryV1: sink = &WebhookV1{ id: l.GetId(), - deliveryInterval: c.DeliveryInterval, - deliveryRetries: c.DeliveryRetries, + deliveryInterval: c.DeliveryInterval, //TODO: should we be using retry hints for this? + deliveryRetries: c.DeliveryRetries, //TODO: should we be using retry hints for this? logger: logger, } + return sink + case *ancla.RegistryV2: + var sinks Kafkas + for _, k := range l.Registration.Kafkas { + kafka := &Kafka{ + id: l.Registration.CanonicalName, + brokerAddr: k.BootstrapServers, + topic: "test", + } + + //TODO: this is basic set up for now - will need to add more options to config + //once we know what we are allowing users to send + config := sarama.NewConfig() + config.Producer.Return.Successes = true + config.Producer.RequiredAcks = sarama.WaitForAll + config.Producer.Retry.Max = c.DeliveryRetries + + kafka.config = config + sinks = append(sinks, kafka) + sink = sinks + } default: return nil } @@ -213,3 +244,235 @@ func (v1 *WebhookV1) onAttempt(request *http.Request, event string) retry.OnAtte } } + +func (k Kafkas) Update(l ancla.Register) error { + return nil +} + +// TODO: probably get rid of urls +func (k Kafkas) Send(urls *ring.Ring, secret string, acceptType string, msg *wrp.Message) error { + //TODO: is this how we want to set this up? + //or do we want to only send to specific kafkas in the list based on an id + for _, kafka := range k { + err := kafka.send(secret, acceptType, msg) + return err + } + return nil +} + +func (k *Kafka) send(secret string, acceptType string, msg *wrp.Message) error { + + defer func() { + if r := recover(); nil != r { + // s.droppedPanic.Add(1.0) + k.logger.Error("goroutine send() panicked", zap.String("id", k.id), zap.Any("panic", r)) + } + // s.workers.Release() + // s.currentWorkersGauge.Add(-1.0) + }() + + payload := msg.Payload + body := payload + + // Use the internal content type unless the accept type is wrp + contentType := msg.ContentType + switch acceptType { + case "wrp", wrp.MimeTypeMsgpack, wrp.MimeTypeWrp: + // WTS - We should pass the original, raw WRP event instead of + // re-encoding it. + contentType = wrp.MimeTypeMsgpack + //TODO: do we want to use the wrp encoder or the sarama encoder? + buffer := bytes.NewBuffer([]byte{}) + encoder := wrp.NewEncoder(buffer, wrp.Msgpack) + encoder.Encode(msg) + body = buffer.Bytes() + } + + // Create a new Kafka producer + producer, err := sarama.NewSyncProducer(k.brokerAddr, k.config) + if err != nil { + k.logger.Error("Could not create Kafka producer", zap.Error(err)) + return err + } + defer producer.Close() + + id, _ := wrp.ParseDeviceID(msg.Source) + var sig string + if secret != "" { + s := hmac.New(sha1.New, []byte(secret)) + s.Write(body) + sig = fmt.Sprintf("sha1=%s", hex.EncodeToString(s.Sum(nil))) + } + eventHeader := strings.TrimPrefix(msg.Destination, "event:") + + // Create a Kafka message + //TODO: add more header options + kafkaMsg := &sarama.ProducerMessage{ + Topic: k.topic, + Key: nil, + Value: sarama.ByteEncoder(msg.Payload), + Headers: []sarama.RecordHeader{ + { + Key: []byte("X-Webpa-Device-Id"), + Value: []byte(id), + }, + { + Key: []byte("X-Webpa-Device-Name"), + Value: []byte(id), + }, + { + Key: []byte("X-Webpa-Event"), + Value: []byte(eventHeader), + }, + { + Key: []byte("X-Webpa-Transaction-Id"), + Value: []byte(msg.TransactionUUID), + }, + { + Key: []byte("X-Webpa-Signature"), + Value: []byte(sig), + }, + { + Key: []byte("Content-Type"), + Value: []byte(contentType), + }, + }, + } + + // Send the message to Kafka + + partition, offset, err := producer.SendMessage(kafkaMsg) + if err != nil { + k.logger.Error("Failed to send message to Kafka", zap.Error(err)) + return err + } + + k.logger.Debug("Message sent to Kafka", + + zap.String("Topic", kafkaMsg.Topic), + zap.Int32("Partition", partition), + zap.Int64("Offset", offset), + ) + + return nil + +} + +func AddMessageHeaders(kafkaMsg *sarama.ProducerMessage, m *wrp.Message) { + kafkaMsg.Headers = append(kafkaMsg.Headers, sarama.RecordHeader{ + Key: []byte(wrphttp.MessageTypeHeader), + Value: []byte(m.Type.FriendlyName()), + }) + + if len(m.Source) > 0 { + kafkaMsg.Headers = append(kafkaMsg.Headers, sarama.RecordHeader{ + Key: []byte(wrphttp.SourceHeader), + Value: []byte(m.Source), + }) + } + + if len(m.Destination) > 0 { + kafkaMsg.Headers = append(kafkaMsg.Headers, sarama.RecordHeader{ + Key: []byte(wrphttp.DestinationHeader), + Value: []byte(m.Destination), + }) + } + + if len(m.TransactionUUID) > 0 { + kafkaMsg.Headers = append(kafkaMsg.Headers, sarama.RecordHeader{ + Key: []byte(wrphttp.TransactionUuidHeader), + Value: []byte(m.TransactionUUID), + }) + } + + if m.Status != nil { + kafkaMsg.Headers = append(kafkaMsg.Headers, sarama.RecordHeader{ + Key: []byte(wrphttp.StatusHeader), + Value: []byte(strconv.FormatInt(*m.Status, 10)), + }) + } + + if m.RequestDeliveryResponse != nil { + kafkaMsg.Headers = append(kafkaMsg.Headers, sarama.RecordHeader{ + Key: []byte(wrphttp.RequestDeliveryResponseHeader), + Value: []byte(strconv.FormatInt(*m.RequestDeliveryResponse, 10)), + }) + } + + // TODO Remove along with `IncludeSpans` + // nolint:staticcheck + if m.IncludeSpans != nil { + kafkaMsg.Headers = append(kafkaMsg.Headers, sarama.RecordHeader{ + Key: []byte(wrphttp.IncludeSpansHeader), + Value: []byte(strconv.FormatBool(*m.IncludeSpans)), + }) + } + + for _, s := range m.Spans { + kafkaMsg.Headers = append(kafkaMsg.Headers, sarama.RecordHeader{ + Key: []byte(wrphttp.SpanHeader), + Value: []byte(strings.Join(s, ",")), + }) + } + + if len(m.Accept) > 0 { + kafkaMsg.Headers = append(kafkaMsg.Headers, sarama.RecordHeader{ + Key: []byte(wrphttp.AcceptHeader), + Value: []byte(m.Accept), + }) + } + + if len(m.Path) > 0 { + kafkaMsg.Headers = append(kafkaMsg.Headers, sarama.RecordHeader{ + Key: []byte(wrphttp.PathHeader), + Value: []byte(m.Path), + }) + } + + // for k, v := range m.Metadata { + // // perform k + "=" + v more efficiently + // buf := bytes.Buffer{} + // buf.WriteString(k) + // buf.WriteString("=") + // buf.WriteString(v) + // kafkaMsg.Headers = append(kafkaMsg.Headers, sarama.RecordHeader{ + // Key: []byte(MetadataHeader), + // Value: []byte(buf.String()), + // }) + // } + + // var partnerIds []byte + // for _, v := range m.PartnerIDs { + // partnerIds = append(partnerIds, byte(v)) + // kafkaMsg.Headers = append(kafkaMsg.Headers, sarama.RecordHeader{ + // Key: []byte(PartnerIdHeader), + // Value: []byte(m.PartnerIDs), + // }) + // h.Add(PartnerIdHeader, v) + // } + + if len(m.SessionID) > 0 { + kafkaMsg.Headers = append(kafkaMsg.Headers, sarama.RecordHeader{ + Key: []byte(wrphttp.SessionIdHeader), + Value: []byte(m.SessionID), + }) + } + + // for _, v := range m.Headers { + // h.Add(HeadersHeader, v) + // } + + if len(m.ServiceName) > 0 { + kafkaMsg.Headers = append(kafkaMsg.Headers, sarama.RecordHeader{ + Key: []byte(wrphttp.ServiceNameHeader), + Value: []byte(m.ServiceName), + }) + } + + if len(m.URL) > 0 { + kafkaMsg.Headers = append(kafkaMsg.Headers, sarama.RecordHeader{ + Key: []byte(wrphttp.URLHeader), + Value: []byte(m.URL), + }) + } +} diff --git a/internal/sink/sinkWrapper.go b/internal/sink/sinkWrapper.go index 3d32f18b..0237ede9 100644 --- a/internal/sink/sinkWrapper.go +++ b/internal/sink/sinkWrapper.go @@ -177,8 +177,7 @@ func (w *wrapper) Update(list []ancla.Register) { continue } - fmt.Println(sender) - // sender.Update(inValue.Listener) //commenting out until argus/ancla fix + sender.Update(inValue.Listener) //commenting out until argus/ancla fix } } From cfd96023e26c6954c0087c3979a923e6fb226463 Mon Sep 17 00:00:00 2001 From: maura fortino Date: Tue, 27 Aug 2024 12:57:54 -0400 Subject: [PATCH 2/2] init MatcherV2 for Kafka --- internal/sink/matcher.go | 47 +++++++++++++++++++++++++++++++++++----- 1 file changed, 42 insertions(+), 5 deletions(-) diff --git a/internal/sink/matcher.go b/internal/sink/matcher.go index 1544689c..f2f9eed3 100644 --- a/internal/sink/matcher.go +++ b/internal/sink/matcher.go @@ -16,6 +16,7 @@ import ( "github.com/xmidt-org/ancla" "github.com/xmidt-org/caduceus/internal/metrics" + "github.com/xmidt-org/webhook-schema" "github.com/xmidt-org/wrp-go/v3" "go.uber.org/zap" ) @@ -36,6 +37,7 @@ type Matcher interface { getUrls() *ring.Ring } +// MatcherV1 holds the matching information related to RegistryV1 type MatcherV1 struct { events []*regexp.Regexp matcher []*regexp.Regexp @@ -43,6 +45,15 @@ type MatcherV1 struct { CommonWebhook } +// MatcherV2 holds the matching information related to RegistryV2 +// TODO: will have to determine if we need a Matcher specifically for Kafka and another for the new webhook +// FOR NOW: leaving as one matcher +type MatcherV2 struct { + matcher []webhook.FieldRegex + urls *ring.Ring + CommonWebhook +} + type CommonWebhook struct { mutex sync.RWMutex logger *zap.Logger @@ -58,6 +69,13 @@ func NewMatcher(l ancla.Register, logger *zap.Logger) (Matcher, error) { return nil, err } return m, nil + case *ancla.RegistryV2: + m := &MatcherV2{} + m.logger = logger + if err := m.update(*v); err != nil { + return nil, err + } + return m, nil default: return nil, fmt.Errorf("invalid listener") } @@ -159,7 +177,7 @@ func (m1 *MatcherV1) IsMatch(msg *wrp.Message) bool { var ( matchEvent = false - matchDevice = false + matchDevice = true ) for _, eventRegex := range events { if eventRegex.MatchString(strings.TrimPrefix(msg.Destination, "event:")) { @@ -172,10 +190,13 @@ func (m1 *MatcherV1) IsMatch(msg *wrp.Message) bool { return false } - for _, deviceRegex := range matcher { - if deviceRegex.MatchString(msg.Source) || deviceRegex.MatchString(strings.TrimPrefix(msg.Destination, "event:")) { - matchDevice = true - break + if matcher != nil{ + matchDevice = false + for _, deviceRegex := range matcher { + if deviceRegex.MatchString(msg.Source) || deviceRegex.MatchString(strings.TrimPrefix(msg.Destination, "event:")) { + matchDevice = true + break + } } } @@ -194,3 +215,19 @@ func (m1 *MatcherV1) getUrls() (urls *ring.Ring) { m1.urls = m1.urls.Next() return } + +// TODO: need to add in logic for update +func (m2 *MatcherV2) update(l ancla.RegistryV2) error { + return nil +} + +// TODO: need to add in logic for IsMatch +func (m2 *MatcherV2) IsMatch(msg *wrp.Message) bool { + return true +} + +// TODO: getUrls will probably be removed from the Matcher Interface +//TODO: want to move the getUrls logic and the urls field to the sink instead of the matcher +func (m2 *MatcherV2) getUrls() (urls *ring.Ring) { + return +}