Skip to content

Commit

Permalink
Refactor out the TCP Client based stream reader so alternative implem…
Browse files Browse the repository at this point in the history
…entations can be used. (#237)
  • Loading branch information
HowardvanRooijen authored Feb 23, 2025
1 parent 9af47d9 commit 626dc51
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ public static async Task Main()
if (aisConfig.LoggerVerbosity == LoggerVerbosity.Minimal)
{
receiverHost.GetStreamStatistics(aisConfig.StatisticsPeriodicity)
.Subscribe(statistics => System.Console.WriteLine($"{DateTime.UtcNow.ToUniversalTime()}: Sentences: {statistics.Sentence} | Messages: {statistics.Message} | Errors: {statistics.Error}"));
.Subscribe(statistics =>
System.Console.WriteLine($"{DateTime.UtcNow.ToUniversalTime()}: Sentences: {statistics.Sentence} | Messages: {statistics.Message} | Errors: {statistics.Error}"));
}

if (aisConfig.LoggerVerbosity == LoggerVerbosity.Normal)
Expand Down
43 changes: 21 additions & 22 deletions Solutions/Ais.Net.Receiver.Host.Console/packages.lock.json
Original file line number Diff line number Diff line change
Expand Up @@ -95,34 +95,34 @@
},
"Azure.Core": {
"type": "Transitive",
"resolved": "1.43.0",
"contentHash": "XE6GHvFAv0djbzNSC3jJPDytcDRmg2CUYlh4V4HjIjX9xLP90OYDPz1KyF02qJJJTF9wLuG4C0XIQm22MGkCww==",
"resolved": "1.44.1",
"contentHash": "YyznXLQZCregzHvioip07/BkzjuWNXogJEVz9T5W6TwjNr17ax41YGzYMptlo2G10oLCuVPoyva62y0SIRDixg==",
"dependencies": {
"Microsoft.Bcl.AsyncInterfaces": "6.0.0",
"System.ClientModel": "1.0.0",
"System.ClientModel": "1.1.0",
"System.Diagnostics.DiagnosticSource": "6.0.1",
"System.Memory.Data": "1.0.2",
"System.Memory.Data": "6.0.0",
"System.Numerics.Vectors": "4.5.0",
"System.Text.Encodings.Web": "6.0.0",
"System.Text.Json": "6.0.9",
"System.Text.Json": "6.0.10",
"System.Threading.Tasks.Extensions": "4.5.4"
}
},
"Azure.Storage.Blobs": {
"type": "Transitive",
"resolved": "12.22.1",
"contentHash": "x0TZRhkLQwVf+BYjFJybRu0G8OdIXm0mYZJUgUX2I27DHxHmW6+qR4q3VsbOKYT1r/CFwDKBt7wDVohr14k4bQ==",
"resolved": "12.23.0",
"contentHash": "wokJ5KX/iViQQ32xyCu69+Ter0aR4B9QQ+oR9NCpc/WPIanxnDErrmFfdmE7K8ZdccjHkvE/wEnqJxaF1+5wFg==",
"dependencies": {
"Azure.Storage.Common": "12.21.0",
"System.Text.Json": "6.0.9"
"Azure.Storage.Common": "12.22.0",
"System.Text.Json": "6.0.10"
}
},
"Azure.Storage.Common": {
"type": "Transitive",
"resolved": "12.21.0",
"contentHash": "4DosxjTeu1BpRJr8iPuUQ0QMGgLHreErKy1DdqsqABkwA5FzzRr/fUD+IfxTpCyi7uhLhcgz5FmDdts+uyrvpQ==",
"resolved": "12.22.0",
"contentHash": "0Vm30bRpQ0fcswB0xQMhKAOSXnRygnF2f/029uPaIDLaj1/yfX4jmU0fFjJe9ojGEj/vlAmsApCEOyL9if6zHg==",
"dependencies": {
"Azure.Core": "1.43.0",
"Azure.Core": "1.44.1",
"System.IO.Hashing": "6.0.0"
}
},
Expand Down Expand Up @@ -206,11 +206,11 @@
},
"System.ClientModel": {
"type": "Transitive",
"resolved": "1.0.0",
"contentHash": "I3CVkvxeqFYjIVEP59DnjbeoGNfo/+SZrCLpRz2v/g0gpCHaEMPtWSY0s9k/7jR1rAsLNg2z2u1JRB76tPjnIw==",
"resolved": "1.1.0",
"contentHash": "UocOlCkxLZrG2CKMAAImPcldJTxeesHnHGHwhJ0pNlZEvEXcWKuQvVOER2/NiOkJGRJk978SNdw3j6/7O9H1lg==",
"dependencies": {
"System.Memory.Data": "1.0.2",
"System.Text.Json": "4.7.2"
"System.Text.Json": "6.0.9"
}
},
"System.Diagnostics.DiagnosticSource": {
Expand Down Expand Up @@ -241,11 +241,10 @@
},
"System.Memory.Data": {
"type": "Transitive",
"resolved": "1.0.2",
"contentHash": "JGkzeqgBsiZwKJZ1IxPNsDFZDhUvuEdX8L8BDC8N3KOj+6zMcNU28CNN59TpZE/VJYy9cP+5M+sbxtWJx3/xtw==",
"resolved": "6.0.0",
"contentHash": "ntFHArH3I4Lpjf5m4DCXQHJuGwWPNVJPaAvM95Jy/u+2Yzt2ryiyIN04LAogkjP9DeRcEOiviAjQotfmPq/FrQ==",
"dependencies": {
"System.Text.Encodings.Web": "4.7.2",
"System.Text.Json": "4.6.0"
"System.Text.Json": "6.0.0"
}
},
"System.Numerics.Vectors": {
Expand Down Expand Up @@ -273,8 +272,8 @@
},
"System.Text.Json": {
"type": "Transitive",
"resolved": "6.0.9",
"contentHash": "2j16oUgtIzl7Xtk7demG0i/v5aU/ZvULcAnJvPb63U3ZhXJ494UYcxuEj5Fs49i3XDrk5kU/8I+6l9zRCw3cJw==",
"resolved": "6.0.10",
"contentHash": "NSB0kDipxn2ychp88NXWfFRFlmi1bst/xynOutbnpEfRCT9JZkZ7KOmF/I/hNKo2dILiMGnqblm+j1sggdLB9g==",
"dependencies": {
"System.Runtime.CompilerServices.Unsafe": "6.0.0",
"System.Text.Encodings.Web": "6.0.0"
Expand All @@ -298,7 +297,7 @@
"type": "Project",
"dependencies": {
"Ais.Net.Receiver": "[1.0.0, )",
"Azure.Storage.Blobs": "[12.22.1, )"
"Azure.Storage.Blobs": "[12.23.0, )"
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion Solutions/Ais.Net.Receiver.Host.Console/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
"retryPeriodicity": "00:00:00:01"
},
"Storage": {
"enableCapture": true,
"enableCapture": false,
"connectionString": "",
"containerName": "nmea-ais",
"writeBatchSize": 500
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// <copyright file="INmeaStreamReader.cs" company="Endjin Limited">
// Copyright (c) Endjin Limited. All rights reserved.
// </copyright>

namespace Ais.Net.Receiver.Receiver;

using System;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// Abstracts network stream reading operations for NMEA messages
/// </summary>
public interface INmeaStreamReader : IAsyncDisposable
{
/// <summary>
/// Establishes a connection to the specified host and port
/// </summary>
Task ConnectAsync(string host, int port, CancellationToken cancellationToken);

/// <summary>
/// Reads a line of text asynchronously
/// </summary>
Task<string?> ReadLineAsync(CancellationToken cancellationToken);

/// <summary>
/// Gets whether data is available to be read
/// </summary>
bool DataAvailable { get; }

/// <summary>
/// Gets whether the connection is established
/// </summary>
bool Connected { get; }
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
// <copyright file="NmeaReceiver.cs" company="Endjin Limited">
// <copyright file="NetworkStreamNmeaReceiver.cs" company="Endjin Limited">
// Copyright (c) Endjin Limited. All rights reserved.
// </copyright>

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.Sockets;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -15,29 +13,33 @@ namespace Ais.Net.Receiver.Receiver;

public class NetworkStreamNmeaReceiver : INmeaReceiver
{
private readonly INmeaStreamReader nmeaStreamReader;

public NetworkStreamNmeaReceiver(string host, int port, TimeSpan? retryPeriodicity, int retryAttemptLimit = 100)
: this(new TcpClientNmeaStreamReader(), host, port, retryPeriodicity, retryAttemptLimit)
{
}

public NetworkStreamNmeaReceiver(INmeaStreamReader reader, string host, int port, TimeSpan? retryPeriodicity, int retryAttemptLimit = 100)
{
this.Host = host;
this.Port = port;
this.RetryPeriodicity = (retryPeriodicity ?? TimeSpan.FromSeconds(1));
this.RetryPeriodicity = retryPeriodicity ?? TimeSpan.FromSeconds(1);
this.RetryAttemptLimit = retryAttemptLimit;
this.nmeaStreamReader = reader ?? throw new ArgumentNullException(nameof(reader));
}

public string Host { get; }

public int Port { get; }

public int RetryAttemptLimit { get; }

public TimeSpan RetryPeriodicity { get; }

//public IAsyncEnumerable<string> GetAsync([EnumeratorCancellation] CancellationToken cancellationToken = default)
// We still provide the IAsyncEnumerable API for backwards compatibility.
public IAsyncEnumerable<string> GetAsync(CancellationToken cancellationToken = default)
{
// We're letting Rx handle the retries for us. Since the rest of the code is currently written
// to assume we return an IAsyncEnumerable (which we used to) we convert to that, but it's now
// really all Rx. And since I think it's Rx above us too, we can probably remove IAsyncEnumerable
// from the picture completely. This is all reactive stuff, so I don't think it really belongs.
return this.GetObservable(cancellationToken).ToAsyncEnumerable();
}

Expand All @@ -50,22 +52,19 @@ public IObservable<string> GetObservable(CancellationToken cancellationToken = d

while (!mergedToken.IsCancellationRequested)
{
// Seems like we need a new one each time we try to connect, because if we reuse
// the previous TcpClient after a failure, it tells us it's disposed even if we
// didn't dispose it directly. (Perhaps disposing the NetworkStream has that effect?)
using TcpClient tcpClient = new();
await tcpClient.ConnectAsync(this.Host, this.Port, mergedToken);
await using NetworkStream stream = tcpClient.GetStream();
using StreamReader reader = new(stream);
await this.nmeaStreamReader.ConnectAsync(this.Host, this.Port, mergedToken);

int retryAttempt = 0;

while (tcpClient.Connected)
while (this.nmeaStreamReader.Connected)
{
while (stream.DataAvailable && !mergedToken.IsCancellationRequested)
while (this.nmeaStreamReader.DataAvailable && !mergedToken.IsCancellationRequested)
{
string? line = await reader.ReadLineAsync(mergedToken).ConfigureAwait(false);
if (line is not null) { obs.OnNext(line); }
string? line = await this.nmeaStreamReader.ReadLineAsync(mergedToken).ConfigureAwait(false);
if (line is not null)
{
obs.OnNext(line);
}
retryAttempt = 0;
}

Expand All @@ -75,21 +74,11 @@ public IObservable<string> GetObservable(CancellationToken cancellationToken = d
}

await Task.Delay(this.RetryPeriodicity, mergedToken).ConfigureAwait(false);

retryAttempt++;
}

// Sometimes if the network connection drops, the TcpClient will just calmly set its
// Connected property to false, and it won't throw an exception. So we need a non-exception
// retry loop. If we hit this point we just go round the outer try loop one more time.
// (It's quite likely if we hit this point that the very next thing to happen will
// be that the attempt to reconnect fails with an exception, but at that point the
// Rx-based retry will save us.
}
});

// Let Rx handle the retries for us in the event of a failure that produces
// an exception.
return withoutRetry.Retry();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// <copyright file="TcpClientNmeaStreamReader.cs" company="Endjin Limited">
// Copyright (c) Endjin Limited. All rights reserved.
// </copyright>

namespace Ais.Net.Receiver.Receiver;

using System.IO;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;

public class TcpClientNmeaStreamReader : INmeaStreamReader
{
private TcpClient? tcpClient;
private NetworkStream? stream;
private StreamReader? reader;

public bool DataAvailable => this.stream?.DataAvailable ?? false;

public bool Connected => this.tcpClient?.Connected ?? false;

public async Task ConnectAsync(string host, int port, CancellationToken cancellationToken)
{
this.tcpClient = new TcpClient();
await this.tcpClient.ConnectAsync(host, port, cancellationToken);
this.stream = this.tcpClient.GetStream();
this.reader = new StreamReader(this.stream);
}

public async Task<string?> ReadLineAsync(CancellationToken cancellationToken)
{
return this.reader is not null
? await this.reader.ReadLineAsync(cancellationToken).ConfigureAwait(false)
: null;
}

public async ValueTask DisposeAsync()
{
this.reader?.Dispose();

if (this.stream is not null)
{
await this.stream.DisposeAsync();
}

this.tcpClient?.Dispose();
}
}

0 comments on commit 626dc51

Please sign in to comment.