Skip to content

Commit

Permalink
Merge pull request #262 from Avanade/feat/newcartridges
Browse files Browse the repository at this point in the history
fix(KafkaConsumer): includes messagehandler loop.
  • Loading branch information
lucianareginalino authored Jul 29, 2024
2 parents 9f14f84 + 459de67 commit 42ebdd0
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 31 deletions.
2 changes: 1 addition & 1 deletion src/Liquid.Core/Interfaces/ILiquidConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public interface ILiquidConsumer<TEntity>
/// <summary>
/// Initialize handler for consume <typeparamref name="TEntity"/> messages from topic or queue.
/// </summary>
void RegisterMessageHandler(CancellationToken cancellationToken = default);
Task RegisterMessageHandler(CancellationToken cancellationToken = default);

/// <summary>
/// Defining the message processing function.
Expand Down
2 changes: 1 addition & 1 deletion src/Liquid.Core/Liquid.Core.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<Copyright>Avanade 2019</Copyright>
<PackageProjectUrl>https://github.com/Avanade/Liquid-Application-Framework</PackageProjectUrl>
<PackageIcon>logo.png</PackageIcon>
<Version>8.0.0-beta-08</Version>
<Version>8.0.0-beta-09</Version>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<ProjectGuid>{C33A89FC-4F4D-4274-8D0F-29456BA8F76B}</ProjectGuid>
<IsPackable>true</IsPackable>
Expand Down
68 changes: 49 additions & 19 deletions src/Liquid.Messaging.Kafka/KafkaConsumer.cs
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
using Confluent.Kafka;
using Liquid.Core.Extensions;
using Liquid.Core.Utils;
using Liquid.Core.Entities;
using Liquid.Core.Exceptions;
using Liquid.Core.Interfaces;
using Liquid.Messaging.Kafka.Extensions;
using Liquid.Messaging.Kafka.Settings;
using Microsoft.Extensions.Options;
using System;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Liquid.Core.Entities;
using Microsoft.Extensions.Options;

namespace Liquid.Messaging.Kafka
{
Expand Down Expand Up @@ -41,35 +39,64 @@ public KafkaConsumer(IKafkaFactory kafkaFactory, IOptions<KafkaSettings> kafkaSe
}

///<inheritdoc/>
public void RegisterMessageHandler()
public Task RegisterMessageHandler(CancellationToken cancellationToken = default)
{
if (ConsumeMessageAsync is null)
{
throw new NotImplementedException($"The {nameof(ProcessErrorAsync)} action must be added to class.");
}

_client = _factory.GetConsumer(_settings);
_client.Subscribe(_settings.Topic);

var result = _client.Consume();

MessageHandler(result, new CancellationToken()).GetAwaiter();
var task = Task.Run( async () =>
{
using (_client)
{
_client.Subscribe(_settings.Topic);

await MessageHandler(cancellationToken);
}
});

return task;
}

/// <summary>
/// Process incoming messages.
/// </summary>
/// <param name="deliverEvent">Message to be processed.</param>
/// <param name="cancellationToken"> Propagates notification that operations should be canceled.</param>
protected async Task MessageHandler(ConsumeResult<Ignore, string> deliverEvent, CancellationToken cancellationToken)
protected async Task MessageHandler(CancellationToken cancellationToken)
{
try
{
await ConsumeMessageAsync(GetEventArgs(deliverEvent), cancellationToken);

if (!_settings.EnableAutoCommit)
while (!cancellationToken.IsCancellationRequested)
{
_client.Commit(deliverEvent);
var deliverEvent = _client.Consume();

_ = Task.Run(async () =>
{
try
{
await ConsumeMessageAsync(GetEventArgs(deliverEvent), cancellationToken);

if (!_settings.EnableAutoCommit)
{
_client.Commit(deliverEvent);
}
}
catch (Exception ex)
{
_client.Close();

var errorArgs = new ConsumerErrorEventArgs
{
Exception = ex,
};

await ProcessErrorAsync(errorArgs);
}
});
}
}
catch (Exception ex)
Expand All @@ -82,11 +109,14 @@ private ConsumerMessageEventArgs<TEntity> GetEventArgs(ConsumeResult<Ignore, str
{
var headers = deliverEvent.Message.Headers.GetCustomHeaders();

var data = headers.Count > 0 && headers["ContentType"]?.ToString() == CommonExtensions.GZipContentType
? Encoding.UTF8.GetBytes(deliverEvent.Message.Value).GzipDecompress().ParseJson<TEntity>()
: deliverEvent.Message.Value.ParseJson<TEntity>();
var options = new JsonSerializerOptions
{
PropertyNameCaseInsensitive = true
};

var data = JsonSerializer.Deserialize<TEntity>(deliverEvent.Message.Value, options);

return new ConsumerMessageEventArgs<TEntity> { Data = data, Headers = headers };
}
}
}
}
7 changes: 5 additions & 2 deletions src/Liquid.Messaging.Kafka/Liquid.Messaging.Kafka.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<Copyright>Avanade 2019</Copyright>
<PackageProjectUrl>https://github.com/Avanade/Liquid-Application-Framework</PackageProjectUrl>
<PackageIcon>logo.png</PackageIcon>
<Version>8.0.0-beta-03</Version>
<Version>8.0.0-beta-04</Version>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<Description>
The Liquid.Messaging.Kafka provides producer and consumer patterns to allow the send and consumption of Messaging inside your microservice.
Expand All @@ -28,7 +28,10 @@

<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="2.4.0" />
<PackageReference Include="Liquid.Core" Version="8.0.0-beta-06" />
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="8.0.1" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Liquid.Core\Liquid.Core.csproj" />
</ItemGroup>
</Project>
27 changes: 19 additions & 8 deletions test/Liquid.Messaging.Kafka.Tests/kafkaConsumerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@ public void RegisterMessageHandler_WhenRegisteredSucessfully_BasicConsumeReceive
}

[Fact]
public void RegisterMessageHandler_WhenRegistereFail_ThrowException()
public async Task RegisterMessageHandler_WhenRegistereFail_ThrowException()
{
var messageReceiver = Substitute.For<IConsumer<Ignore, string>>();
_factory.GetConsumer(Arg.Any<KafkaSettings>()).Returns(messageReceiver);

Assert.Throws<NotImplementedException>(() => RegisterMessageHandler());
await Assert.ThrowsAsync<NotImplementedException>(() => RegisterMessageHandler(new CancellationToken()));
}


Expand All @@ -68,15 +68,17 @@ public async Task MessageHandler_WhenProcessExecutedSucessfully()
message.Message.Headers = new Headers();

var messageReceiver = Substitute.For<IConsumer<Ignore, string>>();

messageReceiver.Consume(Arg.Any<CancellationToken>()).Returns(message);

_factory.GetConsumer(Arg.Any<KafkaSettings>()).Returns(messageReceiver);

ConsumeMessageAsync += ProcessMessageAsyncMock;


var sut = MessageHandler(message, new CancellationToken());
var sut = RegisterMessageHandler(new CancellationToken());

await sut;

Assert.True(sut.IsCompletedSuccessfully);
Assert.NotNull(sut);
}

[Fact]
Expand All @@ -93,13 +95,16 @@ public async Task MessageHandler_WhenProcessExecutionFail_ThrowException()
message.Message.Headers = new Headers();

var messageReceiver = Substitute.For<IConsumer<Ignore, string>>();
messageReceiver.Consume(Arg.Any<CancellationToken>()).Returns(message);
_factory.GetConsumer(Arg.Any<KafkaSettings>()).Returns(messageReceiver);

ConsumeMessageAsync += ProcessMessageAsyncMock;

var task = MessageHandler(message, new CancellationToken());
ProcessErrorAsync += ProcessErrorAsyncMock;

var sut = MessageHandler(new CancellationToken());

await Assert.ThrowsAsync<MessagingConsumerException>(() => task);
await Assert.ThrowsAsync<MessagingConsumerException>(() => sut);

}

Expand All @@ -111,6 +116,12 @@ private async Task ProcessMessageAsyncMock(ConsumerMessageEventArgs<MessageMock>
throw new Exception();
}

private async Task ProcessErrorAsyncMock(ConsumerErrorEventArgs args)
{
throw args.Exception;
}


private bool RegisterHandleMock()
{
try
Expand Down

0 comments on commit 42ebdd0

Please sign in to comment.