Skip to content

Commit

Permalink
Maybe adding a CancellationToken will help.
Browse files Browse the repository at this point in the history
  • Loading branch information
bgrainger committed Oct 29, 2023
1 parent d5d7b39 commit ea423f6
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 22 deletions.
4 changes: 4 additions & 0 deletions tests/MySqlConnector.Tests/FakeMySqlServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,11 @@ private async Task AcceptConnectionsAsync()
{
while (true)
{
#if NET6_0_OR_GREATER
var tcpClient = await m_tcpListener.AcceptTcpClientAsync(m_cts.Token);
#else
var tcpClient = await m_tcpListener.AcceptTcpClientAsync();
#endif
Interlocked.Increment(ref m_activeConnections);
lock (m_lock)
{
Expand Down
44 changes: 22 additions & 22 deletions tests/MySqlConnector.Tests/FakeMySqlServerConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ public async Task RunAsync(TcpClient client, CancellationToken token)
using (var stream = client.GetStream())
{
if (m_server.ConnectDelay is { } connectDelay)
await Task.Delay(connectDelay);
await Task.Delay(connectDelay, token);

await SendAsync(stream, 0, WriteInitialHandshake);
await SendAsync(stream, 0, WriteInitialHandshake, token);
await ReadPayloadAsync(stream, token); // handshake response

if (m_server.SendIncompletePostHandshakeResponse)
Expand All @@ -37,7 +37,7 @@ public async Task RunAsync(TcpClient client, CancellationToken token)
return;
}

await SendAsync(stream, 2, WriteOk);
await SendAsync(stream, 2, WriteOk, token);

var keepRunning = true;
while (keepRunning)
Expand All @@ -55,26 +55,26 @@ public async Task RunAsync(TcpClient client, CancellationToken token)
switch ((CommandKind) bytes[0])
{
case CommandKind.Quit:
await SendAsync(stream, 1, WriteOk);
await SendAsync(stream, 1, WriteOk, token);
keepRunning = false;
break;

case CommandKind.Ping:
await SendAsync(stream, 1, WriteOk);
await SendAsync(stream, 1, WriteOk , token);
break;

case CommandKind.ResetConnection:
if (m_server.ResetDelay is { } resetDelay)
await Task.Delay(resetDelay);
await SendAsync(stream, 1, WriteOk);
await SendAsync(stream, 1, WriteOk, token);
break;

case CommandKind.Query:
var query = Encoding.UTF8.GetString(bytes, 1, bytes.Length - 1);
Match match;
if (query == "SET NAMES utf8mb4;")
{
await SendAsync(stream, 1, WriteOk);
await SendAsync(stream, 1, WriteOk, token);
}
else if ((match = Regex.Match(query, @"^SELECT ([0-9])(;|$)")).Success)
{
Expand All @@ -83,11 +83,11 @@ public async Task RunAsync(TcpClient client, CancellationToken token)
data[0] = (byte) number.Length;
Encoding.UTF8.GetBytes(number, 0, number.Length, data, 1);

await SendAsync(stream, 1, x => x.Write((byte) 1)); // one column
await SendAsync(stream, 2, x => x.Write(new byte[] { 3, 0x64, 0x65, 0x66, 0, 0, 0, 1, 0x5F, 0, 0x0c, 0x3f, 0, 1, 0, 0, 0, 3, 0x81, 0, 0, 0, 0 })); // column definition
await SendAsync(stream, 3, x => x.Write(new byte[] { 0xFE, 0, 0, 2, 0 })); // EOF
await SendAsync(stream, 4, x => x.Write(data));
await SendAsync(stream, 5, x => x.Write(new byte[] { 0xFE, 0, 0, 2, 0 })); // EOF
await SendAsync(stream, 1, x => x.Write((byte) 1), token); // one column
await SendAsync(stream, 2, x => x.Write(new byte[] { 3, 0x64, 0x65, 0x66, 0, 0, 0, 1, 0x5F, 0, 0x0c, 0x3f, 0, 1, 0, 0, 0, 3, 0x81, 0, 0, 0, 0 }), token); // column definition
await SendAsync(stream, 3, x => x.Write(new byte[] { 0xFE, 0, 0, 2, 0 }), token); // EOF
await SendAsync(stream, 4, x => x.Write(data), token);
await SendAsync(stream, 5, x => x.Write(new byte[] { 0xFE, 0, 0, 2, 0 }), token); // EOF
}
else if ((match = Regex.Match(query, @"^SELECT ([0-9]+), ([0-9]+), ([0-9-]+), ([0-9]+)(;|$)")).Success)
{
Expand Down Expand Up @@ -132,8 +132,8 @@ public async Task RunAsync(TcpClient client, CancellationToken token)
queryInterrupted = CancelQueryEvent.Wait(delay, token);

for (var step = 1; step < pauseStep; step++)
await SendAsync(stream, step, x => x.Write(packets[step]));
await SendAsync(stream, pauseStep, x => x.Write(packets[queryInterrupted ? 0 : pauseStep]));
await SendAsync(stream, step, x => x.Write(packets[step]), token);
await SendAsync(stream, pauseStep, x => x.Write(packets[queryInterrupted ? 0 : pauseStep]), token);
}
else
{
Expand All @@ -148,20 +148,20 @@ public async Task RunAsync(TcpClient client, CancellationToken token)
queryInterrupted = CancelQueryEvent.Wait(delay, token);
}

await SendAsync(stream, step, x => x.Write(packets[queryInterrupted ? 0 : step]));
await SendAsync(stream, step, x => x.Write(packets[queryInterrupted ? 0 : step]), token);
}
}
}
else if ((match = Regex.Match(query, @"^KILL QUERY ([0-9]+)(;|$)", RegexOptions.IgnoreCase)).Success)
{
var connectionId = int.Parse(match.Groups[1].Value);
m_server.CancelQuery(connectionId);
await SendAsync(stream, 1, WriteOk);
await SendAsync(stream, 1, WriteOk, token);
}
else if (query == "SELECT SLEEP(0) INTO @\uE001MySqlConnector\uE001Sleep;")
{
var wasSet = CancelQueryEvent.Wait(0, token);
await SendAsync(stream, 1, WriteOk);
await SendAsync(stream, 1, WriteOk, token);
}
else if (query == "select infinity")
{
Expand All @@ -177,17 +177,17 @@ public async Task RunAsync(TcpClient client, CancellationToken token)
new byte[] { 0xFE, 0, 0, 2, 0 }, // EOF
};
for (var packetIndex = 0; packetIndex < packets.Length; packetIndex++)
await SendAsync(stream, packetIndex + 1, x => x.Write(packets[packetIndex]));
await SendAsync(stream, packetIndex + 1, x => x.Write(packets[packetIndex]), token);
}
else
{
await SendAsync(stream, 1, x => WriteError(x, "Unhandled query: " + query));
await SendAsync(stream, 1, x => WriteError(x, "Unhandled query: " + query), token);
}
break;

default:
Console.WriteLine("** UNHANDLED ** {0}", (CommandKind) bytes[0]);
await SendAsync(stream, 1, x => WriteError(x));
await SendAsync(stream, 1, x => WriteError(x), token);
break;
}
}
Expand All @@ -199,10 +199,10 @@ public async Task RunAsync(TcpClient client, CancellationToken token)
}
}

private static async Task SendAsync(Stream stream, int sequenceNumber, Action<BinaryWriter> writePayload)
private static async Task SendAsync(Stream stream, int sequenceNumber, Action<BinaryWriter> writePayload, CancellationToken token)
{
var packet = MakePayload(sequenceNumber, writePayload);
await stream.WriteAsync(packet, 0, packet.Length);
await stream.WriteAsync(packet, 0, packet.Length, token);
}

private static byte[] MakePayload(int sequenceNumber, Action<BinaryWriter> writePayload)
Expand Down
1 change: 1 addition & 0 deletions tests/MySqlConnector.Tests/Metrics/MetricsTestsBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ protected virtual void Dispose(bool disposing)
if (disposing)
{
m_meterListener.Dispose();
Server.Stop();
}
}

Expand Down

0 comments on commit ea423f6

Please sign in to comment.