aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/csharp/Grpc.Core/Server.cs
diff options
context:
space:
mode:
Diffstat (limited to 'src/csharp/Grpc.Core/Server.cs')
-rw-r--r--src/csharp/Grpc.Core/Server.cs39
1 files changed, 24 insertions, 15 deletions
diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs
index d538a4671f..069185e13a 100644
--- a/src/csharp/Grpc.Core/Server.cs
+++ b/src/csharp/Grpc.Core/Server.cs
@@ -34,8 +34,7 @@
using System;
using System.Collections;
using System.Collections.Generic;
-using System.Diagnostics;
-using System.Runtime.InteropServices;
+using System.Linq;
using System.Threading.Tasks;
using Grpc.Core.Internal;
using Grpc.Core.Logging;
@@ -48,7 +47,7 @@ namespace Grpc.Core
/// </summary>
public class Server
{
- const int InitialAllowRpcTokenCount = 10;
+ const int InitialAllowRpcTokenCountPerCq = 10;
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Server>();
readonly AtomicCounter activeCallCounter = new AtomicCounter();
@@ -80,7 +79,12 @@ namespace Grpc.Core
this.options = options != null ? new List<ChannelOption>(options) : new List<ChannelOption>();
using (var channelArgs = ChannelOptions.CreateChannelArgs(this.options))
{
- this.handle = ServerSafeHandle.NewServer(environment.CompletionQueue, channelArgs);
+ this.handle = ServerSafeHandle.NewServer(channelArgs);
+ }
+
+ foreach (var cq in environment.CompletionQueues)
+ {
+ this.handle.RegisterCompletionQueue(cq);
}
}
@@ -133,9 +137,12 @@ namespace Grpc.Core
// Starting with more than one AllowOneRpc tokens can significantly increase
// unary RPC throughput.
- for (int i = 0; i < InitialAllowRpcTokenCount; i++)
+ for (int i = 0; i < InitialAllowRpcTokenCountPerCq; i++)
{
- AllowOneRpc();
+ foreach (var cq in environment.CompletionQueues)
+ {
+ AllowOneRpc(cq);
+ }
}
}
}
@@ -154,7 +161,8 @@ namespace Grpc.Core
shutdownRequested = true;
}
- handle.ShutdownAndNotify(HandleServerShutdown, environment);
+ var cq = environment.CompletionQueues.First(); // any cq will do
+ handle.ShutdownAndNotify(HandleServerShutdown, cq);
await shutdownTcs.Task.ConfigureAwait(false);
DisposeHandle();
@@ -174,7 +182,8 @@ namespace Grpc.Core
shutdownRequested = true;
}
- handle.ShutdownAndNotify(HandleServerShutdown, environment);
+ var cq = environment.CompletionQueues.First(); // any cq will do
+ handle.ShutdownAndNotify(HandleServerShutdown, cq);
handle.CancelAllCalls();
await shutdownTcs.Task.ConfigureAwait(false);
DisposeHandle();
@@ -244,11 +253,11 @@ namespace Grpc.Core
/// <summary>
/// Allows one new RPC call to be received by server.
/// </summary>
- private void AllowOneRpc()
+ private void AllowOneRpc(CompletionQueueSafeHandle cq)
{
if (!shutdownRequested)
{
- handle.RequestCall(HandleNewServerRpc, environment);
+ handle.RequestCall((success, ctx) => HandleNewServerRpc(success, ctx, cq), cq);
}
}
@@ -265,7 +274,7 @@ namespace Grpc.Core
/// <summary>
/// Selects corresponding handler for given call and handles the call.
/// </summary>
- private async Task HandleCallAsync(ServerRpcNew newRpc)
+ private async Task HandleCallAsync(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
{
try
{
@@ -274,7 +283,7 @@ namespace Grpc.Core
{
callHandler = NoSuchMethodCallHandler.Instance;
}
- await callHandler.HandleCall(newRpc, environment).ConfigureAwait(false);
+ await callHandler.HandleCall(newRpc, cq).ConfigureAwait(false);
}
catch (Exception e)
{
@@ -285,9 +294,9 @@ namespace Grpc.Core
/// <summary>
/// Handles the native callback.
/// </summary>
- private void HandleNewServerRpc(bool success, BatchContextSafeHandle ctx)
+ private void HandleNewServerRpc(bool success, BatchContextSafeHandle ctx, CompletionQueueSafeHandle cq)
{
- Task.Run(() => AllowOneRpc());
+ Task.Run(() => AllowOneRpc(cq));
if (success)
{
@@ -296,7 +305,7 @@ namespace Grpc.Core
// after server shutdown, the callback returns with null call
if (!newRpc.Call.IsInvalid)
{
- HandleCallAsync(newRpc); // we don't need to await.
+ HandleCallAsync(newRpc, cq); // we don't need to await.
}
}
}