Skip to content

Commit

Permalink
Merge pull request #833 from Cysharp/feature/HeartbeatCallback
Browse files Browse the repository at this point in the history
Add a callback when an Ack message is received from the client
  • Loading branch information
mayuki authored Sep 6, 2024
2 parents eca7ee4 + 0fbd6ba commit 67af829
Show file tree
Hide file tree
Showing 7 changed files with 131 additions and 77 deletions.
2 changes: 1 addition & 1 deletion src/MagicOnion.Server/Diagnostics/MagicOnionMetrics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public void StreamingHubMethodCompleted(in MetricsContext context, StreamingHubH
var tags = InitializeTagListForStreamingHub(handler.HubName);
tags.Add("rpc.method", handler.MethodInfo.Name);
tags.Add("magiconion.streaminghub.is_error", isErrorOrInterrupted ? BoxedTrue : BoxedFalse);
streamingHubMethodDuration.Record((long)StopwatchHelper.GetElapsedTime(startingTimestamp, endingTimestamp).TotalMilliseconds, tags);
streamingHubMethodDuration.Record((long)TimeProvider.System.GetElapsedTime(startingTimestamp, endingTimestamp).TotalMilliseconds, tags);
streamingHubMethodCompletedCounter.Add(1, tags);
}
}
Expand Down
8 changes: 8 additions & 0 deletions src/MagicOnion.Server/Features/IMagicOnionHeartbeatFeature.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ public interface IMagicOnionHeartbeatFeature
/// </summary>
CancellationToken TimeoutToken { get; }

/// <summary>
/// Sets the callback action to be performed when an Ack message is received from the client.
/// </summary>
/// <param name="callbackAction"></param>
void SetAckCallback(Action<TimeSpan>? callbackAction);

/// <summary>
/// Unregister the current StreamingHub connection from the HeartbeatManager.
/// </summary>
Expand All @@ -37,4 +43,6 @@ internal sealed class MagicOnionHeartbeatFeature(StreamingHubHeartbeatHandle han
public CancellationToken TimeoutToken => handle.TimeoutToken;

public void Unregister() => handle.Unregister();

public void SetAckCallback(Action<TimeSpan>? callbackAction) => handle.SetAckCallback(callbackAction);
}
19 changes: 11 additions & 8 deletions src/MagicOnion.Server/Hubs/StreamingHub.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
using System.Buffers;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading.Channels;
using Cysharp.Runtime.Multicast;
using Cysharp.Runtime.Multicast.Remoting;
using Grpc.Core;
using MagicOnion.Internal;
Expand All @@ -23,6 +21,7 @@ public abstract class StreamingHubBase<THubInterface, TReceiver> : ServiceBase<T
{
IRemoteClientResultPendingTaskRegistry remoteClientResultPendingTasks = default!;
StreamingHubHeartbeatHandle heartbeatHandle = default!;
TimeProvider timeProvider = default!;

protected static readonly Task<Nil> NilTask = Task.FromResult(Nil.Default);
protected static readonly ValueTask CompletedTask = new ValueTask();
Expand Down Expand Up @@ -88,10 +87,11 @@ internal async Task<DuplexStreamingResult<StreamingHubPayload, StreamingHubPaylo

var features = this.Context.CallContext.GetHttpContext().Features;
var magicOnionOptions = serviceProvider.GetRequiredService<IOptions<MagicOnionOptions>>().Value;
timeProvider = magicOnionOptions.TimeProvider ?? TimeProvider.System;

var remoteProxyFactory = serviceProvider.GetRequiredService<IRemoteProxyFactory>();
var remoteSerializer = serviceProvider.GetRequiredService<IRemoteSerializer>();
this.remoteClientResultPendingTasks = new RemoteClientResultPendingTaskRegistry(magicOnionOptions.ClientResultsDefaultTimeout, magicOnionOptions.TimeProvider ?? TimeProvider.System);
this.remoteClientResultPendingTasks = new RemoteClientResultPendingTaskRegistry(magicOnionOptions.ClientResultsDefaultTimeout, timeProvider);
this.Client = remoteProxyFactory.CreateDirect<TReceiver>(new MagicOnionRemoteReceiverWriter(StreamingServiceContext), remoteSerializer, remoteClientResultPendingTasks);

var handlerRepository = serviceProvider.GetRequiredService<StreamingHubHandlerRepository>();
Expand Down Expand Up @@ -134,10 +134,13 @@ internal async Task<DuplexStreamingResult<StreamingHubPayload, StreamingHubPaylo
{
Metrics.StreamingHubConnectionDecrement(Context.Metrics, Context.MethodHandler.ServiceName);

heartbeatHandle.Dispose();
StreamingServiceContext.CompleteStreamingHub();
heartbeatHandle.Unregister(); // NOTE: To be able to use CancellationToken within OnDisconnected event, separate the calls to Dispose and Unregister.

await OnDisconnected();

await this.Group.DisposeAsync();
heartbeatHandle.Dispose();
remoteClientResultPendingTasks.Dispose();
}

Expand Down Expand Up @@ -267,10 +270,10 @@ async ValueTask ProcessRequestAsync(UniqueHashDictionary<StreamingHubHandler> ha
hubInstance: this,
request: body,
messageId: messageId,
timestamp: DateTime.UtcNow
timestamp: timeProvider.GetUtcNow().UtcDateTime
);

var methodStartingTimestamp = Stopwatch.GetTimestamp();
var methodStartingTimestamp = timeProvider.GetTimestamp();
var isErrorOrInterrupted = false;
MagicOnionServerLog.BeginInvokeHubMethod(Context.MethodHandler.Logger, context, context.Request, handler.RequestType);
try
Expand All @@ -297,8 +300,8 @@ async ValueTask ProcessRequestAsync(UniqueHashDictionary<StreamingHubHandler> ha
}
finally
{
var methodEndingTimestamp = Stopwatch.GetTimestamp();
MagicOnionServerLog.EndInvokeHubMethod(Context.MethodHandler.Logger, context, context.ResponseSize, context.ResponseType, StopwatchHelper.GetElapsedTime(methodStartingTimestamp, methodEndingTimestamp).TotalMilliseconds, isErrorOrInterrupted);
var methodEndingTimestamp = timeProvider.GetTimestamp();
MagicOnionServerLog.EndInvokeHubMethod(Context.MethodHandler.Logger, context, context.ResponseSize, context.ResponseType, timeProvider.GetElapsedTime(methodStartingTimestamp, methodEndingTimestamp).TotalMilliseconds, isErrorOrInterrupted);
Metrics.StreamingHubMethodCompleted(Context.Metrics, handler, methodStartingTimestamp, methodEndingTimestamp, isErrorOrInterrupted);

StreamingHubContextPool.Shared.Return(context);
Expand Down
65 changes: 48 additions & 17 deletions src/MagicOnion.Server/Hubs/StreamingHubHeartbeatManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Diagnostics;
using MagicOnion.Internal;
using MagicOnion.Server.Diagnostics;
using MagicOnion.Server.Internal;
using Microsoft.Extensions.Logging;

namespace MagicOnion.Server.Hubs;
Expand All @@ -17,26 +18,27 @@ internal interface IStreamingHubHeartbeatManager : IDisposable

internal class StreamingHubHeartbeatHandle : IDisposable
{
readonly object gate = new();
readonly IStreamingHubHeartbeatManager manager;
readonly CancellationTokenSource timeoutToken;
readonly TimeSpan timeoutDuration;
bool disposed;
short waitingSequence;
bool unregistered;
short waitingSequence = -1;
bool timeoutTimerIsRunning;
DateTimeOffset lastSentAt;
DateTimeOffset lastReceivedAt;
long lastSentAtTimestamp;
Action<TimeSpan>? onAckCallback;

/// <summary>
/// Gets the last received time.
/// </summary>
public DateTimeOffset LastReceivedAt => lastReceivedAt;
public DateTimeOffset LastReceivedAt { get; private set; }

/// <summary>
/// Gets the latency between client and server. Returns <see cref="TimeSpan.Zero"/> if not sent or received.
/// </summary>
public TimeSpan Latency => (lastSentAt == default || lastReceivedAt == default)
? TimeSpan.Zero
: lastReceivedAt - lastSentAt;
public TimeSpan Latency { get; private set; }

public IStreamingServiceContext<StreamingHubPayload, StreamingHubPayload> ServiceContext { get; }
public CancellationToken TimeoutToken => timeoutToken.Token;
Expand All @@ -53,16 +55,21 @@ public StreamingHubHeartbeatHandle(IStreamingHubHeartbeatManager manager, IStrea
);
}

public void RestartTimeoutTimer(short sequence, DateTimeOffset sentAt)
public void RestartTimeoutTimer(short sequence, DateTimeOffset sentAt, long sentAtTimestamp)
{
if (disposed || timeoutDuration == Timeout.InfiniteTimeSpan) return;
waitingSequence = sequence;
lastSentAt = sentAt;

if (!timeoutTimerIsRunning)
lock (gate)
{
timeoutToken.CancelAfter(timeoutDuration);
timeoutTimerIsRunning = true;
waitingSequence = sequence;
lastSentAt = sentAt;
lastSentAtTimestamp = sentAtTimestamp;

if (!timeoutTimerIsRunning)
{
timeoutToken.CancelAfter(timeoutDuration);
timeoutTimerIsRunning = true;
}
}
}

Expand All @@ -71,23 +78,46 @@ public void Ack(short sequence)
if (disposed || timeoutDuration == Timeout.InfiniteTimeSpan) return;

if (waitingSequence != sequence) return;
lastReceivedAt = manager.TimeProvider.GetUtcNow();
timeoutToken.CancelAfter(Timeout.InfiniteTimeSpan);
timeoutTimerIsRunning = false;

lock (gate)
{
var receivedAtTimestamp = manager.TimeProvider.GetTimestamp();
var elapsed = manager.TimeProvider.GetElapsedTime(lastSentAtTimestamp, receivedAtTimestamp);

LastReceivedAt = lastSentAt.Add(elapsed);
Latency = elapsed;
timeoutToken.CancelAfter(Timeout.InfiniteTimeSpan);
timeoutTimerIsRunning = false;

onAckCallback?.Invoke(Latency);
}
}

public void SetAckCallback(Action<TimeSpan>? callbackAction)
{
this.onAckCallback = callbackAction;
}

public void Unregister()
{
if (unregistered) return;

manager.Unregister(ServiceContext);
timeoutToken.CancelAfter(Timeout.InfiniteTimeSpan);
timeoutTimerIsRunning = false;
unregistered = true;
}

public void Dispose()
{
if (disposed) return;

disposed = true;
manager.Unregister(ServiceContext);
onAckCallback = null;
if (!unregistered)
{
manager.Unregister(ServiceContext);
}
timeoutToken.Dispose();
}
}
Expand Down Expand Up @@ -172,6 +202,7 @@ async Task StartHeartbeatAsync(PeriodicTimer runningTimer, string method)
while (await runningTimer.WaitForNextTickAsync())
{
var now = TimeProvider.GetUtcNow();
var timestamp = TimeProvider.GetTimestamp();
StreamingHubMessageWriter.WriteServerHeartbeatMessageHeader(writer, sequence, now);
if (!(heartbeatMetadataProvider?.TryWriteMetadata(writer) ?? false))
{
Expand All @@ -183,7 +214,7 @@ async Task StartHeartbeatAsync(PeriodicTimer runningTimer, string method)
{
foreach (var (contextId, handle) in contexts)
{
handle.RestartTimeoutTimer(sequence, now);
handle.RestartTimeoutTimer(sequence, now, timestamp);
handle.ServiceContext.QueueResponseStreamWrite(StreamingHubPayloadPool.Shared.RentOrCreate(writer.WrittenSpan));
}
}
Expand Down
20 changes: 0 additions & 20 deletions src/MagicOnion.Server/Internal/StopwatchHelper.cs

This file was deleted.

Loading

0 comments on commit 67af829

Please sign in to comment.