diff options
Diffstat (limited to 'src/csharp/Grpc.Core')
-rw-r--r-- | src/csharp/Grpc.Core/GrpcEnvironment.cs | 34 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/AsyncCall.cs | 15 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/AsyncCallBase.cs | 2 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/AsyncCallServer.cs | 12 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/CallSafeHandle.cs | 7 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/INativeCall.cs | 2 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/NativeMethods.cs | 18 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/ServerCallHandler.cs | 72 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Server.cs | 24 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/VersionInfo.cs | 4 |
10 files changed, 133 insertions, 57 deletions
diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs index a5c78cc9d7..bee0ef1d62 100644 --- a/src/csharp/Grpc.Core/GrpcEnvironment.cs +++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs @@ -45,11 +45,12 @@ namespace Grpc.Core /// </summary> public class GrpcEnvironment { - const int THREAD_POOL_SIZE = 4; + const int MinDefaultThreadPoolSize = 4; static object staticLock = new object(); static GrpcEnvironment instance; static int refCount; + static int? customThreadPoolSize; static ILogger logger = new ConsoleLogger(); @@ -123,13 +124,30 @@ namespace Grpc.Core } /// <summary> + /// Sets the number of threads in the gRPC thread pool that polls for internal RPC events. + /// Can be only invoke before the <c>GrpcEnviroment</c> is started and cannot be changed afterwards. + /// Setting thread pool size is an advanced setting and you should only use it if you know what you are doing. + /// Most users should rely on the default value provided by gRPC library. + /// Note: this method is part of an experimental API that can change or be removed without any prior notice. + /// </summary> + public static void SetThreadPoolSize(int threadCount) + { + lock (staticLock) + { + GrpcPreconditions.CheckState(instance == null, "Can only be set before GrpcEnvironment is initialized"); + GrpcPreconditions.CheckArgument(threadCount > 0, "threadCount needs to be a positive number"); + customThreadPoolSize = threadCount; + } + } + + /// <summary> /// Creates gRPC environment. /// </summary> private GrpcEnvironment() { GrpcNativeInit(); completionRegistry = new CompletionRegistry(this); - threadPool = new GrpcThreadPool(this, THREAD_POOL_SIZE); + threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault()); threadPool.Start(); } @@ -200,5 +218,17 @@ namespace Grpc.Core debugStats.CheckOK(); } + + private int GetThreadPoolSizeOrDefault() + { + if (customThreadPoolSize.HasValue) + { + return customThreadPoolSize.Value; + } + // In systems with many cores, use half of the cores for GrpcThreadPool + // and the other half for .NET thread pool. This heuristic definitely needs + // more work, but seems to work reasonably well for a start. + return Math.Max(MinDefaultThreadPoolSize, Environment.ProcessorCount / 2); + } } } diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index f522174bd0..55351869b5 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -57,7 +57,7 @@ namespace Grpc.Core.Internal // Completion of a pending unary response if not null. TaskCompletionSource<TResponse> unaryResponseTcs; - // Indicates that steaming call has finished. + // Indicates that response streaming call has finished. TaskCompletionSource<object> streamingCallFinishedTcs = new TaskCompletionSource<object>(); // Response headers set here once received. @@ -443,6 +443,19 @@ namespace Grpc.Core.Internal } } + protected override void CheckSendingAllowed(bool allowFinished) + { + base.CheckSendingAllowed(true); + + // throwing RpcException if we already received status on client + // side makes the most sense. + // Note that this throws even for StatusCode.OK. + if (!allowFinished && finishedStatus.HasValue) + { + throw new RpcException(finishedStatus.Value.Status); + } + } + /// <summary> /// Handles receive status completion for calls with streaming response. /// </summary> diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs index 42234dcac2..4de23706b2 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs @@ -213,7 +213,7 @@ namespace Grpc.Core.Internal { } - protected void CheckSendingAllowed(bool allowFinished) + protected virtual void CheckSendingAllowed(bool allowFinished) { GrpcPreconditions.CheckState(started); CheckNotCancelled(); diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs index eafe2ccab8..b1566b44a7 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs @@ -139,8 +139,11 @@ namespace Grpc.Core.Internal /// Sends call result status, indicating we are done with writes. /// Sending a status different from StatusCode.OK will also implicitly cancel the call. /// </summary> - public Task SendStatusFromServerAsync(Status status, Metadata trailers) + public Task SendStatusFromServerAsync(Status status, Metadata trailers, Tuple<TResponse, WriteFlags> optionalWrite) { + byte[] payload = optionalWrite != null ? UnsafeSerialize(optionalWrite.Item1) : null; + var writeFlags = optionalWrite != null ? optionalWrite.Item2 : default(WriteFlags); + lock (myLock) { GrpcPreconditions.CheckState(started); @@ -149,11 +152,16 @@ namespace Grpc.Core.Internal using (var metadataArray = MetadataArraySafeHandle.Create(trailers)) { - call.StartSendStatusFromServer(HandleSendStatusFromServerFinished, status, metadataArray, !initialMetadataSent); + call.StartSendStatusFromServer(HandleSendStatusFromServerFinished, status, metadataArray, !initialMetadataSent, + payload, writeFlags); } halfcloseRequested = true; initialMetadataSent = true; sendStatusFromServerTcs = new TaskCompletionSource<object>(); + if (optionalWrite != null) + { + streamingWritesCounter++; + } return sendStatusFromServerTcs.Task; } } diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs index 500653ba5d..244b97d4a4 100644 --- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs @@ -135,13 +135,16 @@ namespace Grpc.Core.Internal } } - public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata) + public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata, + byte[] optionalPayload, WriteFlags writeFlags) { using (completionQueue.NewScope()) { var ctx = BatchContextSafeHandle.Create(); + var optionalPayloadLength = optionalPayload != null ? new UIntPtr((ulong)optionalPayload.Length) : UIntPtr.Zero; completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); - Native.grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, sendEmptyInitialMetadata).CheckOk(); + Native.grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, sendEmptyInitialMetadata, + optionalPayload, optionalPayloadLength, writeFlags).CheckOk(); } } diff --git a/src/csharp/Grpc.Core/Internal/INativeCall.cs b/src/csharp/Grpc.Core/Internal/INativeCall.cs index cbef599139..cd3719cb50 100644 --- a/src/csharp/Grpc.Core/Internal/INativeCall.cs +++ b/src/csharp/Grpc.Core/Internal/INativeCall.cs @@ -78,7 +78,7 @@ namespace Grpc.Core.Internal void StartSendCloseFromClient(SendCompletionHandler callback); - void StartSendStatusFromServer(SendCompletionHandler callback, Grpc.Core.Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata); + void StartSendStatusFromServer(SendCompletionHandler callback, Grpc.Core.Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata, byte[] optionalPayload, Grpc.Core.WriteFlags writeFlags); void StartServerSide(ReceivedCloseOnServerHandler callback); } diff --git a/src/csharp/Grpc.Core/Internal/NativeMethods.cs b/src/csharp/Grpc.Core/Internal/NativeMethods.cs index 9ee0ba3bc0..c277c73ef0 100644 --- a/src/csharp/Grpc.Core/Internal/NativeMethods.cs +++ b/src/csharp/Grpc.Core/Internal/NativeMethods.cs @@ -421,20 +421,21 @@ namespace Grpc.Core.Internal public delegate GRPCCallError grpcsharp_call_cancel_delegate(CallSafeHandle call); public delegate GRPCCallError grpcsharp_call_cancel_with_status_delegate(CallSafeHandle call, StatusCode status, string description); public delegate GRPCCallError grpcsharp_call_start_unary_delegate(CallSafeHandle call, - BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags); + BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags); public delegate GRPCCallError grpcsharp_call_start_client_streaming_delegate(CallSafeHandle call, BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray); public delegate GRPCCallError grpcsharp_call_start_server_streaming_delegate(CallSafeHandle call, - BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, + BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags); public delegate GRPCCallError grpcsharp_call_start_duplex_streaming_delegate(CallSafeHandle call, BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray); public delegate GRPCCallError grpcsharp_call_send_message_delegate(CallSafeHandle call, - BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, WriteFlags writeFlags, bool sendEmptyInitialMetadata); + BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, WriteFlags writeFlags, bool sendEmptyInitialMetadata); public delegate GRPCCallError grpcsharp_call_send_close_from_client_delegate(CallSafeHandle call, BatchContextSafeHandle ctx); public delegate GRPCCallError grpcsharp_call_send_status_from_server_delegate(CallSafeHandle call, - BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata); + BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata, + byte[] optionalSendBuffer, UIntPtr optionalSendBufferLen, WriteFlags writeFlags); public delegate GRPCCallError grpcsharp_call_recv_message_delegate(CallSafeHandle call, BatchContextSafeHandle ctx); public delegate GRPCCallError grpcsharp_call_recv_initial_metadata_delegate(CallSafeHandle call, @@ -593,7 +594,7 @@ namespace Grpc.Core.Internal [DllImport("grpc_csharp_ext.dll")] public static extern GRPCCallError grpcsharp_call_start_unary(CallSafeHandle call, - BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags); + BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags); [DllImport("grpc_csharp_ext.dll")] public static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call, @@ -601,7 +602,7 @@ namespace Grpc.Core.Internal [DllImport("grpc_csharp_ext.dll")] public static extern GRPCCallError grpcsharp_call_start_server_streaming(CallSafeHandle call, - BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, + BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags); [DllImport("grpc_csharp_ext.dll")] @@ -610,7 +611,7 @@ namespace Grpc.Core.Internal [DllImport("grpc_csharp_ext.dll")] public static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call, - BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, WriteFlags writeFlags, bool sendEmptyInitialMetadata); + BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, WriteFlags writeFlags, bool sendEmptyInitialMetadata); [DllImport("grpc_csharp_ext.dll")] public static extern GRPCCallError grpcsharp_call_send_close_from_client(CallSafeHandle call, @@ -618,7 +619,8 @@ namespace Grpc.Core.Internal [DllImport("grpc_csharp_ext.dll")] public static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call, - BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata); + BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata, + byte[] optionalSendBuffer, UIntPtr optionalSendBufferLen, WriteFlags writeFlags); [DllImport("grpc_csharp_ext.dll")] public static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call, diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs index 00d82d51e8..85b7a4b01e 100644 --- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs +++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs @@ -75,27 +75,32 @@ namespace Grpc.Core.Internal var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); Status status; + Tuple<TResponse,WriteFlags> responseTuple = null; var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken); try { GrpcPreconditions.CheckArgument(await requestStream.MoveNext().ConfigureAwait(false)); var request = requestStream.Current; - var result = await handler(request, context).ConfigureAwait(false); + var response = await handler(request, context).ConfigureAwait(false); status = context.Status; - await responseStream.WriteAsync(result).ConfigureAwait(false); + responseTuple = Tuple.Create(response, HandlerUtils.GetWriteFlags(context.WriteOptions)); } catch (Exception e) { - Logger.Error(e, "Exception occured in handler."); + if (!(e is RpcException)) + { + Logger.Warning(e, "Exception occured in handler."); + } status = HandlerUtils.StatusFromException(e); } try { - await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false); + await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, responseTuple).ConfigureAwait(false); } - catch (OperationCanceledException) + catch (Exception) { - // Call has been already cancelled. + asyncCall.Cancel(); + throw; } await finishedTask.ConfigureAwait(false); } @@ -139,17 +144,21 @@ namespace Grpc.Core.Internal } catch (Exception e) { - Logger.Error(e, "Exception occured in handler."); + if (!(e is RpcException)) + { + Logger.Warning(e, "Exception occured in handler."); + } status = HandlerUtils.StatusFromException(e); } try { - await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false); + await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, null).ConfigureAwait(false); } - catch (OperationCanceledException) + catch (Exception) { - // Call has been already cancelled. + asyncCall.Cancel(); + throw; } await finishedTask.ConfigureAwait(false); } @@ -183,33 +192,31 @@ namespace Grpc.Core.Internal var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); Status status; + Tuple<TResponse,WriteFlags> responseTuple = null; var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken); try { - var result = await handler(requestStream, context).ConfigureAwait(false); + var response = await handler(requestStream, context).ConfigureAwait(false); status = context.Status; - try - { - await responseStream.WriteAsync(result).ConfigureAwait(false); - } - catch (OperationCanceledException) - { - status = Status.DefaultCancelled; - } + responseTuple = Tuple.Create(response, HandlerUtils.GetWriteFlags(context.WriteOptions)); } catch (Exception e) { - Logger.Error(e, "Exception occured in handler."); + if (!(e is RpcException)) + { + Logger.Warning(e, "Exception occured in handler."); + } status = HandlerUtils.StatusFromException(e); } try { - await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false); + await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, responseTuple).ConfigureAwait(false); } - catch (OperationCanceledException) + catch (Exception) { - // Call has been already cancelled. + asyncCall.Cancel(); + throw; } await finishedTask.ConfigureAwait(false); } @@ -251,16 +258,20 @@ namespace Grpc.Core.Internal } catch (Exception e) { - Logger.Error(e, "Exception occured in handler."); + if (!(e is RpcException)) + { + Logger.Warning(e, "Exception occured in handler."); + } status = HandlerUtils.StatusFromException(e); } try { - await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false); + await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, null).ConfigureAwait(false); } - catch (OperationCanceledException) + catch (Exception) { - // Call has been already cancelled. + asyncCall.Cancel(); + throw; } await finishedTask.ConfigureAwait(false); } @@ -278,7 +289,7 @@ namespace Grpc.Core.Internal asyncCall.Initialize(newRpc.Call); var finishedTask = asyncCall.ServerSideCallAsync(); - await asyncCall.SendStatusFromServerAsync(new Status(StatusCode.Unimplemented, ""), Metadata.Empty).ConfigureAwait(false); + await asyncCall.SendStatusFromServerAsync(new Status(StatusCode.Unimplemented, ""), Metadata.Empty, null).ConfigureAwait(false); await finishedTask.ConfigureAwait(false); } } @@ -297,6 +308,11 @@ namespace Grpc.Core.Internal return new Status(StatusCode.Unknown, "Exception was thrown by handler."); } + public static WriteFlags GetWriteFlags(WriteOptions writeOptions) + { + return writeOptions != null ? writeOptions.Flags : default(WriteFlags); + } + public static ServerCallContext NewContext<TRequest, TResponse>(ServerRpcNew newRpc, string peer, ServerResponseStream<TRequest, TResponse> serverResponseStream, CancellationToken cancellationToken) where TRequest : class where TResponse : class diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs index 5b61b7f060..d538a4671f 100644 --- a/src/csharp/Grpc.Core/Server.cs +++ b/src/csharp/Grpc.Core/Server.cs @@ -48,6 +48,7 @@ namespace Grpc.Core /// </summary> public class Server { + const int InitialAllowRpcTokenCount = 10; static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Server>(); readonly AtomicCounter activeCallCounter = new AtomicCounter(); @@ -65,7 +66,7 @@ namespace Grpc.Core readonly TaskCompletionSource<object> shutdownTcs = new TaskCompletionSource<object>(); bool startRequested; - bool shutdownRequested; + volatile bool shutdownRequested; /// <summary> /// Create a new server. @@ -129,7 +130,13 @@ namespace Grpc.Core startRequested = true; handle.Start(); - AllowOneRpc(); + + // Starting with more than one AllowOneRpc tokens can significantly increase + // unary RPC throughput. + for (int i = 0; i < InitialAllowRpcTokenCount; i++) + { + AllowOneRpc(); + } } } @@ -239,12 +246,9 @@ namespace Grpc.Core /// </summary> private void AllowOneRpc() { - lock (myLock) + if (!shutdownRequested) { - if (!shutdownRequested) - { - handle.RequestCall(HandleNewServerRpc, environment); - } + handle.RequestCall(HandleNewServerRpc, environment); } } @@ -283,6 +287,8 @@ namespace Grpc.Core /// </summary> private void HandleNewServerRpc(bool success, BatchContextSafeHandle ctx) { + Task.Run(() => AllowOneRpc()); + if (success) { ServerRpcNew newRpc = ctx.GetServerRpcNew(this); @@ -290,11 +296,9 @@ namespace Grpc.Core // after server shutdown, the callback returns with null call if (!newRpc.Call.IsInvalid) { - Task.Run(async () => await HandleCallAsync(newRpc)).ConfigureAwait(false); + HandleCallAsync(newRpc); // we don't need to await. } } - - AllowOneRpc(); } /// <summary> diff --git a/src/csharp/Grpc.Core/VersionInfo.cs b/src/csharp/Grpc.Core/VersionInfo.cs index f7a9cb9c1c..e1609341d9 100644 --- a/src/csharp/Grpc.Core/VersionInfo.cs +++ b/src/csharp/Grpc.Core/VersionInfo.cs @@ -48,11 +48,11 @@ namespace Grpc.Core /// <summary> /// Current <c>AssemblyFileVersion</c> of gRPC C# assemblies /// </summary> - public const string CurrentAssemblyFileVersion = "0.14.0.0"; + public const string CurrentAssemblyFileVersion = "0.15.0.0"; /// <summary> /// Current version of gRPC C# /// </summary> - public const string CurrentVersion = "0.14.0-dev"; + public const string CurrentVersion = "0.15.0-dev"; } } |