Skip to content

Commit

Permalink
Fix chat race condition
Browse files Browse the repository at this point in the history
  • Loading branch information
MattiaFioretti committed Aug 28, 2023
1 parent b2d3fa2 commit 4bbc770
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,7 @@
import com.velocitypowered.proxy.protocol.netty.MinecraftVarintLengthEncoder;
import com.velocitypowered.proxy.util.except.QuietDecoderException;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoop;
import io.netty.channel.*;
import io.netty.handler.codec.haproxy.HAProxyMessage;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.util.ReferenceCountUtil;
Expand All @@ -66,6 +61,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.jetbrains.annotations.NotNull;

/**
* A utility class to make working with the pipeline a little less painful and transparently handles
Expand Down Expand Up @@ -99,7 +95,7 @@ public MinecraftConnection(Channel channel, VelocityServer server) {
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
public void channelActive(@NotNull ChannelHandlerContext ctx) {
if (sessionHandler != null) {
sessionHandler.connected();
}
Expand All @@ -110,7 +106,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
public void channelInactive(@NotNull ChannelHandlerContext ctx) {
if (sessionHandler != null) {
sessionHandler.disconnected();
}
Expand All @@ -123,7 +119,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
public void channelRead(@NotNull ChannelHandlerContext ctx, @NotNull Object msg) {
try {
if (sessionHandler == null) {
// No session handler available, do nothing
Expand Down Expand Up @@ -215,11 +211,12 @@ public EventLoop eventLoop() {
*
* @param msg the message to write
*/
public void write(Object msg) {
public ChannelFuture write(Object msg) {
if (channel.isActive()) {
channel.writeAndFlush(msg, channel.voidPromise());
return channel.writeAndFlush(msg, channel.newPromise());
} else {
ReferenceCountUtil.release(msg);
return null;
}
}

Expand Down Expand Up @@ -314,10 +311,6 @@ public StateRegistry getState() {
return state;
}

public boolean isAutoReading() {
return channel.config().isAutoRead();
}

public boolean isKnownDisconnect() {
return knownDisconnect;
}
Expand Down Expand Up @@ -471,10 +464,6 @@ public void enableEncryption(byte[] secret) throws GeneralSecurityException {
channel.pipeline().fireUserEventTriggered(VelocityConnectionEvent.ENCRYPTION_ENABLED);
}

public @Nullable MinecraftConnectionAssociation getAssociation() {
return association;
}

public void setAssociation(MinecraftConnectionAssociation association) {
ensureInEventLoop();
this.association = association;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.velocitypowered.proxy.connection.MinecraftConnection;
import com.velocitypowered.proxy.connection.client.ConnectedPlayer;
import com.velocitypowered.proxy.protocol.MinecraftPacket;
import io.netty.channel.ChannelFuture;

import java.time.Instant;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -60,7 +61,6 @@ public void queuePacket(CompletableFuture<MinecraftPacket> nextPacket, Instant t

CompletableFuture<WrappedPacket> nextInLine = WrappedPacket.wrap(timestamp, nextPacket);
this.packetFuture = awaitChat(smc, this.packetFuture, nextInLine);
;
}
}

Expand All @@ -87,30 +87,31 @@ public <K, V extends MinecraftPacket> void hijack(K packet,
private static Function<WrappedPacket, WrappedPacket> writePacket(MinecraftConnection connection) {
return wrappedPacket -> {
if (!connection.isClosed()) {
wrappedPacket.write(connection);
ChannelFuture future = wrappedPacket.write(connection);
if (future != null) {
future.awaitUninterruptibly();
}
}

return wrappedPacket;
};
}

private static <T extends MinecraftPacket> CompletableFuture<WrappedPacket> awaitChat(
MinecraftConnection connection,
CompletableFuture<WrappedPacket> binder,
CompletableFuture<WrappedPacket> future) {
MinecraftConnection connection,
CompletableFuture<WrappedPacket> binder,
CompletableFuture<WrappedPacket> future) {
return binder.whenCompleteAsync((ignored1, ignored2) -> future.thenApply(writePacket(connection)).join());
}

private static <K, V extends MinecraftPacket> CompletableFuture<WrappedPacket> hijackCurrentPacket(
MinecraftConnection connection,
CompletableFuture<WrappedPacket> binder,
CompletableFuture<K> future,
InstantPacketMapper<K, V> packetMapper
) {
InstantPacketMapper<K, V> packetMapper) {
CompletableFuture<WrappedPacket> awaitedFuture = new CompletableFuture<>();
// the binder will complete -> then the future will get the `write packet` caller

binder.whenComplete((previous, ignored) -> {
// map the new packet into a better "designed" packet with the hijacked packet's timestamp
WrappedPacket.wrap(previous.timestamp,
future.thenApply(item -> packetMapper.map(previous.timestamp, item)))
.thenApplyAsync(writePacket(connection), connection.eventLoop())
Expand Down Expand Up @@ -147,10 +148,12 @@ private WrappedPacket(Instant timestamp, MinecraftPacket packet) {
this.packet = packet;
}

public void write(MinecraftConnection connection) {
public ChannelFuture write(MinecraftConnection connection) {
if (packet != null) {
connection.write(packet);
return connection.write(packet);
}
return null;
}

private static CompletableFuture<WrappedPacket> wrap(Instant timestamp,
Expand Down

0 comments on commit 4bbc770

Please sign in to comment.