From 8b96cf7fcef9d80dcab85b854544b18cfc7d1ea8 Mon Sep 17 00:00:00 2001 From: Michael Landis Date: Tue, 24 Sep 2024 09:24:29 -0700 Subject: [PATCH] chore: add example logging if heartbeat not received (#578) Adds an example demonstrating logging a message if a heartbeat isn't received in an interval of time. --- examples/MomentoExamples.sln | 6 + examples/TopicHeartbeatExample/Program.cs | 214 ++++++++++++++++++ .../TopicHeartbeatExample.csproj | 14 ++ 3 files changed, 234 insertions(+) create mode 100644 examples/TopicHeartbeatExample/Program.cs create mode 100644 examples/TopicHeartbeatExample/TopicHeartbeatExample.csproj diff --git a/examples/MomentoExamples.sln b/examples/MomentoExamples.sln index 811b6eb5..3e263e1b 100644 --- a/examples/MomentoExamples.sln +++ b/examples/MomentoExamples.sln @@ -23,6 +23,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TopicExample", "TopicExampl EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MomentoWeb", "MomentoWeb\MomentoWeb.csproj", "{DE3FDC58-9342-4B93-A4DB-C4039A235BA3}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TopicHeartbeatExample", "TopicHeartbeatExample\TopicHeartbeatExample.csproj", "{31E3111A-655E-4844-86DD-06F2B2AD9DE6}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -73,6 +75,10 @@ Global {DE3FDC58-9342-4B93-A4DB-C4039A235BA3}.Debug|Any CPU.Build.0 = Debug|Any CPU {DE3FDC58-9342-4B93-A4DB-C4039A235BA3}.Release|Any CPU.ActiveCfg = Release|Any CPU {DE3FDC58-9342-4B93-A4DB-C4039A235BA3}.Release|Any CPU.Build.0 = Release|Any CPU + {31E3111A-655E-4844-86DD-06F2B2AD9DE6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {31E3111A-655E-4844-86DD-06F2B2AD9DE6}.Debug|Any CPU.Build.0 = Debug|Any CPU + {31E3111A-655E-4844-86DD-06F2B2AD9DE6}.Release|Any CPU.ActiveCfg = Release|Any CPU + {31E3111A-655E-4844-86DD-06F2B2AD9DE6}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/examples/TopicHeartbeatExample/Program.cs b/examples/TopicHeartbeatExample/Program.cs new file mode 100644 index 00000000..59cd86e6 --- /dev/null +++ b/examples/TopicHeartbeatExample/Program.cs @@ -0,0 +1,214 @@ +using Microsoft.Extensions.Logging; +using Momento.Sdk; +using Momento.Sdk.Auth; +using Momento.Sdk.Config; +using Momento.Sdk.Exceptions; +using Momento.Sdk.Responses; +using System.Timers; + +namespace TopicExample; + +public class Driver +{ + private const string AuthTokenEnvVar = "MOMENTO_API_KEY"; + private const string CacheNameEnvVar = "MOMENTO_CACHE_NAME"; + private const string TopicName = "example-topic"; + private static readonly ILogger Logger; + private static readonly ILoggerFactory LoggerFactory; + private static System.Timers.Timer _heartbeatTimer; + + private static TimeSpan HeartbeatTimeout; + + static Driver() + { + LoggerFactory = InitializeLogging(); + Logger = LoggerFactory.CreateLogger(); + + // Experiment with values less than and greater than 10 + HeartbeatTimeout = TimeSpan.FromSeconds(5); + + // Initialize and configure the heartbeat timer + _heartbeatTimer = new(HeartbeatTimeout.TotalMilliseconds); + _heartbeatTimer.Elapsed += HeartbeatTimeoutHandler; + // Only trigger once if no heartbeat is received. + _heartbeatTimer.AutoReset = false; + + } + + public static async Task Main() + { + var authToken = ReadAuthToken(); + var cacheName = ReadCacheName(); + + // Set up the client + using ICacheClient client = + new CacheClient(Configurations.Laptop.V1(LoggerFactory), authToken, TimeSpan.FromSeconds(60)); + await EnsureCacheExistsAsync(client, cacheName); + using ITopicClient topicClient = new TopicClient(TopicConfigurations.Laptop.latest(LoggerFactory), authToken); + try + { + var cts = new CancellationTokenSource(); + cts.CancelAfter(30_000); + + // Subscribe and begin receiving messages + var subscriptionTask = Task.Run(async () => + { + var subscribeResponse = await topicClient.SubscribeAsync(cacheName, TopicName); + switch (subscribeResponse) + { + case TopicSubscribeResponse.Subscription subscription: + try + { + var cancellableSubscription = subscription.WithCancellationForAllEvents(cts.Token); + await foreach (var topicEvent in cancellableSubscription) + { + switch (topicEvent) + { + case TopicMessage.Binary: + Logger.LogInformation("Received unexpected binary message from topic."); + break; + case TopicMessage.Text text: + Logger.LogInformation("Received string message from topic: {message}", + text.Value); + break; + case TopicSystemEvent.Heartbeat: + Logger.LogInformation("Received heartbeat from topic."); + ResetHeartbeatTimer(); // Reset the timer on heartbeat + break; + case TopicSystemEvent.Discontinuity discontinuity: + Logger.LogInformation($"Received discontinuity from topic: {discontinuity}"); + break; + case TopicMessage.Error error: + Logger.LogInformation("Received error message from topic: {error}", + error.Message); + cts.Cancel(); + break; + } + } + } + finally + { + subscription.Dispose(); + } + + break; + case TopicSubscribeResponse.Error error: + Logger.LogInformation("Error subscribing to a topic: {error}", error.Message); + cts.Cancel(); + break; + } + }); + + // Publish messages + var publishTask = Task.Run(async () => + { + var messageCounter = 0; + while (!cts.IsCancellationRequested) + { + var publishResponse = + await topicClient.PublishAsync(cacheName, TopicName, $"message {messageCounter}"); + switch (publishResponse) + { + case TopicPublishResponse.Success: + break; + case TopicPublishResponse.Error error: + Logger.LogInformation("Error publishing a message to the topic: {error}", error.Message); + cts.Cancel(); + break; + } + + await Task.Delay(1_000); + messageCounter++; + } + }); + + // Start the heartbeat timer initially + _heartbeatTimer.Start(); + + await Task.WhenAll(subscriptionTask, publishTask); + } + finally + { + client.Dispose(); + topicClient.Dispose(); + } + } + + private static void HeartbeatTimeoutHandler(object? sender, ElapsedEventArgs e) + { + Logger.LogWarning($"No heartbeat received in the last {HeartbeatTimeout} seconds."); + } + + private static void ResetHeartbeatTimer() + { + _heartbeatTimer.Stop(); + _heartbeatTimer.Start(); + } + + private static ILoggerFactory InitializeLogging() + { + return Microsoft.Extensions.Logging.LoggerFactory.Create(builder => + { + builder.AddSimpleConsole(options => + { + options.IncludeScopes = true; + options.SingleLine = true; + options.TimestampFormat = "hh:mm:ss "; + }); + builder.SetMinimumLevel(LogLevel.Information); + }); + } + + private static ICredentialProvider ReadAuthToken() + { + try + { + return new EnvMomentoTokenProvider(AuthTokenEnvVar); + } + catch (InvalidArgumentException) + { + } + + Console.Write($"Auth token not detected in environment variable {AuthTokenEnvVar}. Enter auth token here: "); + var authToken = Console.ReadLine()!.Trim(); + + StringMomentoTokenProvider? authProvider = null; + try + { + authProvider = new StringMomentoTokenProvider(authToken); + } + catch (InvalidArgumentException e) + { + Logger.LogInformation("{}", e); + LoggerFactory.Dispose(); + Environment.Exit(1); + } + + return authProvider; + } + + private static string ReadCacheName() + { + var cacheName = Environment.GetEnvironmentVariable(CacheNameEnvVar); + return cacheName ?? "default-cache"; + } + + private static async Task EnsureCacheExistsAsync(ICacheClient client, string cacheName) + { + Logger.LogInformation("Creating cache {cacheName} if it doesn't already exist.", cacheName); + var createCacheResponse = await client.CreateCacheAsync(cacheName); + switch (createCacheResponse) + { + case CreateCacheResponse.Success: + Logger.LogInformation("Created cache {cacheName}.", cacheName); + break; + case CreateCacheResponse.CacheAlreadyExists: + Logger.LogInformation("Cache {cacheName} already exists.", cacheName); + break; + case CreateCacheResponse.Error: + Logger.LogInformation("Error creating cache: {error.Message}", cacheName); + Environment.Exit(1); + break; + } + } +} diff --git a/examples/TopicHeartbeatExample/TopicHeartbeatExample.csproj b/examples/TopicHeartbeatExample/TopicHeartbeatExample.csproj new file mode 100644 index 00000000..b9b75cbc --- /dev/null +++ b/examples/TopicHeartbeatExample/TopicHeartbeatExample.csproj @@ -0,0 +1,14 @@ + + + + Exe + net8.0 + enable + enable + + + + + + +