From 74c331c659930fc98da39ed944283383dc5ef4e9 Mon Sep 17 00:00:00 2001 From: Mayuki Sawatari Date: Fri, 20 Sep 2024 12:52:37 +0900 Subject: [PATCH 1/4] Throws RpcException instead of ObjectDisposedException when the client is disconnected. --- .../StreamingHubClientBase.cs | 29 +++++++++----- .../StreamingHubClientBase.cs | 29 +++++++++----- .../StreamingHubTest.cs | 38 +++++++++++++++++++ 3 files changed, 78 insertions(+), 18 deletions(-) diff --git a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/StreamingHubClientBase.cs b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/StreamingHubClientBase.cs index a06491fea..62d17310d 100644 --- a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/StreamingHubClientBase.cs +++ b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/StreamingHubClientBase.cs @@ -357,7 +357,7 @@ async Task StartSubscribe(SynchronizationContext? syncContext, Task firstM } #endif await heartbeatManager.DisposeAsync().ConfigureAwait(false); - await DisposeAsyncCore(false).ConfigureAwait(false); + await CleanupAsync(false).ConfigureAwait(false); } finally { @@ -516,6 +516,7 @@ protected Task WriteMessageFireAndForgetTaskAsync WriteMessageFireAndForgetValueTaskOfTAsync(int methodId, TRequest message) { ThrowIfDisposed(); + ThrowIfDisconnected(); var v = BuildRequestMessage(methodId, message); _ = writerQueue.Writer.TryWrite(v); @@ -535,6 +536,7 @@ protected Task WriteMessageWithResponseTaskAsync protected ValueTask WriteMessageWithResponseValueTaskOfTAsync(int methodId, TRequest message) { ThrowIfDisposed(); + ThrowIfDisconnected(); var mid = Interlocked.Increment(ref messageIdSequence); @@ -553,6 +555,7 @@ protected ValueTask WriteMessageWithResponseValueTaskOfTAsync(int methodId, TRequest message) { ThrowIfDisposed(); + ThrowIfDisconnected(); var mid = Interlocked.Increment(ref messageIdSequence); @@ -631,6 +634,14 @@ protected Task WriteClientResultResponseMessageForErrorAsync(int methodId, Guid return Task.CompletedTask; } + void ThrowIfDisconnected() + { + if (waitForDisconnect.Task.IsCompleted) + { + throw new RpcException(new Status(StatusCode.Unavailable, $"The StreamingHubClient has already been disconnected from the server.")); + } + } + void ThrowIfDisposed() { if (disposed) @@ -645,18 +656,19 @@ public Task WaitForDisconnect() Task IStreamingHubClient.WaitForDisconnectAsync() => waitForDisconnect.Task; - public Task DisposeAsync() + public async Task DisposeAsync() { - return DisposeAsyncCore(true); + if (disposed) return; + disposed = true; + + await CleanupAsync(true).ConfigureAwait(false); + + subscriptionCts.Dispose(); } - async Task DisposeAsyncCore(bool waitSubscription) + async Task CleanupAsync(bool waitSubscription) { - if (disposed) return; if (writer == null) return; - - disposed = true; - try { writerQueue.Writer.Complete(); @@ -666,7 +678,6 @@ async Task DisposeAsyncCore(bool waitSubscription) finally { subscriptionCts.Cancel(); - subscriptionCts.Dispose(); try { if (waitSubscription) diff --git a/src/MagicOnion.Client/StreamingHubClientBase.cs b/src/MagicOnion.Client/StreamingHubClientBase.cs index a06491fea..779b40335 100644 --- a/src/MagicOnion.Client/StreamingHubClientBase.cs +++ b/src/MagicOnion.Client/StreamingHubClientBase.cs @@ -357,7 +357,7 @@ async Task StartSubscribe(SynchronizationContext? syncContext, Task firstM } #endif await heartbeatManager.DisposeAsync().ConfigureAwait(false); - await DisposeAsyncCore(false).ConfigureAwait(false); + await CleanupAsync(false).ConfigureAwait(false); } finally { @@ -516,6 +516,7 @@ protected Task WriteMessageFireAndForgetTaskAsync WriteMessageFireAndForgetValueTaskOfTAsync(int methodId, TRequest message) { ThrowIfDisposed(); + ThrowIfDisconnected(); var v = BuildRequestMessage(methodId, message); _ = writerQueue.Writer.TryWrite(v); @@ -535,6 +536,7 @@ protected Task WriteMessageWithResponseTaskAsync protected ValueTask WriteMessageWithResponseValueTaskOfTAsync(int methodId, TRequest message) { ThrowIfDisposed(); + ThrowIfDisconnected(); var mid = Interlocked.Increment(ref messageIdSequence); @@ -553,6 +555,7 @@ protected ValueTask WriteMessageWithResponseValueTaskOfTAsync(int methodId, TRequest message) { ThrowIfDisposed(); + ThrowIfDisconnected(); var mid = Interlocked.Increment(ref messageIdSequence); @@ -631,6 +634,14 @@ protected Task WriteClientResultResponseMessageForErrorAsync(int methodId, Guid return Task.CompletedTask; } + void ThrowIfDisconnected() + { + if (waitForDisconnect.Task.IsCompleted) + { + throw new RpcException(new Status(StatusCode.Unavailable, $"The StreamingHubClient has already been disconnected from the server.")); + } + } + void ThrowIfDisposed() { if (disposed) @@ -645,18 +656,19 @@ public Task WaitForDisconnect() Task IStreamingHubClient.WaitForDisconnectAsync() => waitForDisconnect.Task; - public Task DisposeAsync() + public async Task DisposeAsync() { - return DisposeAsyncCore(true); + if (disposed) return; + disposed = true; + + await CleanupAsync(true).ConfigureAwait(false); + + subscriptionCts.Dispose(); } - async Task DisposeAsyncCore(bool waitSubscription) + async ValueTask CleanupAsync(bool waitSubscription) { - if (disposed) return; if (writer == null) return; - - disposed = true; - try { writerQueue.Writer.Complete(); @@ -666,7 +678,6 @@ async Task DisposeAsyncCore(bool waitSubscription) finally { subscriptionCts.Cancel(); - subscriptionCts.Dispose(); try { if (waitSubscription) diff --git a/tests/MagicOnion.Client.Tests/StreamingHubTest.cs b/tests/MagicOnion.Client.Tests/StreamingHubTest.cs index 1be675096..874cf6139 100644 --- a/tests/MagicOnion.Client.Tests/StreamingHubTest.cs +++ b/tests/MagicOnion.Client.Tests/StreamingHubTest.cs @@ -637,6 +637,44 @@ public async Task WaitForDisconnectAsync_Faulted() Assert.Equal(DisconnectionType.Faulted, disconnectionReason.Type); Assert.IsType(disconnectionReason.Exception); } + + [Fact] + public async Task ThrowRpcException_After_Disconnected() + { + // Arrange + var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var helper = new StreamingHubClientTestHelper(factoryProvider: DynamicStreamingHubClientFactoryProvider.Instance); + var client = await helper.ConnectAsync(timeout.Token); + // Set the client's connection status to “Disconnected”. + helper.ThrowRpcException(); + var disconnectionReason = await client.WaitForDisconnectAsync().WaitAsync(timeout.Token); + + // Act + var ex = await Record.ExceptionAsync(async () => await client.Parameter_Zero()); + + // Assert + Assert.IsType(ex); + Assert.Contains("StreamingHubClient has already been disconnected from the server.", ex.Message); + } + + [Fact] + public async Task ThrowObjectDisposedException_After_Disposed() + { + // Arrange + var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var helper = new StreamingHubClientTestHelper(factoryProvider: DynamicStreamingHubClientFactoryProvider.Instance); + var client = await helper.ConnectAsync(timeout.Token); + // Set the client's connection status to “Disconnected”. + helper.ThrowRpcException(); + var disconnectionReason = await client.WaitForDisconnectAsync().WaitAsync(timeout.Token); + + // Act + await client.DisposeAsync(); + var ex = await Record.ExceptionAsync(async () => await client.Parameter_Zero()); + + // Assert + Assert.IsType(ex); + } } public interface IGreeterHubReceiver From 821f09983e7dd708f6229912cce1f4ce586e489b Mon Sep 17 00:00:00 2001 From: Mayuki Sawatari Date: Fri, 20 Sep 2024 14:12:31 +0900 Subject: [PATCH 2/4] Throws an exception if a hub method is called during the cleanup process --- .../StreamingHubClientBase.cs | 45 +++++++++++++------ .../StreamingHubClientBase.cs | 43 +++++++++++++----- 2 files changed, 63 insertions(+), 25 deletions(-) diff --git a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/StreamingHubClientBase.cs b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/StreamingHubClientBase.cs index 62d17310d..3b1e13e14 100644 --- a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/StreamingHubClientBase.cs +++ b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/StreamingHubClientBase.cs @@ -175,9 +175,16 @@ public abstract class StreamingHubClientBase : IStream readonly CancellationTokenSource subscriptionCts = new(); readonly Dictionary postCallbackCache = new(); - int messageIdSequence = 0; - bool disposed; + State state = State.Uninitialized; + + enum State + { + Uninitialized, + Connected, + Disconnected, + Disposed, + } readonly Channel writerQueue = Channel.CreateUnbounded(new UnboundedChannelOptions() { SingleReader = true, SingleWriter = false, AllowSynchronousContinuations = false }); Task? writerTask; @@ -291,6 +298,8 @@ static Method CreateConnectMethod(stri async Task StartSubscribe(SynchronizationContext? syncContext, Task firstMoveNext, CancellationToken subscriptionToken) { + state = State.Connected; + var disconnectionReason = new DisconnectionReason(DisconnectionType.CompletedNormally, null); writerTask = RunWriterLoopAsync(subscriptionToken); @@ -310,7 +319,7 @@ async Task StartSubscribe(SynchronizationContext? syncContext, Task firstM // log post on main thread. if (syncContext != null) { - syncContext.Post(state => logger.Error((Exception)state!, msg), ex); + syncContext.Post(s => logger.Error((Exception)s!, msg), ex); } else { @@ -335,7 +344,7 @@ async Task StartSubscribe(SynchronizationContext? syncContext, Task firstM // log post on main thread. if (syncContext != null) { - syncContext.Post(state => logger.Error((Exception)state!, msg), ex); + syncContext.Post(s => logger.Error((Exception)s!, msg), ex); } else { @@ -346,6 +355,8 @@ async Task StartSubscribe(SynchronizationContext? syncContext, Task firstM } finally { + state = State.Disconnected; + try { #if !UNITY_WEBGL @@ -414,9 +425,9 @@ void ProcessBroadcast(SynchronizationContext? syncContext, StreamingHubPayload p SendOrPostCallback CreateBroadcastCallback(int methodId, int consumed) { - return (state) => + return (s) => { - var p = (StreamingHubPayload)state!; + var p = (StreamingHubPayload)s!; this.OnBroadcastEvent(methodId, p.Memory.Slice(consumed)); StreamingHubPayloadPool.Shared.Return(p); }; @@ -547,7 +558,11 @@ protected ValueTask WriteMessageWithResponseValueTaskOfTAsync(taskSource, taskSource.Version); // wait until server return response(or error). if connection was closed, throws cancellation from DisposeAsyncCore. } @@ -566,7 +581,11 @@ protected ValueTask WriteMessageWithResponseValueTaskAsync( } var v = BuildRequestMessage(methodId, mid, message); - _ = writerQueue.Writer.TryWrite(v); + if (!writerQueue.Writer.TryWrite(v)) + { + // If the channel writer is closed, it is likely that the connection has already been disconnected. + ThrowIfDisconnected(); + } return new ValueTask(taskSource, taskSource.Version); // wait until server return response(or error). if connection was closed, throws cancellation from DisposeAsyncCore. } @@ -636,7 +655,7 @@ protected Task WriteClientResultResponseMessageForErrorAsync(int methodId, Guid void ThrowIfDisconnected() { - if (waitForDisconnect.Task.IsCompleted) + if (state == State.Disconnected) { throw new RpcException(new Status(StatusCode.Unavailable, $"The StreamingHubClient has already been disconnected from the server.")); } @@ -644,7 +663,7 @@ void ThrowIfDisconnected() void ThrowIfDisposed() { - if (disposed) + if (state == State.Disposed) { throw new ObjectDisposedException("StreamingHubClient", $"The StreamingHub has already been disconnected from the server."); } @@ -658,15 +677,15 @@ Task IStreamingHubClient.WaitForDisconnectAsync() public async Task DisposeAsync() { - if (disposed) return; - disposed = true; + if (state == State.Disposed) return; + state = State.Disposed; await CleanupAsync(true).ConfigureAwait(false); subscriptionCts.Dispose(); } - async Task CleanupAsync(bool waitSubscription) + async ValueTask CleanupAsync(bool waitSubscription) { if (writer == null) return; try diff --git a/src/MagicOnion.Client/StreamingHubClientBase.cs b/src/MagicOnion.Client/StreamingHubClientBase.cs index 779b40335..3b1e13e14 100644 --- a/src/MagicOnion.Client/StreamingHubClientBase.cs +++ b/src/MagicOnion.Client/StreamingHubClientBase.cs @@ -175,9 +175,16 @@ public abstract class StreamingHubClientBase : IStream readonly CancellationTokenSource subscriptionCts = new(); readonly Dictionary postCallbackCache = new(); - int messageIdSequence = 0; - bool disposed; + State state = State.Uninitialized; + + enum State + { + Uninitialized, + Connected, + Disconnected, + Disposed, + } readonly Channel writerQueue = Channel.CreateUnbounded(new UnboundedChannelOptions() { SingleReader = true, SingleWriter = false, AllowSynchronousContinuations = false }); Task? writerTask; @@ -291,6 +298,8 @@ static Method CreateConnectMethod(stri async Task StartSubscribe(SynchronizationContext? syncContext, Task firstMoveNext, CancellationToken subscriptionToken) { + state = State.Connected; + var disconnectionReason = new DisconnectionReason(DisconnectionType.CompletedNormally, null); writerTask = RunWriterLoopAsync(subscriptionToken); @@ -310,7 +319,7 @@ async Task StartSubscribe(SynchronizationContext? syncContext, Task firstM // log post on main thread. if (syncContext != null) { - syncContext.Post(state => logger.Error((Exception)state!, msg), ex); + syncContext.Post(s => logger.Error((Exception)s!, msg), ex); } else { @@ -335,7 +344,7 @@ async Task StartSubscribe(SynchronizationContext? syncContext, Task firstM // log post on main thread. if (syncContext != null) { - syncContext.Post(state => logger.Error((Exception)state!, msg), ex); + syncContext.Post(s => logger.Error((Exception)s!, msg), ex); } else { @@ -346,6 +355,8 @@ async Task StartSubscribe(SynchronizationContext? syncContext, Task firstM } finally { + state = State.Disconnected; + try { #if !UNITY_WEBGL @@ -414,9 +425,9 @@ void ProcessBroadcast(SynchronizationContext? syncContext, StreamingHubPayload p SendOrPostCallback CreateBroadcastCallback(int methodId, int consumed) { - return (state) => + return (s) => { - var p = (StreamingHubPayload)state!; + var p = (StreamingHubPayload)s!; this.OnBroadcastEvent(methodId, p.Memory.Slice(consumed)); StreamingHubPayloadPool.Shared.Return(p); }; @@ -547,7 +558,11 @@ protected ValueTask WriteMessageWithResponseValueTaskOfTAsync(taskSource, taskSource.Version); // wait until server return response(or error). if connection was closed, throws cancellation from DisposeAsyncCore. } @@ -566,7 +581,11 @@ protected ValueTask WriteMessageWithResponseValueTaskAsync( } var v = BuildRequestMessage(methodId, mid, message); - _ = writerQueue.Writer.TryWrite(v); + if (!writerQueue.Writer.TryWrite(v)) + { + // If the channel writer is closed, it is likely that the connection has already been disconnected. + ThrowIfDisconnected(); + } return new ValueTask(taskSource, taskSource.Version); // wait until server return response(or error). if connection was closed, throws cancellation from DisposeAsyncCore. } @@ -636,7 +655,7 @@ protected Task WriteClientResultResponseMessageForErrorAsync(int methodId, Guid void ThrowIfDisconnected() { - if (waitForDisconnect.Task.IsCompleted) + if (state == State.Disconnected) { throw new RpcException(new Status(StatusCode.Unavailable, $"The StreamingHubClient has already been disconnected from the server.")); } @@ -644,7 +663,7 @@ void ThrowIfDisconnected() void ThrowIfDisposed() { - if (disposed) + if (state == State.Disposed) { throw new ObjectDisposedException("StreamingHubClient", $"The StreamingHub has already been disconnected from the server."); } @@ -658,8 +677,8 @@ Task IStreamingHubClient.WaitForDisconnectAsync() public async Task DisposeAsync() { - if (disposed) return; - disposed = true; + if (state == State.Disposed) return; + state = State.Disposed; await CleanupAsync(true).ConfigureAwait(false); From 730397365ac4b2851e1952f7a9f2a0d728220494 Mon Sep 17 00:00:00 2001 From: Mayuki Sawatari Date: Fri, 20 Sep 2024 14:32:36 +0900 Subject: [PATCH 3/4] Fix use after dispose --- .../StreamingHubClientBase.cs | 25 +++++++++++-------- .../StreamingHubClientBase.cs | 25 +++++++++++-------- .../StreamingHubTest.cs | 4 ++- 3 files changed, 33 insertions(+), 21 deletions(-) diff --git a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/StreamingHubClientBase.cs b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/StreamingHubClientBase.cs index 3b1e13e14..12baa7956 100644 --- a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/StreamingHubClientBase.cs +++ b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/StreamingHubClientBase.cs @@ -176,12 +176,13 @@ public abstract class StreamingHubClientBase : IStream readonly Dictionary postCallbackCache = new(); int messageIdSequence = 0; - State state = State.Uninitialized; + int state = (int)State.Uninitialized; enum State { Uninitialized, Connected, + Disconnecting, Disconnected, Disposed, } @@ -298,7 +299,7 @@ static Method CreateConnectMethod(stri async Task StartSubscribe(SynchronizationContext? syncContext, Task firstMoveNext, CancellationToken subscriptionToken) { - state = State.Connected; + state = (int)State.Connected; var disconnectionReason = new DisconnectionReason(DisconnectionType.CompletedNormally, null); writerTask = RunWriterLoopAsync(subscriptionToken); @@ -355,7 +356,7 @@ async Task StartSubscribe(SynchronizationContext? syncContext, Task firstM } finally { - state = State.Disconnected; + state = (int)State.Disconnected; try { @@ -655,7 +656,7 @@ protected Task WriteClientResultResponseMessageForErrorAsync(int methodId, Guid void ThrowIfDisconnected() { - if (state == State.Disconnected) + if (state != (int)State.Connected) { throw new RpcException(new Status(StatusCode.Unavailable, $"The StreamingHubClient has already been disconnected from the server.")); } @@ -663,9 +664,9 @@ void ThrowIfDisconnected() void ThrowIfDisposed() { - if (state == State.Disposed) + if (state == (int)State.Disposed) { - throw new ObjectDisposedException("StreamingHubClient", $"The StreamingHub has already been disconnected from the server."); + throw new ObjectDisposedException("StreamingHubClient", $"The StreamingHubClient has already been disconnected from the server."); } } @@ -677,16 +678,19 @@ Task IStreamingHubClient.WaitForDisconnectAsync() public async Task DisposeAsync() { - if (state == State.Disposed) return; - state = State.Disposed; + if (state == (int)State.Disposed) return; + state = (int)State.Disposed; await CleanupAsync(true).ConfigureAwait(false); - - subscriptionCts.Dispose(); } async ValueTask CleanupAsync(bool waitSubscription) { + if (Interlocked.CompareExchange(ref state, (int)State.Disconnected, (int)State.Disconnecting) != (int)State.Disconnecting) + { + return; + } + if (writer == null) return; try { @@ -737,6 +741,7 @@ async ValueTask CleanupAsync(bool waitSubscription) } } } + subscriptionCts.Dispose(); } StreamingHubPayload BuildRequestMessage(int methodId, T message) diff --git a/src/MagicOnion.Client/StreamingHubClientBase.cs b/src/MagicOnion.Client/StreamingHubClientBase.cs index 3b1e13e14..12baa7956 100644 --- a/src/MagicOnion.Client/StreamingHubClientBase.cs +++ b/src/MagicOnion.Client/StreamingHubClientBase.cs @@ -176,12 +176,13 @@ public abstract class StreamingHubClientBase : IStream readonly Dictionary postCallbackCache = new(); int messageIdSequence = 0; - State state = State.Uninitialized; + int state = (int)State.Uninitialized; enum State { Uninitialized, Connected, + Disconnecting, Disconnected, Disposed, } @@ -298,7 +299,7 @@ static Method CreateConnectMethod(stri async Task StartSubscribe(SynchronizationContext? syncContext, Task firstMoveNext, CancellationToken subscriptionToken) { - state = State.Connected; + state = (int)State.Connected; var disconnectionReason = new DisconnectionReason(DisconnectionType.CompletedNormally, null); writerTask = RunWriterLoopAsync(subscriptionToken); @@ -355,7 +356,7 @@ async Task StartSubscribe(SynchronizationContext? syncContext, Task firstM } finally { - state = State.Disconnected; + state = (int)State.Disconnected; try { @@ -655,7 +656,7 @@ protected Task WriteClientResultResponseMessageForErrorAsync(int methodId, Guid void ThrowIfDisconnected() { - if (state == State.Disconnected) + if (state != (int)State.Connected) { throw new RpcException(new Status(StatusCode.Unavailable, $"The StreamingHubClient has already been disconnected from the server.")); } @@ -663,9 +664,9 @@ void ThrowIfDisconnected() void ThrowIfDisposed() { - if (state == State.Disposed) + if (state == (int)State.Disposed) { - throw new ObjectDisposedException("StreamingHubClient", $"The StreamingHub has already been disconnected from the server."); + throw new ObjectDisposedException("StreamingHubClient", $"The StreamingHubClient has already been disconnected from the server."); } } @@ -677,16 +678,19 @@ Task IStreamingHubClient.WaitForDisconnectAsync() public async Task DisposeAsync() { - if (state == State.Disposed) return; - state = State.Disposed; + if (state == (int)State.Disposed) return; + state = (int)State.Disposed; await CleanupAsync(true).ConfigureAwait(false); - - subscriptionCts.Dispose(); } async ValueTask CleanupAsync(bool waitSubscription) { + if (Interlocked.CompareExchange(ref state, (int)State.Disconnected, (int)State.Disconnecting) != (int)State.Disconnecting) + { + return; + } + if (writer == null) return; try { @@ -737,6 +741,7 @@ async ValueTask CleanupAsync(bool waitSubscription) } } } + subscriptionCts.Dispose(); } StreamingHubPayload BuildRequestMessage(int methodId, T message) diff --git a/tests/MagicOnion.Client.Tests/StreamingHubTest.cs b/tests/MagicOnion.Client.Tests/StreamingHubTest.cs index 874cf6139..d127dd924 100644 --- a/tests/MagicOnion.Client.Tests/StreamingHubTest.cs +++ b/tests/MagicOnion.Client.Tests/StreamingHubTest.cs @@ -673,7 +673,9 @@ public async Task ThrowObjectDisposedException_After_Disposed() var ex = await Record.ExceptionAsync(async () => await client.Parameter_Zero()); // Assert - Assert.IsType(ex); + var ode = Assert.IsType(ex); + Assert.Equal(nameof(StreamingHubClient), ode.ObjectName); + Assert.Contains("StreamingHubClient has already been disconnected from the server.", ex.Message); } } From f7cd9f282b6cb8f436cdc922670d389fe5b5fdfd Mon Sep 17 00:00:00 2001 From: Mayuki Sawatari Date: Fri, 20 Sep 2024 14:48:35 +0900 Subject: [PATCH 4/4] Fix --- .../StreamingHubClientBase.cs | 28 ++++++------------- .../StreamingHubClientBase.cs | 28 ++++++------------- 2 files changed, 18 insertions(+), 38 deletions(-) diff --git a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/StreamingHubClientBase.cs b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/StreamingHubClientBase.cs index 12baa7956..8046ab045 100644 --- a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/StreamingHubClientBase.cs +++ b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/StreamingHubClientBase.cs @@ -176,16 +176,9 @@ public abstract class StreamingHubClientBase : IStream readonly Dictionary postCallbackCache = new(); int messageIdSequence = 0; - int state = (int)State.Uninitialized; - - enum State - { - Uninitialized, - Connected, - Disconnecting, - Disconnected, - Disposed, - } + bool disposed; + bool disconnected; + int cleanupSentinel = 0; // 0 = false, 1 = true readonly Channel writerQueue = Channel.CreateUnbounded(new UnboundedChannelOptions() { SingleReader = true, SingleWriter = false, AllowSynchronousContinuations = false }); Task? writerTask; @@ -299,8 +292,6 @@ static Method CreateConnectMethod(stri async Task StartSubscribe(SynchronizationContext? syncContext, Task firstMoveNext, CancellationToken subscriptionToken) { - state = (int)State.Connected; - var disconnectionReason = new DisconnectionReason(DisconnectionType.CompletedNormally, null); writerTask = RunWriterLoopAsync(subscriptionToken); @@ -356,7 +347,7 @@ async Task StartSubscribe(SynchronizationContext? syncContext, Task firstM } finally { - state = (int)State.Disconnected; + disconnected = true; try { @@ -656,7 +647,7 @@ protected Task WriteClientResultResponseMessageForErrorAsync(int methodId, Guid void ThrowIfDisconnected() { - if (state != (int)State.Connected) + if (disconnected) { throw new RpcException(new Status(StatusCode.Unavailable, $"The StreamingHubClient has already been disconnected from the server.")); } @@ -664,7 +655,7 @@ void ThrowIfDisconnected() void ThrowIfDisposed() { - if (state == (int)State.Disposed) + if (disposed) { throw new ObjectDisposedException("StreamingHubClient", $"The StreamingHubClient has already been disconnected from the server."); } @@ -678,15 +669,14 @@ Task IStreamingHubClient.WaitForDisconnectAsync() public async Task DisposeAsync() { - if (state == (int)State.Disposed) return; - state = (int)State.Disposed; - + if (disposed) return; + disposed = true; await CleanupAsync(true).ConfigureAwait(false); } async ValueTask CleanupAsync(bool waitSubscription) { - if (Interlocked.CompareExchange(ref state, (int)State.Disconnected, (int)State.Disconnecting) != (int)State.Disconnecting) + if (Interlocked.CompareExchange(ref cleanupSentinel, 1, 0) != 0) { return; } diff --git a/src/MagicOnion.Client/StreamingHubClientBase.cs b/src/MagicOnion.Client/StreamingHubClientBase.cs index 12baa7956..8046ab045 100644 --- a/src/MagicOnion.Client/StreamingHubClientBase.cs +++ b/src/MagicOnion.Client/StreamingHubClientBase.cs @@ -176,16 +176,9 @@ public abstract class StreamingHubClientBase : IStream readonly Dictionary postCallbackCache = new(); int messageIdSequence = 0; - int state = (int)State.Uninitialized; - - enum State - { - Uninitialized, - Connected, - Disconnecting, - Disconnected, - Disposed, - } + bool disposed; + bool disconnected; + int cleanupSentinel = 0; // 0 = false, 1 = true readonly Channel writerQueue = Channel.CreateUnbounded(new UnboundedChannelOptions() { SingleReader = true, SingleWriter = false, AllowSynchronousContinuations = false }); Task? writerTask; @@ -299,8 +292,6 @@ static Method CreateConnectMethod(stri async Task StartSubscribe(SynchronizationContext? syncContext, Task firstMoveNext, CancellationToken subscriptionToken) { - state = (int)State.Connected; - var disconnectionReason = new DisconnectionReason(DisconnectionType.CompletedNormally, null); writerTask = RunWriterLoopAsync(subscriptionToken); @@ -356,7 +347,7 @@ async Task StartSubscribe(SynchronizationContext? syncContext, Task firstM } finally { - state = (int)State.Disconnected; + disconnected = true; try { @@ -656,7 +647,7 @@ protected Task WriteClientResultResponseMessageForErrorAsync(int methodId, Guid void ThrowIfDisconnected() { - if (state != (int)State.Connected) + if (disconnected) { throw new RpcException(new Status(StatusCode.Unavailable, $"The StreamingHubClient has already been disconnected from the server.")); } @@ -664,7 +655,7 @@ void ThrowIfDisconnected() void ThrowIfDisposed() { - if (state == (int)State.Disposed) + if (disposed) { throw new ObjectDisposedException("StreamingHubClient", $"The StreamingHubClient has already been disconnected from the server."); } @@ -678,15 +669,14 @@ Task IStreamingHubClient.WaitForDisconnectAsync() public async Task DisposeAsync() { - if (state == (int)State.Disposed) return; - state = (int)State.Disposed; - + if (disposed) return; + disposed = true; await CleanupAsync(true).ConfigureAwait(false); } async ValueTask CleanupAsync(bool waitSubscription) { - if (Interlocked.CompareExchange(ref state, (int)State.Disconnected, (int)State.Disconnecting) != (int)State.Disconnecting) + if (Interlocked.CompareExchange(ref cleanupSentinel, 1, 0) != 0) { return; }