Skip to content

Commit

Permalink
pkg/linksharing/sharing: rate limit total requests
Browse files Browse the repository at this point in the history
Change-Id: I5c71ef40cb1075a015b16dd3b6ffb2f290e8de7a
  • Loading branch information
jtolio committed Nov 8, 2024
1 parent 307d21d commit 41c5357
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 1 deletion.
6 changes: 6 additions & 0 deletions cmd/linksharing/config.yaml.lock
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ client.identity.cert-path: ""
# path to the private key for this identity
client.identity.key-path: ""

# the number of concurrent requests total to allow
# concurrent-request-limit: 40000

# if true, wait until a slot opens. if false, return 429
# concurrent-request-wait: false

# RPC connection pool capacity
connection-pool.capacity: 100

Expand Down
6 changes: 5 additions & 1 deletion cmd/linksharing/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ type LinkSharing struct {
StandardViewsHTML bool `user:"true" help:"serve HTML as text/html instead of text/plain for standard (non-hosting) requests" default:"false"`
ListPageLimit int `help:"maximum number of paths to list on a single page" default:"100"`
DynamicAssetsDir string `help:"use a assets dir that is reparsed for every request" default:""`
ConcurrentRequestLimit int `help:"the number of concurrent requests total to allow" default:"40000"`
ConcurrentRequestWait bool `help:"if true, wait until a slot opens. if false, return 429" default:"false"`

Client struct {
Identity uplinkutil.IdentityConfig
Expand Down Expand Up @@ -212,7 +214,9 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) {
ChainPEM: clientCertPEM,
KeyPEM: clientKeyPEM,
},
ListPageLimit: runCfg.ListPageLimit,
ListPageLimit: runCfg.ListPageLimit,
ConcurrentRequestLimit: runCfg.ConcurrentRequestLimit,
ConcurrentRequestWait: runCfg.ConcurrentRequestWait,
},
GeoLocationDB: runCfg.GeoLocationDB,
ShutdownDelay: runCfg.ShutdownDelay,
Expand Down
37 changes: 37 additions & 0 deletions pkg/linksharing/sharing/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs"
"go.uber.org/zap"
"golang.org/x/sync/semaphore"

"storj.io/common/ranger"
"storj.io/common/ranger/httpranger"
Expand Down Expand Up @@ -147,6 +148,11 @@ type Config struct {

// Maximum number of paths to list on a single page.
ListPageLimit int

// ConcurrentRequestLimit is the number of total concurrent requests a handler will serve. If <= 0, no limit.
ConcurrentRequestLimit int
// ConcurrentRequestWait if true will make requests wait for a free slot instead of returning 429 (the default, false).
ConcurrentRequestWait bool
}

// ConnectionPoolConfig is a config struct for configuring RPC connection pool options.
Expand Down Expand Up @@ -178,6 +184,8 @@ type Handler struct {
archiveRanger func(ctx context.Context, project *uplink.Project, bucket, key, path string, canReturnGzip bool) (_ ranger.Ranger, isGzip bool, _ error)
inShutdown *int32
listPageLimit int
concurrentRequests *semaphore.Weighted
concurrentRequestWait bool
}

// NewHandler creates a new link sharing HTTP handler.
Expand Down Expand Up @@ -283,6 +291,11 @@ func NewHandler(log *zap.Logger, mapper *objectmap.IPDB, txtRecords *TXTRecords,
}
}

var concurrentRequests *semaphore.Weighted
if config.ConcurrentRequestLimit > 0 {
concurrentRequests = semaphore.NewWeighted(int64(config.ConcurrentRequestLimit))
}

return &Handler{
log: log,
urlBases: bases,
Expand All @@ -301,6 +314,8 @@ func NewHandler(log *zap.Logger, mapper *objectmap.IPDB, txtRecords *TXTRecords,
archiveRanger: defaultArchiveRanger,
inShutdown: inShutdown,
listPageLimit: config.ListPageLimit,
concurrentRequests: concurrentRequests,
concurrentRequestWait: config.ConcurrentRequestWait,
}, nil
}

Expand Down Expand Up @@ -430,6 +445,12 @@ func (handler *Handler) serveHTTP(ctx context.Context, w http.ResponseWriter, r
}
handler.cors(ctx, w, r)

done, err := handler.rateLimit(ctx)
if err != nil {
return errdata.WithStatus(err, http.StatusTooManyRequests)
}
defer done()

ourDomain, err := isDomainOurs(r.Host, handler.urlBases)
if err != nil {
return err
Expand Down Expand Up @@ -476,6 +497,22 @@ func (handler *Handler) healthProcess(ctx context.Context, w http.ResponseWriter
return err
}

func (handler *Handler) rateLimit(ctx context.Context) (done func(), err error) {
if handler.concurrentRequests == nil {
return func() {}, nil
}
if handler.concurrentRequestWait {
err := handler.concurrentRequests.Acquire(ctx, 1)
if err != nil {
return nil, err
}
} else if !handler.concurrentRequests.TryAcquire(1) {
return nil, errs.New("too many requests")
}

return func() { handler.concurrentRequests.Release(1) }, nil
}

func cacheControlStatic(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Cache-Control", "public, max-age=15552000")
Expand Down

0 comments on commit 41c5357

Please sign in to comment.