diff options
Diffstat (limited to 'src/csharp/Grpc.Core/Server.cs')
-rw-r--r-- | src/csharp/Grpc.Core/Server.cs | 39 |
1 files changed, 24 insertions, 15 deletions
diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs index d538a4671f..069185e13a 100644 --- a/src/csharp/Grpc.Core/Server.cs +++ b/src/csharp/Grpc.Core/Server.cs @@ -34,8 +34,7 @@ using System; using System.Collections; using System.Collections.Generic; -using System.Diagnostics; -using System.Runtime.InteropServices; +using System.Linq; using System.Threading.Tasks; using Grpc.Core.Internal; using Grpc.Core.Logging; @@ -48,7 +47,7 @@ namespace Grpc.Core /// </summary> public class Server { - const int InitialAllowRpcTokenCount = 10; + const int InitialAllowRpcTokenCountPerCq = 10; static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Server>(); readonly AtomicCounter activeCallCounter = new AtomicCounter(); @@ -80,7 +79,12 @@ namespace Grpc.Core this.options = options != null ? new List<ChannelOption>(options) : new List<ChannelOption>(); using (var channelArgs = ChannelOptions.CreateChannelArgs(this.options)) { - this.handle = ServerSafeHandle.NewServer(environment.CompletionQueue, channelArgs); + this.handle = ServerSafeHandle.NewServer(channelArgs); + } + + foreach (var cq in environment.CompletionQueues) + { + this.handle.RegisterCompletionQueue(cq); } } @@ -133,9 +137,12 @@ namespace Grpc.Core // Starting with more than one AllowOneRpc tokens can significantly increase // unary RPC throughput. - for (int i = 0; i < InitialAllowRpcTokenCount; i++) + for (int i = 0; i < InitialAllowRpcTokenCountPerCq; i++) { - AllowOneRpc(); + foreach (var cq in environment.CompletionQueues) + { + AllowOneRpc(cq); + } } } } @@ -154,7 +161,8 @@ namespace Grpc.Core shutdownRequested = true; } - handle.ShutdownAndNotify(HandleServerShutdown, environment); + var cq = environment.CompletionQueues.First(); // any cq will do + handle.ShutdownAndNotify(HandleServerShutdown, cq); await shutdownTcs.Task.ConfigureAwait(false); DisposeHandle(); @@ -174,7 +182,8 @@ namespace Grpc.Core shutdownRequested = true; } - handle.ShutdownAndNotify(HandleServerShutdown, environment); + var cq = environment.CompletionQueues.First(); // any cq will do + handle.ShutdownAndNotify(HandleServerShutdown, cq); handle.CancelAllCalls(); await shutdownTcs.Task.ConfigureAwait(false); DisposeHandle(); @@ -244,11 +253,11 @@ namespace Grpc.Core /// <summary> /// Allows one new RPC call to be received by server. /// </summary> - private void AllowOneRpc() + private void AllowOneRpc(CompletionQueueSafeHandle cq) { if (!shutdownRequested) { - handle.RequestCall(HandleNewServerRpc, environment); + handle.RequestCall((success, ctx) => HandleNewServerRpc(success, ctx, cq), cq); } } @@ -265,7 +274,7 @@ namespace Grpc.Core /// <summary> /// Selects corresponding handler for given call and handles the call. /// </summary> - private async Task HandleCallAsync(ServerRpcNew newRpc) + private async Task HandleCallAsync(ServerRpcNew newRpc, CompletionQueueSafeHandle cq) { try { @@ -274,7 +283,7 @@ namespace Grpc.Core { callHandler = NoSuchMethodCallHandler.Instance; } - await callHandler.HandleCall(newRpc, environment).ConfigureAwait(false); + await callHandler.HandleCall(newRpc, cq).ConfigureAwait(false); } catch (Exception e) { @@ -285,9 +294,9 @@ namespace Grpc.Core /// <summary> /// Handles the native callback. /// </summary> - private void HandleNewServerRpc(bool success, BatchContextSafeHandle ctx) + private void HandleNewServerRpc(bool success, BatchContextSafeHandle ctx, CompletionQueueSafeHandle cq) { - Task.Run(() => AllowOneRpc()); + Task.Run(() => AllowOneRpc(cq)); if (success) { @@ -296,7 +305,7 @@ namespace Grpc.Core // after server shutdown, the callback returns with null call if (!newRpc.Call.IsInvalid) { - HandleCallAsync(newRpc); // we don't need to await. + HandleCallAsync(newRpc, cq); // we don't need to await. } } } |