Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not create string objects from consumerTag, exchange and routingKey, or get them from a string cache. #1232

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public AsyncBasicConsumerFake(ManualResetEventSlim autoResetEvent)
_autoResetEvent = autoResetEvent;
}

public Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
public Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, ReadOnlyMemory<byte> exchange, ReadOnlyMemory<byte> routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
{
if (Interlocked.Increment(ref _current) == Count)
{
Expand All @@ -28,7 +28,7 @@ public Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redel
return Task.CompletedTask;
}

void IBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey,
void IBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, ReadOnlyMemory<byte> exchange, ReadOnlyMemory<byte> routingKey,
in ReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
{
if (Interlocked.Increment(ref _current) == Count)
Expand Down
12 changes: 7 additions & 5 deletions projects/Benchmarks/ConsumerDispatching/ConsumerDispatcher.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Threading;
using System.Text;
using System.Threading;
using BenchmarkDotNet.Attributes;
using RabbitMQ.Client;
using RabbitMQ.Client.ConsumerDispatching;
Expand All @@ -14,9 +15,10 @@ public class ConsumerDispatcherBase
private protected IConsumerDispatcher _dispatcher;
private protected readonly AsyncBasicConsumerFake _consumer = new AsyncBasicConsumerFake(_autoResetEvent);
protected readonly string _consumerTag = "ConsumerTag";
protected static readonly byte[] _consumerTagBytes = Encoding.UTF8.GetBytes("ConsumerTag");
protected readonly ulong _deliveryTag = 500UL;
protected readonly string _exchange = "Exchange";
protected readonly string _routingKey = "RoutingKey";
protected static readonly byte[] _exchange = Encoding.UTF8.GetBytes("Exchange");
protected static readonly byte[] _routingKey = Encoding.UTF8.GetBytes("RoutingKey");
protected readonly ReadOnlyBasicProperties _properties = new ReadOnlyBasicProperties();
protected readonly byte[] _body = new byte[512];
}
Expand All @@ -41,7 +43,7 @@ public void AsyncConsumerDispatcher()
{
for (int i = 0; i < Count; i++)
{
_dispatcher.HandleBasicDeliver(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, _body, _body);
_dispatcher.HandleBasicDeliver(_consumerTagBytes, _deliveryTag, false, _exchange, _routingKey, _properties, _body, _body);
}
_autoResetEvent.Wait();
_autoResetEvent.Reset();
Expand All @@ -59,7 +61,7 @@ public void ConsumerDispatcher()
{
for (int i = 0; i < Count; i++)
{
_dispatcher.HandleBasicDeliver(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, _body, _body);
_dispatcher.HandleBasicDeliver(_consumerTagBytes, _deliveryTag, false, _exchange, _routingKey, _properties, _body, _body);
}
_autoResetEvent.Wait();
_autoResetEvent.Reset();
Expand Down
2 changes: 1 addition & 1 deletion projects/Benchmarks/WireFormatting/MethodSerialization.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public override void SetUp()
}

[Benchmark]
public object BasicDeliverRead() => new BasicDeliver(_buffer.Span)._consumerTag; // return one property to not box when returning an object instead
public object BasicDeliverRead() => new BasicDeliver(_buffer)._consumerTag; // return one property to not box when returning an object instead

[Benchmark]
public int BasicPublishWrite() => _basicPublish.WriteTo(_buffer.Span);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ public virtual Task HandleBasicConsumeOk(string consumerTag)
public virtual Task HandleBasicDeliver(string consumerTag,
ulong deliveryTag,
bool redelivered,
string exchange,
string routingKey,
ReadOnlyMemory<byte> exchange,
ReadOnlyMemory<byte> routingKey,
in ReadOnlyBasicProperties properties,
ReadOnlyMemory<byte> body)
{
Expand Down Expand Up @@ -165,7 +165,7 @@ void IBasicConsumer.HandleBasicConsumeOk(string consumerTag)
throw new InvalidOperationException("Should never be called. Enable 'DispatchConsumersAsync'.");
}

void IBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
void IBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, ReadOnlyMemory<byte> exchange, ReadOnlyMemory<byte> routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
{
throw new InvalidOperationException("Should never be called. Enable 'DispatchConsumersAsync'.");
}
Expand Down
4 changes: 2 additions & 2 deletions projects/RabbitMQ.Client/client/api/DefaultBasicConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,8 @@ public virtual void HandleBasicConsumeOk(string consumerTag)
public virtual void HandleBasicDeliver(string consumerTag,
ulong deliveryTag,
bool redelivered,
string exchange,
string routingKey,
ReadOnlyMemory<byte> exchange,
ReadOnlyMemory<byte> routingKey,
in ReadOnlyBasicProperties properties,
ReadOnlyMemory<byte> body)
{
Expand Down
4 changes: 2 additions & 2 deletions projects/RabbitMQ.Client/client/api/IAsyncBasicConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ public interface IAsyncBasicConsumer
Task HandleBasicDeliver(string consumerTag,
ulong deliveryTag,
bool redelivered,
string exchange,
string routingKey,
ReadOnlyMemory<byte> exchange,
ReadOnlyMemory<byte> routingKey,
in ReadOnlyBasicProperties properties,
ReadOnlyMemory<byte> body);

Expand Down
4 changes: 2 additions & 2 deletions projects/RabbitMQ.Client/client/api/IBasicConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ public interface IBasicConsumer
void HandleBasicDeliver(string consumerTag,
ulong deliveryTag,
bool redelivered,
string exchange,
string routingKey,
ReadOnlyMemory<byte> exchange,
ReadOnlyMemory<byte> routingKey,
in ReadOnlyBasicProperties properties,
ReadOnlyMemory<byte> body);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public override async Task HandleBasicConsumeOk(string consumerTag)
}

///<summary>Fires the Received event.</summary>
public override Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
public override Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, ReadOnlyMemory<byte> exchange, ReadOnlyMemory<byte> routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
{
// No need to call base, it's empty.
return _receivedWrapper.InvokeAsync(this, new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ public BasicDeliverEventArgs()
public BasicDeliverEventArgs(string consumerTag,
ulong deliveryTag,
bool redelivered,
string exchange,
string routingKey,
ReadOnlyMemory<byte> exchange,
ReadOnlyMemory<byte> routingKey,
in ReadOnlyBasicProperties properties,
ReadOnlyMemory<byte> body)
{
Expand Down Expand Up @@ -77,13 +77,13 @@ public BasicDeliverEventArgs(string consumerTag,

///<summary>The exchange the message was originally published
///to.</summary>
public string Exchange { get; set; }
public ReadOnlyMemory<byte> Exchange { get; set; }

///<summary>The AMQP "redelivered" flag.</summary>
public bool Redelivered { get; set; }

///<summary>The routing key used when the message was
///originally published.</summary>
public string RoutingKey { get; set; }
public ReadOnlyMemory<byte> RoutingKey { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public override void HandleBasicConsumeOk(string consumerTag)
/// Accessing the body at a later point is unsafe as its memory can
/// be already released.
/// </remarks>
public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, ReadOnlyMemory<byte> exchange, ReadOnlyMemory<byte> routingKey, in ReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
{
base.HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
Received?.Invoke(
Expand Down
19 changes: 9 additions & 10 deletions projects/RabbitMQ.Client/client/framing/BasicDeliver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,27 +30,26 @@
//---------------------------------------------------------------------------

using System;

using RabbitMQ.Client.client.framing;
using RabbitMQ.Client.Impl;

namespace RabbitMQ.Client.Framing.Impl
{
internal readonly struct BasicDeliver : IAmqpMethod
{
public readonly string _consumerTag;
public readonly ReadOnlyMemory<byte> _consumerTag;
public readonly ulong _deliveryTag;
public readonly bool _redelivered;
public readonly string _exchange;
public readonly string _routingKey;
public readonly ReadOnlyMemory<byte> _exchange;
public readonly ReadOnlyMemory<byte> _routingKey;

public BasicDeliver(ReadOnlySpan<byte> span)
public BasicDeliver(ReadOnlyMemory<byte> data)
{
int offset = WireFormatting.ReadShortstr(span, out _consumerTag);
offset += WireFormatting.ReadLonglong(span.Slice(offset), out _deliveryTag);
offset += WireFormatting.ReadBits(span.Slice(offset), out _redelivered);
offset += WireFormatting.ReadShortstr(span.Slice(offset), out _exchange);
WireFormatting.ReadShortstr(span.Slice(offset), out _routingKey);
int offset = WireFormatting.ReadShortMemory(data, out _consumerTag);
offset += WireFormatting.ReadLonglong(data.Span.Slice(offset), out _deliveryTag);
offset += WireFormatting.ReadBits(data.Span.Slice(offset), out _redelivered);
offset += WireFormatting.ReadShortMemory(data.Slice(offset), out _exchange);
WireFormatting.ReadShortMemory(data.Slice(offset), out _routingKey);
}

public ProtocolCommandId ProtocolCommandId => ProtocolCommandId.BasicDeliver;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,53 +1,88 @@
using System.Collections.Generic;
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Util;

namespace RabbitMQ.Client.ConsumerDispatching
{
#nullable enable
internal abstract class ConsumerDispatcherBase
{
private static readonly FallbackConsumer fallbackConsumer = new FallbackConsumer();
private readonly Dictionary<string, IBasicConsumer> _consumers;
private readonly Dictionary<ReadOnlyMemory<byte>, (IBasicConsumer consumer, string consumerTag)> _consumers;

public IBasicConsumer? DefaultConsumer { get; set; }

protected ConsumerDispatcherBase()
{
_consumers = new Dictionary<string, IBasicConsumer>();
_consumers = new Dictionary<ReadOnlyMemory<byte>, (IBasicConsumer, string)>(MemoryOfByteEqualityComparer.Instance);
}

protected void AddConsumer(IBasicConsumer consumer, string tag)
{
lock (_consumers)
{
_consumers[tag] = consumer;
var tagBytes = Encoding.UTF8.GetBytes(tag);
_consumers[tagBytes] = (consumer, tag);
}
}

protected IBasicConsumer GetConsumerOrDefault(string tag)
protected (IBasicConsumer consumer, string consumerTag) GetConsumerOrDefault(ReadOnlyMemory<byte> tag)
{
lock (_consumers)
{
return _consumers.TryGetValue(tag, out var consumer) ? consumer : GetDefaultOrFallbackConsumer();
if (_consumers.TryGetValue(tag, out var consumerPair))
{
return consumerPair;
}

#if NETCOREAPP
var consumerTag = Encoding.UTF8.GetString(tag.Span);
#else
string consumerTag;
unsafe
{
fixed (byte* bytes = tag.Span)
{
consumerTag = Encoding.UTF8.GetString(bytes, tag.Length);
}
}
#endif

return (GetDefaultOrFallbackConsumer(), consumerTag);
}
}

public IBasicConsumer GetAndRemoveConsumer(string tag)
{
lock (_consumers)
{
return _consumers.Remove(tag, out var consumer) ? consumer : GetDefaultOrFallbackConsumer();
var utf8 = Encoding.UTF8;
#if NETCOREAPP
var pool = ArrayPool<byte>.Shared;
var buf = pool.Rent(utf8.GetMaxByteCount(tag.Length));
int count = utf8.GetBytes(tag, buf);
var memory = buf.AsMemory(0, count);
#else
var memory = utf8.GetBytes(tag).AsMemory();
#endif
var result = _consumers.Remove(memory, out var consumerPair) ? consumerPair.consumer : GetDefaultOrFallbackConsumer();
#if NETCOREAPP
pool.Return(buf);
#endif
return result;
}
}

public Task ShutdownAsync(ShutdownEventArgs reason)
{
lock (_consumers)
{
foreach (KeyValuePair<string, IBasicConsumer> pair in _consumers)
foreach (KeyValuePair<ReadOnlyMemory<byte>, (IBasicConsumer consumer, string consumerTag)> pair in _consumers)
{
ShutdownConsumer(pair.Value, reason);
ShutdownConsumer(pair.Value.consumer, reason);
}
_consumers.Clear();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,13 @@ public void HandleBasicConsumeOk(IBasicConsumer consumer, string consumerTag)
}
}

public void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered,
string exchange, string routingKey, in ReadOnlyBasicProperties basicProperties, ReadOnlyMemory<byte> body, byte[] rentedArray)
public void HandleBasicDeliver(ReadOnlyMemory<byte> consumerTag, ulong deliveryTag, bool redelivered,
ReadOnlyMemory<byte> exchange, ReadOnlyMemory<byte> routingKey, in ReadOnlyBasicProperties basicProperties, ReadOnlyMemory<byte> body, byte[] rentedArray)
{
if (!IsShutdown)
{
_writer.TryWrite(new WorkStruct(GetConsumerOrDefault(consumerTag), consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body, rentedArray));
var consumerPair = GetConsumerOrDefault(consumerTag);
_writer.TryWrite(new WorkStruct(consumerPair.consumer, consumerPair.consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body, rentedArray));
}
}

Expand Down Expand Up @@ -108,8 +109,8 @@ protected readonly struct WorkStruct
public readonly string? ConsumerTag;
public readonly ulong DeliveryTag;
public readonly bool Redelivered;
public readonly string? Exchange;
public readonly string? RoutingKey;
public readonly ReadOnlyMemory<byte> Exchange;
public readonly ReadOnlyMemory<byte> RoutingKey;
public readonly ReadOnlyBasicProperties BasicProperties;
public readonly ReadOnlyMemory<byte> Body;
public readonly byte[]? RentedArray;
Expand All @@ -133,7 +134,7 @@ public WorkStruct(IBasicConsumer consumer, ShutdownEventArgs reason)
}

public WorkStruct(IBasicConsumer consumer, string consumerTag, ulong deliveryTag, bool redelivered,
string exchange, string routingKey, in ReadOnlyBasicProperties basicProperties, ReadOnlyMemory<byte> body, byte[] rentedArray)
ReadOnlyMemory<byte> exchange, ReadOnlyMemory<byte> routingKey, in ReadOnlyBasicProperties basicProperties, ReadOnlyMemory<byte> body, byte[] rentedArray)
{
WorkType = WorkType.Deliver;
Consumer = consumer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ void IBasicConsumer.HandleBasicConsumeOk(string consumerTag)
ESLog.Info($"Unhandled {nameof(IBasicConsumer.HandleBasicConsumeOk)} for tag {consumerTag}");
}

void IBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, in ReadOnlyBasicProperties properties,
void IBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, ReadOnlyMemory<byte> exchange, ReadOnlyMemory<byte> routingKey, in ReadOnlyBasicProperties properties,
ReadOnlyMemory<byte> body)
{
ESLog.Info($"Unhandled {nameof(IBasicConsumer.HandleBasicDeliver)} for tag {consumerTag}");
Expand Down Expand Up @@ -66,7 +66,7 @@ Task IAsyncBasicConsumer.HandleBasicConsumeOk(string consumerTag)
return Task.CompletedTask;
}

Task IAsyncBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, in ReadOnlyBasicProperties properties,
Task IAsyncBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, ReadOnlyMemory<byte> exchange, ReadOnlyMemory<byte> routingKey, in ReadOnlyBasicProperties properties,
ReadOnlyMemory<byte> body)
{
((IBasicConsumer)this).HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ internal interface IConsumerDispatcher

void HandleBasicConsumeOk(IBasicConsumer consumer, string consumerTag);

void HandleBasicDeliver(string consumerTag,
void HandleBasicDeliver(ReadOnlyMemory<byte> consumerTag,
ulong deliveryTag,
bool redelivered,
string exchange,
string routingKey,
ReadOnlyMemory<byte> exchange,
ReadOnlyMemory<byte> routingKey,
in ReadOnlyBasicProperties basicProperties,
ReadOnlyMemory<byte> body,
byte[] rentedArray);
Expand Down
5 changes: 3 additions & 2 deletions projects/RabbitMQ.Client/client/impl/ModelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -573,8 +573,7 @@ protected void HandleBasicConsumeOk(in IncomingCommand cmd)

protected void HandleBasicDeliver(in IncomingCommand cmd)
{
var method = new Client.Framing.Impl.BasicDeliver(cmd.MethodBytes.Span);
cmd.ReturnMethodBuffer();
var method = new Client.Framing.Impl.BasicDeliver(cmd.MethodBytes);
var header = new ReadOnlyBasicProperties(cmd.HeaderBytes.Span);
cmd.ReturnHeaderBuffer();

Expand All @@ -587,6 +586,8 @@ protected void HandleBasicDeliver(in IncomingCommand cmd)
header,
cmd.Body,
cmd.TakeoverBody());

cmd.ReturnMethodBuffer();
}

protected void HandleBasicGetOk(in IncomingCommand cmd)
Expand Down
Loading