Skip to content

Commit

Permalink
Add BDN that includes embedded network stack (#889)
Browse files Browse the repository at this point in the history
* Initial checkin on BDN that includes network path - handles non-TLS only

* cleanup

* ensure dispose

* cleanup

* merge from main

* Update ci-bdnbenchmark.yml
  • Loading branch information
badrishc authored Dec 19, 2024
1 parent 1f85668 commit 820ca0f
Show file tree
Hide file tree
Showing 21 changed files with 502 additions and 31 deletions.
15 changes: 14 additions & 1 deletion .github/workflows/ci-bdnbenchmark.yml
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,17 @@ jobs:
benchmark-data-dir-path: 'website/static/charts'
auto-push: true
max-items-in-chart: 50


# Run `github-action-benchmark` action for the Continuous Benchmarking Summary
- name: Generate summary for benchmark result
if: github.ref != 'refs/heads/main'
uses: benchmark-action/github-action-benchmark@v1
with:
name: ${{matrix.test}} (${{matrix.os}} ${{matrix.framework}} ${{matrix.configuration}})
tool: 'benchmarkdotnet'
output-file-path: ./test/BDNPerfTests/BenchmarkDotNet.Artifacts/results/BDN.benchmark.${{ matrix.test }}-report-full-compressed.json
github-token: ${{ secrets.GITHUB_TOKEN }}
summary-always: true
save-data-file: false
gh-pages-branch: 'continuousbenchmark'
benchmark-data-dir-path: 'website/static/charts'
2 changes: 0 additions & 2 deletions benchmark/BDN.benchmark/BDN.benchmark.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
</ItemGroup>

<ItemGroup>
<Compile Include="..\..\playground\Embedded.perftest\EmbeddedRespServer.cs" Link="Utils\EmbeddedRespServer.cs" />
<Compile Include="..\..\playground\Embedded.perftest\DummyNetworkSender.cs" Link="Utils\DummyNetworkSender.cs" />
<Compile Include="..\..\main\GarnetServer\Extensions\MyDictObject.cs" Link="Custom\MyDictObject.cs" />
<Compile Include="..\..\main\GarnetServer\Extensions\MyDictSet.cs" Link="Custom\MyDictSet.cs" />
<Compile Include="..\..\main\GarnetServer\Extensions\MyDictGet.cs" Link="Custom\MyDictGet.cs" />
Expand Down
2 changes: 1 addition & 1 deletion benchmark/BDN.benchmark/Cluster/ClusterContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
using System.Runtime.InteropServices;
using System.Text;
using BDN.benchmark.CustomProcs;
using Embedded.perftest;
using Embedded.server;
using Garnet.common;
using Garnet.server;

Expand Down
38 changes: 38 additions & 0 deletions benchmark/BDN.benchmark/Embedded/EmbeddedNetworkHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using System;
using System.Diagnostics;
using Garnet.common;
using Garnet.networking;
using Microsoft.Extensions.Logging;

namespace Embedded.server
{
internal class EmbeddedNetworkHandler : NetworkHandler<GarnetServerEmbedded, EmbeddedNetworkSender>
{
public EmbeddedNetworkHandler(GarnetServerEmbedded serverHook, EmbeddedNetworkSender networkSender, NetworkBufferSettings networkBufferSettings, LimitedFixedBufferPool networkPool, bool useTLS, IMessageConsumer messageConsumer = null, ILogger logger = null) : base(serverHook, networkSender, networkBufferSettings, networkPool, useTLS, messageConsumer, logger)
{
}

public override string RemoteEndpointName => throw new NotImplementedException();
public override string LocalEndpointName => throw new NotImplementedException();
public override void Dispose()
{
DisposeImpl();
}

public override bool TryClose() => throw new NotImplementedException();

public unsafe void Send(byte[] buffer, byte* bufferPtr, int length)
{
networkReceiveBuffer = buffer;
networkReceiveBufferPtr = bufferPtr;
OnNetworkReceive(length);

// We should have consumed the entire buffer
Debug.Assert(networkBytesRead == 0);
Debug.Assert(networkReadHead == 0);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
using System.Runtime.CompilerServices;
using Garnet.networking;

namespace Embedded.perftest
namespace Embedded.server
{
/// <summary>
/// Dummy network sender that reads from a fixed in-memory buffer
/// </summary>
unsafe class DummyNetworkSender : INetworkSender
internal unsafe class EmbeddedNetworkSender : INetworkSender
{
/// <summary>
/// Max size settings of the in-memory sender buffer
Expand All @@ -34,7 +34,7 @@ unsafe class DummyNetworkSender : INetworkSender
/// <summary>
/// Create a new dummy network sender with a simple in-memory buffer
/// </summary>
public DummyNetworkSender()
public EmbeddedNetworkSender()
{
maxSizeSettings = new MaxSizeSettings();
serverBufferSize = BufferSizeUtils.ServerBufferSize(maxSizeSettings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,24 @@
using Garnet.server;
using Microsoft.Extensions.Logging;

namespace Embedded.perftest
namespace Embedded.server
{
/// <summary>
/// Implements an embedded Garnet RESP server
/// </summary>
public sealed class EmbeddedRespServer : GarnetServer
internal sealed class EmbeddedRespServer : GarnetServer
{
readonly GarnetServerEmbedded garnetServerEmbedded;

/// <summary>
/// Creates an EmbeddedRespServer instance
/// </summary>
/// <param name="opts">Server options to configure the base GarnetServer instance</param>
/// <param name="loggerFactory">Logger factory to configure the base GarnetServer instance</param>
public EmbeddedRespServer(GarnetServerOptions opts, ILoggerFactory loggerFactory = null) : base(opts, loggerFactory)
/// <param name="server">Server network</param>
public EmbeddedRespServer(GarnetServerOptions opts, ILoggerFactory loggerFactory = null, GarnetServerEmbedded server = null) : base(opts, loggerFactory, server)
{
// Nothing...
this.garnetServerEmbedded = server;
}

/// <summary>
Expand All @@ -31,12 +33,17 @@ public EmbeddedRespServer(GarnetServerOptions opts, ILoggerFactory loggerFactory
public StoreWrapper StoreWrapper => storeWrapper;

/// <summary>
/// Return a RESP session to this server
/// Return a direct RESP session to this server
/// </summary>
/// <returns>A new RESP server session</returns>
internal RespServerSession GetRespSession()
{
return new RespServerSession(0, new DummyNetworkSender(), storeWrapper, null, null, true);
return new RespServerSession(0, new EmbeddedNetworkSender(), storeWrapper, null, null, true);
}

internal EmbeddedNetworkHandler GetNetworkHandler()
{
return garnetServerEmbedded.CreateNetworkHandler();
}
}
}
101 changes: 101 additions & 0 deletions benchmark/BDN.benchmark/Embedded/GarnetServerEmbedded.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using System;
using System.Net.Security;
using System.Threading;
using Garnet.common;
using Garnet.networking;
using Garnet.server;
using Microsoft.Extensions.Logging;

namespace Embedded.server
{
internal class GarnetServerEmbedded : GarnetServerBase, IServerHook
{
public GarnetServerEmbedded() : base("0.0.0.0", 0, 1 << 10)
{
}

public EmbeddedNetworkHandler CreateNetworkHandler(SslClientAuthenticationOptions tlsOptions = null, string remoteEndpointName = null)
{
var networkSender = new EmbeddedNetworkSender();
var networkSettings = new NetworkBufferSettings();
var networkPool = networkSettings.CreateBufferPool();
EmbeddedNetworkHandler handler = null;

if (activeHandlerCount >= 0)
{
var currentActiveHandlerCount = Interlocked.Increment(ref activeHandlerCount);
if (currentActiveHandlerCount > 0)
{
try
{
handler = new EmbeddedNetworkHandler(this, networkSender, networkSettings, networkPool, tlsOptions != null);
if (!activeHandlers.TryAdd(handler, default))
throw new Exception("Unable to add handler to dictionary");

handler.Start(tlsOptions, remoteEndpointName);
incr_conn_recv();
return handler;
}
catch (Exception ex)
{
logger?.LogError(ex, "Error starting network handler");
Interlocked.Decrement(ref activeHandlerCount);
handler?.Dispose();
}
}
else
{
Interlocked.Decrement(ref activeHandlerCount);
}
}
return handler;
}

public void DisposeMessageConsumer(INetworkHandler session)
{
if (activeHandlers.TryRemove(session, out _))
{
Interlocked.Decrement(ref activeHandlerCount);
incr_conn_disp();
try
{
session.Session?.Dispose();
}
catch (Exception ex)
{
logger?.LogError(ex, "Error disposing RespServerSession");
}
}
}

public override void Start()
{
}

public bool TryCreateMessageConsumer(Span<byte> bytes, INetworkSender networkSender, out IMessageConsumer session)
{
session = null;

// We need at least 4 bytes to determine session
if (bytes.Length < 4)
return false;

WireFormat protocol = WireFormat.ASCII;

if (!GetSessionProviders().TryGetValue(protocol, out var provider))
{
var input = System.Text.Encoding.ASCII.GetString(bytes);
logger?.LogError("Cannot identify wire protocol {bytes}", input);
throw new Exception($"Unsupported incoming wire format {protocol} {input}");
}

if (!AddSession(protocol, ref provider, networkSender, out session))
throw new Exception($"Unable to add session");

return true;
}
}
}
2 changes: 1 addition & 1 deletion benchmark/BDN.benchmark/Lua/LuaRunnerOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Licensed under the MIT license.

using BenchmarkDotNet.Attributes;
using Embedded.perftest;
using Embedded.server;
using Garnet.server;

namespace BDN.benchmark.Lua
Expand Down
2 changes: 1 addition & 1 deletion benchmark/BDN.benchmark/Lua/LuaScriptCacheOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Licensed under the MIT license.

using BenchmarkDotNet.Attributes;
using Embedded.perftest;
using Embedded.server;
using Garnet.common;
using Garnet.server;
using Garnet.server.Auth;
Expand Down
30 changes: 30 additions & 0 deletions benchmark/BDN.benchmark/Network/BasicOperations.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using BenchmarkDotNet.Attributes;

namespace BDN.benchmark.Network
{
/// <summary>
/// Benchmark for BasicOperations
/// </summary>
[MemoryDiagnoser]
public unsafe class BasicOperations : NetworkBase
{
static ReadOnlySpan<byte> INLINE_PING => "PING\r\n"u8;
byte[] pingRequestBuffer;
byte* pingRequestBufferPointer;

public override void GlobalSetup()
{
base.GlobalSetup();
SetupOperation(ref pingRequestBuffer, ref pingRequestBufferPointer, INLINE_PING);
}

[Benchmark]
public void InlinePing()
{
Send(pingRequestBuffer, pingRequestBufferPointer, pingRequestBuffer.Length);
}
}
}
93 changes: 93 additions & 0 deletions benchmark/BDN.benchmark/Network/NetworkBase.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using System.Runtime.CompilerServices;
using BenchmarkDotNet.Attributes;
using Embedded.server;
using Garnet.server;

namespace BDN.benchmark.Network
{
/// <summary>
/// Base class for network benchmarks
/// </summary>
public abstract unsafe class NetworkBase
{
/// <summary>
/// Parameters
/// </summary>
[ParamsSource(nameof(NetworkParamsProvider))]
public NetworkParams Params { get; set; }

/// <summary>
/// Operation parameters provider
/// </summary>
public IEnumerable<NetworkParams> NetworkParamsProvider()
{
yield return new(false);
}

/// <summary>
/// Batch size per method invocation
/// With a batchSize of 100, we have a convenient conversion of latency to throughput:
/// 5 us = 20 Mops/sec
/// 10 us = 10 Mops/sec
/// 20 us = 5 Mops/sec
/// 25 us = 4 Mops/sec
/// 100 us = 1 Mops/sec
/// </summary>
const int batchSize = 100;
EmbeddedRespServer server;
EmbeddedNetworkHandler networkHandler;

/// <summary>
/// Setup
/// </summary>
[GlobalSetup]
public virtual void GlobalSetup()
{
var opts = new GarnetServerOptions
{
QuietMode = true,
DisablePubSub = true,
};

server = new EmbeddedRespServer(opts, null, new GarnetServerEmbedded());
networkHandler = server.GetNetworkHandler();

// Send a PING message to warm up the session
SlowConsumeMessage("PING\r\n"u8);
}

/// <summary>
/// Cleanup
/// </summary>
[GlobalCleanup]
public virtual void GlobalCleanup()
{
networkHandler.Dispose();
server.Dispose();
}

protected void Send(byte[] requestBuffer, byte* requestBufferPointer, int length)
{
networkHandler.Send(requestBuffer, requestBufferPointer, length);
}

protected void SetupOperation(ref byte[] requestBuffer, ref byte* requestBufferPointer, ReadOnlySpan<byte> operation)
{
requestBuffer = GC.AllocateArray<byte>(operation.Length * batchSize, pinned: true);
requestBufferPointer = (byte*)Unsafe.AsPointer(ref requestBuffer[0]);
for (int i = 0; i < batchSize; i++)
operation.CopyTo(new Span<byte>(requestBuffer).Slice(i * operation.Length));
}

protected void SlowConsumeMessage(ReadOnlySpan<byte> message)
{
var buffer = GC.AllocateArray<byte>(message.Length, pinned: true);
var bufferPointer = (byte*)Unsafe.AsPointer(ref buffer[0]);
message.CopyTo(new Span<byte>(buffer));
Send(buffer, bufferPointer, buffer.Length);
}
}
}
Loading

0 comments on commit 820ca0f

Please sign in to comment.