Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Treat discovery peer with unknown endpoint as unknown #1942

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ public Endpoint getEndpoint() {
return endpoint;
}

public boolean discoveryEndpointMatches(final DiscoveryPeer peer) {
return peer.getEndpoint().getHost().equals(endpoint.getHost())
&& peer.getEndpoint().getUdpPort() == endpoint.getUdpPort();
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("DiscoveryPeer{");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,14 @@ public PeerTable(final BytesValue nodeId) {
* @param peer The peer to query.
* @return The stored representation.
*/
public Optional<DiscoveryPeer> get(final PeerId peer) {
public Optional<DiscoveryPeer> get(final DiscoveryPeer peer) {
if (!idBloom.mightContain(peer.getId())) {
return Optional.empty();
}
final int distance = distanceFrom(peer);
return table[distance].getAndTouch(peer.getId());
return table[distance]
.getAndTouch(peer.getId())
.filter(known -> known.discoveryEndpointMatches(peer));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import tech.pegasys.pantheon.crypto.SECP256K1;
import tech.pegasys.pantheon.crypto.SECP256K1.KeyPair;
import tech.pegasys.pantheon.ethereum.p2p.discovery.PeerDiscoveryTestHelper.AgentBuilder;
import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.FindNeighborsPacketData;
import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.MockPeerDiscoveryAgent;
Expand Down Expand Up @@ -308,6 +310,102 @@ public void bonding_disallowOutgoingBonding() {
assertThat(remoteIncomingPackets).isEmpty();
}

/**
* These tests simulates the case where a node crashes then comes back up with a new ip address or
* listening port.
*/
@Test
public void bonding_simulatePeerRestartingWithNewEndpoint_updatedPort() {
simulatePeerRestartingOnDifferentEndpoint(false, true);
}

@Test
public void bonding_simulatePeerRestartingWithNewEndpoint_updatedHost() {
simulatePeerRestartingOnDifferentEndpoint(true, false);
}

@Test
public void bonding_simulatePeerRestartingWithNewEndpoint_updatedHostAndPort() {
simulatePeerRestartingOnDifferentEndpoint(true, true);
}

public void simulatePeerRestartingOnDifferentEndpoint(
final boolean updateHost, final boolean updatePort) {
// Setup peer
final MockPeerDiscoveryAgent agent = helper.startDiscoveryAgent();
final DiscoveryPeer agentPeer = agent.getAdvertisedPeer().get();

final KeyPair remoteKeyPair = SECP256K1.KeyPair.generate();
final String remoteIp = "1.2.3.4";
final MockPeerDiscoveryAgent remoteAgent =
helper.createDiscoveryAgent(
helper
.agentBuilder()
.keyPair(remoteKeyPair)
.advertisedHost(remoteIp)
.bootstrapPeers(agentPeer));

agent.start(999);
remoteAgent.start(888);
final DiscoveryPeer remotePeer = remoteAgent.getAdvertisedPeer().get();

// Remote agent should have bonded with agent
assertThat(agent.streamDiscoveredPeers()).hasSize(1);
assertThat(agent.streamDiscoveredPeers()).contains(remoteAgent.getAdvertisedPeer().get());

// Create a new remote agent with same id, and new endpoint
remoteAgent.stop();
final int newPort = updatePort ? 0 : remotePeer.getEndpoint().getUdpPort();
final String newIp = updateHost ? "1.2.3.5" : remoteIp;
final MockPeerDiscoveryAgent updatedRemoteAgent =
helper.createDiscoveryAgent(
helper
.agentBuilder()
.keyPair(remoteKeyPair)
.advertisedHost(newIp)
.bindPort(newPort)
.bootstrapPeers(agentPeer));
updatedRemoteAgent.start(889);
final DiscoveryPeer updatedRemotePeer = updatedRemoteAgent.getAdvertisedPeer().get();

// Sanity check
assertThat(
updatedRemotePeer.getEndpoint().getUdpPort() == remotePeer.getEndpoint().getUdpPort())
.isEqualTo(!updatePort);
assertThat(updatedRemotePeer.getEndpoint().getHost().equals(remotePeer.getEndpoint().getHost()))
.isEqualTo(!updateHost);
assertThat(updatedRemotePeer.getId()).isEqualTo(remotePeer.getId());

// Check that our restarted agent receives a PONG response
final List<IncomingPacket> incomingPackets = updatedRemoteAgent.getIncomingPackets();
assertThat(incomingPackets).hasSizeGreaterThan(0);
final long pongCount =
incomingPackets.stream()
.filter(packet -> packet.fromAgent.equals(agent))
.filter(packet -> packet.packet.getType().equals(PacketType.PONG))
.count();
assertThat(pongCount).isGreaterThan(0);

// Check that agent has an endpoint matching the restarted node
final Optional<DiscoveryPeer> discoveredPeer =
agent
.streamDiscoveredPeers()
.filter(peer -> peer.getId().equals(updatedRemotePeer.getId()))
.findFirst();
assertThat(discoveredPeer).isPresent();
assertThat(discoveredPeer.get().getEndpoint().getUdpPort())
.isEqualTo(updatedRemotePeer.getEndpoint().getUdpPort());
assertThat(discoveredPeer.get().getEndpoint().getHost())
.isEqualTo(updatedRemotePeer.getEndpoint().getHost());
// Check endpoint is consistent with enodeURL
assertThat(discoveredPeer.get().getEnodeURL().getDiscoveryPortOrZero())
.isEqualTo(updatedRemotePeer.getEndpoint().getUdpPort());
assertThat(discoveredPeer.get().getEnodeURL().getListeningPortOrZero())
.isEqualTo(updatedRemotePeer.getEndpoint().getFunctionalTcpPort());
assertThat(discoveredPeer.get().getEnodeURL().getIpAsString())
.isEqualTo(updatedRemotePeer.getEndpoint().getHost());
}

@Test
public void neighbors_allowOutgoingRequest() {
// Setup peer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*/
package tech.pegasys.pantheon.ethereum.p2p.discovery;

import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.Arrays.asList;

import tech.pegasys.pantheon.crypto.SECP256K1;
Expand All @@ -31,6 +32,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -179,6 +181,9 @@ public static class AgentBuilder {
private List<EnodeURL> bootnodes = Collections.emptyList();
private boolean active = true;
private PeerPermissions peerPermissions = PeerPermissions.noop();
private String advertisedHost = "127.0.0.1";
private OptionalInt bindPort = OptionalInt.empty();
private KeyPair keyPair = SECP256K1.KeyPair.generate();

private AgentBuilder(
final Map<BytesValue, MockPeerDiscoveryAgent> agents,
Expand Down Expand Up @@ -215,14 +220,37 @@ public AgentBuilder active(final boolean active) {
return this;
}

public AgentBuilder advertisedHost(final String host) {
checkNotNull(host);
this.advertisedHost = host;
return this;
}

public AgentBuilder bindPort(final int bindPort) {
if (bindPort == 0) {
this.bindPort = OptionalInt.empty();
// Zero means pick the next available port
return this;
}
this.bindPort = OptionalInt.of(bindPort);
return this;
}

public AgentBuilder keyPair(final KeyPair keyPair) {
checkNotNull(keyPair);
this.keyPair = keyPair;
return this;
}

public MockPeerDiscoveryAgent build() {
final int port = bindPort.orElse(nextAvailablePort.incrementAndGet());
final DiscoveryConfiguration config = new DiscoveryConfiguration();
config.setBootnodes(bootnodes);
config.setBindPort(nextAvailablePort.incrementAndGet());
config.setAdvertisedHost(advertisedHost);
config.setBindPort(port);
config.setActive(active);

return new MockPeerDiscoveryAgent(
SECP256K1.KeyPair.generate(), config, peerPermissions, agents);
return new MockPeerDiscoveryAgent(keyPair, config, peerPermissions, agents);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class MockPeerDiscoveryAgent extends PeerDiscoveryAgent {
// The set of known agents operating on the network
private final Map<BytesValue, MockPeerDiscoveryAgent> agentNetwork;
private final Deque<IncomingPacket> incomingPackets = new ArrayDeque<>();
private boolean isActive = false;

public MockPeerDiscoveryAgent(
final KeyPair keyPair,
Expand Down Expand Up @@ -65,25 +66,45 @@ public List<IncomingPacket> getIncomingPackets() {

@Override
protected CompletableFuture<InetSocketAddress> listenForConnections() {
isActive = true;
// Skip network setup for tests
InetSocketAddress address =
new InetSocketAddress(config.getAdvertisedHost(), config.getBindPort());
InetSocketAddress address = new InetSocketAddress(config.getBindHost(), config.getBindPort());
return CompletableFuture.completedFuture(address);
}

@Override
protected CompletableFuture<Void> sendOutgoingPacket(
final DiscoveryPeer toPeer, final Packet packet) {
CompletableFuture<Void> result = new CompletableFuture<>();
if (!this.isActive) {
result.completeExceptionally(new Exception("Attempt to send message from an inactive agent"));
}

MockPeerDiscoveryAgent toAgent = agentNetwork.get(toPeer.getId());
if (toAgent == null) {
result.completeExceptionally(
new Exception(
"Attempt to send to unknown peer. Agents must be constructed through PeerDiscoveryTestHelper."));
return result;
}

final DiscoveryPeer agentPeer = toAgent.getAdvertisedPeer().get();
if (!toPeer.getEndpoint().getHost().equals(agentPeer.getEndpoint().getHost())) {
LOG.warn(
"Attempt to send packet to discovery peer using the wrong host address. Sending to {}, but discovery peer is listening on {}",
toPeer.getEndpoint().getHost(),
agentPeer.getEndpoint().getHost());
} else if (toPeer.getEndpoint().getUdpPort() != agentPeer.getEndpoint().getUdpPort()) {
LOG.warn(
"Attempt to send packet to discovery peer using the wrong udp port. Sending to {}, but discovery peer is listening on {}",
toPeer.getEndpoint().getUdpPort(),
agentPeer.getEndpoint().getUdpPort());
} else if (!toAgent.isActive) {
LOG.warn("Attempt to send packet to an inactive peer.");
} else {
toAgent.processIncomingPacket(this, packet);
result.complete(null);
}
result.complete(null);
return result;
}

Expand All @@ -99,6 +120,7 @@ protected AsyncExecutor createWorkerExecutor() {

@Override
public CompletableFuture<?> stop() {
isActive = false;
return CompletableFuture.completedFuture(null);
}

Expand Down