diff options
Diffstat (limited to 'src/csharp/Grpc.Core/Server.cs')
-rw-r--r-- | src/csharp/Grpc.Core/Server.cs | 129 |
1 files changed, 87 insertions, 42 deletions
diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs index d538a4671f..3b554e5e87 100644 --- a/src/csharp/Grpc.Core/Server.cs +++ b/src/csharp/Grpc.Core/Server.cs @@ -34,8 +34,7 @@ using System; using System.Collections; using System.Collections.Generic; -using System.Diagnostics; -using System.Runtime.InteropServices; +using System.Linq; using System.Threading.Tasks; using Grpc.Core.Internal; using Grpc.Core.Logging; @@ -48,7 +47,7 @@ namespace Grpc.Core /// </summary> public class Server { - const int InitialAllowRpcTokenCount = 10; + const int InitialAllowRpcTokenCountPerCq = 10; static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Server>(); readonly AtomicCounter activeCallCounter = new AtomicCounter(); @@ -68,11 +67,19 @@ namespace Grpc.Core bool startRequested; volatile bool shutdownRequested; + /// <summary> - /// Create a new server. + /// Creates a new server. + /// </summary> + public Server() : this(null) + { + } + + /// <summary> + /// Creates a new server. /// </summary> /// <param name="options">Channel options.</param> - public Server(IEnumerable<ChannelOption> options = null) + public Server(IEnumerable<ChannelOption> options) { this.serviceDefinitions = new ServiceDefinitionCollection(this); this.ports = new ServerPortCollection(this); @@ -80,8 +87,14 @@ namespace Grpc.Core this.options = options != null ? new List<ChannelOption>(options) : new List<ChannelOption>(); using (var channelArgs = ChannelOptions.CreateChannelArgs(this.options)) { - this.handle = ServerSafeHandle.NewServer(environment.CompletionQueue, channelArgs); + this.handle = ServerSafeHandle.NewServer(channelArgs); + } + + foreach (var cq in environment.CompletionQueues) + { + this.handle.RegisterCompletionQueue(cq); } + GrpcEnvironment.RegisterServer(this); } /// <summary> @@ -127,15 +140,19 @@ namespace Grpc.Core lock (myLock) { GrpcPreconditions.CheckState(!startRequested); + GrpcPreconditions.CheckState(!shutdownRequested); startRequested = true; handle.Start(); // Starting with more than one AllowOneRpc tokens can significantly increase // unary RPC throughput. - for (int i = 0; i < InitialAllowRpcTokenCount; i++) + for (int i = 0; i < InitialAllowRpcTokenCountPerCq; i++) { - AllowOneRpc(); + foreach (var cq in environment.CompletionQueues) + { + AllowOneRpc(cq); + } } } } @@ -145,41 +162,24 @@ namespace Grpc.Core /// cleans up used resources. The returned task finishes when shutdown procedure /// is complete. /// </summary> - public async Task ShutdownAsync() + /// <remarks> + /// It is strongly recommended to shutdown all previously created servers before exiting from the process. + /// </remarks> + public Task ShutdownAsync() { - lock (myLock) - { - GrpcPreconditions.CheckState(startRequested); - GrpcPreconditions.CheckState(!shutdownRequested); - shutdownRequested = true; - } - - handle.ShutdownAndNotify(HandleServerShutdown, environment); - await shutdownTcs.Task.ConfigureAwait(false); - DisposeHandle(); - - await Task.Run(() => GrpcEnvironment.Release()).ConfigureAwait(false); + return ShutdownInternalAsync(false); } /// <summary> /// Requests server shutdown while cancelling all the in-progress calls. /// The returned task finishes when shutdown procedure is complete. /// </summary> - public async Task KillAsync() + /// <remarks> + /// It is strongly recommended to shutdown all previously created servers before exiting from the process. + /// </remarks> + public Task KillAsync() { - lock (myLock) - { - GrpcPreconditions.CheckState(startRequested); - GrpcPreconditions.CheckState(!shutdownRequested); - shutdownRequested = true; - } - - handle.ShutdownAndNotify(HandleServerShutdown, environment); - handle.CancelAllCalls(); - await shutdownTcs.Task.ConfigureAwait(false); - DisposeHandle(); - - await Task.Run(() => GrpcEnvironment.Release()).ConfigureAwait(false); + return ShutdownInternalAsync(true); } internal void AddCallReference(object call) @@ -198,6 +198,51 @@ namespace Grpc.Core } /// <summary> + /// Shuts down the server. + /// </summary> + private async Task ShutdownInternalAsync(bool kill) + { + lock (myLock) + { + GrpcPreconditions.CheckState(!shutdownRequested); + shutdownRequested = true; + } + GrpcEnvironment.UnregisterServer(this); + + var cq = environment.CompletionQueues.First(); // any cq will do + handle.ShutdownAndNotify(HandleServerShutdown, cq); + if (kill) + { + handle.CancelAllCalls(); + } + await ShutdownCompleteOrEnvironmentDeadAsync().ConfigureAwait(false); + + DisposeHandle(); + + await GrpcEnvironment.ReleaseAsync().ConfigureAwait(false); + } + + /// <summary> + /// In case the environment's threadpool becomes dead, the shutdown completion will + /// never be delivered, but we need to release the environment's handle anyway. + /// </summary> + private async Task ShutdownCompleteOrEnvironmentDeadAsync() + { + while (true) + { + var task = await Task.WhenAny(shutdownTcs.Task, Task.Delay(20)).ConfigureAwait(false); + if (shutdownTcs.Task == task) + { + return; + } + if (!environment.IsAlive) + { + return; + } + } + } + + /// <summary> /// Adds a service definition. /// </summary> private void AddServiceDefinitionInternal(ServerServiceDefinition serviceDefinition) @@ -244,11 +289,11 @@ namespace Grpc.Core /// <summary> /// Allows one new RPC call to be received by server. /// </summary> - private void AllowOneRpc() + private void AllowOneRpc(CompletionQueueSafeHandle cq) { if (!shutdownRequested) { - handle.RequestCall(HandleNewServerRpc, environment); + handle.RequestCall((success, ctx) => HandleNewServerRpc(success, ctx, cq), cq); } } @@ -265,7 +310,7 @@ namespace Grpc.Core /// <summary> /// Selects corresponding handler for given call and handles the call. /// </summary> - private async Task HandleCallAsync(ServerRpcNew newRpc) + private async Task HandleCallAsync(ServerRpcNew newRpc, CompletionQueueSafeHandle cq) { try { @@ -274,7 +319,7 @@ namespace Grpc.Core { callHandler = NoSuchMethodCallHandler.Instance; } - await callHandler.HandleCall(newRpc, environment).ConfigureAwait(false); + await callHandler.HandleCall(newRpc, cq).ConfigureAwait(false); } catch (Exception e) { @@ -285,9 +330,9 @@ namespace Grpc.Core /// <summary> /// Handles the native callback. /// </summary> - private void HandleNewServerRpc(bool success, BatchContextSafeHandle ctx) + private void HandleNewServerRpc(bool success, BatchContextSafeHandle ctx, CompletionQueueSafeHandle cq) { - Task.Run(() => AllowOneRpc()); + Task.Run(() => AllowOneRpc(cq)); if (success) { @@ -296,7 +341,7 @@ namespace Grpc.Core // after server shutdown, the callback returns with null call if (!newRpc.Call.IsInvalid) { - HandleCallAsync(newRpc); // we don't need to await. + HandleCallAsync(newRpc, cq); // we don't need to await. } } } |