From e30e383de0a41bbe6a97fd73957969ab74828e86 Mon Sep 17 00:00:00 2001 From: Chaitanya Munukutla Date: Sun, 26 Sep 2021 13:53:40 +0530 Subject: [PATCH 1/4] Add redis provider Signed-off-by: Chaitanya Munukutla --- app/main.go | 8 +- go.mod | 19 +++-- go.sum | 124 +++++++++++++++++++++++++++-- lib/config/config.go | 26 ++++-- lib/logging/logger.go | 57 ------------- lib/mqtt/conn_handler.go | 2 +- lib/mqtt/mqtt_handler.go | 23 +++--- lib/mqtt/server_context.go | 50 ++++++++---- lib/mqtt/server_context_test.go | 9 ++- lib/persistence/badger_provider.go | 25 +----- lib/persistence/provider.go | 25 +++++- lib/persistence/redis_provider.go | 105 ++++++++++++++++++++++++ 12 files changed, 341 insertions(+), 132 deletions(-) create mode 100644 lib/persistence/redis_provider.go diff --git a/app/main.go b/app/main.go index d16c725..9bf3d4f 100644 --- a/app/main.go +++ b/app/main.go @@ -4,6 +4,7 @@ import ( "github.com/c16a/hermes/lib/config" "github.com/c16a/hermes/lib/mqtt" "github.com/c16a/hermes/lib/transports" + "go.uber.org/zap" "log" "os" ) @@ -12,12 +13,17 @@ func main() { configFilePath := os.Getenv("CONFIG_FILE_PATH") + logger, err := zap.NewProduction() + if err != nil { + log.Fatal(err) + } + serverConfig, err := config.ParseConfig(configFilePath) if err != nil { log.Fatal(err) } - ctx, err := mqtt.NewServerContext(serverConfig) + ctx, err := mqtt.NewServerContext(serverConfig, logger) if err != nil { log.Fatal(err) } diff --git a/go.mod b/go.mod index 5e16051..b907f43 100644 --- a/go.mod +++ b/go.mod @@ -6,24 +6,31 @@ require ( github.com/dgraph-io/badger/v2 v2.2007.4 github.com/eclipse/paho.golang v0.10.0 github.com/go-ldap/ldap/v3 v3.4.1 + github.com/go-redis/redis/v8 v8.11.3 github.com/gorilla/websocket v1.4.2 github.com/satori/go.uuid v1.2.0 github.com/sirupsen/logrus v1.8.1 + go.uber.org/zap v1.19.1 ) require ( github.com/Azure/go-ntlmssp v0.0.0-20200615164410-66371956d46c // indirect github.com/cespare/xxhash v1.1.0 // indirect + github.com/cespare/xxhash/v2 v2.1.1 // indirect github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de // indirect github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dustin/go-humanize v1.0.0 // indirect github.com/go-asn1-ber/asn1-ber v1.5.1 // indirect - github.com/golang/protobuf v1.3.1 // indirect + github.com/golang/protobuf v1.5.2 // indirect github.com/golang/snappy v0.0.3 // indirect github.com/klauspost/compress v1.12.3 // indirect - github.com/pkg/errors v0.8.1 // indirect - golang.org/x/crypto v0.0.0-20200604202706-70a84ac30bf9 // indirect - golang.org/x/net v0.0.0-20190620200207-3b0461eec859 // indirect - golang.org/x/sync v0.0.0-20201207232520-09787c993a3a // indirect - golang.org/x/sys v0.0.0-20191026070338-33540a1f6037 // indirect + github.com/pkg/errors v0.9.1 // indirect + go.uber.org/atomic v1.7.0 // indirect + go.uber.org/multierr v1.6.0 // indirect + golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 // indirect + golang.org/x/net v0.0.0-20210614182718-04defd469f4e // indirect + golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect + golang.org/x/sys v0.0.0-20210510120138-977fb7262007 // indirect + google.golang.org/protobuf v1.26.0 // indirect ) diff --git a/go.sum b/go.sum index 6738a99..4a02e74 100644 --- a/go.sum +++ b/go.sum @@ -4,8 +4,12 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= +github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= +github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= +github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= +github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= @@ -19,24 +23,45 @@ github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de h1:t0UHb5vdo github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/eclipse/paho.golang v0.10.0 h1:oUGPjRwWcZQRgDD9wVDV7y7i7yBSxts3vcvcNJo8B4Q= github.com/eclipse/paho.golang v0.10.0/go.mod h1:rhrV37IEwauUyx8FHrvmXOKo+QRKng5ncoN1vJiJMcs= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= +github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/go-asn1-ber/asn1-ber v1.5.1 h1:pDbRAunXzIUXfx4CB2QJFv5IuPiuoW+sWvr/Us009o8= github.com/go-asn1-ber/asn1-ber v1.5.1/go.mod h1:hEBeB/ic+5LoWskz+yKT7vGhhPYkProFKoKdwZRWMe0= github.com/go-ldap/ldap/v3 v3.4.1 h1:fU/0xli6HY02ocbMuozHAYsaHLcnkLjvho2r5a34BUU= github.com/go-ldap/ldap/v3 v3.4.1/go.mod h1:iYS1MdmrmceOJ1QOTnRXrIs7i3kloqtmGQjRvjKpyMg= -github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= +github.com/go-redis/redis/v8 v8.11.3 h1:GCjoYp8c+yQTJfc0n69iwSiHjvuAdruxl7elnZCxgt8= +github.com/go-redis/redis/v8 v8.11.3/go.mod h1:xNJ9xDG09FsIPwh3bWdk+0oDWHbtF9rPN0F/oD9XeKc= +github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= +github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ= +github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= +github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/klauspost/compress v1.12.3 h1:G5AfA94pHPysR56qqrkO2pxEexdDzrpFJ6yt/VqWxVU= github.com/klauspost/compress v1.12.3/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= @@ -48,9 +73,21 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= +github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= +github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= +github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= +github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= +github.com/onsi/ginkgo v1.16.4 h1:29JGrr5oVBm5ulCWet69zQkzWipVXIol6ygQUe/EzNc= +github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0= +github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= +github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= +github.com/onsi/gomega v1.15.0 h1:WjP/FQ/sk43MRmnEcT+MlDw2TFvkrXlprrPST/IudjU= +github.com/onsi/gomega v1.15.0/go.mod h1:cIuvLEne0aoVhAgh/O6ac0Op8WWw9H6eYCriF+tEHG0= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= -github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= @@ -69,31 +106,102 @@ github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnIn github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= +go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/goleak v1.1.11-0.20210813005559-691160354723 h1:sHOAIxRGBp443oHZIPB+HsUGaksVCXVQENPxwTfQdH4= +go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= +go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4= +go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= +go.uber.org/zap v1.19.1 h1:ue41HOKd1vGURxrmeKIgELGb3jPW9DMUDGtsinblHwI= +go.uber.org/zap v1.19.1/go.mod h1:j3DNczoxDZroyBnOT1L/Q79cfUMGZxlv/9dzN7SM1rI= golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20200604202706-70a84ac30bf9 h1:vEg9joUBmeBcK9iSJftGNf3coIG4HqZElCPehJsfAYM= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200604202706-70a84ac30bf9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs= +golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= +golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= +golang.org/x/net v0.0.0-20210614182718-04defd469f4e h1:XpT3nA5TvE525Ne3hInMh6+GETgn27Zfm9dxsThnX2Q= +golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20191026070338-33540a1f6037 h1:YyJpGZS1sBuBCzLAR1VEpK193GlqGZbnPFnPV/5Rsb4= +golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210510120138-977fb7262007 h1:gG67DSER+11cZvqIMb8S8bt0vZtiN6xWYARwirrOSfE= +golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/lib/config/config.go b/lib/config/config.go index 52f2712..4e7cc51 100644 --- a/lib/config/config.go +++ b/lib/config/config.go @@ -7,12 +7,12 @@ type Config struct { // Server stores all server related configuration type Server struct { - Tls *Tls `json:"tls" yaml:"tls"` - TcpAddress string `json:"tcp,omitempty" yaml:"tcp,omitempty"` - HttpAddress string `json:"http,omitempty" yaml:"http,omitempty"` - MaxQos byte `json:"max_qos,omitempty" yaml:"max_qos,omitempty"` - Auth *Auth `json:"auth,omitempty" yaml:"auth,omitempty"` - Offline *Offline `json:"offline,omitempty"` + Tls *Tls `json:"tls" yaml:"tls"` + TcpAddress string `json:"tcp,omitempty" yaml:"tcp,omitempty"` + HttpAddress string `json:"http,omitempty" yaml:"http,omitempty"` + MaxQos byte `json:"max_qos,omitempty" yaml:"max_qos,omitempty"` + Auth *Auth `json:"auth,omitempty" yaml:"auth,omitempty"` + Persistence *Persistence `json:"persistence,omitempty" yaml:"persistence,omitempty"` } // Tls stores the TLS config for the server @@ -28,8 +28,20 @@ type Auth struct { LdapDn string `json:"ldap_dn,omitempty" yaml:"ldap_dn,omitempty"` } -type Offline struct { +type Badger struct { Path string `json:"path,omitempty"` MaxTableSize int64 `json:"max_table_size,omitempty"` NumTables int `json:"num_tables,omitempty"` } + +type Redis struct { + Url string `json:"url" yaml:"url"` + Password string `json:"password" yaml:"password"` +} + +type Persistence struct { + Type string `json:"type" yaml:"type"` + + Badger *Badger `json:"badger" yaml:"badger"` + Redis *Redis `json:"redis" yaml:"redis"` +} diff --git a/lib/logging/logger.go b/lib/logging/logger.go index 23aac73..0a2ee9b 100644 --- a/lib/logging/logger.go +++ b/lib/logging/logger.go @@ -1,7 +1,6 @@ package logging import ( - "github.com/eclipse/paho.golang/packets" log "github.com/sirupsen/logrus" "os" "sync" @@ -26,60 +25,4 @@ func createLogger() *log.Logger { return logger } -func LogControlPacket(packet *packets.ControlPacket) { - logger := GetLogger() - logger.WithFields(log.Fields{ - "packetID": packet.PacketID(), - "type": getPacketType(packet.Type), - }).Debug("Received packet") -} - -func LogOutgoingPacket(packetType byte) { - logger := GetLogger() - logger.WithFields(log.Fields{ - "type": getPacketType(packetType), - }).Debug("Writing packet") -} - -func LogCustom(msg string, level log.Level) { - logger := GetLogger() - logger.WithFields(log.Fields{ - }).Log(level, msg) -} - -func getPacketType(packetType byte) string { - switch packetType { - case packets.CONNECT: - return "CONNECT" - case packets.CONNACK: - return "CONNACK" - case packets.PUBLISH: - return "PUBLISH" - case packets.PUBACK: - return "PUBACK" - case packets.PUBREC: - return "PUBREC" - case packets.PUBREL: - return "PUBREL" - case packets.PUBCOMP: - return "PUBCOMP" - case packets.SUBSCRIBE: - return "SUBSCRIBE" - case packets.SUBACK: - return "SUBACK" - case packets.UNSUBSCRIBE: - return "UNSUBSCRIBE" - case packets.UNSUBACK: - return "UNSUBACK" - case packets.PINGREQ: - return "PINREQ" - case packets.PINGRESP: - return "PINGRESP" - case packets.DISCONNECT: - return "DISCONNECT" - case packets.AUTH: - return "AUTH" - } - return "" -} diff --git a/lib/mqtt/conn_handler.go b/lib/mqtt/conn_handler.go index 5dd0192..f5dc33d 100644 --- a/lib/mqtt/conn_handler.go +++ b/lib/mqtt/conn_handler.go @@ -3,7 +3,7 @@ package mqtt import "net" func HandleMqttConnection(conn net.Conn, ctx *ServerContext) { - handler := &MqttHandler{base: ctx} + handler := &MqttHandler{base: ctx, logger: ctx.logger} for true { handler.Handle(conn) diff --git a/lib/mqtt/mqtt_handler.go b/lib/mqtt/mqtt_handler.go index d52ffd7..fd4f1cf 100644 --- a/lib/mqtt/mqtt_handler.go +++ b/lib/mqtt/mqtt_handler.go @@ -2,15 +2,16 @@ package mqtt import ( "fmt" - "github.com/c16a/hermes/lib/logging" "github.com/eclipse/paho.golang/packets" "github.com/eclipse/paho.golang/paho" uuid "github.com/satori/go.uuid" + "go.uber.org/zap" "io" ) type MqttHandler struct { - base MqttBase + base MqttBase + logger *zap.Logger } func (handler *MqttHandler) Handle(readWriter io.ReadWriter) { @@ -18,7 +19,11 @@ func (handler *MqttHandler) Handle(readWriter io.ReadWriter) { if err != nil { return } - logging.LogControlPacket(cPacket) + + handler.logger.With( + zap.Uint16("packetID", cPacket.PacketID()), + zap.String("type", cPacket.PacketType()), + ).Info("Received packet") var packetHandler func(io.ReadWriter, *packets.ControlPacket, MqttBase) @@ -48,6 +53,11 @@ func (handler *MqttHandler) Handle(readWriter io.ReadWriter) { packetHandler(readWriter, cPacket, handler.base) + handler.logger.With( + zap.Uint16("packetID", cPacket.PacketID()), + zap.String("type", cPacket.PacketType()), + ).Info("Writing packet") + return } @@ -72,7 +82,6 @@ func handleConnect(readWriter io.ReadWriter, controlPacket *packets.ControlPacke }, } - logging.LogOutgoingPacket(packets.CONNACK) _, err := connAckPacket.WriteTo(readWriter) if err != nil { return @@ -96,7 +105,6 @@ func handlePingRequest(readWriter io.ReadWriter, controlPacket *packets.ControlP pingResponsePacket := packets.Pingresp{} - logging.LogOutgoingPacket(packets.PINGRESP) _, err := pingResponsePacket.WriteTo(readWriter) if err != nil { fmt.Println(err) @@ -134,7 +142,6 @@ func handlePubQoS1(readWriter io.ReadWriter, publishPacket *packets.Publish, bas PacketID: publishPacket.PacketID, } - logging.LogOutgoingPacket(packets.PUBACK) _, err := pubAck.WriteTo(readWriter) if err != nil { return @@ -153,7 +160,6 @@ func handlePubQos2(readWriter io.ReadWriter, publishPacket *packets.Publish, bas pubReceived.ReasonCode = packets.PubrecImplementationSpecificError } - logging.LogOutgoingPacket(packets.PUBREC) _, err = pubReceived.WriteTo(readWriter) if err != nil { return @@ -174,7 +180,6 @@ func handlePubRel(readWriter io.ReadWriter, controlPacket *packets.ControlPacket _ = base.FreePacketID(readWriter, pubRelPacket) - logging.LogOutgoingPacket(packets.PUBCOMP) _, err := pubComplete.WriteTo(readWriter) if err != nil { return @@ -192,7 +197,6 @@ func handleSubscribe(readWriter io.ReadWriter, controlPacket *packets.ControlPac Reasons: base.Subscribe(readWriter, subscribePacket), } - logging.LogOutgoingPacket(packets.SUBACK) _, err := subAck.WriteTo(readWriter) if err != nil { fmt.Println(err) @@ -211,7 +215,6 @@ func handleUnsubscribe(readWriter io.ReadWriter, controlPacket *packets.ControlP Reasons: base.Unsubscribe(readWriter, unsubscribePacket), } - logging.LogOutgoingPacket(packets.UNSUBACK) _, err := unsubAck.WriteTo(readWriter) if err != nil { fmt.Println(err) diff --git a/lib/mqtt/server_context.go b/lib/mqtt/server_context.go index 898bf5a..4bcbebe 100644 --- a/lib/mqtt/server_context.go +++ b/lib/mqtt/server_context.go @@ -5,11 +5,10 @@ import ( "fmt" "github.com/c16a/hermes/lib/auth" "github.com/c16a/hermes/lib/config" - "github.com/c16a/hermes/lib/logging" "github.com/c16a/hermes/lib/persistence" "github.com/c16a/hermes/lib/utils" "github.com/eclipse/paho.golang/packets" - log "github.com/sirupsen/logrus" + "go.uber.org/zap" "io" "math/rand" "sync" @@ -23,27 +22,44 @@ type ServerContext struct { config *config.Config authProvider auth.AuthorisationProvider persistenceProvider persistence.Provider + + logger *zap.Logger } // NewServerContext creates a new server context. // // This should only be called once per cluster node. -func NewServerContext(config *config.Config) (*ServerContext, error) { - authProvider, err := auth.FetchProviderFromConfig(config) +func NewServerContext(c *config.Config, logger *zap.Logger) (*ServerContext, error) { + authProvider, err := auth.FetchProviderFromConfig(c) if err != nil { fmt.Println("auth provider setup failed:", err) } - persistenceProvider, err := persistence.NewBadgerProvider(config) - if err != nil { - fmt.Println("persistence provider setup failed:", err) + var providerSetupFn func(*config.Config) (persistence.Provider, error) + var persistenceProvider persistence.Provider + switch c.Server.Persistence.Type { + case "memory": + providerSetupFn = persistence.NewBadgerProvider + case "redis": + providerSetupFn = persistence.NewRedisProvider } + + if providerSetupFn == nil { + fmt.Println("persistence provider cannot be chosen") + } else { + persistenceProvider, err = providerSetupFn(c) + if err != nil { + fmt.Println("persistence provider setup failed:", err) + } + } + return &ServerContext{ mu: &sync.RWMutex{}, connectedClientsMap: make(map[string]*ConnectedClient, 0), - config: config, + config: c, authProvider: authProvider, persistenceProvider: persistenceProvider, + logger: logger, }, nil } @@ -54,10 +70,10 @@ func (ctx *ServerContext) AddClient(conn io.Writer, connect *packets.Connect) (c if authError := ctx.authProvider.Validate(connect.Username, string(connect.Password)); authError != nil { code = 135 sessionExists = false - logging.LogCustom("auth failed", log.ErrorLevel) + ctx.logger.Error("auth failed") return } - logging.LogCustom(fmt.Sprintf("auth succeeed for user: %s", connect.Username), log.DebugLevel) + ctx.logger.Debug(fmt.Sprintf("auth succeeed for user: %s", connect.Username)) } clientExists := ctx.checkForClient(connect.ClientID) @@ -65,14 +81,14 @@ func (ctx *ServerContext) AddClient(conn io.Writer, connect *packets.Connect) (c if clientExists { if clientRequestForFreshSession { // If client asks for fresh session, delete existing ones - logging.LogCustom(fmt.Sprintf("Removing old connection for clientID: %s", connect.ClientID), log.DebugLevel) + ctx.logger.Debug(fmt.Sprintf("Removing old connection for clientID: %s", connect.ClientID)) delete(ctx.connectedClientsMap, connect.ClientID) ctx.doAddClient(conn, connect) } else { - logging.LogCustom(fmt.Sprintf("Updating clientID: %s with new connection", connect.ClientID), log.DebugLevel) + ctx.logger.Debug(fmt.Sprintf("Updating clientID: %s with new connection", connect.ClientID)) ctx.doUpdateClient(connect.ClientID, conn) if ctx.persistenceProvider != nil { - logging.LogCustom(fmt.Sprintf("Fetching missed messages for clientID: %s", connect.ClientID), log.DebugLevel) + ctx.logger.Debug(fmt.Sprintf("Fetching missed messages for clientID: %s", connect.ClientID)) _ = ctx.sendMissedMessages(connect.ClientID, conn) } } @@ -97,10 +113,10 @@ func (ctx *ServerContext) Disconnect(conn io.Writer, disconnect *packets.Disconn } if shouldDelete { - logging.LogCustom(fmt.Sprintf("Deleting connection for clientID: %s", clientIdToRemove), log.DebugLevel) + ctx.logger.Debug(fmt.Sprintf("Deleting connection for clientID: %s", clientIdToRemove)) delete(ctx.connectedClientsMap, clientIdToRemove) } else { - logging.LogCustom(fmt.Sprintf("Marking connection as disconnected for clientID: %s", clientIdToRemove), log.DebugLevel) + ctx.logger.Debug(fmt.Sprintf("Marking connection as disconnected for clientID: %s", clientIdToRemove)) ctx.mu.Lock() ctx.connectedClientsMap[clientIdToRemove].IsConnected = false ctx.mu.Unlock() @@ -119,7 +135,7 @@ func (ctx *ServerContext) Publish(publish *packets.Publish) { // non-shared subscriptions if !client.IsConnected && !client.IsClean && ctx.persistenceProvider != nil { // save for offline usage - logging.LogCustom(fmt.Sprintf("Saving offline delivery message for clientID: %s", client.ClientID), log.DebugLevel) + ctx.logger.Debug(fmt.Sprintf("Saving offline delivery message for clientID: %s", client.ClientID)) ctx.persistenceProvider.SaveForOfflineDelivery(client.ClientID, publish) } if client.IsConnected { @@ -271,7 +287,7 @@ func (ctx *ServerContext) doAddClient(conn io.Writer, connect *packets.Connect) Subscriptions: make(map[string]packets.SubOptions, 0), } - logging.LogCustom(fmt.Sprintf("Creating new connection for clientID: %s", connect.ClientID), log.DebugLevel) + ctx.logger.Debug(fmt.Sprintf("Creating new connection for clientID: %s", connect.ClientID)) ctx.mu.Lock() ctx.connectedClientsMap[connect.ClientID] = newClient ctx.mu.Unlock() diff --git a/lib/mqtt/server_context_test.go b/lib/mqtt/server_context_test.go index 89a7fff..820560d 100644 --- a/lib/mqtt/server_context_test.go +++ b/lib/mqtt/server_context_test.go @@ -6,6 +6,7 @@ import ( "github.com/c16a/hermes/lib/config" "github.com/c16a/hermes/lib/persistence" "github.com/eclipse/paho.golang/packets" + "go.uber.org/zap" "io" "io/ioutil" "reflect" @@ -25,9 +26,10 @@ func TestNewServerContext(t *testing.T) { }{ // TODO: Add test cases. } + logger := zap.NewNop() for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := NewServerContext(tt.args.config) + got, err := NewServerContext(tt.args.config, logger) if (err != nil) != tt.wantErr { t.Errorf("NewServerContext() error = %v, wantErr %v", err, tt.wantErr) return @@ -203,6 +205,7 @@ func TestServerContext_AddClient(t *testing.T) { config: tt.fields.config, authProvider: tt.fields.authProvider, persistenceProvider: tt.fields.persistenceProvider, + logger: zap.NewNop(), } gotCode, gotSessionExists, gotMaxQos := ctx.AddClient(tt.args.conn, tt.args.connect) if gotCode != tt.wantCode { @@ -284,6 +287,7 @@ func TestServerContext_Disconnect(t *testing.T) { config: tt.fields.config, authProvider: tt.fields.authProvider, persistenceProvider: tt.fields.persistenceProvider, + logger: zap.NewNop(), } ctx.Disconnect(tt.args.conn, tt.args.disconnect) }) @@ -392,6 +396,7 @@ func TestServerContext_Publish(t *testing.T) { config: tt.fields.config, authProvider: tt.fields.authProvider, persistenceProvider: tt.fields.persistenceProvider, + logger: zap.NewNop(), } ctx.Publish(tt.args.publish) }) @@ -557,6 +562,7 @@ func TestServerContext_Subscribe(t *testing.T) { config: tt.fields.config, authProvider: tt.fields.authProvider, persistenceProvider: tt.fields.persistenceProvider, + logger: zap.NewNop(), } if got := ctx.Subscribe(tt.args.conn, tt.args.subscribe); !reflect.DeepEqual(got, tt.want) { t.Errorf("Subscribe() = %v, want %v", got, tt.want) @@ -627,6 +633,7 @@ func TestServerContext_Unsubscribe(t *testing.T) { config: tt.fields.config, authProvider: tt.fields.authProvider, persistenceProvider: tt.fields.persistenceProvider, + logger: zap.NewNop(), } if got := ctx.Unsubscribe(tt.args.conn, tt.args.unsubscribe); !reflect.DeepEqual(got, tt.want) { t.Errorf("Unsubscribe() = %v, want %v", got, tt.want) diff --git a/lib/persistence/badger_provider.go b/lib/persistence/badger_provider.go index 9f97f4c..d9a3260 100644 --- a/lib/persistence/badger_provider.go +++ b/lib/persistence/badger_provider.go @@ -1,8 +1,6 @@ package persistence import ( - "bytes" - "encoding/gob" "errors" "fmt" "github.com/c16a/hermes/lib/config" @@ -21,7 +19,7 @@ type BadgerProvider struct { db *badger.DB } -func NewBadgerProvider(config *config.Config) (*BadgerProvider, error) { +func NewBadgerProvider(config *config.Config) (Provider, error) { db, err := openDB(config) if err != nil { return nil, err @@ -30,7 +28,7 @@ func NewBadgerProvider(config *config.Config) (*BadgerProvider, error) { } func openDB(config *config.Config) (*badger.DB, error) { - offlineConfig := config.Server.Offline + offlineConfig := config.Server.Persistence.Badger var opts badger.Options if offlineConfig == nil { @@ -137,22 +135,3 @@ func (b *BadgerProvider) CheckForPacketIdReuse(clientID string, packetID uint16) }) return reuseFlag, err } - -func getBytes(bundle interface{}) ([]byte, error) { - var buf bytes.Buffer - enc := gob.NewEncoder(&buf) - err := enc.Encode(bundle) - if err != nil { - return nil, err - } - return buf.Bytes(), nil -} - -func getPublishPacket(src []byte) (*packets.Publish, error) { - buf := bytes.NewBuffer(src) - decoder := gob.NewDecoder(buf) - - var publish packets.Publish - err := decoder.Decode(&publish) - return &publish, err -} diff --git a/lib/persistence/provider.go b/lib/persistence/provider.go index 5d63143..f01663b 100644 --- a/lib/persistence/provider.go +++ b/lib/persistence/provider.go @@ -1,6 +1,10 @@ package persistence -import "github.com/eclipse/paho.golang/packets" +import ( + "bytes" + "encoding/gob" + "github.com/eclipse/paho.golang/packets" +) type Provider interface { SaveForOfflineDelivery(clientId string, publish *packets.Publish) error @@ -10,3 +14,22 @@ type Provider interface { FreePacketID(clientID string, packetID uint16) error CheckForPacketIdReuse(clientID string, packetID uint16) (bool, error) } + +func getBytes(bundle interface{}) ([]byte, error) { + var buf bytes.Buffer + enc := gob.NewEncoder(&buf) + err := enc.Encode(bundle) + if err != nil { + return nil, err + } + return buf.Bytes(), nil +} + +func getPublishPacket(src []byte) (*packets.Publish, error) { + buf := bytes.NewBuffer(src) + decoder := gob.NewDecoder(buf) + + var publish packets.Publish + err := decoder.Decode(&publish) + return &publish, err +} \ No newline at end of file diff --git a/lib/persistence/redis_provider.go b/lib/persistence/redis_provider.go new file mode 100644 index 0000000..907274f --- /dev/null +++ b/lib/persistence/redis_provider.go @@ -0,0 +1,105 @@ +package persistence + +import ( + "context" + "errors" + "fmt" + "github.com/c16a/hermes/lib/config" + "github.com/eclipse/paho.golang/packets" + "github.com/go-redis/redis/v8" + "time" +) + +type RedisProvider struct { + client *redis.Client +} + +func NewRedisProvider(config *config.Config) (Provider, error) { + offlineConfig := config.Server.Persistence.Redis + + rdb := redis.NewClient(&redis.Options{ + Addr: offlineConfig.Url, + Password: offlineConfig.Password, + DB: 0, + }) + + err := rdb.Echo(context.Background(), "HELLO").Err() + if err != nil { + fmt.Println(err) + return nil, err + } else { + fmt.Println("Connected to redis persistence provider") + } + return &RedisProvider{client: rdb}, nil +} + +func (r *RedisProvider) SaveForOfflineDelivery(clientId string, publish *packets.Publish) error { + _, err := r.client.TxPipelined(context.Background(), func(pipeliner redis.Pipeliner) error { + key := fmt.Sprintf("urn:messages:%s", clientId) + pipeliner.LPush(context.Background(), key, publish) + + // Set expiry + if publish.Properties != nil && publish.Properties.MessageExpiry != nil { + pipeliner.Expire(context.Background(), key, time.Duration(int(*publish.Properties.MessageExpiry))*time.Second) + } + return nil + }) + return err +} + +func (r *RedisProvider) GetMissedMessages(clientId string) ([]*packets.Publish, error) { + publishPackets := make([]*packets.Publish, 0) + _, err := r.client.TxPipelined(context.Background(), func(pipeliner redis.Pipeliner) error { + key := fmt.Sprintf("urn:messages:%s", clientId) + payloads, err := pipeliner.LRange(context.Background(), key, 0, -1).Result() + if err != nil { + return err + } + for _, payload := range payloads { + payloadBytes := []byte(payload) + publishPacket, err := getPublishPacket(payloadBytes) + if err != nil { + continue + } + publishPackets = append(publishPackets, publishPacket) + } + return nil + }) + return publishPackets, err +} + +func (r *RedisProvider) ReservePacketID(clientID string, packetID uint16) error { + _, err := r.client.TxPipelined(context.Background(), func(pipeliner redis.Pipeliner) error { + key := fmt.Sprintf("urn:packets:%s:%d", clientID, packetID) + pipeliner.Set(context.Background(), key, PacketReserved, 24*time.Hour) + return nil + }) + return err +} + +func (r *RedisProvider) FreePacketID(clientID string, packetID uint16) error { + _, err := r.client.TxPipelined(context.Background(), func(pipeliner redis.Pipeliner) error { + key := fmt.Sprintf("urn:packets:%s:%d", clientID, packetID) + pipeliner.Del(context.Background(), key) + return nil + }) + return err +} + +func (r *RedisProvider) CheckForPacketIdReuse(clientID string, packetID uint16) (bool, error) { + reuseFlag := false + _, err := r.client.TxPipelined(context.Background(), func(pipeliner redis.Pipeliner) error { + key := fmt.Sprintf("urn:packets:%s:%d", clientID, packetID) + resBytes, err := pipeliner.Get(context.Background(), key).Bytes() + if err != nil { + return err + } + if resBytes[0] == PacketReserved { + reuseFlag = true + } else { + return errors.New("some weird error") + } + return nil + }) + return reuseFlag, err +} From f4d0e0d08f772c309c7217b89cdb39847f03066b Mon Sep 17 00:00:00 2001 From: Chaitanya Munukutla Date: Sun, 26 Sep 2021 14:19:29 +0530 Subject: [PATCH 2/4] Replace logrus with zap Signed-off-by: Chaitanya Munukutla --- app/main.go | 4 +- go.mod | 5 +- go.sum | 9 +--- lib/logging/logger.go | 28 ---------- lib/mqtt/mqtt_handler.go | 84 ++++++++++++++---------------- lib/mqtt/server_context.go | 10 ++-- lib/persistence/badger_provider.go | 3 +- lib/persistence/redis_provider.go | 7 +-- lib/transports/tcp_server.go | 6 ++- lib/transports/ws_server.go | 5 +- 10 files changed, 62 insertions(+), 99 deletions(-) delete mode 100644 lib/logging/logger.go diff --git a/app/main.go b/app/main.go index 9bf3d4f..7a0e987 100644 --- a/app/main.go +++ b/app/main.go @@ -28,6 +28,6 @@ func main() { log.Fatal(err) } - go transports.StartWebSocketServer(serverConfig, ctx) - transports.StartTcpServer(serverConfig, ctx) + go transports.StartWebSocketServer(serverConfig, ctx, logger) + transports.StartTcpServer(serverConfig, ctx, logger) } diff --git a/go.mod b/go.mod index b907f43..923b8ed 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,6 @@ require ( github.com/go-redis/redis/v8 v8.11.3 github.com/gorilla/websocket v1.4.2 github.com/satori/go.uuid v1.2.0 - github.com/sirupsen/logrus v1.8.1 go.uber.org/zap v1.19.1 ) @@ -25,11 +24,11 @@ require ( github.com/golang/protobuf v1.5.2 // indirect github.com/golang/snappy v0.0.3 // indirect github.com/klauspost/compress v1.12.3 // indirect - github.com/pkg/errors v0.9.1 // indirect + github.com/pkg/errors v0.8.1 // indirect go.uber.org/atomic v1.7.0 // indirect go.uber.org/multierr v1.6.0 // indirect golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 // indirect - golang.org/x/net v0.0.0-20210614182718-04defd469f4e // indirect + golang.org/x/net v0.0.0-20210428140749-89ef3d95e781 // indirect golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect golang.org/x/sys v0.0.0-20210510120138-977fb7262007 // indirect google.golang.org/protobuf v1.26.0 // indirect diff --git a/go.sum b/go.sum index 4a02e74..5040b9c 100644 --- a/go.sum +++ b/go.sum @@ -85,16 +85,13 @@ github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1y github.com/onsi/gomega v1.15.0 h1:WjP/FQ/sk43MRmnEcT+MlDw2TFvkrXlprrPST/IudjU= github.com/onsi/gomega v1.15.0/go.mod h1:cIuvLEne0aoVhAgh/O6ac0Op8WWw9H6eYCriF+tEHG0= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= +github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= -github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= -github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= -github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= @@ -139,9 +136,8 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= +golang.org/x/net v0.0.0-20210428140749-89ef3d95e781 h1:DzZ89McO9/gWPsQXS/FVKAlG02ZjaQ6AlZRBimEYOd0= golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= -golang.org/x/net v0.0.0-20210614182718-04defd469f4e h1:XpT3nA5TvE525Ne3hInMh6+GETgn27Zfm9dxsThnX2Q= -golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -155,7 +151,6 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/lib/logging/logger.go b/lib/logging/logger.go deleted file mode 100644 index 0a2ee9b..0000000 --- a/lib/logging/logger.go +++ /dev/null @@ -1,28 +0,0 @@ -package logging - -import ( - log "github.com/sirupsen/logrus" - "os" - "sync" -) - -var logOnce sync.Once - -var loggerInstance *log.Logger - -func GetLogger() *log.Logger { - logOnce.Do(func() { - loggerInstance = createLogger() - }) - return loggerInstance -} - -func createLogger() *log.Logger { - var logger = log.New() - logger.Out = os.Stdout - logger.Level = log.DebugLevel - logger.Formatter = &log.JSONFormatter{} - return logger -} - - diff --git a/lib/mqtt/mqtt_handler.go b/lib/mqtt/mqtt_handler.go index fd4f1cf..6cfbcb4 100644 --- a/lib/mqtt/mqtt_handler.go +++ b/lib/mqtt/mqtt_handler.go @@ -1,7 +1,7 @@ package mqtt import ( - "fmt" + "errors" "github.com/eclipse/paho.golang/packets" "github.com/eclipse/paho.golang/paho" uuid "github.com/satori/go.uuid" @@ -25,7 +25,7 @@ func (handler *MqttHandler) Handle(readWriter io.ReadWriter) { zap.String("type", cPacket.PacketType()), ).Info("Received packet") - var packetHandler func(io.ReadWriter, *packets.ControlPacket, MqttBase) + var packetHandler func(io.ReadWriter, *packets.ControlPacket, MqttBase) error switch cPacket.Type { case packets.CONNECT: @@ -51,7 +51,10 @@ func (handler *MqttHandler) Handle(readWriter io.ReadWriter) { return } - packetHandler(readWriter, cPacket, handler.base) + err = packetHandler(readWriter, cPacket, handler.base) + if err != nil { + handler.logger.Error("error handling packet", zap.Error(err)) + } handler.logger.With( zap.Uint16("packetID", cPacket.PacketID()), @@ -61,10 +64,10 @@ func (handler *MqttHandler) Handle(readWriter io.ReadWriter) { return } -func handleConnect(readWriter io.ReadWriter, controlPacket *packets.ControlPacket, base MqttBase) { +func handleConnect(readWriter io.ReadWriter, controlPacket *packets.ControlPacket, base MqttBase) error { connectPacket, ok := controlPacket.Content.(*packets.Connect) if !ok { - return + return errors.New("invalid packet") } if len(connectPacket.ClientID) == 0 { @@ -83,60 +86,55 @@ func handleConnect(readWriter io.ReadWriter, controlPacket *packets.ControlPacke } _, err := connAckPacket.WriteTo(readWriter) - if err != nil { - return - } + return err } -func handleDisconnect(readWriter io.ReadWriter, controlPacket *packets.ControlPacket, base MqttBase) { +func handleDisconnect(readWriter io.ReadWriter, controlPacket *packets.ControlPacket, base MqttBase) error { disconnectPacket, ok := controlPacket.Content.(*packets.Disconnect) if !ok { - return + return errors.New("invalid packet") } base.Disconnect(readWriter, disconnectPacket) + return nil } -func handlePingRequest(readWriter io.ReadWriter, controlPacket *packets.ControlPacket, base MqttBase) { +func handlePingRequest(readWriter io.ReadWriter, controlPacket *packets.ControlPacket, base MqttBase) error { _, ok := controlPacket.Content.(*packets.Pingreq) if !ok { - return + return errors.New("invalid packet") } pingResponsePacket := packets.Pingresp{} _, err := pingResponsePacket.WriteTo(readWriter) - if err != nil { - fmt.Println(err) - return - } + return err } -func handlePublish(readWriter io.ReadWriter, controlPacket *packets.ControlPacket, base MqttBase) { +func handlePublish(readWriter io.ReadWriter, controlPacket *packets.ControlPacket, base MqttBase) error { publishPacket, ok := controlPacket.Content.(*packets.Publish) if !ok { - return + return errors.New("invalid packet") } switch publishPacket.QoS { case 0: - handlePubQos0(publishPacket, base) - break + return handlePubQos0(publishPacket, base) case 1: - handlePubQoS1(readWriter, publishPacket, base) - break + return handlePubQoS1(readWriter, publishPacket, base) case 2: - handlePubQos2(readWriter, publishPacket, base) + return handlePubQos2(readWriter, publishPacket, base) } - return + return nil } -func handlePubQos0(publishPacket *packets.Publish, base MqttBase) { +func handlePubQos0(publishPacket *packets.Publish, base MqttBase) error { base.Publish(publishPacket) + return nil } -func handlePubQoS1(readWriter io.ReadWriter, publishPacket *packets.Publish, base MqttBase) { +func handlePubQoS1(readWriter io.ReadWriter, publishPacket *packets.Publish, base MqttBase) error { pubAck := packets.Puback{ ReasonCode: packets.PubackSuccess, PacketID: publishPacket.PacketID, @@ -144,12 +142,13 @@ func handlePubQoS1(readWriter io.ReadWriter, publishPacket *packets.Publish, bas _, err := pubAck.WriteTo(readWriter) if err != nil { - return + return err } base.Publish(publishPacket) + return nil } -func handlePubQos2(readWriter io.ReadWriter, publishPacket *packets.Publish, base MqttBase) { +func handlePubQos2(readWriter io.ReadWriter, publishPacket *packets.Publish, base MqttBase) error { pubReceived := packets.Pubrec{ ReasonCode: packets.PubrecSuccess, PacketID: publishPacket.PacketID, @@ -162,15 +161,16 @@ func handlePubQos2(readWriter io.ReadWriter, publishPacket *packets.Publish, bas _, err = pubReceived.WriteTo(readWriter) if err != nil { - return + return err } base.Publish(publishPacket) + return nil } -func handlePubRel(readWriter io.ReadWriter, controlPacket *packets.ControlPacket, base MqttBase) { +func handlePubRel(readWriter io.ReadWriter, controlPacket *packets.ControlPacket, base MqttBase) error { pubRelPacket, ok := controlPacket.Content.(*packets.Pubrel) if !ok { - return + return errors.New("invalid packet") } pubComplete := packets.Pubcomp{ @@ -181,15 +181,13 @@ func handlePubRel(readWriter io.ReadWriter, controlPacket *packets.ControlPacket _ = base.FreePacketID(readWriter, pubRelPacket) _, err := pubComplete.WriteTo(readWriter) - if err != nil { - return - } + return err } -func handleSubscribe(readWriter io.ReadWriter, controlPacket *packets.ControlPacket, base MqttBase) { +func handleSubscribe(readWriter io.ReadWriter, controlPacket *packets.ControlPacket, base MqttBase) error { subscribePacket, ok := controlPacket.Content.(*packets.Subscribe) if !ok { - return + return errors.New("invalid packet") } subAck := packets.Suback{ @@ -198,16 +196,13 @@ func handleSubscribe(readWriter io.ReadWriter, controlPacket *packets.ControlPac } _, err := subAck.WriteTo(readWriter) - if err != nil { - fmt.Println(err) - return - } + return err } -func handleUnsubscribe(readWriter io.ReadWriter, controlPacket *packets.ControlPacket, base MqttBase) { +func handleUnsubscribe(readWriter io.ReadWriter, controlPacket *packets.ControlPacket, base MqttBase) error { unsubscribePacket, ok := controlPacket.Content.(*packets.Unsubscribe) if !ok { - return + return errors.New("invalid packet") } unsubAck := packets.Unsuback{ @@ -216,8 +211,5 @@ func handleUnsubscribe(readWriter io.ReadWriter, controlPacket *packets.ControlP } _, err := unsubAck.WriteTo(readWriter) - if err != nil { - fmt.Println(err) - return - } + return err } diff --git a/lib/mqtt/server_context.go b/lib/mqtt/server_context.go index 4bcbebe..5479ae6 100644 --- a/lib/mqtt/server_context.go +++ b/lib/mqtt/server_context.go @@ -32,10 +32,10 @@ type ServerContext struct { func NewServerContext(c *config.Config, logger *zap.Logger) (*ServerContext, error) { authProvider, err := auth.FetchProviderFromConfig(c) if err != nil { - fmt.Println("auth provider setup failed:", err) + logger.Error("auth provider setup failed", zap.Error(err)) } - var providerSetupFn func(*config.Config) (persistence.Provider, error) + var providerSetupFn func(*config.Config, *zap.Logger) (persistence.Provider, error) var persistenceProvider persistence.Provider switch c.Server.Persistence.Type { case "memory": @@ -45,11 +45,11 @@ func NewServerContext(c *config.Config, logger *zap.Logger) (*ServerContext, err } if providerSetupFn == nil { - fmt.Println("persistence provider cannot be chosen") + logger.Error("persistence provider cannot be chosen") } else { - persistenceProvider, err = providerSetupFn(c) + persistenceProvider, err = providerSetupFn(c, logger) if err != nil { - fmt.Println("persistence provider setup failed:", err) + logger.Error("persistence provider setup failed", zap.Error(err)) } } diff --git a/lib/persistence/badger_provider.go b/lib/persistence/badger_provider.go index d9a3260..713e520 100644 --- a/lib/persistence/badger_provider.go +++ b/lib/persistence/badger_provider.go @@ -8,6 +8,7 @@ import ( "github.com/dgraph-io/badger/v2/options" "github.com/eclipse/paho.golang/packets" uuid "github.com/satori/go.uuid" + "go.uber.org/zap" "time" ) @@ -19,7 +20,7 @@ type BadgerProvider struct { db *badger.DB } -func NewBadgerProvider(config *config.Config) (Provider, error) { +func NewBadgerProvider(config *config.Config, logger *zap.Logger) (Provider, error) { db, err := openDB(config) if err != nil { return nil, err diff --git a/lib/persistence/redis_provider.go b/lib/persistence/redis_provider.go index 907274f..0edc47b 100644 --- a/lib/persistence/redis_provider.go +++ b/lib/persistence/redis_provider.go @@ -7,6 +7,7 @@ import ( "github.com/c16a/hermes/lib/config" "github.com/eclipse/paho.golang/packets" "github.com/go-redis/redis/v8" + "go.uber.org/zap" "time" ) @@ -14,7 +15,7 @@ type RedisProvider struct { client *redis.Client } -func NewRedisProvider(config *config.Config) (Provider, error) { +func NewRedisProvider(config *config.Config, logger *zap.Logger) (Provider, error) { offlineConfig := config.Server.Persistence.Redis rdb := redis.NewClient(&redis.Options{ @@ -25,10 +26,10 @@ func NewRedisProvider(config *config.Config) (Provider, error) { err := rdb.Echo(context.Background(), "HELLO").Err() if err != nil { - fmt.Println(err) + logger.Error("Could not connect to redis persistence provider", zap.Error(err)) return nil, err } else { - fmt.Println("Connected to redis persistence provider") + logger.Info("Connected to redis persistence provider") } return &RedisProvider{client: rdb}, nil } diff --git a/lib/transports/tcp_server.go b/lib/transports/tcp_server.go index 62392b2..bad63b0 100644 --- a/lib/transports/tcp_server.go +++ b/lib/transports/tcp_server.go @@ -5,10 +5,11 @@ import ( "fmt" "github.com/c16a/hermes/lib/config" "github.com/c16a/hermes/lib/mqtt" + "go.uber.org/zap" "net" ) -func StartTcpServer(serverConfig *config.Config, ctx *mqtt.ServerContext) { +func StartTcpServer(serverConfig *config.Config, ctx *mqtt.ServerContext, logger *zap.Logger) { var listener net.Listener var listenerErr error @@ -36,7 +37,8 @@ func StartTcpServer(serverConfig *config.Config, ctx *mqtt.ServerContext) { } defer listener.Close() - fmt.Printf("Starting TCP server on %s\n", tcpAddress) + logger.Info(fmt.Sprintf("Starting TCP server on %s", tcpAddress)) + for { conn, err := listener.Accept() if err != nil { diff --git a/lib/transports/ws_server.go b/lib/transports/ws_server.go index a94a07c..4a1e261 100644 --- a/lib/transports/ws_server.go +++ b/lib/transports/ws_server.go @@ -5,11 +5,12 @@ import ( "github.com/c16a/hermes/lib/config" "github.com/c16a/hermes/lib/mqtt" "github.com/gorilla/websocket" + "go.uber.org/zap" "log" "net/http" ) -func StartWebSocketServer(serverConfig *config.Config, ctx *mqtt.ServerContext) { +func StartWebSocketServer(serverConfig *config.Config, ctx *mqtt.ServerContext, logger *zap.Logger) { upgrader := websocket.Upgrader{} httpAddr := serverConfig.Server.HttpAddress @@ -24,6 +25,6 @@ func StartWebSocketServer(serverConfig *config.Config, ctx *mqtt.ServerContext) go mqtt.HandleMqttConnection(c.UnderlyingConn(), ctx) }) - fmt.Printf("Starting Websocket server on %s\n", httpAddr) + logger.Info(fmt.Sprintf("Starting Websocket server on %s", httpAddr)) log.Fatal(http.ListenAndServe(httpAddr, nil)) } From 88ed966e7a0235fe0d686dcb34495300be83bb0e Mon Sep 17 00:00:00 2001 From: Chaitanya Munukutla Date: Sun, 26 Sep 2021 14:29:37 +0530 Subject: [PATCH 3/4] Add multi-platform docker push on PR merge Signed-off-by: Chaitanya Munukutla --- .github/workflows/docker.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml index 246e497..9360b7f 100644 --- a/.github/workflows/docker.yml +++ b/.github/workflows/docker.yml @@ -29,7 +29,7 @@ jobs: with: context: . file: ./Dockerfile - platforms: linux/amd64 + platforms: linux/amd64,linux/arm64 username: $GITHUB_ACTOR password: ${{ secrets.CR_PAT }} push: true From 805825bcc6703d5aa1feb3dea0998a967eec0395 Mon Sep 17 00:00:00 2001 From: Chaitanya Munukutla Date: Sun, 26 Sep 2021 16:29:46 +0530 Subject: [PATCH 4/4] Add error handling for redis persistence Signed-off-by: Chaitanya Munukutla --- docs/java.md | 13 ++++++++++ lib/mqtt/server_context.go | 32 +++++++++++++++-------- lib/persistence/redis_provider.go | 42 ++++++++++++++++++++----------- 3 files changed, 61 insertions(+), 26 deletions(-) diff --git a/docs/java.md b/docs/java.md index 265f810..69eaabb 100644 --- a/docs/java.md +++ b/docs/java.md @@ -48,7 +48,12 @@ repositories { ```java var persistence = new MemoryPersistence(); var client = new MqttClient(broker, clientID, persistence); + var connOpts = new MqttConnectionOptions(); +// Setting clean start to "false" enables the client +// to receive offline messages send while it was disconnected. +connOpts.setCleanStart(false); + client.connect(connOpts); ``` @@ -71,6 +76,8 @@ client.setCallback(new MqttCallback(){ public void messageArrived(String topic, MqttMessage message) throws Exception{ // Do something awesome } + + // other implemented methods }); client.connect(connOpts); @@ -78,6 +85,12 @@ client.connect(connOpts); client.subscribe("my-topic", 0); ``` +### Closing the connection + +```java +client.disconnect(); +``` + ## Spring Integration Spring Integration provides inbound and outbound channel adapters to support the MQTT protocol. diff --git a/lib/mqtt/server_context.go b/lib/mqtt/server_context.go index 5479ae6..1aa9eda 100644 --- a/lib/mqtt/server_context.go +++ b/lib/mqtt/server_context.go @@ -73,7 +73,7 @@ func (ctx *ServerContext) AddClient(conn io.Writer, connect *packets.Connect) (c ctx.logger.Error("auth failed") return } - ctx.logger.Debug(fmt.Sprintf("auth succeeed for user: %s", connect.Username)) + ctx.logger.Info(fmt.Sprintf("auth succeeed for user: %s", connect.Username)) } clientExists := ctx.checkForClient(connect.ClientID) @@ -81,15 +81,18 @@ func (ctx *ServerContext) AddClient(conn io.Writer, connect *packets.Connect) (c if clientExists { if clientRequestForFreshSession { // If client asks for fresh session, delete existing ones - ctx.logger.Debug(fmt.Sprintf("Removing old connection for clientID: %s", connect.ClientID)) + ctx.logger.Info(fmt.Sprintf("Removing old connection for clientID: %s", connect.ClientID)) delete(ctx.connectedClientsMap, connect.ClientID) ctx.doAddClient(conn, connect) } else { - ctx.logger.Debug(fmt.Sprintf("Updating clientID: %s with new connection", connect.ClientID)) + ctx.logger.Info(fmt.Sprintf("Updating clientID: %s with new connection", connect.ClientID)) ctx.doUpdateClient(connect.ClientID, conn) if ctx.persistenceProvider != nil { - ctx.logger.Debug(fmt.Sprintf("Fetching missed messages for clientID: %s", connect.ClientID)) - _ = ctx.sendMissedMessages(connect.ClientID, conn) + ctx.logger.Info(fmt.Sprintf("Fetching missed messages for clientID: %s", connect.ClientID)) + err := ctx.sendMissedMessages(connect.ClientID, conn) + if err != nil { + ctx.logger.Error("failed to fetch offline messages", zap.Error(err)) + } } } } else { @@ -113,10 +116,10 @@ func (ctx *ServerContext) Disconnect(conn io.Writer, disconnect *packets.Disconn } if shouldDelete { - ctx.logger.Debug(fmt.Sprintf("Deleting connection for clientID: %s", clientIdToRemove)) + ctx.logger.Info(fmt.Sprintf("Deleting connection for clientID: %s", clientIdToRemove)) delete(ctx.connectedClientsMap, clientIdToRemove) } else { - ctx.logger.Debug(fmt.Sprintf("Marking connection as disconnected for clientID: %s", clientIdToRemove)) + ctx.logger.Info(fmt.Sprintf("Marking connection as disconnected for clientID: %s", clientIdToRemove)) ctx.mu.Lock() ctx.connectedClientsMap[clientIdToRemove].IsConnected = false ctx.mu.Unlock() @@ -135,8 +138,11 @@ func (ctx *ServerContext) Publish(publish *packets.Publish) { // non-shared subscriptions if !client.IsConnected && !client.IsClean && ctx.persistenceProvider != nil { // save for offline usage - ctx.logger.Debug(fmt.Sprintf("Saving offline delivery message for clientID: %s", client.ClientID)) - ctx.persistenceProvider.SaveForOfflineDelivery(client.ClientID, publish) + ctx.logger.Info(fmt.Sprintf("Saving offline delivery message for clientID: %s", client.ClientID)) + err := ctx.persistenceProvider.SaveForOfflineDelivery(client.ClientID, publish) + if err != nil { + ctx.logger.Error("failed to save offline message", zap.Error(err)) + } } if client.IsConnected { // send direct message @@ -273,7 +279,11 @@ func (ctx *ServerContext) sendMissedMessages(clientId string, conn io.Writer) er } for _, msg := range missedMessages { - msg.WriteTo(conn) + if _, writeErr := msg.WriteTo(conn); writeErr != nil { + if ctx.persistenceProvider.SaveForOfflineDelivery(clientId, msg) != nil { + ctx.logger.Error("failed to save offline message", zap.Error(err)) + } + } } return nil } @@ -287,7 +297,7 @@ func (ctx *ServerContext) doAddClient(conn io.Writer, connect *packets.Connect) Subscriptions: make(map[string]packets.SubOptions, 0), } - ctx.logger.Debug(fmt.Sprintf("Creating new connection for clientID: %s", connect.ClientID)) + ctx.logger.Info(fmt.Sprintf("Creating new connection for clientID: %s", connect.ClientID)) ctx.mu.Lock() ctx.connectedClientsMap[connect.ClientID] = newClient ctx.mu.Unlock() diff --git a/lib/persistence/redis_provider.go b/lib/persistence/redis_provider.go index 0edc47b..6dd65b8 100644 --- a/lib/persistence/redis_provider.go +++ b/lib/persistence/redis_provider.go @@ -37,7 +37,12 @@ func NewRedisProvider(config *config.Config, logger *zap.Logger) (Provider, erro func (r *RedisProvider) SaveForOfflineDelivery(clientId string, publish *packets.Publish) error { _, err := r.client.TxPipelined(context.Background(), func(pipeliner redis.Pipeliner) error { key := fmt.Sprintf("urn:messages:%s", clientId) - pipeliner.LPush(context.Background(), key, publish) + + publishBytes, err := getBytes(publish) + if err != nil { + return err + } + pipeliner.LPush(context.Background(), key, publishBytes) // Set expiry if publish.Properties != nil && publish.Properties.MessageExpiry != nil { @@ -50,23 +55,30 @@ func (r *RedisProvider) SaveForOfflineDelivery(clientId string, publish *packets func (r *RedisProvider) GetMissedMessages(clientId string) ([]*packets.Publish, error) { publishPackets := make([]*packets.Publish, 0) - _, err := r.client.TxPipelined(context.Background(), func(pipeliner redis.Pipeliner) error { - key := fmt.Sprintf("urn:messages:%s", clientId) - payloads, err := pipeliner.LRange(context.Background(), key, 0, -1).Result() + key := fmt.Sprintf("urn:messages:%s", clientId) + + // Get the length of the list + length, err := r.client.LLen(context.Background(), key).Result() + if err != nil { + return nil, err + } + + // Pop everything in the list + payloads, err := r.client.LPopCount(context.Background(), key, int(length)).Result() + if err != nil { + return nil, err + } + + for _, payload := range payloads { + payloadBytes := []byte(payload) + publishPacket, err := getPublishPacket(payloadBytes) if err != nil { - return err + continue } - for _, payload := range payloads { - payloadBytes := []byte(payload) - publishPacket, err := getPublishPacket(payloadBytes) - if err != nil { - continue - } - publishPackets = append(publishPackets, publishPacket) - } - return nil - }) + publishPackets = append(publishPackets, publishPacket) + } return publishPackets, err + } func (r *RedisProvider) ReservePacketID(clientID string, packetID uint16) error {