Skip to content

Commit

Permalink
Merge pull request #839 from Cysharp/feature/FixDisconnectionReason
Browse files Browse the repository at this point in the history
Fix the disconnection reason determination process
  • Loading branch information
mayuki authored Sep 20, 2024
2 parents 93cda56 + 4218be6 commit 08530bf
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -324,14 +324,18 @@ async Task StartSubscribe(SynchronizationContext? syncContext, Task<bool> firstM
}
catch (Exception ex)
{
if (ex is OperationCanceledException oce)
// When terminating by Heartbeat or DisposeAsync, a RpcException with a Status of Canceled is thrown.
// If `ex.InnerException` is OperationCanceledException` and `subscriptionToken.IsCancellationRequested` is true, it is treated as a normal cancellation.
if ((ex is OperationCanceledException oce) ||
(ex is RpcException { InnerException: OperationCanceledException } && subscriptionToken.IsCancellationRequested))
{
if (heartbeatManager.TimeoutToken.IsCancellationRequested)
{
disconnectionReason = new DisconnectionReason(DisconnectionType.TimedOut, ex);
}
return;
}

const string msg = "An error occurred while subscribing to messages.";
// log post on main thread.
if (syncContext != null)
Expand Down
6 changes: 5 additions & 1 deletion src/MagicOnion.Client/StreamingHubClientBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -324,14 +324,18 @@ async Task StartSubscribe(SynchronizationContext? syncContext, Task<bool> firstM
}
catch (Exception ex)
{
if (ex is OperationCanceledException oce)
// When terminating by Heartbeat or DisposeAsync, a RpcException with a Status of Canceled is thrown.
// If `ex.InnerException` is OperationCanceledException` and `subscriptionToken.IsCancellationRequested` is true, it is treated as a normal cancellation.
if ((ex is OperationCanceledException oce) ||
(ex is RpcException { InnerException: OperationCanceledException } && subscriptionToken.IsCancellationRequested))
{
if (heartbeatManager.TimeoutToken.IsCancellationRequested)
{
disconnectionReason = new DisconnectionReason(DisconnectionType.TimedOut, ex);
}
return;
}

const string msg = "An error occurred while subscribing to messages.";
// log post on main thread.
if (syncContext != null)
Expand Down
19 changes: 15 additions & 4 deletions tests/MagicOnion.Client.Tests/ChannelAsyncStreamReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,25 @@ public ChannelAsyncStreamReader(Channel<T> channel)

public async Task<bool> MoveNext(CancellationToken cancellationToken)
{
if (await reader.WaitToReadAsync(cancellationToken))
try
{
if (reader.TryRead(out var item))
if (await reader.WaitToReadAsync(cancellationToken))
{
Current = item;
return true;
if (reader.TryRead(out var item))
{
Current = item;
return true;
}
}
}
catch (OperationCanceledException e)
{
throw new RpcException(new Status(StatusCode.Cancelled, e.Message, e));
}
catch (Exception e)
{
throw new RpcException(new Status(StatusCode.Unknown, e.Message, e));
}

return false;
}
Expand Down
4 changes: 2 additions & 2 deletions tests/MagicOnion.Client.Tests/StreamingHubTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,7 @@ public async Task WaitForDisconnectAsync_TimedOut()

// Assert
Assert.Equal(DisconnectionType.TimedOut, disconnectionReason.Type);
Assert.IsType<OperationCanceledException>(disconnectionReason.Exception);
Assert.IsType<RpcException>(disconnectionReason.Exception);
}

[Fact]
Expand All @@ -635,7 +635,7 @@ public async Task WaitForDisconnectAsync_Faulted()

// Assert
Assert.Equal(DisconnectionType.Faulted, disconnectionReason.Type);
Assert.IsType<IOException>(disconnectionReason.Exception);
Assert.IsType<RpcException>(disconnectionReason.Exception);
}

[Fact]
Expand Down
52 changes: 52 additions & 0 deletions tests/MagicOnion.Integration.Tests/StreamingHubTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,50 @@ public async Task CustomMethodId_Receiver(TestStreamingHubClientFactory clientFa
// Assert
receiver.Received().Receiver_CustomMethodId();
}

[Theory]
[MemberData(nameof(EnumerateStreamingHubClientFactory))]
public async Task WaitForDisconnectAsync_CompletedNormally(TestStreamingHubClientFactory clientFactory)
{
// Arrange
var httpClient = factory.CreateDefaultClient();
var channel = GrpcChannel.ForAddress("http://localhost", new GrpcChannelOptions() { HttpClient = httpClient });

var receiver = Substitute.For<IStreamingHubTestHubReceiver>();
var client = await clientFactory.CreateAndConnectAsync<IStreamingHubTestHub, IStreamingHubTestHubReceiver>(channel, receiver);

// Act
await client.DisposeAsync();
var reason = await client.WaitForDisconnectAsync();

// Assert
reason.Type.Should().Be(DisconnectionType.CompletedNormally);
reason.Exception.Should().BeNull();
}

[Theory]
[MemberData(nameof(EnumerateStreamingHubClientFactory))]
public async Task WaitForDisconnectAsync_Faulted(TestStreamingHubClientFactory clientFactory)
{
// Arrange
var httpClient = factory.CreateDefaultClient();
var channel = GrpcChannel.ForAddress("http://localhost", new GrpcChannelOptions() { HttpClient = httpClient });

var receiver = Substitute.For<IStreamingHubTestHubReceiver>();
var client = await clientFactory.CreateAndConnectAsync<IStreamingHubTestHub, IStreamingHubTestHubReceiver>(channel, receiver);

// Act
try
{
await client.DisconnectFromServerAsync();
}
catch { }
var reason = await client.WaitForDisconnectAsync();

// Assert
reason.Type.Should().Be(DisconnectionType.Faulted);
reason.Exception.Should().NotBeNull();
}
}

public class StreamingHubTestHub : StreamingHubBase<IStreamingHubTestHub, IStreamingHubTestHubReceiver>, IStreamingHubTestHub
Expand Down Expand Up @@ -870,6 +914,12 @@ public Task CustomMethodId()
{
return Task.CompletedTask;
}

public Task DisconnectFromServerAsync()
{
this.Context.CallContext.GetHttpContext().Abort();
return Task.CompletedTask;
}
}

public interface IStreamingHubTestHubReceiver
Expand Down Expand Up @@ -940,4 +990,6 @@ public interface IStreamingHubTestHub : IStreamingHub<IStreamingHubTestHub, IStr

[MethodId(12345)]
Task CustomMethodId();

Task DisconnectFromServerAsync();
}

0 comments on commit 08530bf

Please sign in to comment.