Skip to content

Commit

Permalink
gateway-mt: initialize and use eventstat publisher for http user agents
Browse files Browse the repository at this point in the history
Change-Id: Ic92b35d14341994913fe9041dc826e05f3d0d6b3
  • Loading branch information
elek committed Mar 31, 2022
1 parent 04a990a commit 1d42c08
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 65 deletions.
7 changes: 3 additions & 4 deletions cmd/gateway-mt/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,9 @@ func cmdRun(cmd *cobra.Command, _ []string) (err error) {
zap.S().Warn("Failed to initialize telemetry batcher: ", err)
}

// to reduce cardinality we send
// TODO: it works only if one target address is configured which should be checked here
if err := process.InitMetricsWithHostname(ctx, zap.L(), server.StatRegistry); err != nil {
zap.S().Warn("Failed to initialize telemetry batcher for statistics: ", err)
// special event stat publisher for counters with unbounded cardinality
if err := process.InitEventStatPublisherWithHostname(ctx, zap.L(), &server.StatRegistry); err != nil {
zap.S().Warn("Failed to initialize event stat publisher for statistics: ", err)
}

// setup environment variables for Minio
Expand Down
61 changes: 4 additions & 57 deletions pkg/server/collect_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,20 @@ package server

import (
"net/http"
"sync"

"github.com/spacemonkeygo/monkit/v3"
"gopkg.in/webhelp.v1/whroute"

"storj.io/common/eventstat"
"storj.io/common/useragent"
)

// AgentCollector is a helper to register monkit monitor for HTTP User-Agents.
type AgentCollector struct {
counter *TagCounter
counter eventstat.Sink
}

// NewAgentCollector creates a new collector and registers it to a monkit scope.
func NewAgentCollector(name string, scope *monkit.Scope) *AgentCollector {
counter := NewTagCounter(name)
scope.Chain(counter)
func NewAgentCollector(name string, counter eventstat.Sink) *AgentCollector {
return &AgentCollector{
counter: counter,
}
Expand All @@ -38,57 +35,7 @@ func (a *AgentCollector) Wrap(h http.Handler) http.Handler {
product = product[:32]
}
}
a.counter.Increment(product)
a.counter(product)
h.ServeHTTP(w, r)
})
}

// TagCounter is counting tags since last stat/report.
// use it for datasets with high cardinality to keep memory usage on low.
type TagCounter struct {
mu sync.Mutex
counters map[string]uint64
name string
}

// NewTagCounter creates a new TagCounter with name user for the reported metrics.
func NewTagCounter(measurement string) *TagCounter {
return &TagCounter{
counters: map[string]uint64{},
name: measurement,
}
}

// Stats implements the monkit.StatSource interface.
func (c *TagCounter) Stats(cb func(key monkit.SeriesKey, field string, val float64)) {
c.mu.Lock()
counters := c.counters
for key := range c.counters {
delete(c.counters, key)
}
c.mu.Unlock()

for name, value := range counters {
key := monkit.NewSeriesKey(c.name).WithTags(monkit.NewSeriesTag("agent", name))
cb(key, "count", float64(value))
}
cb(monkit.NewSeriesKey(c.name), "samples", float64(len(counters)))
}

// Increment bumps the usage count of one of the counters.
func (c *TagCounter) Increment(tag string) {
c.mu.Lock()
// safety valve, hard limit the memory / network usage
if len(c.counters) < 1000 {
c.counters[tag]++
} else {
// no new counters, but bump the value
_, found := c.counters[tag]
if found {
c.counters[tag]++
} else {
c.counters["<DISCARDED>"]++
}
}
c.mu.Unlock()
}
8 changes: 4 additions & 4 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ import (
"time"

"github.com/gorilla/mux"
"github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs"
"go.uber.org/zap"

"storj.io/common/eventstat"
"storj.io/common/rpc/rpcpool"
"storj.io/gateway-mt/pkg/authclient"
"storj.io/gateway-mt/pkg/httpserver"
Expand All @@ -36,8 +36,8 @@ var (
minioOnce sync.Once

// StatRegistry is a specific registry which is reported only to one destination
// it makes it possible to reset bucket based statistics after each send.
StatRegistry = monkit.NewRegistry()
// it is used for event statistics with unbounded cardinality.
StatRegistry = eventstat.Registry{}
)

// Peer represents an S3 compatible http server.
Expand Down Expand Up @@ -90,7 +90,7 @@ func New(config Config, log *zap.Logger, trustedIPs trustedip.List, corsAllowedO

handler = minio.CriticalErrorHandler{Handler: minio.CorsHandler(corsAllowedOrigins)(handler)}

agentCollector := NewAgentCollector("s3_user_agent", StatRegistry.ScopeNamed("storj.io/gateway-mt/pkg/server"))
agentCollector := NewAgentCollector("s3_user_agent", StatRegistry.NewTagCounter("s3_http_user_agent", "agent"))
handler = agentCollector.Wrap(handler)

var tlsConfig *httpserver.TLSConfig
Expand Down
3 changes: 3 additions & 0 deletions testsuite/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,10 @@ require (
github.com/xdg/stringprep v1.0.0 // indirect
github.com/xtgo/uuid v0.0.0-20140804021211-a0b114877d4c // indirect
github.com/yuin/gopher-lua v0.0.0-20191220021717-ab39c6098bdb // indirect
github.com/zeebo/admission/v3 v3.0.3 // indirect
github.com/zeebo/errs/v2 v2.0.3 // indirect
github.com/zeebo/float16 v0.1.0 // indirect
github.com/zeebo/incenc v0.0.0-20180505221441-0d92902eec54 // indirect
go.etcd.io/bbolt v1.3.5 // indirect
go.etcd.io/etcd v0.0.0-20201125193152-8a03d2e9614b // indirect
go.opentelemetry.io/otel v0.18.0 // indirect
Expand Down
3 changes: 3 additions & 0 deletions testsuite/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -989,6 +989,7 @@ github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1
github.com/yuin/gopher-lua v0.0.0-20191220021717-ab39c6098bdb h1:ZkM6LRnq40pR1Ox0hTHlnpkcOTuFIDQpZ1IN8rKKhX0=
github.com/yuin/gopher-lua v0.0.0-20191220021717-ab39c6098bdb/go.mod h1:gqRgreBUhTSL0GeU64rtZ3Uq3wtjOa/TB2YfrtkCbVQ=
github.com/zeebo/admission/v3 v3.0.2/go.mod h1:BP3isIv9qa2A7ugEratNq1dnl2oZRXaQUGdU7WXKtbw=
github.com/zeebo/admission/v3 v3.0.3 h1:mwP/Y9EE8zRXOK8ma7CpEJfpiaKv4D4JWIOU4E8FPOw=
github.com/zeebo/admission/v3 v3.0.3/go.mod h1:2OWyAS5yo0Xvj2AEUosOjTUHxaY0oIIiCrXGKCYzWpo=
github.com/zeebo/assert v0.0.0-20181109011804-10f827ce2ed6/go.mod h1:yssERNPivllc1yU3BvpjYI5BUW+zglcz6QWqeVRL5t0=
github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
Expand All @@ -1000,7 +1001,9 @@ github.com/zeebo/errs v1.3.0 h1:hmiaKqgYZzcVgRL1Vkc1Mn2914BbzB0IBxs+ebeutGs=
github.com/zeebo/errs v1.3.0/go.mod h1:sgbWHsvVuTPHcqJJGQ1WhI5KbWlHYz+2+2C/LSEtCw4=
github.com/zeebo/errs/v2 v2.0.3 h1:WwqAmopgot4ZC+CgIveP+H91Nf78NDEGWjtAXen45Hw=
github.com/zeebo/errs/v2 v2.0.3/go.mod h1:OKmvVZt4UqpyJrYFykDKm168ZquJ55pbbIVUICNmLN0=
github.com/zeebo/float16 v0.1.0 h1:kRqxv5og6z1emEyz5FpW0/BVHe5VfxEAw6b1ljCZlUc=
github.com/zeebo/float16 v0.1.0/go.mod h1:fssGvvXu+XS8MH57cKmyrLB/cqioYeYX/2mXCN3a5wo=
github.com/zeebo/incenc v0.0.0-20180505221441-0d92902eec54 h1:+cwNE5KJ3pika4HuzmDHkDlK5myo0G9Sv+eO7WWxnUQ=
github.com/zeebo/incenc v0.0.0-20180505221441-0d92902eec54/go.mod h1:EI8LcOBDlSL3POyqwC1eJhOYlMBMidES+613EtmmT5w=
github.com/zeebo/ini v0.0.0-20210331155437-86af75b4f524/go.mod h1:oiTrvEJ3c6v+Kpfz1tun0BO+EuR3eKdH4tF+WvEbjw8=
github.com/zeebo/structs v1.0.2/go.mod h1:LphfpprlqJQcbCq+eA3iIK/NsejMwk9mlfH/tM1XuKQ=
Expand Down

0 comments on commit 1d42c08

Please sign in to comment.