Skip to content

Commit

Permalink
AMQNET-625: Content Property of IBytesMessage should be idempotent
Browse files Browse the repository at this point in the history
Content Property should not generate side effects
  • Loading branch information
Havret committed Nov 17, 2019
1 parent 6169372 commit c988ccc
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 46 deletions.
1 change: 1 addition & 0 deletions src/NMS.AMQP/Message/Facade/INmsBytesMessageFacade.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@ public interface INmsBytesMessageFacade : INmsMessageFacade
BinaryWriter GetDataWriter();
void Reset();
long BodyLength { get; }
byte[] Content { get; set; }
}
}
17 changes: 12 additions & 5 deletions src/NMS.AMQP/Message/NmsBytesMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,14 @@ public byte[] Content
{
get
{
byte[] buffer = new byte [BodyLength];
ReadBytes(buffer);
return buffer;
CheckWriteOnlyBody();
return this.facade.Content;
}
set
{
CheckReadOnlyBody();
this.facade.Content = value;
}
set => WriteBytes(value);
}

public byte ReadByte()
Expand Down Expand Up @@ -292,15 +295,19 @@ public int ReadBytes(byte[] value)
public int ReadBytes(byte[] value, int length)
{
InitializeReading();
return ReadBytes(dataIn, value, length);
}

private int ReadBytes(BinaryReader binaryReader, byte[] value, int length)
{
if (length < 0 || value.Length < length)
{
throw new IndexOutOfRangeException("length must not be negative or larger than the size of the provided array");
}

try
{
return dataIn.Read(value, 0, length);
return binaryReader.Read(value, 0, length);
}
catch (EndOfStreamException e)
{
Expand Down
71 changes: 37 additions & 34 deletions src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsBytesMessageFacade.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ public class AmqpNmsBytesMessageFacade : AmqpNmsMessageFacade, INmsBytesMessageF
private EndianBinaryReader byteIn = null;
private EndianBinaryWriter byteOut = null;

private static readonly Data EMPTY_DATA = new Data { Binary = new byte[0] };
private static readonly byte[] EMPTY_BINARY = new byte[0];
private static readonly Data EMPTY_DATA = new Data { Binary = EMPTY_BINARY };

public override sbyte? JmsMsgType => MessageSupport.JMS_TYPE_BYTE;
public long BodyLength => GetBinaryFromBody().Binary.LongLength;
public long BodyLength => Content.Length;

public BinaryReader GetDataReader()
{
Expand All @@ -45,58 +46,60 @@ public BinaryReader GetDataReader()

if (byteIn == null)
{
Data body = GetBinaryFromBody();
Stream dataStream = new MemoryStream(body.Binary, false);
byte[] body = Content;
Stream dataStream = new MemoryStream(body, false);
byteIn = new EndianBinaryReader(dataStream);
}

return byteIn;
}

private Data GetBinaryFromBody()
public byte[] Content
{
RestrictedDescribed body = Message.BodySection;
Data result = EMPTY_DATA;
if (body == null)
get
{
return result;
}
else if (body is Data)
{
byte[] binary = (body as Data).Binary;
if (binary != null && binary.Length != 0)
RestrictedDescribed body = Message.BodySection;
byte[] result = EMPTY_BINARY;
if (body == null)
{
return body as Data;
return result;
}
}
else if (body is AmqpValue)
{
object value = (body as AmqpValue).Value;
if (value == null)
else if (body is Data)
{
return result;
byte[] binary = (body as Data).Binary;
if (binary != null && binary.Length != 0)
{
return binary;
}
}

if (value is byte[])
else if (body is AmqpValue)
{
byte[] dataValue = value as byte[];
if (dataValue.Length > 0)
object value = (body as AmqpValue).Value;
if (value == null)
{
result = new Data();
result.Binary = dataValue;
return result;
}
if (value is byte[])
{
byte[] dataValue = value as byte[];
if (dataValue.Length > 0)
{
return dataValue;
}
}
else
{
throw new IllegalStateException("Unexpected Amqp value content-type: " + value.GetType().FullName);
}
}
else
{
throw new IllegalStateException("Unexpected Amqp value content-type: " + value.GetType().FullName);
throw new IllegalStateException("Unexpected body content-type: " + body.GetType().FullName);
}
}
else
{
throw new IllegalStateException("Unexpected body content-type: " + body.GetType().FullName);
}

return result;
return result;
}
set => Message.BodySection = new Data { Binary = value };
}

public BinaryWriter GetDataWriter()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,15 @@ public class NmsTestBytesMessageFacade : NmsTestMessageFacade, INmsBytesMessageF
{
private BinaryWriter bytesOut = null;
private BinaryReader bytesIn = null;
private byte[] content = null;

public NmsTestBytesMessageFacade()
{
content = new byte[0];
Content = new byte[0];
}

public NmsTestBytesMessageFacade(byte[] content)
{
this.content = content;
this.Content = content;
}

public BinaryReader GetDataReader()
Expand All @@ -45,7 +44,7 @@ public BinaryReader GetDataReader()
throw new IllegalStateException("Body is being written to, cannot perform a read.");
}

return bytesIn ?? (bytesIn = new BinaryReader(new MemoryStream(content)));
return bytesIn ?? (bytesIn = new BinaryReader(new MemoryStream(Content)));
}

public BinaryWriter GetDataWriter()
Expand All @@ -65,7 +64,7 @@ public void Reset()
bytesOut.BaseStream.Position = 0;
bytesOut.BaseStream.CopyTo(byteStream);

content = byteStream.ToArray();
Content = byteStream.ToArray();

byteStream.Close();
bytesOut.Close();
Expand All @@ -81,9 +80,10 @@ public void Reset()
public override void ClearBody()
{
this.Reset();
content = new byte[0];
Content = new byte[0];
}

public long BodyLength => content?.LongLength ?? 0;
public long BodyLength => Content?.LongLength ?? 0;
public byte[] Content { get; set; }
}
}
38 changes: 38 additions & 0 deletions test/Apache-NMS-AMQP-Test/Message/NmsBytesMessageTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -534,5 +534,43 @@ public void TestMessageCopy()
NmsBytesMessage copy = message.Copy() as NmsBytesMessage;
Assert.IsNotNull(copy);
}

[Test]
public void TestMessageContentCanBeObtainedMultipleTimesWithoutReset()
{
byte[] bytes = Encoding.UTF8.GetBytes("myBytes");
NmsBytesMessage message = factory.CreateBytesMessage();
message.Content = bytes;
message.Reset();

CollectionAssert.AreEqual(bytes, message.Content);
CollectionAssert.AreEqual(bytes, message.Content);
}

[Test]
public void TestConsecutiveReadBytes()
{
byte[] bytes = CreateBytesArrayOfSize(24);
NmsBytesMessage message = factory.CreateBytesMessage(bytes);
message.Reset();

CollectionAssert.AreEqual(bytes.Take(8), ReadBytes(message, 8));
CollectionAssert.AreEqual(bytes.Skip(8).Take(8), ReadBytes(message, 8));
CollectionAssert.AreEqual(bytes.Skip(16).Take(8), ReadBytes(message, 8));
CollectionAssert.AreEqual(new byte[8], ReadBytes(message, 8));
}

private static byte[] CreateBytesArrayOfSize(int size)
{
var random = new Random();
return Enumerable.Range(0, size).Select(x => (byte) random.Next(byte.MaxValue)).ToArray();
}

private static byte[] ReadBytes(IBytesMessage message, int count)
{
byte[] bytes = new byte[count];
message.ReadBytes(bytes);
return bytes;
}
}
}

0 comments on commit c988ccc

Please sign in to comment.