From eacca73470a80e0520116520f67f89811f457b44 Mon Sep 17 00:00:00 2001 From: Howard van Rooijen Date: Tue, 23 Mar 2021 10:46:50 +0000 Subject: [PATCH] Add FileStream and NetworkStream Receiver Abstractions (#103) * Support NetworkStreamNmeaReceiver and FileStreamNmeaReceiver for online / offline demos. * Remove unused reference * Switch back to networkstream --- .../Ais.Net.Receiver.Host.Console/Program.cs | 10 ++++- .../Ais.Net.Receiver/Ais.Net.Receiver.csproj | 2 +- .../Receiver/FileStreamNmeaReceiver.cs | 43 +++++++++++++++++++ .../Net/Receiver/Receiver/INmeaReceiver.cs | 14 ++++++ ...ceiver.cs => NetworkStreamNmeaReceiver.cs} | 4 +- .../Ais/Net/Receiver/Receiver/ReceiverHost.cs | 13 ++---- 6 files changed, 72 insertions(+), 14 deletions(-) create mode 100644 Solutions/Ais.Net.Receiver/Ais/Net/Receiver/Receiver/FileStreamNmeaReceiver.cs create mode 100644 Solutions/Ais.Net.Receiver/Ais/Net/Receiver/Receiver/INmeaReceiver.cs rename Solutions/Ais.Net.Receiver/Ais/Net/Receiver/Receiver/{NmeaReceiver.cs => NetworkStreamNmeaReceiver.cs} (91%) diff --git a/Solutions/Ais.Net.Receiver.Host.Console/Program.cs b/Solutions/Ais.Net.Receiver.Host.Console/Program.cs index de72e49..9acbb68 100644 --- a/Solutions/Ais.Net.Receiver.Host.Console/Program.cs +++ b/Solutions/Ais.Net.Receiver.Host.Console/Program.cs @@ -35,7 +35,15 @@ public static async Task Main(string[] args) IStorageClient storageClient = new AzureAppendBlobStorageClient(storageConfig); - var receiverHost = new ReceiverHost(aisConfig); + INmeaReceiver receiver = new NetworkStreamNmeaReceiver( + aisConfig.Host, + aisConfig.Port, + aisConfig.RetryPeriodicity, + aisConfig.RetryAttempts); + + // INmeaReceiver receiver = new FileStreamNmeaReceiver(@"PATH-TO-RECORDING.nm4"); + + var receiverHost = new ReceiverHost(receiver); // Decode teh sentences into messages, and group by the vessel by Id IObservable> byVessel = receiverHost.Messages.GroupBy(m => m.Mmsi); diff --git a/Solutions/Ais.Net.Receiver/Ais.Net.Receiver.csproj b/Solutions/Ais.Net.Receiver/Ais.Net.Receiver.csproj index fd144cd..72c2fa9 100644 --- a/Solutions/Ais.Net.Receiver/Ais.Net.Receiver.csproj +++ b/Solutions/Ais.Net.Receiver/Ais.Net.Receiver.csproj @@ -21,7 +21,7 @@ - + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/Solutions/Ais.Net.Receiver/Ais/Net/Receiver/Receiver/FileStreamNmeaReceiver.cs b/Solutions/Ais.Net.Receiver/Ais/Net/Receiver/Receiver/FileStreamNmeaReceiver.cs new file mode 100644 index 0000000..6b02a8c --- /dev/null +++ b/Solutions/Ais.Net.Receiver/Ais/Net/Receiver/Receiver/FileStreamNmeaReceiver.cs @@ -0,0 +1,43 @@ +// +// Copyright (c) Endjin Limited. All rights reserved. +// + +namespace Ais.Net.Receiver.Receiver +{ + using System; + using System.Collections.Generic; + using System.IO; + using System.Runtime.CompilerServices; + using System.Threading; + + public class FileStreamNmeaReceiver : INmeaReceiver + { + private readonly string path; + + public FileStreamNmeaReceiver(string path) + { + this.path = path; + } + + public int RetryAttemptLimit { get; } + + public TimeSpan RetryPeriodicity { get; } + + public async IAsyncEnumerable GetAsync([EnumeratorCancellation] CancellationToken cancellationToken = default) + { + using var sr = new StreamReader(this.path); + + while (sr.Peek() >= 0) + { + if (cancellationToken.IsCancellationRequested) + { + break; + } + + string? line = await sr.ReadLineAsync().ConfigureAwait(false); + + if (line is not null) { yield return line; } + } + } + } +} \ No newline at end of file diff --git a/Solutions/Ais.Net.Receiver/Ais/Net/Receiver/Receiver/INmeaReceiver.cs b/Solutions/Ais.Net.Receiver/Ais/Net/Receiver/Receiver/INmeaReceiver.cs new file mode 100644 index 0000000..a056b5b --- /dev/null +++ b/Solutions/Ais.Net.Receiver/Ais/Net/Receiver/Receiver/INmeaReceiver.cs @@ -0,0 +1,14 @@ +// +// Copyright (c) Endjin Limited. All rights reserved. +// + +namespace Ais.Net.Receiver.Receiver +{ + using System.Collections.Generic; + using System.Threading; + + public interface INmeaReceiver + { + IAsyncEnumerable GetAsync(CancellationToken cancellationToken = default); + } +} \ No newline at end of file diff --git a/Solutions/Ais.Net.Receiver/Ais/Net/Receiver/Receiver/NmeaReceiver.cs b/Solutions/Ais.Net.Receiver/Ais/Net/Receiver/Receiver/NetworkStreamNmeaReceiver.cs similarity index 91% rename from Solutions/Ais.Net.Receiver/Ais/Net/Receiver/Receiver/NmeaReceiver.cs rename to Solutions/Ais.Net.Receiver/Ais/Net/Receiver/Receiver/NetworkStreamNmeaReceiver.cs index dc8f4d2..2323016 100644 --- a/Solutions/Ais.Net.Receiver/Ais/Net/Receiver/Receiver/NmeaReceiver.cs +++ b/Solutions/Ais.Net.Receiver/Ais/Net/Receiver/Receiver/NetworkStreamNmeaReceiver.cs @@ -12,11 +12,11 @@ namespace Ais.Net.Receiver.Receiver using System.Threading; using System.Threading.Tasks; - public class NmeaReceiver + public class NetworkStreamNmeaReceiver : INmeaReceiver { private readonly TcpClient tcpClient = new(); - public NmeaReceiver(string host, int port, TimeSpan? retryPeriodicity, int retryAttemptLimit = 100) + public NetworkStreamNmeaReceiver(string host, int port, TimeSpan? retryPeriodicity, int retryAttemptLimit = 100) { this.Host = host; this.Port = port; diff --git a/Solutions/Ais.Net.Receiver/Ais/Net/Receiver/Receiver/ReceiverHost.cs b/Solutions/Ais.Net.Receiver/Ais/Net/Receiver/Receiver/ReceiverHost.cs index 3f0d425..64ba182 100644 --- a/Solutions/Ais.Net.Receiver/Ais/Net/Receiver/Receiver/ReceiverHost.cs +++ b/Solutions/Ais.Net.Receiver/Ais/Net/Receiver/Receiver/ReceiverHost.cs @@ -13,7 +13,6 @@ namespace Ais.Net.Receiver.Receiver using System.Threading.Tasks; using Ais.Net.Models.Abstractions; - using Ais.Net.Receiver.Configuration; using Ais.Net.Receiver.Parser; using Corvus.Retry; @@ -22,19 +21,13 @@ namespace Ais.Net.Receiver.Receiver public class ReceiverHost { - private readonly AisConfig configuration; - private readonly NmeaReceiver receiver; + private readonly INmeaReceiver receiver; private readonly Subject sentences = new(); private readonly Subject messages = new(); - public ReceiverHost(AisConfig configuration) + public ReceiverHost(INmeaReceiver receiver) { - this.configuration = configuration; - this.receiver = new NmeaReceiver( - this.configuration.Host, - this.configuration.Port, - this.configuration.RetryPeriodicity, - this.configuration.RetryAttempts); + this.receiver = receiver; } public IObservable Sentences => this.sentences;