Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
mayuki committed Sep 20, 2024
1 parent 7303973 commit f7cd9f2
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -176,16 +176,9 @@ public abstract class StreamingHubClientBase<TStreamingHub, TReceiver> : IStream
readonly Dictionary<int, SendOrPostCallback> postCallbackCache = new();

int messageIdSequence = 0;
int state = (int)State.Uninitialized;

enum State
{
Uninitialized,
Connected,
Disconnecting,
Disconnected,
Disposed,
}
bool disposed;
bool disconnected;
int cleanupSentinel = 0; // 0 = false, 1 = true

readonly Channel<StreamingHubPayload> writerQueue = Channel.CreateUnbounded<StreamingHubPayload>(new UnboundedChannelOptions() { SingleReader = true, SingleWriter = false, AllowSynchronousContinuations = false });
Task? writerTask;
Expand Down Expand Up @@ -299,8 +292,6 @@ static Method<StreamingHubPayload, StreamingHubPayload> CreateConnectMethod(stri

async Task StartSubscribe(SynchronizationContext? syncContext, Task<bool> firstMoveNext, CancellationToken subscriptionToken)
{
state = (int)State.Connected;

var disconnectionReason = new DisconnectionReason(DisconnectionType.CompletedNormally, null);
writerTask = RunWriterLoopAsync(subscriptionToken);

Expand Down Expand Up @@ -356,7 +347,7 @@ async Task StartSubscribe(SynchronizationContext? syncContext, Task<bool> firstM
}
finally
{
state = (int)State.Disconnected;
disconnected = true;

try
{
Expand Down Expand Up @@ -656,15 +647,15 @@ protected Task WriteClientResultResponseMessageForErrorAsync(int methodId, Guid

void ThrowIfDisconnected()
{
if (state != (int)State.Connected)
if (disconnected)
{
throw new RpcException(new Status(StatusCode.Unavailable, $"The StreamingHubClient has already been disconnected from the server."));
}
}

void ThrowIfDisposed()
{
if (state == (int)State.Disposed)
if (disposed)
{
throw new ObjectDisposedException("StreamingHubClient", $"The StreamingHubClient has already been disconnected from the server.");
}
Expand All @@ -678,15 +669,14 @@ Task<DisconnectionReason> IStreamingHubClient.WaitForDisconnectAsync()

public async Task DisposeAsync()
{
if (state == (int)State.Disposed) return;
state = (int)State.Disposed;

if (disposed) return;
disposed = true;
await CleanupAsync(true).ConfigureAwait(false);
}

async ValueTask CleanupAsync(bool waitSubscription)
{
if (Interlocked.CompareExchange(ref state, (int)State.Disconnected, (int)State.Disconnecting) != (int)State.Disconnecting)
if (Interlocked.CompareExchange(ref cleanupSentinel, 1, 0) != 0)
{
return;
}
Expand Down
28 changes: 9 additions & 19 deletions src/MagicOnion.Client/StreamingHubClientBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -176,16 +176,9 @@ public abstract class StreamingHubClientBase<TStreamingHub, TReceiver> : IStream
readonly Dictionary<int, SendOrPostCallback> postCallbackCache = new();

int messageIdSequence = 0;
int state = (int)State.Uninitialized;

enum State
{
Uninitialized,
Connected,
Disconnecting,
Disconnected,
Disposed,
}
bool disposed;
bool disconnected;
int cleanupSentinel = 0; // 0 = false, 1 = true

readonly Channel<StreamingHubPayload> writerQueue = Channel.CreateUnbounded<StreamingHubPayload>(new UnboundedChannelOptions() { SingleReader = true, SingleWriter = false, AllowSynchronousContinuations = false });
Task? writerTask;
Expand Down Expand Up @@ -299,8 +292,6 @@ static Method<StreamingHubPayload, StreamingHubPayload> CreateConnectMethod(stri

async Task StartSubscribe(SynchronizationContext? syncContext, Task<bool> firstMoveNext, CancellationToken subscriptionToken)
{
state = (int)State.Connected;

var disconnectionReason = new DisconnectionReason(DisconnectionType.CompletedNormally, null);
writerTask = RunWriterLoopAsync(subscriptionToken);

Expand Down Expand Up @@ -356,7 +347,7 @@ async Task StartSubscribe(SynchronizationContext? syncContext, Task<bool> firstM
}
finally
{
state = (int)State.Disconnected;
disconnected = true;

try
{
Expand Down Expand Up @@ -656,15 +647,15 @@ protected Task WriteClientResultResponseMessageForErrorAsync(int methodId, Guid

void ThrowIfDisconnected()
{
if (state != (int)State.Connected)
if (disconnected)
{
throw new RpcException(new Status(StatusCode.Unavailable, $"The StreamingHubClient has already been disconnected from the server."));
}
}

void ThrowIfDisposed()
{
if (state == (int)State.Disposed)
if (disposed)
{
throw new ObjectDisposedException("StreamingHubClient", $"The StreamingHubClient has already been disconnected from the server.");
}
Expand All @@ -678,15 +669,14 @@ Task<DisconnectionReason> IStreamingHubClient.WaitForDisconnectAsync()

public async Task DisposeAsync()
{
if (state == (int)State.Disposed) return;
state = (int)State.Disposed;

if (disposed) return;
disposed = true;
await CleanupAsync(true).ConfigureAwait(false);
}

async ValueTask CleanupAsync(bool waitSubscription)
{
if (Interlocked.CompareExchange(ref state, (int)State.Disconnected, (int)State.Disconnecting) != (int)State.Disconnecting)
if (Interlocked.CompareExchange(ref cleanupSentinel, 1, 0) != 0)
{
return;
}
Expand Down

0 comments on commit f7cd9f2

Please sign in to comment.