From 51ea8f71cf3c7085d047ca5a59dbf160e8aaa034 Mon Sep 17 00:00:00 2001 From: GodFuper <40344333+GodFuper@users.noreply.github.com> Date: Thu, 5 Dec 2024 16:15:56 +0600 Subject: [PATCH] Virtual threads (#65) * Upgrading the Java version to version 21 * Adding a service for asynchronous tasks * Moved the implementation of multithreading to AsyncService * Removed Executor.java * A little refactoring * spotlessApply * Shutting down the application in one function. * Transferring Lock functionality to LockUtil * AsyncService optimization * Return interrupted thread for PeerTurnRefreshModule * Return interrupted thread for GPGNetClient * Executor with dependency injection * spotlessApply * Let all processes in executor terminate correctly, otherwise close the application. * Returned Thread to classes with infinite loops. * Error handling * Fix close Ice Adapter * Fixing shutdown after InterruptedException --- .github/workflows/build.yml | 2 +- .github/workflows/release.yml | 2 +- README.md | 2 +- build.gradle | 2 +- client/build.gradle | 2 +- ice-adapter/build.gradle | 2 +- .../com/faforever/iceadapter/IceAdapter.java | 69 +-- .../com/faforever/iceadapter/debug/Debug.java | 31 +- .../iceadapter/debug/DebugWindow.java | 82 ++-- .../debug/DebugWindowController.java | 10 +- .../debug/InfoWindowController.java | 19 +- .../iceadapter/debug/TelemetryDebugger.java | 45 +- .../iceadapter/gpgnet/FaDataOutputStream.java | 57 ++- .../iceadapter/gpgnet/GPGNetServer.java | 62 +-- .../faforever/iceadapter/ice/GameSession.java | 4 +- .../com/faforever/iceadapter/ice/Peer.java | 80 ++-- .../ice/PeerConnectivityCheckerModule.java | 71 ++-- .../iceadapter/ice/PeerIceModule.java | 392 ++++++++++-------- .../iceadapter/ice/PeerTurnRefreshModule.java | 22 +- .../faforever/iceadapter/rpc/RPCHandler.java | 21 +- .../faforever/iceadapter/rpc/RPCService.java | 9 +- .../faforever/iceadapter/util/Executor.java | 20 - .../iceadapter/util/ExecutorHolder.java | 13 + .../faforever/iceadapter/util/LockUtil.java | 17 + .../iceadapter/util/NetworkToolbox.java | 8 +- .../iceadapter/util/PingWrapper.java | 44 +- .../faforever/iceadapter/util/TrayIcon.java | 16 +- server/build.gradle | 2 +- shared/build.gradle | 2 +- 29 files changed, 642 insertions(+), 466 deletions(-) delete mode 100644 ice-adapter/src/main/java/com/faforever/iceadapter/util/Executor.java create mode 100644 ice-adapter/src/main/java/com/faforever/iceadapter/util/ExecutorHolder.java create mode 100644 ice-adapter/src/main/java/com/faforever/iceadapter/util/LockUtil.java diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 17faed5..2edf1f2 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -8,7 +8,7 @@ on: [push] jobs: checks: runs-on: ubuntu-latest - container: eclipse-temurin:17-jdk + container: eclipse-temurin:21-jdk steps: - name: Checkout code uses: actions/checkout@v4 diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index ae9f588..11b292b 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -14,7 +14,7 @@ jobs: release: runs-on: ubuntu-latest - container: eclipse-temurin:17-jdk + container: eclipse-temurin:21-jdk steps: - uses: actions/checkout@v3 diff --git a/README.md b/README.md index 879b388..2897b6c 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ A P2P connection proxy for Supreme Commander: Forged Alliance using [ICE](https://en.wikipedia.org/wiki/Interactive_Connectivity_Establishment). ## Building -Build the project using gradle from the provided wrapper (Java 17 required). +Build the project using gradle from the provided wrapper (Java 21 required). ## JSONRPC Protocol The `faf-ice-adapter` is controlled using a bi-directional [JSON-RPC](http://www.jsonrpc.org/specification) interface over TCP. diff --git a/build.gradle b/build.gradle index 4379ba4..975933d 100644 --- a/build.gradle +++ b/build.gradle @@ -8,7 +8,7 @@ plugins { group 'com.faforever' version '1.0-SNAPSHOT' -sourceCompatibility = JavaVersion.VERSION_17 +sourceCompatibility = JavaVersion.VERSION_21 repositories { mavenCentral() diff --git a/client/build.gradle b/client/build.gradle index 15c2d1d..be56980 100644 --- a/client/build.gradle +++ b/client/build.gradle @@ -5,7 +5,7 @@ apply plugin: 'com.github.johnrengelman.shadow' group 'com.faforever' version '1.0-SNAPSHOT' -sourceCompatibility = JavaVersion.VERSION_17 +sourceCompatibility = JavaVersion.VERSION_21 repositories { mavenCentral() diff --git a/ice-adapter/build.gradle b/ice-adapter/build.gradle index c478cae..1a7903a 100644 --- a/ice-adapter/build.gradle +++ b/ice-adapter/build.gradle @@ -14,7 +14,7 @@ apply plugin: 'com.diffplug.spotless' group 'com.faforever' -sourceCompatibility = JavaVersion.VERSION_17 +sourceCompatibility = JavaVersion.VERSION_21 repositories { mavenCentral() diff --git a/ice-adapter/src/main/java/com/faforever/iceadapter/IceAdapter.java b/ice-adapter/src/main/java/com/faforever/iceadapter/IceAdapter.java index 6cfd090..bd6bbe3 100644 --- a/ice-adapter/src/main/java/com/faforever/iceadapter/IceAdapter.java +++ b/ice-adapter/src/main/java/com/faforever/iceadapter/IceAdapter.java @@ -1,26 +1,30 @@ package com.faforever.iceadapter; -import static com.faforever.iceadapter.debug.Debug.debug; - import com.faforever.iceadapter.debug.Debug; import com.faforever.iceadapter.gpgnet.GPGNetServer; import com.faforever.iceadapter.gpgnet.GameState; import com.faforever.iceadapter.ice.GameSession; import com.faforever.iceadapter.ice.PeerIceModule; import com.faforever.iceadapter.rpc.RPCService; -import com.faforever.iceadapter.util.Executor; +import com.faforever.iceadapter.util.ExecutorHolder; +import com.faforever.iceadapter.util.LockUtil; import com.faforever.iceadapter.util.TrayIcon; -import java.util.concurrent.Callable; import lombok.extern.slf4j.Slf4j; import picocli.CommandLine; +import java.util.concurrent.*; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import static com.faforever.iceadapter.debug.Debug.debug; + @CommandLine.Command( name = "faf-ice-adapter", mixinStandardHelpOptions = true, usageHelpAutoWidth = true, description = "An ice (RFC 5245) based network bridge between FAF client and ForgedAlliance.exe") @Slf4j -public class IceAdapter implements Callable { +public class IceAdapter implements Callable, AutoCloseable { private static IceAdapter INSTANCE; private static String VERSION = "SNAPSHOT"; private static volatile GameSession GAME_SESSION; @@ -29,6 +33,8 @@ public class IceAdapter implements Callable { private IceOptions iceOptions; private volatile boolean running = true; + private final ExecutorService executor = ExecutorHolder.getExecutor(); + private static final Lock lockGameSession = new ReentrantLock(); public static void main(String[] args) { new CommandLine(new IceAdapter()).setUnmatchedArgumentsAllowed(true).execute(args); @@ -60,6 +66,13 @@ public void start() { debug().startupComplete(); } + @Override + public void close() { + executor.shutdown(); + CompletableFuture.runAsync(executor::shutdownNow, + CompletableFuture.delayedExecutor(250, TimeUnit.MILLISECONDS)); + } + public static void onHostGame(String mapName) { log.info("onHostGame"); createGameSession(); @@ -113,42 +126,46 @@ public static void onDisconnectFromPeer(int remotePlayerId) { }); } - private static synchronized void createGameSession() { - if (GAME_SESSION != null) { - GAME_SESSION.close(); - GAME_SESSION = null; - } + private static void createGameSession() { + LockUtil.executeWithLock(lockGameSession, () -> { + if (GAME_SESSION != null) { + GAME_SESSION.close(); + GAME_SESSION = null; + } - GAME_SESSION = new GameSession(); + GAME_SESSION = new GameSession(); + }); } /** * Triggered by losing gpgnet connection to FA. * Closes the active Game/ICE session */ - public static synchronized void onFAShutdown() { - if (GAME_SESSION != null) { - log.info("FA SHUTDOWN, closing everything"); - GAME_SESSION.close(); - GAME_SESSION = null; - // Do not put code outside of this if clause, else it will be executed multiple times - } + public static void onFAShutdown() { + LockUtil.executeWithLock(lockGameSession, () -> { + if (GAME_SESSION != null) { + log.info("FA SHUTDOWN, closing everything"); + GAME_SESSION.close(); + GAME_SESSION = null; + // Do not put code outside of this if clause, else it will be executed multiple times + } + }); } /** * Stop the ICE adapter */ - public static void close() { - log.info("close() - stopping the adapter"); - - Executor.executeDelayed(500, () -> System.exit(0)); + public static void close(int status) { + log.info("close() - stopping the adapter. Status: {}", status); onFAShutdown(); // will close gameSession aswell GPGNetServer.close(); RPCService.close(); + Debug.close(); TrayIcon.close(); - - System.exit(0); + INSTANCE.close(); + CompletableFuture.runAsync(() -> System.exit(status), + CompletableFuture.delayedExecutor(500, TimeUnit.MILLISECONDS)); } public static int getId() { @@ -183,6 +200,10 @@ public static boolean isRunning() { return INSTANCE.running; } + public static Executor getExecutor() { + return INSTANCE.executor; + } + public static GameSession getGameSession() { return GAME_SESSION; } diff --git a/ice-adapter/src/main/java/com/faforever/iceadapter/debug/Debug.java b/ice-adapter/src/main/java/com/faforever/iceadapter/debug/Debug.java index bcdbb88..8e0fc7f 100644 --- a/ice-adapter/src/main/java/com/faforever/iceadapter/debug/Debug.java +++ b/ice-adapter/src/main/java/com/faforever/iceadapter/debug/Debug.java @@ -1,9 +1,11 @@ package com.faforever.iceadapter.debug; import com.faforever.iceadapter.IceAdapter; -import java.lang.reflect.InvocationTargetException; import lombok.extern.slf4j.Slf4j; +import java.lang.reflect.InvocationTargetException; +import java.util.concurrent.CompletableFuture; + @Slf4j public class Debug { // TODO @@ -17,6 +19,8 @@ public class Debug { public static int RPC_PORT; + private static TelemetryDebugger telemetryDebugger; + private static final DebugFacade debugFacade = new DebugFacade(); public static void register(Debugger debugger) { @@ -28,7 +32,8 @@ public static void remove(Debugger debugger) { } public static void init() { - new TelemetryDebugger(IceAdapter.getTelemetryServer(), IceAdapter.getGameId(), IceAdapter.getId()); + telemetryDebugger = new TelemetryDebugger( + IceAdapter.getTelemetryServer(), IceAdapter.getGameId(), IceAdapter.getId()); // Debugger window is started and set to debugFuture when either window is requested as the info window can be // used to open the debug window @@ -38,25 +43,33 @@ public static void init() { } if (isJavaFxSupported()) { - new Thread(() -> { + CompletableFuture.runAsync( + () -> { try { Class.forName("com.faforever.iceadapter.debug.DebugWindow") .getMethod("launchApplication") .invoke(null); + } catch (InvocationTargetException e) { + log.info("DebugWindows stopped"); } catch (IllegalAccessException | ClassNotFoundException - | NoSuchMethodException - | InvocationTargetException e) { - e.printStackTrace(); - log.error("Could not create DebugWindow. Running without debug window."); + | NoSuchMethodException e) { + log.error("Could not create DebugWindow. Running without debug window.", e); } - }) - .start(); // Completes future once application started + }, + IceAdapter.getExecutor()); } else { log.info("No JavaFX support detected. Running without debug window."); } } + public static void close() { + if (telemetryDebugger != null) { + telemetryDebugger.close(); + telemetryDebugger = null; + } + } + public static Debugger debug() { return debugFacade; } diff --git a/ice-adapter/src/main/java/com/faforever/iceadapter/debug/DebugWindow.java b/ice-adapter/src/main/java/com/faforever/iceadapter/debug/DebugWindow.java index 418848e..b10508c 100644 --- a/ice-adapter/src/main/java/com/faforever/iceadapter/debug/DebugWindow.java +++ b/ice-adapter/src/main/java/com/faforever/iceadapter/debug/DebugWindow.java @@ -5,11 +5,7 @@ import com.faforever.iceadapter.gpgnet.GameState; import com.faforever.iceadapter.ice.Peer; import com.faforever.iceadapter.ice.PeerConnectivityCheckerModule; -import com.faforever.iceadapter.util.Executor; import com.nbarraille.jjsonrpc.JJsonPeer; -import java.io.IOException; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; import javafx.application.Application; import javafx.application.Platform; import javafx.beans.property.SimpleBooleanProperty; @@ -29,6 +25,13 @@ import org.ice4j.ice.CandidatePair; import org.ice4j.ice.CandidateType; import org.ice4j.ice.Component; +import org.jetbrains.annotations.NotNull; + +import java.io.IOException; +import java.util.Comparator; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; @Slf4j public class DebugWindow extends Application implements Debugger { @@ -69,25 +72,30 @@ public void start(Stage stage) { stage.setScene(scene); stage.setTitle("FAF ICE adapter - Debugger - Build: %s".formatted(IceAdapter.getVersion())); - // stage.setOnCloseRequest(Event::consume); - // stage.show(); if (Debug.ENABLE_DEBUG_WINDOW) { - Executor.executeDelayed(Debug.DELAY_UI_MS, () -> runOnUIThread(stage::show)); + CompletableFuture.runAsync( + () -> runOnUIThread(stage::show), + CompletableFuture.delayedExecutor( + Debug.DELAY_UI_MS, TimeUnit.MILLISECONDS, IceAdapter.getExecutor())); } - // new Thread(() -> Debug.debug.complete(this)).start(); log.info("Created debug window."); if (Debug.ENABLE_INFO_WINDOW) { - Executor.executeDelayed(Debug.DELAY_UI_MS, () -> runOnUIThread(() -> new InfoWindow().init())); + CompletableFuture.runAsync( + () -> runOnUIThread(() -> new InfoWindow().init()), + CompletableFuture.delayedExecutor( + Debug.DELAY_UI_MS, TimeUnit.MILLISECONDS, IceAdapter.getExecutor())); } } public void showWindow() { - runOnUIThread(() -> stage.show()); - initStaticVariables(); - initPeers(); + runOnUIThread(() -> { + stage.show(); + initStaticVariables(); + initPeers(); + }); } @Override @@ -96,7 +104,6 @@ public void startupComplete() { } public void initStaticVariables() { - runOnUIThread(() -> { controller.versionLabel.setText("Version: %s".formatted(IceAdapter.getVersion())); controller.userLabel.setText("User: %s(%d)".formatted(IceAdapter.getLogin(), IceAdapter.getId())); @@ -116,6 +123,7 @@ public void initPeers() { p.connectivityUpdate(peer); peers.add(p); } + peers.sort(DebugPeer::compareTo); } }); } @@ -158,36 +166,33 @@ public void gameStateChanged() { @Override public void connectToPeer(int id, String login, boolean localOffer) { - new Thread(() -> { - synchronized (peers) { - peers.add(new DebugPeer(id, login, localOffer)); // Might callback into jfx - } - }) - .start(); + runOnUIThread(() -> { + synchronized (peers) { + peers.add(new DebugPeer(id, login, localOffer)); // Might callback into jfx + peers.sort(DebugPeer::compareTo); + } + }); } @Override public void disconnectFromPeer(int id) { - new Thread(() -> { - synchronized (peers) { - peers.removeIf(peer -> peer.id.get() == id); // Might callback into jfx - } - }) - .start(); + runOnUIThread(() -> { + synchronized (peers) { + peers.removeIf(peer -> peer.id.get() == id); // Might callback into jfx + peers.sort(DebugPeer::compareTo); + } + }); } @Override public void peerStateChanged(Peer peer) { - new Thread(() -> { - synchronized (peers) { - peers.stream() - .filter(p -> p.id.get() == peer.getRemoteId()) - .forEach(p -> { - p.stateChangedUpdate(peer); - }); - } - }) - .start(); + runOnUIThread(() -> { + synchronized (peers) { + peers.stream().filter(p -> p.id.get() == peer.getRemoteId()).forEach(p -> { + p.stateChangedUpdate(peer); + }); + } + }); } @Override @@ -217,7 +222,7 @@ public static void launchApplication() { @AllArgsConstructor // @Getter //PropertyValueFactory will attempt to access fieldNameProperty(), then getFieldName() (expecting value, // not property) and then isFieldName() methods - public static class DebugPeer { + public static class DebugPeer implements Comparable { public SimpleIntegerProperty id = new SimpleIntegerProperty(-1); public SimpleStringProperty login = new SimpleStringProperty(""); public SimpleBooleanProperty localOffer = new SimpleBooleanProperty(false); @@ -366,5 +371,10 @@ public void connectivityUpdate(Peer peer) { .orElse(-1L) .intValue()); } + + @Override + public int compareTo(@NotNull DebugPeer o) { + return Comparator.comparingLong(DebugPeer::getId).compare(this, o); + } } } diff --git a/ice-adapter/src/main/java/com/faforever/iceadapter/debug/DebugWindowController.java b/ice-adapter/src/main/java/com/faforever/iceadapter/debug/DebugWindowController.java index 3a23aee..178fecc 100644 --- a/ice-adapter/src/main/java/com/faforever/iceadapter/debug/DebugWindowController.java +++ b/ice-adapter/src/main/java/com/faforever/iceadapter/debug/DebugWindowController.java @@ -1,7 +1,6 @@ package com.faforever.iceadapter.debug; import com.faforever.iceadapter.IceAdapter; -import java.util.Objects; import javafx.event.ActionEvent; import javafx.fxml.FXML; import javafx.scene.control.*; @@ -11,6 +10,8 @@ import lombok.extern.slf4j.Slf4j; import org.slf4j.LoggerFactory; +import java.util.concurrent.CompletableFuture; + @Slf4j public class DebugWindowController { public static DebugWindowController INSTANCE; @@ -49,12 +50,13 @@ public class DebugWindowController { public DebugWindowController() {} public void onKillAdapterClicked(ActionEvent actionEvent) { - System.exit(337); + IceAdapter.close(337); } public void reconnectToPeer(DebugWindow.DebugPeer peer) { - if (Objects.nonNull(peer)) { - new Thread(() -> IceAdapter.getGameSession().reconnectToPeer(peer.getId())).start(); + if (peer != null) { + CompletableFuture.runAsync( + () -> IceAdapter.getGameSession().reconnectToPeer(peer.getId()), IceAdapter.getExecutor()); } } diff --git a/ice-adapter/src/main/java/com/faforever/iceadapter/debug/InfoWindowController.java b/ice-adapter/src/main/java/com/faforever/iceadapter/debug/InfoWindowController.java index 516d1c5..0560822 100644 --- a/ice-adapter/src/main/java/com/faforever/iceadapter/debug/InfoWindowController.java +++ b/ice-adapter/src/main/java/com/faforever/iceadapter/debug/InfoWindowController.java @@ -2,14 +2,16 @@ import com.faforever.iceadapter.IceAdapter; import com.faforever.iceadapter.util.TrayIcon; -import java.awt.*; -import java.io.IOException; -import java.net.URI; import javafx.event.ActionEvent; import javafx.fxml.FXML; import javafx.scene.control.Button; import lombok.SneakyThrows; +import java.awt.*; +import java.io.IOException; +import java.net.URI; +import java.util.concurrent.CompletableFuture; + public class InfoWindowController { public Button killAdapterButton; public Button showDebugWindowButton; @@ -25,12 +27,12 @@ public void onKillAdapterClicked(ActionEvent actionEvent) { killAdapterButton.setText( "This will disconnect you from the game! Click " + (3 - killRequests) + " more times."); } else { - System.exit(337); + IceAdapter.close(337); } } public void onShowDebugWindowClicked(ActionEvent actionEvent) { - DebugWindow.INSTANCE.thenAcceptAsync(DebugWindow::showWindow); + DebugWindow.INSTANCE.thenAcceptAsync(DebugWindow::showWindow, IceAdapter.getExecutor()); } @SneakyThrows @@ -41,14 +43,15 @@ public void onTelemetryWebUiClicked(ActionEvent actionEvent) { IceAdapter.getGameId(), IceAdapter.getId()); - new Thread(() -> { + CompletableFuture.runAsync( + () -> { try { Desktop.getDesktop().browse(URI.create(url)); } catch (IOException e) { throw new RuntimeException(e); } - }) - .start(); + }, + IceAdapter.getExecutor()); } public void onMinimizeToTrayClicked(ActionEvent actionEvent) { diff --git a/ice-adapter/src/main/java/com/faforever/iceadapter/debug/TelemetryDebugger.java b/ice-adapter/src/main/java/com/faforever/iceadapter/debug/TelemetryDebugger.java index 2b4325d..b7f0028 100644 --- a/ice-adapter/src/main/java/com/faforever/iceadapter/debug/TelemetryDebugger.java +++ b/ice-adapter/src/main/java/com/faforever/iceadapter/debug/TelemetryDebugger.java @@ -4,20 +4,19 @@ import com.faforever.iceadapter.gpgnet.GPGNetServer; import com.faforever.iceadapter.ice.Peer; import com.faforever.iceadapter.ice.PeerConnectivityCheckerModule; -import com.faforever.iceadapter.telemetry.ConnectToPeer; -import com.faforever.iceadapter.telemetry.CoturnServer; -import com.faforever.iceadapter.telemetry.DisconnectFromPeer; -import com.faforever.iceadapter.telemetry.OutgoingMessageV1; -import com.faforever.iceadapter.telemetry.RegisterAsPeer; -import com.faforever.iceadapter.telemetry.UpdateCoturnList; -import com.faforever.iceadapter.telemetry.UpdateGameState; -import com.faforever.iceadapter.telemetry.UpdateGpgnetState; -import com.faforever.iceadapter.telemetry.UpdatePeerConnectivity; -import com.faforever.iceadapter.telemetry.UpdatePeerState; +import com.faforever.iceadapter.telemetry.*; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import com.google.common.util.concurrent.RateLimiter; import com.nbarraille.jjsonrpc.JJsonPeer; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.ice4j.ice.Candidate; +import org.ice4j.ice.CandidatePair; +import org.ice4j.ice.Component; +import org.java_websocket.client.WebSocketClient; +import org.java_websocket.handshake.ServerHandshake; + import java.net.ConnectException; import java.net.URI; import java.time.Instant; @@ -29,16 +28,9 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; -import lombok.SneakyThrows; -import lombok.extern.slf4j.Slf4j; -import org.ice4j.ice.Candidate; -import org.ice4j.ice.CandidatePair; -import org.ice4j.ice.Component; -import org.java_websocket.client.WebSocketClient; -import org.java_websocket.handshake.ServerHandshake; @Slf4j -public class TelemetryDebugger implements Debugger { +public class TelemetryDebugger implements Debugger, AutoCloseable { private final WebSocketClient websocketClient; private final ObjectMapper objectMapper; @@ -87,10 +79,9 @@ public void onError(Exception ex) { objectMapper = new ObjectMapper(); objectMapper.registerModule(new JavaTimeModule()); - sendingLoopThread = new Thread(this::sendingLoop, "sendingLoop"); - sendingLoopThread.setUncaughtExceptionHandler( - (t, e) -> log.error("Thread sendingLoop crashed unexpectedly", e)); - sendingLoopThread.start(); + sendingLoopThread = Thread.ofVirtual() + .name("sendingLoop") + .start(this::sendingLoop); } private void sendMessage(OutgoingMessageV1 message) { @@ -103,7 +94,7 @@ private void sendMessage(OutgoingMessageV1 message) { @SneakyThrows private void sendingLoop() { - while (true) { + while (!Thread.currentThread().isInterrupted()) { var message = messageQueue.take(); try { String json = objectMapper.writeValueAsString(message); @@ -116,6 +107,9 @@ private void sendingLoop() { log.trace("Sending telemetry message: {}", json); websocketClient.send(json); + } catch (InterruptedException e) { + log.info("Sending loop interrupted"); + return; } catch (Exception e) { log.error("Error on sending message object: {}", message, e); } @@ -226,4 +220,9 @@ public void updateCoturnList(Collection servers) { servers.stream().map(CoturnServer::host).findFirst().orElse(null), servers)); } + + @Override + public void close() { + sendingLoopThread.interrupt(); + } } diff --git a/ice-adapter/src/main/java/com/faforever/iceadapter/gpgnet/FaDataOutputStream.java b/ice-adapter/src/main/java/com/faforever/iceadapter/gpgnet/FaDataOutputStream.java index 02bb91a..08a3442 100644 --- a/ice-adapter/src/main/java/com/faforever/iceadapter/gpgnet/FaDataOutputStream.java +++ b/ice-adapter/src/main/java/com/faforever/iceadapter/gpgnet/FaDataOutputStream.java @@ -8,6 +8,8 @@ import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.List; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; /** * Writes data to Forged Alliance (the forgedalliance, not the lobby). @@ -20,6 +22,7 @@ public class FaDataOutputStream extends OutputStream { public static final char DELIMITER = '\b'; private final LittleEndianDataOutputStream outputStream; private final Charset charset = StandardCharsets.UTF_8; + private final Lock writer = new ReentrantLock(); public FaDataOutputStream(OutputStream outputStream) { this.outputStream = new LittleEndianDataOutputStream(new BufferedOutputStream(outputStream)); @@ -27,15 +30,46 @@ public FaDataOutputStream(OutputStream outputStream) { @Override public void write(int b) throws IOException { - outputStream.write(b); + writer.lock(); + try { + outputStream.write(b); + } finally { + writer.unlock(); + } + } + + public void writeMessage(String header, Object... args) throws IOException { + writer.lock(); + try { + writeString(header); + writeArgs(Arrays.asList(args)); + outputStream.flush(); + } finally { + writer.unlock(); + } } @Override public void flush() throws IOException { - outputStream.flush(); + writer.lock(); + try { + outputStream.flush(); + } finally { + writer.unlock(); + } + } + + @Override + public void close() throws IOException { + writer.lock(); + try { + outputStream.close(); + } finally { + writer.unlock(); + } } - public synchronized void writeArgs(List args) throws IOException { + private void writeArgs(List args) throws IOException { writeInt(args.size()); for (Object arg : args) { @@ -52,27 +86,16 @@ public synchronized void writeArgs(List args) throws IOException { } } - public void writeInt(int value) throws IOException { + private void writeInt(int value) throws IOException { outputStream.writeInt(value); } - public synchronized void writeByte(int b) throws IOException { + private void writeByte(int b) throws IOException { outputStream.writeByte(b); } - public synchronized void writeString(String string) throws IOException { + private void writeString(String string) throws IOException { outputStream.writeInt(string.length()); outputStream.write(string.getBytes(charset)); } - - public synchronized void writeMessage(String header, Object... args) throws IOException { - writeString(header); - writeArgs(Arrays.asList(args)); - outputStream.flush(); - } - - @Override - public void close() throws IOException { - outputStream.close(); - } } diff --git a/ice-adapter/src/main/java/com/faforever/iceadapter/gpgnet/GPGNetServer.java b/ice-adapter/src/main/java/com/faforever/iceadapter/gpgnet/GPGNetServer.java index 6395389..269110f 100644 --- a/ice-adapter/src/main/java/com/faforever/iceadapter/gpgnet/GPGNetServer.java +++ b/ice-adapter/src/main/java/com/faforever/iceadapter/gpgnet/GPGNetServer.java @@ -1,10 +1,12 @@ package com.faforever.iceadapter.gpgnet; -import static com.faforever.iceadapter.debug.Debug.debug; - import com.faforever.iceadapter.IceAdapter; import com.faforever.iceadapter.rpc.RPCService; +import com.faforever.iceadapter.util.LockUtil; import com.faforever.iceadapter.util.NetworkToolbox; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; @@ -13,14 +15,17 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; + +import static com.faforever.iceadapter.debug.Debug.debug; @Slf4j public class GPGNetServer { private static int GPGNET_PORT; private static int LOBBY_PORT; + private static final Lock lockSocket = new ReentrantLock(); private static ServerSocket serverSocket; private static volatile GPGNetClient currentClient; @@ -50,9 +55,10 @@ public static void init(int gpgnetPort, int lobbyPort) { serverSocket = new ServerSocket(GPGNetServer.getGpgnetPort()); } catch (IOException e) { log.error("Couldn't start GPGNetServer", e); - System.exit(-1); + IceAdapter.close(-1); } - new Thread(GPGNetServer::acceptThread).start(); + + CompletableFuture.runAsync(GPGNetServer::acceptThread, IceAdapter.getExecutor()); log.info("GPGNetServer started"); } @@ -67,6 +73,7 @@ public static class GPGNetClient { private final Thread listenerThread; private volatile boolean stopping = false; private FaDataOutputStream gpgnetOut; + private final Lock lockStream = new ReentrantLock(); private final CompletableFuture lobbyFuture = new CompletableFuture<>(); private GPGNetClient(Socket socket) { @@ -77,9 +84,7 @@ private GPGNetClient(Socket socket) { } catch (IOException e) { log.error("Could not create GPGNet output steam to FA", e); } - - listenerThread = new Thread(this::listenerThread); - listenerThread.start(); + listenerThread = Thread.startVirtualThread(this::listenerThread); RPCService.onConnectionStateChanged("Connected"); log.info("GPGNetClient has connected"); @@ -129,17 +134,20 @@ private void processGpgnetMessage(String command, List args) { /** * Send a message to this FA instance via GPGNet */ - public synchronized void sendGpgnetMessage(String command, Object... args) { - try { - gpgnetOut.writeMessage(command, args); - log.info( - "Sent GPGNet message: {} {}", - command, - Arrays.stream(args).map(Object::toString).collect(Collectors.joining(" "))); - } catch (IOException e) { - log.error("Error while communicating with FA (output), assuming shutdown", e); - GPGNetServer.onGpgnetConnectionLost(); - } + public void sendGpgnetMessage(String command, Object... args) { + LockUtil.executeWithLock(lockStream, () -> { + try { + + gpgnetOut.writeMessage(command, args); + log.info( + "Sent GPGNet message: {} {}", + command, + Arrays.stream(args).map(Object::toString).collect(Collectors.joining(" "))); + } catch (IOException e) { + log.error("Error while communicating with FA (output), assuming shutdown", e); + GPGNetServer.onGpgnetConnectionLost(); + } + }); } /** @@ -152,7 +160,9 @@ private void listenerThread() { // and is now going to set GPGNetServer.currentClient try (var inputStream = socket.getInputStream(); var gpgnetIn = new FaDataInputStream(inputStream)) { - while ((!triggerActive || GPGNetServer.currentClient == this) && !stopping) { + while (!Thread.currentThread().isInterrupted() + && (!triggerActive || GPGNetServer.currentClient == this) + && !stopping) { String command = gpgnetIn.readString(); List args = gpgnetIn.readChunks(); @@ -191,7 +201,7 @@ public void close() { */ private static void onGpgnetConnectionLost() { log.info("GPGNet connection lost"); - synchronized (serverSocket) { + LockUtil.executeWithLock(lockSocket, () -> { if (currentClient != null) { currentClient.close(); currentClient = null; @@ -204,7 +214,7 @@ private static void onGpgnetConnectionLost() { IceAdapter.onFAShutdown(); } - } + }); debug().gpgnetConnectedDisconnected(); } @@ -212,10 +222,10 @@ private static void onGpgnetConnectionLost() { * Listens for incoming connections from a game instance */ private static void acceptThread() { - while (IceAdapter.isRunning()) { + while (!Thread.currentThread().isInterrupted() && IceAdapter.isRunning()) { try { Socket socket = serverSocket.accept(); - synchronized (serverSocket) { + LockUtil.executeWithLock(lockSocket, () -> { if (currentClient != null) { onGpgnetConnectionLost(); } @@ -224,7 +234,7 @@ private static void acceptThread() { clientFuture.complete(currentClient); debug().gpgnetConnectedDisconnected(); - } + }); } catch (SocketException e) { return; } catch (IOException e) { diff --git a/ice-adapter/src/main/java/com/faforever/iceadapter/ice/GameSession.java b/ice-adapter/src/main/java/com/faforever/iceadapter/ice/GameSession.java index 95a5c44..c43db40 100644 --- a/ice-adapter/src/main/java/com/faforever/iceadapter/ice/GameSession.java +++ b/ice-adapter/src/main/java/com/faforever/iceadapter/ice/GameSession.java @@ -55,7 +55,7 @@ public int connectToPeer(String remotePlayerLogin, int remotePlayerId, boolean o Peer peer = new Peer(this, remotePlayerId, remotePlayerLogin, offer, preferredPort); peers.put(remotePlayerId, peer); debug().connectToPeer(remotePlayerId, remotePlayerLogin, offer); - return peer.getFaSocket().getLocalPort(); + return peer.getLocalPort(); } /** @@ -80,7 +80,7 @@ public void reconnectToPeer(Integer remotePlayerId) { if (Objects.nonNull(reconnectPeer)) { String remotePlayerLogin = reconnectPeer.getRemoteLogin(); boolean offer = reconnectPeer.isLocalOffer(); - int port = reconnectPeer.getFaSocket().getLocalPort(); + int port = reconnectPeer.getLocalPort(); disconnectFromPeer(remotePlayerId); connectToPeer(remotePlayerLogin, remotePlayerId, offer, port); diff --git a/ice-adapter/src/main/java/com/faforever/iceadapter/ice/Peer.java b/ice-adapter/src/main/java/com/faforever/iceadapter/ice/Peer.java index b5893fd..29dcecc 100644 --- a/ice-adapter/src/main/java/com/faforever/iceadapter/ice/Peer.java +++ b/ice-adapter/src/main/java/com/faforever/iceadapter/ice/Peer.java @@ -2,18 +2,23 @@ import com.faforever.iceadapter.IceAdapter; import com.faforever.iceadapter.gpgnet.GPGNetServer; -import java.io.IOException; -import java.net.*; +import com.faforever.iceadapter.util.LockUtil; import lombok.Getter; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import java.io.IOException; +import java.net.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + /** * Represents a peer in the current game session which we are connected to */ @Getter @Slf4j public class Peer { - private final GameSession gameSession; private final int remoteId; @@ -24,7 +29,8 @@ public class Peer { public volatile boolean closing = false; private final PeerIceModule ice = new PeerIceModule(this); - private DatagramSocket faSocket; // Socket on which we are listening for FA / sending data to FA + private final DatagramSocket faSocket; // Socket on which we are listening for FA / sending data to FA + private final Lock lockSocketSend = new ReentrantLock(); public Peer(GameSession gameSession, int remoteId, String remoteLogin, boolean localOffer, int preferredPort) { this.gameSession = gameSession; @@ -36,26 +42,32 @@ public Peer(GameSession gameSession, int remoteId, String remoteLogin, boolean l log.debug( "Peer created: {}, localOffer: {}, preferredPort: {}", getPeerIdentifier(), localOffer, preferredPort); - initForwarding(preferredPort); + faSocket = initForwarding(preferredPort); + + CompletableFuture.runAsync(this::faListener, IceAdapter.getExecutor()); if (localOffer) { - new Thread(ice::initiateIce).start(); + CompletableFuture.runAsync(ice::initiateIce, IceAdapter.getExecutor()); } } + public int getLocalPort() { + return faSocket.getLocalPort(); + } + /** * Starts waiting for data from FA */ - private void initForwarding(int port) { + @SneakyThrows(SocketException.class) + private DatagramSocket initForwarding(int port) { try { - faSocket = new DatagramSocket(port); + DatagramSocket socket = new DatagramSocket(port); + log.debug("Now forwarding data to peer {}", getPeerIdentifier()); + return socket; } catch (SocketException e) { log.error("Could not create socket for peer: {}", getPeerIdentifier(), e); + throw e; } - - new Thread(this::faListener).start(); - - log.debug("Now forwarding data to peer {}", getPeerIdentifier()); } /** @@ -64,31 +76,37 @@ private void initForwarding(int port) { * @param offset * @param length */ - synchronized void onIceDataReceived(byte data[], int offset, int length) { - try { - DatagramPacket packet = new DatagramPacket( - data, offset, length, InetAddress.getByName("127.0.0.1"), GPGNetServer.getLobbyPort()); - faSocket.send(packet); - } catch (UnknownHostException e) { - } catch (IOException e) { - if (closing) { - log.debug("Ignoring error the send packet because the connection was closed {}", getPeerIdentifier()); - } else { - log.error( - "Error while writing to local FA as peer (probably disconnecting from peer) {}", - getPeerIdentifier(), - e); + void onIceDataReceived(byte[] data, int offset, int length) { + LockUtil.executeWithLock(lockSocketSend, () -> { + try { + DatagramPacket packet = new DatagramPacket( + data, offset, length, InetAddress.getByName("127.0.0.1"), GPGNetServer.getLobbyPort()); + faSocket.send(packet); + } catch (UnknownHostException e) { + } catch (IOException e) { + if (closing) { + log.debug( + "Ignoring error the send packet because the connection was closed {}", getPeerIdentifier()); + } else { + log.error( + "Error while writing to local FA as peer (probably disconnecting from peer) {}", + getPeerIdentifier(), + e); + } } - } + }); } /** * This method get's invoked by the thread listening for data from FA */ private void faListener() { - byte data[] = new byte + byte[] data = new byte [65536]; // 64KiB = UDP MTU, in practice due to ethernet frames being <= 1500 B, this is often not used - while (IceAdapter.isRunning() && IceAdapter.getGameSession() == gameSession) { + while (!Thread.currentThread().isInterrupted() + && IceAdapter.isRunning() + && IceAdapter.getGameSession() == gameSession + && !closing) { try { DatagramPacket packet = new DatagramPacket(data, data.length); faSocket.receive(packet); @@ -118,9 +136,7 @@ public void close() { log.info("Closing peer for player {}", getPeerIdentifier()); closing = true; - if (faSocket != null) { - faSocket.close(); - } + faSocket.close(); ice.close(); } diff --git a/ice-adapter/src/main/java/com/faforever/iceadapter/ice/PeerConnectivityCheckerModule.java b/ice-adapter/src/main/java/com/faforever/iceadapter/ice/PeerConnectivityCheckerModule.java index d32a477..1dcee46 100644 --- a/ice-adapter/src/main/java/com/faforever/iceadapter/ice/PeerConnectivityCheckerModule.java +++ b/ice-adapter/src/main/java/com/faforever/iceadapter/ice/PeerConnectivityCheckerModule.java @@ -1,22 +1,29 @@ package com.faforever.iceadapter.ice; -import static com.faforever.iceadapter.debug.Debug.debug; - +import com.faforever.iceadapter.IceAdapter; +import com.faforever.iceadapter.util.LockUtil; import com.google.common.primitives.Longs; -import java.util.Arrays; import lombok.Getter; import lombok.extern.slf4j.Slf4j; -@Slf4j +import java.util.Arrays; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import static com.faforever.iceadapter.debug.Debug.debug; + /** * Periodically sends echo requests via the ICE data channel and initiates a reconnect after timeout * ONLY THE OFFERING ADAPTER of a connection will send echos and reoffer. */ +@Slf4j public class PeerConnectivityCheckerModule { private static final int ECHO_INTERVAL = 1000; private final PeerIceModule ice; + private final Lock lockIce = new ReentrantLock(); private volatile boolean running = false; private volatile Thread checkerThread; @@ -36,38 +43,42 @@ public PeerConnectivityCheckerModule(PeerIceModule ice) { this.ice = ice; } - synchronized void start() { - if (running) { - return; - } + void start() { + LockUtil.executeWithLock(lockIce, () -> { + if (running) { + return; + } - running = true; - log.debug("Starting connectivity checker for peer {}", ice.getPeer().getRemoteId()); + running = true; + log.debug("Starting connectivity checker for peer {}", ice.getPeer().getRemoteId()); - averageRTT = 0.0f; - lastPacketReceived = System.currentTimeMillis(); + averageRTT = 0.0f; + lastPacketReceived = System.currentTimeMillis(); - checkerThread = new Thread(this::checkerThread, getThreadName()); - checkerThread.setUncaughtExceptionHandler( - (t, e) -> log.error("Thread {} crashed unexpectedly", t.getName(), e)); - checkerThread.start(); + checkerThread = Thread.ofVirtual() + .name(getThreadName()) + .uncaughtExceptionHandler((t, e) -> log.error("Thread {} crashed unexpectedly", t.getName(), e)) + .start(this::checkerThread); + }); } private String getThreadName() { return "connectivityChecker-" + ice.getPeer().getRemoteId(); } - synchronized void stop() { - if (!running) { - return; - } + void stop() { + LockUtil.executeWithLock(lockIce, () -> { + if (!running) { + return; + } - running = false; + running = false; - if (checkerThread != null) { - checkerThread.interrupt(); - checkerThread = null; - } + if (checkerThread != null) { + checkerThread.interrupt(); + checkerThread = null; + } + }); } /** @@ -100,9 +111,10 @@ void echoReceived(byte[] data, int offset, int length) { } private void checkerThread() { - while (!Thread.currentThread().isInterrupted()) { + while (!Thread.currentThread().isInterrupted() && running) { log.trace("Running connectivity checker"); + Peer peer = ice.getPeer(); byte[] data = new byte[9]; data[0] = 'e'; @@ -111,7 +123,7 @@ private void checkerThread() { ice.sendViaIce(data, 0, data.length); - debug().peerConnectivityUpdate(ice.getPeer()); + debug().peerConnectivityUpdate(peer); try { Thread.sleep(ECHO_INTERVAL); @@ -119,15 +131,14 @@ private void checkerThread() { log.warn( "{} (sleeping checkerThread) was interrupted", Thread.currentThread().getName()); - Thread.currentThread().interrupt(); return; } if (System.currentTimeMillis() - lastPacketReceived > 10000) { log.warn( "Didn't receive any answer to echo requests for the past 10 seconds from {}, aborting connection", - ice.getPeer().getRemoteLogin()); - new Thread(ice::onConnectionLost).start(); + peer.getRemoteLogin()); + CompletableFuture.runAsync(ice::onConnectionLost, IceAdapter.getExecutor()); return; } } diff --git a/ice-adapter/src/main/java/com/faforever/iceadapter/ice/PeerIceModule.java b/ice-adapter/src/main/java/com/faforever/iceadapter/ice/PeerIceModule.java index 6784319..1879cc7 100644 --- a/ice-adapter/src/main/java/com/faforever/iceadapter/ice/PeerIceModule.java +++ b/ice-adapter/src/main/java/com/faforever/iceadapter/ice/PeerIceModule.java @@ -1,23 +1,10 @@ package com.faforever.iceadapter.ice; -import static com.faforever.iceadapter.debug.Debug.debug; -import static com.faforever.iceadapter.ice.IceState.*; - import com.faforever.iceadapter.IceAdapter; import com.faforever.iceadapter.rpc.RPCService; import com.faforever.iceadapter.util.CandidateUtil; -import com.faforever.iceadapter.util.Executor; +import com.faforever.iceadapter.util.LockUtil; import com.faforever.iceadapter.util.TrayIcon; -import java.io.IOException; -import java.net.DatagramPacket; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CancellationException; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.ice4j.TransportAddress; @@ -26,6 +13,19 @@ import org.ice4j.ice.harvest.TurnCandidateHarvester; import org.ice4j.security.LongTermCredential; +import java.io.IOException; +import java.net.DatagramPacket; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; + +import static com.faforever.iceadapter.debug.Debug.debug; +import static com.faforever.iceadapter.ice.IceState.*; + @Getter @Slf4j public class PeerIceModule { @@ -72,6 +72,10 @@ public static void setForceRelay(boolean forceRelay) { // How often have we been waiting for a response to local candidates/offer private final AtomicInteger awaitingCandidatesEventId = new AtomicInteger(0); + private final Lock lockInit = new ReentrantLock(); + private final Lock lockLostConnection = new ReentrantLock(); + private final Lock lockMessageReceived = new ReentrantLock(); + public PeerIceModule(Peer peer) { this.peer = peer; } @@ -89,19 +93,27 @@ private void setState(IceState newState) { /** * Will start the ICE Process */ - synchronized void initiateIce() { - if (iceState != NEW && iceState != DISCONNECTED) { - log.warn( - getLogPrefix() + "ICE already in progress, aborting re initiation. current state: {}", - iceState.getMessage()); - return; - } + void initiateIce() { + LockUtil.executeWithLock(lockInit, () -> { + if (peer.isClosing()) { + log.warn("{} Peer not connected anymore, aborting reinitiation of ICE", getLogPrefix()); + return; + } + + if (iceState != NEW && iceState != DISCONNECTED) { + log.warn( + "{} ICE already in progress, aborting re initiation. current state: {}", + getLogPrefix(), + iceState.getMessage()); + return; + } - setState(GATHERING); - log.info(getLogPrefix() + "Initiating ICE for peer"); + setState(GATHERING); + log.info("{} Initiating ICE for peer", getLogPrefix()); - createAgent(); - gatherCandidates(); + createAgent(); + gatherCandidates(); + }); } /** @@ -122,7 +134,7 @@ private void createAgent() { * Gathers all local candidates, packs them into a message and sends them to the other peer via RPC */ private void gatherCandidates() { - log.info(getLogPrefix() + "Gathering ice candidates"); + log.info("{} Gathering ice candidates", getLogPrefix()); List iceServers = getViableIceServers(); @@ -135,39 +147,44 @@ private void gatherCandidates() { a, new LongTermCredential(iceServer.getTurnUsername(), iceServer.getTurnCredential()))) .forEach(agent::addCandidateHarvester)); - CompletableFuture gatheringFuture = CompletableFuture.runAsync(() -> { - try { - component = agent.createComponent( - mediaStream, - MINIMUM_PORT + (int) (ThreadLocalRandom.current().nextDouble() * 999.0), - MINIMUM_PORT, - MINIMUM_PORT + 1000); - } catch (IOException e) { - throw new RuntimeException(e); - } - }); + CompletableFuture gatheringFuture = CompletableFuture.runAsync( + () -> { + try { + component = agent.createComponent( + mediaStream, + MINIMUM_PORT + + (int) (ThreadLocalRandom.current().nextDouble() * 999.0), + MINIMUM_PORT, + MINIMUM_PORT + 1000); + } catch (IOException e) { + throw new RuntimeException(e); + } + }, + IceAdapter.getExecutor()); - Executor.executeDelayed(5000, () -> { - if (!gatheringFuture.isDone()) { - gatheringFuture.cancel(true); - } - }); + CompletableFuture.runAsync( + () -> { + if (!gatheringFuture.isDone()) { + gatheringFuture.cancel(true); + } + }, + CompletableFuture.delayedExecutor(5000, TimeUnit.MILLISECONDS, IceAdapter.getExecutor())); try { gatheringFuture.join(); } catch (CompletionException e) { // Completed exceptionally - log.error(getLogPrefix() + "Error while creating stream component/gathering candidates", e); - new Thread(this::onConnectionLost).start(); + log.error("{} Error while creating stream component/gathering candidates", getLogPrefix(), e); + CompletableFuture.runAsync(this::onConnectionLost, IceAdapter.getExecutor()); return; } catch (CancellationException e) { // was cancelled due to timeout - log.error(getLogPrefix() + "Gathering candidates timed out", e); - new Thread(this::onConnectionLost).start(); + log.error("{} Gathering candidates timed out", getLogPrefix(), e); + CompletableFuture.runAsync(this::onConnectionLost, IceAdapter.getExecutor()); return; } - int previousConnectivityAttempts = getConnectivityAttempsInThePast(FORCE_SRFLX_RELAY_INTERVAL); + long previousConnectivityAttempts = getConnectivityAttempsInThePast(FORCE_SRFLX_RELAY_INTERVAL); CandidatesMessage localCandidatesMessage = CandidateUtil.packCandidates( IceAdapter.getId(), peer.getRemoteId(), @@ -177,7 +194,8 @@ a, new LongTermCredential(iceServer.getTurnUsername(), iceServer.getTurnCredenti previousConnectivityAttempts < FORCE_RELAY_COUNT && ALLOW_REFLEXIVE, ALLOW_RELAY); log.debug( - getLogPrefix() + "Sending own candidates to {}, offered candidates: {}", + "{} Sending own candidates to {}, offered candidates: {}", + getLogPrefix(), peer.getRemoteId(), localCandidatesMessage.candidates().stream() .map(it -> it.type().toString() + "(" + it.protocol() + ")") @@ -188,18 +206,21 @@ a, new LongTermCredential(iceServer.getTurnUsername(), iceServer.getTurnCredenti // Make sure to abort the connection process and reinitiate when we haven't received an answer to our offer in 6 // seconds, candidate packet was probably lost final int currentAwaitingCandidatesEventId = awaitingCandidatesEventId.incrementAndGet(); - Executor.executeDelayed(6000, () -> { - if (peer.isClosing()) { - log.warn( - getLogPrefix() + "Peer {} not connected anymore, aborting reinitiation of ICE", - peer.getRemoteId()); - return; - } - if (iceState == AWAITING_CANDIDATES - && currentAwaitingCandidatesEventId == awaitingCandidatesEventId.get()) { - onConnectionLost(); - } - }); + CompletableFuture.runAsync( + () -> { + if (peer.isClosing()) { + log.warn( + "{} Peer {} not connected anymore, aborting reinitiation of ICE", + getLogPrefix(), + peer.getRemoteId()); + return; + } + if (iceState == AWAITING_CANDIDATES + && currentAwaitingCandidatesEventId == awaitingCandidatesEventId.get()) { + onConnectionLost(); + } + }, + CompletableFuture.delayedExecutor(6000, TimeUnit.MILLISECONDS, IceAdapter.getExecutor())); } private List getViableIceServers() { @@ -240,54 +261,59 @@ private List getViableIceServers() { * Starts harvesting local candidates if in answer mode, then initiates the actual ICE process * @param remoteCandidatesMessage */ - public synchronized void onIceMessageReceived(CandidatesMessage remoteCandidatesMessage) { - if (peer.isClosing()) { - log.warn(getLogPrefix() + "Peer not connected anymore, discarding ice message"); - return; - } - - // Start ICE async as it's blocking and this is the RPC thread - new Thread(() -> { - log.debug( - getLogPrefix() + "Got IceMsg for peer, offered candidates: {}", - remoteCandidatesMessage.candidates().stream() - .map(it -> it.type().toString() + "(" + it.protocol() + ")") - .collect(Collectors.joining(", "))); - - if (peer.isLocalOffer()) { - if (iceState != AWAITING_CANDIDATES) { - log.warn( - getLogPrefix() + "Received candidates unexpectedly, current state: {}", - iceState.getMessage()); - return; - } + public void onIceMessageReceived(CandidatesMessage remoteCandidatesMessage) { + LockUtil.executeWithLock(lockMessageReceived, () -> { + if (peer.isClosing()) { + log.warn("{} Peer not connected anymore, discarding ice message", getLogPrefix()); + return; + } - } else { - // Check if we are already processing an ICE offer and if so stop it - if (iceState != NEW && iceState != DISCONNECTED) { - log.info(getLogPrefix() + "Received new candidates/offer, stopping..."); - onConnectionLost(); + // Start ICE async as it's blocking and this is the RPC thread + CompletableFuture.runAsync( + () -> { + log.debug( + "{} Got IceMsg for peer, offered candidates: {}", + getLogPrefix(), + remoteCandidatesMessage.candidates().stream() + .map(it -> it.type().toString() + "(" + it.protocol() + ")") + .collect(Collectors.joining(", "))); + + if (peer.isLocalOffer()) { + if (iceState != AWAITING_CANDIDATES) { + log.warn( + "{} Received candidates unexpectedly, current state: {}", + getLogPrefix(), + iceState.getMessage()); + return; + } + + } else { + // Check if we are already processing an ICE offer and if so stop it + if (iceState != NEW && iceState != DISCONNECTED) { + log.info("{} Received new candidates/offer, stopping...", getLogPrefix()); + onConnectionLost(); + } + + // Answer mode, initialize agent and gather candidates + initiateIce(); } - // Answer mode, initialize agent and gather candidates - initiateIce(); - } - - setState(CHECKING); - - int previousConnectivityAttempts = getConnectivityAttempsInThePast(FORCE_SRFLX_RELAY_INTERVAL); - CandidateUtil.unpackCandidates( - remoteCandidatesMessage, - agent, - component, - mediaStream, - previousConnectivityAttempts < FORCE_SRFLX_COUNT && ALLOW_HOST, - previousConnectivityAttempts < FORCE_RELAY_COUNT && ALLOW_REFLEXIVE, - ALLOW_RELAY); - - startIce(); - }) - .start(); + setState(CHECKING); + + long previousConnectivityAttempts = getConnectivityAttempsInThePast(FORCE_SRFLX_RELAY_INTERVAL); + CandidateUtil.unpackCandidates( + remoteCandidatesMessage, + agent, + component, + mediaStream, + previousConnectivityAttempts < FORCE_SRFLX_COUNT && ALLOW_HOST, + previousConnectivityAttempts < FORCE_RELAY_COUNT && ALLOW_REFLEXIVE, + ALLOW_RELAY); + + startIce(); + }, + IceAdapter.getExecutor()); + }); } /** @@ -296,17 +322,21 @@ public synchronized void onIceMessageReceived(CandidatesMessage remoteCandidates private void startIce() { connectivityAttemptTimes.add(0, System.currentTimeMillis()); - log.debug(getLogPrefix() + "Starting ICE for peer {}", peer.getRemoteId()); + log.debug("{} Starting ICE for peer {}", getLogPrefix(), peer.getRemoteId()); agent.startConnectivityEstablishment(); // Wait for termination/completion of the agent long iceStartTime = System.currentTimeMillis(); - while (agent.getState() - != IceProcessingState.COMPLETED) { // TODO include more?, maybe stop on COMPLETED, is that to early? + while (!Thread.currentThread().isInterrupted() + && agent.getState() + != IceProcessingState + .COMPLETED) { // TODO include more?, maybe stop on COMPLETED, is that to early? try { Thread.sleep(20); } catch (InterruptedException e) { - log.error(getLogPrefix() + "Interrupted while waiting for ICE", e); + log.error("{} Interrupted while waiting for ICE", getLogPrefix(), e); + onConnectionLost(); + return; } if (agent.getState() == IceProcessingState.FAILED) { // TODO null pointer due to no agent? @@ -315,15 +345,17 @@ private void startIce() { } if (System.currentTimeMillis() - iceStartTime > 15_000) { - log.error(getLogPrefix() + "ABORTING ICE DUE TO TIMEOUT"); + log.error("{} ABORTING ICE DUE TO TIMEOUT", getLogPrefix()); onConnectionLost(); return; } } - log.debug(getLogPrefix() + "ICE terminated, connected, selected candidate pair: " - + component.getSelectedPair().getLocalCandidate().getType().toString() + " <-> " - + component.getSelectedPair().getRemoteCandidate().getType().toString()); + log.debug( + "{} ICE terminated, connected, selected candidate pair: {} <-> {}", + getLogPrefix(), + component.getSelectedPair().getLocalCandidate().getType().toString(), + component.getSelectedPair().getRemoteCandidate().getType().toString()); // We are connected connected = true; @@ -339,8 +371,7 @@ private void startIce() { connectivityChecker.start(); } - listenerThread = new Thread(this::listener); - listenerThread.start(); + listenerThread = Thread.startVirtualThread(this::listener); } /** @@ -348,72 +379,70 @@ private void startIce() { * Will close agent, stop listener and connectivity checker thread and change state to disconnected * Will then reinitiate ICE */ - public synchronized void onConnectionLost() { - if (iceState == DISCONNECTED) { - log.warn(getLogPrefix() + "Lost connection, albeit already in ice state disconnected"); - return; // TODO: will this kill the life cycle? - } - - IceState previousState = getIceState(); + public void onConnectionLost() { + LockUtil.executeWithLock(lockLostConnection, () -> { + if (iceState == DISCONNECTED) { + log.warn("{} Lost connection, albeit already in ice state disconnected", getLogPrefix()); + return; // TODO: will this kill the life cycle? + } - if (listenerThread != null) { - listenerThread.interrupt(); - listenerThread = null; - } + IceState previousState = getIceState(); - if (turnRefreshModule != null) { - turnRefreshModule.close(); - turnRefreshModule = null; - } + if (listenerThread != null) { + listenerThread.interrupt(); + listenerThread = null; + } - connectivityChecker.stop(); + if (turnRefreshModule != null) { + turnRefreshModule.close(); + turnRefreshModule = null; + } - if (connected) { - connected = false; - log.warn(getLogPrefix() + "ICE connection has been lost for peer"); - RPCService.onConnected(IceAdapter.getId(), peer.getRemoteId(), false); - } + connectivityChecker.stop(); - setState(DISCONNECTED); + if (connected) { + connected = false; + log.warn("{} ICE connection has been lost for peer", getLogPrefix()); + RPCService.onConnected(IceAdapter.getId(), peer.getRemoteId(), false); + } - if (agent != null) { - agent.free(); - agent = null; - mediaStream = null; - component = null; - } + setState(DISCONNECTED); - debug().peerStateChanged(this.peer); + if (agent != null) { + agent.free(); + agent = null; + mediaStream = null; + component = null; + } - if (peer.isClosing()) { - log.warn(getLogPrefix() + "Peer not connected anymore, aborting onConnectionLost of ICE"); - return; - } + debug().peerStateChanged(this.peer); - if (peer.getGameSession().isGameEnded()) { - log.warn(getLogPrefix() + "GAME ENDED, ABORTING onConnectionLost of ICE for peer "); - return; - } + if (peer.isClosing()) { + log.warn("{} Peer not connected anymore, aborting onConnectionLost of ICE", getLogPrefix()); + return; + } - if (previousState == CONNECTED) { - TrayIcon.showMessage("Reconnecting to " + this.peer.getRemoteLogin() + " (connection lost)"); - } + if (peer.getGameSession().isGameEnded()) { + log.warn("{} GAME ENDED, ABORTING onConnectionLost of ICE for peer ", getLogPrefix()); + return; + } - if (previousState == CONNECTED && peer.isLocalOffer()) { - // We were connected before, retry immediately - Executor.executeDelayed(0, this::reinitIce); - } else if (peer.isLocalOffer()) { - // Last ice attempt didn't succeed, so wait a bit - Executor.executeDelayed(5000, this::reinitIce); - } - } + if (previousState == CONNECTED) { + TrayIcon.showMessage("Reconnecting to %s (connection lost)".formatted(this.peer.getRemoteLogin())); + } - private synchronized void reinitIce() { - if (peer.isClosing()) { - log.warn(getLogPrefix() + "Peer not connected anymore, aborting reinitiation of ICE"); - return; - } - initiateIce(); + if (previousState == CONNECTED && peer.isLocalOffer()) { + // We were connected before, retry immediately + CompletableFuture.runAsync( + this::initiateIce, + CompletableFuture.delayedExecutor(0, TimeUnit.MILLISECONDS, IceAdapter.getExecutor())); + } else if (peer.isLocalOffer()) { + // Last ice attempt didn't succeed, so wait a bit + CompletableFuture.runAsync( + this::initiateIce, + CompletableFuture.delayedExecutor(5000, TimeUnit.MILLISECONDS, IceAdapter.getExecutor())); + } + }); } /** @@ -455,7 +484,7 @@ void sendViaIce(byte[] data, int offset, int length) { .getTransportAddress() .getPort())); } catch (IOException e) { - log.warn(getLogPrefix() + "Failed to send data via ICE", e); + log.warn("{} Failed to send data via ICE", getLogPrefix(), e); onConnectionLost(); } catch (NullPointerException e) { log.error("Component is null", e); @@ -467,12 +496,14 @@ void sendViaIce(byte[] data, int offset, int length) { * Listens for data incoming via ice socket */ public void listener() { - log.debug(getLogPrefix() + "Now forwarding data from ICE to FA for peer"); + log.debug("{} Now forwarding data from ICE to FA for peer", getLogPrefix()); Component localComponent = component; byte[] data = new byte [65536]; // 64KiB = UDP MTU, in practice due to ethernet frames being <= 1500 B, this is often not used - while (IceAdapter.isRunning() && IceAdapter.getGameSession() == peer.getGameSession()) { + while (!Thread.currentThread().isInterrupted() + && IceAdapter.isRunning() + && IceAdapter.getGameSession() == peer.getGameSession()) { try { DatagramPacket packet = new DatagramPacket(data, data.length); localComponent @@ -497,13 +528,14 @@ public void listener() { } } else { log.warn( - getLogPrefix() + "Received invalid packet, first byte: 0x{}, length: {}", + "{} Received invalid packet, first byte: 0x{}, length: {}", + getLogPrefix(), data[0], packet.getLength()); } } catch (IOException e) { // TODO: nullpointer from localComponent.xxxx???? - log.warn(getLogPrefix() + "Error while reading from ICE adapter", e); + log.warn("{} Error while reading from ICE adapter", getLogPrefix(), e); if (component == localComponent) { onConnectionLost(); } @@ -511,10 +543,14 @@ public void listener() { } } - log.debug(getLogPrefix() + "No longer listening for messages from ICE"); + log.debug("{} No longer listening for messages from ICE", getLogPrefix()); } void close() { + if (listenerThread != null) { + listenerThread.interrupt(); + listenerThread = null; + } if (turnRefreshModule != null) { turnRefreshModule.close(); } @@ -524,15 +560,15 @@ void close() { connectivityChecker.stop(); } - public int getConnectivityAttempsInThePast(final long millis) { + public long getConnectivityAttempsInThePast(final long millis) { // copy list to avoid concurrency issues - return (int) new ArrayList(connectivityAttemptTimes) + return new ArrayList<>(connectivityAttemptTimes) .stream() .filter(time -> time > (System.currentTimeMillis() - millis)) .count(); } public String getLogPrefix() { - return "ICE %s: ".formatted(peer.getPeerIdentifier()); + return "ICE %s:".formatted(peer.getPeerIdentifier()); } } diff --git a/ice-adapter/src/main/java/com/faforever/iceadapter/ice/PeerTurnRefreshModule.java b/ice-adapter/src/main/java/com/faforever/iceadapter/ice/PeerTurnRefreshModule.java index 0e8c1b3..44da49c 100644 --- a/ice-adapter/src/main/java/com/faforever/iceadapter/ice/PeerTurnRefreshModule.java +++ b/ice-adapter/src/main/java/com/faforever/iceadapter/ice/PeerTurnRefreshModule.java @@ -1,9 +1,5 @@ package com.faforever.iceadapter.ice; -import java.lang.reflect.Field; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.time.Duration; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.ice4j.ice.RelayedCandidate; @@ -13,6 +9,11 @@ import org.ice4j.message.Request; import org.ice4j.stack.TransactionID; +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.time.Duration; + /** * Sends continuous refresh requests to the turn server */ @@ -58,15 +59,14 @@ public PeerTurnRefreshModule(PeerIceModule ice, RelayedCandidate candidate) { } if (harvest != null) { - refreshThread = new Thread(this::refreshThread); - refreshThread.start(); + refreshThread = Thread.startVirtualThread(this::refreshThread); log.info("Started turn refresh module for peer {}", ice.getPeer().getRemoteLogin()); } } private void refreshThread() { - while (running) { + while (!Thread.currentThread().isInterrupted() && running) { Request refreshRequest = MessageFactory.createRefreshRequest( 600); // Maximum lifetime of turn is 600 seconds (10 minutes), server may limit this even further @@ -77,19 +77,23 @@ private void refreshThread() { log.info("Sent turn refresh request."); } catch (IllegalAccessException | InvocationTargetException e) { - log.error("Could not send turn refresh request!.", e); + log.error("Could not send turn refresh request!", e); } try { Thread.sleep(REFRESH_INTERVAL); } catch (InterruptedException e) { log.warn("Sleeping refreshThread was interrupted"); + return; } } } public void close() { running = false; - refreshThread.interrupt(); + if (refreshThread != null) { + refreshThread.interrupt(); + refreshThread = null; + } } } diff --git a/ice-adapter/src/main/java/com/faforever/iceadapter/rpc/RPCHandler.java b/ice-adapter/src/main/java/com/faforever/iceadapter/rpc/RPCHandler.java index a712c98..591d9d3 100644 --- a/ice-adapter/src/main/java/com/faforever/iceadapter/rpc/RPCHandler.java +++ b/ice-adapter/src/main/java/com/faforever/iceadapter/rpc/RPCHandler.java @@ -9,11 +9,6 @@ import com.faforever.iceadapter.ice.Peer; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Optional; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @@ -23,6 +18,14 @@ import org.ice4j.ice.CandidateType; import org.ice4j.ice.Component; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + /** * Handles calls from JsonRPC (the client) */ @@ -31,6 +34,7 @@ public class RPCHandler { private final ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule()); + private final Lock lockStatus = new ReentrantLock(); private final int rpcPort; public void hostGame(String mapName) { @@ -99,7 +103,8 @@ public String status() { List relays = new ArrayList<>(); GameSession gameSession = IceAdapter.getGameSession(); if (gameSession != null) { - synchronized (gameSession.getPeers()) { + lockStatus.lock(); + try { gameSession.getPeers().values().stream() .map(peer -> { IceStatus.IceRelay.IceRelayICEState iceRelayICEState = @@ -142,6 +147,8 @@ public String status() { iceRelayICEState); }) .forEach(relays::add); + } finally { + lockStatus.unlock(); } } @@ -163,6 +170,6 @@ public String status() { public void quit() { log.warn("Close requested, stopping..."); - IceAdapter.close(); + IceAdapter.close(0); } } diff --git a/ice-adapter/src/main/java/com/faforever/iceadapter/rpc/RPCService.java b/ice-adapter/src/main/java/com/faforever/iceadapter/rpc/RPCService.java index c9200e9..3c38f26 100644 --- a/ice-adapter/src/main/java/com/faforever/iceadapter/rpc/RPCService.java +++ b/ice-adapter/src/main/java/com/faforever/iceadapter/rpc/RPCService.java @@ -1,7 +1,5 @@ package com.faforever.iceadapter.rpc; -import static com.faforever.iceadapter.debug.Debug.debug; - import com.faforever.iceadapter.IceAdapter; import com.faforever.iceadapter.debug.Debug; import com.faforever.iceadapter.debug.InfoWindow; @@ -13,10 +11,13 @@ import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import com.nbarraille.jjsonrpc.JJsonPeer; import com.nbarraille.jjsonrpc.TcpServer; +import lombok.extern.slf4j.Slf4j; + import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; -import lombok.extern.slf4j.Slf4j; + +import static com.faforever.iceadapter.debug.Debug.debug; /** * Handles communication between client and adapter, opens a server for the client to connect to @@ -52,7 +53,7 @@ public static void init(int port) { log.info( "Lost connection to first RPC Peer. GameState: {}, Stopping adapter...", gameState.getName()); - IceAdapter.close(); + IceAdapter.close(0); } }); }); diff --git a/ice-adapter/src/main/java/com/faforever/iceadapter/util/Executor.java b/ice-adapter/src/main/java/com/faforever/iceadapter/util/Executor.java deleted file mode 100644 index f814d2d..0000000 --- a/ice-adapter/src/main/java/com/faforever/iceadapter/util/Executor.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.faforever.iceadapter.util; - -public class Executor { - - /** - * Creates a that sleeps for a give time and then calls the provided runnable - * @param timeMs The time to wait in ms - * @param runnable The runnable - */ - public static void executeDelayed(int timeMs, Runnable runnable) { - new Thread(() -> { - try { - Thread.sleep(timeMs); - } catch (InterruptedException e) { - } - runnable.run(); - }) - .start(); - } -} diff --git a/ice-adapter/src/main/java/com/faforever/iceadapter/util/ExecutorHolder.java b/ice-adapter/src/main/java/com/faforever/iceadapter/util/ExecutorHolder.java new file mode 100644 index 0000000..61553a2 --- /dev/null +++ b/ice-adapter/src/main/java/com/faforever/iceadapter/util/ExecutorHolder.java @@ -0,0 +1,13 @@ +package com.faforever.iceadapter.util; + +import lombok.experimental.UtilityClass; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +@UtilityClass +public class ExecutorHolder { + public ExecutorService getExecutor() { + return Executors.newVirtualThreadPerTaskExecutor(); + } +} diff --git a/ice-adapter/src/main/java/com/faforever/iceadapter/util/LockUtil.java b/ice-adapter/src/main/java/com/faforever/iceadapter/util/LockUtil.java new file mode 100644 index 0000000..6161b9a --- /dev/null +++ b/ice-adapter/src/main/java/com/faforever/iceadapter/util/LockUtil.java @@ -0,0 +1,17 @@ +package com.faforever.iceadapter.util; + +import lombok.experimental.UtilityClass; + +import java.util.concurrent.locks.Lock; + +@UtilityClass +public class LockUtil { + public void executeWithLock(Lock lock, Runnable task) { + lock.lock(); + try { + task.run(); + } finally { + lock.unlock(); + } + } +} diff --git a/ice-adapter/src/main/java/com/faforever/iceadapter/util/NetworkToolbox.java b/ice-adapter/src/main/java/com/faforever/iceadapter/util/NetworkToolbox.java index 5f739bd..0bdb7a6 100644 --- a/ice-adapter/src/main/java/com/faforever/iceadapter/util/NetworkToolbox.java +++ b/ice-adapter/src/main/java/com/faforever/iceadapter/util/NetworkToolbox.java @@ -1,11 +1,13 @@ package com.faforever.iceadapter.util; +import com.faforever.iceadapter.IceAdapter; +import lombok.extern.slf4j.Slf4j; + import java.io.IOException; import java.net.DatagramSocket; import java.net.ServerSocket; import java.net.SocketException; import java.util.Random; -import lombok.extern.slf4j.Slf4j; @Slf4j public class NetworkToolbox { @@ -32,7 +34,7 @@ public static int findFreeTCPPort(int min, int max) { } log.error("Could not find a free tcp port"); - System.exit(-1); + IceAdapter.close(-1); return -1; } @@ -56,7 +58,7 @@ public static int findFreeUDPPort(int min, int max) { } log.error("Could not find a free tcp port"); - System.exit(-1); + IceAdapter.close(-1); return -1; } } diff --git a/ice-adapter/src/main/java/com/faforever/iceadapter/util/PingWrapper.java b/ice-adapter/src/main/java/com/faforever/iceadapter/util/PingWrapper.java index 0e04013..4ff5a93 100644 --- a/ice-adapter/src/main/java/com/faforever/iceadapter/util/PingWrapper.java +++ b/ice-adapter/src/main/java/com/faforever/iceadapter/util/PingWrapper.java @@ -1,13 +1,15 @@ package com.faforever.iceadapter.util; +import com.faforever.iceadapter.IceAdapter; import com.google.common.io.CharStreams; +import lombok.extern.slf4j.Slf4j; + import java.io.IOException; import java.io.InputStreamReader; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.regex.Matcher; import java.util.regex.Pattern; -import lombok.extern.slf4j.Slf4j; /* * A wrapper around calling the system `ping` executable to query the latency of a host. @@ -34,27 +36,29 @@ public static CompletableFuture getLatency(String address, Integer count output_pattern = GNU_OUTPUT_PATTERN; } - return CompletableFuture.supplyAsync(() -> { - try { - process.waitFor(); - InputStreamReader reader = new InputStreamReader(process.getInputStream()); - String output = CharStreams.toString(reader); - reader.close(); + return CompletableFuture.supplyAsync( + () -> { + try { + process.waitFor(); + InputStreamReader reader = new InputStreamReader(process.getInputStream()); + String output = CharStreams.toString(reader); + reader.close(); - Matcher m = output_pattern.matcher(output); + Matcher m = output_pattern.matcher(output); - if (m.find()) { - double result = Double.parseDouble(m.group(1)); - log.debug("Pinged {} with an RTT of {}", address, result); - return result; - } else { - log.warn("Failed to ping {}", address); - throw new RuntimeException("Failed to contact the host"); - } - } catch (InterruptedException | IOException | RuntimeException e) { - throw new CompletionException(e); - } - }); + if (m.find()) { + double result = Double.parseDouble(m.group(1)); + log.debug("Pinged {} with an RTT of {}", address, result); + return result; + } else { + log.warn("Failed to ping {}", address); + throw new RuntimeException("Failed to contact the host"); + } + } catch (InterruptedException | IOException | RuntimeException e) { + throw new CompletionException(e); + } + }, + IceAdapter.getExecutor()); } catch (IOException e) { CompletableFuture future = new CompletableFuture<>(); future.completeExceptionally(e); diff --git a/ice-adapter/src/main/java/com/faforever/iceadapter/util/TrayIcon.java b/ice-adapter/src/main/java/com/faforever/iceadapter/util/TrayIcon.java index a95a744..7939891 100644 --- a/ice-adapter/src/main/java/com/faforever/iceadapter/util/TrayIcon.java +++ b/ice-adapter/src/main/java/com/faforever/iceadapter/util/TrayIcon.java @@ -1,16 +1,19 @@ package com.faforever.iceadapter.util; +import com.faforever.iceadapter.IceAdapter; import com.faforever.iceadapter.debug.Debug; import com.faforever.iceadapter.debug.DebugWindow; import com.faforever.iceadapter.debug.InfoWindow; +import lombok.extern.slf4j.Slf4j; + +import javax.imageio.ImageIO; +import javax.swing.*; import java.awt.*; import java.awt.event.MouseEvent; import java.awt.event.MouseListener; import java.io.IOException; import java.net.URL; -import javax.imageio.ImageIO; -import javax.swing.*; -import lombok.extern.slf4j.Slf4j; +import java.util.concurrent.CompletableFuture; @Slf4j public class TrayIcon { @@ -45,7 +48,8 @@ public void mouseClicked(MouseEvent mouseEvent) {} @Override public void mousePressed(MouseEvent mouseEvent) { - new Thread(() -> { + CompletableFuture.runAsync( + () -> { if (InfoWindow.INSTANCE == null) { log.info("Launching ICE adapter debug window"); Debug.ENABLE_INFO_WINDOW = true; @@ -53,8 +57,8 @@ public void mousePressed(MouseEvent mouseEvent) { } else { InfoWindow.INSTANCE.show(); } - }) - .start(); + }, + IceAdapter.getExecutor()); } @Override diff --git a/server/build.gradle b/server/build.gradle index 7ad8282..3d46c03 100644 --- a/server/build.gradle +++ b/server/build.gradle @@ -5,7 +5,7 @@ apply plugin: 'com.github.johnrengelman.shadow' group 'com.faforever' version '1.0-SNAPSHOT' -sourceCompatibility = JavaVersion.VERSION_17 +sourceCompatibility = JavaVersion.VERSION_21 repositories { mavenCentral() diff --git a/shared/build.gradle b/shared/build.gradle index d8cf631..95aa06b 100644 --- a/shared/build.gradle +++ b/shared/build.gradle @@ -4,7 +4,7 @@ apply plugin: 'java' group 'com.faforever' version '1.0-SNAPSHOT' -sourceCompatibility = JavaVersion.VERSION_17 +sourceCompatibility = JavaVersion.VERSION_21 dependencies{ annotationProcessor("org.projectlombok:lombok:$lombokVersion")