From 28336da3974db8c84d32e1a603cc9e89c982cc9e Mon Sep 17 00:00:00 2001 From: Meredith Baxter Date: Fri, 13 Sep 2019 18:17:59 -0400 Subject: [PATCH 1/2] Treat discovery peer with unknown endpoint as unknown --- .../ethereum/p2p/discovery/DiscoveryPeer.java | 5 + .../p2p/discovery/internal/PeerTable.java | 6 +- .../p2p/discovery/PeerDiscoveryAgentTest.java | 98 +++++++++++++++++++ .../discovery/PeerDiscoveryTestHelper.java | 33 ++++++- .../internal/MockPeerDiscoveryAgent.java | 28 +++++- 5 files changed, 162 insertions(+), 8 deletions(-) diff --git a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/discovery/DiscoveryPeer.java b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/discovery/DiscoveryPeer.java index a5e9a7778a..cc8a2a9c88 100644 --- a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/discovery/DiscoveryPeer.java +++ b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/discovery/DiscoveryPeer.java @@ -109,6 +109,11 @@ public Endpoint getEndpoint() { return endpoint; } + public boolean discoveryEndpointMatches(final DiscoveryPeer peer) { + return peer.getEndpoint().getHost().equals(endpoint.getHost()) + && peer.getEndpoint().getUdpPort() == endpoint.getUdpPort(); + } + @Override public String toString() { final StringBuilder sb = new StringBuilder("DiscoveryPeer{"); diff --git a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/PeerTable.java b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/PeerTable.java index a2357a2fe9..ea9148d730 100644 --- a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/PeerTable.java +++ b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/PeerTable.java @@ -81,12 +81,14 @@ public PeerTable(final BytesValue nodeId) { * @param peer The peer to query. * @return The stored representation. */ - public Optional get(final PeerId peer) { + public Optional get(final DiscoveryPeer peer) { if (!idBloom.mightContain(peer.getId())) { return Optional.empty(); } final int distance = distanceFrom(peer); - return table[distance].getAndTouch(peer.getId()); + return table[distance] + .getAndTouch(peer.getId()) + .filter(known -> known.discoveryEndpointMatches(peer)); } /** diff --git a/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/PeerDiscoveryAgentTest.java b/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/PeerDiscoveryAgentTest.java index f3f9a8de68..017315af93 100644 --- a/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/PeerDiscoveryAgentTest.java +++ b/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/PeerDiscoveryAgentTest.java @@ -20,6 +20,8 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import tech.pegasys.pantheon.crypto.SECP256K1; +import tech.pegasys.pantheon.crypto.SECP256K1.KeyPair; import tech.pegasys.pantheon.ethereum.p2p.discovery.PeerDiscoveryTestHelper.AgentBuilder; import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.FindNeighborsPacketData; import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.MockPeerDiscoveryAgent; @@ -308,6 +310,102 @@ public void bonding_disallowOutgoingBonding() { assertThat(remoteIncomingPackets).isEmpty(); } + /** + * These tests simulates the case where a node crashes then comes back up with a new ip address or + * listening port. + */ + @Test + public void bonding_simulatePeerRestartingWithNewEndpoint_updatedPort() { + simulatePeerRestartingOnDifferentEndpoint(false, true); + } + + @Test + public void bonding_simulatePeerRestartingWithNewEndpoint_updatedHost() { + simulatePeerRestartingOnDifferentEndpoint(true, false); + } + + @Test + public void bonding_simulatePeerRestartingWithNewEndpoint_updatedHostAndPort() { + simulatePeerRestartingOnDifferentEndpoint(true, true); + } + + public void simulatePeerRestartingOnDifferentEndpoint( + final boolean updateHost, final boolean updatePort) { + // Setup peer + final MockPeerDiscoveryAgent agent = helper.startDiscoveryAgent(); + final DiscoveryPeer agentPeer = agent.getAdvertisedPeer().get(); + + final KeyPair remoteKeyPair = SECP256K1.KeyPair.generate(); + final String remoteIp = "1.2.3.4"; + final MockPeerDiscoveryAgent remoteAgent = + helper.createDiscoveryAgent( + helper + .agentBuilder() + .keyPair(remoteKeyPair) + .advertisedHost(remoteIp) + .bootstrapPeers(agentPeer)); + + agent.start(999); + remoteAgent.start(888); + final DiscoveryPeer remotePeer = remoteAgent.getAdvertisedPeer().get(); + + // Remote agent should have bonded with agent + assertThat(agent.streamDiscoveredPeers()).hasSize(1); + assertThat(agent.streamDiscoveredPeers()).contains(remoteAgent.getAdvertisedPeer().get()); + + // Create a new remote agent with same id, and new endpoint + remoteAgent.stop(); + final int newPort = updatePort ? 0 : remotePeer.getEndpoint().getUdpPort(); + final String newIp = updateHost ? "1.2.3.5" : remoteIp; + final MockPeerDiscoveryAgent updatedRemoteAgent = + helper.createDiscoveryAgent( + helper + .agentBuilder() + .keyPair(remoteKeyPair) + .advertisedHost(newIp) + .bindPort(newPort) + .bootstrapPeers(agentPeer)); + updatedRemoteAgent.start(889); + final DiscoveryPeer updatedRemotePeer = updatedRemoteAgent.getAdvertisedPeer().get(); + + // Sanity check + assertThat( + updatedRemotePeer.getEndpoint().getUdpPort() == remotePeer.getEndpoint().getUdpPort()) + .isEqualTo(!updatePort); + assertThat(updatedRemotePeer.getEndpoint().getHost().equals(remotePeer.getEndpoint().getHost())) + .isEqualTo(!updateHost); + assertThat(updatedRemotePeer.getId()).isEqualTo(remotePeer.getId()); + + // Check that our restarted agent receives a PONG response + final List incomingPackets = updatedRemoteAgent.getIncomingPackets(); + assertThat(incomingPackets).hasSizeGreaterThan(0); + final long pongCount = + incomingPackets.stream() + .filter(packet -> packet.fromAgent.equals(agent)) + .filter(packet -> packet.packet.getType().equals(PacketType.PONG)) + .count(); + assertThat(pongCount).isGreaterThan(0); + + // Check that agent has an endpoint matching the restarted node + final Optional discoveredPeer = + agent + .streamDiscoveredPeers() + .filter(peer -> peer.getId().equals(updatedRemotePeer.getId())) + .findFirst(); + assertThat(discoveredPeer).isPresent(); + assertThat(discoveredPeer.get().getEndpoint().getUdpPort()) + .isEqualTo(updatedRemotePeer.getEndpoint().getUdpPort()); + assertThat(discoveredPeer.get().getEndpoint().getHost()) + .isEqualTo(updatedRemotePeer.getEndpoint().getHost()); + // Check endpoint is consistent with enodeURL + assertThat(discoveredPeer.get().getEnodeURL().getDiscoveryPortOrZero()) + .isEqualTo(updatedRemotePeer.getEndpoint().getUdpPort()); + assertThat(discoveredPeer.get().getEnodeURL().getListeningPortOrZero()) + .isEqualTo(updatedRemotePeer.getEndpoint().getFunctionalTcpPort()); + assertThat(discoveredPeer.get().getEnodeURL().getIpAsString()) + .isEqualTo(updatedRemotePeer.getEndpoint().getHost()); + } + @Test public void neighbors_allowOutgoingRequest() { // Setup peer diff --git a/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/PeerDiscoveryTestHelper.java b/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/PeerDiscoveryTestHelper.java index 7840ef1942..1505148e5b 100644 --- a/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/PeerDiscoveryTestHelper.java +++ b/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/PeerDiscoveryTestHelper.java @@ -12,6 +12,7 @@ */ package tech.pegasys.pantheon.ethereum.p2p.discovery; +import static com.google.common.base.Preconditions.checkNotNull; import static java.util.Arrays.asList; import tech.pegasys.pantheon.crypto.SECP256K1; @@ -31,6 +32,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.OptionalInt; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -179,6 +181,9 @@ public static class AgentBuilder { private List bootnodes = Collections.emptyList(); private boolean active = true; private PeerPermissions peerPermissions = PeerPermissions.noop(); + private String advertisedHost = "127.0.0.1"; + private OptionalInt bindPort = OptionalInt.empty(); + private KeyPair keyPair = SECP256K1.KeyPair.generate(); private AgentBuilder( final Map agents, @@ -215,14 +220,36 @@ public AgentBuilder active(final boolean active) { return this; } + public AgentBuilder advertisedHost(final String host) { + checkNotNull(host); + this.advertisedHost = host; + return this; + } + + public AgentBuilder bindPort(final int bindPort) { + if (bindPort == 0) { + // Zero means pick the next available port + return this; + } + this.bindPort = OptionalInt.of(bindPort); + return this; + } + + public AgentBuilder keyPair(final KeyPair keyPair) { + checkNotNull(keyPair); + this.keyPair = keyPair; + return this; + } + public MockPeerDiscoveryAgent build() { + final int port = bindPort.orElse(nextAvailablePort.incrementAndGet()); final DiscoveryConfiguration config = new DiscoveryConfiguration(); config.setBootnodes(bootnodes); - config.setBindPort(nextAvailablePort.incrementAndGet()); + config.setAdvertisedHost(advertisedHost); + config.setBindPort(port); config.setActive(active); - return new MockPeerDiscoveryAgent( - SECP256K1.KeyPair.generate(), config, peerPermissions, agents); + return new MockPeerDiscoveryAgent(keyPair, config, peerPermissions, agents); } } } diff --git a/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/MockPeerDiscoveryAgent.java b/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/MockPeerDiscoveryAgent.java index 09f9be1312..f0b32b0478 100644 --- a/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/MockPeerDiscoveryAgent.java +++ b/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/MockPeerDiscoveryAgent.java @@ -34,6 +34,7 @@ public class MockPeerDiscoveryAgent extends PeerDiscoveryAgent { // The set of known agents operating on the network private final Map agentNetwork; private final Deque incomingPackets = new ArrayDeque<>(); + private boolean isActive = false; public MockPeerDiscoveryAgent( final KeyPair keyPair, @@ -65,9 +66,9 @@ public List getIncomingPackets() { @Override protected CompletableFuture listenForConnections() { + isActive = true; // Skip network setup for tests - InetSocketAddress address = - new InetSocketAddress(config.getAdvertisedHost(), config.getBindPort()); + InetSocketAddress address = new InetSocketAddress(config.getBindHost(), config.getBindPort()); return CompletableFuture.completedFuture(address); } @@ -75,15 +76,35 @@ protected CompletableFuture listenForConnections() { protected CompletableFuture sendOutgoingPacket( final DiscoveryPeer toPeer, final Packet packet) { CompletableFuture result = new CompletableFuture<>(); + if (!this.isActive) { + result.completeExceptionally(new Exception("Attempt to send message from an inactive agent")); + } + MockPeerDiscoveryAgent toAgent = agentNetwork.get(toPeer.getId()); if (toAgent == null) { result.completeExceptionally( new Exception( "Attempt to send to unknown peer. Agents must be constructed through PeerDiscoveryTestHelper.")); + return result; + } + + final DiscoveryPeer agentPeer = toAgent.getAdvertisedPeer().get(); + if (!toPeer.getEndpoint().getHost().equals(agentPeer.getEndpoint().getHost())) { + LOG.warn( + "Attempt to send packet to discovery peer using the wrong host address. Sending to {}, but discovery peer is listening on {}", + toPeer.getEndpoint().getHost(), + agentPeer.getEndpoint().getHost()); + } else if (toPeer.getEndpoint().getUdpPort() != agentPeer.getEndpoint().getUdpPort()) { + LOG.warn( + "Attempt to send packet to discovery peer using the wrong udp port. Sending to {}, but discovery peer is listening on {}", + toPeer.getEndpoint().getUdpPort(), + agentPeer.getEndpoint().getUdpPort()); + } else if (!toAgent.isActive) { + LOG.warn("Attempt to send packet to an inactive peer."); } else { toAgent.processIncomingPacket(this, packet); - result.complete(null); } + result.complete(null); return result; } @@ -99,6 +120,7 @@ protected AsyncExecutor createWorkerExecutor() { @Override public CompletableFuture stop() { + isActive = false; return CompletableFuture.completedFuture(null); } From a159fcb11e669044e4745727a44ff803d0f2c832 Mon Sep 17 00:00:00 2001 From: Meredith Baxter Date: Fri, 13 Sep 2019 18:29:31 -0400 Subject: [PATCH 2/2] Fix setter --- .../pantheon/ethereum/p2p/discovery/PeerDiscoveryTestHelper.java | 1 + 1 file changed, 1 insertion(+) diff --git a/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/PeerDiscoveryTestHelper.java b/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/PeerDiscoveryTestHelper.java index 1505148e5b..508f9529c0 100644 --- a/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/PeerDiscoveryTestHelper.java +++ b/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/PeerDiscoveryTestHelper.java @@ -228,6 +228,7 @@ public AgentBuilder advertisedHost(final String host) { public AgentBuilder bindPort(final int bindPort) { if (bindPort == 0) { + this.bindPort = OptionalInt.empty(); // Zero means pick the next available port return this; }