aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/csharp/Grpc.Core/Server.cs
diff options
context:
space:
mode:
Diffstat (limited to 'src/csharp/Grpc.Core/Server.cs')
-rw-r--r--src/csharp/Grpc.Core/Server.cs58
1 files changed, 46 insertions, 12 deletions
diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs
index 3b554e5e87..63c1d9cd00 100644
--- a/src/csharp/Grpc.Core/Server.cs
+++ b/src/csharp/Grpc.Core/Server.cs
@@ -47,7 +47,7 @@ namespace Grpc.Core
/// </summary>
public class Server
{
- const int InitialAllowRpcTokenCountPerCq = 10;
+ const int DefaultRequestCallTokensPerCq = 2000;
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Server>();
readonly AtomicCounter activeCallCounter = new AtomicCounter();
@@ -66,7 +66,7 @@ namespace Grpc.Core
bool startRequested;
volatile bool shutdownRequested;
-
+ int requestCallTokensPerCq = DefaultRequestCallTokensPerCq;
/// <summary>
/// Creates a new server.
@@ -133,6 +133,27 @@ namespace Grpc.Core
}
/// <summary>
+ /// Experimental API. Might anytime change without prior notice.
+ /// Number or calls requested via grpc_server_request_call at any given time for each completion queue.
+ /// </summary>
+ public int RequestCallTokensPerCompletionQueue
+ {
+ get
+ {
+ return requestCallTokensPerCq;
+ }
+ set
+ {
+ lock (myLock)
+ {
+ GrpcPreconditions.CheckState(!startRequested);
+ GrpcPreconditions.CheckArgument(value > 0);
+ requestCallTokensPerCq = value;
+ }
+ }
+ }
+
+ /// <summary>
/// Starts the server.
/// </summary>
public void Start()
@@ -145,9 +166,7 @@ namespace Grpc.Core
handle.Start();
- // Starting with more than one AllowOneRpc tokens can significantly increase
- // unary RPC throughput.
- for (int i = 0; i < InitialAllowRpcTokenCountPerCq; i++)
+ for (int i = 0; i < requestCallTokensPerCq; i++)
{
foreach (var cq in environment.CompletionQueues)
{
@@ -310,14 +329,14 @@ namespace Grpc.Core
/// <summary>
/// Selects corresponding handler for given call and handles the call.
/// </summary>
- private async Task HandleCallAsync(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
+ private async Task HandleCallAsync(ServerRpcNew newRpc, CompletionQueueSafeHandle cq, Action continuation)
{
try
{
IServerCallHandler callHandler;
if (!callHandlers.TryGetValue(newRpc.Method, out callHandler))
{
- callHandler = NoSuchMethodCallHandler.Instance;
+ callHandler = UnimplementedMethodCallHandler.Instance;
}
await callHandler.HandleCall(newRpc, cq).ConfigureAwait(false);
}
@@ -325,25 +344,40 @@ namespace Grpc.Core
{
Logger.Warning(e, "Exception while handling RPC.");
}
+ finally
+ {
+ continuation();
+ }
}
/// <summary>
/// Handles the native callback.
/// </summary>
- private void HandleNewServerRpc(bool success, BatchContextSafeHandle ctx, CompletionQueueSafeHandle cq)
+ private void HandleNewServerRpc(bool success, RequestCallContextSafeHandle ctx, CompletionQueueSafeHandle cq)
{
- Task.Run(() => AllowOneRpc(cq));
-
+ bool nextRpcRequested = false;
if (success)
{
- ServerRpcNew newRpc = ctx.GetServerRpcNew(this);
+ var newRpc = ctx.GetServerRpcNew(this);
// after server shutdown, the callback returns with null call
if (!newRpc.Call.IsInvalid)
{
- HandleCallAsync(newRpc, cq); // we don't need to await.
+ nextRpcRequested = true;
+
+ // Start asynchronous handler for the call.
+ // Don't await, the continuations will run on gRPC thread pool once triggered
+ // by cq.Next().
+ #pragma warning disable 4014
+ HandleCallAsync(newRpc, cq, () => AllowOneRpc(cq));
+ #pragma warning restore 4014
}
}
+
+ if (!nextRpcRequested)
+ {
+ AllowOneRpc(cq);
+ }
}
/// <summary>