Skip to content

Commit

Permalink
chore: added subscription limits check (#370)
Browse files Browse the repository at this point in the history
  • Loading branch information
mtsitrin authored Jun 27, 2023
1 parent 1a683ca commit 216740a
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 5 deletions.
19 changes: 15 additions & 4 deletions rpc/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ package client
import (
"context"
"errors"

"fmt"
"sort"
"time"

sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"

rconfig "github.com/dymensionxyz/dymint/config"
abciconv "github.com/dymensionxyz/dymint/conv/abci"
"github.com/dymensionxyz/dymint/mempool"
Expand Down Expand Up @@ -97,10 +100,8 @@ func (c *Client) BroadcastTxCommit(ctx context.Context, tx types.Tx) (*ctypes.Re
// This code is a local client, so we can assume that subscriber is ""
subscriber := "" //ctx.RemoteAddr()

if c.EventBus.NumClients() >= c.config.MaxSubscriptionClients {
return nil, fmt.Errorf("max_subscription_clients %d reached", c.config.MaxSubscriptionClients)
} else if c.EventBus.NumClientSubscriptions(subscriber) >= c.config.MaxSubscriptionsPerClient {
return nil, fmt.Errorf("max_subscriptions_per_client %d reached", c.config.MaxSubscriptionsPerClient)
if err := c.IsSubscriptionAllowed(subscriber); err != nil {
return nil, sdkerrors.Wrap(err, "subscription not allowed")
}

// Subscribe to tx being committed in block.
Expand Down Expand Up @@ -878,6 +879,16 @@ func (c *Client) normalizeHeight(height *int64) uint64 {
return heightValue
}

func (c *Client) IsSubscriptionAllowed(subscriber string) error {
if c.EventBus.NumClients() >= c.config.MaxSubscriptionClients {
return fmt.Errorf("max_subscription_clients %d reached", c.config.MaxSubscriptionClients)
} else if c.EventBus.NumClientSubscriptions(subscriber) >= c.config.MaxSubscriptionsPerClient {
return fmt.Errorf("max_subscriptions_per_client %d reached", c.config.MaxSubscriptionsPerClient)
}

return nil
}

func validatePerPage(perPagePtr *int) int {
if perPagePtr == nil { // no per_page parameter
return defaultPerPage
Expand Down
5 changes: 4 additions & 1 deletion rpc/json/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strconv"
"time"

"cosmossdk.io/errors"
"github.com/gorilla/rpc/v2/json2"
"github.com/tendermint/tendermint/libs/pubsub"
tmquery "github.com/tendermint/tendermint/libs/pubsub/query"
Expand Down Expand Up @@ -91,7 +92,9 @@ func newService(c *client.Client, l log.Logger) *service {
func (s *service) Subscribe(req *http.Request, args *subscribeArgs, wsConn *wsConn, subscriptionID []byte) (*ctypes.ResultSubscribe, error) {
addr := req.RemoteAddr

// TODO(tzdybal): pass config and check subscriptions limits
if err := s.client.IsSubscriptionAllowed(addr); err != nil {
return nil, errors.Wrap(err, "subscription not allowed")
}

q, err := tmquery.New(args.Query)
if err != nil {
Expand Down

0 comments on commit 216740a

Please sign in to comment.