Skip to content

Commit

Permalink
Add FileStream and NetworkStream Receiver Abstractions (#103)
Browse files Browse the repository at this point in the history
* Support NetworkStreamNmeaReceiver and FileStreamNmeaReceiver for online / offline demos.

* Remove unused reference

* Switch back to networkstream
  • Loading branch information
HowardvanRooijen authored Mar 23, 2021
1 parent 9f2e873 commit eacca73
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 14 deletions.
10 changes: 9 additions & 1 deletion Solutions/Ais.Net.Receiver.Host.Console/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,15 @@ public static async Task Main(string[] args)

IStorageClient storageClient = new AzureAppendBlobStorageClient(storageConfig);

var receiverHost = new ReceiverHost(aisConfig);
INmeaReceiver receiver = new NetworkStreamNmeaReceiver(
aisConfig.Host,
aisConfig.Port,
aisConfig.RetryPeriodicity,
aisConfig.RetryAttempts);

// INmeaReceiver receiver = new FileStreamNmeaReceiver(@"PATH-TO-RECORDING.nm4");

var receiverHost = new ReceiverHost(receiver);

// Decode teh sentences into messages, and group by the vessel by Id
IObservable<IGroupedObservable<uint, IAisMessage>> byVessel = receiverHost.Messages.GroupBy(m => m.Mmsi);
Expand Down
2 changes: 1 addition & 1 deletion Solutions/Ais.Net.Receiver/Ais.Net.Receiver.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Corvus.Retry" Version="1.0.1" />
<PackageReference Include="Corvus.Retry" Version="1.0.2" />
<PackageReference Include="Endjin.RecommendedPractices" Version="1.2.0">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// <copyright file="NmeaReceiver.cs" company="Endjin Limited">
// Copyright (c) Endjin Limited. All rights reserved.
// </copyright>

namespace Ais.Net.Receiver.Receiver
{
using System;
using System.Collections.Generic;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;

public class FileStreamNmeaReceiver : INmeaReceiver
{
private readonly string path;

public FileStreamNmeaReceiver(string path)
{
this.path = path;
}

public int RetryAttemptLimit { get; }

public TimeSpan RetryPeriodicity { get; }

public async IAsyncEnumerable<string> GetAsync([EnumeratorCancellation] CancellationToken cancellationToken = default)
{
using var sr = new StreamReader(this.path);

while (sr.Peek() >= 0)
{
if (cancellationToken.IsCancellationRequested)
{
break;
}

string? line = await sr.ReadLineAsync().ConfigureAwait(false);

if (line is not null) { yield return line; }
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// <copyright file="NmeaReceiver.cs" company="Endjin Limited">
// Copyright (c) Endjin Limited. All rights reserved.
// </copyright>

namespace Ais.Net.Receiver.Receiver
{
using System.Collections.Generic;
using System.Threading;

public interface INmeaReceiver
{
IAsyncEnumerable<string> GetAsync(CancellationToken cancellationToken = default);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ namespace Ais.Net.Receiver.Receiver
using System.Threading;
using System.Threading.Tasks;

public class NmeaReceiver
public class NetworkStreamNmeaReceiver : INmeaReceiver
{
private readonly TcpClient tcpClient = new();

public NmeaReceiver(string host, int port, TimeSpan? retryPeriodicity, int retryAttemptLimit = 100)
public NetworkStreamNmeaReceiver(string host, int port, TimeSpan? retryPeriodicity, int retryAttemptLimit = 100)
{
this.Host = host;
this.Port = port;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ namespace Ais.Net.Receiver.Receiver
using System.Threading.Tasks;

using Ais.Net.Models.Abstractions;
using Ais.Net.Receiver.Configuration;
using Ais.Net.Receiver.Parser;

using Corvus.Retry;
Expand All @@ -22,19 +21,13 @@ namespace Ais.Net.Receiver.Receiver

public class ReceiverHost
{
private readonly AisConfig configuration;
private readonly NmeaReceiver receiver;
private readonly INmeaReceiver receiver;
private readonly Subject<string> sentences = new();
private readonly Subject<IAisMessage> messages = new();

public ReceiverHost(AisConfig configuration)
public ReceiverHost(INmeaReceiver receiver)
{
this.configuration = configuration;
this.receiver = new NmeaReceiver(
this.configuration.Host,
this.configuration.Port,
this.configuration.RetryPeriodicity,
this.configuration.RetryAttempts);
this.receiver = receiver;
}

public IObservable<string> Sentences => this.sentences;
Expand Down

0 comments on commit eacca73

Please sign in to comment.