Skip to content

Commit

Permalink
Merge pull request #838 from Cysharp/feature/ThrowExceptionAfterDisco…
Browse files Browse the repository at this point in the history
…nnected

Throws RpcException instead of ObjectDisposedException when the client is disconnected.
  • Loading branch information
mayuki authored Sep 20, 2024
2 parents 68763f1 + f7cd9f2 commit 31cd685
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,10 @@ public abstract class StreamingHubClientBase<TStreamingHub, TReceiver> : IStream
readonly CancellationTokenSource subscriptionCts = new();
readonly Dictionary<int, SendOrPostCallback> postCallbackCache = new();


int messageIdSequence = 0;
bool disposed;
bool disconnected;
int cleanupSentinel = 0; // 0 = false, 1 = true

readonly Channel<StreamingHubPayload> writerQueue = Channel.CreateUnbounded<StreamingHubPayload>(new UnboundedChannelOptions() { SingleReader = true, SingleWriter = false, AllowSynchronousContinuations = false });
Task? writerTask;
Expand Down Expand Up @@ -310,7 +311,7 @@ async Task StartSubscribe(SynchronizationContext? syncContext, Task<bool> firstM
// log post on main thread.
if (syncContext != null)
{
syncContext.Post(state => logger.Error((Exception)state!, msg), ex);
syncContext.Post(s => logger.Error((Exception)s!, msg), ex);
}
else
{
Expand All @@ -335,7 +336,7 @@ async Task StartSubscribe(SynchronizationContext? syncContext, Task<bool> firstM
// log post on main thread.
if (syncContext != null)
{
syncContext.Post(state => logger.Error((Exception)state!, msg), ex);
syncContext.Post(s => logger.Error((Exception)s!, msg), ex);
}
else
{
Expand All @@ -346,6 +347,8 @@ async Task StartSubscribe(SynchronizationContext? syncContext, Task<bool> firstM
}
finally
{
disconnected = true;

try
{
#if !UNITY_WEBGL
Expand All @@ -357,7 +360,7 @@ async Task StartSubscribe(SynchronizationContext? syncContext, Task<bool> firstM
}
#endif
await heartbeatManager.DisposeAsync().ConfigureAwait(false);
await DisposeAsyncCore(false).ConfigureAwait(false);
await CleanupAsync(false).ConfigureAwait(false);
}
finally
{
Expand Down Expand Up @@ -414,9 +417,9 @@ void ProcessBroadcast(SynchronizationContext? syncContext, StreamingHubPayload p

SendOrPostCallback CreateBroadcastCallback(int methodId, int consumed)
{
return (state) =>
return (s) =>
{
var p = (StreamingHubPayload)state!;
var p = (StreamingHubPayload)s!;
this.OnBroadcastEvent(methodId, p.Memory.Slice(consumed));
StreamingHubPayloadPool.Shared.Return(p);
};
Expand Down Expand Up @@ -516,6 +519,7 @@ protected Task<TResponse> WriteMessageFireAndForgetTaskAsync<TRequest, TResponse
protected ValueTask<TResponse> WriteMessageFireAndForgetValueTaskOfTAsync<TRequest, TResponse>(int methodId, TRequest message)
{
ThrowIfDisposed();
ThrowIfDisconnected();

var v = BuildRequestMessage(methodId, message);
_ = writerQueue.Writer.TryWrite(v);
Expand All @@ -535,6 +539,7 @@ protected Task<TResponse> WriteMessageWithResponseTaskAsync<TRequest, TResponse>
protected ValueTask<TResponse> WriteMessageWithResponseValueTaskOfTAsync<TRequest, TResponse>(int methodId, TRequest message)
{
ThrowIfDisposed();
ThrowIfDisconnected();

var mid = Interlocked.Increment(ref messageIdSequence);

Expand All @@ -545,14 +550,19 @@ protected ValueTask<TResponse> WriteMessageWithResponseValueTaskOfTAsync<TReques
}

var v = BuildRequestMessage(methodId, mid, message);
_ = writerQueue.Writer.TryWrite(v);
if (!writerQueue.Writer.TryWrite(v))
{
// If the channel writer is closed, it is likely that the connection has already been disconnected.
ThrowIfDisconnected();
}

return new ValueTask<TResponse>(taskSource, taskSource.Version); // wait until server return response(or error). if connection was closed, throws cancellation from DisposeAsyncCore.
}

protected ValueTask WriteMessageWithResponseValueTaskAsync<TRequest, TResponse>(int methodId, TRequest message)
{
ThrowIfDisposed();
ThrowIfDisconnected();

var mid = Interlocked.Increment(ref messageIdSequence);

Expand All @@ -563,7 +573,11 @@ protected ValueTask WriteMessageWithResponseValueTaskAsync<TRequest, TResponse>(
}

var v = BuildRequestMessage(methodId, mid, message);
_ = writerQueue.Writer.TryWrite(v);
if (!writerQueue.Writer.TryWrite(v))
{
// If the channel writer is closed, it is likely that the connection has already been disconnected.
ThrowIfDisconnected();
}

return new ValueTask(taskSource, taskSource.Version); // wait until server return response(or error). if connection was closed, throws cancellation from DisposeAsyncCore.
}
Expand Down Expand Up @@ -631,11 +645,19 @@ protected Task WriteClientResultResponseMessageForErrorAsync(int methodId, Guid
return Task.CompletedTask;
}

void ThrowIfDisconnected()
{
if (disconnected)
{
throw new RpcException(new Status(StatusCode.Unavailable, $"The StreamingHubClient has already been disconnected from the server."));
}
}

void ThrowIfDisposed()
{
if (disposed)
{
throw new ObjectDisposedException("StreamingHubClient", $"The StreamingHub has already been disconnected from the server.");
throw new ObjectDisposedException("StreamingHubClient", $"The StreamingHubClient has already been disconnected from the server.");
}
}

Expand All @@ -645,18 +667,21 @@ public Task WaitForDisconnect()
Task<DisconnectionReason> IStreamingHubClient.WaitForDisconnectAsync()
=> waitForDisconnect.Task;

public Task DisposeAsync()
public async Task DisposeAsync()
{
return DisposeAsyncCore(true);
if (disposed) return;
disposed = true;
await CleanupAsync(true).ConfigureAwait(false);
}

async Task DisposeAsyncCore(bool waitSubscription)
async ValueTask CleanupAsync(bool waitSubscription)
{
if (disposed) return;
if (writer == null) return;

disposed = true;
if (Interlocked.CompareExchange(ref cleanupSentinel, 1, 0) != 0)
{
return;
}

if (writer == null) return;
try
{
writerQueue.Writer.Complete();
Expand All @@ -666,7 +691,6 @@ async Task DisposeAsyncCore(bool waitSubscription)
finally
{
subscriptionCts.Cancel();
subscriptionCts.Dispose();
try
{
if (waitSubscription)
Expand Down Expand Up @@ -707,6 +731,7 @@ async Task DisposeAsyncCore(bool waitSubscription)
}
}
}
subscriptionCts.Dispose();
}

StreamingHubPayload BuildRequestMessage<T>(int methodId, T message)
Expand Down
59 changes: 42 additions & 17 deletions src/MagicOnion.Client/StreamingHubClientBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,10 @@ public abstract class StreamingHubClientBase<TStreamingHub, TReceiver> : IStream
readonly CancellationTokenSource subscriptionCts = new();
readonly Dictionary<int, SendOrPostCallback> postCallbackCache = new();


int messageIdSequence = 0;
bool disposed;
bool disconnected;
int cleanupSentinel = 0; // 0 = false, 1 = true

readonly Channel<StreamingHubPayload> writerQueue = Channel.CreateUnbounded<StreamingHubPayload>(new UnboundedChannelOptions() { SingleReader = true, SingleWriter = false, AllowSynchronousContinuations = false });
Task? writerTask;
Expand Down Expand Up @@ -310,7 +311,7 @@ async Task StartSubscribe(SynchronizationContext? syncContext, Task<bool> firstM
// log post on main thread.
if (syncContext != null)
{
syncContext.Post(state => logger.Error((Exception)state!, msg), ex);
syncContext.Post(s => logger.Error((Exception)s!, msg), ex);
}
else
{
Expand All @@ -335,7 +336,7 @@ async Task StartSubscribe(SynchronizationContext? syncContext, Task<bool> firstM
// log post on main thread.
if (syncContext != null)
{
syncContext.Post(state => logger.Error((Exception)state!, msg), ex);
syncContext.Post(s => logger.Error((Exception)s!, msg), ex);
}
else
{
Expand All @@ -346,6 +347,8 @@ async Task StartSubscribe(SynchronizationContext? syncContext, Task<bool> firstM
}
finally
{
disconnected = true;

try
{
#if !UNITY_WEBGL
Expand All @@ -357,7 +360,7 @@ async Task StartSubscribe(SynchronizationContext? syncContext, Task<bool> firstM
}
#endif
await heartbeatManager.DisposeAsync().ConfigureAwait(false);
await DisposeAsyncCore(false).ConfigureAwait(false);
await CleanupAsync(false).ConfigureAwait(false);
}
finally
{
Expand Down Expand Up @@ -414,9 +417,9 @@ void ProcessBroadcast(SynchronizationContext? syncContext, StreamingHubPayload p

SendOrPostCallback CreateBroadcastCallback(int methodId, int consumed)
{
return (state) =>
return (s) =>
{
var p = (StreamingHubPayload)state!;
var p = (StreamingHubPayload)s!;
this.OnBroadcastEvent(methodId, p.Memory.Slice(consumed));
StreamingHubPayloadPool.Shared.Return(p);
};
Expand Down Expand Up @@ -516,6 +519,7 @@ protected Task<TResponse> WriteMessageFireAndForgetTaskAsync<TRequest, TResponse
protected ValueTask<TResponse> WriteMessageFireAndForgetValueTaskOfTAsync<TRequest, TResponse>(int methodId, TRequest message)
{
ThrowIfDisposed();
ThrowIfDisconnected();

var v = BuildRequestMessage(methodId, message);
_ = writerQueue.Writer.TryWrite(v);
Expand All @@ -535,6 +539,7 @@ protected Task<TResponse> WriteMessageWithResponseTaskAsync<TRequest, TResponse>
protected ValueTask<TResponse> WriteMessageWithResponseValueTaskOfTAsync<TRequest, TResponse>(int methodId, TRequest message)
{
ThrowIfDisposed();
ThrowIfDisconnected();

var mid = Interlocked.Increment(ref messageIdSequence);

Expand All @@ -545,14 +550,19 @@ protected ValueTask<TResponse> WriteMessageWithResponseValueTaskOfTAsync<TReques
}

var v = BuildRequestMessage(methodId, mid, message);
_ = writerQueue.Writer.TryWrite(v);
if (!writerQueue.Writer.TryWrite(v))
{
// If the channel writer is closed, it is likely that the connection has already been disconnected.
ThrowIfDisconnected();
}

return new ValueTask<TResponse>(taskSource, taskSource.Version); // wait until server return response(or error). if connection was closed, throws cancellation from DisposeAsyncCore.
}

protected ValueTask WriteMessageWithResponseValueTaskAsync<TRequest, TResponse>(int methodId, TRequest message)
{
ThrowIfDisposed();
ThrowIfDisconnected();

var mid = Interlocked.Increment(ref messageIdSequence);

Expand All @@ -563,7 +573,11 @@ protected ValueTask WriteMessageWithResponseValueTaskAsync<TRequest, TResponse>(
}

var v = BuildRequestMessage(methodId, mid, message);
_ = writerQueue.Writer.TryWrite(v);
if (!writerQueue.Writer.TryWrite(v))
{
// If the channel writer is closed, it is likely that the connection has already been disconnected.
ThrowIfDisconnected();
}

return new ValueTask(taskSource, taskSource.Version); // wait until server return response(or error). if connection was closed, throws cancellation from DisposeAsyncCore.
}
Expand Down Expand Up @@ -631,11 +645,19 @@ protected Task WriteClientResultResponseMessageForErrorAsync(int methodId, Guid
return Task.CompletedTask;
}

void ThrowIfDisconnected()
{
if (disconnected)
{
throw new RpcException(new Status(StatusCode.Unavailable, $"The StreamingHubClient has already been disconnected from the server."));
}
}

void ThrowIfDisposed()
{
if (disposed)
{
throw new ObjectDisposedException("StreamingHubClient", $"The StreamingHub has already been disconnected from the server.");
throw new ObjectDisposedException("StreamingHubClient", $"The StreamingHubClient has already been disconnected from the server.");
}
}

Expand All @@ -645,18 +667,21 @@ public Task WaitForDisconnect()
Task<DisconnectionReason> IStreamingHubClient.WaitForDisconnectAsync()
=> waitForDisconnect.Task;

public Task DisposeAsync()
public async Task DisposeAsync()
{
return DisposeAsyncCore(true);
if (disposed) return;
disposed = true;
await CleanupAsync(true).ConfigureAwait(false);
}

async Task DisposeAsyncCore(bool waitSubscription)
async ValueTask CleanupAsync(bool waitSubscription)
{
if (disposed) return;
if (writer == null) return;

disposed = true;
if (Interlocked.CompareExchange(ref cleanupSentinel, 1, 0) != 0)
{
return;
}

if (writer == null) return;
try
{
writerQueue.Writer.Complete();
Expand All @@ -666,7 +691,6 @@ async Task DisposeAsyncCore(bool waitSubscription)
finally
{
subscriptionCts.Cancel();
subscriptionCts.Dispose();
try
{
if (waitSubscription)
Expand Down Expand Up @@ -707,6 +731,7 @@ async Task DisposeAsyncCore(bool waitSubscription)
}
}
}
subscriptionCts.Dispose();
}

StreamingHubPayload BuildRequestMessage<T>(int methodId, T message)
Expand Down
40 changes: 40 additions & 0 deletions tests/MagicOnion.Client.Tests/StreamingHubTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,46 @@ public async Task WaitForDisconnectAsync_Faulted()
Assert.Equal(DisconnectionType.Faulted, disconnectionReason.Type);
Assert.IsType<IOException>(disconnectionReason.Exception);
}

[Fact]
public async Task ThrowRpcException_After_Disconnected()
{
// Arrange
var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var helper = new StreamingHubClientTestHelper<IGreeterHub, IGreeterHubReceiver>(factoryProvider: DynamicStreamingHubClientFactoryProvider.Instance);
var client = await helper.ConnectAsync(timeout.Token);
// Set the client's connection status to “Disconnected”.
helper.ThrowRpcException();
var disconnectionReason = await client.WaitForDisconnectAsync().WaitAsync(timeout.Token);

// Act
var ex = await Record.ExceptionAsync(async () => await client.Parameter_Zero());

// Assert
Assert.IsType<RpcException>(ex);
Assert.Contains("StreamingHubClient has already been disconnected from the server.", ex.Message);
}

[Fact]
public async Task ThrowObjectDisposedException_After_Disposed()
{
// Arrange
var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var helper = new StreamingHubClientTestHelper<IGreeterHub, IGreeterHubReceiver>(factoryProvider: DynamicStreamingHubClientFactoryProvider.Instance);
var client = await helper.ConnectAsync(timeout.Token);
// Set the client's connection status to “Disconnected”.
helper.ThrowRpcException();
var disconnectionReason = await client.WaitForDisconnectAsync().WaitAsync(timeout.Token);

// Act
await client.DisposeAsync();
var ex = await Record.ExceptionAsync(async () => await client.Parameter_Zero());

// Assert
var ode = Assert.IsType<ObjectDisposedException>(ex);
Assert.Equal(nameof(StreamingHubClient), ode.ObjectName);
Assert.Contains("StreamingHubClient has already been disconnected from the server.", ex.Message);
}
}

public interface IGreeterHubReceiver
Expand Down

0 comments on commit 31cd685

Please sign in to comment.