Skip to content

Commit

Permalink
Save messages to storage. (#97)
Browse files Browse the repository at this point in the history
Better Cancellation support
  • Loading branch information
HowardvanRooijen authored Mar 6, 2021
1 parent 1577263 commit fda5e98
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="5.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="5.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="5.0.0" />
<PackageReference Include="System.Threading.Tasks.Dataflow" Version="5.0.0" />
</ItemGroup>

<ItemGroup>
Expand Down
23 changes: 20 additions & 3 deletions Solutions/Ais.Net.Receiver.Host.Console/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,18 @@
namespace Ais.Net.Receiver.Host.Console
{
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

using Ais.Net.Models;
using Ais.Net.Models.Abstractions;
using Ais.Net.Receiver.Configuration;
using Ais.Net.Receiver.Receiver;
using Ais.Net.Receiver.Storage.Azure.Blob;
using Ais.Net.Receiver.Storage.Azure.Blob.Configuration;

using Microsoft.Extensions.Configuration;

Expand All @@ -26,10 +30,11 @@ public static async Task Main(string[] args)
.Build();

AisConfig aisConfig = config.GetSection("Ais").Get<AisConfig>();
var receiverHost = new ReceiverHost(aisConfig);
StorageConfig storageConfig = config.GetSection("Storage").Get<StorageConfig>();

// Write out the messages as they are received over the wire.
receiverHost.Sentences.Subscribe((sentences) => Console.WriteLine(sentences));
IStorageClient storageClient = new AzureAppendBlobStorageClient(storageConfig);

var receiverHost = new ReceiverHost(aisConfig);

// Decode teh sentences into messages, and group by the vessel by Id
IObservable<IGroupedObservable<uint, IAisMessage>> byVessel = receiverHost.Messages.GroupBy(m => m.Mmsi);
Expand All @@ -47,11 +52,23 @@ from vesselLocationAndName in vesselLocationsWithNames
{
(uint mmsi, IVesselNavigation navigation, IVesselName name) = navigationWithName;
string positionText = navigation.Position is null ? "unknown position" : $"{navigation.Position.Latitude},{navigation.Position.Longitude}";

Console.ForegroundColor = ConsoleColor.Green;
Console.WriteLine($"[{mmsi}: '{name.VesselName.CleanVesselName()}'] - [{positionText}] - [{navigation.CourseOverGroundDegrees ?? 0}]");
Console.ResetColor();
});

var batchBlock = new BatchBlock<string>(storageConfig.WriteBatchSize);
var actionBlock = new ActionBlock<IEnumerable<string>>(storageClient.PersistAsync);

batchBlock.LinkTo(actionBlock);

// Write out the messages as they are received over the wire.
receiverHost.Sentences.Subscribe(sentence => Console.WriteLine(sentence));

// Persist the messages as they are received over the wire.
receiverHost.Sentences.Subscribe(batchBlock.AsObserver());

var cts = new CancellationTokenSource();

Task task = receiverHost.StartAsync(cts.Token);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ namespace Ais.Net.Receiver.Storage.Azure.Blob
using global::Azure.Storage.Blobs;
using global::Azure.Storage.Blobs.Specialized;

public class StorageClient : IStorageClient
public class AzureAppendBlobStorageClient : IStorageClient
{
private readonly StorageConfig configuration;
private AppendBlobClient? appendBlobClient;
private BlobContainerClient? blobContainerClient;

public StorageClient(StorageConfig configuration)
public AzureAppendBlobStorageClient(StorageConfig configuration)
{
this.configuration = configuration;
}
Expand Down
4 changes: 2 additions & 2 deletions Solutions/Ais.Net.Receiver/Receiver/NmeaReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public NmeaReceiver(string host, int port, TimeSpan? retryPeriodicity, int retry

public async IAsyncEnumerable<string> GetAsync([EnumeratorCancellation] CancellationToken cancellationToken = default)
{
await this.tcpClient.ConnectAsync(this.Host, this.Port);
await this.tcpClient.ConnectAsync(this.Host, this.Port, cancellationToken);
await using NetworkStream stream = this.tcpClient.GetStream();
using StreamReader reader = new(stream);

Expand All @@ -56,7 +56,7 @@ public async IAsyncEnumerable<string> GetAsync([EnumeratorCancellation] Cancella
break;
}

await Task.Delay(this.RetryPeriodicity).ConfigureAwait(false);
await Task.Delay(this.RetryPeriodicity, cancellationToken).ConfigureAwait(false);

retryAttempt++;
}
Expand Down
2 changes: 1 addition & 1 deletion Solutions/Ais.Net.Receiver/Receiver/ReceiverHost.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ static void ProcessLineNonAsync(string line, INmeaLineStreamProcessor lineStream

private async IAsyncEnumerable<string> GetAsync([EnumeratorCancellation]CancellationToken cancellationToken = default)
{
await foreach (string? message in this.receiver.GetAsync().WithCancellation(cancellationToken))
await foreach (string? message in this.receiver.GetAsync(cancellationToken).WithCancellation(cancellationToken))
{
if (message.IsMissingNmeaBlockTags())
{
Expand Down

0 comments on commit fda5e98

Please sign in to comment.