aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/csharp/Grpc.Core/Server.cs
diff options
context:
space:
mode:
Diffstat (limited to 'src/csharp/Grpc.Core/Server.cs')
-rw-r--r--src/csharp/Grpc.Core/Server.cs141
1 files changed, 95 insertions, 46 deletions
diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs
index 5b61b7f060..3b554e5e87 100644
--- a/src/csharp/Grpc.Core/Server.cs
+++ b/src/csharp/Grpc.Core/Server.cs
@@ -34,8 +34,7 @@
using System;
using System.Collections;
using System.Collections.Generic;
-using System.Diagnostics;
-using System.Runtime.InteropServices;
+using System.Linq;
using System.Threading.Tasks;
using Grpc.Core.Internal;
using Grpc.Core.Logging;
@@ -48,6 +47,7 @@ namespace Grpc.Core
/// </summary>
public class Server
{
+ const int InitialAllowRpcTokenCountPerCq = 10;
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Server>();
readonly AtomicCounter activeCallCounter = new AtomicCounter();
@@ -65,13 +65,21 @@ namespace Grpc.Core
readonly TaskCompletionSource<object> shutdownTcs = new TaskCompletionSource<object>();
bool startRequested;
- bool shutdownRequested;
+ volatile bool shutdownRequested;
+
+
+ /// <summary>
+ /// Creates a new server.
+ /// </summary>
+ public Server() : this(null)
+ {
+ }
/// <summary>
- /// Create a new server.
+ /// Creates a new server.
/// </summary>
/// <param name="options">Channel options.</param>
- public Server(IEnumerable<ChannelOption> options = null)
+ public Server(IEnumerable<ChannelOption> options)
{
this.serviceDefinitions = new ServiceDefinitionCollection(this);
this.ports = new ServerPortCollection(this);
@@ -79,8 +87,14 @@ namespace Grpc.Core
this.options = options != null ? new List<ChannelOption>(options) : new List<ChannelOption>();
using (var channelArgs = ChannelOptions.CreateChannelArgs(this.options))
{
- this.handle = ServerSafeHandle.NewServer(environment.CompletionQueue, channelArgs);
+ this.handle = ServerSafeHandle.NewServer(channelArgs);
+ }
+
+ foreach (var cq in environment.CompletionQueues)
+ {
+ this.handle.RegisterCompletionQueue(cq);
}
+ GrpcEnvironment.RegisterServer(this);
}
/// <summary>
@@ -126,10 +140,20 @@ namespace Grpc.Core
lock (myLock)
{
GrpcPreconditions.CheckState(!startRequested);
+ GrpcPreconditions.CheckState(!shutdownRequested);
startRequested = true;
handle.Start();
- AllowOneRpc();
+
+ // Starting with more than one AllowOneRpc tokens can significantly increase
+ // unary RPC throughput.
+ for (int i = 0; i < InitialAllowRpcTokenCountPerCq; i++)
+ {
+ foreach (var cq in environment.CompletionQueues)
+ {
+ AllowOneRpc(cq);
+ }
+ }
}
}
@@ -138,41 +162,24 @@ namespace Grpc.Core
/// cleans up used resources. The returned task finishes when shutdown procedure
/// is complete.
/// </summary>
- public async Task ShutdownAsync()
+ /// <remarks>
+ /// It is strongly recommended to shutdown all previously created servers before exiting from the process.
+ /// </remarks>
+ public Task ShutdownAsync()
{
- lock (myLock)
- {
- GrpcPreconditions.CheckState(startRequested);
- GrpcPreconditions.CheckState(!shutdownRequested);
- shutdownRequested = true;
- }
-
- handle.ShutdownAndNotify(HandleServerShutdown, environment);
- await shutdownTcs.Task.ConfigureAwait(false);
- DisposeHandle();
-
- await Task.Run(() => GrpcEnvironment.Release()).ConfigureAwait(false);
+ return ShutdownInternalAsync(false);
}
/// <summary>
/// Requests server shutdown while cancelling all the in-progress calls.
/// The returned task finishes when shutdown procedure is complete.
/// </summary>
- public async Task KillAsync()
+ /// <remarks>
+ /// It is strongly recommended to shutdown all previously created servers before exiting from the process.
+ /// </remarks>
+ public Task KillAsync()
{
- lock (myLock)
- {
- GrpcPreconditions.CheckState(startRequested);
- GrpcPreconditions.CheckState(!shutdownRequested);
- shutdownRequested = true;
- }
-
- handle.ShutdownAndNotify(HandleServerShutdown, environment);
- handle.CancelAllCalls();
- await shutdownTcs.Task.ConfigureAwait(false);
- DisposeHandle();
-
- await Task.Run(() => GrpcEnvironment.Release()).ConfigureAwait(false);
+ return ShutdownInternalAsync(true);
}
internal void AddCallReference(object call)
@@ -191,6 +198,51 @@ namespace Grpc.Core
}
/// <summary>
+ /// Shuts down the server.
+ /// </summary>
+ private async Task ShutdownInternalAsync(bool kill)
+ {
+ lock (myLock)
+ {
+ GrpcPreconditions.CheckState(!shutdownRequested);
+ shutdownRequested = true;
+ }
+ GrpcEnvironment.UnregisterServer(this);
+
+ var cq = environment.CompletionQueues.First(); // any cq will do
+ handle.ShutdownAndNotify(HandleServerShutdown, cq);
+ if (kill)
+ {
+ handle.CancelAllCalls();
+ }
+ await ShutdownCompleteOrEnvironmentDeadAsync().ConfigureAwait(false);
+
+ DisposeHandle();
+
+ await GrpcEnvironment.ReleaseAsync().ConfigureAwait(false);
+ }
+
+ /// <summary>
+ /// In case the environment's threadpool becomes dead, the shutdown completion will
+ /// never be delivered, but we need to release the environment's handle anyway.
+ /// </summary>
+ private async Task ShutdownCompleteOrEnvironmentDeadAsync()
+ {
+ while (true)
+ {
+ var task = await Task.WhenAny(shutdownTcs.Task, Task.Delay(20)).ConfigureAwait(false);
+ if (shutdownTcs.Task == task)
+ {
+ return;
+ }
+ if (!environment.IsAlive)
+ {
+ return;
+ }
+ }
+ }
+
+ /// <summary>
/// Adds a service definition.
/// </summary>
private void AddServiceDefinitionInternal(ServerServiceDefinition serviceDefinition)
@@ -237,14 +289,11 @@ namespace Grpc.Core
/// <summary>
/// Allows one new RPC call to be received by server.
/// </summary>
- private void AllowOneRpc()
+ private void AllowOneRpc(CompletionQueueSafeHandle cq)
{
- lock (myLock)
+ if (!shutdownRequested)
{
- if (!shutdownRequested)
- {
- handle.RequestCall(HandleNewServerRpc, environment);
- }
+ handle.RequestCall((success, ctx) => HandleNewServerRpc(success, ctx, cq), cq);
}
}
@@ -261,7 +310,7 @@ namespace Grpc.Core
/// <summary>
/// Selects corresponding handler for given call and handles the call.
/// </summary>
- private async Task HandleCallAsync(ServerRpcNew newRpc)
+ private async Task HandleCallAsync(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
{
try
{
@@ -270,7 +319,7 @@ namespace Grpc.Core
{
callHandler = NoSuchMethodCallHandler.Instance;
}
- await callHandler.HandleCall(newRpc, environment).ConfigureAwait(false);
+ await callHandler.HandleCall(newRpc, cq).ConfigureAwait(false);
}
catch (Exception e)
{
@@ -281,8 +330,10 @@ namespace Grpc.Core
/// <summary>
/// Handles the native callback.
/// </summary>
- private void HandleNewServerRpc(bool success, BatchContextSafeHandle ctx)
+ private void HandleNewServerRpc(bool success, BatchContextSafeHandle ctx, CompletionQueueSafeHandle cq)
{
+ Task.Run(() => AllowOneRpc(cq));
+
if (success)
{
ServerRpcNew newRpc = ctx.GetServerRpcNew(this);
@@ -290,11 +341,9 @@ namespace Grpc.Core
// after server shutdown, the callback returns with null call
if (!newRpc.Call.IsInvalid)
{
- Task.Run(async () => await HandleCallAsync(newRpc)).ConfigureAwait(false);
+ HandleCallAsync(newRpc, cq); // we don't need to await.
}
}
-
- AllowOneRpc();
}
/// <summary>