Skip to content

Commit

Permalink
chore: add example logging if heartbeat not received (#578)
Browse files Browse the repository at this point in the history
Adds an example demonstrating logging a message if a heartbeat isn't
received in an interval of time.
  • Loading branch information
malandis authored Sep 24, 2024
1 parent 9679eef commit 8b96cf7
Show file tree
Hide file tree
Showing 3 changed files with 234 additions and 0 deletions.
6 changes: 6 additions & 0 deletions examples/MomentoExamples.sln
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TopicExample", "TopicExampl
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MomentoWeb", "MomentoWeb\MomentoWeb.csproj", "{DE3FDC58-9342-4B93-A4DB-C4039A235BA3}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TopicHeartbeatExample", "TopicHeartbeatExample\TopicHeartbeatExample.csproj", "{31E3111A-655E-4844-86DD-06F2B2AD9DE6}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -73,6 +75,10 @@ Global
{DE3FDC58-9342-4B93-A4DB-C4039A235BA3}.Debug|Any CPU.Build.0 = Debug|Any CPU
{DE3FDC58-9342-4B93-A4DB-C4039A235BA3}.Release|Any CPU.ActiveCfg = Release|Any CPU
{DE3FDC58-9342-4B93-A4DB-C4039A235BA3}.Release|Any CPU.Build.0 = Release|Any CPU
{31E3111A-655E-4844-86DD-06F2B2AD9DE6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{31E3111A-655E-4844-86DD-06F2B2AD9DE6}.Debug|Any CPU.Build.0 = Debug|Any CPU
{31E3111A-655E-4844-86DD-06F2B2AD9DE6}.Release|Any CPU.ActiveCfg = Release|Any CPU
{31E3111A-655E-4844-86DD-06F2B2AD9DE6}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
214 changes: 214 additions & 0 deletions examples/TopicHeartbeatExample/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
using Microsoft.Extensions.Logging;
using Momento.Sdk;
using Momento.Sdk.Auth;
using Momento.Sdk.Config;
using Momento.Sdk.Exceptions;
using Momento.Sdk.Responses;
using System.Timers;

namespace TopicExample;

public class Driver
{
private const string AuthTokenEnvVar = "MOMENTO_API_KEY";
private const string CacheNameEnvVar = "MOMENTO_CACHE_NAME";
private const string TopicName = "example-topic";
private static readonly ILogger Logger;
private static readonly ILoggerFactory LoggerFactory;
private static System.Timers.Timer _heartbeatTimer;

private static TimeSpan HeartbeatTimeout;

static Driver()
{
LoggerFactory = InitializeLogging();
Logger = LoggerFactory.CreateLogger<Driver>();

// Experiment with values less than and greater than 10
HeartbeatTimeout = TimeSpan.FromSeconds(5);

// Initialize and configure the heartbeat timer
_heartbeatTimer = new(HeartbeatTimeout.TotalMilliseconds);
_heartbeatTimer.Elapsed += HeartbeatTimeoutHandler;
// Only trigger once if no heartbeat is received.
_heartbeatTimer.AutoReset = false;

}

public static async Task Main()
{
var authToken = ReadAuthToken();
var cacheName = ReadCacheName();

// Set up the client
using ICacheClient client =
new CacheClient(Configurations.Laptop.V1(LoggerFactory), authToken, TimeSpan.FromSeconds(60));
await EnsureCacheExistsAsync(client, cacheName);
using ITopicClient topicClient = new TopicClient(TopicConfigurations.Laptop.latest(LoggerFactory), authToken);
try
{
var cts = new CancellationTokenSource();
cts.CancelAfter(30_000);

// Subscribe and begin receiving messages
var subscriptionTask = Task.Run(async () =>
{
var subscribeResponse = await topicClient.SubscribeAsync(cacheName, TopicName);
switch (subscribeResponse)
{
case TopicSubscribeResponse.Subscription subscription:
try
{
var cancellableSubscription = subscription.WithCancellationForAllEvents(cts.Token);
await foreach (var topicEvent in cancellableSubscription)
{
switch (topicEvent)
{
case TopicMessage.Binary:
Logger.LogInformation("Received unexpected binary message from topic.");
break;
case TopicMessage.Text text:
Logger.LogInformation("Received string message from topic: {message}",
text.Value);
break;
case TopicSystemEvent.Heartbeat:
Logger.LogInformation("Received heartbeat from topic.");
ResetHeartbeatTimer(); // Reset the timer on heartbeat
break;
case TopicSystemEvent.Discontinuity discontinuity:
Logger.LogInformation($"Received discontinuity from topic: {discontinuity}");
break;
case TopicMessage.Error error:
Logger.LogInformation("Received error message from topic: {error}",
error.Message);
cts.Cancel();
break;
}
}
}
finally
{
subscription.Dispose();
}
break;
case TopicSubscribeResponse.Error error:
Logger.LogInformation("Error subscribing to a topic: {error}", error.Message);
cts.Cancel();
break;
}
});

// Publish messages
var publishTask = Task.Run(async () =>
{
var messageCounter = 0;
while (!cts.IsCancellationRequested)
{
var publishResponse =
await topicClient.PublishAsync(cacheName, TopicName, $"message {messageCounter}");
switch (publishResponse)
{
case TopicPublishResponse.Success:
break;
case TopicPublishResponse.Error error:
Logger.LogInformation("Error publishing a message to the topic: {error}", error.Message);
cts.Cancel();
break;
}
await Task.Delay(1_000);
messageCounter++;
}
});

// Start the heartbeat timer initially
_heartbeatTimer.Start();

await Task.WhenAll(subscriptionTask, publishTask);
}
finally
{
client.Dispose();
topicClient.Dispose();
}
}

private static void HeartbeatTimeoutHandler(object? sender, ElapsedEventArgs e)
{
Logger.LogWarning($"No heartbeat received in the last {HeartbeatTimeout} seconds.");
}

private static void ResetHeartbeatTimer()
{
_heartbeatTimer.Stop();
_heartbeatTimer.Start();
}

private static ILoggerFactory InitializeLogging()
{
return Microsoft.Extensions.Logging.LoggerFactory.Create(builder =>
{
builder.AddSimpleConsole(options =>
{
options.IncludeScopes = true;
options.SingleLine = true;
options.TimestampFormat = "hh:mm:ss ";
});
builder.SetMinimumLevel(LogLevel.Information);
});
}

private static ICredentialProvider ReadAuthToken()
{
try
{
return new EnvMomentoTokenProvider(AuthTokenEnvVar);
}
catch (InvalidArgumentException)
{
}

Console.Write($"Auth token not detected in environment variable {AuthTokenEnvVar}. Enter auth token here: ");
var authToken = Console.ReadLine()!.Trim();

StringMomentoTokenProvider? authProvider = null;
try
{
authProvider = new StringMomentoTokenProvider(authToken);
}
catch (InvalidArgumentException e)
{
Logger.LogInformation("{}", e);
LoggerFactory.Dispose();
Environment.Exit(1);
}

return authProvider;
}

private static string ReadCacheName()
{
var cacheName = Environment.GetEnvironmentVariable(CacheNameEnvVar);
return cacheName ?? "default-cache";
}

private static async Task EnsureCacheExistsAsync(ICacheClient client, string cacheName)
{
Logger.LogInformation("Creating cache {cacheName} if it doesn't already exist.", cacheName);
var createCacheResponse = await client.CreateCacheAsync(cacheName);
switch (createCacheResponse)
{
case CreateCacheResponse.Success:
Logger.LogInformation("Created cache {cacheName}.", cacheName);
break;
case CreateCacheResponse.CacheAlreadyExists:
Logger.LogInformation("Cache {cacheName} already exists.", cacheName);
break;
case CreateCacheResponse.Error:
Logger.LogInformation("Error creating cache: {error.Message}", cacheName);
Environment.Exit(1);
break;
}
}
}
14 changes: 14 additions & 0 deletions examples/TopicHeartbeatExample/TopicHeartbeatExample.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="6.0.0" />
<PackageReference Include="Momento.Sdk" Version="1.37.0" />
</ItemGroup>
</Project>

0 comments on commit 8b96cf7

Please sign in to comment.