Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DEX-1709 broadcast ordering fix #757

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,11 @@ object ExchangeTransactionBroadcastActor {
}

case Command.Tick =>
val updatedInProgress = inProgress.view.mapValues(_.decreasedAttempts)
//noinspection SortFilter
val updatedInProgress = inProgress.view
.mapValues(_.decreasedAttempts)
.toList
.sortBy { case (_, v) => v.tx.timestamp }
.filter {
case (txId, x) =>
val valid = x.isValid // This could be in Event.Broadcasted
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,38 @@ class ExchangeTransactionBroadcastActorSpecification
broadcasted shouldBe Seq(event.tx)
}

"ordered transactions broadcast on retry" in {
var broadcasted = Seq.empty[ExchangeTransaction]
val attempts = new AtomicInteger(0)
val actor = defaultActorWithSettings(
Settings(
interval = 20.millis,
maxPendingTime = 200.millis
)
) { tx =>
if (attempts.incrementAndGet() > 2) {
broadcasted = broadcasted :+ tx
Future.successful(CheckedBroadcastResult.Confirmed)
} else Future.successful(CheckedBroadcastResult.Failed("Couldn't broadcast transaction", canRetry = true))
}

val client = testKit.createTestProbe[Observed]()

val createdTs = System.currentTimeMillis()
val time = new TestTime(createdTs)
val event1 = sampleEvent(client.ref, createdTs - 500L, time)
val event2 = sampleEvent(client.ref, createdTs, time)

actor ! event2
actor ! event1

(1 to 2).foreach(_ => manualTime.timePasses(21.millis))

eventually {
broadcasted shouldBe Seq(event1.tx, event2.tx)
}
}

"send a response to a client, if a transaction" - {
def test(result: CheckedBroadcastResult): Unit = {
val actor = defaultActor { _ =>
Expand Down Expand Up @@ -264,45 +296,51 @@ class ExchangeTransactionBroadcastActorSpecification
clientRef: ActorRef[Observed],
createdTs: Long = System.currentTimeMillis(),
time: Time = new TestTime()
): ExchangeTransactionBroadcastActor.Command.Broadcast = {
val now = time.getTimestamp()
val expiration = now + 1.day.toMillis
): ExchangeTransactionBroadcastActor.Command.Broadcast =
ExchangeTransactionBroadcastActor.Command.Broadcast(
clientRef = clientRef,
addressSpendings = Map.empty,
tx = ExchangeTransactionV3
.mk(
amountAssetDecimals = 8,
priceAssetDecimals = 8,
buyOrder = Order.buy(
sender = KeyPair(Array.emptyByteArray),
matcher = KeyPair(Array.emptyByteArray),
pair = pair,
amount = 100,
price = 6000000L,
timestamp = now,
expiration = expiration,
matcherFee = 100
),
sellOrder = Order.sell(
sender = KeyPair(Array.emptyByteArray),
matcher = KeyPair(Array.emptyByteArray),
pair = pair,
amount = 100,
price = 6000000L,
timestamp = now,
expiration = expiration,
matcherFee = 100
),
tx = sampleTransaction(createdTs, time)
)

private def sampleTransaction(
createdTs: Long = System.currentTimeMillis(),
time: Time = new TestTime()
): ExchangeTransaction = {
val now = time.getTimestamp()
val expiration = now + 1.day.toMillis
ExchangeTransactionV3
.mk(
amountAssetDecimals = 8,
priceAssetDecimals = 8,
buyOrder = Order.buy(
sender = KeyPair(Array.emptyByteArray),
matcher = KeyPair(Array.emptyByteArray),
pair = pair,
amount = 100,
price = 6000000L,
buyMatcherFee = 0L,
sellMatcherFee = 0L,
fee = 300000L,
timestamp = createdTs,
proofs = Proofs.empty
).transaction
)
timestamp = now,
expiration = expiration,
matcherFee = 100
),
sellOrder = Order.sell(
sender = KeyPair(Array.emptyByteArray),
matcher = KeyPair(Array.emptyByteArray),
pair = pair,
amount = 100,
price = 6000000L,
timestamp = now,
expiration = expiration,
matcherFee = 100
),
amount = 100,
price = 6000000L,
buyMatcherFee = 0L,
sellMatcherFee = 0L,
fee = 300000L,
timestamp = createdTs,
proofs = Proofs.empty
).transaction
}

}
19 changes: 14 additions & 5 deletions waves-ext/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,18 @@ waves {

}

akka.actor.waves-dex-grpc-scheduler {
type = "Dispatcher"
executor = "thread-pool-executor"
thread-pool-executor.fixed-pool-size = 8
throughput = 10
akka.actor {
waves-dex-grpc-scheduler {
type = "Dispatcher"
executor = "thread-pool-executor"
thread-pool-executor.fixed-pool-size = 8
throughput = 10
}

broadcast-dispatcher {
type = "Dispatcher"
executor = "thread-pool-executor"
thread-pool-executor.fixed-pool-size = 1
throughput = 1
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ class DEXExtension(context: ExtensionContext) extends Extension with ScorexLoggi
executionModel = ExecutionModel.AlwaysAsyncExecution
)

private val broadcastEc = context.actorSystem.dispatchers.lookup("akka.actor.broadcast-dispatcher")

implicit val byteStrValueReader: ValueReader[ByteStr] = (config: Config, path: String) => {
val str = config.as[String](path)
decodeBase58(str, config)
Expand All @@ -45,7 +47,7 @@ class DEXExtension(context: ExtensionContext) extends Extension with ScorexLoggi
val lpAccountsFilePath: String = context.settings.config.as[String]("waves.dex.lp-accounts.file-path")
val lpAccounts: Set[ByteStr] = lpAccountsFromPath(lpAccountsFilePath, context.settings.config)

apiService = new WavesBlockchainApiGrpcService(context, allowedBlockchainStateAccounts, lpAccounts)
apiService = new WavesBlockchainApiGrpcService(context, allowedBlockchainStateAccounts, lpAccounts, broadcastEc)

val bindAddress = new InetSocketAddress(host, port)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,16 @@ import monix.reactive.subjects.ConcurrentSubject
import shapeless.Coproduct

import java.util.concurrent.ConcurrentHashMap
import scala.concurrent.Future
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}

class WavesBlockchainApiGrpcService(context: ExtensionContext, allowedBlockchainStateAccounts: Set[ByteStr], lpAccounts: Set[ByteStr])(implicit
class WavesBlockchainApiGrpcService(
context: ExtensionContext,
allowedBlockchainStateAccounts: Set[ByteStr],
lpAccounts: Set[ByteStr],
broadcastEc: ExecutionContext
)(implicit
sc: Scheduler
) extends WavesBlockchainApiGrpc.WavesBlockchainApi
with ScorexLogging {
Expand Down Expand Up @@ -124,12 +129,18 @@ class WavesBlockchainApiGrpcService(context: ExtensionContext, allowedBlockchain
.explicitGetErr()
}

override def checkedBroadcast(request: CheckedBroadcastRequest): Future[CheckedBroadcastResponse] =
override def checkedBroadcast(request: CheckedBroadcastRequest): Future[CheckedBroadcastResponse] = {
//"sc" name is mandatory here in order to override outer "implicit sc"
implicit val sc: ExecutionContext = broadcastEc

val maybeTx = request.transaction
.fold(GenericError("The signed transaction must be specified").asLeft[SignedExchangeTransaction])(_.asRight[GenericError])
.flatMap(_.toVanilla)
log.info(s"Broadcasting (1) ${maybeTx.map(_.id().toString).getOrElse("*")}")

Future {
for {
grpcTx <- request.transaction
.fold(GenericError("The signed transaction must be specified").asLeft[SignedExchangeTransaction])(_.asRight[GenericError])
tx <- grpcTx.toVanilla
tx <- maybeTx
isConfirmed <- context.transactionsApi.transactionById(tx.id()).fold(false)(_ => true).asRight
isInUtx <- context.transactionsApi.unconfirmedTransactionById(tx.id()).fold(false)(_ => true).asRight
} yield (tx, isConfirmed, isInUtx)
Expand All @@ -153,6 +164,7 @@ class WavesBlockchainApiGrpcService(context: ExtensionContext, allowedBlockchain
val message = Option(e.getMessage).getOrElse(e.getClass.getName)
CheckedBroadcastResponse(CheckedBroadcastResponse.Result.Failed(CheckedBroadcastResponse.Failure(message)))
}
}

private def handleTxInUtx(tx: exchange.ExchangeTransaction): Future[CheckedBroadcastResponse.Result] =
broadcastTransaction(tx).map {
Expand All @@ -164,11 +176,13 @@ class WavesBlockchainApiGrpcService(context: ExtensionContext, allowedBlockchain
}
}

private def broadcastTransaction(tx: exchange.ExchangeTransaction): Future[TracedResult[ValidationError, Boolean]] =
private def broadcastTransaction(tx: exchange.ExchangeTransaction): Future[TracedResult[ValidationError, Boolean]] = {
log.info(s"Broadcasting (2) ${tx.id()}")
context.transactionsApi.broadcastTransaction(tx).andThen {
case Success(r) => log.info(s"Broadcast ${tx.id()}: ${r.resultE}")
case Success(r) => log.info(s"Broadcasting (3) ${tx.id()}: ${r.resultE}")
case Failure(e) => log.warn(s"Can't broadcast ${tx.id()}", e)
}
}

override def isFeatureActivated(request: IsFeatureActivatedRequest): Future[IsFeatureActivatedResponse] = Future {
IsFeatureActivatedResponse(
Expand Down