Skip to content

Commit

Permalink
feat(fullnode) Add filterOrders option to streaming subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
UnbornAztecKing committed Jan 8, 2025
1 parent 2a79ca0 commit 05ff9f3
Show file tree
Hide file tree
Showing 8 changed files with 290 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,9 @@ export interface StreamOrderbookUpdatesRequest {
/** Market ids for price updates. */

marketIds: number[];
/** Filter order updates in addition to position updates */

filterOrders: boolean;
}
/**
* StreamOrderbookUpdatesRequest is a request message for the
Expand All @@ -293,6 +296,9 @@ export interface StreamOrderbookUpdatesRequestSDKType {
/** Market ids for price updates. */

market_ids: number[];
/** Filter order updates in addition to position updates */

filter_orders: boolean;
}
/**
* StreamOrderbookUpdatesResponse is a response message for the
Expand Down Expand Up @@ -1298,7 +1304,8 @@ function createBaseStreamOrderbookUpdatesRequest(): StreamOrderbookUpdatesReques
return {
clobPairId: [],
subaccountIds: [],
marketIds: []
marketIds: [],
filterOrders: false
};
}

Expand All @@ -1323,6 +1330,11 @@ export const StreamOrderbookUpdatesRequest = {
}

writer.ldelim();

if (message.filterOrders === true) {
writer.uint32(32).bool(message.filterOrders);
}

return writer;
},

Expand Down Expand Up @@ -1365,6 +1377,10 @@ export const StreamOrderbookUpdatesRequest = {

break;

case 4:
message.filterOrders = reader.bool();
break;

default:
reader.skipType(tag & 7);
break;
Expand All @@ -1379,6 +1395,7 @@ export const StreamOrderbookUpdatesRequest = {
message.clobPairId = object.clobPairId?.map(e => e) || [];
message.subaccountIds = object.subaccountIds?.map(e => SubaccountId.fromPartial(e)) || [];
message.marketIds = object.marketIds?.map(e => e) || [];
message.filterOrders = object.filterOrders ?? false;
return message;
}

Expand Down
3 changes: 3 additions & 0 deletions proto/dydxprotocol/clob/query.proto
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,9 @@ message StreamOrderbookUpdatesRequest {

// Market ids for price updates.
repeated uint32 market_ids = 3;

// Filter order updates in addition to position updates
bool filter_orders = 4;
}

// StreamOrderbookUpdatesResponse is a response message for the
Expand Down
91 changes: 88 additions & 3 deletions protocol/streaming/full_node_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package streaming

import (
"fmt"
"slices"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -94,6 +95,9 @@ type OrderbookSubscription struct {
// If interval snapshots are turned on, the next block height at which
// a snapshot should be sent out.
nextSnapshotBlock uint32

// Filter orders for subaccountIds
filterOrders bool
}

func (sub *OrderbookSubscription) IsInitialized() bool {
Expand Down Expand Up @@ -187,11 +191,79 @@ func (sm *FullNodeStreamingManagerImpl) getNextAvailableSubscriptionId() uint32
return id
}

// TODO best practice for ensuring all cases are handled
// default error? default panic?
func GetOffChannelUpdateV1SubaccountId(update ocutypes.OffChainUpdateV1) uint32 {
var orderSubaccountIdNumber uint32
switch ocu1 := update.UpdateMessage.(type) {
case *ocutypes.OffChainUpdateV1_OrderPlace:
orderSubaccountIdNumber = ocu1.OrderPlace.Order.OrderId.SubaccountId.Number
case *ocutypes.OffChainUpdateV1_OrderRemove:
orderSubaccountIdNumber = ocu1.OrderRemove.RemovedOrderId.SubaccountId.Number
case *ocutypes.OffChainUpdateV1_OrderUpdate:
orderSubaccountIdNumber = ocu1.OrderUpdate.OrderId.SubaccountId.Number
case *ocutypes.OffChainUpdateV1_OrderReplace:
orderSubaccountIdNumber = ocu1.OrderReplace.Order.OrderId.SubaccountId.Number

}
return orderSubaccountIdNumber
}

// Filter StreamUpdates for subaccountIdNumbers
// If a StreamUpdate_OrderUpdate contains no updates for subscribed subaccounts, drop message
// If a StreamUpdate_OrderUpdate contains updates for subscribed subaccounts, construct a new
// StreamUpdate_OrderUpdate with updates only for subscribed subaccounts
func (sub *OrderbookSubscription) FilterSubaccountStreamUpdates(output chan []clobtypes.StreamUpdate) {
subaccountIdNumbers := make([]uint32, len(sub.subaccountIds))
for i, subaccountId := range sub.subaccountIds {
subaccountIdNumbers[i] = subaccountId.Number
}

// If reflection becomes too expensive, split updatesChannel by message type
for updates := range sub.updatesChannel {
filteredUpdates := []clobtypes.StreamUpdate{}
for _, update := range updates {
switch updateMessage := update.UpdateMessage.(type) {
case *clobtypes.StreamUpdate_OrderbookUpdate:
orderBookUpdates := []ocutypes.OffChainUpdateV1{}
for _, orderBookUpdate := range updateMessage.OrderbookUpdate.Updates {
orderBookUpdateSubaccountIdNumber := GetOffChannelUpdateV1SubaccountId(orderBookUpdate)
if slices.Contains(subaccountIdNumbers, orderBookUpdateSubaccountIdNumber) {
orderBookUpdates = append(orderBookUpdates, orderBookUpdate)
}
}
// Drop the StreamUpdate_OrderbookUpdate if all updates inside were dropped
if len(orderBookUpdates) > 0 {
if len(orderBookUpdates) < len(updateMessage.OrderbookUpdate.Updates) {
update = clobtypes.StreamUpdate{
BlockHeight: update.BlockHeight,
ExecMode: update.ExecMode,
UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{
OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{
Snapshot: updateMessage.OrderbookUpdate.Snapshot,
Updates: orderBookUpdates,
},
},
}
}
filteredUpdates = append(filteredUpdates, update)
}
default:
filteredUpdates = append(filteredUpdates, update)
}
}
if len(filteredUpdates) > 0 {
output <- filteredUpdates
}
}
}

// Subscribe subscribes to the orderbook updates stream.
func (sm *FullNodeStreamingManagerImpl) Subscribe(
clobPairIds []uint32,
subaccountIds []*satypes.SubaccountId,
marketIds []uint32,
filterOrders bool,
messageSender types.OutgoingMessageSender,
) (
err error,
Expand Down Expand Up @@ -265,9 +337,22 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
sm.EmitMetrics()
sm.Unlock()

// If filterOrders, listen to filtered channel and start filter goroutine
// Error if fitlerOrders but no subaccounts are subscribed
filteredUpdateChannel := subscription.updatesChannel
if subscription.filterOrders {
if len(subaccountIds) == 0 {
// TODO panic?
// log error
}
filteredUpdateChannel = make(chan []clobtypes.StreamUpdate)
defer close(filteredUpdateChannel)
go subscription.FilterSubaccountStreamUpdates(filteredUpdateChannel)
}

// Use current goroutine to consistently poll subscription channel for updates
// to send through stream.
for updates := range subscription.updatesChannel {
for updates := range filteredUpdateChannel {
metrics.IncrCounterWithLabels(
metrics.GrpcSendResponseToSubscriberCount,
1,
Expand Down Expand Up @@ -1080,12 +1165,12 @@ func (sm *FullNodeStreamingManagerImpl) StreamBatchUpdatesAfterFinalizeBlock(
sm.FlushStreamUpdatesWithLock()

// Cache updates to sync local ops queue
sycnLocalUpdates, syncLocalClobPairIds := getStreamUpdatesFromOffchainUpdates(
syncLocalUpdates, syncLocalClobPairIds := getStreamUpdatesFromOffchainUpdates(
streaming_util.GetOffchainUpdatesV1(orderBookUpdatesToSyncLocalOpsQueue),
lib.MustConvertIntegerToUint32(ctx.BlockHeight()),
ctx.ExecMode(),
)
sm.cacheStreamUpdatesByClobPairWithLock(sycnLocalUpdates, syncLocalClobPairIds)
sm.cacheStreamUpdatesByClobPairWithLock(syncLocalUpdates, syncLocalClobPairIds)

// Cache updates for finalized fills.
fillStreamUpdates, fillClobPairIds := sm.getStreamUpdatesForOrderbookFills(
Expand Down
1 change: 1 addition & 0 deletions protocol/streaming/noop_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func (sm *NoopGrpcStreamingManager) Subscribe(
_ []uint32,
_ []*satypes.SubaccountId,
_ []uint32,
_ bool,
_ types.OutgoingMessageSender,
) (
err error,
Expand Down
1 change: 1 addition & 0 deletions protocol/streaming/types/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type FullNodeStreamingManager interface {
clobPairIds []uint32,
subaccountIds []*satypes.SubaccountId,
marketIds []uint32,
filterOrders bool,
srv OutgoingMessageSender,
) (
err error,
Expand Down
24 changes: 24 additions & 0 deletions protocol/streaming/ws/websocket_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,16 @@ func (ws *WebsocketServer) Handler(w http.ResponseWriter, r *http.Request) {
return
}

// Parse filterOrders from query parameters
filterOrders, err := parseFilterOrders(r)
if err != nil {
ws.logger.Error("Error parsing filterOrders", "err", err)
if err := sendCloseWithReason(conn, websocket.CloseUnsupportedData, err.Error()); err != nil {
ws.logger.Error("Error sending close message", "err", err)
}
return
}

websocketMessageSender := &WebsocketMessageSender{
cdc: ws.cdc,
conn: conn,
Expand All @@ -110,6 +120,7 @@ func (ws *WebsocketServer) Handler(w http.ResponseWriter, r *http.Request) {
clobPairIds,
subaccountIds,
marketIds,
filterOrders,
websocketMessageSender,
)
if err != nil {
Expand Down Expand Up @@ -169,6 +180,19 @@ func parseSubaccountIds(r *http.Request) ([]*satypes.SubaccountId, error) {
return subaccountIds, nil
}

// parseFilterOrders is a helper function to parse the filterOrders flag from the query parameters.
func parseFilterOrders(r *http.Request) (bool, error) {
filterOrdersParam := r.URL.Query().Get("filterOrders")
if filterOrdersParam == "" {
return false, nil
}
filterOrders, err := strconv.ParseBool(filterOrdersParam)
if err != nil {
return false, fmt.Errorf("invalid filterOrders: %s", filterOrdersParam)
}
return filterOrders, nil
}

// parseUint32 is a helper function to parse the uint32 from the query parameters.
func parseUint32(r *http.Request, queryParam string) ([]uint32, error) {
param := r.URL.Query().Get(queryParam)
Expand Down
1 change: 1 addition & 0 deletions protocol/x/clob/keeper/grpc_stream_orderbook.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ func (k Keeper) StreamOrderbookUpdates(
req.GetClobPairId(),
req.GetSubaccountIds(),
req.GetMarketIds(),
req.GetFilterOrders(),
stream,
)
if err != nil {
Expand Down
Loading

0 comments on commit 05ff9f3

Please sign in to comment.