diff options
Diffstat (limited to 'src/csharp')
18 files changed, 194 insertions, 127 deletions
diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs index 058371521d..0e204761f6 100644 --- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs +++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs @@ -168,7 +168,7 @@ namespace Grpc.Core.Internal.Tests var responseStream = new ServerResponseStream<string, string>(asyncCallServer); var writeTask = responseStream.WriteAsync("request1"); - var writeStatusTask = asyncCallServer.SendStatusFromServerAsync(Status.DefaultSuccess, new Metadata()); + var writeStatusTask = asyncCallServer.SendStatusFromServerAsync(Status.DefaultSuccess, new Metadata(), null); fakeCall.SendCompletionHandler(true); fakeCall.SendStatusFromServerHandler(true); diff --git a/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs b/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs index 1bec258ca2..909112a47c 100644 --- a/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs +++ b/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs @@ -165,7 +165,8 @@ namespace Grpc.Core.Internal.Tests SendCompletionHandler = callback; } - public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata) + public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata, + byte[] optionalPayload, WriteFlags writeFlags) { SendStatusFromServerHandler = callback; } diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs index a5c78cc9d7..bee0ef1d62 100644 --- a/src/csharp/Grpc.Core/GrpcEnvironment.cs +++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs @@ -45,11 +45,12 @@ namespace Grpc.Core /// </summary> public class GrpcEnvironment { - const int THREAD_POOL_SIZE = 4; + const int MinDefaultThreadPoolSize = 4; static object staticLock = new object(); static GrpcEnvironment instance; static int refCount; + static int? customThreadPoolSize; static ILogger logger = new ConsoleLogger(); @@ -123,13 +124,30 @@ namespace Grpc.Core } /// <summary> + /// Sets the number of threads in the gRPC thread pool that polls for internal RPC events. + /// Can be only invoke before the <c>GrpcEnviroment</c> is started and cannot be changed afterwards. + /// Setting thread pool size is an advanced setting and you should only use it if you know what you are doing. + /// Most users should rely on the default value provided by gRPC library. + /// Note: this method is part of an experimental API that can change or be removed without any prior notice. + /// </summary> + public static void SetThreadPoolSize(int threadCount) + { + lock (staticLock) + { + GrpcPreconditions.CheckState(instance == null, "Can only be set before GrpcEnvironment is initialized"); + GrpcPreconditions.CheckArgument(threadCount > 0, "threadCount needs to be a positive number"); + customThreadPoolSize = threadCount; + } + } + + /// <summary> /// Creates gRPC environment. /// </summary> private GrpcEnvironment() { GrpcNativeInit(); completionRegistry = new CompletionRegistry(this); - threadPool = new GrpcThreadPool(this, THREAD_POOL_SIZE); + threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault()); threadPool.Start(); } @@ -200,5 +218,17 @@ namespace Grpc.Core debugStats.CheckOK(); } + + private int GetThreadPoolSizeOrDefault() + { + if (customThreadPoolSize.HasValue) + { + return customThreadPoolSize.Value; + } + // In systems with many cores, use half of the cores for GrpcThreadPool + // and the other half for .NET thread pool. This heuristic definitely needs + // more work, but seems to work reasonably well for a start. + return Math.Max(MinDefaultThreadPoolSize, Environment.ProcessorCount / 2); + } } } diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs index eafe2ccab8..b1566b44a7 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs @@ -139,8 +139,11 @@ namespace Grpc.Core.Internal /// Sends call result status, indicating we are done with writes. /// Sending a status different from StatusCode.OK will also implicitly cancel the call. /// </summary> - public Task SendStatusFromServerAsync(Status status, Metadata trailers) + public Task SendStatusFromServerAsync(Status status, Metadata trailers, Tuple<TResponse, WriteFlags> optionalWrite) { + byte[] payload = optionalWrite != null ? UnsafeSerialize(optionalWrite.Item1) : null; + var writeFlags = optionalWrite != null ? optionalWrite.Item2 : default(WriteFlags); + lock (myLock) { GrpcPreconditions.CheckState(started); @@ -149,11 +152,16 @@ namespace Grpc.Core.Internal using (var metadataArray = MetadataArraySafeHandle.Create(trailers)) { - call.StartSendStatusFromServer(HandleSendStatusFromServerFinished, status, metadataArray, !initialMetadataSent); + call.StartSendStatusFromServer(HandleSendStatusFromServerFinished, status, metadataArray, !initialMetadataSent, + payload, writeFlags); } halfcloseRequested = true; initialMetadataSent = true; sendStatusFromServerTcs = new TaskCompletionSource<object>(); + if (optionalWrite != null) + { + streamingWritesCounter++; + } return sendStatusFromServerTcs.Task; } } diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs index 500653ba5d..244b97d4a4 100644 --- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs @@ -135,13 +135,16 @@ namespace Grpc.Core.Internal } } - public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata) + public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata, + byte[] optionalPayload, WriteFlags writeFlags) { using (completionQueue.NewScope()) { var ctx = BatchContextSafeHandle.Create(); + var optionalPayloadLength = optionalPayload != null ? new UIntPtr((ulong)optionalPayload.Length) : UIntPtr.Zero; completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); - Native.grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, sendEmptyInitialMetadata).CheckOk(); + Native.grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, sendEmptyInitialMetadata, + optionalPayload, optionalPayloadLength, writeFlags).CheckOk(); } } diff --git a/src/csharp/Grpc.Core/Internal/INativeCall.cs b/src/csharp/Grpc.Core/Internal/INativeCall.cs index cbef599139..cd3719cb50 100644 --- a/src/csharp/Grpc.Core/Internal/INativeCall.cs +++ b/src/csharp/Grpc.Core/Internal/INativeCall.cs @@ -78,7 +78,7 @@ namespace Grpc.Core.Internal void StartSendCloseFromClient(SendCompletionHandler callback); - void StartSendStatusFromServer(SendCompletionHandler callback, Grpc.Core.Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata); + void StartSendStatusFromServer(SendCompletionHandler callback, Grpc.Core.Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata, byte[] optionalPayload, Grpc.Core.WriteFlags writeFlags); void StartServerSide(ReceivedCloseOnServerHandler callback); } diff --git a/src/csharp/Grpc.Core/Internal/NativeMethods.cs b/src/csharp/Grpc.Core/Internal/NativeMethods.cs index 9ee0ba3bc0..c277c73ef0 100644 --- a/src/csharp/Grpc.Core/Internal/NativeMethods.cs +++ b/src/csharp/Grpc.Core/Internal/NativeMethods.cs @@ -421,20 +421,21 @@ namespace Grpc.Core.Internal public delegate GRPCCallError grpcsharp_call_cancel_delegate(CallSafeHandle call); public delegate GRPCCallError grpcsharp_call_cancel_with_status_delegate(CallSafeHandle call, StatusCode status, string description); public delegate GRPCCallError grpcsharp_call_start_unary_delegate(CallSafeHandle call, - BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags); + BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags); public delegate GRPCCallError grpcsharp_call_start_client_streaming_delegate(CallSafeHandle call, BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray); public delegate GRPCCallError grpcsharp_call_start_server_streaming_delegate(CallSafeHandle call, - BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, + BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags); public delegate GRPCCallError grpcsharp_call_start_duplex_streaming_delegate(CallSafeHandle call, BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray); public delegate GRPCCallError grpcsharp_call_send_message_delegate(CallSafeHandle call, - BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, WriteFlags writeFlags, bool sendEmptyInitialMetadata); + BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, WriteFlags writeFlags, bool sendEmptyInitialMetadata); public delegate GRPCCallError grpcsharp_call_send_close_from_client_delegate(CallSafeHandle call, BatchContextSafeHandle ctx); public delegate GRPCCallError grpcsharp_call_send_status_from_server_delegate(CallSafeHandle call, - BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata); + BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata, + byte[] optionalSendBuffer, UIntPtr optionalSendBufferLen, WriteFlags writeFlags); public delegate GRPCCallError grpcsharp_call_recv_message_delegate(CallSafeHandle call, BatchContextSafeHandle ctx); public delegate GRPCCallError grpcsharp_call_recv_initial_metadata_delegate(CallSafeHandle call, @@ -593,7 +594,7 @@ namespace Grpc.Core.Internal [DllImport("grpc_csharp_ext.dll")] public static extern GRPCCallError grpcsharp_call_start_unary(CallSafeHandle call, - BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags); + BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags); [DllImport("grpc_csharp_ext.dll")] public static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call, @@ -601,7 +602,7 @@ namespace Grpc.Core.Internal [DllImport("grpc_csharp_ext.dll")] public static extern GRPCCallError grpcsharp_call_start_server_streaming(CallSafeHandle call, - BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, + BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags); [DllImport("grpc_csharp_ext.dll")] @@ -610,7 +611,7 @@ namespace Grpc.Core.Internal [DllImport("grpc_csharp_ext.dll")] public static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call, - BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, WriteFlags writeFlags, bool sendEmptyInitialMetadata); + BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, WriteFlags writeFlags, bool sendEmptyInitialMetadata); [DllImport("grpc_csharp_ext.dll")] public static extern GRPCCallError grpcsharp_call_send_close_from_client(CallSafeHandle call, @@ -618,7 +619,8 @@ namespace Grpc.Core.Internal [DllImport("grpc_csharp_ext.dll")] public static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call, - BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata); + BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata, + byte[] optionalSendBuffer, UIntPtr optionalSendBufferLen, WriteFlags writeFlags); [DllImport("grpc_csharp_ext.dll")] public static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call, diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs index 00d82d51e8..85b7a4b01e 100644 --- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs +++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs @@ -75,27 +75,32 @@ namespace Grpc.Core.Internal var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); Status status; + Tuple<TResponse,WriteFlags> responseTuple = null; var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken); try { GrpcPreconditions.CheckArgument(await requestStream.MoveNext().ConfigureAwait(false)); var request = requestStream.Current; - var result = await handler(request, context).ConfigureAwait(false); + var response = await handler(request, context).ConfigureAwait(false); status = context.Status; - await responseStream.WriteAsync(result).ConfigureAwait(false); + responseTuple = Tuple.Create(response, HandlerUtils.GetWriteFlags(context.WriteOptions)); } catch (Exception e) { - Logger.Error(e, "Exception occured in handler."); + if (!(e is RpcException)) + { + Logger.Warning(e, "Exception occured in handler."); + } status = HandlerUtils.StatusFromException(e); } try { - await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false); + await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, responseTuple).ConfigureAwait(false); } - catch (OperationCanceledException) + catch (Exception) { - // Call has been already cancelled. + asyncCall.Cancel(); + throw; } await finishedTask.ConfigureAwait(false); } @@ -139,17 +144,21 @@ namespace Grpc.Core.Internal } catch (Exception e) { - Logger.Error(e, "Exception occured in handler."); + if (!(e is RpcException)) + { + Logger.Warning(e, "Exception occured in handler."); + } status = HandlerUtils.StatusFromException(e); } try { - await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false); + await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, null).ConfigureAwait(false); } - catch (OperationCanceledException) + catch (Exception) { - // Call has been already cancelled. + asyncCall.Cancel(); + throw; } await finishedTask.ConfigureAwait(false); } @@ -183,33 +192,31 @@ namespace Grpc.Core.Internal var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); Status status; + Tuple<TResponse,WriteFlags> responseTuple = null; var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken); try { - var result = await handler(requestStream, context).ConfigureAwait(false); + var response = await handler(requestStream, context).ConfigureAwait(false); status = context.Status; - try - { - await responseStream.WriteAsync(result).ConfigureAwait(false); - } - catch (OperationCanceledException) - { - status = Status.DefaultCancelled; - } + responseTuple = Tuple.Create(response, HandlerUtils.GetWriteFlags(context.WriteOptions)); } catch (Exception e) { - Logger.Error(e, "Exception occured in handler."); + if (!(e is RpcException)) + { + Logger.Warning(e, "Exception occured in handler."); + } status = HandlerUtils.StatusFromException(e); } try { - await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false); + await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, responseTuple).ConfigureAwait(false); } - catch (OperationCanceledException) + catch (Exception) { - // Call has been already cancelled. + asyncCall.Cancel(); + throw; } await finishedTask.ConfigureAwait(false); } @@ -251,16 +258,20 @@ namespace Grpc.Core.Internal } catch (Exception e) { - Logger.Error(e, "Exception occured in handler."); + if (!(e is RpcException)) + { + Logger.Warning(e, "Exception occured in handler."); + } status = HandlerUtils.StatusFromException(e); } try { - await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false); + await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, null).ConfigureAwait(false); } - catch (OperationCanceledException) + catch (Exception) { - // Call has been already cancelled. + asyncCall.Cancel(); + throw; } await finishedTask.ConfigureAwait(false); } @@ -278,7 +289,7 @@ namespace Grpc.Core.Internal asyncCall.Initialize(newRpc.Call); var finishedTask = asyncCall.ServerSideCallAsync(); - await asyncCall.SendStatusFromServerAsync(new Status(StatusCode.Unimplemented, ""), Metadata.Empty).ConfigureAwait(false); + await asyncCall.SendStatusFromServerAsync(new Status(StatusCode.Unimplemented, ""), Metadata.Empty, null).ConfigureAwait(false); await finishedTask.ConfigureAwait(false); } } @@ -297,6 +308,11 @@ namespace Grpc.Core.Internal return new Status(StatusCode.Unknown, "Exception was thrown by handler."); } + public static WriteFlags GetWriteFlags(WriteOptions writeOptions) + { + return writeOptions != null ? writeOptions.Flags : default(WriteFlags); + } + public static ServerCallContext NewContext<TRequest, TResponse>(ServerRpcNew newRpc, string peer, ServerResponseStream<TRequest, TResponse> serverResponseStream, CancellationToken cancellationToken) where TRequest : class where TResponse : class diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs index 5b61b7f060..d538a4671f 100644 --- a/src/csharp/Grpc.Core/Server.cs +++ b/src/csharp/Grpc.Core/Server.cs @@ -48,6 +48,7 @@ namespace Grpc.Core /// </summary> public class Server { + const int InitialAllowRpcTokenCount = 10; static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Server>(); readonly AtomicCounter activeCallCounter = new AtomicCounter(); @@ -65,7 +66,7 @@ namespace Grpc.Core readonly TaskCompletionSource<object> shutdownTcs = new TaskCompletionSource<object>(); bool startRequested; - bool shutdownRequested; + volatile bool shutdownRequested; /// <summary> /// Create a new server. @@ -129,7 +130,13 @@ namespace Grpc.Core startRequested = true; handle.Start(); - AllowOneRpc(); + + // Starting with more than one AllowOneRpc tokens can significantly increase + // unary RPC throughput. + for (int i = 0; i < InitialAllowRpcTokenCount; i++) + { + AllowOneRpc(); + } } } @@ -239,12 +246,9 @@ namespace Grpc.Core /// </summary> private void AllowOneRpc() { - lock (myLock) + if (!shutdownRequested) { - if (!shutdownRequested) - { - handle.RequestCall(HandleNewServerRpc, environment); - } + handle.RequestCall(HandleNewServerRpc, environment); } } @@ -283,6 +287,8 @@ namespace Grpc.Core /// </summary> private void HandleNewServerRpc(bool success, BatchContextSafeHandle ctx) { + Task.Run(() => AllowOneRpc()); + if (success) { ServerRpcNew newRpc = ctx.GetServerRpcNew(this); @@ -290,11 +296,9 @@ namespace Grpc.Core // after server shutdown, the callback returns with null call if (!newRpc.Call.IsInvalid) { - Task.Run(async () => await HandleCallAsync(newRpc)).ConfigureAwait(false); + HandleCallAsync(newRpc); // we don't need to await. } } - - AllowOneRpc(); } /// <summary> diff --git a/src/csharp/Grpc.Core/VersionInfo.cs b/src/csharp/Grpc.Core/VersionInfo.cs index f7a9cb9c1c..e1609341d9 100644 --- a/src/csharp/Grpc.Core/VersionInfo.cs +++ b/src/csharp/Grpc.Core/VersionInfo.cs @@ -48,11 +48,11 @@ namespace Grpc.Core /// <summary> /// Current <c>AssemblyFileVersion</c> of gRPC C# assemblies /// </summary> - public const string CurrentAssemblyFileVersion = "0.14.0.0"; + public const string CurrentAssemblyFileVersion = "0.15.0.0"; /// <summary> /// Current version of gRPC C# /// </summary> - public const string CurrentVersion = "0.14.0-dev"; + public const string CurrentVersion = "0.15.0-dev"; } } diff --git a/src/csharp/Grpc.Examples/MathGrpc.cs b/src/csharp/Grpc.Examples/MathGrpc.cs index 2d3034d28b..d700a18778 100644 --- a/src/csharp/Grpc.Examples/MathGrpc.cs +++ b/src/csharp/Grpc.Examples/MathGrpc.cs @@ -151,25 +151,25 @@ namespace Math { /// Div divides args.dividend by args.divisor and returns the quotient and /// remainder. /// </summary> - Task<global::Math.DivReply> Div(global::Math.DivArgs request, ServerCallContext context); + global::System.Threading.Tasks.Task<global::Math.DivReply> Div(global::Math.DivArgs request, ServerCallContext context); /// <summary> /// DivMany accepts an arbitrary number of division args from the client stream /// and sends back the results in the reply stream. The stream continues until /// the client closes its end; the server does the same after sending all the /// replies. The stream ends immediately if either end aborts. /// </summary> - Task DivMany(IAsyncStreamReader<global::Math.DivArgs> requestStream, IServerStreamWriter<global::Math.DivReply> responseStream, ServerCallContext context); + global::System.Threading.Tasks.Task DivMany(IAsyncStreamReader<global::Math.DivArgs> requestStream, IServerStreamWriter<global::Math.DivReply> responseStream, ServerCallContext context); /// <summary> /// Fib generates numbers in the Fibonacci sequence. If args.limit > 0, Fib /// generates up to limit numbers; otherwise it continues until the call is /// canceled. Unlike Fib above, Fib has no final FibReply. /// </summary> - Task Fib(global::Math.FibArgs request, IServerStreamWriter<global::Math.Num> responseStream, ServerCallContext context); + global::System.Threading.Tasks.Task Fib(global::Math.FibArgs request, IServerStreamWriter<global::Math.Num> responseStream, ServerCallContext context); /// <summary> /// Sum sums a stream of numbers, returning the final result once the stream /// is closed. /// </summary> - Task<global::Math.Num> Sum(IAsyncStreamReader<global::Math.Num> requestStream, ServerCallContext context); + global::System.Threading.Tasks.Task<global::Math.Num> Sum(IAsyncStreamReader<global::Math.Num> requestStream, ServerCallContext context); } /// <summary>Base class for server-side implementations of Math</summary> @@ -179,7 +179,7 @@ namespace Math { /// Div divides args.dividend by args.divisor and returns the quotient and /// remainder. /// </summary> - public virtual Task<global::Math.DivReply> Div(global::Math.DivArgs request, ServerCallContext context) + public virtual global::System.Threading.Tasks.Task<global::Math.DivReply> Div(global::Math.DivArgs request, ServerCallContext context) { throw new RpcException(new Status(StatusCode.Unimplemented, "")); } @@ -190,7 +190,7 @@ namespace Math { /// the client closes its end; the server does the same after sending all the /// replies. The stream ends immediately if either end aborts. /// </summary> - public virtual Task DivMany(IAsyncStreamReader<global::Math.DivArgs> requestStream, IServerStreamWriter<global::Math.DivReply> responseStream, ServerCallContext context) + public virtual global::System.Threading.Tasks.Task DivMany(IAsyncStreamReader<global::Math.DivArgs> requestStream, IServerStreamWriter<global::Math.DivReply> responseStream, ServerCallContext context) { throw new RpcException(new Status(StatusCode.Unimplemented, "")); } @@ -200,7 +200,7 @@ namespace Math { /// generates up to limit numbers; otherwise it continues until the call is /// canceled. Unlike Fib above, Fib has no final FibReply. /// </summary> - public virtual Task Fib(global::Math.FibArgs request, IServerStreamWriter<global::Math.Num> responseStream, ServerCallContext context) + public virtual global::System.Threading.Tasks.Task Fib(global::Math.FibArgs request, IServerStreamWriter<global::Math.Num> responseStream, ServerCallContext context) { throw new RpcException(new Status(StatusCode.Unimplemented, "")); } @@ -209,7 +209,7 @@ namespace Math { /// Sum sums a stream of numbers, returning the final result once the stream /// is closed. /// </summary> - public virtual Task<global::Math.Num> Sum(IAsyncStreamReader<global::Math.Num> requestStream, ServerCallContext context) + public virtual global::System.Threading.Tasks.Task<global::Math.Num> Sum(IAsyncStreamReader<global::Math.Num> requestStream, ServerCallContext context) { throw new RpcException(new Status(StatusCode.Unimplemented, "")); } diff --git a/src/csharp/Grpc.HealthCheck/HealthGrpc.cs b/src/csharp/Grpc.HealthCheck/HealthGrpc.cs index 967d1170be..51c6a39b1d 100644 --- a/src/csharp/Grpc.HealthCheck/HealthGrpc.cs +++ b/src/csharp/Grpc.HealthCheck/HealthGrpc.cs @@ -72,13 +72,13 @@ namespace Grpc.Health.V1 { [System.Obsolete("Service implementations should inherit from the generated abstract base class instead.")] public interface IHealth { - Task<global::Grpc.Health.V1.HealthCheckResponse> Check(global::Grpc.Health.V1.HealthCheckRequest request, ServerCallContext context); + global::System.Threading.Tasks.Task<global::Grpc.Health.V1.HealthCheckResponse> Check(global::Grpc.Health.V1.HealthCheckRequest request, ServerCallContext context); } /// <summary>Base class for server-side implementations of Health</summary> public abstract class HealthBase { - public virtual Task<global::Grpc.Health.V1.HealthCheckResponse> Check(global::Grpc.Health.V1.HealthCheckRequest request, ServerCallContext context) + public virtual global::System.Threading.Tasks.Task<global::Grpc.Health.V1.HealthCheckResponse> Check(global::Grpc.Health.V1.HealthCheckRequest request, ServerCallContext context) { throw new RpcException(new Status(StatusCode.Unimplemented, "")); } diff --git a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs index b4572756f2..9eaf6bf7ce 100644 --- a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs +++ b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs @@ -142,8 +142,7 @@ namespace Grpc.IntegrationTesting for (int i = 0; i < outstandingRpcsPerChannel; i++) { var timer = CreateTimer(loadParams, 1.0 / this.channels.Count / outstandingRpcsPerChannel); - var threadBody = GetThreadBody(channel, timer); - this.runnerTasks.Add(Task.Factory.StartNew(threadBody, TaskCreationOptions.LongRunning)); + this.runnerTasks.Add(RunClientAsync(channel, timer)); } } } @@ -269,38 +268,30 @@ namespace Grpc.IntegrationTesting } } - private Action GetThreadBody(Channel channel, IInterarrivalTimer timer) + private Task RunClientAsync(Channel channel, IInterarrivalTimer timer) { if (payloadConfig.PayloadCase == PayloadConfig.PayloadOneofCase.BytebufParams) { GrpcPreconditions.CheckArgument(clientType == ClientType.ASYNC_CLIENT, "Generic client only supports async API"); GrpcPreconditions.CheckArgument(rpcType == RpcType.STREAMING, "Generic client only supports streaming calls"); - return () => - { - RunGenericStreamingAsync(channel, timer).Wait(); - }; + return RunGenericStreamingAsync(channel, timer); } GrpcPreconditions.CheckNotNull(payloadConfig.SimpleParams); if (clientType == ClientType.SYNC_CLIENT) { GrpcPreconditions.CheckArgument(rpcType == RpcType.UNARY, "Sync client can only be used for Unary calls in C#"); - return () => RunUnary(channel, timer); + // create a dedicated thread for the synchronous client + return Task.Factory.StartNew(() => RunUnary(channel, timer), TaskCreationOptions.LongRunning); } else if (clientType == ClientType.ASYNC_CLIENT) { switch (rpcType) { case RpcType.UNARY: - return () => - { - RunUnaryAsync(channel, timer).Wait(); - }; + return RunUnaryAsync(channel, timer); case RpcType.STREAMING: - return () => - { - RunStreamingPingPongAsync(channel, timer).Wait(); - }; + return RunStreamingPingPongAsync(channel, timer); } } throw new ArgumentException("Unsupported configuration."); diff --git a/src/csharp/Grpc.IntegrationTesting/MetricsGrpc.cs b/src/csharp/Grpc.IntegrationTesting/MetricsGrpc.cs index aa4f1c5c3e..9d31d1c514 100644 --- a/src/csharp/Grpc.IntegrationTesting/MetricsGrpc.cs +++ b/src/csharp/Grpc.IntegrationTesting/MetricsGrpc.cs @@ -112,11 +112,11 @@ namespace Grpc.Testing { /// Returns the values of all the gauges that are currently being maintained by /// the service /// </summary> - Task GetAllGauges(global::Grpc.Testing.EmptyMessage request, IServerStreamWriter<global::Grpc.Testing.GaugeResponse> responseStream, ServerCallContext context); + global::System.Threading.Tasks.Task GetAllGauges(global::Grpc.Testing.EmptyMessage request, IServerStreamWriter<global::Grpc.Testing.GaugeResponse> responseStream, ServerCallContext context); /// <summary> /// Returns the value of one gauge /// </summary> - Task<global::Grpc.Testing.GaugeResponse> GetGauge(global::Grpc.Testing.GaugeRequest request, ServerCallContext context); + global::System.Threading.Tasks.Task<global::Grpc.Testing.GaugeResponse> GetGauge(global::Grpc.Testing.GaugeRequest request, ServerCallContext context); } /// <summary>Base class for server-side implementations of MetricsService</summary> @@ -126,7 +126,7 @@ namespace Grpc.Testing { /// Returns the values of all the gauges that are currently being maintained by /// the service /// </summary> - public virtual Task GetAllGauges(global::Grpc.Testing.EmptyMessage request, IServerStreamWriter<global::Grpc.Testing.GaugeResponse> responseStream, ServerCallContext context) + public virtual global::System.Threading.Tasks.Task GetAllGauges(global::Grpc.Testing.EmptyMessage request, IServerStreamWriter<global::Grpc.Testing.GaugeResponse> responseStream, ServerCallContext context) { throw new RpcException(new Status(StatusCode.Unimplemented, "")); } @@ -134,7 +134,7 @@ namespace Grpc.Testing { /// <summary> /// Returns the value of one gauge /// </summary> - public virtual Task<global::Grpc.Testing.GaugeResponse> GetGauge(global::Grpc.Testing.GaugeRequest request, ServerCallContext context) + public virtual global::System.Threading.Tasks.Task<global::Grpc.Testing.GaugeResponse> GetGauge(global::Grpc.Testing.GaugeRequest request, ServerCallContext context) { throw new RpcException(new Status(StatusCode.Unimplemented, "")); } diff --git a/src/csharp/Grpc.IntegrationTesting/ServicesGrpc.cs b/src/csharp/Grpc.IntegrationTesting/ServicesGrpc.cs index 42bf5e0b58..f7071ebf6b 100644 --- a/src/csharp/Grpc.IntegrationTesting/ServicesGrpc.cs +++ b/src/csharp/Grpc.IntegrationTesting/ServicesGrpc.cs @@ -111,12 +111,12 @@ namespace Grpc.Testing { /// One request followed by one response. /// The server returns the client payload as-is. /// </summary> - Task<global::Grpc.Testing.SimpleResponse> UnaryCall(global::Grpc.Testing.SimpleRequest request, ServerCallContext context); + global::System.Threading.Tasks.Task<global::Grpc.Testing.SimpleResponse> UnaryCall(global::Grpc.Testing.SimpleRequest request, ServerCallContext context); /// <summary> /// One request followed by one response. /// The server returns the client payload as-is. /// </summary> - Task StreamingCall(IAsyncStreamReader<global::Grpc.Testing.SimpleRequest> requestStream, IServerStreamWriter<global::Grpc.Testing.SimpleResponse> responseStream, ServerCallContext context); + global::System.Threading.Tasks.Task StreamingCall(IAsyncStreamReader<global::Grpc.Testing.SimpleRequest> requestStream, IServerStreamWriter<global::Grpc.Testing.SimpleResponse> responseStream, ServerCallContext context); } /// <summary>Base class for server-side implementations of BenchmarkService</summary> @@ -126,7 +126,7 @@ namespace Grpc.Testing { /// One request followed by one response. /// The server returns the client payload as-is. /// </summary> - public virtual Task<global::Grpc.Testing.SimpleResponse> UnaryCall(global::Grpc.Testing.SimpleRequest request, ServerCallContext context) + public virtual global::System.Threading.Tasks.Task<global::Grpc.Testing.SimpleResponse> UnaryCall(global::Grpc.Testing.SimpleRequest request, ServerCallContext context) { throw new RpcException(new Status(StatusCode.Unimplemented, "")); } @@ -135,7 +135,7 @@ namespace Grpc.Testing { /// One request followed by one response. /// The server returns the client payload as-is. /// </summary> - public virtual Task StreamingCall(IAsyncStreamReader<global::Grpc.Testing.SimpleRequest> requestStream, IServerStreamWriter<global::Grpc.Testing.SimpleResponse> responseStream, ServerCallContext context) + public virtual global::System.Threading.Tasks.Task StreamingCall(IAsyncStreamReader<global::Grpc.Testing.SimpleRequest> requestStream, IServerStreamWriter<global::Grpc.Testing.SimpleResponse> responseStream, ServerCallContext context) { throw new RpcException(new Status(StatusCode.Unimplemented, "")); } @@ -375,7 +375,7 @@ namespace Grpc.Testing { /// and once the shutdown has finished, the OK status is sent to terminate /// this RPC. /// </summary> - Task RunServer(IAsyncStreamReader<global::Grpc.Testing.ServerArgs> requestStream, IServerStreamWriter<global::Grpc.Testing.ServerStatus> responseStream, ServerCallContext context); + global::System.Threading.Tasks.Task RunServer(IAsyncStreamReader<global::Grpc.Testing.ServerArgs> requestStream, IServerStreamWriter<global::Grpc.Testing.ServerStatus> responseStream, ServerCallContext context); /// <summary> /// Start client with specified workload. /// First request sent specifies the ClientConfig followed by ClientStatus @@ -384,15 +384,15 @@ namespace Grpc.Testing { /// and once the shutdown has finished, the OK status is sent to terminate /// this RPC. /// </summary> - Task RunClient(IAsyncStreamReader<global::Grpc.Testing.ClientArgs> requestStream, IServerStreamWriter<global::Grpc.Testing.ClientStatus> responseStream, ServerCallContext context); + global::System.Threading.Tasks.Task RunClient(IAsyncStreamReader<global::Grpc.Testing.ClientArgs> requestStream, IServerStreamWriter<global::Grpc.Testing.ClientStatus> responseStream, ServerCallContext context); /// <summary> /// Just return the core count - unary call /// </summary> - Task<global::Grpc.Testing.CoreResponse> CoreCount(global::Grpc.Testing.CoreRequest request, ServerCallContext context); + global::System.Threading.Tasks.Task<global::Grpc.Testing.CoreResponse> CoreCount(global::Grpc.Testing.CoreRequest request, ServerCallContext context); /// <summary> /// Quit this worker /// </summary> - Task<global::Grpc.Testing.Void> QuitWorker(global::Grpc.Testing.Void request, ServerCallContext context); + global::System.Threading.Tasks.Task<global::Grpc.Testing.Void> QuitWorker(global::Grpc.Testing.Void request, ServerCallContext context); } /// <summary>Base class for server-side implementations of WorkerService</summary> @@ -406,7 +406,7 @@ namespace Grpc.Testing { /// and once the shutdown has finished, the OK status is sent to terminate /// this RPC. /// </summary> - public virtual Task RunServer(IAsyncStreamReader<global::Grpc.Testing.ServerArgs> requestStream, IServerStreamWriter<global::Grpc.Testing.ServerStatus> responseStream, ServerCallContext context) + public virtual global::System.Threading.Tasks.Task RunServer(IAsyncStreamReader<global::Grpc.Testing.ServerArgs> requestStream, IServerStreamWriter<global::Grpc.Testing.ServerStatus> responseStream, ServerCallContext context) { throw new RpcException(new Status(StatusCode.Unimplemented, "")); } @@ -419,7 +419,7 @@ namespace Grpc.Testing { /// and once the shutdown has finished, the OK status is sent to terminate /// this RPC. /// </summary> - public virtual Task RunClient(IAsyncStreamReader<global::Grpc.Testing.ClientArgs> requestStream, IServerStreamWriter<global::Grpc.Testing.ClientStatus> responseStream, ServerCallContext context) + public virtual global::System.Threading.Tasks.Task RunClient(IAsyncStreamReader<global::Grpc.Testing.ClientArgs> requestStream, IServerStreamWriter<global::Grpc.Testing.ClientStatus> responseStream, ServerCallContext context) { throw new RpcException(new Status(StatusCode.Unimplemented, "")); } @@ -427,7 +427,7 @@ namespace Grpc.Testing { /// <summary> /// Just return the core count - unary call /// </summary> - public virtual Task<global::Grpc.Testing.CoreResponse> CoreCount(global::Grpc.Testing.CoreRequest request, ServerCallContext context) + public virtual global::System.Threading.Tasks.Task<global::Grpc.Testing.CoreResponse> CoreCount(global::Grpc.Testing.CoreRequest request, ServerCallContext context) { throw new RpcException(new Status(StatusCode.Unimplemented, "")); } @@ -435,7 +435,7 @@ namespace Grpc.Testing { /// <summary> /// Quit this worker /// </summary> - public virtual Task<global::Grpc.Testing.Void> QuitWorker(global::Grpc.Testing.Void request, ServerCallContext context) + public virtual global::System.Threading.Tasks.Task<global::Grpc.Testing.Void> QuitWorker(global::Grpc.Testing.Void request, ServerCallContext context) { throw new RpcException(new Status(StatusCode.Unimplemented, "")); } diff --git a/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs b/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs index f1878cbb55..cf43a77118 100644 --- a/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs +++ b/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs @@ -196,34 +196,34 @@ namespace Grpc.Testing { /// <summary> /// One empty request followed by one empty response. /// </summary> - Task<global::Grpc.Testing.Empty> EmptyCall(global::Grpc.Testing.Empty request, ServerCallContext context); + global::System.Threading.Tasks.Task<global::Grpc.Testing.Empty> EmptyCall(global::Grpc.Testing.Empty request, ServerCallContext context); /// <summary> /// One request followed by one response. /// </summary> - Task<global::Grpc.Testing.SimpleResponse> UnaryCall(global::Grpc.Testing.SimpleRequest request, ServerCallContext context); + global::System.Threading.Tasks.Task<global::Grpc.Testing.SimpleResponse> UnaryCall(global::Grpc.Testing.SimpleRequest request, ServerCallContext context); /// <summary> /// One request followed by a sequence of responses (streamed download). /// The server returns the payload with client desired type and sizes. /// </summary> - Task StreamingOutputCall(global::Grpc.Testing.StreamingOutputCallRequest request, IServerStreamWriter<global::Grpc.Testing.StreamingOutputCallResponse> responseStream, ServerCallContext context); + global::System.Threading.Tasks.Task StreamingOutputCall(global::Grpc.Testing.StreamingOutputCallRequest request, IServerStreamWriter<global::Grpc.Testing.StreamingOutputCallResponse> responseStream, ServerCallContext context); /// <summary> /// A sequence of requests followed by one response (streamed upload). /// The server returns the aggregated size of client payload as the result. /// </summary> - Task<global::Grpc.Testing.StreamingInputCallResponse> StreamingInputCall(IAsyncStreamReader<global::Grpc.Testing.StreamingInputCallRequest> requestStream, ServerCallContext context); + global::System.Threading.Tasks.Task<global::Grpc.Testing.StreamingInputCallResponse> StreamingInputCall(IAsyncStreamReader<global::Grpc.Testing.StreamingInputCallRequest> requestStream, ServerCallContext context); /// <summary> /// A sequence of requests with each request served by the server immediately. /// As one request could lead to multiple responses, this interface /// demonstrates the idea of full duplexing. /// </summary> - Task FullDuplexCall(IAsyncStreamReader<global::Grpc.Testing.StreamingOutputCallRequest> requestStream, IServerStreamWriter<global::Grpc.Testing.StreamingOutputCallResponse> responseStream, ServerCallContext context); + global::System.Threading.Tasks.Task FullDuplexCall(IAsyncStreamReader<global::Grpc.Testing.StreamingOutputCallRequest> requestStream, IServerStreamWriter<global::Grpc.Testing.StreamingOutputCallResponse> responseStream, ServerCallContext context); /// <summary> /// A sequence of requests followed by a sequence of responses. /// The server buffers all the client requests and then serves them in order. A /// stream of responses are returned to the client when the server starts with /// first request. /// </summary> - Task HalfDuplexCall(IAsyncStreamReader<global::Grpc.Testing.StreamingOutputCallRequest> requestStream, IServerStreamWriter<global::Grpc.Testing.StreamingOutputCallResponse> responseStream, ServerCallContext context); + global::System.Threading.Tasks.Task HalfDuplexCall(IAsyncStreamReader<global::Grpc.Testing.StreamingOutputCallRequest> requestStream, IServerStreamWriter<global::Grpc.Testing.StreamingOutputCallResponse> responseStream, ServerCallContext context); } /// <summary>Base class for server-side implementations of TestService</summary> @@ -232,7 +232,7 @@ namespace Grpc.Testing { /// <summary> /// One empty request followed by one empty response. /// </summary> - public virtual Task<global::Grpc.Testing.Empty> EmptyCall(global::Grpc.Testing.Empty request, ServerCallContext context) + public virtual global::System.Threading.Tasks.Task<global::Grpc.Testing.Empty> EmptyCall(global::Grpc.Testing.Empty request, ServerCallContext context) { throw new RpcException(new Status(StatusCode.Unimplemented, "")); } @@ -240,7 +240,7 @@ namespace Grpc.Testing { /// <summary> /// One request followed by one response. /// </summary> - public virtual Task<global::Grpc.Testing.SimpleResponse> UnaryCall(global::Grpc.Testing.SimpleRequest request, ServerCallContext context) + public virtual global::System.Threading.Tasks.Task<global::Grpc.Testing.SimpleResponse> UnaryCall(global::Grpc.Testing.SimpleRequest request, ServerCallContext context) { throw new RpcException(new Status(StatusCode.Unimplemented, "")); } @@ -249,7 +249,7 @@ namespace Grpc.Testing { /// One request followed by a sequence of responses (streamed download). /// The server returns the payload with client desired type and sizes. /// </summary> - public virtual Task StreamingOutputCall(global::Grpc.Testing.StreamingOutputCallRequest request, IServerStreamWriter<global::Grpc.Testing.StreamingOutputCallResponse> responseStream, ServerCallContext context) + public virtual global::System.Threading.Tasks.Task StreamingOutputCall(global::Grpc.Testing.StreamingOutputCallRequest request, IServerStreamWriter<global::Grpc.Testing.StreamingOutputCallResponse> responseStream, ServerCallContext context) { throw new RpcException(new Status(StatusCode.Unimplemented, "")); } @@ -258,7 +258,7 @@ namespace Grpc.Testing { /// A sequence of requests followed by one response (streamed upload). /// The server returns the aggregated size of client payload as the result. /// </summary> - public virtual Task<global::Grpc.Testing.StreamingInputCallResponse> StreamingInputCall(IAsyncStreamReader<global::Grpc.Testing.StreamingInputCallRequest> requestStream, ServerCallContext context) + public virtual global::System.Threading.Tasks.Task<global::Grpc.Testing.StreamingInputCallResponse> StreamingInputCall(IAsyncStreamReader<global::Grpc.Testing.StreamingInputCallRequest> requestStream, ServerCallContext context) { throw new RpcException(new Status(StatusCode.Unimplemented, "")); } @@ -268,7 +268,7 @@ namespace Grpc.Testing { /// As one request could lead to multiple responses, this interface /// demonstrates the idea of full duplexing. /// </summary> - public virtual Task FullDuplexCall(IAsyncStreamReader<global::Grpc.Testing.StreamingOutputCallRequest> requestStream, IServerStreamWriter<global::Grpc.Testing.StreamingOutputCallResponse> responseStream, ServerCallContext context) + public virtual global::System.Threading.Tasks.Task FullDuplexCall(IAsyncStreamReader<global::Grpc.Testing.StreamingOutputCallRequest> requestStream, IServerStreamWriter<global::Grpc.Testing.StreamingOutputCallResponse> responseStream, ServerCallContext context) { throw new RpcException(new Status(StatusCode.Unimplemented, "")); } @@ -279,7 +279,7 @@ namespace Grpc.Testing { /// stream of responses are returned to the client when the server starts with /// first request. /// </summary> - public virtual Task HalfDuplexCall(IAsyncStreamReader<global::Grpc.Testing.StreamingOutputCallRequest> requestStream, IServerStreamWriter<global::Grpc.Testing.StreamingOutputCallResponse> responseStream, ServerCallContext context) + public virtual global::System.Threading.Tasks.Task HalfDuplexCall(IAsyncStreamReader<global::Grpc.Testing.StreamingOutputCallRequest> requestStream, IServerStreamWriter<global::Grpc.Testing.StreamingOutputCallResponse> responseStream, ServerCallContext context) { throw new RpcException(new Status(StatusCode.Unimplemented, "")); } @@ -525,7 +525,7 @@ namespace Grpc.Testing { /// <summary> /// A call that no server should implement /// </summary> - Task<global::Grpc.Testing.Empty> UnimplementedCall(global::Grpc.Testing.Empty request, ServerCallContext context); + global::System.Threading.Tasks.Task<global::Grpc.Testing.Empty> UnimplementedCall(global::Grpc.Testing.Empty request, ServerCallContext context); } /// <summary>Base class for server-side implementations of UnimplementedService</summary> @@ -534,7 +534,7 @@ namespace Grpc.Testing { /// <summary> /// A call that no server should implement /// </summary> - public virtual Task<global::Grpc.Testing.Empty> UnimplementedCall(global::Grpc.Testing.Empty request, ServerCallContext context) + public virtual global::System.Threading.Tasks.Task<global::Grpc.Testing.Empty> UnimplementedCall(global::Grpc.Testing.Empty request, ServerCallContext context) { throw new RpcException(new Status(StatusCode.Unimplemented, "")); } @@ -669,19 +669,19 @@ namespace Grpc.Testing { [System.Obsolete("Service implementations should inherit from the generated abstract base class instead.")] public interface IReconnectService { - Task<global::Grpc.Testing.Empty> Start(global::Grpc.Testing.ReconnectParams request, ServerCallContext context); - Task<global::Grpc.Testing.ReconnectInfo> Stop(global::Grpc.Testing.Empty request, ServerCallContext context); + global::System.Threading.Tasks.Task<global::Grpc.Testing.Empty> Start(global::Grpc.Testing.ReconnectParams request, ServerCallContext context); + global::System.Threading.Tasks.Task<global::Grpc.Testing.ReconnectInfo> Stop(global::Grpc.Testing.Empty request, ServerCallContext context); } /// <summary>Base class for server-side implementations of ReconnectService</summary> public abstract class ReconnectServiceBase { - public virtual Task<global::Grpc.Testing.Empty> Start(global::Grpc.Testing.ReconnectParams request, ServerCallContext context) + public virtual global::System.Threading.Tasks.Task<global::Grpc.Testing.Empty> Start(global::Grpc.Testing.ReconnectParams request, ServerCallContext context) { throw new RpcException(new Status(StatusCode.Unimplemented, "")); } - public virtual Task<global::Grpc.Testing.ReconnectInfo> Stop(global::Grpc.Testing.Empty request, ServerCallContext context) + public virtual global::System.Threading.Tasks.Task<global::Grpc.Testing.ReconnectInfo> Stop(global::Grpc.Testing.Empty request, ServerCallContext context) { throw new RpcException(new Status(StatusCode.Unimplemented, "")); } diff --git a/src/csharp/build_packages.bat b/src/csharp/build_packages.bat index 9a60be26b6..7520b0f81a 100644 --- a/src/csharp/build_packages.bat +++ b/src/csharp/build_packages.bat @@ -1,7 +1,7 @@ @rem Builds gRPC NuGet packages @rem Current package versions -set VERSION=0.14.0-dev +set VERSION=0.15.0-dev set PROTOBUF_VERSION=3.0.0-beta2 @rem Packages that depend on prerelease packages (like Google.Protobuf) need to have prerelease suffix as well. diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c index aeef8a79e9..5b8ff9b819 100644 --- a/src/csharp/ext/grpc_csharp_ext.c +++ b/src/csharp/ext/grpc_csharp_ext.c @@ -715,10 +715,11 @@ grpcsharp_call_send_close_from_client(grpc_call *call, GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server( grpc_call *call, grpcsharp_batch_context *ctx, grpc_status_code status_code, const char *status_details, grpc_metadata_array *trailing_metadata, - int32_t send_empty_initial_metadata) { + int32_t send_empty_initial_metadata, const char* optional_send_buffer, + size_t optional_send_buffer_len, uint32_t write_flags) { /* TODO: don't use magic number */ - grpc_op ops[2]; - size_t nops = send_empty_initial_metadata ? 2 : 1; + grpc_op ops[3]; + size_t nops = 1; ops[0].op = GRPC_OP_SEND_STATUS_FROM_SERVER; ops[0].data.send_status_from_server.status = status_code; ops[0].data.send_status_from_server.status_details = @@ -731,12 +732,23 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server( ctx->send_status_from_server.trailing_metadata.metadata; ops[0].flags = 0; ops[0].reserved = NULL; - ops[1].op = GRPC_OP_SEND_INITIAL_METADATA; - ops[1].data.send_initial_metadata.count = 0; - ops[1].data.send_initial_metadata.metadata = NULL; - ops[1].flags = 0; - ops[1].reserved = NULL; - + if (optional_send_buffer) { + ops[nops].op = GRPC_OP_SEND_MESSAGE; + ctx->send_message = string_to_byte_buffer(optional_send_buffer, + optional_send_buffer_len); + ops[nops].data.send_message = ctx->send_message; + ops[nops].flags = write_flags; + ops[nops].reserved = NULL; + nops ++; + } + if (send_empty_initial_metadata) { + ops[nops].op = GRPC_OP_SEND_INITIAL_METADATA; + ops[nops].data.send_initial_metadata.count = 0; + ops[nops].data.send_initial_metadata.metadata = NULL; + ops[nops].flags = 0; + ops[nops].reserved = NULL; + nops++; + } return grpc_call_start_batch(call, ops, nops, ctx, NULL); } |