From b9b0237bb74e8416b6b426fdd722ae523dab46f2 Mon Sep 17 00:00:00 2001 From: XAMPPRocky <4464295+XAMPPRocky@users.noreply.github.com> Date: Tue, 27 Feb 2024 14:37:37 +0100 Subject: [PATCH] Update Unreal Engine Plugin (#778) --- docs/src/sdks/unreal-engine.md | 10 +- .../Quilkin/Private/QuilkinPacketHandler.h | 44 -- .../Private/QuilkinSocketSubsystem.cpp | 109 --- .../Quilkin/Private/QuilkinSocketSubsystem.h | 50 -- sdks/ue5/.gitignore | 2 + sdks/{ue4 => ue5}/Quilkin.uplugin | 0 sdks/ue5/README.md | 34 + .../Quilkin/Private/QuilkinCircularBuffer.h | 73 ++ .../Quilkin/Private/QuilkinConcurrentMap.h | 206 ++++++ .../Quilkin/Private/QuilkinConstants.h} | 22 +- .../Private/QuilkinControlMessageProtocol.cpp | 196 +++++ .../Private/QuilkinControlMessageProtocol.h | 147 ++++ .../Quilkin/Private/QuilkinDelegates.cpp | 22 + .../Quilkin/Private/QuilkinEndpoint.cpp | 60 ++ .../Quilkin/Private/QuilkinInternetAddrs.cpp | 53 ++ .../Quilkin/Private/QuilkinInternetAddrs.h} | 29 +- .../Source/Quilkin/Private/QuilkinLog.cpp | 2 +- .../Source/Quilkin/Private/QuilkinLog.h | 4 +- .../Source/Quilkin/Private/QuilkinModule.cpp | 21 +- .../Source/Quilkin/Private/QuilkinModule.h | 6 +- .../Quilkin/Private/QuilkinPacketHandler.cpp | 39 + .../Quilkin/Private/QuilkinPacketHandler.h | 60 ++ .../Source/Quilkin/Private/QuilkinResult.h | 51 ++ .../Quilkin/Private/QuilkinSettings.cpp} | 6 +- .../Source/Quilkin/Private/QuilkinSocket.cpp | 28 +- .../Source/Quilkin/Private/QuilkinSocket.h | 16 +- .../Private/QuilkinSocketSubsystem.cpp | 693 ++++++++++++++++++ .../Quilkin/Private/QuilkinSocketSubsystem.h | 176 +++++ .../QuilkinControlMessageProtocolTest.cpp | 185 +++++ .../Source/Quilkin/Public/QuilkinDelegates.h | 57 ++ .../Source/Quilkin/Public/QuilkinEndpoint.h | 72 ++ .../Source/Quilkin/Public/QuilkinSettings.h | 177 +++++ .../Source/Quilkin/Quilkin.build.cs | 16 +- 33 files changed, 2388 insertions(+), 278 deletions(-) delete mode 100644 sdks/ue4/Source/Quilkin/Private/QuilkinPacketHandler.h delete mode 100644 sdks/ue4/Source/Quilkin/Private/QuilkinSocketSubsystem.cpp delete mode 100644 sdks/ue4/Source/Quilkin/Private/QuilkinSocketSubsystem.h create mode 100644 sdks/ue5/.gitignore rename sdks/{ue4 => ue5}/Quilkin.uplugin (100%) create mode 100644 sdks/ue5/README.md create mode 100644 sdks/ue5/Source/Quilkin/Private/QuilkinCircularBuffer.h create mode 100644 sdks/ue5/Source/Quilkin/Private/QuilkinConcurrentMap.h rename sdks/{ue4/Source/Quilkin/Private/QuilkinPacketHandler.cpp => ue5/Source/Quilkin/Private/QuilkinConstants.h} (60%) create mode 100644 sdks/ue5/Source/Quilkin/Private/QuilkinControlMessageProtocol.cpp create mode 100644 sdks/ue5/Source/Quilkin/Private/QuilkinControlMessageProtocol.h create mode 100644 sdks/ue5/Source/Quilkin/Private/QuilkinDelegates.cpp create mode 100644 sdks/ue5/Source/Quilkin/Private/QuilkinEndpoint.cpp create mode 100644 sdks/ue5/Source/Quilkin/Private/QuilkinInternetAddrs.cpp rename sdks/{ue4/Source/Quilkin/Public/QuilkinDelegates.h => ue5/Source/Quilkin/Private/QuilkinInternetAddrs.h} (54%) rename sdks/{ue4 => ue5}/Source/Quilkin/Private/QuilkinLog.cpp (95%) rename sdks/{ue4 => ue5}/Source/Quilkin/Private/QuilkinLog.h (87%) rename sdks/{ue4 => ue5}/Source/Quilkin/Private/QuilkinModule.cpp (84%) rename sdks/{ue4 => ue5}/Source/Quilkin/Private/QuilkinModule.h (86%) create mode 100644 sdks/ue5/Source/Quilkin/Private/QuilkinPacketHandler.cpp create mode 100644 sdks/ue5/Source/Quilkin/Private/QuilkinPacketHandler.h create mode 100644 sdks/ue5/Source/Quilkin/Private/QuilkinResult.h rename sdks/{ue4/Source/Quilkin/Private/QuilkinDelegates.cpp => ue5/Source/Quilkin/Private/QuilkinSettings.cpp} (79%) rename sdks/{ue4 => ue5}/Source/Quilkin/Private/QuilkinSocket.cpp (92%) rename sdks/{ue4 => ue5}/Source/Quilkin/Private/QuilkinSocket.h (93%) create mode 100644 sdks/ue5/Source/Quilkin/Private/QuilkinSocketSubsystem.cpp create mode 100644 sdks/ue5/Source/Quilkin/Private/QuilkinSocketSubsystem.h create mode 100644 sdks/ue5/Source/Quilkin/Private/Tests/QuilkinControlMessageProtocolTest.cpp create mode 100644 sdks/ue5/Source/Quilkin/Public/QuilkinDelegates.h create mode 100644 sdks/ue5/Source/Quilkin/Public/QuilkinEndpoint.h create mode 100644 sdks/ue5/Source/Quilkin/Public/QuilkinSettings.h rename sdks/{ue4 => ue5}/Source/Quilkin/Quilkin.build.cs (79%) diff --git a/docs/src/sdks/unreal-engine.md b/docs/src/sdks/unreal-engine.md index 168179fe65..100a92bdeb 100644 --- a/docs/src/sdks/unreal-engine.md +++ b/docs/src/sdks/unreal-engine.md @@ -1,11 +1,3 @@ # Quilkin Unreal Engine Plugin -This is an alpha version of the Unreal Engine plugin for Quilkin. Currently it only supports adding a routing token in the following format. - -``` - | token | version -X bytes | 16 bytes | 1 bytes -``` - -## How to install -To get this client proxy installed, the SDK should be located in `Engine` path for Plugins, so copy the whole `ue4` folder (resides under `sdks` folder) in your Unreal Engine path `/[UE4 Root]/Engine/Plugins`, then you may want to rename the ue4 folder to `Quilkin`. Unreal Engine will automatically discover the plugin by searching for `.uplugin` file. +{{#include ../../../sdks/ue5/README.md }} diff --git a/sdks/ue4/Source/Quilkin/Private/QuilkinPacketHandler.h b/sdks/ue4/Source/Quilkin/Private/QuilkinPacketHandler.h deleted file mode 100644 index 4f7654e596..0000000000 --- a/sdks/ue4/Source/Quilkin/Private/QuilkinPacketHandler.h +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright 2022 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include "CoreMinimal.h" - -class FQuilkinPacketHandler -{ -public: - FQuilkinPacketHandler(); - bool IsEnabled(); - - FORCEINLINE const FBitWriter Handle(const uint8* Packet, int32 CountBytes) - { - // Add the current packet version. - uint8 PacketVersion = 0; - int PacketVersionNumBytes = 1; - - // Reserve enough space for the token and packet version. - FBitWriter NewPacket((CountBytes + RoutingToken.Num() + PacketVersionNumBytes) * 8, true); - - NewPacket.Serialize((void*)Packet, CountBytes); - NewPacket.Serialize(RoutingToken.GetData(), RoutingToken.Num()); - NewPacket.Serialize(&PacketVersion, PacketVersionNumBytes); - return NewPacket; - } - -private: - TArray RoutingToken; -}; diff --git a/sdks/ue4/Source/Quilkin/Private/QuilkinSocketSubsystem.cpp b/sdks/ue4/Source/Quilkin/Private/QuilkinSocketSubsystem.cpp deleted file mode 100644 index fa377e9d19..0000000000 --- a/sdks/ue4/Source/Quilkin/Private/QuilkinSocketSubsystem.cpp +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Copyright 2022 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "QuilkinSocketSubsystem.h" -#include "QuilkinSocket.h" -#include "QuilkinLog.h" - -FQuilkinSocketSubsystem::FQuilkinSocketSubsystem(ISocketSubsystem* WrappedSocketSubsystem) : SocketSubsystem{WrappedSocketSubsystem} -{ -} - -FQuilkinSocketSubsystem::~FQuilkinSocketSubsystem() -{ -} - -bool FQuilkinSocketSubsystem::Init(FString& Error) -{ - return true; -} - -void FQuilkinSocketSubsystem::Shutdown() -{ -} - -FSocket* FQuilkinSocketSubsystem::CreateSocket(const FName& SocketType, const FString& SocketDescription, const FName& ProtocolName) -{ - FSocket* WrappedSocket = SocketSubsystem->CreateSocket(SocketType, SocketDescription, ProtocolName); - if (WrappedSocket == nullptr) - { - UE_LOG(LogQuilkin, Warning, TEXT("CreateSocket returned nullptr")); - return nullptr; - } - - ESocketType InSocketType = WrappedSocket->GetSocketType(); - return new FQuilkinSocket(FUniqueSocket(WrappedSocket), InSocketType, SocketDescription, ProtocolName); -} - -void FQuilkinSocketSubsystem::DestroySocket(FSocket* Socket) -{ - SocketSubsystem->DestroySocket(Socket); -} - -FAddressInfoResult FQuilkinSocketSubsystem::GetAddressInfo(const TCHAR* HostName, const TCHAR* ServiceName, EAddressInfoFlags QueryFlags, const FName ProtocolTypeName, ESocketType SocketType) -{ - return SocketSubsystem->GetAddressInfo(HostName, ServiceName, QueryFlags, ProtocolTypeName, SocketType); -} - -TSharedPtr FQuilkinSocketSubsystem::GetAddressFromString(const FString& InAddress) -{ - return SocketSubsystem->GetAddressFromString(InAddress); -} - -bool FQuilkinSocketSubsystem::RequiresChatDataBeSeparate() -{ - return SocketSubsystem->RequiresChatDataBeSeparate(); -} - -bool FQuilkinSocketSubsystem::RequiresEncryptedPackets() -{ - return SocketSubsystem->RequiresEncryptedPackets(); -} - -bool FQuilkinSocketSubsystem::GetHostName(FString& HostName) -{ - return SocketSubsystem->GetHostName(HostName); -} - -TSharedRef FQuilkinSocketSubsystem::CreateInternetAddr() -{ - return SocketSubsystem->CreateInternetAddr(); -} - -bool FQuilkinSocketSubsystem::HasNetworkDevice() -{ - return SocketSubsystem->HasNetworkDevice(); -} - -const TCHAR* FQuilkinSocketSubsystem::GetSocketAPIName() const -{ - return SocketSubsystem->GetSocketAPIName(); -} - -ESocketErrors FQuilkinSocketSubsystem::GetLastErrorCode() -{ - return SocketSubsystem->GetLastErrorCode(); -} - -ESocketErrors FQuilkinSocketSubsystem::TranslateErrorCode(int32 Code) -{ - return SocketSubsystem->TranslateErrorCode(Code); -} - -bool FQuilkinSocketSubsystem::IsSocketWaitSupported() const -{ - return SocketSubsystem->IsSocketWaitSupported(); -} diff --git a/sdks/ue4/Source/Quilkin/Private/QuilkinSocketSubsystem.h b/sdks/ue4/Source/Quilkin/Private/QuilkinSocketSubsystem.h deleted file mode 100644 index 01bffe6520..0000000000 --- a/sdks/ue4/Source/Quilkin/Private/QuilkinSocketSubsystem.h +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright 2022 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include "CoreMinimal.h" -#include "SocketSubsystem.h" - -#define QUILKIN_SOCKETSUBSYSTEM_NAME TEXT("Quilkin") - -class FQuilkinSocketSubsystem : public ISocketSubsystem -{ -public: - FQuilkinSocketSubsystem(ISocketSubsystem* WrappedSocketSubsystem); - virtual ~FQuilkinSocketSubsystem(); - - //~ Begin ISocketSubsystem Interface - virtual bool Init(FString& Error) override; - virtual void Shutdown() override; - virtual FSocket* CreateSocket(const FName& SocketType, const FString& SocketDescription, const FName& ProtocolName) override; - virtual void DestroySocket(FSocket* Socket) override; - virtual FAddressInfoResult GetAddressInfo(const TCHAR* HostName, const TCHAR* ServiceName = nullptr, EAddressInfoFlags QueryFlags = EAddressInfoFlags::Default, const FName ProtocolTypeName = NAME_None, ESocketType SocketType = ESocketType::SOCKTYPE_Unknown) override; - virtual TSharedPtr GetAddressFromString(const FString& InAddress) override; - virtual bool RequiresChatDataBeSeparate() override; - virtual bool RequiresEncryptedPackets() override; - virtual bool GetHostName(FString& HostName) override; - virtual TSharedRef CreateInternetAddr() override; - virtual bool HasNetworkDevice() override; - virtual const TCHAR* GetSocketAPIName() const override; - virtual ESocketErrors GetLastErrorCode() override; - virtual ESocketErrors TranslateErrorCode(int32 Code) override; - virtual bool IsSocketWaitSupported() const override; - //~ End ISocketSubsystem Interface - -protected: - ISocketSubsystem* SocketSubsystem; -}; diff --git a/sdks/ue5/.gitignore b/sdks/ue5/.gitignore new file mode 100644 index 0000000000..cfbcb6f485 --- /dev/null +++ b/sdks/ue5/.gitignore @@ -0,0 +1,2 @@ +Binaries/ +Intermediate/ diff --git a/sdks/ue4/Quilkin.uplugin b/sdks/ue5/Quilkin.uplugin similarity index 100% rename from sdks/ue4/Quilkin.uplugin rename to sdks/ue5/Quilkin.uplugin diff --git a/sdks/ue5/README.md b/sdks/ue5/README.md new file mode 100644 index 0000000000..e4826240f2 --- /dev/null +++ b/sdks/ue5/README.md @@ -0,0 +1,34 @@ +This is an unreal engine 5 plugin for Quilkin, a UDP proxy for gameservers. The plugin provides several features that you can use with Quilkin deployments, such as proxying game traffic, and latency measurement. + +You can also find guide level documentation on how the proxy works in the [Quilkin Book](https://googleforgames.github.io/quilkin/main/book/). + +### Installation + +Copy this plugin to your `Plugins` folder in your `Engine` directory. + +### Configuration +Static configuration is available in the editor through `UQuilkinDeveloperSettings` in "Project Settings". + +Dynamic configuration is available through `UQuilkinConfigSubsystem`, it is initialised from the settings provided in `UQuilkinDeveloperSettings`, but can also be updated in code, and users can bind individual properties to delegates allowing them to dynamically set based on custom logic. + +- `bool Enabled` Whether the plugin will attach a versioned routing token to UDP packets to allow load balancers forward traffic to the correct gameserver. This also requires the address the clients connect to be a Quilkin load balancer, if connected directly to a gameserver the client will be rejected. +- `bool EnabledInPie` By default `Enabled` is disabled in editor to prevent interfering with local clients and gameservers, you can override this behaviour by also enabling `EnabledInPie`. +- `TArray RoutingToken` The routing token representing the gameserver a client wants to reach, the token **must** be 16 bytes exactly. Currently the plugin only supports using `Enabled` with a routing token to create the following layout. It is assumed that the routing token would come from an external service, such as a matchmaking system. + +``` + | token | version + X bytes | 16 bytes | 1 byte +``` + +- `TArray Endpoints` A set of Quilkin load balancer endpoints that can be used for the following features. +- `bool MeasureEndpoints` When enabled, the plugin will start a new `Tick` task that executes at a fixed interval (currently 30 seconds), where it will spawn a new background task that will ping each endpoint in `Endpoints`, and track its measurement in a fixed size circular buffer. + Pings are handled through Quilkin Control Message Protocol, this is a bespoke protocol for UDP to be able to support situations where for example using ICMP is not possible, see the [Quilkin Book](https://googleforgames.github.io/quilkin/main/book/services/proxy/qcmp.html) for more details on the protocol data unit. + **Note** `MeasureEndpoints` is orthogonal to `Enabled` and `UseEndpoints` meaning that you can use `MeasureEndpoints` for latency measurements without being required to also use Quilkin for game traffic. +- `bool UseEndpoints` Whether to use `Endpoints` for game traffic. When enabled, instead of using the provided `FInternetAddr`, the plugin will choose the lowest latency endpoint available and send traffic through that endpoint to connect to the gameserver, and if the latency should exceed `JitterThreshold` then the plugin will attempt to redirect traffic to the next available endpoint with the lowest latency. + +### Delegates +Quilkin exposes a number of delegates to be able to access certain information, they can be accessed through the `FQuilkinDelegates` class. + +- `GetQuilkinEndpointMeasurements` returns `TArray` representing each endpoint set in `Endpoints` with their median latency. The array will be empty if no endpoints have been set and `MeasureEndpoints` is not enabled. + +- `GetLowestLatencyEndpoint` returns `TOptional` is a specialisation of `GetQuilkinEndpointMeasurements` returning the lowest latency endpoint and its median latency. The delegate will return `None` if the array is empty and `MeasureEndpoints` is not enabled. diff --git a/sdks/ue5/Source/Quilkin/Private/QuilkinCircularBuffer.h b/sdks/ue5/Source/Quilkin/Private/QuilkinCircularBuffer.h new file mode 100644 index 0000000000..9224d4de0a --- /dev/null +++ b/sdks/ue5/Source/Quilkin/Private/QuilkinCircularBuffer.h @@ -0,0 +1,73 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "Containers/Deque.h" + +/* A circular first-in last-out buffer used for taking the median of measurements */ +template +class CircularBuffer +{ +public: + explicit CircularBuffer(size_t InCapacity = 50) + : Capacity(InCapacity) + { + } + + void Add(const T& value) + { + if (Buffer.Num() == Capacity) + { + Buffer.PopFirst(); + } + Buffer.PushLast(value); + } + + bool IsEmpty() + { + return Num() == 0; + } + + size_t Num() + { + return Buffer.Num(); + } + + T Median() const + { + if (Buffer.IsEmpty()) + { + return T{}; // Return default value if the buffer is empty + } + + TArray Sorted; + for (const auto& Item : Buffer) + { + Sorted.Add(Item); + } + + Sorted.Sort(); + + size_t Middle = Sorted.Num() / 2; + + return (Sorted.Num() % 2 == 0) ? (Sorted[Middle] + Sorted[Middle - 1]) / 2 : Sorted[Middle]; + } + +private: + size_t Capacity; + TDeque Buffer; +}; + diff --git a/sdks/ue5/Source/Quilkin/Private/QuilkinConcurrentMap.h b/sdks/ue5/Source/Quilkin/Private/QuilkinConcurrentMap.h new file mode 100644 index 0000000000..750ae6283d --- /dev/null +++ b/sdks/ue5/Source/Quilkin/Private/QuilkinConcurrentMap.h @@ -0,0 +1,206 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +/** A `TMap` data structure wrapped with `FRWLock` to allow for thread safe + * concurrent access. + */ +template +class TSConcurrentMap +{ +public: + TSConcurrentMap() = default; + + ~TSConcurrentMap() { Empty(); } + + /** Resets the map with the specified keysand calls `Func` with each key for + * generating the value to be inserted into the map. + */ + template void ResetWithKeys(const TArray& NewKeys, Fn Func) + { + RWLock.WriteLock(); + DataMap.Empty(); + + for (const TKey& Endpoint : NewKeys) + { + TValue DefaultValue = Func(&Endpoint); + DataMap.Add(Endpoint, DefaultValue); + } + RWLock.WriteUnlock(); + } + + /** If `Key` is not present, inserts `Value` into the map, otherwise it updates + * the existing `Key` entry. + */ + void AddOrUpdate(const TKey& Key, const TValue& Value) + { + RWLock.WriteLock(); + TValue* FoundValue = DataMap.Find(Key); + if (FoundValue) + { + *FoundValue = Value; + } + else + { + DataMap.Add(Key, Value); + } + RWLock.WriteUnlock(); + } + + /** If `Key` is not present, inserts `Value` into the map, otherwise it has + * no effect. + */ + void Add(const TKey& Key, const TValue& Value) + { + RWLock.WriteLock(); + DataMap.Add(Key, Value); + RWLock.WriteUnlock(); + } + + /** Returns the pointer to the value matching `Key`, if present, otherwise + * returns a null pointer. + */ + const TValue* Find(const TKey& Key) const + { + RWLock.ReadLock(); + const TValue* Result = DataMap.Find(Key); + RWLock.ReadUnlock(); + + return Result; + } + + /** Finds or inserts a default `TValue` in `Key`, and then calls `Add` on + * `TValue` with `Latency`. + */ + void FindOrDefaultToAdd(const TKey& Key, int64 Latency) + { + RWLock.WriteLock(); + DataMap.FindOrAdd(Key, TValue()).Add(Latency); + RWLock.WriteUnlock(); + } + + /** Removes the entry matching `Key`, if present. */ + void Remove(const TKey& Key) + { + RWLock.WriteLock(); + DataMap.Remove(Key); + RWLock.WriteUnlock(); + } + + /** Returns the number of entries in the map. */ + int32 Num() const + { + RWLock.ReadLock(); + auto Num = DataMap.Num(); + RWLock.ReadUnlock(); + return Num; + } + + /** Returns whether the map contains no entries. */ + bool IsEmpty() const + { + RWLock.ReadLock(); + auto Empty = DataMap.IsEmpty(); + RWLock.ReadUnlock(); + return Empty; + } + + /** Returns whether the map contains an entry matching `Key`. */ + bool Contains(const TKey& Key) const + { + RWLock.ReadLock(); + auto Result = DataMap.Contains(Key); + RWLock.ReadUnlock(); + return Result; + } + + /** Removes all entries from the map. */ + void Empty() + { + RWLock.WriteLock(); + DataMap.Empty(); + RWLock.WriteUnlock(); + } + + /** Returns all keys from the map. */ + TArray GetKeys() const + { + RWLock.ReadLock(); + TArray Keys; + DataMap.GetKeys(Keys); + RWLock.ReadUnlock(); + return Keys; + } + + /** Accepts a closure which accepts (KEY, VALUE) and returns a ENTRY. + * + * SAFETY: The closure must not call any method which write locks this map, otherwise + * it will cause re-entrance. + */ + template + TArray MapToArray(Fn Closure) const + { + TArray Entries; + RWLock.ReadLock(); + for (auto& Entry : DataMap) { + Entries.Push(Closure(Entry.template Get<0>(), Entry.template Get<1>())); + } + RWLock.ReadUnlock(); + return Entries; + } + + /** Accepts a closure which accepts (KEY, VALUE) and returns a TOptional. + * If TOptional is not set, then that entry is not included in the returned array. + * + * SAFETY: The closure must not call any method which write locks this map, otherwise + * it will cause re-entrance. + */ + template + TArray FilterMapToArray(Fn Closure) const + { + TArray Entries; + RWLock.ReadLock(); + for (auto& Entry : DataMap) { + TOptional Option = Closure(Entry.template Get<0>(), Entry.template Get<1>()); + + if (Option.IsSet()) { + Entries.Push(Option.GetValue()); + } + } + RWLock.ReadUnlock(); + return Entries; + } + + /** Accepts a closure which accepts (KEY, VALUE) and returns void. + * + * SAFETY: The closure must not call any method which write locks this map, otherwise + * it will cause re-entrance. + */ + template void ForEach(Fn Func) const + { + RWLock.ReadLock(); + for (const TPair& Pair : DataMap) + { + Func(Pair.template Get<0>(), Pair.template Get<1>()); + } + RWLock.ReadUnlock(); + } + +private: + TMap DataMap; + mutable FRWLock RWLock; +}; + diff --git a/sdks/ue4/Source/Quilkin/Private/QuilkinPacketHandler.cpp b/sdks/ue5/Source/Quilkin/Private/QuilkinConstants.h similarity index 60% rename from sdks/ue4/Source/Quilkin/Private/QuilkinPacketHandler.cpp rename to sdks/ue5/Source/Quilkin/Private/QuilkinConstants.h index c5a7a995f8..c27112a9f0 100644 --- a/sdks/ue4/Source/Quilkin/Private/QuilkinPacketHandler.cpp +++ b/sdks/ue5/Source/Quilkin/Private/QuilkinConstants.h @@ -1,5 +1,5 @@ /* - * Copyright 2022 Google LLC + * Copyright 2023 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,20 +13,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +#pragma once -#include "QuilkinPacketHandler.h" -#include "QuilkinDelegates.h" +#include "CoreMinimal.h" - -FQuilkinPacketHandler::FQuilkinPacketHandler() -{ - if (FQuilkinDelegates::GetQuilkinRoutingToken.IsBound()) - { - RoutingToken = FQuilkinDelegates::GetQuilkinRoutingToken.Execute(); - } +inline constexpr uint64_t MillisToNanos(uint64_t milliseconds) { + return milliseconds * 1000000; } -bool FQuilkinPacketHandler::IsEnabled() +constexpr int64 DefaultLatencyThreshold = MillisToNanos(150); +constexpr int64 DefaultPenaltyLatency = MillisToNanos(200); + +inline int64 NanosToMillis(int64 Nanoseconds) { - return RoutingToken.Num() > 0; + return Nanoseconds / 1'000'000; } diff --git a/sdks/ue5/Source/Quilkin/Private/QuilkinControlMessageProtocol.cpp b/sdks/ue5/Source/Quilkin/Private/QuilkinControlMessageProtocol.cpp new file mode 100644 index 0000000000..16b76905c8 --- /dev/null +++ b/sdks/ue5/Source/Quilkin/Private/QuilkinControlMessageProtocol.cpp @@ -0,0 +1,196 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "QuilkinControlMessageProtocol.h" + +#include "Containers/UnrealString.h" +#include "QuilkinLog.h" + +template +TOptional FProtocolVariant::AsVariant() { + if (std::is_same() && (GetCode() == FPing::Discriminant())) { + return TOptional(static_cast(this)); + } + else if (std::is_same() && (GetCode() == FPingReply::Discriminant())) { + return TOptional(static_cast(this)); + } + else { + return TOptional(); + } +} + +// We get linker errors without this specialisation, I don't know why. +template <> +TOptional FProtocolVariant::AsVariant() { + if (GetCode() == FPingReply::Discriminant()) { + return TOptional(static_cast(this)); + } + else { + return TOptional(); + } +} + +FBufferArchive FProtocolVariant::Encode() const +{ + FBufferArchive VariantAr = EncodeVariant(); + checkSlow(VariantAr.Num() == PayloadLength()); + + FBufferArchive Ar; + ArchiveExtensions::EncodeBe(Ar, MagicHeader); + ArchiveExtensions::EncodeBe(Ar, ProtocolVersion); + ArchiveExtensions::EncodeBe(Ar, GetCode()); + ArchiveExtensions::EncodeBe(Ar, PayloadLength()); + ArchiveExtensions::EncodeArchiveBe(Ar, VariantAr); + + return Ar; +} + +TResult, FString> FProtocolVariant::Decode(const TArray& Buffer) +{ + FMemoryReader Ar(Buffer); +#if PLATFORM_LITTLE_ENDIAN + Ar.ArForceByteSwapping = true; +#endif + + if (Ar.TotalSize() < sizeof(MagicHeader) + sizeof(ProtocolVersion) + sizeof(uint8)) + { + return TResult, FString>("Buffer too small"); + } + + uint8 DecodedMagicHeader[4]; + uint8 DecodedProtocolVersion; + uint8 Discriminant; + + Ar << DecodedMagicHeader[0]; + Ar << DecodedMagicHeader[1]; + Ar << DecodedMagicHeader[2]; + Ar << DecodedMagicHeader[3]; + + Ar << DecodedProtocolVersion << Discriminant; + + bool headerMatches = true; + for (size_t i = 0; i < 4; ++i) + { + uint8 Lhs = DecodedMagicHeader[i]; + uint8 Rhs = MagicHeaderBytes[i]; + + if (Lhs != Rhs) + { + headerMatches = false; + } + } + + if (!headerMatches) + { + return TResult, FString>("Invalid magic header"); + } + + if (DecodedProtocolVersion != ProtocolVersion) + { + return TResult, FString>("unknown protocol version"); + } + + uint16 Length = 0; + + for (auto i = 0; i < 2; i++) { + uint8 Byte; + Length = Length << 8; + Ar << Byte; + Length = Length | Byte; + } + + switch (Discriminant) + { + case 0: + { + if (Length != FPing::StaticPayloadLength()) { + return TResult, FString>("Ping Length Mismatch"); + } + + auto DecodeResult = FPing::DecodeVariant(Ar); + if (DecodeResult.IsError()) + { + return TResult, FString>(DecodeResult.GetError()); + } + return TResult, FString>(MakeShared(DecodeResult.GetValue())); + } + case 1: + { + if (Length != FPingReply::StaticPayloadLength()) { + return TResult, FString>("Ping Length Mismatch"); + } + + auto DecodeResult = FPingReply::DecodeVariant(Ar); + if (DecodeResult.IsError()) + { + return TResult, FString>(DecodeResult.GetError()); + } + return TResult, FString>(MakeShared(DecodeResult.GetValue())); + } + default: + // Unknown packet type, return error + return TResult, FString>("Unknown packet type"); + } +} + +FBufferArchive FPing::EncodeVariant() const +{ + FBufferArchive Ar; + ArchiveExtensions::EncodeBe(Ar, Nonce); + ArchiveExtensions::EncodeBe(Ar, ClientTimestamp); + return Ar; +} + +TResult FPing::DecodeVariant(FMemoryReader& Ar) +{ + if (Ar.TotalSize() - Ar.Tell() < FPing::StaticPayloadLength()) + { + return TResult("Insufficient data for FPing"); + } + + uint8 DecodedNonce; + int64 DecodedClientTimestamp; + + Ar << DecodedNonce; + Ar << DecodedClientTimestamp; + + return TResult(FPing(DecodedNonce, DecodedClientTimestamp)); +} + +FBufferArchive FPingReply::EncodeVariant() const +{ + FBufferArchive Ar; + ArchiveExtensions::EncodeBe(Ar, Nonce); + ArchiveExtensions::EncodeBe(Ar, ClientTimestamp); + ArchiveExtensions::EncodeBe(Ar, ServerStartTimestamp); + ArchiveExtensions::EncodeBe(Ar, ServerTransmitTimestamp); + return Ar; +} + +TResult FPingReply::DecodeVariant(FMemoryReader& Ar) +{ + if (Ar.TotalSize() - Ar.Tell() < FPingReply::StaticPayloadLength()) + { + return TResult("Insufficient data for FPingReply"); + } + + uint8 DecodedNonce; + int64 DecodedClientTimestamp, DecodedServerStartTimestamp, DecodedServerTransmitTimestamp; + + Ar << DecodedNonce << DecodedClientTimestamp << DecodedServerStartTimestamp << DecodedServerTransmitTimestamp; + + return TResult(FPingReply(DecodedNonce, DecodedClientTimestamp, DecodedServerStartTimestamp, DecodedServerTransmitTimestamp)); +} \ No newline at end of file diff --git a/sdks/ue5/Source/Quilkin/Private/QuilkinControlMessageProtocol.h b/sdks/ue5/Source/Quilkin/Private/QuilkinControlMessageProtocol.h new file mode 100644 index 0000000000..75dafedb75 --- /dev/null +++ b/sdks/ue5/Source/Quilkin/Private/QuilkinControlMessageProtocol.h @@ -0,0 +1,147 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include + +#include "Containers/Union.h" +#include "Containers/UnrealString.h" +#include "CoreMinimal.h" +#include "Math/UnrealMathUtility.h" +#include "Misc/DateTime.h" +#include "Serialization/BufferArchive.h" +#include "Serialization/MemoryReader.h" + +#include "QuilkinResult.h" + +static inline int64 GetUnixTimestampInNanos() +{ + auto now = std::chrono::system_clock::now(); + auto now_ns = std::chrono::duration_cast(now.time_since_epoch()); + return now_ns.count(); +} + +static constexpr uint32 ConvertMagicHeaderToUInt32(const uint8 Header[4]) { + return (static_cast(Header[0]) << 24) | + (static_cast(Header[1]) << 16) | + (static_cast(Header[2]) << 8) | + (static_cast(Header[3])); +} + +class FProtocolVariant +{ +public: + virtual ~FProtocolVariant() = default; + + template + TOptional AsVariant(); + + FBufferArchive Encode() const; + static TResult, FString> Decode(const TArray& Buffer); + virtual FBufferArchive EncodeVariant() const = 0; + virtual uint16 PayloadLength() const = 0; + virtual uint8 GetCode() const = 0; + virtual uint8 GetNonce() const = 0; + +private: + static constexpr uint8 MagicHeaderBytes[] = { 'Q' , 'L' , 'K' , 'N' }; + static constexpr uint32 MagicHeader = ConvertMagicHeaderToUInt32(MagicHeaderBytes); + static constexpr uint8 ProtocolVersion = 0; +}; + +struct FPing : public FProtocolVariant +{ + uint8 Nonce; + int64 ClientTimestamp; + +public: + explicit FPing() + { + Nonce = FMath::RandHelper(256); + ClientTimestamp = GetUnixTimestampInNanos(); + } + + explicit FPing(uint8 InNonce, int64 InClientTimestamp) + : Nonce(InNonce), ClientTimestamp(InClientTimestamp) + { + } + + static TResult DecodeVariant(FMemoryReader& Ar); + static uint8 Discriminant() { return 0; } + static uint8 StaticPayloadLength() { return sizeof(Nonce) + sizeof(ClientTimestamp); } + virtual FBufferArchive EncodeVariant() const override; + virtual uint16 PayloadLength() const override { return StaticPayloadLength(); } + virtual uint8 GetCode() const override { return Discriminant(); } + virtual uint8 GetNonce() const override { return Nonce; } + uint8 GetTimestamp() const { return ClientTimestamp; } +}; + +struct FPingReply : public FProtocolVariant +{ + uint8 Nonce; + int64 ClientTimestamp; + int64 ServerStartTimestamp; + int64 ServerTransmitTimestamp; + +public: + explicit FPingReply(uint8 InNonce, int64 InClientTimestamp, int64 InServerStartTimestamp, int64 InServerTransmitTimestamp) + : Nonce(InNonce), ClientTimestamp(InClientTimestamp), ServerStartTimestamp(InServerStartTimestamp), ServerTransmitTimestamp(InServerTransmitTimestamp) + { + } + + int64 RoundTimeDelay(int64 ClientReceiveTimestamp) { + return (ClientReceiveTimestamp - ClientTimestamp) + - (ServerTransmitTimestamp - ServerStartTimestamp); + } + + int64 RoundTimeDelay() { + return RoundTimeDelay(GetUnixTimestampInNanos()); + } + + static TResult DecodeVariant(FMemoryReader& Ar); + static uint8 Discriminant() { return 1; } + static uint8 StaticPayloadLength() { return sizeof(Nonce) + sizeof(ClientTimestamp) + sizeof(ServerStartTimestamp) + sizeof(ServerTransmitTimestamp); } + virtual FBufferArchive EncodeVariant() const override; + virtual uint16 PayloadLength() const override { return StaticPayloadLength(); } + virtual uint8 GetCode() const override { return Discriminant(); } + virtual uint8 GetNonce() const override { return Nonce; } +}; + +class ArchiveExtensions +{ +public: + /* Encodes an integer format in big endian encoding, this should + be handled by `FBufferArchive.ArForceByteSwapping` for us, but + that doesn't seem to actually work as expected. + */ + template + static void EncodeBe(FBufferArchive& Ar, Int Value) { + for (int i = sizeof(Int) - 1; i >= 0; --i) { + uint8 Byte = (Value >> (8 * i)) & 0xFF; + Ar.Serialize(&Byte, sizeof(Byte)); + } + } + + /* Encodes a buffer archive into another in big endian format. */ + static void EncodeArchiveBe(FBufferArchive& Ar, FBufferArchive& Input) { + for (int32 i = 0; i < Input.Num(); ++i) + { + uint8 Byte = Input[i]; + ArchiveExtensions::EncodeBe(Ar, Byte); + } + } +}; diff --git a/sdks/ue5/Source/Quilkin/Private/QuilkinDelegates.cpp b/sdks/ue5/Source/Quilkin/Private/QuilkinDelegates.cpp new file mode 100644 index 0000000000..6df664379f --- /dev/null +++ b/sdks/ue5/Source/Quilkin/Private/QuilkinDelegates.cpp @@ -0,0 +1,22 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "QuilkinDelegates.h" + +FQuilkinDelegates::FGetQuilkinEndpointMeasurements FQuilkinDelegates::GetQuilkinEndpointMeasurements; +FQuilkinDelegates::FGetLowestLatencyEndpoint FQuilkinDelegates::GetLowestLatencyEndpoint; +FQuilkinDelegates::FGetLowestLatencyEndpointInRegion FQuilkinDelegates::GetLowestLatencyEndpointInRegion; +FQuilkinDelegates::FGetLowestLatencyToDatacenters FQuilkinDelegates::GetLowestLatencyToDatacenters; diff --git a/sdks/ue5/Source/Quilkin/Private/QuilkinEndpoint.cpp b/sdks/ue5/Source/Quilkin/Private/QuilkinEndpoint.cpp new file mode 100644 index 0000000000..2d4bffab20 --- /dev/null +++ b/sdks/ue5/Source/Quilkin/Private/QuilkinEndpoint.cpp @@ -0,0 +1,60 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "QuilkinEndpoint.h" + +#include "Interfaces/IPv4/IPv4Address.h" +#include "QuilkinSocketSubsystem.h" + +const TResult, ResolveError> +FQuilkinEndpoint::ToInternetAddr(FQuilkinSocketSubsystem* SocketSubsystem) const +{ + return ToInternetAddrBase(SocketSubsystem, Host, TrafficPort); +} + +const TResult, ResolveError> +FQuilkinEndpoint::ToQcmpInternetAddr(FQuilkinSocketSubsystem* SocketSubsystem) const +{ + return ToInternetAddrBase(SocketSubsystem, Host, QcmpPort); +} + +const TResult, ResolveError> +FQuilkinEndpoint::ToInternetAddrBase(FQuilkinSocketSubsystem* SocketSubsystem, FString InHost, uint16 Port) const +{ + const FName Name = FName(); + TSharedRef Addr = SocketSubsystem->CreateInternetAddr(Name); + + FIPv4Address IPv4Address; + if (FIPv4Address::Parse(*InHost, IPv4Address)) + { + Addr->SetIp(IPv4Address.Value); + } + else + { + bool Resolved; + Addr->SetIp(*InHost, Resolved); + + if (!Resolved) + { + ResolveError Error = {}; + return TResult, ResolveError>(Error); + } + } + + Addr->SetPort(Port); + return TResult, ResolveError>(Addr); +} + diff --git a/sdks/ue5/Source/Quilkin/Private/QuilkinInternetAddrs.cpp b/sdks/ue5/Source/Quilkin/Private/QuilkinInternetAddrs.cpp new file mode 100644 index 0000000000..a385f37b52 --- /dev/null +++ b/sdks/ue5/Source/Quilkin/Private/QuilkinInternetAddrs.cpp @@ -0,0 +1,53 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "QuilkinInternetAddrs.h" +#include "IPAddress.h" + +TOptional FQuilkinInternetAddrs::GetLowestLatencyEndpoint() { + if (Endpoints.IsEmpty()) { + return TOptional>(); + } + bool NoMeasurements = true; + Endpoints.ForEach([&NoMeasurements](FQuilkinEndpoint Endpoint, CircularBuffer Buffer) { + if (!Buffer.IsEmpty()) { + NoMeasurements = false; + } + }); + if (NoMeasurements) { + return TOptional>(); + } + FQuilkinEndpoint LowestEndpoint; + int64 LowestLatency = INT64_MAX; + Endpoints.ForEach([&LowestEndpoint, &LowestLatency](FQuilkinEndpoint Endpoint, CircularBuffer Buffer) { + int64 Median = Buffer.Median(); + if (Median < LowestLatency) { + LowestEndpoint = Endpoint; + LowestLatency = Median; + } + }); + return TOptional(TTuple(LowestEndpoint, LowestLatency)); +} + +void FQuilkinInternetAddrs::SetIp(uint32 InAddr) { + UE_LOG(LogQuilkin, Warning, TEXT("SetIp will no-op while Quilkin is enabled, set the available proxy endpoints through `AddAddr`")); +} + +void FQuilkinInternetAddrs::SetIp(const TCHAR* InAddr, bool& bIsValid) { + UE_LOG(LogQuilkin, Warning, TEXT("SetIp will no-op while Quilkin is enabled, set the available proxy endpoints through `AddAddr`")); +} diff --git a/sdks/ue4/Source/Quilkin/Public/QuilkinDelegates.h b/sdks/ue5/Source/Quilkin/Private/QuilkinInternetAddrs.h similarity index 54% rename from sdks/ue4/Source/Quilkin/Public/QuilkinDelegates.h rename to sdks/ue5/Source/Quilkin/Private/QuilkinInternetAddrs.h index 6b49017dbb..c333768646 100644 --- a/sdks/ue4/Source/Quilkin/Public/QuilkinDelegates.h +++ b/sdks/ue5/Source/Quilkin/Private/QuilkinInternetAddrs.h @@ -1,5 +1,5 @@ /* - * Copyright 2022 Google LLC + * Copyright 2024 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,19 +17,20 @@ #pragma once #include "CoreMinimal.h" -#include "UObject/ObjectMacros.h" -#include "UObject/UObjectGlobals.h" -#include "UObject/Object.h" +#include "QuilkinCircularBuffer.h" +#include "../Public/QuilkinEndpoint.h" +#include "IPAddress.h" -class QUILKIN_API FQuilkinDelegates -{ +class QUILKIN_API FQuilkinInternetAddrs : public FInternetAddr { public: - /** - * Delegate used to retrieve the client's proxy routing token if - * connection takes place via a proxy. - * - * @return The client's routing token to use. - */ - DECLARE_DELEGATE_RetVal(TArray, FGetQuilkinRoutingToken); - static FGetQuilkinRoutingToken GetQuilkinRoutingToken; + //~ Start FInternetAddr overrides + virtual void SetIp(uint32 InAddr) override; + virtual void SetIp(const TCHAR* InAddr, bool& bIsValid) override; + + //~ End FInternetAddr overrides + + TOptional GetLowestLatencyEndpoint(); + +private: + TSConcurrentMap> Endpoints; }; diff --git a/sdks/ue4/Source/Quilkin/Private/QuilkinLog.cpp b/sdks/ue5/Source/Quilkin/Private/QuilkinLog.cpp similarity index 95% rename from sdks/ue4/Source/Quilkin/Private/QuilkinLog.cpp rename to sdks/ue5/Source/Quilkin/Private/QuilkinLog.cpp index 98413e632a..80fb246067 100644 --- a/sdks/ue4/Source/Quilkin/Private/QuilkinLog.cpp +++ b/sdks/ue5/Source/Quilkin/Private/QuilkinLog.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2022 Google LLC + * Copyright 2023 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/sdks/ue4/Source/Quilkin/Private/QuilkinLog.h b/sdks/ue5/Source/Quilkin/Private/QuilkinLog.h similarity index 87% rename from sdks/ue4/Source/Quilkin/Private/QuilkinLog.h rename to sdks/ue5/Source/Quilkin/Private/QuilkinLog.h index 57fa95fbec..61d1a9a524 100644 --- a/sdks/ue4/Source/Quilkin/Private/QuilkinLog.h +++ b/sdks/ue5/Source/Quilkin/Private/QuilkinLog.h @@ -1,5 +1,5 @@ /* - * Copyright 2022 Google LLC + * Copyright 2023 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,4 +18,4 @@ #include "CoreMinimal.h" -DECLARE_LOG_CATEGORY_EXTERN(LogQuilkin, VeryVerbose, All); +DECLARE_LOG_CATEGORY_EXTERN(LogQuilkin, Display, All); diff --git a/sdks/ue4/Source/Quilkin/Private/QuilkinModule.cpp b/sdks/ue5/Source/Quilkin/Private/QuilkinModule.cpp similarity index 84% rename from sdks/ue4/Source/Quilkin/Private/QuilkinModule.cpp rename to sdks/ue5/Source/Quilkin/Private/QuilkinModule.cpp index f54340c0a5..442295bdec 100644 --- a/sdks/ue4/Source/Quilkin/Private/QuilkinModule.cpp +++ b/sdks/ue5/Source/Quilkin/Private/QuilkinModule.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2022 Google LLC + * Copyright 2023 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,11 @@ #include "Modules/ModuleManager.h" #include "SocketSubsystemModule.h" #include "UObject/NameTypes.h" +#include "Misc/ConfigCacheIni.h" + +#define FIVE_MINUTES 5 * 60 + +IMPLEMENT_MODULE(FQuilkinModule, Quilkin); void FQuilkinModule::StartupModule() { @@ -37,9 +42,9 @@ void FQuilkinModule::StartupModule() return; } - FSocketSubsystemModule &SocketSubsystemModule = FModuleManager::LoadModuleChecked("Sockets"); + FSocketSubsystemModule& SocketSubsystemModule = FModuleManager::LoadModuleChecked("Sockets"); - ISocketSubsystem *DefaultSocketSubsystem = SocketSubsystemModule.GetSocketSubsystem(); + ISocketSubsystem* DefaultSocketSubsystem = SocketSubsystemModule.GetSocketSubsystem(); if (DefaultSocketSubsystem == nullptr) { UE_LOG(LogQuilkin, Log, TEXT("No default SocketSubsystem was set. Will not use Quilkin SocketSubsystem")); @@ -47,7 +52,9 @@ void FQuilkinModule::StartupModule() } UE_LOG(LogQuilkin, Log, TEXT("Overriding default SocketSubsystem with QuilkinSocketSubsystem")); - QuilkinSocketSubsystem = MakeUnique(DefaultSocketSubsystem); + QuilkinSocketSubsystem = MakeShared(DefaultSocketSubsystem); + FString Unused; + QuilkinSocketSubsystem->Init(Unused); SocketSubsystemModule.RegisterSocketSubsystem(QUILKIN_SOCKETSUBSYSTEM_NAME, QuilkinSocketSubsystem.Get(), true); } @@ -60,7 +67,7 @@ void FQuilkinModule::ShutdownModule() return; } - FSocketSubsystemModule &SocketSubsystemModule = FModuleManager::LoadModuleChecked("Sockets"); + FSocketSubsystemModule& SocketSubsystemModule = FModuleManager::LoadModuleChecked("Sockets"); SocketSubsystemModule.UnregisterSocketSubsystem(QUILKIN_SOCKETSUBSYSTEM_NAME); QuilkinSocketSubsystem.Reset(); } @@ -74,6 +81,4 @@ bool FQuilkinModule::SupportsAutomaticShutdown() { // Shutdown gets called by the SocketSubsystem, if we were registered (and we don't do anything if we weren't) return false; -} - -IMPLEMENT_MODULE(FQuilkinModule, Quilkin); +} \ No newline at end of file diff --git a/sdks/ue4/Source/Quilkin/Private/QuilkinModule.h b/sdks/ue5/Source/Quilkin/Private/QuilkinModule.h similarity index 86% rename from sdks/ue4/Source/Quilkin/Private/QuilkinModule.h rename to sdks/ue5/Source/Quilkin/Private/QuilkinModule.h index 2fd8a888db..72448e4487 100644 --- a/sdks/ue4/Source/Quilkin/Private/QuilkinModule.h +++ b/sdks/ue5/Source/Quilkin/Private/QuilkinModule.h @@ -1,5 +1,5 @@ /* - * Copyright 2022 Google LLC + * Copyright 2023 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,8 @@ #include "CoreMinimal.h" #include "Modules/ModuleInterface.h" #include "QuilkinSocketSubsystem.h" +#include "QuilkinEndpoint.h" +#include "QuilkinConcurrentMap.h" class FQuilkinModule : public IModuleInterface { @@ -31,5 +33,5 @@ class FQuilkinModule : public IModuleInterface //~ End IModuleInterface Interface private: - TUniquePtr QuilkinSocketSubsystem; + TSharedPtr QuilkinSocketSubsystem; }; diff --git a/sdks/ue5/Source/Quilkin/Private/QuilkinPacketHandler.cpp b/sdks/ue5/Source/Quilkin/Private/QuilkinPacketHandler.cpp new file mode 100644 index 0000000000..1ddef65c1f --- /dev/null +++ b/sdks/ue5/Source/Quilkin/Private/QuilkinPacketHandler.cpp @@ -0,0 +1,39 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "QuilkinPacketHandler.h" +#include "Engine/GameInstance.h" +#include "QuilkinSettings.h" + +FQuilkinPacketHandler::FQuilkinPacketHandler() +{ + auto Settings = UQuilkinConfigSubsystem::Get(); + RoutingToken = Settings->GetRoutingToken(); + Enabled = Settings->PacketHandling && Settings->GetEnabled(); + + if (UE_LOG_ACTIVE(LogQuilkin, Display)) + { + FString Base64Token = FBase64::Encode(RoutingToken); + UE_LOG(LogQuilkin, Display, TEXT("Initialising PacketHandler: Packet Handling: %s, Routing Token: %s"), Enabled ? TEXT("Enabled") : TEXT("Disabled"), *Base64Token); + } +} + +bool FQuilkinPacketHandler::IsEnabled() +{ + // If it was disabled when it was initially enabled, keep it disabled, + // If it was disabled later while it was enabled, disable it. + return Enabled && UQuilkinConfigSubsystem::Get()->GetEnabled(); +} diff --git a/sdks/ue5/Source/Quilkin/Private/QuilkinPacketHandler.h b/sdks/ue5/Source/Quilkin/Private/QuilkinPacketHandler.h new file mode 100644 index 0000000000..d7abc096c9 --- /dev/null +++ b/sdks/ue5/Source/Quilkin/Private/QuilkinPacketHandler.h @@ -0,0 +1,60 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "CoreMinimal.h" +#include "QuilkinLog.h" +#include "Serialization/BitWriter.h" +#include "Misc/Base64.h" + +class FQuilkinPacketHandler +{ +public: + FQuilkinPacketHandler(); + bool IsEnabled(); + bool IsDisabled() { + return !IsEnabled(); + }; + + template FORCEINLINE const bool Write(const uint8* Data, int32 CountBytes, int32& BytesSent, Fn WriteToSocket) + { + if (IsDisabled()) { + return WriteToSocket(Data, CountBytes, BytesSent); + } + ensureMsgf(RoutingToken.Num() == 16, TEXT("Routing token must be 16 bytes, received %d, proxy connection will fail"), RoutingToken.Num()); + + // Add the current packet version. + uint8 PacketVersion = 0; + int PacketVersionNumBytes = 1; + // Reserve enough space for the token and packet version. + FBitWriter Packet((CountBytes + RoutingToken.Num() + PacketVersionNumBytes) * 8, true); + + Packet.Serialize((void*)Data, CountBytes); + Packet.Serialize(RoutingToken.GetData(), RoutingToken.Num()); + Packet.Serialize(&PacketVersion, PacketVersionNumBytes); + if (UE_LOG_ACTIVE(LogQuilkin, VeryVerbose)) + { + FString Base64Token = FBase64::Encode(RoutingToken); + UE_LOG(LogQuilkin, VeryVerbose, TEXT("Wrapping packet in Quilkin PDU; version: %d, size: %d, token: %s"), PacketVersion, Packet.GetNumBytes(), *Base64Token); + } + return WriteToSocket(Packet.GetData(), Packet.GetNumBytes(), BytesSent); + } + +private: + TArray RoutingToken; + bool Enabled = false; +}; diff --git a/sdks/ue5/Source/Quilkin/Private/QuilkinResult.h b/sdks/ue5/Source/Quilkin/Private/QuilkinResult.h new file mode 100644 index 0000000000..8d76580152 --- /dev/null +++ b/sdks/ue5/Source/Quilkin/Private/QuilkinResult.h @@ -0,0 +1,51 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "Misc/TVariant.h" + +/* Equivalvent of Rust's `()` type to represent no-op operations */ +struct TUnit {}; + +/* +* This is a port of Rust's Result type, in order to make it +* easier to define fallible functions without needing out parameters +*/ +template +class TResult +{ +public: + ~TResult() = default; + + TResult(TValue InValue) + : Data(TInPlaceType(), InValue) + { + } + + TResult(TError InError) + : Data(TInPlaceType(), InError) + { + } + + bool IsSuccess() const { return Data.template IsType(); } + bool IsError() const { return Data.template IsType(); } + const TValue& GetValue() const { check(IsSuccess()); return Data.template Get(); } + const TError& GetError() const { check(IsError()); return Data.template Get(); } + +private: + TVariant Data; +}; \ No newline at end of file diff --git a/sdks/ue4/Source/Quilkin/Private/QuilkinDelegates.cpp b/sdks/ue5/Source/Quilkin/Private/QuilkinSettings.cpp similarity index 79% rename from sdks/ue4/Source/Quilkin/Private/QuilkinDelegates.cpp rename to sdks/ue5/Source/Quilkin/Private/QuilkinSettings.cpp index 2d01773705..d5eaf5cbda 100644 --- a/sdks/ue4/Source/Quilkin/Private/QuilkinDelegates.cpp +++ b/sdks/ue5/Source/Quilkin/Private/QuilkinSettings.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2022 Google LLC + * Copyright 2024 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,6 +14,4 @@ * limitations under the License. */ -#include "QuilkinDelegates.h" - -FQuilkinDelegates::FGetQuilkinRoutingToken FQuilkinDelegates::GetQuilkinRoutingToken; +#include "QuilkinSettings.h" diff --git a/sdks/ue4/Source/Quilkin/Private/QuilkinSocket.cpp b/sdks/ue5/Source/Quilkin/Private/QuilkinSocket.cpp similarity index 92% rename from sdks/ue4/Source/Quilkin/Private/QuilkinSocket.cpp rename to sdks/ue5/Source/Quilkin/Private/QuilkinSocket.cpp index 7b50f7052d..7b3899c157 100644 --- a/sdks/ue4/Source/Quilkin/Private/QuilkinSocket.cpp +++ b/sdks/ue5/Source/Quilkin/Private/QuilkinSocket.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2022 Google LLC + * Copyright 2023 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,14 @@ #include "QuilkinSocket.h" +#include "Containers/Ticker.h" +#include "Async/Async.h" + +#include "QuilkinConcurrentMap.h" +#include "QuilkinConstants.h" +#include "QuilkinSettings.h" +#include "QuilkinSocketSubsystem.h" + FQuilkinSocket::FQuilkinSocket(FUniqueSocket WrappedSocket, ESocketType InSocketType, const FString& InSocketDescription, const FName& InSocketProtocol) : FSocket(InSocketType, InSocketDescription, InSocketProtocol) , Socket{MoveTemp(WrappedSocket)} @@ -74,26 +82,16 @@ FSocket* FQuilkinSocket::Accept(FInternetAddr& OutAddr, const FString& InSocketD bool FQuilkinSocket::SendTo(const uint8* Data, int32 Count, int32& BytesSent, const FInternetAddr& Destination) { - if (!Handler.IsEnabled()) - { + return Handler.Write(Data, Count, BytesSent, [this, &Destination](const uint8* Data, int32 Count, int32& BytesSent) { return Socket.Get()->SendTo(Data, Count, BytesSent, Destination); - } - - FBitWriter Packet; - Packet = Handler.Handle(Data, Count); - return Socket.Get()->SendTo(Packet.GetData(), Packet.GetNumBytes(), BytesSent, Destination); + }); } bool FQuilkinSocket::Send(const uint8* Data, int32 Count, int32& BytesSent) { - if (!Handler.IsEnabled()) - { + return Handler.Write(Data, Count, BytesSent, [this](const uint8* Data, int32 Count, int32& BytesSent) { return Socket.Get()->Send(Data, Count, BytesSent); - } - - FBitWriter Packet; - Packet = Handler.Handle(Data, Count); - return Socket.Get()->Send(Packet.GetData(), Packet.GetNumBytes(), BytesSent); + }); } bool FQuilkinSocket::RecvFrom(uint8* Data, int32 BufferSize, int32& BytesRead, FInternetAddr& Source, ESocketReceiveFlags::Type Flags) diff --git a/sdks/ue4/Source/Quilkin/Private/QuilkinSocket.h b/sdks/ue5/Source/Quilkin/Private/QuilkinSocket.h similarity index 93% rename from sdks/ue4/Source/Quilkin/Private/QuilkinSocket.h rename to sdks/ue5/Source/Quilkin/Private/QuilkinSocket.h index 36d7153b2a..71d8ecf2b2 100644 --- a/sdks/ue4/Source/Quilkin/Private/QuilkinSocket.h +++ b/sdks/ue5/Source/Quilkin/Private/QuilkinSocket.h @@ -1,5 +1,5 @@ /* - * Copyright 2022 Google LLC + * Copyright 2023 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,13 +16,20 @@ #pragma once +#include + #include "CoreMinimal.h" #include "Sockets.h" #include "SocketSubsystem.h" + +#include "QuilkinCircularBuffer.h" +#include "QuilkinEndpoint.h" #include "QuilkinPacketHandler.h" +#include "Containers/Ticker.h" + +class FQuilkinSocketSubsystem; -class FQuilkinSocket - : public FSocket +class FQuilkinSocket: public FSocket { public: FQuilkinSocket(FUniqueSocket WrappedSocket, ESocketType InSocketType, const FString& InSocketDescription, const FName& InSocketProtocol); @@ -68,7 +75,8 @@ class FQuilkinSocket virtual bool RecvFromWithPktInfo(uint8* Data, int32 BufferSize, int32& BytesRead, FInternetAddr& Source, FInternetAddr& Destination, ESocketReceiveFlags::Type Flags = ESocketReceiveFlags::None) override; //~ End FSocket Interface + TWeakPtr Subsystem; protected: FUniqueSocket Socket; FQuilkinPacketHandler Handler; -}; +}; \ No newline at end of file diff --git a/sdks/ue5/Source/Quilkin/Private/QuilkinSocketSubsystem.cpp b/sdks/ue5/Source/Quilkin/Private/QuilkinSocketSubsystem.cpp new file mode 100644 index 0000000000..eb88eefc17 --- /dev/null +++ b/sdks/ue5/Source/Quilkin/Private/QuilkinSocketSubsystem.cpp @@ -0,0 +1,693 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "QuilkinSocketSubsystem.h" +#include "QuilkinControlMessageProtocol.h" +#include "QuilkinCircularBuffer.h" +#include "QuilkinDelegates.h" +#include "QuilkinSocket.h" +#include "QuilkinLog.h" +#include "QuilkinSettings.h" +#include "Async/Async.h" +#include "Async/ParallelFor.h" +#include "IPAddress.h" +#include "Engine/GameInstance.h" +#include "Runtime/Online/HTTP/Public/Http.h" +#include "GenericPlatform/GenericPlatformMath.h" + +static int64 NowSeconds() { + return FDateTime::UtcNow().ToUnixTimestamp(); +} + +FQuilkinSocketSubsystem::FQuilkinSocketSubsystem(ISocketSubsystem* WrappedSocketSubsystem) + : PingSocket{nullptr} + , SocketSubsystem{WrappedSocketSubsystem} +{ + FQueuedThreadPool* ThreadPool = FQueuedThreadPool::Allocate(); + int32 NumThreads = FGenericPlatformMath::Max(FPlatformMisc::NumberOfCores() / 2, 2); + if (!ThreadPool->Create(NumThreads, 8 * 1024)) { + UE_LOG(LogQuilkin, Error, TEXT("Couldn't allocate thread pool")); + } + this->PingThreadPool = ThreadPool; +} + +FQuilkinSocketSubsystem::~FQuilkinSocketSubsystem() +{ +} + +bool FQuilkinSocketSubsystem::Init(FString& Error) +{ + UE_LOG(LogQuilkin, Display, TEXT("Initialising Socket Subsystem")); + PingSocket = CreateRawSocket(TEXT("QuilkinPingSocket")); + FQuilkinDelegates::GetQuilkinEndpointMeasurements.BindRaw(this, &FQuilkinSocketSubsystem::GetEndpointMeasurements); + FQuilkinDelegates::GetLowestLatencyEndpoint.BindRaw(this, &FQuilkinSocketSubsystem::GetLowestLatencyEndpoint); + FQuilkinDelegates::GetLowestLatencyEndpointInRegion.BindRaw(this, &FQuilkinSocketSubsystem::GetLowestLatencyEndpointInRegion); + FQuilkinDelegates::GetLowestLatencyToDatacenters.BindRaw(this, &FQuilkinSocketSubsystem::GetLowestLatencyToDatacenters); + + return true; +} + +void FQuilkinSocketSubsystem::Shutdown() +{ + if (PingSocket != nullptr) + { + PingSocket = nullptr; + } + + if (UQuilkinConfigSubsystem::IsAvailable()) { + auto Config = UQuilkinConfigSubsystem::Get(); + if (!Config->OnEndpointsChanged.IsBound()) { + Config->OnEndpointsChanged.Remove(UpdateEndpointHandle); + } + } + + if (FQuilkinDelegates::GetLowestLatencyEndpoint.IsBound()) { + FQuilkinDelegates::GetLowestLatencyEndpoint.Unbind(); + } + + if (FQuilkinDelegates::GetLowestLatencyEndpointInRegion.IsBound()) { + FQuilkinDelegates::GetLowestLatencyEndpointInRegion.Unbind(); + } + + if (FQuilkinDelegates::GetLowestLatencyToDatacenters.IsBound()) { + FQuilkinDelegates::GetLowestLatencyToDatacenters.Unbind(); + } +} + +bool FQuilkinSocketSubsystem::Tick(float DeltaTime) { + // We bind here as a "late initialisation" step, as the SocketSubsystem will be initialised before GEngine. + if (UQuilkinConfigSubsystem::IsAvailable() && !UpdateEndpointHandle.IsValid()) { + UpdateEndpointHandle = UQuilkinConfigSubsystem::Get()->OnEndpointsChanged.AddRaw(this, &FQuilkinSocketSubsystem::UpdateEndpoints); + } + + if (!UQuilkinConfigSubsystem::Get()->GetMeasureEndpoints()) + { + return true; + } + + TickElapsed += DeltaTime; + + if (TickElapsed >= 60) { + TickElapsed = 0; + + // If there's no internet + if (FGenericPlatformMisc::GetNetworkConnectionType() == ENetworkConnectionType::None) { + UE_LOG(LogQuilkin, Warning, TEXT("no internet connection available")); + return true; + } + + PingEndpoints(); + } + + return true; +} + +TArray> FQuilkinSocketSubsystem::GetEndpointMeasurements() { + return Endpoints.FilterMapToArray>([](FQuilkinEndpoint Endpoint, CircularBuffer Buffer) { + auto Median = Buffer.Median(); + if (Median == 0) { + return TOptional>(); + } + else { + return TOptional>(TTuple(Endpoint, Median)); + } + }); +} + +void FQuilkinSocketSubsystem::AllocatePingSocketsForEndpoints() { + SocketAllocationLock.Lock(); + + auto EndpointLength = Endpoints.Num(); + auto PingSocketLength = PingSockets.Num(); + if (PingSocketLength < EndpointLength) { + for (int32 i = 0; i < EndpointLength - PingSocketLength; i++) { + auto SocketResult = CreateRandomUdpSocket(); + + if (SocketResult.IsError()) { + UE_LOG(LogQuilkin, Error, TEXT("Couldn't allocate socket, message: %s"), *SocketResult.GetError()); + SocketAllocationLock.Unlock(); + return; + } + + PingSockets.Add(SocketResult.GetValue()); + } + } + else if (PingSocketLength > EndpointLength) { + for (int32 i = 0; i < PingSocketLength - EndpointLength; i++) { + if (!PingSockets.Pop()->Close()) { + auto ErrorCode = GetLastErrorCode(); + FString ErrorDescription = GetSocketError(ErrorCode); + UE_LOG(LogQuilkin, Error, TEXT("failed to close socket, code: %d, message: %s"), ErrorCode, *ErrorDescription); + } + } + } + + SocketAllocationLock.Unlock(); +} + +void FQuilkinSocketSubsystem::UpdateEndpoints(TArray NewEndpoints) { + Endpoints.ResetWithKeys(NewEndpoints, [](auto Endpoint) -> CircularBuffer { + return CircularBuffer(50); + }); + + PingEndpoints(); +} + +TOptional FQuilkinSocketSubsystem::GetLowestLatencyEndpoint() +{ + return GetLowestLatencyEndpointImplementation(TOptional()); +} + +TOptional> FQuilkinSocketSubsystem::GetLowestLatencyEndpointInRegion(FString Region) const +{ + return GetLowestLatencyEndpointImplementation(TOptional(Region)); +} + +TOptional> FQuilkinSocketSubsystem::GetLowestLatencyEndpointImplementation(TOptional Region) const +{ + if (Endpoints.IsEmpty()) { + return TOptional>(); + } + + bool NoMeasurements = true; + Endpoints.ForEach([&NoMeasurements](FQuilkinEndpoint Endpoint, CircularBuffer Buffer) { + if (!Buffer.IsEmpty()) { + NoMeasurements = false; + } + }); + + if (NoMeasurements) { + return TOptional>(); + } + + FQuilkinEndpoint LowestEndpoint; + int64 LowestLatency = INT64_MAX; + Endpoints.ForEach([&LowestEndpoint, &LowestLatency, &Region](FQuilkinEndpoint Endpoint, CircularBuffer Buffer) { + int64 Median = Buffer.Median(); + + // If the region has been set, and it doesn't match OR the median is zero, skip the endpoint. + if ((Region.IsSet() && Region.GetValue() != Endpoint.Region) || Median == 0) + { + return; + } + + if (Median < LowestLatency) + { + LowestEndpoint = Endpoint; + LowestLatency = Median; + } + }); + + return TOptional(TTuple(LowestEndpoint, LowestLatency)); + +} + +TOptional FQuilkinSocketSubsystem::GetLowestLatencyProxyToDatacenter(FString IcaoCode) const { + TOptional FoundEndpoint; + int64 LowestLatency = INT64_MAX; + + Endpoints.ForEach([this, &FoundEndpoint, LowestLatency, IcaoCode](FQuilkinEndpoint Endpoint, CircularBuffer Buffer) { + auto ProxyLatency = Buffer.Median(); + + const TArray* FoundEntries = Datacenters.Find(Endpoint); + + if (FoundEntries == nullptr) { + UE_LOG(LogQuilkin, Warning, TEXT("no measured datacenters for %s"), *Endpoint.ToString()); + return; + } + + auto FoundDatacenter = FoundEntries->FindByPredicate([IcaoCode](auto Datacenter) { + return Datacenter.IcaoCode == IcaoCode; + }); + + if (FoundDatacenter == nullptr) { + UE_LOG(LogQuilkin, Warning, TEXT("haven't measured %s for %s"), *IcaoCode, *Endpoint.ToString()); + return; + } + + if (FoundEndpoint.IsSet()) { + if (FoundDatacenter->TotalDistance(ProxyLatency) < LowestLatency) { + FoundEndpoint = Endpoint; + } + } + else { + FoundEndpoint = Endpoint; + } + }); + + return FoundEndpoint; +} + +TMap FQuilkinSocketSubsystem::GetLowestLatencyToDatacenters() const +{ + TMap Map; + + Endpoints.ForEach([this, &Map](FQuilkinEndpoint Endpoint, CircularBuffer Buffer) { + auto ProxyLatency = Buffer.Median(); + auto FoundDatacenters = Datacenters.Find(Endpoint); + + if (FoundDatacenters == nullptr) { + UE_LOG(LogQuilkin, Warning, TEXT("no measured datacenters for %s"), *Endpoint.ToString()); + return; + } + + for (auto& Datacenter : *FoundDatacenters) { + auto FoundEntry = Map.Find(Datacenter.IcaoCode); + + if (FoundEntry == nullptr) { + Map.Add(Datacenter.IcaoCode, Datacenter.TotalDistance(ProxyLatency)); + } + else { + auto TotalDistance = Datacenter.TotalDistance(ProxyLatency); + if (*FoundEntry > TotalDistance) { + Map.Add(Datacenter.IcaoCode, TotalDistance); + } + } + } + }); + + return Map; +} + +TResult FQuilkinSocketSubsystem::CreateRandomUdpSocket() +{ + FSocket* Socket = CreateRawSocket(TEXT("QuilkinPingSocket")); + + if (!Socket) + { + return TResult(TEXT("couldn't create ping socket")); + } + + return TResult(Socket); +} + +void FQuilkinSocketSubsystem::PingEndpoints() { + static std::atomic IS_ACTIVE(false); + if (Endpoints.IsEmpty()) { + UE_LOG(LogQuilkin, Verbose, TEXT("no endpoints to measure")); + return; + } + + if (IS_ACTIVE.load()) { + UE_LOG(LogQuilkin, Verbose, TEXT("ping task already executing")); + return; + } + + IS_ACTIVE.store(true); + AsyncTask(ENamedThreads::AnyBackgroundThreadNormalTask, [this]() { + auto Keys = Endpoints.GetKeys(); + + AllocatePingSocketsForEndpoints(); + + if (PingSockets.Num() != Endpoints.Num()) { + UE_LOG(LogQuilkin, Error, TEXT("Couldn't allocate enough sockets to measure latency")); + IS_ACTIVE.store(false); + return; + } + + TArray*> Tasks; + for (int32 Index = 0; Index < Endpoints.Num(); ++Index) + { + auto Task = new FAsyncTask(this, Keys, Index); + Task->StartBackgroundTask(PingThreadPool); + Tasks.Add(Task); + } + + for (auto Task : Tasks) + { + Task->EnsureCompletion(); + } + + if (UE_LOG_ACTIVE(LogQuilkin, Verbose)) { + auto Result = this->GetLowestLatencyEndpoint(); + + if (!Result.IsSet()) { + IS_ACTIVE.store(false); + return; + } + + auto Pair = Result.GetValue(); + FQuilkinEndpoint Endpoint = Pair.template Get<0>(); + UE_LOG(LogQuilkin, Verbose, TEXT("Lowest latency endpoint is %s (%dms)"), *Endpoint.ToString(), NanosToMillis(Pair.template Get<1>())); + } + + GetDatacenterLatencies(); + + if (UQuilkinConfigSubsystem::IsAvailable()) { + auto Config = UQuilkinConfigSubsystem::Get(); + if (Config->MeasurementCompleted.IsBound()) { + Config->MeasurementCompleted.Broadcast(); + } + } + + IS_ACTIVE.store(false); + }); +} + +TResult FQuilkinSocketSubsystem::SendPing(FSocket* Socket, FInternetAddr& AddrRef) +{ + uint32 PingCount = 5; + TArray Nonces; + auto Addr = AddrRef.Clone(); + Addr->SetPort(7600); + + UE_LOG(LogQuilkin, Verbose, TEXT("measuring latency to %s"), *Addr->ToString(true)); + + for (uint32 i = 0; i < PingCount; i++) { + auto Result = SendPacket(Socket, *Addr); + if (Result.IsError()) { + UE_LOG(LogQuilkin, Warning, TEXT("failed to send ping to %s: %s"), *Addr->ToString(true), *Result.GetError()); + } + else { + Nonces.Push(Result.GetValue()); + } + } + + if (Nonces.IsEmpty()) { + return TResult(TEXT("all pings to failed")); + } + + auto NewLatency = WaitForResponses(Socket, *Addr, PingCount, Nonces); + + if (NewLatency < 0ll) + return TResult(TEXT("exit requested")); + + UE_LOG(LogQuilkin, Verbose, TEXT("new measured latency for %s: %dms"), *Addr->ToString(true), NanosToMillis(NewLatency)); + return TResult(NewLatency); +} + +TResult FQuilkinSocketSubsystem::SendPacket(FSocket* Socket, FInternetAddr& Addr) +{ + auto Ping = FPing(); + auto Buffer = Ping.Encode(); + auto BytesSent = 0; + UE_LOG(LogQuilkin, Verbose, TEXT("sending ping to %s, Nonce: %d"), *Addr.ToString(true), Ping.GetNonce()); + if (!Socket->SendTo(Buffer.GetData(), Buffer.Num(), BytesSent, Addr)) { + auto ErrorCode = GetLastErrorCode(); + FString ErrorDescription = GetSocketError(ErrorCode); + return TResult(ErrorDescription); + } + else { + return TResult(Ping.GetNonce()); + } +} + +int64 FQuilkinSocketSubsystem::WaitForResponses(FSocket* Socket, FInternetAddr& Addr, uint32 PingCount, TArray Nonces) +{ + const double Timeout = 5.0; + const double StartTime = NowSeconds(); + uint32 ExpectedResponses = Nonces.Num(); + int32 ExceededTimeouts = 0; + CircularBuffer SuccessfulResponses = CircularBuffer(PingCount); + + while ((SuccessfulResponses.Num() < Nonces.Num()) && (ExceededTimeouts <= Nonces.Num()) && (NowSeconds() - StartTime < Timeout)) + { + if (IsEngineExitRequested()) + return -1ll; + + TArray Buffer; + Buffer.SetNumUninitialized(1024); + int32 BytesReceived = 0; + UE_LOG(LogQuilkin, Verbose, TEXT("waiting on ping response from %s"), *Addr.ToString(true)); + if (Socket->Wait(ESocketWaitConditions::WaitForRead, FTimespan::FromMilliseconds(500))) + { + if (Socket->RecvFrom(Buffer.GetData(), Buffer.Num(), BytesReceived, Addr)) + { + UE_LOG(LogQuilkin, Verbose, TEXT("received response from %s"), *Addr.ToString(true)); + auto Result = FPingReply::Decode(Buffer); + + if (Result.IsError()) { + UE_LOG(LogQuilkin, Warning, TEXT("failed to decode ping reply: %s"), *Result.GetError()); + continue; + } + + auto Packet = Result.GetValue()->AsVariant(); + + if (!Packet.IsSet()) { + UE_LOG(LogQuilkin, Warning, TEXT("expected ping reply, found: %d"), Result.GetValue()->GetCode()); + continue; + } + + auto Reply = Packet.GetValue(); + + if (!Nonces.Contains(Reply->GetNonce())) { + UE_LOG(LogQuilkin, Warning, TEXT("received nonce (%d) didn't match any sent nonce"), Reply->GetNonce()); + continue; + } + else { + UE_LOG(LogQuilkin, Verbose, TEXT("received nonce (%d)"), Reply->GetNonce()); + } + + SuccessfulResponses.Add(Reply->RoundTimeDelay()); + } + } + else + { + ExceededTimeouts += 1; + continue; + } + } + + if (ExceededTimeouts > 0) { + UE_LOG(LogQuilkin, Display, TEXT("%s exceeded WaitForRead timeout %d times"), *Addr.ToString(true), ExceededTimeouts); + } + + // If we sent less pings or we received less pings due to system errors or packet loss, + // then we penalise the endpoint's latency measurement for being inconsistent. + uint32 PenaltyFactor = (PingCount - ExpectedResponses) + (ExpectedResponses - SuccessfulResponses.Num()) + ExceededTimeouts; + for (uint32 i = 0; i < PenaltyFactor; i++) { + SuccessfulResponses.Add(DefaultPenaltyLatency); + } + + return SuccessfulResponses.Median(); +} + +void FQuilkinSocketSubsystem::GetDatacenterLatencies() { + if (!UQuilkinConfigSubsystem::IsAvailable()) { + UE_LOG(LogQuilkin, Display, TEXT("config subsystem unavailable, terminating GetDatacenterLatencies")); + return; + } + + auto Config = UQuilkinConfigSubsystem::Get(); + + Endpoints.ForEach([this, &Config](FQuilkinEndpoint Endpoint, CircularBuffer Buffer) { + auto LatencyInMillis = uint64(NanosToMillis(Buffer.Median())); + if (LatencyInMillis >= Config->GetPingThresholdMillis()) { + UE_LOG(LogQuilkin, Verbose, TEXT("Skipping %s, measured latency (%dms) > ping threshold (%dms"), *Endpoint.ToString(), LatencyInMillis, Config->GetPingThresholdMillis()); + return; + } + + auto HttpRequest = FHttpModule::Get().CreateRequest(); + HttpRequest->SetURL(Endpoint.ToQcmpInternetAddr(this).GetValue()->ToString(true)); + HttpRequest->SetVerb("GET"); + HttpRequest->SetTimeout(0.5); + + HttpRequest->OnProcessRequestComplete().BindLambda([this, Endpoint](FHttpRequestPtr Request, FHttpResponsePtr Response, bool bWasSuccessful) { + if (!bWasSuccessful || !Response.IsValid()) + { + UE_LOG(LogQuilkin, Warning, TEXT("GetDatacenters failed for %s: (was successful: %s), (response is valid: %s"), *Endpoint.ToString(), bWasSuccessful ? TEXT("true"): TEXT("false"), Response.IsValid() ? TEXT("true"): TEXT("false")); + return; + } + + FString JsonResponse = Response->GetContentAsString(); + + TSharedPtr JsonObject; + TSharedRef> Reader = TJsonReaderFactory<>::Create(JsonResponse); + if (FJsonSerializer::Deserialize(Reader, JsonObject) && JsonObject.IsValid()) + { + TArray DatacenterArray; + + for (auto& Elem : JsonObject->Values) + { + double Distance; + FString IcaoCode = Elem.Key; + if (Elem.Value->TryGetNumber(Distance)) { + UE_LOG(LogQuilkin, Verbose, TEXT("%s has %dms latency to %s"), *Endpoint.ToString(), NanosToMillis(std::llround(Distance)), *IcaoCode); + DatacenterArray.Add(FQuilkinDatacenter { + IcaoCode, + Distance, + }); + } + } + + this->Datacenters.Add(Endpoint, DatacenterArray); + } + }); + + HttpRequest->ProcessRequest(); + }); +} + +// MARK: ISocketSubsystem Interface + +FSocket* FQuilkinSocketSubsystem::CreateSocket(const FName& SocketType, const FString& SocketDescription, bool bForceUDP) +{ + return SocketSubsystem->CreateSocket(SocketType, SocketDescription, bForceUDP); +} + +FSocket* FQuilkinSocketSubsystem::CreateSocket(const FName& SocketType, const FString& SocketDescription, const FName& ProtocolName) +{ + FSocket* WrappedSocket = SocketSubsystem->CreateSocket(SocketType, SocketDescription, ProtocolName); + if (WrappedSocket == nullptr) + { + UE_LOG(LogQuilkin, Warning, TEXT("CreateSocket returned nullptr")); + return nullptr; + } + + ESocketType InSocketType = WrappedSocket->GetSocketType(); + auto Socket = new FQuilkinSocket(FUniqueSocket(WrappedSocket), InSocketType, SocketDescription, ProtocolName); + + Socket->Subsystem = AsWeak(); + return Socket; +} + +FSocket* FQuilkinSocketSubsystem::CreateRawSocket(const FString& SocketDescription) { + FName SocketType = NAME_DGram; + FSocket* WrappedSocket = SocketSubsystem->CreateSocket(SocketType, SocketDescription, true); + if (WrappedSocket == nullptr) + { + UE_LOG(LogQuilkin, Warning, TEXT("CreateSocket returned nullptr")); + return nullptr; + } + + ESocketType InSocketType = WrappedSocket->GetSocketType(); + return WrappedSocket; +}; + +void FQuilkinSocketSubsystem::DestroySocket(FSocket* Socket) +{ + SocketSubsystem->DestroySocket(Socket); +} + +FResolveInfoCached* FQuilkinSocketSubsystem::CreateResolveInfoCached(TSharedPtr Addr) const +{ + return SocketSubsystem->CreateResolveInfoCached(Addr); +} + +FAddressInfoResult FQuilkinSocketSubsystem::GetAddressInfo(const TCHAR* HostName, const TCHAR* ServiceName, EAddressInfoFlags QueryFlags, const FName ProtocolTypeName, ESocketType SocketType) +{ + return SocketSubsystem->GetAddressInfo(HostName, ServiceName, QueryFlags, ProtocolTypeName, SocketType); +} + +void FQuilkinSocketSubsystem::GetAddressInfoAsync(FAsyncGetAddressInfoCallback Callback, const TCHAR* HostName, const TCHAR* ServiceName, EAddressInfoFlags QueryFlags, const FName ProtocolTypeName, ESocketType SocketType) +{ + SocketSubsystem->GetAddressInfoAsync(Callback, HostName, ServiceName, QueryFlags, ProtocolTypeName, SocketType); +} + +TSharedPtr FQuilkinSocketSubsystem::GetAddressFromString(const FString& InAddress) +{ + return SocketSubsystem->GetAddressFromString(InAddress); +} + +FResolveInfo* FQuilkinSocketSubsystem::GetHostByName(const ANSICHAR* HostName) +{ + return SocketSubsystem->GetHostByName(HostName); +} + +bool FQuilkinSocketSubsystem::RequiresChatDataBeSeparate() +{ + return SocketSubsystem->RequiresChatDataBeSeparate(); +} + +bool FQuilkinSocketSubsystem::RequiresEncryptedPackets() +{ + return SocketSubsystem->RequiresEncryptedPackets(); +} + +bool FQuilkinSocketSubsystem::GetHostName(FString& HostName) +{ + return SocketSubsystem->GetHostName(HostName); +} + +TSharedRef FQuilkinSocketSubsystem::CreateInternetAddr() +{ + return SocketSubsystem->CreateInternetAddr(); +} + +TSharedRef FQuilkinSocketSubsystem::CreateInternetAddr(const FName ProtocolType) +{ + return SocketSubsystem->CreateInternetAddr(ProtocolType); +} + +TSharedRef FQuilkinSocketSubsystem::GetLocalBindAddr(FOutputDevice& Out) +{ + return SocketSubsystem->GetLocalBindAddr(Out); +} + +TArray> FQuilkinSocketSubsystem::GetLocalBindAddresses() +{ + return SocketSubsystem->GetLocalBindAddresses(); +} + +bool FQuilkinSocketSubsystem::GetLocalAdapterAddresses(TArray>& OutAddresses) +{ + return SocketSubsystem->GetLocalAdapterAddresses(OutAddresses); +} + +TUniquePtr FQuilkinSocketSubsystem::CreateRecvMulti(int32 MaxNumPackets, int32 MaxPacketSize, ERecvMultiFlags Flags) +{ + return SocketSubsystem->CreateRecvMulti(MaxNumPackets, MaxPacketSize, Flags); +} + +TSharedRef FQuilkinSocketSubsystem::GetLocalHostAddr(FOutputDevice& Out, bool& bCanBindAll) +{ + return SocketSubsystem->GetLocalHostAddr(Out, bCanBindAll); +} + +bool FQuilkinSocketSubsystem::GetMultihomeAddress(TSharedRef& Addr) +{ + return SocketSubsystem->GetMultihomeAddress(Addr); +} + +bool FQuilkinSocketSubsystem::HasNetworkDevice() +{ + return SocketSubsystem->HasNetworkDevice(); +} + +const TCHAR* FQuilkinSocketSubsystem::GetSocketAPIName() const +{ + return SocketSubsystem->GetSocketAPIName(); +} + +ESocketErrors FQuilkinSocketSubsystem::GetLastErrorCode() +{ + return SocketSubsystem->GetLastErrorCode(); +} + +ESocketErrors FQuilkinSocketSubsystem::TranslateErrorCode(int32 Code) +{ + return SocketSubsystem->TranslateErrorCode(Code); +} + +bool FQuilkinSocketSubsystem::IsSocketRecvMultiSupported() const +{ + return SocketSubsystem->IsSocketRecvMultiSupported(); +} + +bool FQuilkinSocketSubsystem::IsSocketWaitSupported() const +{ + return SocketSubsystem->IsSocketWaitSupported(); +} + +double FQuilkinSocketSubsystem::TranslatePacketTimestamp(const FPacketTimestamp& Timestamp, ETimestampTranslation Translation) +{ + return SocketSubsystem->TranslatePacketTimestamp(Timestamp, Translation); +} + +bool FQuilkinSocketSubsystem::IsRecvFromWithPktInfoSupported() const +{ + return SocketSubsystem->IsRecvFromWithPktInfoSupported(); +} diff --git a/sdks/ue5/Source/Quilkin/Private/QuilkinSocketSubsystem.h b/sdks/ue5/Source/Quilkin/Private/QuilkinSocketSubsystem.h new file mode 100644 index 0000000000..55a958932e --- /dev/null +++ b/sdks/ue5/Source/Quilkin/Private/QuilkinSocketSubsystem.h @@ -0,0 +1,176 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "CoreMinimal.h" +#include "Misc/QueuedThreadPool.h" +#include "SocketSubsystem.h" + +#include "QuilkinConstants.h" +#include "QuilkinConcurrentMap.h" +#include "QuilkinEndpoint.h" +#include "QuilkinCircularBuffer.h" +#include "QuilkinSocket.h" + +struct FQuilkinEndpoint; + +#define QUILKIN_SOCKETSUBSYSTEM_NAME TEXT("Quilkin") + +struct FQuilkinDatacenter { + FString IcaoCode; + double Distance; + + int64 TotalDistance(int64 ProxyLatency) const { + int64 RoundedLatency = std::llround(Distance); + return ProxyLatency + RoundedLatency; + } +}; + +class FQuilkinSocketSubsystem : public ISocketSubsystem, + public TSharedFromThis, + public FTSTickerObjectBase +{ +public: + FSocket* PingSocket; + FDelegateHandle UpdateEndpointHandle; + FQueuedThreadPool* PingThreadPool; + + FQuilkinSocketSubsystem(ISocketSubsystem* WrappedSocketSubsystem); + virtual ~FQuilkinSocketSubsystem(); + + FSocket* CreateRawSocket(const FString& SocketDescription); + bool Tick(float DeltaTime) override; + + + //~ Begin ISocketSubsystem Interface + virtual bool Init(FString& Error) override; + virtual void Shutdown() override; + virtual FSocket* CreateSocket(const FName& SocketType, const FString& SocketDescription, bool bForceUDP) override; + virtual FSocket* CreateSocket(const FName& SocketType, const FString& SocketDescription, const FName& ProtocolName) override; + virtual void DestroySocket(FSocket* Socket) override; + virtual class FResolveInfoCached* CreateResolveInfoCached(TSharedPtr Addr) const override; + virtual FAddressInfoResult GetAddressInfo(const TCHAR* HostName, const TCHAR* ServiceName = nullptr, EAddressInfoFlags QueryFlags = EAddressInfoFlags::Default, const FName ProtocolTypeName = NAME_None, ESocketType SocketType = ESocketType::SOCKTYPE_Unknown) override; + virtual void GetAddressInfoAsync(FAsyncGetAddressInfoCallback Callback, const TCHAR* HostName, + const TCHAR* ServiceName, EAddressInfoFlags QueryFlags, + const FName ProtocolTypeName, + ESocketType SocketType) override; + virtual TSharedPtr GetAddressFromString(const FString& InAddress) override; + virtual class FResolveInfo* GetHostByName(const ANSICHAR* HostName); + virtual bool RequiresChatDataBeSeparate() override; + virtual bool RequiresEncryptedPackets() override; + virtual bool GetHostName(FString& HostName) override; + virtual TSharedRef CreateInternetAddr() override; + virtual TSharedRef CreateInternetAddr(const FName ProtocolType) override; + virtual TSharedRef GetLocalBindAddr(FOutputDevice& Out) override; + virtual TArray> GetLocalBindAddresses() override; + virtual bool GetLocalAdapterAddresses(TArray>& OutAddresses) override; + virtual TUniquePtr CreateRecvMulti(int32 MaxNumPackets, int32 MaxPacketSize, + ERecvMultiFlags Flags = ERecvMultiFlags::None) override; + virtual TSharedRef GetLocalHostAddr(FOutputDevice& Out, bool& bCanBindAll) override; + virtual bool GetMultihomeAddress(TSharedRef& Addr) override; + virtual bool HasNetworkDevice() override; + virtual const TCHAR* GetSocketAPIName() const override; + virtual ESocketErrors GetLastErrorCode() override; + virtual ESocketErrors TranslateErrorCode(int32 Code) override; + virtual bool IsSocketRecvMultiSupported() const override; + virtual bool IsSocketWaitSupported() const override; + virtual double TranslatePacketTimestamp(const FPacketTimestamp& Timestamp, + ETimestampTranslation Translation) override; + virtual bool IsRecvFromWithPktInfoSupported() const override; + //~ End ISocketSubsystem Interface + + TResult SendPing(FSocket* Socket, FInternetAddr& Endpoint); + TResult SendPacket(FSocket* Socket, FInternetAddr& Endpoint); + TArray> GetEndpointMeasurements(); + + TArray PingSockets; + TSConcurrentMap> Endpoints; +protected: + ISocketSubsystem* SocketSubsystem; + TSConcurrentMap> Datacenters; + FCriticalSection SocketAllocationLock; + float TickElapsed = 0; + + TOptional GetLowestLatencyProxyToDatacenter(FString Datacenter) const; + TMap GetLowestLatencyToDatacenters() const; + void GetDatacenterLatencies(); + void AllocatePingSocketsForEndpoints(); + TResult CreateRandomUdpSocket(); + static int64 WaitForResponses(FSocket* Socket, FInternetAddr& Endpoint, uint32 PingCount, TArray Nonces); + void PingEndpoints(); + /* Returns a tuple of the latency with the lowest median latency, along with + the median latency of that endpoint. Returns `None` if no endpoints available. + */ + TOptional> GetLowestLatencyEndpoint(); + TOptional> GetLowestLatencyEndpointInRegion(FString Region) const; + /* Shared implementation between `GetLowestLatencyEndpoint` and `GetLowestLatencyEndpointInRegion`. */ + TOptional> GetLowestLatencyEndpointImplementation(TOptional Region) const; + void UpdateEndpoints(TArray); +}; + +class FPingTask : public FNonAbandonableTask +{ + friend class FAsyncTask; +public: + FPingTask(FQuilkinSocketSubsystem* InSubsystem, TArray& InKeys, int32 TaskIndex) + : Subsystem(InSubsystem) + , Keys(InKeys) + , Index(TaskIndex) + {} + + void DoWork() + { + if (Index >= Keys.Num() || Index >= Subsystem->PingSockets.Num()) { + UE_LOG(LogQuilkin, Warning, TEXT("Cancelling task as index greater than available sockets")); + return; + } + + auto Endpoint = Keys[Index]; + auto Socket = Subsystem->PingSockets[Index]; + auto EndpointResult = Endpoint.ToInternetAddr(Subsystem); + + if (EndpointResult.IsError()) { + UE_LOG(LogQuilkin, Warning, TEXT("Couldn't resolve %s to an IP address, adding %dms penalty"), *Endpoint.Host, NanosToMillis(DefaultPenaltyLatency)); + Subsystem->Endpoints.FindOrDefaultToAdd(Endpoint, DefaultPenaltyLatency); + return; + } + TSharedRef Addr = EndpointResult.GetValue(); + FInternetAddr& Ptr = Addr.Get(); + auto PingResult = Subsystem->SendPing(Socket, Ptr); + if (PingResult.IsError()) { + UE_LOG(LogQuilkin, Warning, TEXT("ping for %s failed: %s, adding %dms penalty"), *Endpoint.Host, *PingResult.GetError(), NanosToMillis(DefaultPenaltyLatency)); + Subsystem->Endpoints.FindOrDefaultToAdd(Endpoint, DefaultPenaltyLatency); + return; + } + + auto Latency = PingResult.GetValue(); + Subsystem->Endpoints.FindOrDefaultToAdd(Endpoint, Latency); + } + + FORCEINLINE TStatId GetStatId() const + { + RETURN_QUICK_DECLARE_CYCLE_STAT(FMyTask, STATGROUP_ThreadPoolAsyncTasks); + } + +private: + FQuilkinSocketSubsystem* Subsystem; + TArray& Keys; + int32 Index; +}; + diff --git a/sdks/ue5/Source/Quilkin/Private/Tests/QuilkinControlMessageProtocolTest.cpp b/sdks/ue5/Source/Quilkin/Private/Tests/QuilkinControlMessageProtocolTest.cpp new file mode 100644 index 0000000000..9623384b56 --- /dev/null +++ b/sdks/ue5/Source/Quilkin/Private/Tests/QuilkinControlMessageProtocolTest.cpp @@ -0,0 +1,185 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#if WITH_DEV_AUTOMATION_TESTS + +#include "../QuilkinControlMessageProtocol.h" +#include "../QuilkinLog.h" + +#include "CoreMinimal.h" +#include "Misc/AutomationTest.h" + +template +static bool ArrayIsEqual(const TArray& Array1, const TArray& Array2) +{ + if (Array1.Num() != Array2.Num()) + { + return false; + } + + for (int32 i = 0; i < Array1.Num(); ++i) + { + if (!(Array1[i] == Array2[i])) + { + return false; + } + } + + return true; +} + +IMPLEMENT_SIMPLE_AUTOMATION_TEST(FTestPing, "Quilkin.Protocol.Ping", EAutomationTestFlags::EditorContext | EAutomationTestFlags::EngineFilter) + +bool FTestPing::RunTest(const FString& Parameters) +{ + const TArray Input = { + // Magic + 'Q', 'L', 'K', 'N', + // Version + 0, + // Code + 0, + // Length + 0, 9, + // Nonce + 0xBF, + // Payload + 0, 0, 0, 0, 0x63, 0xb6, 0xe9, 0x57 + }; + + auto Result = FProtocolVariant::Decode(Input); + TestTrue("Decode Ping", Result.IsSuccess()); + TSharedPtr Variant = Result.GetValue(); + TestTrue("Nonce Matches", Variant->GetNonce() == 0xBF); + TestTrue("Ping Variant", Variant->GetCode() == 0); + // FPing* Ping = Cast(*Variant.Get()); + // TestTrue("Ping Timestamp", Ping->GetTimestamp() == 0x63B6E957); + + // Encode + auto Archive = Result.GetValue()->Encode(); + TArray Buffer = static_cast&>(Archive); + TestTrue("Encoded Ping Equals Input", ArrayIsEqual(Buffer, Input)); + return true; +} + +IMPLEMENT_SIMPLE_AUTOMATION_TEST(FTestPingReply, "Quilkin.Protocol.PingReply", EAutomationTestFlags::EditorContext | EAutomationTestFlags::EngineFilter) + +bool FTestPingReply::RunTest(const FString& Parameters) +{ + const TArray Input = { + // Magic + 'Q', 'L', 'K', 'N', + // Version + 0, + // Code + 1, + // Length + 0, 25, + // Nonce + 0xBF, + // Payload + 0, 0, 0, 0, 0x63, 0xb6, 0xe9, 0x57, + 0, 0, 0, 0, 0x63, 0xb6, 0xe9, 0x57, + 0, 0, 0, 0, 0x63, 0xb6, 0xe9, 0x57 + }; + + auto Result = FProtocolVariant::Decode(Input); + TestTrue("Decode PingReply", Result.IsSuccess()); + TSharedPtr Variant = Result.GetValue(); + TestTrue("Nonce Matches", Variant->GetNonce() == 0xBF); + TestTrue("PingReply Variant", Variant->GetCode() == 1); + auto Archive = Variant->Encode(); + TArray Buffer = static_cast&>(Archive); + TestTrue("Encoded PingReply Equals Input", ArrayIsEqual(Buffer, Input)); + return true; +} + +IMPLEMENT_SIMPLE_AUTOMATION_TEST(FTestRejectMalformedPacket, "Quilkin.Protocol.RejectMalformedPacket", EAutomationTestFlags::EditorContext | EAutomationTestFlags::EngineFilter) + +bool FTestRejectMalformedPacket::RunTest(const FString& Parameters) +{ + const TArray Input = { + // Magic + 'Q', 'L', 'K', 'N', + // Version + 0, + // Code (intentionally Ping) + 0, + // Length + 0, 25, + // Nonce + 0xBF, + // Payload + 0, 0, 0, 0, 0x63, 0xb6, 0xe9, 0x57, + 0, 0, 0, 0, 0x63, 0xb6, 0xe9, 0x57, + 0, 0, 0, 0, 0x63, 0xb6, 0xe9, 0x57 + }; + + auto Result = FProtocolVariant::Decode(Input); + TestTrue("Reject Malformed Packet", Result.IsError()); + return true; +} + +IMPLEMENT_SIMPLE_AUTOMATION_TEST(FTestRejectUnknownPacket, "Quilkin.Protocol.RejectUnknownPacket", EAutomationTestFlags::EditorContext | EAutomationTestFlags::EngineFilter) + +bool FTestRejectUnknownPacket::RunTest(const FString& Parameters) +{ + const TArray Input = { + // Magic + 'Q', 'L', 'K', 'N', + // Version + 0, + // Code + 0xff + }; + + auto Result = FProtocolVariant::Decode(Input); + TestTrue("Reject Unknown Packet", Result.IsError()); + return true; +} + +IMPLEMENT_SIMPLE_AUTOMATION_TEST(FTestRejectUnknownVersion, "Quilkin.Protocol.RejectUnknownVersion", EAutomationTestFlags::EditorContext | EAutomationTestFlags::EngineFilter) + +bool FTestRejectUnknownVersion::RunTest(const FString& Parameters) +{ + const TArray Input = { + // Magic + 'Q', 'L', 'K', 'N', + // Version + 0xff + }; + + auto Result = FProtocolVariant::Decode(Input); + TestTrue("Reject Unknown Version", Result.IsError()); + return true; +} + +IMPLEMENT_SIMPLE_AUTOMATION_TEST(FTestRejectNoMagicHeader, "Quilkin.Protocol.RejectNoMagicHeader", EAutomationTestFlags::EditorContext | EAutomationTestFlags::EngineFilter) + +bool FTestRejectNoMagicHeader::RunTest(const FString& Parameters) +{ + const TArray Input = { + 0xff, 0xff, 0, 0, 0, 0, 0x63, 0xb6, 0xe9, 0x57 + }; + + auto Result = FProtocolVariant::Decode(Input); + TestTrue("Reject No Magic Header", Result.IsError()); + return true; +} + +#endif diff --git a/sdks/ue5/Source/Quilkin/Public/QuilkinDelegates.h b/sdks/ue5/Source/Quilkin/Public/QuilkinDelegates.h new file mode 100644 index 0000000000..29c09f84e6 --- /dev/null +++ b/sdks/ue5/Source/Quilkin/Public/QuilkinDelegates.h @@ -0,0 +1,57 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "CoreMinimal.h" +#include "UObject/ObjectMacros.h" +#include "UObject/UObjectGlobals.h" +#include "UObject/Object.h" +#include "Templates/Tuple.h" +#include "QuilkinEndpoint.h" + +using EndpointMap = TMap ; +using DatacenterMap = TMap ; +class QUILKIN_API FQuilkinDelegates +{ +public: + /** + * Delegate used to get a copy of the proxy endpoints with their latest median latency. + */ + DECLARE_DELEGATE_RetVal(TArray, FGetQuilkinEndpointMeasurements); + static FGetQuilkinEndpointMeasurements GetQuilkinEndpointMeasurements; + + /** + * Delegate used to get the endpoint with the lowest median latency. Returns `None` if + * there are no endpoints, or `MeasureEndpoints` is `false`. + */ + DECLARE_DELEGATE_RetVal(TOptional, FGetLowestLatencyEndpoint); + static FGetLowestLatencyEndpoint GetLowestLatencyEndpoint; + + /** + * Delegate used to get the endpoint that matches the `Region` paramaeter with the + * lowest median latency. Returns `None` if there are no endpoints matching that region, + * or `MeasureEndpoints` is `false`. + */ + DECLARE_DELEGATE_RetVal_OneParam(TOptional, FGetLowestLatencyEndpointInRegion, FString); + static FGetLowestLatencyEndpointInRegion GetLowestLatencyEndpointInRegion; + + /** + * Delegate used to get the lowest latency measurement to each datacenter. + */ + DECLARE_DELEGATE_RetVal(DatacenterMap, FGetLowestLatencyToDatacenters); + static FGetLowestLatencyToDatacenters GetLowestLatencyToDatacenters; +}; diff --git a/sdks/ue5/Source/Quilkin/Public/QuilkinEndpoint.h b/sdks/ue5/Source/Quilkin/Public/QuilkinEndpoint.h new file mode 100644 index 0000000000..bb5cb3fbc4 --- /dev/null +++ b/sdks/ue5/Source/Quilkin/Public/QuilkinEndpoint.h @@ -0,0 +1,72 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "CoreMinimal.h" +#include "CoreTypes.h" +#include "Containers/StringConv.h" +#include "Containers/UnrealString.h" +#include "../Private/QuilkinLog.h" +#include "../Private/QuilkinConcurrentMap.h" +#include "../Private/QuilkinResult.h" +#include "IPAddress.h" + +#include "QuilkinEndpoint.generated.h" + +class FQuilkinSocketSubsystem; + +struct ResolveError {}; + +/* Represents a Quilkin proxy endpoint */ +USTRUCT() +struct FQuilkinEndpoint { + GENERATED_BODY() + /* Same as `ToInternetAddr` but uses the `QcmpPort`. */ + const TResult, ResolveError> ToInternetAddrBase(FQuilkinSocketSubsystem* SocketSubsystem, FString Host, uint16 Port) const; +public: + UPROPERTY(config, EditAnywhere, Category = Quilkin) + FString Host; + UPROPERTY(config, EditAnywhere, Category = Quilkin) + uint16 QcmpPort = 7600; + UPROPERTY(config, EditAnywhere, Category = Quilkin) + uint16 TrafficPort = 0; + UPROPERTY(config, EditAnywhere, Category = Quilkin) + FString Region; + + /* Resolves `Host` and `TrafficPort` into a `FInternetAddr`, providing a `ResolveError` if there was + problems resolving it. */ + const TResult, ResolveError> ToInternetAddr(FQuilkinSocketSubsystem* SocketSubsystem) const; + /* Same as `ToInternetAddr` but uses the `QcmpPort`. */ + const TResult, ResolveError> ToQcmpInternetAddr(FQuilkinSocketSubsystem* SocketSubsystem) const; + + const FString ToString() const + { + return FString::Printf(TEXT("%s:%d"), *Host, TrafficPort); + } + + friend int32 GetTypeHash(const FQuilkinEndpoint& Endpoint) + { + return HashCombine(GetTypeHash(Endpoint.Host), GetTypeHash(Endpoint.TrafficPort)); + } + + friend bool operator==(const FQuilkinEndpoint& A, const FQuilkinEndpoint& B) + { + return A.Host == B.Host && A.TrafficPort == B.TrafficPort; + } +}; + +using EndpointPair = TTuple; diff --git a/sdks/ue5/Source/Quilkin/Public/QuilkinSettings.h b/sdks/ue5/Source/Quilkin/Public/QuilkinSettings.h new file mode 100644 index 0000000000..f4977343e4 --- /dev/null +++ b/sdks/ue5/Source/Quilkin/Public/QuilkinSettings.h @@ -0,0 +1,177 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "CoreMinimal.h" +#include "Engine/DeveloperSettings.h" +#include "QuilkinEndpoint.h" + +#include "QuilkinSettings.generated.h" + +UCLASS(config = Game) +class QUILKIN_API UQuilkinDeveloperSettings : public UDeveloperSettings +{ + GENERATED_BODY() + +public: + // UDeveloperSettings overrides + UQuilkinDeveloperSettings(const FObjectInitializer& ObjectInitializer) {}; + + virtual FName GetContainerName() const { return FName("Project"); } + virtual FName GetCategoryName() const { return FName("Plugins"); } + virtual FName GetSectionName() const { return FName("Quilkin"); } + + // virtual FText GetSectionText() const override + // { + // return NSLOCTEXT("Quilkin", "QuilkinSettingsName", "Quilkin"); + // } + + // virtual FText GetSectionDescription() const override + // { + // return NSLOCTEXT("Quilkin", "QuilkinSettingsDescription", "Configure the Quilkin plugin"); + // } + +public: + + bool IsEnabled() const { + return Enabled && IsEnabledInEditor(); + } + + bool IsEnabledInEditor() const { +#if WITH_EDITOR + return EnabledInPie; +#else + return true; +#endif + } + + /** The token used to route traffic from the proxy to the appropiate gameserver */ + UPROPERTY(config, EditAnywhere, Category = Settings) + TArray RoutingToken; + + /** Whether to use Quilkin proxy routing in-game */ + UPROPERTY(config, EditAnywhere, Category = Settings) + bool Enabled = false; + + /** Whether to use Quilkin proxy routing in-editor */ + UPROPERTY(config, EditAnywhere, Category = Settings) + bool EnabledInPie = false; + + /** Whether to regularly measure each endpoint in `Endpoints`'s latency. */ + UPROPERTY(config, EditAnywhere, Category = Settings) + bool MeasureEndpoints = false; + + /** The amount of time (in milliseconds) that Quilkin will consider a proxy too + * far to be worth measuring the full datacenter latency. + */ + UPROPERTY(config, EditAnywhere, Category = Settings) + uint64 PingThresholdMillis = 185; + + /** The amount of time (in milliseconds) that Quilkin should wait before switching + * to the next available proxy. + */ + UPROPERTY(config, EditAnywhere, Category = Settings) + uint64 JitterThreshold = 150; + + /** List of endpoints to Quilkin proxies */ + UPROPERTY(config, EditAnywhere, Category = Settings) + TArray Endpoints; +}; + +/** Defines a property and a delegate for that property, and provides a getter + * which will call the delegate if bound, otherwise will call the primitive property. + */ +#define DECLARE_PROPERTY_AND_DELEGATE(Type, PropName) \ + private: \ + UPROPERTY(EditAnywhere, Category = "Quilkin") \ + Type PropName; \ + public: \ + DECLARE_DELEGATE_RetVal(Type, F##PropName##BindingDelegate); \ + F##PropName##BindingDelegate PropName##Binding; \ + DECLARE_MULTICAST_DELEGATE_OneParam(F##PropName##ChangedDelegate, Type); \ + F##PropName##ChangedDelegate On##PropName##Changed; \ + public: \ + Type Get##PropName() \ + { \ + if (PropName##Binding.IsBound()) \ + { \ + return PropName##Binding.Execute(); \ + } \ + else \ + { \ + return PropName; \ + } \ + } \ + void Set##PropName(Type Value) \ + { \ + checkf(! PropName##Binding.IsBound(), TEXT("Cannot call Set##PropName with PropName##Binding set.")); \ + PropName = Value; \ + if (On##PropName##Changed.IsBound()) { \ + On##PropName##Changed.Broadcast(PropName); \ + } \ + } + + +UCLASS() +class QUILKIN_API UQuilkinConfigSubsystem : public UEngineSubsystem +{ + GENERATED_BODY() + + UQuilkinConfigSubsystem() { + UE_LOG(LogQuilkin, Display, TEXT("Initialising UQuilkinConfigSubsystem")); + const UQuilkinDeveloperSettings* DefaultSettings = GetDefault(); + Enabled = DefaultSettings->IsEnabled(); + RoutingToken = DefaultSettings->RoutingToken; + MeasureEndpoints = DefaultSettings->MeasureEndpoints; + PingThresholdMillis = DefaultSettings->PingThresholdMillis; + JitterThreshold = DefaultSettings->JitterThreshold; + Endpoints = DefaultSettings->Endpoints; + } + + virtual void Deinitialize() override { + UE_LOG(LogQuilkin, Display, TEXT("Tearing down UQuilkinConfigSubsystem")); + } + +public: + static bool IsAvailable() { + return GEngine != nullptr && GEngine->GetEngineSubsystem() != nullptr; + } + + static UQuilkinConfigSubsystem* Get() { + checkf(GEngine != nullptr, TEXT("UQuilkinConfigSubsystem can only be called inside an Engine context")); + UQuilkinConfigSubsystem* Subsystem = GEngine->GetEngineSubsystem(); + checkf(Subsystem != nullptr, TEXT("UQuilkinConfigSubsystem hasn't been initialised")); + return Subsystem; + } + + /** Whether sockets should add routing tokens to packets */ + UPROPERTY(EditAnywhere, Category = "Quilkin") + bool PacketHandling; + + DECLARE_PROPERTY_AND_DELEGATE(bool, Enabled); + DECLARE_PROPERTY_AND_DELEGATE(bool, MeasureEndpoints); + DECLARE_PROPERTY_AND_DELEGATE(uint64, PingThresholdMillis); + DECLARE_PROPERTY_AND_DELEGATE(TArray, RoutingToken); + DECLARE_PROPERTY_AND_DELEGATE(TArray, Endpoints); + + UPROPERTY(EditAnywhere, Category = "Quilkin") + uint64 JitterThreshold; + + DECLARE_MULTICAST_DELEGATE(FMeasurementCompletedDelegate); + FMeasurementCompletedDelegate MeasurementCompleted; +}; + diff --git a/sdks/ue4/Source/Quilkin/Quilkin.build.cs b/sdks/ue5/Source/Quilkin/Quilkin.build.cs similarity index 79% rename from sdks/ue4/Source/Quilkin/Quilkin.build.cs rename to sdks/ue5/Source/Quilkin/Quilkin.build.cs index 6ff9c5431e..835627b87f 100644 --- a/sdks/ue4/Source/Quilkin/Quilkin.build.cs +++ b/sdks/ue5/Source/Quilkin/Quilkin.build.cs @@ -1,5 +1,5 @@ /* - * Copyright 2022 Google LLC + * Copyright 2024 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,14 +24,22 @@ public Quilkin(ReadOnlyTargetRules Target) : base(Target) PCHUsage = PCHUsageMode.UseExplicitOrSharedPCHs; PrivateDependencyModuleNames.AddRange(new string[] { - "Sockets", "Json", - "Engine" + "HTTP", + "Networking", }); PublicDependencyModuleNames.AddRange(new string[] { + "Sockets", "Core", - "CoreUObject", + "CoreUObject", + "Engine", + "DeveloperSettings", + "InputCore", + }); + + PrivateIncludePaths.AddRange(new string[] { + "Quilkin/Private/Tests", }); } }