Skip to content

Commit

Permalink
Merge pull request tronprotocol#5460 from ss3344520/fix_msg_process
Browse files Browse the repository at this point in the history
feat(net): fix failure to process FETCH_INV_DATA message
  • Loading branch information
xxo1shine authored Sep 8, 2023
2 parents 437c12e + 7e2b7fc commit 9f8cebd
Showing 1 changed file with 13 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@
@Slf4j(topic = "net")
@Component
public class AdvService {

private final int MAX_INV_TO_FETCH_CACHE_SIZE = 100_000;
private final int MAX_TRX_CACHE_SIZE = 50_000;
private final int MAX_BLOCK_CACHE_SIZE = 10;
private final int MAX_SPREAD_SIZE = 1_000;
private final long TIMEOUT = MSG_CACHE_DURATION_IN_BLOCKS * BLOCK_PRODUCED_INTERVAL;

@Autowired
private TronNetDelegate tronNetDelegate;
Expand Down Expand Up @@ -264,30 +264,30 @@ private void consumerInvToFetch() {
Collection<PeerConnection> peers = tronNetDelegate.getActivePeer().stream()
.filter(peer -> peer.isIdle())
.collect(Collectors.toList());

InvSender invSender = new InvSender();
long now = System.currentTimeMillis();
synchronized (this) {
if (invToFetch.isEmpty() || peers.isEmpty()) {
return;
}
long now = System.currentTimeMillis();
invToFetch.forEach((item, time) -> {
if (time < now - MSG_CACHE_DURATION_IN_BLOCKS * BLOCK_PRODUCED_INTERVAL) {
if (time < now - TIMEOUT) {
logger.info("This obj is too late to fetch, type: {} hash: {}", item.getType(),
item.getHash());
invToFetch.remove(item);
invToFetchCache.invalidate(item);
return;
}
peers.stream().filter(peer -> peer.getAdvInvReceive().getIfPresent(item) != null
&& invSender.getSize(peer) < MAX_TRX_FETCH_PER_PEER)
.sorted(Comparator.comparingInt(peer -> invSender.getSize(peer)))
.findFirst().ifPresent(peer -> {
if (peer.checkAndPutAdvInvRequest(item, now)) {
invSender.add(item, peer);
}
invToFetch.remove(item);
});
peers.stream().filter(peer -> {
Long t = peer.getAdvInvReceive().getIfPresent(item);
return t != null && now - t < TIMEOUT && invSender.getSize(peer) < MAX_TRX_FETCH_PER_PEER;
}).sorted(Comparator.comparingInt(peer -> invSender.getSize(peer)))
.findFirst().ifPresent(peer -> {
if (peer.checkAndPutAdvInvRequest(item, now)) {
invSender.add(item, peer);
}
invToFetch.remove(item);
});
});
}

Expand Down

0 comments on commit 9f8cebd

Please sign in to comment.