Skip to content

Commit

Permalink
Merge pull request #5362 from eclipse-vertx/endpoint-resolver-validit…
Browse files Browse the repository at this point in the history
…y-check

The endpoint resolver implementation should better use the address resolver validity check
  • Loading branch information
vietj authored Oct 21, 2024
2 parents ba82495 + 94b3afb commit 1a40458
Show file tree
Hide file tree
Showing 13 changed files with 140 additions and 70 deletions.
4 changes: 2 additions & 2 deletions vertx-core/src/main/java/examples/HTTPExamples.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.ProxyOptions;
import io.vertx.core.net.ProxyType;
import io.vertx.core.net.endpoint.EndpointServer;
import io.vertx.core.net.endpoint.ServerEndpoint;
import io.vertx.core.streams.Pipe;
import io.vertx.core.streams.ReadStream;

Expand Down Expand Up @@ -1448,7 +1448,7 @@ public static void customLoadBalancingPolicy(Vertx vertx) {
.build();
}

private static int indexOfEndpoint(List<? extends EndpointServer> endpoints) {
private static int indexOfEndpoint(List<? extends ServerEndpoint> endpoints) {
return 0;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ public interface Endpoint {
/**
* The servers capable of serving requests for this endpoint.
*/
List<EndpointServer> servers();
List<ServerEndpoint> servers();

/**
* Select a server.
*
* @return the selected server
*/
default EndpointServer selectServer() {
default ServerEndpoint selectServer() {
return selectServer(null);
}

Expand All @@ -37,6 +37,6 @@ default EndpointServer selectServer() {
* @param key the routing key
* @return the selected server
*/
EndpointServer selectServer(String key);
ServerEndpoint selectServer(String key);

}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ default InteractionMetrics<?> newMetrics() {
int numberOfRequests = Integer.MAX_VALUE;
int selected = -1;
int idx = 0;
for (EndpointServer node : servers) {
for (ServerEndpoint node : servers) {
int val = ((DefaultInteractionMetrics)node.metrics()).numberOfInflightRequests();
if (val < numberOfRequests) {
numberOfRequests = val;
Expand Down Expand Up @@ -125,6 +125,6 @@ static LoadBalancer consistentHashing(int numberOfVirtualServers, LoadBalancer f
* @param listOfServers the list of servers
* @return the selector
*/
ServerSelector selector(List<? extends EndpointServer> listOfServers);
ServerSelector selector(List<? extends ServerEndpoint> listOfServers);

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@
* @author <a href="mailto:[email protected]">Julien Viet</a>
*/
@VertxGen
public interface EndpointServer {
public interface ServerEndpoint {

/**
* @return the node key for hashing strategies
*/
String key();

/**
* @return the node socket address
* @return the server socket address
*/
SocketAddress address();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
*/
package io.vertx.core.net.endpoint.impl;

import io.vertx.core.net.endpoint.EndpointServer;
import io.vertx.core.net.endpoint.ServerEndpoint;
import io.vertx.core.net.endpoint.ServerSelector;

import java.nio.charset.StandardCharsets;
Expand All @@ -27,19 +27,19 @@
*/
public class ConsistentHashingSelector implements ServerSelector {

private final List<? extends EndpointServer> endpoints;
private final SortedMap<Long, EndpointServer> nodes;
private final List<? extends ServerEndpoint> endpoints;
private final SortedMap<Long, ServerEndpoint> nodes;
private final ServerSelector fallbackSelector;

public ConsistentHashingSelector(List<? extends EndpointServer> endpoints, int numberOfVirtualNodes, ServerSelector fallbackSelector) {
public ConsistentHashingSelector(List<? extends ServerEndpoint> endpoints, int numberOfVirtualNodes, ServerSelector fallbackSelector) {
MessageDigest instance;
try {
instance = MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException e) {
throw new UnsupportedOperationException(e);
}
SortedMap<Long, EndpointServer> ring = new TreeMap<>();
for (EndpointServer node : endpoints) {
SortedMap<Long, ServerEndpoint> ring = new TreeMap<>();
for (ServerEndpoint node : endpoints) {
for (int idx = 0;idx < numberOfVirtualNodes;idx++) {
String nodeId = node.key() + "-" + idx;
long hash = hash(instance, nodeId.getBytes(StandardCharsets.UTF_8));
Expand Down Expand Up @@ -86,14 +86,14 @@ public int select(String key) {
throw new UnsupportedOperationException(e);
}
long hash = hash(md, key.getBytes(StandardCharsets.UTF_8));
SortedMap<Long, EndpointServer> map = nodes.tailMap(hash);
SortedMap<Long, ServerEndpoint> map = nodes.tailMap(hash);
Long val;
if (map.isEmpty()) {
val = nodes.firstKey();
} else {
val = map.firstKey();
}
EndpointServer endpoint = nodes.get(val);
ServerEndpoint endpoint = nodes.get(val);
return endpoints.indexOf(endpoint); // TODO IMPROVE THAT
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.internal.net.endpoint.EndpointResolverInternal;
import io.vertx.core.net.endpoint.EndpointServer;
import io.vertx.core.net.endpoint.ServerEndpoint;
import io.vertx.core.net.endpoint.ServerInteraction;
import io.vertx.core.internal.VertxInternal;
import io.vertx.core.net.endpoint.InteractionMetrics;
Expand Down Expand Up @@ -80,7 +80,7 @@ public void lookupEndpoint(Address address, Promise<io.vertx.core.net.endpoint.E
return;
}
ManagedEndpoint resolved = resolveAddress(casted);
((Future) resolved.endpoint).onComplete(promise);
resolved.endpoint.onComplete(promise);
}

private class EndpointImpl implements io.vertx.core.net.endpoint.Endpoint {
Expand All @@ -93,13 +93,13 @@ public EndpointImpl(A address, AtomicLong lastAccessed, S state) {
this.lastAccessed = lastAccessed;
}
@Override
public List<EndpointServer> servers() {
public List<ServerEndpoint> servers() {
return endpointResolver.endpoint(state).servers;
}
public void close() {
endpointResolver.dispose(state);
}
private EndpointServer selectEndpoint(S state, String routingKey) {
private ServerEndpoint selectEndpoint(S state, String routingKey) {
ListOfServers listOfServers = endpointResolver.endpoint(state);
int idx;
if (routingKey == null) {
Expand All @@ -112,11 +112,8 @@ private EndpointServer selectEndpoint(S state, String routingKey) {
}
return null;
}
public EndpointServer selectServer(String key) {
if (!endpointResolver.isValid(state)) {
throw new IllegalStateException("Cannot resolve address " + address );
}
EndpointServer endpoint = selectEndpoint(state, key);
public ServerEndpoint selectServer(String key) {
ServerEndpoint endpoint = selectEndpoint(state, key);
if (endpoint == null) {
throw new IllegalStateException("No results for " + address );
}
Expand All @@ -128,6 +125,7 @@ private class ManagedEndpoint extends ManagedResource {

private final Future<EndpointImpl> endpoint;
private final AtomicBoolean disposed = new AtomicBoolean();
private boolean valid;

public ManagedEndpoint(Future<EndpointImpl> endpoint) {
super();
Expand Down Expand Up @@ -186,7 +184,15 @@ public Result(Future<EndpointImpl> fut, ManagedEndpoint endpoint, boolean create
private final BiFunction<ManagedEndpoint, Boolean, Result> fn = (endpoint, created) -> new Result(endpoint.endpoint, endpoint, created);

private ManagedEndpoint resolveAddress(A address) {
Result sFuture = endpointManager.withResource(address, provider, t -> true, fn);
Result sFuture = endpointManager.withResource(address, provider, managedEndpoint -> {
Future<EndpointImpl> fut = managedEndpoint.endpoint;
if (fut.succeeded()) {
EndpointImpl endpoint = fut.result();
return endpointResolver.isValid(endpoint.state);
} else {
return true;
}
}, fn);
if (sFuture.created) {
sFuture.fut.onFailure(err -> {
if (sFuture.endpoint.disposed.compareAndSet(false, true)) {
Expand All @@ -199,15 +205,15 @@ private ManagedEndpoint resolveAddress(A address) {
return sFuture.endpoint;
}

private static class ListOfServers implements Iterable<EndpointServer> {
final List<EndpointServer> servers;
private static class ListOfServers implements Iterable<ServerEndpoint> {
final List<ServerEndpoint> servers;
final ServerSelector selector;
private ListOfServers(List<EndpointServer> servers, ServerSelector selector) {
private ListOfServers(List<ServerEndpoint> servers, ServerSelector selector) {
this.servers = servers;
this.selector = selector;
}
@Override
public Iterator<EndpointServer> iterator() {
public Iterator<ServerEndpoint> iterator() {
return servers.iterator();
}
@Override
Expand All @@ -216,12 +222,12 @@ public String toString() {
}
}

public class EndpointServerImpl implements EndpointServer {
public class ServerEndpointImpl implements ServerEndpoint {
final AtomicLong lastAccessed;
final String key;
final N endpoint;
final InteractionMetrics<?> metrics;
public EndpointServerImpl(AtomicLong lastAccessed, String key, N endpoint, InteractionMetrics<?> metrics) {
public ServerEndpointImpl(AtomicLong lastAccessed, String key, N endpoint, InteractionMetrics<?> metrics) {
this.lastAccessed = lastAccessed;
this.key = key;
this.endpoint = endpoint;
Expand Down Expand Up @@ -282,14 +288,14 @@ private Future<EndpointImpl> resolve(A address) {
EndpointBuilder<ListOfServers, N> builder = new EndpointBuilder<>() {
@Override
public EndpointBuilder<ListOfServers, N> addServer(N server, String key) {
List<EndpointServer> list = new ArrayList<>();
List<ServerEndpoint> list = new ArrayList<>();
InteractionMetrics<?> metrics = loadBalancer.newMetrics();
list.add(new EndpointServerImpl(lastAccessed, key, server, metrics));
list.add(new ServerEndpointImpl(lastAccessed, key, server, metrics));
return new EndpointBuilder<>() {
@Override
public EndpointBuilder<ListOfServers, N> addServer(N server, String key) {
InteractionMetrics<?> metrics = loadBalancer.newMetrics();
list.add(new EndpointServerImpl(lastAccessed, key, server, metrics));
list.add(new ServerEndpointImpl(lastAccessed, key, server, metrics));
return this;
}
@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.vertx.test.fakeloadbalancer;

import io.vertx.core.net.endpoint.EndpointServer;
import io.vertx.core.net.endpoint.ServerEndpoint;
import io.vertx.core.net.endpoint.ServerSelector;
import io.vertx.core.net.endpoint.LoadBalancer;
import io.vertx.core.net.endpoint.InteractionMetrics;
Expand All @@ -10,9 +10,9 @@

public class FakeLoadBalancer implements LoadBalancer {

List<? extends EndpointServer> endpoints;
List<? extends ServerEndpoint> endpoints;

public List<? extends EndpointServer> endpoints() {
public List<? extends ServerEndpoint> endpoints() {
return endpoints;
}

Expand All @@ -22,7 +22,7 @@ public InteractionMetrics<?> newMetrics() {
}

@Override
public ServerSelector selector(List<? extends EndpointServer> listOfServers) {
public ServerSelector selector(List<? extends ServerEndpoint> listOfServers) {
this.endpoints = listOfServers;
return LoadBalancer.ROUND_ROBIN.selector(listOfServers);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import io.vertx.core.net.Address;
import io.vertx.core.net.AddressResolver;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.net.endpoint.EndpointServer;
import io.vertx.core.net.endpoint.ServerEndpoint;
import io.vertx.core.spi.endpoint.EndpointResolver;
import io.vertx.core.spi.endpoint.EndpointBuilder;

Expand All @@ -16,12 +16,22 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

public class FakeEndpointResolver<B> implements AddressResolver, EndpointResolver<FakeAddress, FakeEndpoint, FakeState<B>, B> {

public static class Endpoint {
final List<SocketAddress> addresses;
final boolean valid;
public Endpoint(List<SocketAddress> addresses, boolean valid) {
this.addresses = addresses;
this.valid = valid;
}
}

class LazyFakeState {
final String name;
volatile List<SocketAddress> endpoints;
volatile Supplier<Endpoint> endpointSupplier;
AtomicReference<FakeState<B>> state = new AtomicReference<>();
LazyFakeState(String name) {
this.name = name;
Expand All @@ -31,8 +41,12 @@ class LazyFakeState {
private final ConcurrentMap<String, LazyFakeState> map = new ConcurrentHashMap<>();

public void registerAddress(String name, List<SocketAddress> endpoints) {
registerAddress(name, () -> new Endpoint(endpoints, true));
}

public void registerAddress(String name, Supplier<Endpoint> supplier) {
LazyFakeState lazy = map.computeIfAbsent(name, LazyFakeState::new);
lazy.endpoints = endpoints;
lazy.endpointSupplier = supplier;
FakeState prev = lazy.state.getAndSet(null);
if (prev != null) {
prev.isValid = false;
Expand All @@ -49,7 +63,7 @@ public List<FakeEndpoint> endpoints(String name) {
Iterator s1 = ((Iterable) state.state.get().endpoints).iterator();
List<FakeEndpoint> list = new ArrayList<>();
for (Object o : ((Iterable) state.state.get().endpoints)) {
EndpointServer instance = (EndpointServer) o;
ServerEndpoint instance = (ServerEndpoint) o;
list.add((FakeEndpoint) instance.unwrap());
}
return list;
Expand All @@ -71,11 +85,13 @@ public FakeAddress tryCast(Address address) {
public Future<FakeState<B>> resolve(FakeAddress address, EndpointBuilder<B, FakeEndpoint> builder) {
LazyFakeState state = map.get(address.name());
if (state != null) {
if (state.state.get() == null) {
for (SocketAddress socketAddress : state.endpoints) {
FakeState<B> blah = state.state.get();
if (blah == null || !blah.isValid) {
Endpoint endpoint = state.endpointSupplier.get();
for (SocketAddress socketAddress : endpoint.addresses) {
builder = builder.addServer(new FakeEndpoint(socketAddress));
}
state.state.set(new FakeState<>(state.name, builder.build()));
state.state.set(new FakeState<>(state.name, builder.build(), endpoint.valid));
}
return Future.succeededFuture(state.state.get());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ public class FakeState<B> {
final B endpoints;
volatile boolean isValid;

FakeState(String name, B endpoints) {
FakeState(String name, B endpoints, boolean valid) {
this.name = name;
this.endpoints = endpoints;
this.isValid = true;
this.isValid = valid;
}
}
Loading

0 comments on commit 1a40458

Please sign in to comment.