From 0571c9819d2efd48fce7aec4b7a950038c437496 Mon Sep 17 00:00:00 2001 From: scottf Date: Mon, 28 Oct 2024 16:38:28 -0400 Subject: [PATCH] Server Pool Update --- server-pool/README.md | 13 ++- .../gradle/wrapper/gradle-wrapper.properties | 1 + server-pool/gradlew | 1 - server-pool/gradlew.bat | 4 +- .../java/io/nats/slp/ExampleServerPool.java | 94 +++++++++++++++++++ .../src/main/java/io/nats/slp/Main.java | 25 +++++ .../java/io/nats/slp/ProvideServersList.java | 70 -------------- 7 files changed, 131 insertions(+), 77 deletions(-) create mode 100644 server-pool/src/main/java/io/nats/slp/ExampleServerPool.java create mode 100644 server-pool/src/main/java/io/nats/slp/Main.java delete mode 100644 server-pool/src/main/java/io/nats/slp/ProvideServersList.java diff --git a/server-pool/README.md b/server-pool/README.md index 2b76e79..943269c 100644 --- a/server-pool/README.md +++ b/server-pool/README.md @@ -1,12 +1,17 @@ ![NATS](../images/large-logo.png) -# Server List Provider +# Example Server Pool -Example how the developer can provide the connection/reconnection info themselves / dynamically. +This example is a template example for implementing a server pool. -__TODO: FIX!!!__ +The ExampleServerPool class extends the internal implementation, NatsServerPool. -CURRENTLY NON-FUNCTIONAL AGAINST LATEST VERSION AS INITIAL IMPLEMENTATION WAS EXPERIMENTAL +There are comments in the example code to explain the intention of each method in the interface and custom methods in the implementation. + +## Source Code +[ServerPool](https://github.com/nats-io/nats.java/blob/main/src/main/java/io/nats/client/ServerPool.java) interface. + +[NatsServerPool](https://github.com/nats-io/nats.java/blob/main/src/main/java/io/nats/client/impl/NatsServerPool.java) implementation. ## License diff --git a/server-pool/gradle/wrapper/gradle-wrapper.properties b/server-pool/gradle/wrapper/gradle-wrapper.properties index a4b4429..cd141cf 100644 --- a/server-pool/gradle/wrapper/gradle-wrapper.properties +++ b/server-pool/gradle/wrapper/gradle-wrapper.properties @@ -3,3 +3,4 @@ distributionPath=wrapper/dists distributionUrl=https\://services.gradle.org/distributions/gradle-6.3-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists + diff --git a/server-pool/gradlew b/server-pool/gradlew index 2fe81a7..35a1cfb 100644 --- a/server-pool/gradlew +++ b/server-pool/gradlew @@ -1,5 +1,4 @@ #!/usr/bin/env sh - # # Copyright 2015 the original author or authors. # diff --git a/server-pool/gradlew.bat b/server-pool/gradlew.bat index 62bd9b9..bc01e42 100644 --- a/server-pool/gradlew.bat +++ b/server-pool/gradlew.bat @@ -15,11 +15,11 @@ @rem @if "%DEBUG%" == "" @echo off -@rem ########################################################################## +@rem ######################################################################### @rem @rem Gradle startup script for Windows @rem -@rem ########################################################################## +@rem ######################################################################### @rem Set local scope for the variables with windows NT shell if "%OS%"=="Windows_NT" setlocal diff --git a/server-pool/src/main/java/io/nats/slp/ExampleServerPool.java b/server-pool/src/main/java/io/nats/slp/ExampleServerPool.java new file mode 100644 index 0000000..a7c7432 --- /dev/null +++ b/server-pool/src/main/java/io/nats/slp/ExampleServerPool.java @@ -0,0 +1,94 @@ +package io.nats.slp; + +import io.nats.client.Options; +import io.nats.client.impl.NatsServerPool; +import io.nats.client.support.NatsUri; + +import java.util.List; + +// By extending NatsServerPool you get a jump start on +// the implementation for ServerPool +class ExampleServerPool extends NatsServerPool { + + @Override + public void initialize(Options opts) { + // here is the opportunity to inspect any Options + // you are already using in your connection + super.initialize(opts); + } + + @Override + public boolean acceptDiscoveredUrls(List discoveredServers) { + // Whenever the client receives ServerInfo from the server + // it provides a list of available servers to the pool + // ----- + // If you are implementing custom behavior, for instance + // you already know about all the servers you want, + // so you might not care about these. + return super.acceptDiscoveredUrls(discoveredServers); + } + + @Override + protected void afterListChanged() { + // This method is part of NatsServerPool, not the ServerPool interface + // This is called in NatsServerPool at the end of the initialize() method + // and at the end of the acceptDiscoveredUrls() method + super.afterListChanged(); + } + + @Override + public NatsUri peekNextServer() { + // this allows the connection handle to peek at the next server + // you will give it via the nextServer() method + return super.peekNextServer(); + } + + @Override + public NatsUri nextServer() { + // give the connection the next server to try to connect to + // from your pool + return super.nextServer(); + } + + @Override + public List resolveHostToIps(String host) { + // the connection will try to resolve hosts to ip addresses + // if Options.isNoResolveHostnames is not set (default is to resolve) + return super.resolveHostToIps(host); + } + + @Override + public void connectSucceeded(NatsUri nuri) { + // The connection lets you know if it was able to + // connect to the nextServer() you gave it + super.connectSucceeded(nuri); + } + + @Override + public void connectFailed(NatsUri nuri) { + // The connection lets you know if it could not + // connect to the nextServer() you gave it + super.connectFailed(nuri); + } + + @Override + public List getServerList() { + // This method is not used by the connection other than to pass on + // information to the user + return super.getServerList(); + } + + @Override + public boolean hasSecureServer() { + // This asks your pool if any of your connection are secure + // so it knows whether to upgrade the connection + return super.hasSecureServer(); + } + + @Override + protected int findEquivalent(List list, NatsUri toFind) { + // This method is part of NatsServerPool, not the ServerPool interface + // It's used to keep its own server list unique + return super.findEquivalent(list, toFind); + } +} diff --git a/server-pool/src/main/java/io/nats/slp/Main.java b/server-pool/src/main/java/io/nats/slp/Main.java new file mode 100644 index 0000000..83b54a2 --- /dev/null +++ b/server-pool/src/main/java/io/nats/slp/Main.java @@ -0,0 +1,25 @@ +package io.nats.slp; + +import io.nats.client.Connection; +import io.nats.client.Nats; +import io.nats.client.Options; + +public class Main { + static String[] BOOTSTRAP = new String[] {"nats://host1:4222","nats://host2:4222","nats://host3:4222"}; + + public static void main(String[] args) { + // provide a custom implementation of the ServerPool + // There also is a property for a no-op constructor ServerPool instance: servers_pool_implementation_class + Options options = new Options.Builder() + .servers(BOOTSTRAP) + .serverPool(new ExampleServerPool()) + .build(); + + try (Connection nc = Nats.connect(options)) { + // During connects or reconnects, the server pool implementation is called + } + catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/server-pool/src/main/java/io/nats/slp/ProvideServersList.java b/server-pool/src/main/java/io/nats/slp/ProvideServersList.java deleted file mode 100644 index ac4be0c..0000000 --- a/server-pool/src/main/java/io/nats/slp/ProvideServersList.java +++ /dev/null @@ -1,70 +0,0 @@ -package io.nats.slp; - -import io.nats.client.Connection; -import io.nats.client.Nats; -import io.nats.client.Options; -import io.nats.client.api.ServerInfo; - -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.ThreadLocalRandom; - -class ProvideServersList { // implements ServerListProvider { - // TO DEMONSTRATE I STARTED A 3 CLUSTER SERVER LOCALLY on ports 4222, 5222 and 6222 - // The ServerListProvider implementation in this case is a simple randomization - - static String[] BOOTSTRAP = new String[] {"0.0.0.0:4222", "0.0.0.0:5222", "0.0.0.0:6222"}; - - public static void main(String[] args) { - Options options = new Options.Builder() - .servers(BOOTSTRAP) -// .serverListProvider(new ProvideServersList()) - .build(); - - System.out.println("CONNECTING"); - try (Connection nc = Nats.connect(options)) { - ServerInfo si = nc.getServerInfo(); - System.out.println("CONNECTED 1"); - System.out.println(" to: " + si.getHost() + " " + si.getPort()); - System.out.println(" discovered: " + si.getConnectURLs()); - - // WHILE THE THREAD IS SLEEPING, KILL THE SERVER WE ARE CONNECTED TO SO A RECONNECT OCCURS - Thread.sleep(10_000); - - System.out.println("CONNECTED 2"); - si = nc.getServerInfo(); - System.out.println(" to: " + si.getHost() + " " + si.getPort()); - System.out.println(" discovered: " + si.getConnectURLs()); - } - catch (Exception e) { - e.printStackTrace(); - } - } - - public List getServerList(String currentServer, List optionsServersUnprocessed, List discoveredServersUnprocessed) { - System.out.println("GET SERVER LIST"); - System.out.println(" current server: " + currentServer); - System.out.println(" servers as bootstrapped: " + optionsServersUnprocessed); - System.out.println(" servers as discovered: " + discoveredServersUnprocessed); - - if (discoveredServersUnprocessed.size() == 0) { - return randomize(currentServer, Arrays.asList(BOOTSTRAP)); - } - - return randomize(currentServer, discoveredServersUnprocessed); - } - - private static List randomize(String currentServer, List servers) { - if (servers.size() > 1) { - if (currentServer != null) { - servers.remove(currentServer); - } - Collections.shuffle(servers, ThreadLocalRandom.current()); - if (currentServer != null) { - servers.add(currentServer); - } - } - return servers; - } -}