From 4ae0f8eeea6a270b2be30ebb9f797a89939fed85 Mon Sep 17 00:00:00 2001 From: June <2571240520@qq.com> Date: Sat, 20 Apr 2024 15:32:57 +0800 Subject: [PATCH] Optimized some sync bugs and improved synchronization speed; Now a full used single sync thread will consume 3 Mbps. --- .../java/io/xdag/config/AbstractConfig.java | 2 +- .../java/io/xdag/config/spec/NodeSpec.java | 2 +- .../java/io/xdag/core/BlockchainImpl.java | 1 + src/main/java/io/xdag/net/XdagP2pHandler.java | 8 ++--- .../io/xdag/net/message/MessageQueue.java | 29 +++++++++---------- 5 files changed, 20 insertions(+), 22 deletions(-) diff --git a/src/main/java/io/xdag/config/AbstractConfig.java b/src/main/java/io/xdag/config/AbstractConfig.java index 8582e8e4..5e636b8f 100644 --- a/src/main/java/io/xdag/config/AbstractConfig.java +++ b/src/main/java/io/xdag/config/AbstractConfig.java @@ -80,7 +80,7 @@ public class AbstractConfig implements Config, AdminSpec, NodeSpec, WalletSpec, protected int netMaxOutboundConnections = 128; protected int netMaxInboundConnections = 512; protected int netMaxInboundConnectionsPerIp = 5; - protected int netMaxMessageQueueSize = 4096; +// protected int netMaxMessageQueueSize = 4096; protected int netMaxFrameBodySize = 128 * 1024; protected int netMaxPacketSize = 16 * 1024 * 1024; protected int netRelayRedundancy = 8; diff --git a/src/main/java/io/xdag/config/spec/NodeSpec.java b/src/main/java/io/xdag/config/spec/NodeSpec.java index d0a28c17..7b95843b 100644 --- a/src/main/java/io/xdag/config/spec/NodeSpec.java +++ b/src/main/java/io/xdag/config/spec/NodeSpec.java @@ -44,7 +44,7 @@ public interface NodeSpec { int getWaitEpoch(); - int getNetMaxMessageQueueSize(); +// int getNetMaxMessageQueueSize(); int getNetHandshakeExpiry(); diff --git a/src/main/java/io/xdag/core/BlockchainImpl.java b/src/main/java/io/xdag/core/BlockchainImpl.java index 8be5a81d..a8572642 100644 --- a/src/main/java/io/xdag/core/BlockchainImpl.java +++ b/src/main/java/io/xdag/core/BlockchainImpl.java @@ -866,6 +866,7 @@ public XAmount unApplyBlock(Block block) { for (Address link : links) { if (!link.isAddress) { Block ref = getBlockByHash(link.getAddress(), false); + //even mainBlock duplicate link the TX_block which other mainBlock is handled, we could check the TX ref if this mainBlock. if (ref.getInfo().getRef() != null && equalBytes(ref.getInfo().getRef(), block.getHashLow().toArray()) && ((ref.getInfo().flags & BI_MAIN_REF) != 0)) { diff --git a/src/main/java/io/xdag/net/XdagP2pHandler.java b/src/main/java/io/xdag/net/XdagP2pHandler.java index a8af0a18..07a9398d 100644 --- a/src/main/java/io/xdag/net/XdagP2pHandler.java +++ b/src/main/java/io/xdag/net/XdagP2pHandler.java @@ -338,10 +338,10 @@ private ReasonCode checkPeer(Peer peer, boolean newHandShake) { // } // validator can't share IP address - if (channelMgr.isActiveIP(channel.getRemoteIp()) // already connected - && nodeSpec.getNetwork() == Network.MAINNET) { // on main net - return ReasonCode.VALIDATOR_IP_LIMITED; - } +// if (channelMgr.isActiveIP(channel.getRemoteIp()) // already connected +// && nodeSpec.getNetwork() == Network.MAINNET) { // on main net +// return ReasonCode.VALIDATOR_IP_LIMITED; +// } return null; } diff --git a/src/main/java/io/xdag/net/message/MessageQueue.java b/src/main/java/io/xdag/net/message/MessageQueue.java index 0e647fbf..ceee7e84 100644 --- a/src/main/java/io/xdag/net/message/MessageQueue.java +++ b/src/main/java/io/xdag/net/message/MessageQueue.java @@ -30,11 +30,7 @@ import io.netty.channel.ChannelHandlerContext; import io.xdag.config.Config; import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import io.xdag.net.message.p2p.DisconnectMessage; @@ -50,8 +46,8 @@ public class MessageQueue { .daemon(true) .build()); private final Config config; - - private final Queue queue = new ConcurrentLinkedQueue<>(); + //'8192' is a value obtained from testing experience, not a standard value.Looking forward to optimization. + private final BlockingQueue queue = new LinkedBlockingQueue<>(8192); private final Queue prioritized = new ConcurrentLinkedQueue<>(); private ChannelHandlerContext ctx; private ScheduledFuture timerTask; @@ -95,18 +91,18 @@ public void disconnect(ReasonCode code) { } } - public boolean sendMessage(Message msg) { - if (size() >= config.getNodeSpec().getNetMaxMessageQueueSize()) { - disconnect(ReasonCode.MESSAGE_QUEUE_FULL); - return false; - } - + public void sendMessage(Message msg) { + //when full message queue, whitelist don't need to disconnect. if (config.getNodeSpec().getNetPrioritizedMessages().contains(msg.getCode())) { prioritized.add(msg); } else { - queue.add(msg); + try { + //update to BlockingQueue, capacity 8192 + queue.put(msg); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } - return true; } public int size() { @@ -114,7 +110,8 @@ public int size() { } private void nudgeQueue() { - int n = Math.min(5, size()); + //Increase bandwidth consumption of a full used single sync thread to 3 Mbps. + int n = Math.min(8, size()); if (n == 0) { return; }