diff options
Diffstat (limited to 'src/csharp/Grpc.Core/Server.cs')
-rw-r--r-- | src/csharp/Grpc.Core/Server.cs | 58 |
1 files changed, 46 insertions, 12 deletions
diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs index 3b554e5e87..63c1d9cd00 100644 --- a/src/csharp/Grpc.Core/Server.cs +++ b/src/csharp/Grpc.Core/Server.cs @@ -47,7 +47,7 @@ namespace Grpc.Core /// </summary> public class Server { - const int InitialAllowRpcTokenCountPerCq = 10; + const int DefaultRequestCallTokensPerCq = 2000; static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Server>(); readonly AtomicCounter activeCallCounter = new AtomicCounter(); @@ -66,7 +66,7 @@ namespace Grpc.Core bool startRequested; volatile bool shutdownRequested; - + int requestCallTokensPerCq = DefaultRequestCallTokensPerCq; /// <summary> /// Creates a new server. @@ -133,6 +133,27 @@ namespace Grpc.Core } /// <summary> + /// Experimental API. Might anytime change without prior notice. + /// Number or calls requested via grpc_server_request_call at any given time for each completion queue. + /// </summary> + public int RequestCallTokensPerCompletionQueue + { + get + { + return requestCallTokensPerCq; + } + set + { + lock (myLock) + { + GrpcPreconditions.CheckState(!startRequested); + GrpcPreconditions.CheckArgument(value > 0); + requestCallTokensPerCq = value; + } + } + } + + /// <summary> /// Starts the server. /// </summary> public void Start() @@ -145,9 +166,7 @@ namespace Grpc.Core handle.Start(); - // Starting with more than one AllowOneRpc tokens can significantly increase - // unary RPC throughput. - for (int i = 0; i < InitialAllowRpcTokenCountPerCq; i++) + for (int i = 0; i < requestCallTokensPerCq; i++) { foreach (var cq in environment.CompletionQueues) { @@ -310,14 +329,14 @@ namespace Grpc.Core /// <summary> /// Selects corresponding handler for given call and handles the call. /// </summary> - private async Task HandleCallAsync(ServerRpcNew newRpc, CompletionQueueSafeHandle cq) + private async Task HandleCallAsync(ServerRpcNew newRpc, CompletionQueueSafeHandle cq, Action continuation) { try { IServerCallHandler callHandler; if (!callHandlers.TryGetValue(newRpc.Method, out callHandler)) { - callHandler = NoSuchMethodCallHandler.Instance; + callHandler = UnimplementedMethodCallHandler.Instance; } await callHandler.HandleCall(newRpc, cq).ConfigureAwait(false); } @@ -325,25 +344,40 @@ namespace Grpc.Core { Logger.Warning(e, "Exception while handling RPC."); } + finally + { + continuation(); + } } /// <summary> /// Handles the native callback. /// </summary> - private void HandleNewServerRpc(bool success, BatchContextSafeHandle ctx, CompletionQueueSafeHandle cq) + private void HandleNewServerRpc(bool success, RequestCallContextSafeHandle ctx, CompletionQueueSafeHandle cq) { - Task.Run(() => AllowOneRpc(cq)); - + bool nextRpcRequested = false; if (success) { - ServerRpcNew newRpc = ctx.GetServerRpcNew(this); + var newRpc = ctx.GetServerRpcNew(this); // after server shutdown, the callback returns with null call if (!newRpc.Call.IsInvalid) { - HandleCallAsync(newRpc, cq); // we don't need to await. + nextRpcRequested = true; + + // Start asynchronous handler for the call. + // Don't await, the continuations will run on gRPC thread pool once triggered + // by cq.Next(). + #pragma warning disable 4014 + HandleCallAsync(newRpc, cq, () => AllowOneRpc(cq)); + #pragma warning restore 4014 } } + + if (!nextRpcRequested) + { + AllowOneRpc(cq); + } } /// <summary> |