Skip to content

Commit

Permalink
set consumer timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
stormaref committed Apr 17, 2024
1 parent 8bc9c64 commit 61e9c93
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 2 deletions.
2 changes: 2 additions & 0 deletions KafkaStorm.Test/TestBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ private IServiceCollection ConfigureServices()

factory.AddConsumers(crf =>
{
crf.SetConsumingPeriod(5);

crf.AddConsumer<HelloConsumer, HelloEvent>(
new ConsumerConfig {BootstrapServers = "localhost:29092", GroupId = "TestGroup"}, "my-topic");
});
Expand Down
2 changes: 1 addition & 1 deletion KafkaStorm/KafkaStorm.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<Nullable>enable</Nullable>
<LangVersion>latest</LangVersion>
<Authors>ArefAzizian</Authors>
<Version>8.0.4</Version>
<Version>8.0.5</Version>
<PackageId>KafKaStorm</PackageId>
<LangVersion>latest</LangVersion>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
Expand Down
10 changes: 10 additions & 0 deletions KafkaStorm/Registration/ConsumerRegistrationFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ public class ConsumerRegistrationFactory
{
internal static Dictionary<string, ConsumerConfig> ConsumerConfigs = null!;
internal static Dictionary<string, string> ConsumerTopics = null!;
internal static int ConsumingPeriod = 10;
private readonly IServiceCollection _serviceCollection;

public ConsumerRegistrationFactory(IServiceCollection serviceCollection)
Expand Down Expand Up @@ -42,4 +43,13 @@ public void AddConsumer<TConsumer, TMessage>(ConsumerConfig config, string? topi
_serviceCollection.AddTransient<IConsumer<TMessage>, TConsumer>();
_serviceCollection.AddHostedService<ConsumerHostedService<TMessage>>();
}

/// <summary>
/// Set consuming timeout (period for checking for new messages)
/// </summary>
/// <param name="period">period in milliseconds</param>
public void SetConsumingPeriod(int period = 10)
{
ConsumingPeriod = period;
}
}
2 changes: 1 addition & 1 deletion KafkaStorm/Services/ConsumerHostedService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public Task StartAsync(CancellationToken cancellationToken)
private void Handle(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested) return;
var result = _consumer.Consume(1);
var result = _consumer.Consume(ConsumerRegistrationFactory.ConsumingPeriod);
var message = JsonSerializer.Deserialize<TMessage>(result.Message.Value) ??
throw new MessageNullException<TMessage>();
_myConsumer.Handle(message, cancellationToken);
Expand Down

0 comments on commit 61e9c93

Please sign in to comment.