diff --git a/cmd/linksharing/config.yaml.lock b/cmd/linksharing/config.yaml.lock index 2dbb7f61..8aca4c19 100644 --- a/cmd/linksharing/config.yaml.lock +++ b/cmd/linksharing/config.yaml.lock @@ -70,6 +70,12 @@ client.identity.cert-path: "" # path to the private key for this identity client.identity.key-path: "" +# the number of concurrent requests total to allow +# concurrent-request-limit: 40000 + +# if true, wait until a slot opens. if false, return 429 +# concurrent-request-wait: false + # RPC connection pool capacity connection-pool.capacity: 100 diff --git a/cmd/linksharing/main.go b/cmd/linksharing/main.go index 5ea9c6b1..d53f19c7 100644 --- a/cmd/linksharing/main.go +++ b/cmd/linksharing/main.go @@ -54,6 +54,8 @@ type LinkSharing struct { StandardViewsHTML bool `user:"true" help:"serve HTML as text/html instead of text/plain for standard (non-hosting) requests" default:"false"` ListPageLimit int `help:"maximum number of paths to list on a single page" default:"100"` DynamicAssetsDir string `help:"use a assets dir that is reparsed for every request" default:""` + ConcurrentRequestLimit int `help:"the number of concurrent requests total to allow" default:"40000"` + ConcurrentRequestWait bool `help:"if true, wait until a slot opens. if false, return 429" default:"false"` Client struct { Identity uplinkutil.IdentityConfig @@ -212,7 +214,9 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) { ChainPEM: clientCertPEM, KeyPEM: clientKeyPEM, }, - ListPageLimit: runCfg.ListPageLimit, + ListPageLimit: runCfg.ListPageLimit, + ConcurrentRequestLimit: runCfg.ConcurrentRequestLimit, + ConcurrentRequestWait: runCfg.ConcurrentRequestWait, }, GeoLocationDB: runCfg.GeoLocationDB, ShutdownDelay: runCfg.ShutdownDelay, diff --git a/pkg/linksharing/sharing/handler.go b/pkg/linksharing/sharing/handler.go index f4254d7d..e127b2d0 100644 --- a/pkg/linksharing/sharing/handler.go +++ b/pkg/linksharing/sharing/handler.go @@ -17,6 +17,7 @@ import ( "github.com/spacemonkeygo/monkit/v3" "github.com/zeebo/errs" "go.uber.org/zap" + "golang.org/x/sync/semaphore" "storj.io/common/ranger" "storj.io/common/ranger/httpranger" @@ -147,6 +148,11 @@ type Config struct { // Maximum number of paths to list on a single page. ListPageLimit int + + // ConcurrentRequestLimit is the number of total concurrent requests a handler will serve. If <= 0, no limit. + ConcurrentRequestLimit int + // ConcurrentRequestWait if true will make requests wait for a free slot instead of returning 429 (the default, false). + ConcurrentRequestWait bool } // ConnectionPoolConfig is a config struct for configuring RPC connection pool options. @@ -178,6 +184,8 @@ type Handler struct { archiveRanger func(ctx context.Context, project *uplink.Project, bucket, key, path string, canReturnGzip bool) (_ ranger.Ranger, isGzip bool, _ error) inShutdown *int32 listPageLimit int + concurrentRequests *semaphore.Weighted + concurrentRequestWait bool } // NewHandler creates a new link sharing HTTP handler. @@ -283,6 +291,11 @@ func NewHandler(log *zap.Logger, mapper *objectmap.IPDB, txtRecords *TXTRecords, } } + var concurrentRequests *semaphore.Weighted + if config.ConcurrentRequestLimit > 0 { + concurrentRequests = semaphore.NewWeighted(int64(config.ConcurrentRequestLimit)) + } + return &Handler{ log: log, urlBases: bases, @@ -301,6 +314,8 @@ func NewHandler(log *zap.Logger, mapper *objectmap.IPDB, txtRecords *TXTRecords, archiveRanger: defaultArchiveRanger, inShutdown: inShutdown, listPageLimit: config.ListPageLimit, + concurrentRequests: concurrentRequests, + concurrentRequestWait: config.ConcurrentRequestWait, }, nil } @@ -430,6 +445,12 @@ func (handler *Handler) serveHTTP(ctx context.Context, w http.ResponseWriter, r } handler.cors(ctx, w, r) + done, err := handler.rateLimit(ctx) + if err != nil { + return errdata.WithStatus(err, http.StatusTooManyRequests) + } + defer done() + ourDomain, err := isDomainOurs(r.Host, handler.urlBases) if err != nil { return err @@ -476,6 +497,22 @@ func (handler *Handler) healthProcess(ctx context.Context, w http.ResponseWriter return err } +func (handler *Handler) rateLimit(ctx context.Context) (done func(), err error) { + if handler.concurrentRequests == nil { + return func() {}, nil + } + if handler.concurrentRequestWait { + err := handler.concurrentRequests.Acquire(ctx, 1) + if err != nil { + return nil, err + } + } else if !handler.concurrentRequests.TryAcquire(1) { + return nil, errs.New("too many requests") + } + + return func() { handler.concurrentRequests.Release(1) }, nil +} + func cacheControlStatic(h http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Cache-Control", "public, max-age=15552000")