diff --git a/dex/src/main/scala/com/wavesplatform/dex/actors/orderbook/AggregatedOrderBookActor.scala b/dex/src/main/scala/com/wavesplatform/dex/actors/orderbook/AggregatedOrderBookActor.scala index 83f02f6199..7ad361853f 100644 --- a/dex/src/main/scala/com/wavesplatform/dex/actors/orderbook/AggregatedOrderBookActor.scala +++ b/dex/src/main/scala/com/wavesplatform/dex/actors/orderbook/AggregatedOrderBookActor.scala @@ -238,7 +238,7 @@ object AggregatedOrderBookActor { lastTrade = None, lastUpdateTs = 0, compiledHttpView = Map.empty, - ws = WsOrderBookState(Map.empty, Set.empty, Set.empty, lastTrade = None, changedTickSize = None), + ws = WsOrderBookState.empty, wsSendSchedule = Cancellable.alreadyCancelled ) diff --git a/dex/src/main/scala/com/wavesplatform/dex/api/ws/state/WsOrderBookState.scala b/dex/src/main/scala/com/wavesplatform/dex/api/ws/state/WsOrderBookState.scala index 10242ca8c5..a36570b6e9 100644 --- a/dex/src/main/scala/com/wavesplatform/dex/api/ws/state/WsOrderBookState.scala +++ b/dex/src/main/scala/com/wavesplatform/dex/api/ws/state/WsOrderBookState.scala @@ -20,21 +20,20 @@ case class WsOrderBookState( changedAsks: Set[Price], changedBids: Set[Price], lastTrade: Option[LastTrade], - changedTickSize: Option[Double] + changedTickSize: Option[Double], + prevTickState: WsState ) { def addSubscription(x: ActorRef[WsOrderBookChanges]): WsOrderBookState = copy(wsConnections = wsConnections.updated(x, 0L)) def withoutSubscription(x: ActorRef[WsOrderBookChanges]): WsOrderBookState = { val updatedConnections = wsConnections - x - if (updatedConnections.isEmpty) WsOrderBookState.empty + if (updatedConnections.isEmpty) WsOrderBookState.empty.copy(prevTickState = prevTickState) else copy(wsConnections = updatedConnections) } def hasSubscriptions: Boolean = wsConnections.nonEmpty - def hasChanges: Boolean = changedAsks.nonEmpty || changedBids.nonEmpty || lastTrade.nonEmpty || changedTickSize.nonEmpty - def denormalized(amountDecimals: Int, priceDecimals: Int, xs: TreeMap[Price, Amount], ordering: Ordering[Double]): TreeMap[Double, Double] = xs.map { case (price, amount) => @@ -55,13 +54,16 @@ case class WsOrderBookState( bids: TreeMap[Price, Amount], timestamp: Long ): WsOrderBookState = copy( - wsConnections = - if (hasChanges) { + wsConnections = { + val preparedAsks = take(asks, changedAsks, prevTickState.asks) + val preparedBids = take(bids, changedBids, prevTickState.bids) + + if (lastTrade.nonEmpty || changedTickSize.nonEmpty || preparedBids.nonEmpty || preparedAsks.nonEmpty) { val changes = protocol.WsOrderBookChanges( assetPair = assetPair, - asks = denormalized(amountDecimals, priceDecimals, take(asks, changedAsks), OrderBook.asksDenormalizedOrdering), - bids = denormalized(amountDecimals, priceDecimals, take(bids, changedBids), OrderBook.bidsDenormalizedOrdering), + asks = denormalized(amountDecimals, priceDecimals, preparedAsks, OrderBook.asksDenormalizedOrdering), + bids = denormalized(amountDecimals, priceDecimals, preparedBids, OrderBook.bidsDenormalizedOrdering), lastTrade = lastTrade.map(lastTrade(amountDecimals, priceDecimals, _)), updateId = 0L, // Will be changed below timestamp = timestamp, @@ -73,20 +75,48 @@ case class WsOrderBookState( conn ! changes.copy(updateId = newUpdateId) conn -> newUpdateId } - } else wsConnections, + } else wsConnections + }, changedAsks = Set.empty, changedBids = Set.empty, lastTrade = None, - changedTickSize = None + changedTickSize = None, + prevTickState = updatePrevTickState(asks, bids, lastTrade, changedTickSize) ) - def take(xs: TreeMap[Price, Amount], levels: Set[Price]): TreeMap[Price, Amount] = { + def updatePrevTickState(asks: Map[Price, Amount], bids: Map[Price, Amount], lt: Option[LastTrade], lc: Option[Double]): WsState = + prevTickState.copy( + asks = (prevTickState.asks ++ asks).filter(_._2 != 0), + bids = (prevTickState.bids ++ bids).filter(_._2 != 0), + lastTrade = if (lt.isEmpty) prevTickState.lastTrade else lt, + tickSize = if (lc.isEmpty) prevTickState.tickSize else lc + ) + + def updatePrevTickStateFromDiff(asks: Map[Price, Amount], bids: Map[Price, Amount], lt: Option[LastTrade], lc: Option[Double]): WsState = + prevTickState.copy( + asks = applyLevelDiff(prevTickState.asks, asks), + bids = applyLevelDiff(prevTickState.bids, bids), + lastTrade = if (lt.isEmpty) prevTickState.lastTrade else lt, + tickSize = if (lc.isEmpty) prevTickState.tickSize else lc + ) + + def applyLevelDiff(m1: TreeMap[Price, Amount], m2: Map[Price, Amount]): TreeMap[Price, Amount] = (m1 ++ m2.map { + case (pr, am) => + (pr, m1.get(pr).fold(am)(_ + am)) + }).filter(_._2 != 0) + + def take(xs: TreeMap[Price, Amount], levels: Set[Price], prevChanges: TreeMap[Price, Amount]): TreeMap[Price, Amount] = { // 1. Levels will be always smaller, than xs // 2. A level could gone from xs val r = TreeMap.newBuilder[Price, Amount](xs.ordering) levels.foreach { level => - val v = xs.getOrElse(level, 0L) - r += level -> v + xs.get(level).fold { + prevChanges.get(level).foreach(_ => r += level -> 0L) // means that level was removed + } { v => + if (!prevChanges.get(level).contains(v)) // if there was no changes to this level, then don't send it + r += level -> v + } + } r.result() } @@ -99,16 +129,38 @@ case class WsOrderBookState( lastTradeLens.modify(if (lt.isEmpty) _ else lt) andThen changedTickSizeLens.modify(if (ts.isEmpty) _ else ts) )(this) - else this + else + copy( + prevTickState = updatePrevTickState(lc.asks, lc.bids, lt, ts) + ) } object WsOrderBookState { - val empty = WsOrderBookState(Map.empty, Set.empty, Set.empty, None, None) + val empty = WsOrderBookState(Map.empty, Set.empty, Set.empty, None, None, WsState.empty) + + def withPrevTickState( + asks: TreeMap[Price, Amount], + bids: TreeMap[Price, Amount], + lastTrade: Option[LastTrade], + tickSize: Option[Double] + ): WsOrderBookState = empty.copy(prevTickState = WsState(asks, bids, lastTrade, tickSize)) val genLens = GenLens[WsOrderBookState] val changedAsksLens = genLens(_.changedAsks) val changedBidsLens = genLens(_.changedBids) val lastTradeLens = genLens(_.lastTrade) val changedTickSizeLens = genLens(_.changedTickSize) + + final private[WsOrderBookState] case class WsState( + asks: TreeMap[Price, Amount], + bids: TreeMap[Price, Amount], + lastTrade: Option[LastTrade], + tickSize: Option[Double] + ) + + object WsState { + val empty = WsState(TreeMap.empty, TreeMap.empty, None, None) + } + }