Skip to content

Commit

Permalink
Merge pull request #191 from memphisdev/master
Browse files Browse the repository at this point in the history
Release
  • Loading branch information
idanasulin2706 authored Jan 23, 2024
2 parents 7b9b7a3 + 373106f commit dcb13de
Show file tree
Hide file tree
Showing 10 changed files with 133 additions and 40 deletions.
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1123,6 +1123,23 @@ Acknowledging a message indicates to the Memphis server to not re-send the same
msg.Ack();
```

### Nacking a Message

Mark the message as not acknowledged - the broker will resend the message immediately to the same consumers group, instead of waiting to the max ack time configured.

```C#
msg.Nack();
```

### Sending a message to the dead-letter

Sending the message to the dead-letter station (DLS) - the broker won't resend the message again to the same consumers group and will place the message inside the dead-letter station (DLS) with the given reason.
The message will still be available to other consumer groups

```shell
msg.DeadLetter("reason");
```

### Delay the message after a given duration

Delay the message and tell Memphis server to re-send the same message again to the same consumer group.\
Expand Down
2 changes: 2 additions & 0 deletions src/Memphis.Client/Constants/MemphisConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ internal class MemphisSubjects
public const string SCHEMA_DESTRUCTION = "";

public const string FUNCTIONS_UPDATE = "$memphis_functions_updates_";

public const string NACKED_DLS = "$memphis_nacked_dls";
}

public static class MemphisSchemaTypes
Expand Down
26 changes: 6 additions & 20 deletions src/Memphis.Client/Consumer/IConsumerContextExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,13 @@ internal record FetchOptions
public string InternalStationName { get; set; }
public int BatchMaxTimeToWaitMs { get; set; }
public MemphisClient MemphisClient { get; set; }

public FetchOptions(
int batchSize,
int maxAckTimeMs,
string consumerGroup,
string internalStationName,
int batchMaxTimeToWaitMs,
MemphisClient memphisClient
)
{
BatchSize = batchSize;
MaxAckTimeMs = maxAckTimeMs;
ConsumerGroup = consumerGroup;
InternalStationName = internalStationName;
BatchMaxTimeToWaitMs = batchMaxTimeToWaitMs;
MemphisClient = memphisClient;
}
public int PartitionNumber { get; set; }

public FetchOptions(
MemphisClient memphisClient,
string internalStationName,
MemphisConsumerOptions consumerOptions
MemphisConsumerOptions consumerOptions,
int partitionNumber
)
{
BatchSize = consumerOptions.BatchSize;
Expand All @@ -38,7 +23,7 @@ MemphisConsumerOptions consumerOptions
InternalStationName = internalStationName;
BatchMaxTimeToWaitMs = consumerOptions.BatchMaxTimeToWaitMs;
MemphisClient = memphisClient;

PartitionNumber = partitionNumber;
}
}

Expand Down Expand Up @@ -66,7 +51,8 @@ FetchOptions fetchOptions
fetchOptions.MemphisClient,
fetchOptions.ConsumerGroup,
fetchOptions.MaxAckTimeMs,
fetchOptions.InternalStationName
fetchOptions.InternalStationName,
fetchOptions.PartitionNumber
));
receivedMessages += 1;
if (receivedMessages >= fetchOptions.BatchSize)
Expand Down
16 changes: 11 additions & 5 deletions src/Memphis.Client/Consumer/MemphisConsumer.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
using Memphis.Client.Station;

using Memphis.Client;

namespace Memphis.Client.Consumer;

public sealed class MemphisConsumer : IMemphisConsumer
Expand All @@ -21,6 +19,11 @@ public sealed class MemphisConsumer : IMemphisConsumer
private bool _subscriptionActive;
private readonly int _pingConsumerIntervalMs;

/// <summary>
/// Messages in DLS station will have a partition number of -1. This does not indicate the actual partition number of the message.
/// Instead, it indicates that the message is in the DLS station.
/// </summary>
private const int DlsMessagePartitionNumber = -1;

private int[] _partitions;
internal StationPartitionResolver PartitionResolver { get; set; }
Expand Down Expand Up @@ -344,7 +347,8 @@ private IEnumerable<MemphisMessage> FetchSubscriptionWithTimeOut(int batchSize,
return consumer.FetchMessages(new FetchOptions(
_memphisClient,
InternalStationName,
_consumerOptions
_consumerOptions,
consumerPartitionNumber
));
}

Expand Down Expand Up @@ -405,7 +409,8 @@ CancellationToken cancellationToken
var memphisMessages = consumerContext.FetchMessages(new FetchOptions(
_memphisClient,
InternalStationName,
_consumerOptions
_consumerOptions,
partitionNumber
));

MessageReceived?.Invoke(this, new MemphisMessageHandlerEventArgs(memphisMessages, consumerContext, null));
Expand Down Expand Up @@ -451,7 +456,8 @@ private async Task ConsumeFromDls(CancellationToken cancellationToken)
_memphisClient,
_consumerOptions.ConsumerGroup,
_consumerOptions.MaxAckTimeMs,
InternalStationName
InternalStationName,
DlsMessagePartitionNumber
);
if (DlsMessageReceived is null)
{
Expand Down
72 changes: 62 additions & 10 deletions src/Memphis.Client/Core/MemphisMessage.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
using Memphis.Client.Helper;
using Memphis.Client.Models.Request;

namespace Memphis.Client.Core;
namespace Memphis.Client.Core;

public sealed class MemphisMessage
{
Expand All @@ -10,19 +7,25 @@ public sealed class MemphisMessage
private readonly string _consumerGroup;
private readonly int _macAckTimeMs;
private readonly string _internalStationName;
private readonly int _partitionNumber;

private bool _isMessageInDls => _partitionNumber < -1;

public MemphisMessage(
Msg msgItem,
MemphisClient memphisClient,
string consumerGroup,
int macAckTimeMs,
string internalStationName)
string internalStationName,
int partitionNumber
)
{
this._msg = msgItem;
this._memphisClient = memphisClient;
this._consumerGroup = consumerGroup;
this._macAckTimeMs = macAckTimeMs;
this._internalStationName = internalStationName;
_msg = msgItem;
_memphisClient = memphisClient;
_consumerGroup = consumerGroup;
_macAckTimeMs = macAckTimeMs;
_internalStationName = internalStationName;
_partitionNumber = partitionNumber;
}

public void Ack()
Expand Down Expand Up @@ -51,6 +54,55 @@ public void Ack()
}
}

/// <summary>
/// Nack message - not ack for a message, meaning that the message will be redelivered again to the same consumers group without waiting to its ack wait time.
/// </summary>
/// <exception cref="MemphisException">Throws when unable to nack message</exception>
public void Nack()
{
try
{
_msg.Nak();
}
catch (System.Exception e)
{
throw MemphisExceptions.NackFailedException(e);
}
}

/// <summary>
/// Dead letter message - Sending the message to the dead-letter station (DLS). the broker won't resend the message again to the same consumers group and will place the message inside the dead-letter station (DLS) with the given reason.
/// The message will still be available to other consumer groups
/// </summary>
/// <param name="reason">Reason for dead lettering the message</param>
/// <exception cref="MemphisException">Throws when unable to send message dead letter</exception>
public void DeadLetter(string reason)
{
try
{
if (_isMessageInDls)
return;
_msg.Term();
var metadata = _msg.MetaData;
var nackDlsMessage = new NackDlsMessage
{
StationName = _internalStationName,
Error = reason,
Partition = _partitionNumber,
ConsumerGroupName = _consumerGroup,
StreamSequence = metadata.StreamSequence,
};
_memphisClient.BrokerConnection.Publish(
MemphisSubjects.NACKED_DLS,
Encoding.UTF8.GetBytes(JsonSerDes.PrepareJsonString<NackDlsMessage>(nackDlsMessage))
);
}
catch (System.Exception e)
{
throw MemphisExceptions.DeadLetterFailed(e);
}
}

public byte[] GetData()
{
return _msg.Data;
Expand Down
11 changes: 10 additions & 1 deletion src/Memphis.Client/Exception/MemphisExceptions.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
namespace Memphis.Client;

public static class MemphisExceptions{
internal static class MemphisExceptions{

public static MemphisConnectionException DeadConnectionException => new("Connection to the broker is dead");

Expand Down Expand Up @@ -69,4 +69,13 @@ public static MemphisConnectionException AckFailedException(System.Exception e)
return new MemphisConnectionException("Unable to ack message", e);
}

public static MemphisException NackFailedException(System.Exception e)
{
return new MemphisException("Unable to nack message", e);
}

public static MemphisException DeadLetterFailed(System.Exception e)
{
return new MemphisException("Unable to dead letter message", e);
}
}
4 changes: 2 additions & 2 deletions src/Memphis.Client/Memphis.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<PackageId>Memphis.Client</PackageId>
<Version>0.7.1</Version>
<Version>0.7.2</Version>
<Authors>Memphis.dev team</Authors>
<Company>Memphis.dev</Company>
<PackageTags>Memphis, client</PackageTags>
Expand All @@ -14,7 +14,7 @@
<PackageIcon>Icon.png</PackageIcon>
<Description>Memphis.Client SDK intended to make easy integration of Memphis into .NET projects</Description>
<EnablePackageValidation>true</EnablePackageValidation>
<PackageValidationBaselineVersion>0.7.1</PackageValidationBaselineVersion>
<PackageValidationBaselineVersion>0.7.2</PackageValidationBaselineVersion>
</PropertyGroup>

<ItemGroup>
Expand Down
21 changes: 21 additions & 0 deletions src/Memphis.Client/Models/Request/NackDlsMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
namespace Memphis.Client;

[DataContract]
internal class NackDlsMessage
{
[DataMember(Name = "station_name")]
public string StationName { get; set; } = null!;

[DataMember(Name = "error")]
public string Error { get; set; } = null!;

[DataMember(Name = "partition")]
public int Partition { get; set; }

[DataMember(Name = "cg_name")]
public string ConsumerGroupName { get; set; } = null!;

[DataMember(Name = "seq")]
public ulong StreamSequence { get; set; }

}
2 changes: 1 addition & 1 deletion version-beta.conf
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.7.4-beta.1
0.7.5-beta.1
2 changes: 1 addition & 1 deletion version.conf
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.7.2
0.7.3

0 comments on commit dcb13de

Please sign in to comment.