diff options
Diffstat (limited to 'src/csharp')
53 files changed, 1244 insertions, 1037 deletions
diff --git a/src/csharp/Grpc.Core.Tests/AppDomainUnloadTest.cs b/src/csharp/Grpc.Core.Tests/AppDomainUnloadTest.cs new file mode 100644 index 0000000000..e605a310f9 --- /dev/null +++ b/src/csharp/Grpc.Core.Tests/AppDomainUnloadTest.cs @@ -0,0 +1,90 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Diagnostics; +using System.Linq; +using System.Reflection; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core; +using Grpc.Core.Internal; +using Grpc.Core.Utils; +using NUnit.Framework; + +namespace Grpc.Core.Tests +{ + public class AppDomainUnloadTest + { + [Test] + public void AppDomainUnloadHookCanCleanupAbandonedCall() + { + var setup = new AppDomainSetup + { + ApplicationBase = AppDomain.CurrentDomain.BaseDirectory + }; + var childDomain = AppDomain.CreateDomain("test", null, setup); + var remoteObj = childDomain.CreateInstance(typeof(AppDomainTestClass).Assembly.GetName().Name, typeof(AppDomainTestClass).FullName); + + // Try to unload the appdomain once we've created a server and a channel inside the appdomain. + AppDomain.Unload(childDomain); + } + + public class AppDomainTestClass + { + const string Host = "127.0.0.1"; + + /// <summary> + /// Creates a server and a channel and initiates a call. The code is invoked from inside of an AppDomain + /// to test if AppDomain.Unload() work if Grpc is being used. + /// </summary> + public AppDomainTestClass() + { + var helper = new MockServiceHelper(Host); + var server = helper.GetServer(); + server.Start(); + var channel = helper.GetChannel(); + + var readyToShutdown = new TaskCompletionSource<object>(); + helper.DuplexStreamingHandler = new DuplexStreamingServerMethod<string, string>(async (requestStream, responseStream, context) => + { + readyToShutdown.SetResult(null); + await requestStream.ToListAsync(); + }); + + var call = Calls.AsyncDuplexStreamingCall(helper.CreateDuplexStreamingCall()); + readyToShutdown.Task.Wait(); // make sure handler is running + } + } + } +} diff --git a/src/csharp/Grpc.Core.Tests/ChannelTest.cs b/src/csharp/Grpc.Core.Tests/ChannelTest.cs index 850d70ce92..db0ef3a4cd 100644 --- a/src/csharp/Grpc.Core.Tests/ChannelTest.cs +++ b/src/csharp/Grpc.Core.Tests/ChannelTest.cs @@ -71,7 +71,7 @@ namespace Grpc.Core.Tests public void WaitForStateChangedAsync_InvalidArgument() { var channel = new Channel("localhost", ChannelCredentials.Insecure); - Assert.ThrowsAsync(typeof(ArgumentException), async () => await channel.WaitForStateChangedAsync(ChannelState.FatalFailure)); + Assert.ThrowsAsync(typeof(ArgumentException), async () => await channel.WaitForStateChangedAsync(ChannelState.Shutdown)); channel.ShutdownAsync().Wait(); } @@ -102,11 +102,11 @@ namespace Grpc.Core.Tests } [Test] - public async Task StateIsFatalFailureAfterShutdown() + public async Task StateIsShutdownAfterShutdown() { var channel = new Channel("localhost", ChannelCredentials.Insecure); await channel.ShutdownAsync(); - Assert.AreEqual(ChannelState.FatalFailure, channel.State); + Assert.AreEqual(ChannelState.Shutdown, channel.State); } [Test] diff --git a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs index d92addbf54..dcdddc769e 100644 --- a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs +++ b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs @@ -235,8 +235,16 @@ namespace Grpc.Core.Tests await barrier.Task; // make sure the handler has started. cts.Cancel(); - var ex = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseAsync); - Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode); + try + { + // cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock. + await call.ResponseAsync; + Assert.Fail(); + } + catch (RpcException ex) + { + Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode); + } } [Test] @@ -265,9 +273,15 @@ namespace Grpc.Core.Tests await handlerStartedBarrier.Task; cts.Cancel(); - var ex = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseAsync); - Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode); - + try + { + await call.ResponseAsync; + Assert.Fail(); + } + catch (RpcException ex) + { + Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode); + } Assert.AreEqual("SUCCESS", await successTcs.Task); } diff --git a/src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs b/src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs index cec8c7ce6b..6a156293ad 100644 --- a/src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs +++ b/src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs @@ -105,7 +105,15 @@ namespace Grpc.Core.Tests var parentCall = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall(new CallOptions(cancellationToken: cts.Token))); await readyToCancelTcs.Task; cts.Cancel(); - Assert.ThrowsAsync(typeof(RpcException), async () => await parentCall); + try + { + // cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock. + await parentCall; + Assert.Fail(); + } + catch (RpcException) + { + } Assert.AreEqual("CHILD_CALL_CANCELLED", await successTcs.Task); } diff --git a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj index 47131fc454..074c9603dc 100644 --- a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj +++ b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj @@ -86,6 +86,10 @@ <Compile Include="NUnitMain.cs" /> <Compile Include="Internal\FakeNativeCall.cs" /> <Compile Include="Internal\AsyncCallServerTest.cs" /> + <Compile Include="ShutdownHookServerTest.cs" /> + <Compile Include="ShutdownHookPendingCallTest.cs" /> + <Compile Include="ShutdownHookClientTest.cs" /> + <Compile Include="AppDomainUnloadTest.cs" /> </ItemGroup> <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" /> <ItemGroup> diff --git a/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs b/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs index ab12c120cb..3ec2cf48cd 100644 --- a/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs +++ b/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs @@ -32,7 +32,7 @@ #endregion using System; -using System.Threading; +using System.Linq; using Grpc.Core; using NUnit.Framework; @@ -44,8 +44,12 @@ namespace Grpc.Core.Tests public void InitializeAndShutdownGrpcEnvironment() { var env = GrpcEnvironment.AddRef(); - Assert.IsNotNull(env.CompletionQueue); - GrpcEnvironment.Release(); + Assert.IsTrue(env.CompletionQueues.Count > 0); + for (int i = 0; i < env.CompletionQueues.Count; i++) + { + Assert.IsNotNull(env.CompletionQueues.ElementAt(i)); + } + GrpcEnvironment.ReleaseAsync().Wait(); } [Test] @@ -54,8 +58,8 @@ namespace Grpc.Core.Tests var env1 = GrpcEnvironment.AddRef(); var env2 = GrpcEnvironment.AddRef(); Assert.AreSame(env1, env2); - GrpcEnvironment.Release(); - GrpcEnvironment.Release(); + GrpcEnvironment.ReleaseAsync().Wait(); + GrpcEnvironment.ReleaseAsync().Wait(); } [Test] @@ -64,10 +68,10 @@ namespace Grpc.Core.Tests Assert.AreEqual(0, GrpcEnvironment.GetRefCount()); var env1 = GrpcEnvironment.AddRef(); - GrpcEnvironment.Release(); + GrpcEnvironment.ReleaseAsync().Wait(); var env2 = GrpcEnvironment.AddRef(); - GrpcEnvironment.Release(); + GrpcEnvironment.ReleaseAsync().Wait(); Assert.AreNotSame(env1, env2); } @@ -76,7 +80,7 @@ namespace Grpc.Core.Tests public void ReleaseWithoutAddRef() { Assert.AreEqual(0, GrpcEnvironment.GetRefCount()); - Assert.Throws(typeof(InvalidOperationException), () => GrpcEnvironment.Release()); + Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await GrpcEnvironment.ReleaseAsync()); } [Test] diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs index 0e204761f6..c35aaf680f 100644 --- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs +++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs @@ -53,8 +53,6 @@ namespace Grpc.Core.Internal.Tests [SetUp] public void Init() { - var environment = GrpcEnvironment.AddRef(); - // Create a fake server just so we have an instance to refer to. // The server won't actually be used at all. server = new Server() @@ -66,7 +64,6 @@ namespace Grpc.Core.Internal.Tests fakeCall = new FakeNativeCall(); asyncCallServer = new AsyncCallServer<string, string>( Marshallers.StringMarshaller.Serializer, Marshallers.StringMarshaller.Deserializer, - environment, server); asyncCallServer.InitializeForTesting(fakeCall); } @@ -75,7 +72,6 @@ namespace Grpc.Core.Internal.Tests public void Cleanup() { server.ShutdownAsync().Wait(); - GrpcEnvironment.Release(); } [Test] @@ -136,7 +132,6 @@ namespace Grpc.Core.Internal.Tests public void WriteAfterCancelNotificationFails() { var finishedTask = asyncCallServer.ServerSideCallAsync(); - var requestStream = new ServerRequestStream<string, string>(asyncCallServer); var responseStream = new ServerResponseStream<string, string>(asyncCallServer); fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); @@ -181,6 +176,21 @@ namespace Grpc.Core.Internal.Tests AssertFinished(asyncCallServer, fakeCall, finishedTask); } + [Test] + public void WriteAfterWriteStatusThrowsInvalidOperationException() + { + var finishedTask = asyncCallServer.ServerSideCallAsync(); + var responseStream = new ServerResponseStream<string, string>(asyncCallServer); + + asyncCallServer.SendStatusFromServerAsync(Status.DefaultSuccess, new Metadata(), null); + Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await responseStream.WriteAsync("request1")); + + fakeCall.SendStatusFromServerHandler(true); + fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); + + AssertFinished(asyncCallServer, fakeCall, finishedTask); + } + static void AssertFinished(AsyncCallServer<string, string> asyncCallServer, FakeNativeCall fakeCall, Task finishedTask) { Assert.IsTrue(fakeCall.IsDisposed); diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs index 777a1c8c50..98e27a17a1 100644 --- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs +++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs @@ -33,7 +33,6 @@ using System; using System.Collections.Generic; -using System.Runtime.InteropServices; using System.Threading.Tasks; using Grpc.Core.Internal; @@ -82,7 +81,7 @@ namespace Grpc.Core.Internal.Tests Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await asyncCall.ReadMessageAsync()); Assert.Throws(typeof(InvalidOperationException), - () => asyncCall.StartSendMessage("abc", new WriteFlags(), (x,y) => {})); + () => asyncCall.SendMessageAsync("abc", new WriteFlags())); } [Test] @@ -103,7 +102,7 @@ namespace Grpc.Core.Internal.Tests var resultTask = asyncCall.UnaryCallAsync("request1"); fakeCall.UnaryResponseClientHandler(true, CreateClientSideStatus(StatusCode.InvalidArgument), - CreateResponsePayload(), + null, new Metadata()); AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.InvalidArgument); @@ -148,7 +147,7 @@ namespace Grpc.Core.Internal.Tests var resultTask = asyncCall.ClientStreamingCallAsync(); fakeCall.UnaryResponseClientHandler(true, CreateClientSideStatus(StatusCode.InvalidArgument), - CreateResponsePayload(), + null, new Metadata()); AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.InvalidArgument); @@ -193,7 +192,7 @@ namespace Grpc.Core.Internal.Tests fakeCall.UnaryResponseClientHandler(true, CreateClientSideStatus(StatusCode.Internal), - CreateResponsePayload(), + null, new Metadata()); AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Internal); @@ -211,7 +210,9 @@ namespace Grpc.Core.Internal.Tests new Metadata()); AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask); - var ex = Assert.Throws<RpcException>(() => requestStream.WriteAsync("request1")); + + var writeTask = requestStream.WriteAsync("request1"); + var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask); Assert.AreEqual(Status.DefaultSuccess, ex.Status); } @@ -227,7 +228,9 @@ namespace Grpc.Core.Internal.Tests new Metadata()); AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.OutOfRange); - var ex = Assert.Throws<RpcException>(() => requestStream.WriteAsync("request1")); + + var writeTask = requestStream.WriteAsync("request1"); + var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask); Assert.AreEqual(StatusCode.OutOfRange, ex.Status.StatusCode); } @@ -267,7 +270,7 @@ namespace Grpc.Core.Internal.Tests } [Test] - public void ClientStreaming_WriteAfterCancellationRequestThrowsOperationCancelledException() + public void ClientStreaming_WriteAfterCancellationRequestThrowsTaskCanceledException() { var resultTask = asyncCall.ClientStreamingCallAsync(); var requestStream = new ClientRequestStream<string, string>(asyncCall); @@ -275,11 +278,12 @@ namespace Grpc.Core.Internal.Tests asyncCall.Cancel(); Assert.IsTrue(fakeCall.IsCancelled); - Assert.Throws(typeof(OperationCanceledException), () => requestStream.WriteAsync("request1")); + var writeTask = requestStream.WriteAsync("request1"); + Assert.ThrowsAsync(typeof(TaskCanceledException), async () => await writeTask); fakeCall.UnaryResponseClientHandler(true, CreateClientSideStatus(StatusCode.Cancelled), - CreateResponsePayload(), + null, new Metadata()); AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Cancelled); @@ -290,7 +294,7 @@ namespace Grpc.Core.Internal.Tests { asyncCall.StartServerStreamingCall("request1"); Assert.Throws(typeof(InvalidOperationException), - () => asyncCall.StartSendMessage("abc", new WriteFlags(), (x,y) => {})); + () => asyncCall.SendMessageAsync("abc", new WriteFlags())); } [Test] @@ -390,12 +394,13 @@ namespace Grpc.Core.Internal.Tests AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask); - var ex = Assert.ThrowsAsync<RpcException>(async () => await requestStream.WriteAsync("request1")); + var writeTask = requestStream.WriteAsync("request1"); + var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask); Assert.AreEqual(Status.DefaultSuccess, ex.Status); } [Test] - public void DuplexStreaming_CompleteAfterReceivingStatusFails() + public void DuplexStreaming_CompleteAfterReceivingStatusSuceeds() { asyncCall.StartDuplexStreamingCall(); var requestStream = new ClientRequestStream<string, string>(asyncCall); @@ -411,7 +416,7 @@ namespace Grpc.Core.Internal.Tests } [Test] - public void DuplexStreaming_WriteAfterCancellationRequestThrowsOperationCancelledException() + public void DuplexStreaming_WriteAfterCancellationRequestThrowsTaskCanceledException() { asyncCall.StartDuplexStreamingCall(); var requestStream = new ClientRequestStream<string, string>(asyncCall); @@ -419,7 +424,9 @@ namespace Grpc.Core.Internal.Tests asyncCall.Cancel(); Assert.IsTrue(fakeCall.IsCancelled); - Assert.Throws(typeof(OperationCanceledException), () => requestStream.WriteAsync("request1")); + + var writeTask = requestStream.WriteAsync("request1"); + Assert.ThrowsAsync(typeof(TaskCanceledException), async () => await writeTask); var readTask = responseStream.MoveNext(); fakeCall.ReceivedMessageHandler(true, null); diff --git a/src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs b/src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs index 195119f920..e9ec59eb3d 100644 --- a/src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs +++ b/src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs @@ -48,7 +48,7 @@ namespace Grpc.Core.Internal.Tests GrpcEnvironment.AddRef(); var cq = CompletionQueueSafeHandle.Create(); cq.Dispose(); - GrpcEnvironment.Release(); + GrpcEnvironment.ReleaseAsync().Wait(); } [Test] @@ -59,7 +59,7 @@ namespace Grpc.Core.Internal.Tests cq.Shutdown(); var ev = cq.Next(); cq.Dispose(); - GrpcEnvironment.Release(); + GrpcEnvironment.ReleaseAsync().Wait(); Assert.AreEqual(CompletionQueueEvent.CompletionType.Shutdown, ev.type); Assert.AreNotEqual(IntPtr.Zero, ev.success); Assert.AreEqual(IntPtr.Zero, ev.tag); diff --git a/src/csharp/Grpc.Core.Tests/MarshallingErrorsTest.cs b/src/csharp/Grpc.Core.Tests/MarshallingErrorsTest.cs index 0663e77d1e..d770f82390 100644 --- a/src/csharp/Grpc.Core.Tests/MarshallingErrorsTest.cs +++ b/src/csharp/Grpc.Core.Tests/MarshallingErrorsTest.cs @@ -134,7 +134,15 @@ namespace Grpc.Core.Tests { helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) => { - Assert.ThrowsAsync<IOException>(async () => await requestStream.MoveNext()); + try + { + // cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock. + await requestStream.MoveNext(); + Assert.Fail(); + } + catch (IOException) + { + } return "RESPONSE"; }); diff --git a/src/csharp/Grpc.Core.Tests/MockServiceHelper.cs b/src/csharp/Grpc.Core.Tests/MockServiceHelper.cs index 3047314345..4d90470056 100644 --- a/src/csharp/Grpc.Core.Tests/MockServiceHelper.cs +++ b/src/csharp/Grpc.Core.Tests/MockServiceHelper.cs @@ -102,7 +102,7 @@ namespace Grpc.Core.Tests marshaller, marshaller); - serviceDefinition = ServerServiceDefinition.CreateBuilder(ServiceName) + serviceDefinition = ServerServiceDefinition.CreateBuilder() .AddMethod(unaryMethod, (request, context) => unaryHandler(request, context)) .AddMethod(clientStreamingMethod, (requestStream, context) => clientStreamingHandler(requestStream, context)) .AddMethod(serverStreamingMethod, (request, responseStream, context) => serverStreamingHandler(request, responseStream, context)) diff --git a/src/csharp/Grpc.Core.Tests/PInvokeTest.cs b/src/csharp/Grpc.Core.Tests/PInvokeTest.cs index d2b2fc6a66..d3735c7880 100644 --- a/src/csharp/Grpc.Core.Tests/PInvokeTest.cs +++ b/src/csharp/Grpc.Core.Tests/PInvokeTest.cs @@ -65,7 +65,7 @@ namespace Grpc.Core.Tests cq.Dispose(); }); - GrpcEnvironment.Release(); + GrpcEnvironment.ReleaseAsync().Wait(); } /// <summary> diff --git a/src/csharp/Grpc.Core.Tests/ServerTest.cs b/src/csharp/Grpc.Core.Tests/ServerTest.cs index b40508accc..3b51aa6330 100644 --- a/src/csharp/Grpc.Core.Tests/ServerTest.cs +++ b/src/csharp/Grpc.Core.Tests/ServerTest.cs @@ -89,9 +89,24 @@ namespace Grpc.Core.Tests }; server.Start(); Assert.Throws(typeof(InvalidOperationException), () => server.Ports.Add("localhost", 9999, ServerCredentials.Insecure)); - Assert.Throws(typeof(InvalidOperationException), () => server.Services.Add(ServerServiceDefinition.CreateBuilder("serviceName").Build())); + Assert.Throws(typeof(InvalidOperationException), () => server.Services.Add(ServerServiceDefinition.CreateBuilder().Build())); server.ShutdownAsync().Wait(); } + + [Test] + public void UnstartedServerCanBeShutdown() + { + var server = new Server(); + server.ShutdownAsync().Wait(); + Assert.Throws(typeof(InvalidOperationException), () => server.Start()); + } + + [Test] + public void UnstartedServerDoesNotPreventShutdown() + { + // just create a server, don't start it, and make sure it doesn't prevent shutdown. + var server = new Server(); + } } } diff --git a/src/csharp/Grpc.Core.Tests/ShutdownHookClientTest.cs b/src/csharp/Grpc.Core.Tests/ShutdownHookClientTest.cs new file mode 100644 index 0000000000..12b8452f64 --- /dev/null +++ b/src/csharp/Grpc.Core.Tests/ShutdownHookClientTest.cs @@ -0,0 +1,57 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Diagnostics; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core; +using Grpc.Core.Internal; +using Grpc.Core.Utils; +using NUnit.Framework; + +namespace Grpc.Core.Tests +{ + public class ShutdownHookClientTest + { + const string Host = "127.0.0.1"; + + [Test] + public void ProcessExitHookCanCleanupAbandonedChannels() + { + var channel = new Channel(Host, 1000, ChannelCredentials.Insecure); + var channel2 = new Channel(Host, 1001, ChannelCredentials.Insecure); + } + } +} diff --git a/src/csharp/Grpc.Core/Internal/AsyncCompletion.cs b/src/csharp/Grpc.Core.Tests/ShutdownHookPendingCallTest.cs index 7e86fddb4d..175233840d 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCompletion.cs +++ b/src/csharp/Grpc.Core.Tests/ShutdownHookPendingCallTest.cs @@ -2,11 +2,11 @@ // Copyright 2015, Google Inc. // All rights reserved. -// +// // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions are // met: -// +// // * Redistributions of source code must retain the above copyright // notice, this list of conditions and the following disclaimer. // * Redistributions in binary form must reproduce the above @@ -16,7 +16,7 @@ // * Neither the name of Google Inc. nor the names of its // contributors may be used to endorse or promote products derived from // this software without specific prior written permission. -// +// // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR @@ -33,62 +33,37 @@ using System; using System.Diagnostics; -using System.Runtime.CompilerServices; -using System.Runtime.InteropServices; +using System.Linq; using System.Threading; using System.Threading.Tasks; +using Grpc.Core; using Grpc.Core.Internal; using Grpc.Core.Utils; +using NUnit.Framework; -namespace Grpc.Core.Internal +namespace Grpc.Core.Tests { - /// <summary> - /// If error != null, there's been an error or operation has been cancelled. - /// </summary> - internal delegate void AsyncCompletionDelegate<T>(T result, Exception error); - - /// <summary> - /// Helper for transforming AsyncCompletionDelegate into full-fledged Task. - /// </summary> - internal class AsyncCompletionTaskSource<T> + public class ShutdownHookPendingCallTest { - readonly TaskCompletionSource<T> tcs = new TaskCompletionSource<T>(); - readonly AsyncCompletionDelegate<T> completionDelegate; + const string Host = "127.0.0.1"; - public AsyncCompletionTaskSource() + [Test] + public void ProcessExitHookCanCleanupAbandonedCall() { - completionDelegate = new AsyncCompletionDelegate<T>(HandleCompletion); - } + var helper = new MockServiceHelper(Host); + var server = helper.GetServer(); + server.Start(); + var channel = helper.GetChannel(); - public Task<T> Task - { - get + var readyToShutdown = new TaskCompletionSource<object>(); + helper.DuplexStreamingHandler = new DuplexStreamingServerMethod<string, string>(async (requestStream, responseStream, context) => { - return tcs.Task; - } - } + readyToShutdown.SetResult(null); + await requestStream.ToListAsync(); + }); - public AsyncCompletionDelegate<T> CompletionDelegate - { - get - { - return completionDelegate; - } - } - - private void HandleCompletion(T value, Exception error) - { - if (error == null) - { - tcs.SetResult(value); - return; - } - if (error is OperationCanceledException) - { - tcs.SetCanceled(); - return; - } - tcs.SetException(error); + var call = Calls.AsyncDuplexStreamingCall(helper.CreateDuplexStreamingCall()); + readyToShutdown.Task.Wait(); // make sure handler is running } } } diff --git a/src/csharp/Grpc.Core.Tests/ShutdownHookServerTest.cs b/src/csharp/Grpc.Core.Tests/ShutdownHookServerTest.cs new file mode 100644 index 0000000000..e7ea7a0bf5 --- /dev/null +++ b/src/csharp/Grpc.Core.Tests/ShutdownHookServerTest.cs @@ -0,0 +1,58 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Diagnostics; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core; +using Grpc.Core.Internal; +using Grpc.Core.Utils; +using NUnit.Framework; + +namespace Grpc.Core.Tests +{ + public class ShutdownHookServerTest + { + const string Host = "127.0.0.1"; + + [Test] + public void ProcessExitHookCanCleanupAbandonedServers() + { + var helper = new MockServiceHelper(Host); + var server = helper.GetServer(); + server.Start(); + } + } +} diff --git a/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs b/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs index 5646fed3d9..02b08d2a10 100644 --- a/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs +++ b/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs @@ -127,6 +127,10 @@ namespace Grpc.Core /// Otherwise, requests cancellation of the call which should terminate all pending async operations associated with the call. /// As a result, all resources being used by the call should be released eventually. /// </summary> + /// <remarks> + /// Normally, there is no need for you to dispose the call unless you want to utilize the + /// "Cancel" semantics of invoking <c>Dispose</c>. + /// </remarks> public void Dispose() { disposeAction.Invoke(); diff --git a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs index e75108c7e5..68fd6d0b05 100644 --- a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs +++ b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs @@ -117,6 +117,10 @@ namespace Grpc.Core /// Otherwise, requests cancellation of the call which should terminate all pending async operations associated with the call. /// As a result, all resources being used by the call should be released eventually. /// </summary> + /// <remarks> + /// Normally, there is no need for you to dispose the call unless you want to utilize the + /// "Cancel" semantics of invoking <c>Dispose</c>. + /// </remarks> public void Dispose() { disposeAction.Invoke(); diff --git a/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs b/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs index f953091984..5777c72615 100644 --- a/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs +++ b/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs @@ -103,6 +103,10 @@ namespace Grpc.Core /// Otherwise, requests cancellation of the call which should terminate all pending async operations associated with the call. /// As a result, all resources being used by the call should be released eventually. /// </summary> + /// <remarks> + /// Normally, there is no need for you to dispose the call unless you want to utilize the + /// "Cancel" semantics of invoking <c>Dispose</c>. + /// </remarks> public void Dispose() { disposeAction.Invoke(); diff --git a/src/csharp/Grpc.Core/AsyncUnaryCall.cs b/src/csharp/Grpc.Core/AsyncUnaryCall.cs index 97df8f5e91..d180c27922 100644 --- a/src/csharp/Grpc.Core/AsyncUnaryCall.cs +++ b/src/csharp/Grpc.Core/AsyncUnaryCall.cs @@ -112,6 +112,10 @@ namespace Grpc.Core /// Otherwise, requests cancellation of the call which should terminate all pending async operations associated with the call. /// As a result, all resources being used by the call should be released eventually. /// </summary> + /// <remarks> + /// Normally, there is no need for you to dispose the call unless you want to utilize the + /// "Cancel" semantics of invoking <c>Dispose</c>. + /// </remarks> public void Dispose() { disposeAction.Invoke(); diff --git a/src/csharp/Grpc.Core/CallOptions.cs b/src/csharp/Grpc.Core/CallOptions.cs index 9ca88849ee..35548cfc96 100644 --- a/src/csharp/Grpc.Core/CallOptions.cs +++ b/src/csharp/Grpc.Core/CallOptions.cs @@ -88,7 +88,13 @@ namespace Grpc.Core } /// <summary> - /// Token that can be used for cancelling the call. + /// Token that can be used for cancelling the call on the client side. + /// Cancelling the token will request cancellation + /// of the remote call. Best effort will be made to deliver the cancellation + /// notification to the server and interaction of the call with the server side + /// will be terminated. Unless the call finishes before the cancellation could + /// happen (there is an inherent race), + /// the call will finish with <c>StatusCode.Cancelled</c> status. /// </summary> public CancellationToken CancellationToken { diff --git a/src/csharp/Grpc.Core/Channel.cs b/src/csharp/Grpc.Core/Channel.cs index 93a6e6a3d9..4f29c35b32 100644 --- a/src/csharp/Grpc.Core/Channel.cs +++ b/src/csharp/Grpc.Core/Channel.cs @@ -31,7 +31,6 @@ using System; using System.Collections.Generic; -using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -56,6 +55,7 @@ namespace Grpc.Core readonly string target; readonly GrpcEnvironment environment; + readonly CompletionQueueSafeHandle completionQueue; readonly ChannelSafeHandle handle; readonly Dictionary<string, ChannelOption> options; @@ -67,14 +67,26 @@ namespace Grpc.Core /// </summary> /// <param name="target">Target of the channel.</param> /// <param name="credentials">Credentials to secure the channel.</param> + public Channel(string target, ChannelCredentials credentials) : + this(target, credentials, null) + { + } + + /// <summary> + /// Creates a channel that connects to a specific host. + /// Port will default to 80 for an unsecure channel and to 443 for a secure channel. + /// </summary> + /// <param name="target">Target of the channel.</param> + /// <param name="credentials">Credentials to secure the channel.</param> /// <param name="options">Channel options.</param> - public Channel(string target, ChannelCredentials credentials, IEnumerable<ChannelOption> options = null) + public Channel(string target, ChannelCredentials credentials, IEnumerable<ChannelOption> options) { this.target = GrpcPreconditions.CheckNotNull(target, "target"); this.options = CreateOptionsDictionary(options); EnsureUserAgentChannelOption(this.options); this.environment = GrpcEnvironment.AddRef(); + this.completionQueue = this.environment.PickCompletionQueue(); using (var nativeCredentials = credentials.ToNativeCredentials()) using (var nativeChannelArgs = ChannelOptions.CreateChannelArgs(this.options.Values)) { @@ -87,6 +99,18 @@ namespace Grpc.Core this.handle = ChannelSafeHandle.CreateInsecure(target, nativeChannelArgs); } } + GrpcEnvironment.RegisterChannel(this); + } + + /// <summary> + /// Creates a channel that connects to a specific host and port. + /// </summary> + /// <param name="host">The name or IP address of the host.</param> + /// <param name="port">The port.</param> + /// <param name="credentials">Credentials to secure the channel.</param> + public Channel(string host, int port, ChannelCredentials credentials) : + this(host, port, credentials, null) + { } /// <summary> @@ -96,14 +120,14 @@ namespace Grpc.Core /// <param name="port">The port.</param> /// <param name="credentials">Credentials to secure the channel.</param> /// <param name="options">Channel options.</param> - public Channel(string host, int port, ChannelCredentials credentials, IEnumerable<ChannelOption> options = null) : + public Channel(string host, int port, ChannelCredentials credentials, IEnumerable<ChannelOption> options) : this(string.Format("{0}:{1}", host, port), credentials, options) { } /// <summary> /// Gets current connectivity state of this channel. - /// After channel is has been shutdown, <c>ChannelState.FatalFailure</c> will be returned. + /// After channel is has been shutdown, <c>ChannelState.Shutdown</c> will be returned. /// </summary> public ChannelState State { @@ -120,8 +144,8 @@ namespace Grpc.Core /// </summary> public Task WaitForStateChangedAsync(ChannelState lastObservedState, DateTime? deadline = null) { - GrpcPreconditions.CheckArgument(lastObservedState != ChannelState.FatalFailure, - "FatalFailure is a terminal state. No further state changes can occur."); + GrpcPreconditions.CheckArgument(lastObservedState != ChannelState.Shutdown, + "Shutdown is a terminal state. No further state changes can occur."); var tcs = new TaskCompletionSource<object>(); var deadlineTimespec = deadline.HasValue ? Timespec.FromDateTime(deadline.Value) : Timespec.InfFuture; var handler = new BatchCompletionDelegate((success, ctx) => @@ -135,7 +159,7 @@ namespace Grpc.Core tcs.SetCanceled(); } }); - handle.WatchConnectivityState(lastObservedState, deadlineTimespec, environment.CompletionQueue, environment.CompletionRegistry, handler); + handle.WatchConnectivityState(lastObservedState, deadlineTimespec, completionQueue, handler); return tcs.Task; } @@ -171,7 +195,7 @@ namespace Grpc.Core /// <summary> /// Allows explicitly requesting channel to connect without starting an RPC. /// Returned task completes once state Ready was seen. If the deadline is reached, - /// or channel enters the FatalFailure state, the task is cancelled. + /// or channel enters the Shutdown state, the task is cancelled. /// There is no need to call this explicitly unless your use case requires that. /// Starting an RPC on a new channel will request connection implicitly. /// </summary> @@ -181,9 +205,9 @@ namespace Grpc.Core var currentState = GetConnectivityState(true); while (currentState != ChannelState.Ready) { - if (currentState == ChannelState.FatalFailure) + if (currentState == ChannelState.Shutdown) { - throw new OperationCanceledException("Channel has reached FatalFailure state."); + throw new OperationCanceledException("Channel has reached Shutdown state."); } await WaitForStateChangedAsync(currentState, deadline).ConfigureAwait(false); currentState = GetConnectivityState(false); @@ -191,9 +215,16 @@ namespace Grpc.Core } /// <summary> - /// Waits until there are no more active calls for this channel and then cleans up - /// resources used by this channel. + /// Shuts down the channel cleanly. It is strongly recommended to shutdown + /// all previously created channels before exiting from the process. /// </summary> + /// <remarks> + /// This method doesn't wait for all calls on this channel to finish (nor does + /// it explicitly cancel all outstanding calls). It is user's responsibility to make sure + /// all the calls on this channel have finished (successfully or with an error) + /// before shutting down the channel to ensure channel shutdown won't impact + /// the outcome of those remote calls. + /// </remarks> public async Task ShutdownAsync() { lock (myLock) @@ -201,6 +232,7 @@ namespace Grpc.Core GrpcPreconditions.CheckState(!shutdownRequested); shutdownRequested = true; } + GrpcEnvironment.UnregisterChannel(this); shutdownTokenSource.Cancel(); @@ -212,7 +244,7 @@ namespace Grpc.Core handle.Dispose(); - await Task.Run(() => GrpcEnvironment.Release()).ConfigureAwait(false); + await GrpcEnvironment.ReleaseAsync().ConfigureAwait(false); } internal ChannelSafeHandle Handle @@ -231,6 +263,14 @@ namespace Grpc.Core } } + internal CompletionQueueSafeHandle CompletionQueue + { + get + { + return this.completionQueue; + } + } + internal void AddCallReference(object call) { activeCallCounter.Increment(); @@ -255,7 +295,7 @@ namespace Grpc.Core } catch (ObjectDisposedException) { - return ChannelState.FatalFailure; + return ChannelState.Shutdown; } } diff --git a/src/csharp/Grpc.Core/ChannelState.cs b/src/csharp/Grpc.Core/ChannelState.cs index d293b98f75..a6c3b2a488 100644 --- a/src/csharp/Grpc.Core/ChannelState.cs +++ b/src/csharp/Grpc.Core/ChannelState.cs @@ -64,6 +64,6 @@ namespace Grpc.Core /// <summary> /// Channel has seen a failure that it cannot recover from /// </summary> - FatalFailure + Shutdown } } diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj index 4bf30e83c1..a796911b99 100644 --- a/src/csharp/Grpc.Core/Grpc.Core.csproj +++ b/src/csharp/Grpc.Core/Grpc.Core.csproj @@ -86,7 +86,6 @@ <Compile Include="Utils\BenchmarkUtil.cs" /> <Compile Include="ChannelCredentials.cs" /> <Compile Include="Internal\ChannelArgsSafeHandle.cs" /> - <Compile Include="Internal\AsyncCompletion.cs" /> <Compile Include="Internal\AsyncCallBase.cs" /> <Compile Include="Internal\AsyncCallServer.cs" /> <Compile Include="Internal\AsyncCall.cs" /> @@ -137,6 +136,8 @@ <Compile Include="Internal\ClientSideStatus.cs" /> <Compile Include="Internal\ClockType.cs" /> <Compile Include="Internal\CallError.cs" /> + <Compile Include="Logging\LogLevel.cs" /> + <Compile Include="Logging\LogLevelFilterLogger.cs" /> </ItemGroup> <ItemGroup> <None Include="Grpc.Core.nuspec" /> diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs index bee0ef1d62..e9e4cb4cbb 100644 --- a/src/csharp/Grpc.Core/GrpcEnvironment.cs +++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs @@ -32,6 +32,8 @@ #endregion using System; +using System.Collections.Generic; +using System.Linq; using System.Runtime.InteropServices; using System.Threading.Tasks; using Grpc.Core.Internal; @@ -45,18 +47,24 @@ namespace Grpc.Core /// </summary> public class GrpcEnvironment { + const LogLevel DefaultLogLevel = LogLevel.Info; const int MinDefaultThreadPoolSize = 4; static object staticLock = new object(); static GrpcEnvironment instance; static int refCount; static int? customThreadPoolSize; + static int? customCompletionQueueCount; + static readonly HashSet<Channel> registeredChannels = new HashSet<Channel>(); + static readonly HashSet<Server> registeredServers = new HashSet<Server>(); - static ILogger logger = new ConsoleLogger(); + static ILogger logger = new LogLevelFilterLogger(new ConsoleLogger(), DefaultLogLevel); + readonly object myLock = new object(); readonly GrpcThreadPool threadPool; - readonly CompletionRegistry completionRegistry; readonly DebugStats debugStats = new DebugStats(); + readonly AtomicCounter cqPickerCounter = new AtomicCounter(); + bool isClosed; /// <summary> @@ -65,6 +73,8 @@ namespace Grpc.Core /// </summary> internal static GrpcEnvironment AddRef() { + ShutdownHooks.Register(); + lock (staticLock) { refCount++; @@ -77,21 +87,26 @@ namespace Grpc.Core } /// <summary> - /// Decrements the reference count for currently active environment and shuts down the gRPC environment if reference count drops to zero. - /// (and blocks until the environment has been fully shutdown). + /// Decrements the reference count for currently active environment and asynchronously shuts down the gRPC environment if reference count drops to zero. /// </summary> - internal static void Release() + internal static async Task ReleaseAsync() { + GrpcEnvironment instanceToShutdown = null; lock (staticLock) { GrpcPreconditions.CheckState(refCount > 0); refCount--; if (refCount == 0) { - instance.Close(); + instanceToShutdown = instance; instance = null; } } + + if (instanceToShutdown != null) + { + await instanceToShutdown.ShutdownAsync(); + } } internal static int GetRefCount() @@ -102,6 +117,68 @@ namespace Grpc.Core } } + internal static void RegisterChannel(Channel channel) + { + lock (staticLock) + { + GrpcPreconditions.CheckNotNull(channel); + registeredChannels.Add(channel); + } + } + + internal static void UnregisterChannel(Channel channel) + { + lock (staticLock) + { + GrpcPreconditions.CheckNotNull(channel); + GrpcPreconditions.CheckArgument(registeredChannels.Remove(channel), "Channel not found in the registered channels set."); + } + } + + internal static void RegisterServer(Server server) + { + lock (staticLock) + { + GrpcPreconditions.CheckNotNull(server); + registeredServers.Add(server); + } + } + + internal static void UnregisterServer(Server server) + { + lock (staticLock) + { + GrpcPreconditions.CheckNotNull(server); + GrpcPreconditions.CheckArgument(registeredServers.Remove(server), "Server not found in the registered servers set."); + } + } + + /// <summary> + /// Requests shutdown of all channels created by the current process. + /// </summary> + public static Task ShutdownChannelsAsync() + { + HashSet<Channel> snapshot = null; + lock (staticLock) + { + snapshot = new HashSet<Channel>(registeredChannels); + } + return Task.WhenAll(snapshot.Select((channel) => channel.ShutdownAsync())); + } + + /// <summary> + /// Requests immediate shutdown of all servers created by the current process. + /// </summary> + public static Task KillServersAsync() + { + HashSet<Server> snapshot = null; + lock (staticLock) + { + snapshot = new HashSet<Server>(registeredServers); + } + return Task.WhenAll(snapshot.Select((server) => server.KillAsync())); + } + /// <summary> /// Gets application-wide logger used by gRPC. /// </summary> @@ -141,39 +218,62 @@ namespace Grpc.Core } /// <summary> + /// Sets the number of completion queues in the gRPC thread pool that polls for internal RPC events. + /// Can be only invoke before the <c>GrpcEnviroment</c> is started and cannot be changed afterwards. + /// Setting the number of completions queues is an advanced setting and you should only use it if you know what you are doing. + /// Most users should rely on the default value provided by gRPC library. + /// Note: this method is part of an experimental API that can change or be removed without any prior notice. + /// </summary> + public static void SetCompletionQueueCount(int completionQueueCount) + { + lock (staticLock) + { + GrpcPreconditions.CheckState(instance == null, "Can only be set before GrpcEnvironment is initialized"); + GrpcPreconditions.CheckArgument(completionQueueCount > 0, "threadCount needs to be a positive number"); + customCompletionQueueCount = completionQueueCount; + } + } + + /// <summary> /// Creates gRPC environment. /// </summary> private GrpcEnvironment() { GrpcNativeInit(); - completionRegistry = new CompletionRegistry(this); - threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault()); + threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault(), GetCompletionQueueCountOrDefault()); threadPool.Start(); } /// <summary> - /// Gets the completion registry used by this gRPC environment. + /// Gets the completion queues used by this gRPC environment. /// </summary> - internal CompletionRegistry CompletionRegistry + internal IReadOnlyCollection<CompletionQueueSafeHandle> CompletionQueues { get { - return this.completionRegistry; + return this.threadPool.CompletionQueues; } } - /// <summary> - /// Gets the completion queue used by this gRPC environment. - /// </summary> - internal CompletionQueueSafeHandle CompletionQueue + internal bool IsAlive { get { - return this.threadPool.CompletionQueue; + return this.threadPool.IsAlive; } } /// <summary> + /// Picks a completion queue in a round-robin fashion. + /// Shouldn't be invoked on a per-call basis (used at per-channel basis). + /// </summary> + internal CompletionQueueSafeHandle PickCompletionQueue() + { + var cqIndex = (int) ((cqPickerCounter.Increment() - 1) % this.threadPool.CompletionQueues.Count); + return this.threadPool.CompletionQueues.ElementAt(cqIndex); + } + + /// <summary> /// Gets the completion queue used by this gRPC environment. /// </summary> internal DebugStats DebugStats @@ -206,13 +306,13 @@ namespace Grpc.Core /// <summary> /// Shuts down this environment. /// </summary> - private void Close() + private async Task ShutdownAsync() { if (isClosed) { throw new InvalidOperationException("Close has already been called"); } - threadPool.Stop(); + await threadPool.StopAsync().ConfigureAwait(false); GrpcNativeShutdown(); isClosed = true; @@ -230,5 +330,42 @@ namespace Grpc.Core // more work, but seems to work reasonably well for a start. return Math.Max(MinDefaultThreadPoolSize, Environment.ProcessorCount / 2); } + + private int GetCompletionQueueCountOrDefault() + { + if (customCompletionQueueCount.HasValue) + { + return customCompletionQueueCount.Value; + } + // by default, create a completion queue for each thread + return GetThreadPoolSizeOrDefault(); + } + + private static class ShutdownHooks + { + static object staticLock = new object(); + static bool hooksRegistered; + + public static void Register() + { + lock (staticLock) + { + if (!hooksRegistered) + { + AppDomain.CurrentDomain.ProcessExit += ShutdownHookHandler; + AppDomain.CurrentDomain.DomainUnload += ShutdownHookHandler; + } + hooksRegistered = true; + } + } + + /// <summary> + /// Handler for AppDomain.DomainUnload and AppDomain.ProcessExit hooks. + /// </summary> + private static void ShutdownHookHandler(object sender, EventArgs e) + { + Task.WaitAll(GrpcEnvironment.ShutdownChannelsAsync(), GrpcEnvironment.KillServersAsync()); + } + } } } diff --git a/src/csharp/Grpc.Core/IAsyncStreamReader.cs b/src/csharp/Grpc.Core/IAsyncStreamReader.cs index 49e1ea7832..aa3b802a50 100644 --- a/src/csharp/Grpc.Core/IAsyncStreamReader.cs +++ b/src/csharp/Grpc.Core/IAsyncStreamReader.cs @@ -41,10 +41,24 @@ namespace Grpc.Core { /// <summary> /// A stream of messages to be read. + /// Messages can be awaited <c>await reader.MoveNext()</c>, that returns <c>true</c> + /// if there is a message available and <c>false</c> if there are no more messages + /// (i.e. the stream has been closed). + /// <para> + /// On the client side, the last invocation of <c>MoveNext()</c> either returns <c>false</c> + /// if the call has finished successfully or throws <c>RpcException</c> if call finished + /// with an error. Once the call finishes, subsequent invocations of <c>MoveNext()</c> will + /// continue yielding the same result (returning <c>false</c> or throwing an exception). + /// </para> + /// <para> + /// On the server side, <c>MoveNext()</c> does not throw exceptions. + /// In case of a failure, the request stream will appear to be finished + /// (<c>MoveNext</c> will return <c>false</c>) and the <c>CancellationToken</c> + /// associated with the call will be cancelled to signal the failure. + /// </para> /// </summary> /// <typeparam name="T">The message type.</typeparam> public interface IAsyncStreamReader<T> : IAsyncEnumerator<T> { - // TODO(jtattermusch): consider just using IAsyncEnumerator instead of this interface. } } diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index 55351869b5..f549c52876 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -32,12 +32,7 @@ #endregion using System; -using System.Diagnostics; -using System.Runtime.CompilerServices; -using System.Runtime.InteropServices; -using System.Threading; using System.Threading.Tasks; -using Grpc.Core.Internal; using Grpc.Core.Logging; using Grpc.Core.Profiling; using Grpc.Core.Utils; @@ -57,9 +52,11 @@ namespace Grpc.Core.Internal // Completion of a pending unary response if not null. TaskCompletionSource<TResponse> unaryResponseTcs; + // TODO(jtattermusch): this field doesn't need to be initialized for unary response calls. // Indicates that response streaming call has finished. TaskCompletionSource<object> streamingCallFinishedTcs = new TaskCompletionSource<object>(); + // TODO(jtattermusch): this field could be lazy-initialized (only if someone requests the response headers). // Response headers set here once received. TaskCompletionSource<Metadata> responseHeadersTcs = new TaskCompletionSource<Metadata>(); @@ -67,7 +64,7 @@ namespace Grpc.Core.Internal ClientSideStatus? finishedStatus; public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails) - : base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer, callDetails.Channel.Environment) + : base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer) { this.details = callDetails.WithOptions(callDetails.Options.Normalize()); this.initialMetadataSent = true; // we always send metadata at the very beginning of the call. @@ -144,7 +141,7 @@ namespace Grpc.Core.Internal GrpcPreconditions.CheckState(!started); started = true; - Initialize(environment.CompletionQueue); + Initialize(details.Channel.CompletionQueue); halfcloseRequested = true; readingDone = true; @@ -171,7 +168,7 @@ namespace Grpc.Core.Internal GrpcPreconditions.CheckState(!started); started = true; - Initialize(environment.CompletionQueue); + Initialize(details.Channel.CompletionQueue); readingDone = true; @@ -195,7 +192,7 @@ namespace Grpc.Core.Internal GrpcPreconditions.CheckState(!started); started = true; - Initialize(environment.CompletionQueue); + Initialize(details.Channel.CompletionQueue); halfcloseRequested = true; @@ -220,7 +217,7 @@ namespace Grpc.Core.Internal GrpcPreconditions.CheckState(!started); started = true; - Initialize(environment.CompletionQueue); + Initialize(details.Channel.CompletionQueue); using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) { @@ -232,11 +229,10 @@ namespace Grpc.Core.Internal /// <summary> /// Sends a streaming request. Only one pending send action is allowed at any given time. - /// completionDelegate is called when the operation finishes. /// </summary> - public void StartSendMessage(TRequest msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate) + public Task SendMessageAsync(TRequest msg, WriteFlags writeFlags) { - StartSendMessageInternal(msg, writeFlags, completionDelegate); + return SendMessageInternalAsync(msg, writeFlags); } /// <summary> @@ -250,29 +246,32 @@ namespace Grpc.Core.Internal /// <summary> /// Sends halfclose, indicating client is done with streaming requests. /// Only one pending send action is allowed at any given time. - /// completionDelegate is called when the operation finishes. /// </summary> - public void StartSendCloseFromClient(AsyncCompletionDelegate<object> completionDelegate) + public Task SendCloseFromClientAsync() { lock (myLock) { - GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); - CheckSendingAllowed(allowFinished: true); + GrpcPreconditions.CheckState(started); - if (!disposed && !finished) + var earlyResult = CheckSendPreconditionsClientSide(); + if (earlyResult != null) { - call.StartSendCloseFromClient(HandleSendCloseFromClientFinished); + return earlyResult; } - else + + if (disposed || finished) { // In case the call has already been finished by the serverside, - // the halfclose has already been done implicitly, so we only - // emit the notification for the completion delegate. - Task.Run(() => HandleSendCloseFromClientFinished(true)); + // the halfclose has already been done implicitly, so just return + // completed task here. + halfcloseRequested = true; + return Task.FromResult<object>(null); } + call.StartSendCloseFromClient(HandleSendFinished); halfcloseRequested = true; - sendCompletionDelegate = completionDelegate; + streamingWriteTcs = new TaskCompletionSource<object>(); + return streamingWriteTcs.Task; } } @@ -342,6 +341,45 @@ namespace Grpc.Core.Internal get { return true; } } + protected override Task CheckSendAllowedOrEarlyResult() + { + var earlyResult = CheckSendPreconditionsClientSide(); + if (earlyResult != null) + { + return earlyResult; + } + + if (finishedStatus.HasValue) + { + // throwing RpcException if we already received status on client + // side makes the most sense. + // Note that this throws even for StatusCode.OK. + // Writing after the call has finished is not a programming error because server can close + // the call anytime, so don't throw directly, but let the write task finish with an error. + var tcs = new TaskCompletionSource<object>(); + tcs.SetException(new RpcException(finishedStatus.Value.Status)); + return tcs.Task; + } + + return null; + } + + private Task CheckSendPreconditionsClientSide() + { + GrpcPreconditions.CheckState(!halfcloseRequested, "Request stream has already been completed."); + GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time."); + + if (cancelRequested) + { + // Return a cancelled task. + var tcs = new TaskCompletionSource<object>(); + tcs.SetCanceled(); + return tcs.Task; + } + + return null; + } + private void Initialize(CompletionQueueSafeHandle cq) { using (Profilers.ForCurrentThread().NewScope("AsyncCall.Initialize")) @@ -368,7 +406,7 @@ namespace Grpc.Core.Internal var credentials = details.Options.Credentials; using (var nativeCredentials = credentials != null ? credentials.ToNativeCredentials() : null) { - var result = details.Channel.Handle.CreateCall(environment.CompletionRegistry, + var result = details.Channel.Handle.CreateCall( parentCall, ContextPropagationToken.DefaultMask, cq, details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline.Value), nativeCredentials); return result; @@ -400,6 +438,7 @@ namespace Grpc.Core.Internal /// </summary> private void HandleReceivedResponseHeaders(bool success, Metadata responseHeaders) { + // TODO(jtattermusch): handle success==false responseHeadersTcs.SetResult(responseHeaders); } @@ -443,19 +482,6 @@ namespace Grpc.Core.Internal } } - protected override void CheckSendingAllowed(bool allowFinished) - { - base.CheckSendingAllowed(true); - - // throwing RpcException if we already received status on client - // side makes the most sense. - // Note that this throws even for StatusCode.OK. - if (!allowFinished && finishedStatus.HasValue) - { - throw new RpcException(finishedStatus.Value.Status); - } - } - /// <summary> /// Handles receive status completion for calls with streaming response. /// </summary> diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs index 4de23706b2..eb9c3ea62d 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs @@ -58,7 +58,6 @@ namespace Grpc.Core.Internal readonly Func<TWrite, byte[]> serializer; readonly Func<byte[], TRead> deserializer; - protected readonly GrpcEnvironment environment; protected readonly object myLock = new object(); protected INativeCall call; @@ -67,8 +66,8 @@ namespace Grpc.Core.Internal protected bool started; protected bool cancelRequested; - protected AsyncCompletionDelegate<object> sendCompletionDelegate; // Completion of a pending send or sendclose if not null. protected TaskCompletionSource<TRead> streamingReadTcs; // Completion of a pending streaming read if not null. + protected TaskCompletionSource<object> streamingWriteTcs; // Completion of a pending streaming write or send close from client if not null. protected TaskCompletionSource<object> sendStatusFromServerTcs; protected bool readingDone; // True if last read (i.e. read with null payload) was already received. @@ -78,11 +77,10 @@ namespace Grpc.Core.Internal protected bool initialMetadataSent; protected long streamingWritesCounter; // Number of streaming send operations started so far. - public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer, GrpcEnvironment environment) + public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer) { this.serializer = GrpcPreconditions.CheckNotNull(serializer); this.deserializer = GrpcPreconditions.CheckNotNull(deserializer); - this.environment = GrpcPreconditions.CheckNotNull(environment); } /// <summary> @@ -128,28 +126,31 @@ namespace Grpc.Core.Internal /// <summary> /// Initiates sending a message. Only one send operation can be active at a time. - /// completionDelegate is invoked upon completion. /// </summary> - protected void StartSendMessageInternal(TWrite msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate) + protected Task SendMessageInternalAsync(TWrite msg, WriteFlags writeFlags) { byte[] payload = UnsafeSerialize(msg); lock (myLock) { - GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); - CheckSendingAllowed(allowFinished: false); + GrpcPreconditions.CheckState(started); + var earlyResult = CheckSendAllowedOrEarlyResult(); + if (earlyResult != null) + { + return earlyResult; + } call.StartSendMessage(HandleSendFinished, payload, writeFlags, !initialMetadataSent); - sendCompletionDelegate = completionDelegate; initialMetadataSent = true; streamingWritesCounter++; + streamingWriteTcs = new TaskCompletionSource<object>(); + return streamingWriteTcs.Task; } } /// <summary> /// Initiates reading a message. Only one read operation can be active at a time. - /// completionDelegate is invoked upon completion. /// </summary> protected Task<TRead> ReadMessageInternalAsync() { @@ -159,7 +160,7 @@ namespace Grpc.Core.Internal if (readingDone) { // the last read that returns null or throws an exception is idempotent - // and maintain its state. + // and maintains its state. GrpcPreconditions.CheckState(streamingReadTcs != null, "Call does not support streaming reads."); return streamingReadTcs.Task; } @@ -183,7 +184,7 @@ namespace Grpc.Core.Internal { if (!disposed && call != null) { - bool noMoreSendCompletions = sendCompletionDelegate == null && (halfcloseRequested || cancelRequested || finished); + bool noMoreSendCompletions = streamingWriteTcs == null && (halfcloseRequested || cancelRequested || finished); if (noMoreSendCompletions && readingDone && finished) { ReleaseResources(); @@ -213,24 +214,11 @@ namespace Grpc.Core.Internal { } - protected virtual void CheckSendingAllowed(bool allowFinished) - { - GrpcPreconditions.CheckState(started); - CheckNotCancelled(); - GrpcPreconditions.CheckState(!disposed || allowFinished); - - GrpcPreconditions.CheckState(!halfcloseRequested, "Already halfclosed."); - GrpcPreconditions.CheckState(!finished || allowFinished, "Already finished."); - GrpcPreconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time"); - } - - protected void CheckNotCancelled() - { - if (cancelRequested) - { - throw new OperationCanceledException("Remote call has been cancelled."); - } - } + /// <summary> + /// Checks if sending is allowed and possibly returns a Task that allows short-circuiting the send + /// logic by directly returning the write operation result task. Normally, null is returned. + /// </summary> + protected abstract Task CheckSendAllowedOrEarlyResult(); protected byte[] UnsafeSerialize(TWrite msg) { @@ -259,63 +247,27 @@ namespace Grpc.Core.Internal } } - protected void FireCompletion<T>(AsyncCompletionDelegate<T> completionDelegate, T value, Exception error) - { - try - { - completionDelegate(value, error); - } - catch (Exception e) - { - Logger.Error(e, "Exception occured while invoking completion delegate."); - } - } - /// <summary> - /// Handles send completion. + /// Handles send completion (including SendCloseFromClient). /// </summary> protected void HandleSendFinished(bool success) { - AsyncCompletionDelegate<object> origCompletionDelegate = null; - lock (myLock) - { - origCompletionDelegate = sendCompletionDelegate; - sendCompletionDelegate = null; - - ReleaseResourcesIfPossible(); - } - - if (!success) - { - FireCompletion(origCompletionDelegate, null, new InvalidOperationException("Send failed")); - } - else - { - FireCompletion(origCompletionDelegate, null, null); - } - } - - /// <summary> - /// Handles halfclose (send close from client) completion. - /// </summary> - protected void HandleSendCloseFromClientFinished(bool success) - { - AsyncCompletionDelegate<object> origCompletionDelegate = null; + TaskCompletionSource<object> origTcs = null; lock (myLock) { - origCompletionDelegate = sendCompletionDelegate; - sendCompletionDelegate = null; + origTcs = streamingWriteTcs; + streamingWriteTcs = null; ReleaseResourcesIfPossible(); } if (!success) { - FireCompletion(origCompletionDelegate, null, new InvalidOperationException("Sending close from client has failed.")); + origTcs.SetException(new InvalidOperationException("Send failed")); } else { - FireCompletion(origCompletionDelegate, null, null); + origTcs.SetResult(null); } } diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs index b1566b44a7..56c23ba3ef 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs @@ -51,14 +51,14 @@ namespace Grpc.Core.Internal readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); readonly Server server; - public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, GrpcEnvironment environment, Server server) : base(serializer, deserializer, environment) + public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, Server server) : base(serializer, deserializer) { this.server = GrpcPreconditions.CheckNotNull(server); } - public void Initialize(CallSafeHandle call) + public void Initialize(CallSafeHandle call, CompletionQueueSafeHandle completionQueue) { - call.Initialize(environment.CompletionRegistry, environment.CompletionQueue); + call.Initialize(completionQueue); server.AddCallReference(this); InitializeInternal(call); @@ -91,11 +91,10 @@ namespace Grpc.Core.Internal /// <summary> /// Sends a streaming response. Only one pending send action is allowed at any given time. - /// completionDelegate is called when the operation finishes. /// </summary> - public void StartSendMessage(TResponse msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate) + public Task SendMessageAsync(TResponse msg, WriteFlags writeFlags) { - StartSendMessageInternal(msg, writeFlags, completionDelegate); + return SendMessageInternalAsync(msg, writeFlags); } /// <summary> @@ -110,20 +109,22 @@ namespace Grpc.Core.Internal /// Initiates sending a initial metadata. /// Even though C-core allows sending metadata in parallel to sending messages, we will treat sending metadata as a send message operation /// to make things simpler. - /// completionDelegate is invoked upon completion. /// </summary> - public void StartSendInitialMetadata(Metadata headers, AsyncCompletionDelegate<object> completionDelegate) + public Task SendInitialMetadataAsync(Metadata headers) { lock (myLock) { GrpcPreconditions.CheckNotNull(headers, "metadata"); - GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); + GrpcPreconditions.CheckState(started); GrpcPreconditions.CheckState(!initialMetadataSent, "Response headers can only be sent once per call."); GrpcPreconditions.CheckState(streamingWritesCounter == 0, "Response headers can only be sent before the first write starts."); - CheckSendingAllowed(allowFinished: false); - GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); + var earlyResult = CheckSendAllowedOrEarlyResult(); + if (earlyResult != null) + { + return earlyResult; + } using (var metadataArray = MetadataArraySafeHandle.Create(headers)) { @@ -131,7 +132,8 @@ namespace Grpc.Core.Internal } this.initialMetadataSent = true; - sendCompletionDelegate = completionDelegate; + streamingWriteTcs = new TaskCompletionSource<object>(); + return streamingWriteTcs.Task; } } @@ -196,6 +198,16 @@ namespace Grpc.Core.Internal server.RemoveCallReference(this); } + protected override Task CheckSendAllowedOrEarlyResult() + { + GrpcPreconditions.CheckState(!halfcloseRequested, "Response stream has already been completed."); + GrpcPreconditions.CheckState(!finished, "Already finished."); + GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time"); + GrpcPreconditions.CheckState(!disposed); + + return null; + } + /// <summary> /// Handles the server side close completion. /// </summary> diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs index 244b97d4a4..82361f5797 100644 --- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs @@ -47,16 +47,14 @@ namespace Grpc.Core.Internal static readonly NativeMethods Native = NativeMethods.Get(); const uint GRPC_WRITE_BUFFER_HINT = 1; - CompletionRegistry completionRegistry; CompletionQueueSafeHandle completionQueue; private CallSafeHandle() { } - public void Initialize(CompletionRegistry completionRegistry, CompletionQueueSafeHandle completionQueue) + public void Initialize(CompletionQueueSafeHandle completionQueue) { - this.completionRegistry = completionRegistry; this.completionQueue = completionQueue; } @@ -70,7 +68,7 @@ namespace Grpc.Core.Internal using (completionQueue.NewScope()) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata())); + completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata())); Native.grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags) .CheckOk(); } @@ -90,7 +88,7 @@ namespace Grpc.Core.Internal using (completionQueue.NewScope()) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata())); + completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata())); Native.grpcsharp_call_start_client_streaming(this, ctx, metadataArray).CheckOk(); } } @@ -100,7 +98,7 @@ namespace Grpc.Core.Internal using (completionQueue.NewScope()) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient())); + completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient())); Native.grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags).CheckOk(); } } @@ -110,7 +108,7 @@ namespace Grpc.Core.Internal using (completionQueue.NewScope()) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient())); + completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient())); Native.grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray).CheckOk(); } } @@ -120,7 +118,7 @@ namespace Grpc.Core.Internal using (completionQueue.NewScope()) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); + completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); Native.grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, sendEmptyInitialMetadata).CheckOk(); } } @@ -130,7 +128,7 @@ namespace Grpc.Core.Internal using (completionQueue.NewScope()) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); + completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); Native.grpcsharp_call_send_close_from_client(this, ctx).CheckOk(); } } @@ -142,7 +140,7 @@ namespace Grpc.Core.Internal { var ctx = BatchContextSafeHandle.Create(); var optionalPayloadLength = optionalPayload != null ? new UIntPtr((ulong)optionalPayload.Length) : UIntPtr.Zero; - completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); + completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); Native.grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, sendEmptyInitialMetadata, optionalPayload, optionalPayloadLength, writeFlags).CheckOk(); } @@ -153,7 +151,7 @@ namespace Grpc.Core.Internal using (completionQueue.NewScope()) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedMessage())); + completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedMessage())); Native.grpcsharp_call_recv_message(this, ctx).CheckOk(); } } @@ -163,7 +161,7 @@ namespace Grpc.Core.Internal using (completionQueue.NewScope()) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedInitialMetadata())); + completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedInitialMetadata())); Native.grpcsharp_call_recv_initial_metadata(this, ctx).CheckOk(); } } @@ -173,7 +171,7 @@ namespace Grpc.Core.Internal using (completionQueue.NewScope()) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedCloseOnServerCancelled())); + completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedCloseOnServerCancelled())); Native.grpcsharp_call_start_serverside(this, ctx).CheckOk(); } } @@ -183,7 +181,7 @@ namespace Grpc.Core.Internal using (completionQueue.NewScope()) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); + completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); Native.grpcsharp_call_send_initial_metadata(this, ctx, metadataArray).CheckOk(); } } diff --git a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs index 1dbd1f4e34..62864dff0c 100644 --- a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs @@ -63,7 +63,7 @@ namespace Grpc.Core.Internal return Native.grpcsharp_secure_channel_create(credentials, target, channelArgs); } - public CallSafeHandle CreateCall(CompletionRegistry registry, CallSafeHandle parentCall, ContextPropagationFlags propagationMask, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline, CallCredentialsSafeHandle credentials) + public CallSafeHandle CreateCall(CallSafeHandle parentCall, ContextPropagationFlags propagationMask, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline, CallCredentialsSafeHandle credentials) { using (Profilers.ForCurrentThread().NewScope("ChannelSafeHandle.CreateCall")) { @@ -72,7 +72,7 @@ namespace Grpc.Core.Internal { result.SetCredentials(credentials); } - result.Initialize(registry, cq); + result.Initialize(cq); return result; } } @@ -82,11 +82,10 @@ namespace Grpc.Core.Internal return Native.grpcsharp_channel_check_connectivity_state(this, tryToConnect ? 1 : 0); } - public void WatchConnectivityState(ChannelState lastObservedState, Timespec deadline, CompletionQueueSafeHandle cq, - CompletionRegistry completionRegistry, BatchCompletionDelegate callback) + public void WatchConnectivityState(ChannelState lastObservedState, Timespec deadline, CompletionQueueSafeHandle cq, BatchCompletionDelegate callback) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, callback); + cq.CompletionRegistry.RegisterBatchCompletion(ctx, callback); Native.grpcsharp_channel_watch_connectivity_state(this, lastObservedState, deadline, cq, ctx); } diff --git a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs index 013f00ff6f..924de028f5 100644 --- a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs +++ b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs @@ -50,16 +50,12 @@ namespace Grpc.Core.Internal public Task WriteAsync(TRequest message) { - var taskSource = new AsyncCompletionTaskSource<object>(); - call.StartSendMessage(message, GetWriteFlags(), taskSource.CompletionDelegate); - return taskSource.Task; + return call.SendMessageAsync(message, GetWriteFlags()); } public Task CompleteAsync() { - var taskSource = new AsyncCompletionTaskSource<object>(); - call.StartSendCloseFromClient(taskSource.CompletionDelegate); - return taskSource.Task; + return call.SendCloseFromClientAsync(); } public WriteOptions WriteOptions diff --git a/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs index 91364cdc70..46f5624223 100644 --- a/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs @@ -45,6 +45,7 @@ namespace Grpc.Core.Internal static readonly NativeMethods Native = NativeMethods.Get(); AtomicCounter shutdownRefcount = new AtomicCounter(1); + CompletionRegistry completionRegistry; private CompletionQueueSafeHandle() { @@ -53,7 +54,13 @@ namespace Grpc.Core.Internal public static CompletionQueueSafeHandle Create() { return Native.grpcsharp_completion_queue_create(); + } + public static CompletionQueueSafeHandle Create(CompletionRegistry completionRegistry) + { + var cq = Native.grpcsharp_completion_queue_create(); + cq.completionRegistry = completionRegistry; + return cq; } public CompletionQueueEvent Next() @@ -83,6 +90,15 @@ namespace Grpc.Core.Internal DecrementShutdownRefcount(); } + /// <summary> + /// Completion registry associated with this completion queue. + /// Doesn't need to be set if only using Pluck() operations. + /// </summary> + public CompletionRegistry CompletionRegistry + { + get { return completionRegistry; } + } + protected override bool ReleaseHandle() { Native.grpcsharp_completion_queue_destroy(handle); diff --git a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs index b538726fa1..a446c1f99f 100644 --- a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs +++ b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs @@ -33,15 +33,16 @@ using System; using System.Collections.Generic; -using System.Runtime.InteropServices; +using System.Linq; using System.Threading; using System.Threading.Tasks; using Grpc.Core.Logging; +using Grpc.Core.Utils; namespace Grpc.Core.Internal { /// <summary> - /// Pool of threads polling on the same completion queue. + /// Pool of threads polling on a set of completions queues. /// </summary> internal class GrpcThreadPool { @@ -51,25 +52,33 @@ namespace Grpc.Core.Internal readonly object myLock = new object(); readonly List<Thread> threads = new List<Thread>(); readonly int poolSize; + readonly int completionQueueCount; - CompletionQueueSafeHandle cq; + bool stopRequested; - public GrpcThreadPool(GrpcEnvironment environment, int poolSize) + IReadOnlyCollection<CompletionQueueSafeHandle> completionQueues; + + /// <summary> + /// Creates a thread pool threads polling on a set of completions queues. + /// </summary> + /// <param name="environment">Environment.</param> + /// <param name="poolSize">Pool size.</param> + /// <param name="completionQueueCount">Completion queue count.</param> + public GrpcThreadPool(GrpcEnvironment environment, int poolSize, int completionQueueCount) { this.environment = environment; this.poolSize = poolSize; + this.completionQueueCount = completionQueueCount; + GrpcPreconditions.CheckArgument(poolSize >= completionQueueCount, + "Thread pool size cannot be smaller than the number of completion queues used."); } public void Start() { lock (myLock) { - if (cq != null) - { - throw new InvalidOperationException("Already started."); - } - - cq = CompletionQueueSafeHandle.Create(); + GrpcPreconditions.CheckState(completionQueues == null, "Already started."); + completionQueues = CreateCompletionQueueList(environment, completionQueueCount); for (int i = 0; i < poolSize; i++) { @@ -78,41 +87,73 @@ namespace Grpc.Core.Internal } } - public void Stop() + public Task StopAsync() { lock (myLock) { - cq.Shutdown(); + GrpcPreconditions.CheckState(!stopRequested, "Stop already requested."); + stopRequested = true; + + foreach (var cq in completionQueues) + { + cq.Shutdown(); + } + } + + return Task.Run(() => + { foreach (var thread in threads) { thread.Join(); } - cq.Dispose(); + foreach (var cq in completionQueues) + { + cq.Dispose(); + } + }); + } + + /// <summary> + /// Returns true if there is at least one thread pool thread that hasn't + /// already stopped. + /// Threads can either stop because all completion queues shut down or + /// because all foreground threads have already shutdown and process is + /// going to exit. + /// </summary> + internal bool IsAlive + { + get + { + return threads.Any(t => t.ThreadState != ThreadState.Stopped); } } - internal CompletionQueueSafeHandle CompletionQueue + internal IReadOnlyCollection<CompletionQueueSafeHandle> CompletionQueues { get { - return cq; + return completionQueues; } } - private Thread CreateAndStartThread(int i) + private Thread CreateAndStartThread(int threadIndex) { - var thread = new Thread(new ThreadStart(RunHandlerLoop)); - thread.IsBackground = false; + var cqIndex = threadIndex % completionQueues.Count; + var cq = completionQueues.ElementAt(cqIndex); + + var thread = new Thread(new ThreadStart(() => RunHandlerLoop(cq))); + thread.IsBackground = true; + thread.Name = string.Format("grpc {0} (cq {1})", threadIndex, cqIndex); thread.Start(); - thread.Name = "grpc " + i; + return thread; } /// <summary> /// Body of the polling thread. /// </summary> - private void RunHandlerLoop() + private void RunHandlerLoop(CompletionQueueSafeHandle cq) { CompletionQueueEvent ev; do @@ -124,7 +165,7 @@ namespace Grpc.Core.Internal IntPtr tag = ev.tag; try { - var callback = environment.CompletionRegistry.Extract(tag); + var callback = cq.CompletionRegistry.Extract(tag); callback(success); } catch (Exception e) @@ -135,5 +176,16 @@ namespace Grpc.Core.Internal } while (ev.type != CompletionQueueEvent.CompletionType.Shutdown); } + + private static IReadOnlyCollection<CompletionQueueSafeHandle> CreateCompletionQueueList(GrpcEnvironment environment, int completionQueueCount) + { + var list = new List<CompletionQueueSafeHandle>(); + for (int i = 0; i < completionQueueCount; i++) + { + var completionRegistry = new CompletionRegistry(environment); + list.Add(CompletionQueueSafeHandle.Create(completionRegistry)); + } + return list.AsReadOnly(); + } } } diff --git a/src/csharp/Grpc.Core/Internal/NativeMethods.cs b/src/csharp/Grpc.Core/Internal/NativeMethods.cs index 42fd4d4dc6..65607ed120 100644 --- a/src/csharp/Grpc.Core/Internal/NativeMethods.cs +++ b/src/csharp/Grpc.Core/Internal/NativeMethods.cs @@ -137,6 +137,7 @@ namespace Grpc.Core.Internal public readonly Delegates.grpcsharp_server_credentials_release_delegate grpcsharp_server_credentials_release; public readonly Delegates.grpcsharp_server_create_delegate grpcsharp_server_create; + public readonly Delegates.grpcsharp_server_register_completion_queue_delegate grpcsharp_server_register_completion_queue; public readonly Delegates.grpcsharp_server_add_insecure_http2_port_delegate grpcsharp_server_add_insecure_http2_port; public readonly Delegates.grpcsharp_server_add_secure_http2_port_delegate grpcsharp_server_add_secure_http2_port; public readonly Delegates.grpcsharp_server_start_delegate grpcsharp_server_start; @@ -244,6 +245,7 @@ namespace Grpc.Core.Internal this.grpcsharp_server_credentials_release = GetMethodDelegate<Delegates.grpcsharp_server_credentials_release_delegate>(library); this.grpcsharp_server_create = GetMethodDelegate<Delegates.grpcsharp_server_create_delegate>(library); + this.grpcsharp_server_register_completion_queue = GetMethodDelegate<Delegates.grpcsharp_server_register_completion_queue_delegate>(library); this.grpcsharp_server_add_insecure_http2_port = GetMethodDelegate<Delegates.grpcsharp_server_add_insecure_http2_port_delegate>(library); this.grpcsharp_server_add_secure_http2_port = GetMethodDelegate<Delegates.grpcsharp_server_add_secure_http2_port_delegate>(library); this.grpcsharp_server_start = GetMethodDelegate<Delegates.grpcsharp_server_start_delegate>(library); @@ -348,6 +350,7 @@ namespace Grpc.Core.Internal this.grpcsharp_server_credentials_release = PInvokeMethods.grpcsharp_server_credentials_release; this.grpcsharp_server_create = PInvokeMethods.grpcsharp_server_create; + this.grpcsharp_server_register_completion_queue = PInvokeMethods.grpcsharp_server_register_completion_queue; this.grpcsharp_server_add_insecure_http2_port = PInvokeMethods.grpcsharp_server_add_insecure_http2_port; this.grpcsharp_server_add_secure_http2_port = PInvokeMethods.grpcsharp_server_add_secure_http2_port; this.grpcsharp_server_start = PInvokeMethods.grpcsharp_server_start; @@ -493,7 +496,8 @@ namespace Grpc.Core.Internal public delegate ServerCredentialsSafeHandle grpcsharp_ssl_server_credentials_create_delegate(string pemRootCerts, string[] keyCertPairCertChainArray, string[] keyCertPairPrivateKeyArray, UIntPtr numKeyCertPairs, bool forceClientAuth); public delegate void grpcsharp_server_credentials_release_delegate(IntPtr credentials); - public delegate ServerSafeHandle grpcsharp_server_create_delegate(CompletionQueueSafeHandle cq, ChannelArgsSafeHandle args); + public delegate ServerSafeHandle grpcsharp_server_create_delegate(ChannelArgsSafeHandle args); + public delegate void grpcsharp_server_register_completion_queue_delegate(ServerSafeHandle server, CompletionQueueSafeHandle cq); public delegate int grpcsharp_server_add_insecure_http2_port_delegate(ServerSafeHandle server, string addr); public delegate int grpcsharp_server_add_secure_http2_port_delegate(ServerSafeHandle server, string addr, ServerCredentialsSafeHandle creds); public delegate void grpcsharp_server_start_delegate(ServerSafeHandle server); @@ -773,7 +777,10 @@ namespace Grpc.Core.Internal // ServerSafeHandle [DllImport("grpc_csharp_ext.dll")] - public static extern ServerSafeHandle grpcsharp_server_create(CompletionQueueSafeHandle cq, ChannelArgsSafeHandle args); + public static extern ServerSafeHandle grpcsharp_server_create(ChannelArgsSafeHandle args); + + [DllImport("grpc_csharp_ext.dll")] + public static extern void grpcsharp_server_register_completion_queue(ServerSafeHandle server, CompletionQueueSafeHandle cq); [DllImport("grpc_csharp_ext.dll")] public static extern int grpcsharp_server_add_insecure_http2_port(ServerSafeHandle server, string addr); diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs index febebba209..6a2f520163 100644 --- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs +++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs @@ -44,7 +44,7 @@ namespace Grpc.Core.Internal { internal interface IServerCallHandler { - Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment); + Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq); } internal class UnaryServerCallHandler<TRequest, TResponse> : IServerCallHandler @@ -62,14 +62,14 @@ namespace Grpc.Core.Internal this.handler = handler; } - public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment) + public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq) { var asyncCall = new AsyncCallServer<TRequest, TResponse>( method.ResponseMarshaller.Serializer, method.RequestMarshaller.Deserializer, - environment, newRpc.Server); + newRpc.Server); - asyncCall.Initialize(newRpc.Call); + asyncCall.Initialize(newRpc.Call, cq); var finishedTask = asyncCall.ServerSideCallAsync(); var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall); var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); @@ -121,14 +121,14 @@ namespace Grpc.Core.Internal this.handler = handler; } - public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment) + public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq) { var asyncCall = new AsyncCallServer<TRequest, TResponse>( method.ResponseMarshaller.Serializer, method.RequestMarshaller.Deserializer, - environment, newRpc.Server); + newRpc.Server); - asyncCall.Initialize(newRpc.Call); + asyncCall.Initialize(newRpc.Call, cq); var finishedTask = asyncCall.ServerSideCallAsync(); var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall); var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); @@ -179,14 +179,14 @@ namespace Grpc.Core.Internal this.handler = handler; } - public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment) + public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq) { var asyncCall = new AsyncCallServer<TRequest, TResponse>( method.ResponseMarshaller.Serializer, method.RequestMarshaller.Deserializer, - environment, newRpc.Server); + newRpc.Server); - asyncCall.Initialize(newRpc.Call); + asyncCall.Initialize(newRpc.Call, cq); var finishedTask = asyncCall.ServerSideCallAsync(); var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall); var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); @@ -237,14 +237,14 @@ namespace Grpc.Core.Internal this.handler = handler; } - public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment) + public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq) { var asyncCall = new AsyncCallServer<TRequest, TResponse>( method.ResponseMarshaller.Serializer, method.RequestMarshaller.Deserializer, - environment, newRpc.Server); + newRpc.Server); - asyncCall.Initialize(newRpc.Call); + asyncCall.Initialize(newRpc.Call, cq); var finishedTask = asyncCall.ServerSideCallAsync(); var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall); var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); @@ -281,13 +281,13 @@ namespace Grpc.Core.Internal { public static readonly NoSuchMethodCallHandler Instance = new NoSuchMethodCallHandler(); - public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment) + public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq) { // We don't care about the payload type here. var asyncCall = new AsyncCallServer<byte[], byte[]>( - (payload) => payload, (payload) => payload, environment, newRpc.Server); + (payload) => payload, (payload) => payload, newRpc.Server); - asyncCall.Initialize(newRpc.Call); + asyncCall.Initialize(newRpc.Call, cq); var finishedTask = asyncCall.ServerSideCallAsync(); await asyncCall.SendStatusFromServerAsync(new Status(StatusCode.Unimplemented, ""), Metadata.Empty, null).ConfigureAwait(false); await finishedTask.ConfigureAwait(false); diff --git a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs index ecfee0bfdd..25b79b4398 100644 --- a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs +++ b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs @@ -52,16 +52,12 @@ namespace Grpc.Core.Internal public Task WriteAsync(TResponse message) { - var taskSource = new AsyncCompletionTaskSource<object>(); - call.StartSendMessage(message, GetWriteFlags(), taskSource.CompletionDelegate); - return taskSource.Task; + return call.SendMessageAsync(message, GetWriteFlags()); } public Task WriteResponseHeadersAsync(Metadata responseHeaders) { - var taskSource = new AsyncCompletionTaskSource<object>(); - call.StartSendInitialMetadata(responseHeaders, taskSource.CompletionDelegate); - return taskSource.Task; + return call.SendInitialMetadataAsync(responseHeaders); } public WriteOptions WriteOptions diff --git a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs index 6b5f70e220..8581302706 100644 --- a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs @@ -31,12 +31,6 @@ #endregion -using System; -using System.Collections.Concurrent; -using System.Diagnostics; -using System.Runtime.InteropServices; -using Grpc.Core.Utils; - namespace Grpc.Core.Internal { /// <summary> @@ -50,12 +44,17 @@ namespace Grpc.Core.Internal { } - public static ServerSafeHandle NewServer(CompletionQueueSafeHandle cq, ChannelArgsSafeHandle args) + public static ServerSafeHandle NewServer(ChannelArgsSafeHandle args) { // Increment reference count for the native gRPC environment to make sure we don't do grpc_shutdown() before destroying the server handle. // Doing so would make object finalizer crash if we end up abandoning the handle. GrpcEnvironment.GrpcNativeInit(); - return Native.grpcsharp_server_create(cq, args); + return Native.grpcsharp_server_create(args); + } + + public void RegisterCompletionQueue(CompletionQueueSafeHandle cq) + { + Native.grpcsharp_server_register_completion_queue(this, cq); } public int AddInsecurePort(string addr) @@ -73,18 +72,18 @@ namespace Grpc.Core.Internal Native.grpcsharp_server_start(this); } - public void ShutdownAndNotify(BatchCompletionDelegate callback, GrpcEnvironment environment) + public void ShutdownAndNotify(BatchCompletionDelegate callback, CompletionQueueSafeHandle completionQueue) { var ctx = BatchContextSafeHandle.Create(); - environment.CompletionRegistry.RegisterBatchCompletion(ctx, callback); - Native.grpcsharp_server_shutdown_and_notify_callback(this, environment.CompletionQueue, ctx); + completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, callback); + Native.grpcsharp_server_shutdown_and_notify_callback(this, completionQueue, ctx); } - public void RequestCall(BatchCompletionDelegate callback, GrpcEnvironment environment) + public void RequestCall(BatchCompletionDelegate callback, CompletionQueueSafeHandle completionQueue) { var ctx = BatchContextSafeHandle.Create(); - environment.CompletionRegistry.RegisterBatchCompletion(ctx, callback); - Native.grpcsharp_server_request_call(this, environment.CompletionQueue, ctx).CheckOk(); + completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, callback); + Native.grpcsharp_server_request_call(this, completionQueue, ctx).CheckOk(); } protected override bool ReleaseHandle() diff --git a/src/csharp/Grpc.Core/Logging/LogLevel.cs b/src/csharp/Grpc.Core/Logging/LogLevel.cs new file mode 100644 index 0000000000..d64e1f5fd0 --- /dev/null +++ b/src/csharp/Grpc.Core/Logging/LogLevel.cs @@ -0,0 +1,59 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Collections.Generic; + +namespace Grpc.Core.Logging +{ + /// <summary>Standard logging levels.</summary> + public enum LogLevel + { + /// <summary> + /// Debug severity. + /// </summary> + Debug = 0, + /// <summary> + /// Info severity. + /// </summary> + Info, + /// <summary> + /// Warning severity. + /// </summary> + Warning, + /// <summary> + /// Error severity. + /// </summary> + Error + } +} diff --git a/src/csharp/Grpc.Core/Logging/LogLevelFilterLogger.cs b/src/csharp/Grpc.Core/Logging/LogLevelFilterLogger.cs new file mode 100644 index 0000000000..4eeb79c783 --- /dev/null +++ b/src/csharp/Grpc.Core/Logging/LogLevelFilterLogger.cs @@ -0,0 +1,160 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Collections.Generic; +using System.Globalization; +using System.IO; +using Grpc.Core.Utils; + +namespace Grpc.Core.Logging +{ + /// <summary>Logger that filters out messages below certain log level.</summary> + public class LogLevelFilterLogger : ILogger + { + readonly ILogger innerLogger; + readonly LogLevel logLevel; + + /// <summary> + /// Creates and instance of <c>LogLevelFilter.</c> + /// </summary> + public LogLevelFilterLogger(ILogger logger, LogLevel logLevel) + { + this.innerLogger = GrpcPreconditions.CheckNotNull(logger); + this.logLevel = logLevel; + } + + /// <summary> + /// Returns a logger associated with the specified type. + /// </summary> + public virtual ILogger ForType<T>() + { + var newInnerLogger = innerLogger.ForType<T>(); + if (object.ReferenceEquals(this.innerLogger, newInnerLogger)) + { + return this; + } + return new LogLevelFilterLogger(newInnerLogger, logLevel); + } + + /// <summary>Logs a message with severity Debug.</summary> + public void Debug(string message) + { + if (logLevel <= LogLevel.Debug) + { + innerLogger.Debug(message); + } + } + + /// <summary>Logs a formatted message with severity Debug.</summary> + public void Debug(string format, params object[] formatArgs) + { + if (logLevel <= LogLevel.Debug) + { + innerLogger.Debug(format, formatArgs); + } + } + + /// <summary>Logs a message with severity Info.</summary> + public void Info(string message) + { + if (logLevel <= LogLevel.Info) + { + innerLogger.Info(message); + } + } + + /// <summary>Logs a formatted message with severity Info.</summary> + public void Info(string format, params object[] formatArgs) + { + if (logLevel <= LogLevel.Info) + { + innerLogger.Info(format, formatArgs); + } + } + + /// <summary>Logs a message with severity Warning.</summary> + public void Warning(string message) + { + if (logLevel <= LogLevel.Warning) + { + innerLogger.Warning(message); + } + } + + /// <summary>Logs a formatted message with severity Warning.</summary> + public void Warning(string format, params object[] formatArgs) + { + if (logLevel <= LogLevel.Warning) + { + innerLogger.Warning(format, formatArgs); + } + } + + /// <summary>Logs a message and an associated exception with severity Warning.</summary> + public void Warning(Exception exception, string message) + { + if (logLevel <= LogLevel.Warning) + { + innerLogger.Warning(exception, message); + } + } + + /// <summary>Logs a message with severity Error.</summary> + public void Error(string message) + { + if (logLevel <= LogLevel.Error) + { + innerLogger.Error(message); + } + } + + /// <summary>Logs a formatted message with severity Error.</summary> + public void Error(string format, params object[] formatArgs) + { + if (logLevel <= LogLevel.Error) + { + innerLogger.Error(format, formatArgs); + } + } + + /// <summary>Logs a message and an associated exception with severity Error.</summary> + public void Error(Exception exception, string message) + { + if (logLevel <= LogLevel.Error) + { + innerLogger.Error(exception, message); + } + } + } +} diff --git a/src/csharp/Grpc.Core/Metadata.cs b/src/csharp/Grpc.Core/Metadata.cs index e982fa0c48..f73f720094 100644 --- a/src/csharp/Grpc.Core/Metadata.cs +++ b/src/csharp/Grpc.Core/Metadata.cs @@ -95,6 +95,7 @@ namespace Grpc.Core public void Insert(int index, Metadata.Entry item) { + GrpcPreconditions.CheckNotNull(item); CheckWriteable(); entries.Insert(index, item); } @@ -114,6 +115,7 @@ namespace Grpc.Core set { + GrpcPreconditions.CheckNotNull(value); CheckWriteable(); entries[index] = value; } @@ -121,6 +123,7 @@ namespace Grpc.Core public void Add(Metadata.Entry item) { + GrpcPreconditions.CheckNotNull(item); CheckWriteable(); entries.Add(item); } @@ -187,7 +190,7 @@ namespace Grpc.Core /// <summary> /// Metadata entry /// </summary> - public struct Entry + public class Entry { private static readonly Encoding Encoding = Encoding.ASCII; private static readonly Regex ValidKeyRegex = new Regex("^[a-z0-9_-]+$"); diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs index d538a4671f..3b554e5e87 100644 --- a/src/csharp/Grpc.Core/Server.cs +++ b/src/csharp/Grpc.Core/Server.cs @@ -34,8 +34,7 @@ using System; using System.Collections; using System.Collections.Generic; -using System.Diagnostics; -using System.Runtime.InteropServices; +using System.Linq; using System.Threading.Tasks; using Grpc.Core.Internal; using Grpc.Core.Logging; @@ -48,7 +47,7 @@ namespace Grpc.Core /// </summary> public class Server { - const int InitialAllowRpcTokenCount = 10; + const int InitialAllowRpcTokenCountPerCq = 10; static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Server>(); readonly AtomicCounter activeCallCounter = new AtomicCounter(); @@ -68,11 +67,19 @@ namespace Grpc.Core bool startRequested; volatile bool shutdownRequested; + /// <summary> - /// Create a new server. + /// Creates a new server. + /// </summary> + public Server() : this(null) + { + } + + /// <summary> + /// Creates a new server. /// </summary> /// <param name="options">Channel options.</param> - public Server(IEnumerable<ChannelOption> options = null) + public Server(IEnumerable<ChannelOption> options) { this.serviceDefinitions = new ServiceDefinitionCollection(this); this.ports = new ServerPortCollection(this); @@ -80,8 +87,14 @@ namespace Grpc.Core this.options = options != null ? new List<ChannelOption>(options) : new List<ChannelOption>(); using (var channelArgs = ChannelOptions.CreateChannelArgs(this.options)) { - this.handle = ServerSafeHandle.NewServer(environment.CompletionQueue, channelArgs); + this.handle = ServerSafeHandle.NewServer(channelArgs); + } + + foreach (var cq in environment.CompletionQueues) + { + this.handle.RegisterCompletionQueue(cq); } + GrpcEnvironment.RegisterServer(this); } /// <summary> @@ -127,15 +140,19 @@ namespace Grpc.Core lock (myLock) { GrpcPreconditions.CheckState(!startRequested); + GrpcPreconditions.CheckState(!shutdownRequested); startRequested = true; handle.Start(); // Starting with more than one AllowOneRpc tokens can significantly increase // unary RPC throughput. - for (int i = 0; i < InitialAllowRpcTokenCount; i++) + for (int i = 0; i < InitialAllowRpcTokenCountPerCq; i++) { - AllowOneRpc(); + foreach (var cq in environment.CompletionQueues) + { + AllowOneRpc(cq); + } } } } @@ -145,41 +162,24 @@ namespace Grpc.Core /// cleans up used resources. The returned task finishes when shutdown procedure /// is complete. /// </summary> - public async Task ShutdownAsync() + /// <remarks> + /// It is strongly recommended to shutdown all previously created servers before exiting from the process. + /// </remarks> + public Task ShutdownAsync() { - lock (myLock) - { - GrpcPreconditions.CheckState(startRequested); - GrpcPreconditions.CheckState(!shutdownRequested); - shutdownRequested = true; - } - - handle.ShutdownAndNotify(HandleServerShutdown, environment); - await shutdownTcs.Task.ConfigureAwait(false); - DisposeHandle(); - - await Task.Run(() => GrpcEnvironment.Release()).ConfigureAwait(false); + return ShutdownInternalAsync(false); } /// <summary> /// Requests server shutdown while cancelling all the in-progress calls. /// The returned task finishes when shutdown procedure is complete. /// </summary> - public async Task KillAsync() + /// <remarks> + /// It is strongly recommended to shutdown all previously created servers before exiting from the process. + /// </remarks> + public Task KillAsync() { - lock (myLock) - { - GrpcPreconditions.CheckState(startRequested); - GrpcPreconditions.CheckState(!shutdownRequested); - shutdownRequested = true; - } - - handle.ShutdownAndNotify(HandleServerShutdown, environment); - handle.CancelAllCalls(); - await shutdownTcs.Task.ConfigureAwait(false); - DisposeHandle(); - - await Task.Run(() => GrpcEnvironment.Release()).ConfigureAwait(false); + return ShutdownInternalAsync(true); } internal void AddCallReference(object call) @@ -198,6 +198,51 @@ namespace Grpc.Core } /// <summary> + /// Shuts down the server. + /// </summary> + private async Task ShutdownInternalAsync(bool kill) + { + lock (myLock) + { + GrpcPreconditions.CheckState(!shutdownRequested); + shutdownRequested = true; + } + GrpcEnvironment.UnregisterServer(this); + + var cq = environment.CompletionQueues.First(); // any cq will do + handle.ShutdownAndNotify(HandleServerShutdown, cq); + if (kill) + { + handle.CancelAllCalls(); + } + await ShutdownCompleteOrEnvironmentDeadAsync().ConfigureAwait(false); + + DisposeHandle(); + + await GrpcEnvironment.ReleaseAsync().ConfigureAwait(false); + } + + /// <summary> + /// In case the environment's threadpool becomes dead, the shutdown completion will + /// never be delivered, but we need to release the environment's handle anyway. + /// </summary> + private async Task ShutdownCompleteOrEnvironmentDeadAsync() + { + while (true) + { + var task = await Task.WhenAny(shutdownTcs.Task, Task.Delay(20)).ConfigureAwait(false); + if (shutdownTcs.Task == task) + { + return; + } + if (!environment.IsAlive) + { + return; + } + } + } + + /// <summary> /// Adds a service definition. /// </summary> private void AddServiceDefinitionInternal(ServerServiceDefinition serviceDefinition) @@ -244,11 +289,11 @@ namespace Grpc.Core /// <summary> /// Allows one new RPC call to be received by server. /// </summary> - private void AllowOneRpc() + private void AllowOneRpc(CompletionQueueSafeHandle cq) { if (!shutdownRequested) { - handle.RequestCall(HandleNewServerRpc, environment); + handle.RequestCall((success, ctx) => HandleNewServerRpc(success, ctx, cq), cq); } } @@ -265,7 +310,7 @@ namespace Grpc.Core /// <summary> /// Selects corresponding handler for given call and handles the call. /// </summary> - private async Task HandleCallAsync(ServerRpcNew newRpc) + private async Task HandleCallAsync(ServerRpcNew newRpc, CompletionQueueSafeHandle cq) { try { @@ -274,7 +319,7 @@ namespace Grpc.Core { callHandler = NoSuchMethodCallHandler.Instance; } - await callHandler.HandleCall(newRpc, environment).ConfigureAwait(false); + await callHandler.HandleCall(newRpc, cq).ConfigureAwait(false); } catch (Exception e) { @@ -285,9 +330,9 @@ namespace Grpc.Core /// <summary> /// Handles the native callback. /// </summary> - private void HandleNewServerRpc(bool success, BatchContextSafeHandle ctx) + private void HandleNewServerRpc(bool success, BatchContextSafeHandle ctx, CompletionQueueSafeHandle cq) { - Task.Run(() => AllowOneRpc()); + Task.Run(() => AllowOneRpc(cq)); if (success) { @@ -296,7 +341,7 @@ namespace Grpc.Core // after server shutdown, the callback returns with null call if (!newRpc.Call.IsInvalid) { - HandleCallAsync(newRpc); // we don't need to await. + HandleCallAsync(newRpc, cq); // we don't need to await. } } } diff --git a/src/csharp/Grpc.Core/ServerServiceDefinition.cs b/src/csharp/Grpc.Core/ServerServiceDefinition.cs index deb1431ca3..ac08c04bf6 100644 --- a/src/csharp/Grpc.Core/ServerServiceDefinition.cs +++ b/src/csharp/Grpc.Core/ServerServiceDefinition.cs @@ -63,11 +63,10 @@ namespace Grpc.Core /// <summary> /// Creates a new builder object for <c>ServerServiceDefinition</c>. /// </summary> - /// <param name="serviceName">The service name.</param> /// <returns>The builder object.</returns> - public static Builder CreateBuilder(string serviceName) + public static Builder CreateBuilder() { - return new Builder(serviceName); + return new Builder(); } /// <summary> @@ -75,16 +74,13 @@ namespace Grpc.Core /// </summary> public class Builder { - readonly string serviceName; readonly Dictionary<string, IServerCallHandler> callHandlers = new Dictionary<string, IServerCallHandler>(); /// <summary> /// Creates a new instance of builder. /// </summary> - /// <param name="serviceName">The service name.</param> - public Builder(string serviceName) + public Builder() { - this.serviceName = serviceName; } /// <summary> diff --git a/src/csharp/Grpc.Core/WriteOptions.cs b/src/csharp/Grpc.Core/WriteOptions.cs index 7523ada84a..4c9706d966 100644 --- a/src/csharp/Grpc.Core/WriteOptions.cs +++ b/src/csharp/Grpc.Core/WriteOptions.cs @@ -1,6 +1,6 @@ #region Copyright notice and license -// Copyright 2015, Google Inc. +// Copyright 2015-2016, Google Inc. // All rights reserved. // // Redistribution and use in source and binary forms, with or without @@ -64,7 +64,7 @@ namespace Grpc.Core /// </summary> public static readonly WriteOptions Default = new WriteOptions(); - private WriteFlags flags; + private readonly WriteFlags flags; /// <summary> /// Initializes a new instance of <c>WriteOptions</c> class. diff --git a/src/csharp/Grpc.Examples/MathGrpc.cs b/src/csharp/Grpc.Examples/MathGrpc.cs index d700a18778..4bbefcbe01 100644 --- a/src/csharp/Grpc.Examples/MathGrpc.cs +++ b/src/csharp/Grpc.Examples/MathGrpc.cs @@ -81,103 +81,12 @@ namespace Math { get { return global::Math.MathReflection.Descriptor.Services[0]; } } - /// <summary>Client for Math</summary> - [System.Obsolete("Client side interfaced will be removed in the next release. Use client class directly.")] - public interface IMathClient - { - /// <summary> - /// Div divides args.dividend by args.divisor and returns the quotient and - /// remainder. - /// </summary> - global::Math.DivReply Div(global::Math.DivArgs request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)); - /// <summary> - /// Div divides args.dividend by args.divisor and returns the quotient and - /// remainder. - /// </summary> - global::Math.DivReply Div(global::Math.DivArgs request, CallOptions options); - /// <summary> - /// Div divides args.dividend by args.divisor and returns the quotient and - /// remainder. - /// </summary> - AsyncUnaryCall<global::Math.DivReply> DivAsync(global::Math.DivArgs request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)); - /// <summary> - /// Div divides args.dividend by args.divisor and returns the quotient and - /// remainder. - /// </summary> - AsyncUnaryCall<global::Math.DivReply> DivAsync(global::Math.DivArgs request, CallOptions options); - /// <summary> - /// DivMany accepts an arbitrary number of division args from the client stream - /// and sends back the results in the reply stream. The stream continues until - /// the client closes its end; the server does the same after sending all the - /// replies. The stream ends immediately if either end aborts. - /// </summary> - AsyncDuplexStreamingCall<global::Math.DivArgs, global::Math.DivReply> DivMany(Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)); - /// <summary> - /// DivMany accepts an arbitrary number of division args from the client stream - /// and sends back the results in the reply stream. The stream continues until - /// the client closes its end; the server does the same after sending all the - /// replies. The stream ends immediately if either end aborts. - /// </summary> - AsyncDuplexStreamingCall<global::Math.DivArgs, global::Math.DivReply> DivMany(CallOptions options); - /// <summary> - /// Fib generates numbers in the Fibonacci sequence. If args.limit > 0, Fib - /// generates up to limit numbers; otherwise it continues until the call is - /// canceled. Unlike Fib above, Fib has no final FibReply. - /// </summary> - AsyncServerStreamingCall<global::Math.Num> Fib(global::Math.FibArgs request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)); - /// <summary> - /// Fib generates numbers in the Fibonacci sequence. If args.limit > 0, Fib - /// generates up to limit numbers; otherwise it continues until the call is - /// canceled. Unlike Fib above, Fib has no final FibReply. - /// </summary> - AsyncServerStreamingCall<global::Math.Num> Fib(global::Math.FibArgs request, CallOptions options); - /// <summary> - /// Sum sums a stream of numbers, returning the final result once the stream - /// is closed. - /// </summary> - AsyncClientStreamingCall<global::Math.Num, global::Math.Num> Sum(Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)); - /// <summary> - /// Sum sums a stream of numbers, returning the final result once the stream - /// is closed. - /// </summary> - AsyncClientStreamingCall<global::Math.Num, global::Math.Num> Sum(CallOptions options); - } - - /// <summary>Interface of server-side implementations of Math</summary> - [System.Obsolete("Service implementations should inherit from the generated abstract base class instead.")] - public interface IMath - { - /// <summary> - /// Div divides args.dividend by args.divisor and returns the quotient and - /// remainder. - /// </summary> - global::System.Threading.Tasks.Task<global::Math.DivReply> Div(global::Math.DivArgs request, ServerCallContext context); - /// <summary> - /// DivMany accepts an arbitrary number of division args from the client stream - /// and sends back the results in the reply stream. The stream continues until - /// the client closes its end; the server does the same after sending all the - /// replies. The stream ends immediately if either end aborts. - /// </summary> - global::System.Threading.Tasks.Task DivMany(IAsyncStreamReader<global::Math.DivArgs> requestStream, IServerStreamWriter<global::Math.DivReply> responseStream, ServerCallContext context); - /// <summary> - /// Fib generates numbers in the Fibonacci sequence. If args.limit > 0, Fib - /// generates up to limit numbers; otherwise it continues until the call is - /// canceled. Unlike Fib above, Fib has no final FibReply. - /// </summary> - global::System.Threading.Tasks.Task Fib(global::Math.FibArgs request, IServerStreamWriter<global::Math.Num> responseStream, ServerCallContext context); - /// <summary> - /// Sum sums a stream of numbers, returning the final result once the stream - /// is closed. - /// </summary> - global::System.Threading.Tasks.Task<global::Math.Num> Sum(IAsyncStreamReader<global::Math.Num> requestStream, ServerCallContext context); - } - /// <summary>Base class for server-side implementations of Math</summary> public abstract class MathBase { /// <summary> - /// Div divides args.dividend by args.divisor and returns the quotient and - /// remainder. + /// Div divides DivArgs.dividend by DivArgs.divisor and returns the quotient + /// and remainder. /// </summary> public virtual global::System.Threading.Tasks.Task<global::Math.DivReply> Div(global::Math.DivArgs request, ServerCallContext context) { @@ -196,7 +105,7 @@ namespace Math { } /// <summary> - /// Fib generates numbers in the Fibonacci sequence. If args.limit > 0, Fib + /// Fib generates numbers in the Fibonacci sequence. If FibArgs.limit > 0, Fib /// generates up to limit numbers; otherwise it continues until the call is /// canceled. Unlike Fib above, Fib has no final FibReply. /// </summary> @@ -217,9 +126,7 @@ namespace Math { } /// <summary>Client for Math</summary> - #pragma warning disable 0618 - public class MathClient : ClientBase<MathClient>, IMathClient - #pragma warning restore 0618 + public class MathClient : ClientBase<MathClient> { public MathClient(Channel channel) : base(channel) { @@ -237,32 +144,32 @@ namespace Math { } /// <summary> - /// Div divides args.dividend by args.divisor and returns the quotient and - /// remainder. + /// Div divides DivArgs.dividend by DivArgs.divisor and returns the quotient + /// and remainder. /// </summary> public virtual global::Math.DivReply Div(global::Math.DivArgs request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)) { return Div(request, new CallOptions(headers, deadline, cancellationToken)); } /// <summary> - /// Div divides args.dividend by args.divisor and returns the quotient and - /// remainder. + /// Div divides DivArgs.dividend by DivArgs.divisor and returns the quotient + /// and remainder. /// </summary> public virtual global::Math.DivReply Div(global::Math.DivArgs request, CallOptions options) { return CallInvoker.BlockingUnaryCall(__Method_Div, null, options, request); } /// <summary> - /// Div divides args.dividend by args.divisor and returns the quotient and - /// remainder. + /// Div divides DivArgs.dividend by DivArgs.divisor and returns the quotient + /// and remainder. /// </summary> public virtual AsyncUnaryCall<global::Math.DivReply> DivAsync(global::Math.DivArgs request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)) { return DivAsync(request, new CallOptions(headers, deadline, cancellationToken)); } /// <summary> - /// Div divides args.dividend by args.divisor and returns the quotient and - /// remainder. + /// Div divides DivArgs.dividend by DivArgs.divisor and returns the quotient + /// and remainder. /// </summary> public virtual AsyncUnaryCall<global::Math.DivReply> DivAsync(global::Math.DivArgs request, CallOptions options) { @@ -289,7 +196,7 @@ namespace Math { return CallInvoker.AsyncDuplexStreamingCall(__Method_DivMany, null, options); } /// <summary> - /// Fib generates numbers in the Fibonacci sequence. If args.limit > 0, Fib + /// Fib generates numbers in the Fibonacci sequence. If FibArgs.limit > 0, Fib /// generates up to limit numbers; otherwise it continues until the call is /// canceled. Unlike Fib above, Fib has no final FibReply. /// </summary> @@ -298,7 +205,7 @@ namespace Math { return Fib(request, new CallOptions(headers, deadline, cancellationToken)); } /// <summary> - /// Fib generates numbers in the Fibonacci sequence. If args.limit > 0, Fib + /// Fib generates numbers in the Fibonacci sequence. If FibArgs.limit > 0, Fib /// generates up to limit numbers; otherwise it continues until the call is /// canceled. Unlike Fib above, Fib has no final FibReply. /// </summary> @@ -335,23 +242,9 @@ namespace Math { } /// <summary>Creates service definition that can be registered with a server</summary> - #pragma warning disable 0618 - public static ServerServiceDefinition BindService(IMath serviceImpl) - #pragma warning restore 0618 - { - return ServerServiceDefinition.CreateBuilder(__ServiceName) - .AddMethod(__Method_Div, serviceImpl.Div) - .AddMethod(__Method_DivMany, serviceImpl.DivMany) - .AddMethod(__Method_Fib, serviceImpl.Fib) - .AddMethod(__Method_Sum, serviceImpl.Sum).Build(); - } - - /// <summary>Creates service definition that can be registered with a server</summary> - #pragma warning disable 0618 public static ServerServiceDefinition BindService(MathBase serviceImpl) - #pragma warning restore 0618 { - return ServerServiceDefinition.CreateBuilder(__ServiceName) + return ServerServiceDefinition.CreateBuilder() .AddMethod(__Method_Div, serviceImpl.Div) .AddMethod(__Method_DivMany, serviceImpl.DivMany) .AddMethod(__Method_Fib, serviceImpl.Fib) diff --git a/src/csharp/Grpc.HealthCheck/HealthGrpc.cs b/src/csharp/Grpc.HealthCheck/HealthGrpc.cs index 51c6a39b1d..d0ade7d02b 100644 --- a/src/csharp/Grpc.HealthCheck/HealthGrpc.cs +++ b/src/csharp/Grpc.HealthCheck/HealthGrpc.cs @@ -58,23 +58,6 @@ namespace Grpc.Health.V1 { get { return global::Grpc.Health.V1.HealthReflection.Descriptor.Services[0]; } } - /// <summary>Client for Health</summary> - [System.Obsolete("Client side interfaced will be removed in the next release. Use client class directly.")] - public interface IHealthClient - { - global::Grpc.Health.V1.HealthCheckResponse Check(global::Grpc.Health.V1.HealthCheckRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)); - global::Grpc.Health.V1.HealthCheckResponse Check(global::Grpc.Health.V1.HealthCheckRequest request, CallOptions options); - AsyncUnaryCall<global::Grpc.Health.V1.HealthCheckResponse> CheckAsync(global::Grpc.Health.V1.HealthCheckRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)); - AsyncUnaryCall<global::Grpc.Health.V1.HealthCheckResponse> CheckAsync(global::Grpc.Health.V1.HealthCheckRequest request, CallOptions options); - } - - /// <summary>Interface of server-side implementations of Health</summary> - [System.Obsolete("Service implementations should inherit from the generated abstract base class instead.")] - public interface IHealth - { - global::System.Threading.Tasks.Task<global::Grpc.Health.V1.HealthCheckResponse> Check(global::Grpc.Health.V1.HealthCheckRequest request, ServerCallContext context); - } - /// <summary>Base class for server-side implementations of Health</summary> public abstract class HealthBase { @@ -86,9 +69,7 @@ namespace Grpc.Health.V1 { } /// <summary>Client for Health</summary> - #pragma warning disable 0618 - public class HealthClient : ClientBase<HealthClient>, IHealthClient - #pragma warning restore 0618 + public class HealthClient : ClientBase<HealthClient> { public HealthClient(Channel channel) : base(channel) { @@ -134,20 +115,9 @@ namespace Grpc.Health.V1 { } /// <summary>Creates service definition that can be registered with a server</summary> - #pragma warning disable 0618 - public static ServerServiceDefinition BindService(IHealth serviceImpl) - #pragma warning restore 0618 - { - return ServerServiceDefinition.CreateBuilder(__ServiceName) - .AddMethod(__Method_Check, serviceImpl.Check).Build(); - } - - /// <summary>Creates service definition that can be registered with a server</summary> - #pragma warning disable 0618 public static ServerServiceDefinition BindService(HealthBase serviceImpl) - #pragma warning restore 0618 { - return ServerServiceDefinition.CreateBuilder(__ServiceName) + return ServerServiceDefinition.CreateBuilder() .AddMethod(__Method_Check, serviceImpl.Check).Build(); } diff --git a/src/csharp/Grpc.IntegrationTesting/GenericService.cs b/src/csharp/Grpc.IntegrationTesting/GenericService.cs index c6128264ac..53fa1ee5f6 100644 --- a/src/csharp/Grpc.IntegrationTesting/GenericService.cs +++ b/src/csharp/Grpc.IntegrationTesting/GenericService.cs @@ -64,7 +64,7 @@ namespace Grpc.IntegrationTesting public static ServerServiceDefinition BindHandler(DuplexStreamingServerMethod<byte[], byte[]> handler) { - return ServerServiceDefinition.CreateBuilder(StreamingCallMethod.ServiceName) + return ServerServiceDefinition.CreateBuilder() .AddMethod(StreamingCallMethod, handler).Build(); } } diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs index 1541cfd7bb..aea40afee2 100644 --- a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs +++ b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs @@ -471,8 +471,16 @@ namespace Grpc.IntegrationTesting cts.Cancel(); - var ex = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseStream.MoveNext()); - Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode); + try + { + // cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock. + await call.ResponseStream.MoveNext(); + Assert.Fail(); + } + catch (RpcException ex) + { + Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode); + } } Console.WriteLine("Passed!"); } @@ -497,9 +505,16 @@ namespace Grpc.IntegrationTesting // Deadline was reached before write has started. Eat the exception and continue. } - var ex = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseStream.MoveNext()); - // We can't guarantee the status code always DeadlineExceeded. See issue #2685. - Assert.Contains(ex.Status.StatusCode, new[] { StatusCode.DeadlineExceeded, StatusCode.Internal }); + try + { + await call.ResponseStream.MoveNext(); + Assert.Fail(); + } + catch (RpcException ex) + { + // We can't guarantee the status code always DeadlineExceeded. See issue #2685. + Assert.Contains(ex.Status.StatusCode, new[] { StatusCode.DeadlineExceeded, StatusCode.Internal }); + } } Console.WriteLine("Passed!"); } @@ -577,9 +592,17 @@ namespace Grpc.IntegrationTesting await call.RequestStream.WriteAsync(request); await call.RequestStream.CompleteAsync(); - var e = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseStream.ToListAsync()); - Assert.AreEqual(StatusCode.Unknown, e.Status.StatusCode); - Assert.AreEqual(echoStatus.Message, e.Status.Detail); + try + { + // cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock. + await call.ResponseStream.ToListAsync(); + Assert.Fail(); + } + catch (RpcException e) + { + Assert.AreEqual(StatusCode.Unknown, e.Status.StatusCode); + Assert.AreEqual(echoStatus.Message, e.Status.Detail); + } } Console.WriteLine("Passed!"); diff --git a/src/csharp/Grpc.IntegrationTesting/MetricsGrpc.cs b/src/csharp/Grpc.IntegrationTesting/MetricsGrpc.cs index 9d31d1c514..22bd27ec0a 100644 --- a/src/csharp/Grpc.IntegrationTesting/MetricsGrpc.cs +++ b/src/csharp/Grpc.IntegrationTesting/MetricsGrpc.cs @@ -72,53 +72,6 @@ namespace Grpc.Testing { get { return global::Grpc.Testing.MetricsReflection.Descriptor.Services[0]; } } - /// <summary>Client for MetricsService</summary> - [System.Obsolete("Client side interfaced will be removed in the next release. Use client class directly.")] - public interface IMetricsServiceClient - { - /// <summary> - /// Returns the values of all the gauges that are currently being maintained by - /// the service - /// </summary> - AsyncServerStreamingCall<global::Grpc.Testing.GaugeResponse> GetAllGauges(global::Grpc.Testing.EmptyMessage request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)); - /// <summary> - /// Returns the values of all the gauges that are currently being maintained by - /// the service - /// </summary> - AsyncServerStreamingCall<global::Grpc.Testing.GaugeResponse> GetAllGauges(global::Grpc.Testing.EmptyMessage request, CallOptions options); - /// <summary> - /// Returns the value of one gauge - /// </summary> - global::Grpc.Testing.GaugeResponse GetGauge(global::Grpc.Testing.GaugeRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)); - /// <summary> - /// Returns the value of one gauge - /// </summary> - global::Grpc.Testing.GaugeResponse GetGauge(global::Grpc.Testing.GaugeRequest request, CallOptions options); - /// <summary> - /// Returns the value of one gauge - /// </summary> - AsyncUnaryCall<global::Grpc.Testing.GaugeResponse> GetGaugeAsync(global::Grpc.Testing.GaugeRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)); - /// <summary> - /// Returns the value of one gauge - /// </summary> - AsyncUnaryCall<global::Grpc.Testing.GaugeResponse> GetGaugeAsync(global::Grpc.Testing.GaugeRequest request, CallOptions options); - } - - /// <summary>Interface of server-side implementations of MetricsService</summary> - [System.Obsolete("Service implementations should inherit from the generated abstract base class instead.")] - public interface IMetricsService - { - /// <summary> - /// Returns the values of all the gauges that are currently being maintained by - /// the service - /// </summary> - global::System.Threading.Tasks.Task GetAllGauges(global::Grpc.Testing.EmptyMessage request, IServerStreamWriter<global::Grpc.Testing.GaugeResponse> responseStream, ServerCallContext context); - /// <summary> - /// Returns the value of one gauge - /// </summary> - global::System.Threading.Tasks.Task<global::Grpc.Testing.GaugeResponse> GetGauge(global::Grpc.Testing.GaugeRequest request, ServerCallContext context); - } - /// <summary>Base class for server-side implementations of MetricsService</summary> public abstract class MetricsServiceBase { @@ -142,9 +95,7 @@ namespace Grpc.Testing { } /// <summary>Client for MetricsService</summary> - #pragma warning disable 0618 - public class MetricsServiceClient : ClientBase<MetricsServiceClient>, IMetricsServiceClient - #pragma warning restore 0618 + public class MetricsServiceClient : ClientBase<MetricsServiceClient> { public MetricsServiceClient(Channel channel) : base(channel) { @@ -218,21 +169,9 @@ namespace Grpc.Testing { } /// <summary>Creates service definition that can be registered with a server</summary> - #pragma warning disable 0618 - public static ServerServiceDefinition BindService(IMetricsService serviceImpl) - #pragma warning restore 0618 - { - return ServerServiceDefinition.CreateBuilder(__ServiceName) - .AddMethod(__Method_GetAllGauges, serviceImpl.GetAllGauges) - .AddMethod(__Method_GetGauge, serviceImpl.GetGauge).Build(); - } - - /// <summary>Creates service definition that can be registered with a server</summary> - #pragma warning disable 0618 public static ServerServiceDefinition BindService(MetricsServiceBase serviceImpl) - #pragma warning restore 0618 { - return ServerServiceDefinition.CreateBuilder(__ServiceName) + return ServerServiceDefinition.CreateBuilder() .AddMethod(__Method_GetAllGauges, serviceImpl.GetAllGauges) .AddMethod(__Method_GetGauge, serviceImpl.GetGauge).Build(); } diff --git a/src/csharp/Grpc.IntegrationTesting/ServicesGrpc.cs b/src/csharp/Grpc.IntegrationTesting/ServicesGrpc.cs index f7071ebf6b..9c99296115 100644 --- a/src/csharp/Grpc.IntegrationTesting/ServicesGrpc.cs +++ b/src/csharp/Grpc.IntegrationTesting/ServicesGrpc.cs @@ -67,58 +67,6 @@ namespace Grpc.Testing { get { return global::Grpc.Testing.ServicesReflection.Descriptor.Services[0]; } } - /// <summary>Client for BenchmarkService</summary> - [System.Obsolete("Client side interfaced will be removed in the next release. Use client class directly.")] - public interface IBenchmarkServiceClient - { - /// <summary> - /// One request followed by one response. - /// The server returns the client payload as-is. - /// </summary> - global::Grpc.Testing.SimpleResponse UnaryCall(global::Grpc.Testing.SimpleRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)); - /// <summary> - /// One request followed by one response. - /// The server returns the client payload as-is. - /// </summary> - global::Grpc.Testing.SimpleResponse UnaryCall(global::Grpc.Testing.SimpleRequest request, CallOptions options); - /// <summary> - /// One request followed by one response. - /// The server returns the client payload as-is. - /// </summary> - AsyncUnaryCall<global::Grpc.Testing.SimpleResponse> UnaryCallAsync(global::Grpc.Testing.SimpleRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)); - /// <summary> - /// One request followed by one response. - /// The server returns the client payload as-is. - /// </summary> - AsyncUnaryCall<global::Grpc.Testing.SimpleResponse> UnaryCallAsync(global::Grpc.Testing.SimpleRequest request, CallOptions options); - /// <summary> - /// One request followed by one response. - /// The server returns the client payload as-is. - /// </summary> - AsyncDuplexStreamingCall<global::Grpc.Testing.SimpleRequest, global::Grpc.Testing.SimpleResponse> StreamingCall(Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)); - /// <summary> - /// One request followed by one response. - /// The server returns the client payload as-is. - /// </summary> - AsyncDuplexStreamingCall<global::Grpc.Testing.SimpleRequest, global::Grpc.Testing.SimpleResponse> StreamingCall(CallOptions options); - } - - /// <summary>Interface of server-side implementations of BenchmarkService</summary> - [System.Obsolete("Service implementations should inherit from the generated abstract base class instead.")] - public interface IBenchmarkService - { - /// <summary> - /// One request followed by one response. - /// The server returns the client payload as-is. - /// </summary> - global::System.Threading.Tasks.Task<global::Grpc.Testing.SimpleResponse> UnaryCall(global::Grpc.Testing.SimpleRequest request, ServerCallContext context); - /// <summary> - /// One request followed by one response. - /// The server returns the client payload as-is. - /// </summary> - global::System.Threading.Tasks.Task StreamingCall(IAsyncStreamReader<global::Grpc.Testing.SimpleRequest> requestStream, IServerStreamWriter<global::Grpc.Testing.SimpleResponse> responseStream, ServerCallContext context); - } - /// <summary>Base class for server-side implementations of BenchmarkService</summary> public abstract class BenchmarkServiceBase { @@ -143,9 +91,7 @@ namespace Grpc.Testing { } /// <summary>Client for BenchmarkService</summary> - #pragma warning disable 0618 - public class BenchmarkServiceClient : ClientBase<BenchmarkServiceClient>, IBenchmarkServiceClient - #pragma warning restore 0618 + public class BenchmarkServiceClient : ClientBase<BenchmarkServiceClient> { public BenchmarkServiceClient(Channel channel) : base(channel) { @@ -223,21 +169,9 @@ namespace Grpc.Testing { } /// <summary>Creates service definition that can be registered with a server</summary> - #pragma warning disable 0618 - public static ServerServiceDefinition BindService(IBenchmarkService serviceImpl) - #pragma warning restore 0618 - { - return ServerServiceDefinition.CreateBuilder(__ServiceName) - .AddMethod(__Method_UnaryCall, serviceImpl.UnaryCall) - .AddMethod(__Method_StreamingCall, serviceImpl.StreamingCall).Build(); - } - - /// <summary>Creates service definition that can be registered with a server</summary> - #pragma warning disable 0618 public static ServerServiceDefinition BindService(BenchmarkServiceBase serviceImpl) - #pragma warning restore 0618 { - return ServerServiceDefinition.CreateBuilder(__ServiceName) + return ServerServiceDefinition.CreateBuilder() .AddMethod(__Method_UnaryCall, serviceImpl.UnaryCall) .AddMethod(__Method_StreamingCall, serviceImpl.StreamingCall).Build(); } @@ -289,112 +223,6 @@ namespace Grpc.Testing { get { return global::Grpc.Testing.ServicesReflection.Descriptor.Services[1]; } } - /// <summary>Client for WorkerService</summary> - [System.Obsolete("Client side interfaced will be removed in the next release. Use client class directly.")] - public interface IWorkerServiceClient - { - /// <summary> - /// Start server with specified workload. - /// First request sent specifies the ServerConfig followed by ServerStatus - /// response. After that, a "Mark" can be sent anytime to request the latest - /// stats. Closing the stream will initiate shutdown of the test server - /// and once the shutdown has finished, the OK status is sent to terminate - /// this RPC. - /// </summary> - AsyncDuplexStreamingCall<global::Grpc.Testing.ServerArgs, global::Grpc.Testing.ServerStatus> RunServer(Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)); - /// <summary> - /// Start server with specified workload. - /// First request sent specifies the ServerConfig followed by ServerStatus - /// response. After that, a "Mark" can be sent anytime to request the latest - /// stats. Closing the stream will initiate shutdown of the test server - /// and once the shutdown has finished, the OK status is sent to terminate - /// this RPC. - /// </summary> - AsyncDuplexStreamingCall<global::Grpc.Testing.ServerArgs, global::Grpc.Testing.ServerStatus> RunServer(CallOptions options); - /// <summary> - /// Start client with specified workload. - /// First request sent specifies the ClientConfig followed by ClientStatus - /// response. After that, a "Mark" can be sent anytime to request the latest - /// stats. Closing the stream will initiate shutdown of the test client - /// and once the shutdown has finished, the OK status is sent to terminate - /// this RPC. - /// </summary> - AsyncDuplexStreamingCall<global::Grpc.Testing.ClientArgs, global::Grpc.Testing.ClientStatus> RunClient(Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)); - /// <summary> - /// Start client with specified workload. - /// First request sent specifies the ClientConfig followed by ClientStatus - /// response. After that, a "Mark" can be sent anytime to request the latest - /// stats. Closing the stream will initiate shutdown of the test client - /// and once the shutdown has finished, the OK status is sent to terminate - /// this RPC. - /// </summary> - AsyncDuplexStreamingCall<global::Grpc.Testing.ClientArgs, global::Grpc.Testing.ClientStatus> RunClient(CallOptions options); - /// <summary> - /// Just return the core count - unary call - /// </summary> - global::Grpc.Testing.CoreResponse CoreCount(global::Grpc.Testing.CoreRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)); - /// <summary> - /// Just return the core count - unary call - /// </summary> - global::Grpc.Testing.CoreResponse CoreCount(global::Grpc.Testing.CoreRequest request, CallOptions options); - /// <summary> - /// Just return the core count - unary call - /// </summary> - AsyncUnaryCall<global::Grpc.Testing.CoreResponse> CoreCountAsync(global::Grpc.Testing.CoreRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)); - /// <summary> - /// Just return the core count - unary call - /// </summary> - AsyncUnaryCall<global::Grpc.Testing.CoreResponse> CoreCountAsync(global::Grpc.Testing.CoreRequest request, CallOptions options); - /// <summary> - /// Quit this worker - /// </summary> - global::Grpc.Testing.Void QuitWorker(global::Grpc.Testing.Void request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)); - /// <summary> - /// Quit this worker - /// </summary> - global::Grpc.Testing.Void QuitWorker(global::Grpc.Testing.Void request, CallOptions options); - /// <summary> - /// Quit this worker - /// </summary> - AsyncUnaryCall<global::Grpc.Testing.Void> QuitWorkerAsync(global::Grpc.Testing.Void request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)); - /// <summary> - /// Quit this worker - /// </summary> - AsyncUnaryCall<global::Grpc.Testing.Void> QuitWorkerAsync(global::Grpc.Testing.Void request, CallOptions options); - } - - /// <summary>Interface of server-side implementations of WorkerService</summary> - [System.Obsolete("Service implementations should inherit from the generated abstract base class instead.")] - public interface IWorkerService - { - /// <summary> - /// Start server with specified workload. - /// First request sent specifies the ServerConfig followed by ServerStatus - /// response. After that, a "Mark" can be sent anytime to request the latest - /// stats. Closing the stream will initiate shutdown of the test server - /// and once the shutdown has finished, the OK status is sent to terminate - /// this RPC. - /// </summary> - global::System.Threading.Tasks.Task RunServer(IAsyncStreamReader<global::Grpc.Testing.ServerArgs> requestStream, IServerStreamWriter<global::Grpc.Testing.ServerStatus> responseStream, ServerCallContext context); - /// <summary> - /// Start client with specified workload. - /// First request sent specifies the ClientConfig followed by ClientStatus - /// response. After that, a "Mark" can be sent anytime to request the latest - /// stats. Closing the stream will initiate shutdown of the test client - /// and once the shutdown has finished, the OK status is sent to terminate - /// this RPC. - /// </summary> - global::System.Threading.Tasks.Task RunClient(IAsyncStreamReader<global::Grpc.Testing.ClientArgs> requestStream, IServerStreamWriter<global::Grpc.Testing.ClientStatus> responseStream, ServerCallContext context); - /// <summary> - /// Just return the core count - unary call - /// </summary> - global::System.Threading.Tasks.Task<global::Grpc.Testing.CoreResponse> CoreCount(global::Grpc.Testing.CoreRequest request, ServerCallContext context); - /// <summary> - /// Quit this worker - /// </summary> - global::System.Threading.Tasks.Task<global::Grpc.Testing.Void> QuitWorker(global::Grpc.Testing.Void request, ServerCallContext context); - } - /// <summary>Base class for server-side implementations of WorkerService</summary> public abstract class WorkerServiceBase { @@ -443,9 +271,7 @@ namespace Grpc.Testing { } /// <summary>Client for WorkerService</summary> - #pragma warning disable 0618 - public class WorkerServiceClient : ClientBase<WorkerServiceClient>, IWorkerServiceClient - #pragma warning restore 0618 + public class WorkerServiceClient : ClientBase<WorkerServiceClient> { public WorkerServiceClient(Channel channel) : base(channel) { @@ -579,23 +405,9 @@ namespace Grpc.Testing { } /// <summary>Creates service definition that can be registered with a server</summary> - #pragma warning disable 0618 - public static ServerServiceDefinition BindService(IWorkerService serviceImpl) - #pragma warning restore 0618 - { - return ServerServiceDefinition.CreateBuilder(__ServiceName) - .AddMethod(__Method_RunServer, serviceImpl.RunServer) - .AddMethod(__Method_RunClient, serviceImpl.RunClient) - .AddMethod(__Method_CoreCount, serviceImpl.CoreCount) - .AddMethod(__Method_QuitWorker, serviceImpl.QuitWorker).Build(); - } - - /// <summary>Creates service definition that can be registered with a server</summary> - #pragma warning disable 0618 public static ServerServiceDefinition BindService(WorkerServiceBase serviceImpl) - #pragma warning restore 0618 { - return ServerServiceDefinition.CreateBuilder(__ServiceName) + return ServerServiceDefinition.CreateBuilder() .AddMethod(__Method_RunServer, serviceImpl.RunServer) .AddMethod(__Method_RunClient, serviceImpl.RunClient) .AddMethod(__Method_CoreCount, serviceImpl.CoreCount) diff --git a/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs b/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs index cf43a77118..6c252013f8 100644 --- a/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs +++ b/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs @@ -105,127 +105,6 @@ namespace Grpc.Testing { get { return global::Grpc.Testing.TestReflection.Descriptor.Services[0]; } } - /// <summary>Client for TestService</summary> - [System.Obsolete("Client side interfaced will be removed in the next release. Use client class directly.")] - public interface ITestServiceClient - { - /// <summary> - /// One empty request followed by one empty response. - /// </summary> - global::Grpc.Testing.Empty EmptyCall(global::Grpc.Testing.Empty request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)); - /// <summary> - /// One empty request followed by one empty response. - /// </summary> - global::Grpc.Testing.Empty EmptyCall(global::Grpc.Testing.Empty request, CallOptions options); - /// <summary> - /// One empty request followed by one empty response. - /// </summary> - AsyncUnaryCall<global::Grpc.Testing.Empty> EmptyCallAsync(global::Grpc.Testing.Empty request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)); - /// <summary> - /// One empty request followed by one empty response. - /// </summary> - AsyncUnaryCall<global::Grpc.Testing.Empty> EmptyCallAsync(global::Grpc.Testing.Empty request, CallOptions options); - /// <summary> - /// One request followed by one response. - /// </summary> - global::Grpc.Testing.SimpleResponse UnaryCall(global::Grpc.Testing.SimpleRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)); - /// <summary> - /// One request followed by one response. - /// </summary> - global::Grpc.Testing.SimpleResponse UnaryCall(global::Grpc.Testing.SimpleRequest request, CallOptions options); - /// <summary> - /// One request followed by one response. - /// </summary> - AsyncUnaryCall<global::Grpc.Testing.SimpleResponse> UnaryCallAsync(global::Grpc.Testing.SimpleRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)); - /// <summary> - /// One request followed by one response. - /// </summary> - AsyncUnaryCall<global::Grpc.Testing.SimpleResponse> UnaryCallAsync(global::Grpc.Testing.SimpleRequest request, CallOptions options); - /// <summary> - /// One request followed by a sequence of responses (streamed download). - /// The server returns the payload with client desired type and sizes. - /// </summary> - AsyncServerStreamingCall<global::Grpc.Testing.StreamingOutputCallResponse> StreamingOutputCall(global::Grpc.Testing.StreamingOutputCallRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)); - /// <summary> - /// One request followed by a sequence of responses (streamed download). - /// The server returns the payload with client desired type and sizes. - /// </summary> - AsyncServerStreamingCall<global::Grpc.Testing.StreamingOutputCallResponse> StreamingOutputCall(global::Grpc.Testing.StreamingOutputCallRequest request, CallOptions options); - /// <summary> - /// A sequence of requests followed by one response (streamed upload). - /// The server returns the aggregated size of client payload as the result. - /// </summary> - AsyncClientStreamingCall<global::Grpc.Testing.StreamingInputCallRequest, global::Grpc.Testing.StreamingInputCallResponse> StreamingInputCall(Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)); - /// <summary> - /// A sequence of requests followed by one response (streamed upload). - /// The server returns the aggregated size of client payload as the result. - /// </summary> - AsyncClientStreamingCall<global::Grpc.Testing.StreamingInputCallRequest, global::Grpc.Testing.StreamingInputCallResponse> StreamingInputCall(CallOptions options); - /// <summary> - /// A sequence of requests with each request served by the server immediately. - /// As one request could lead to multiple responses, this interface - /// demonstrates the idea of full duplexing. - /// </summary> - AsyncDuplexStreamingCall<global::Grpc.Testing.StreamingOutputCallRequest, global::Grpc.Testing.StreamingOutputCallResponse> FullDuplexCall(Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)); - /// <summary> - /// A sequence of requests with each request served by the server immediately. - /// As one request could lead to multiple responses, this interface - /// demonstrates the idea of full duplexing. - /// </summary> - AsyncDuplexStreamingCall<global::Grpc.Testing.StreamingOutputCallRequest, global::Grpc.Testing.StreamingOutputCallResponse> FullDuplexCall(CallOptions options); - /// <summary> - /// A sequence of requests followed by a sequence of responses. - /// The server buffers all the client requests and then serves them in order. A - /// stream of responses are returned to the client when the server starts with - /// first request. - /// </summary> - AsyncDuplexStreamingCall<global::Grpc.Testing.StreamingOutputCallRequest, global::Grpc.Testing.StreamingOutputCallResponse> HalfDuplexCall(Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)); - /// <summary> - /// A sequence of requests followed by a sequence of responses. - /// The server buffers all the client requests and then serves them in order. A - /// stream of responses are returned to the client when the server starts with - /// first request. - /// </summary> - AsyncDuplexStreamingCall<global::Grpc.Testing.StreamingOutputCallRequest, global::Grpc.Testing.StreamingOutputCallResponse> HalfDuplexCall(CallOptions options); - } - - /// <summary>Interface of server-side implementations of TestService</summary> - [System.Obsolete("Service implementations should inherit from the generated abstract base class instead.")] - public interface ITestService - { - /// <summary> - /// One empty request followed by one empty response. - /// </summary> - global::System.Threading.Tasks.Task<global::Grpc.Testing.Empty> EmptyCall(global::Grpc.Testing.Empty request, ServerCallContext context); - /// <summary> - /// One request followed by one response. - /// </summary> - global::System.Threading.Tasks.Task<global::Grpc.Testing.SimpleResponse> UnaryCall(global::Grpc.Testing.SimpleRequest request, ServerCallContext context); - /// <summary> - /// One request followed by a sequence of responses (streamed download). - /// The server returns the payload with client desired type and sizes. - /// </summary> - global::System.Threading.Tasks.Task StreamingOutputCall(global::Grpc.Testing.StreamingOutputCallRequest request, IServerStreamWriter<global::Grpc.Testing.StreamingOutputCallResponse> responseStream, ServerCallContext context); - /// <summary> - /// A sequence of requests followed by one response (streamed upload). - /// The server returns the aggregated size of client payload as the result. - /// </summary> - global::System.Threading.Tasks.Task<global::Grpc.Testing.StreamingInputCallResponse> StreamingInputCall(IAsyncStreamReader<global::Grpc.Testing.StreamingInputCallRequest> requestStream, ServerCallContext context); - /// <summary> - /// A sequence of requests with each request served by the server immediately. - /// As one request could lead to multiple responses, this interface - /// demonstrates the idea of full duplexing. - /// </summary> - global::System.Threading.Tasks.Task FullDuplexCall(IAsyncStreamReader<global::Grpc.Testing.StreamingOutputCallRequest> requestStream, IServerStreamWriter<global::Grpc.Testing.StreamingOutputCallResponse> responseStream, ServerCallContext context); - /// <summary> - /// A sequence of requests followed by a sequence of responses. - /// The server buffers all the client requests and then serves them in order. A - /// stream of responses are returned to the client when the server starts with - /// first request. - /// </summary> - global::System.Threading.Tasks.Task HalfDuplexCall(IAsyncStreamReader<global::Grpc.Testing.StreamingOutputCallRequest> requestStream, IServerStreamWriter<global::Grpc.Testing.StreamingOutputCallResponse> responseStream, ServerCallContext context); - } - /// <summary>Base class for server-side implementations of TestService</summary> public abstract class TestServiceBase { @@ -287,9 +166,7 @@ namespace Grpc.Testing { } /// <summary>Client for TestService</summary> - #pragma warning disable 0618 - public class TestServiceClient : ClientBase<TestServiceClient>, ITestServiceClient - #pragma warning restore 0618 + public class TestServiceClient : ClientBase<TestServiceClient> { public TestServiceClient(Channel channel) : base(channel) { @@ -445,25 +322,9 @@ namespace Grpc.Testing { } /// <summary>Creates service definition that can be registered with a server</summary> - #pragma warning disable 0618 - public static ServerServiceDefinition BindService(ITestService serviceImpl) - #pragma warning restore 0618 - { - return ServerServiceDefinition.CreateBuilder(__ServiceName) - .AddMethod(__Method_EmptyCall, serviceImpl.EmptyCall) - .AddMethod(__Method_UnaryCall, serviceImpl.UnaryCall) - .AddMethod(__Method_StreamingOutputCall, serviceImpl.StreamingOutputCall) - .AddMethod(__Method_StreamingInputCall, serviceImpl.StreamingInputCall) - .AddMethod(__Method_FullDuplexCall, serviceImpl.FullDuplexCall) - .AddMethod(__Method_HalfDuplexCall, serviceImpl.HalfDuplexCall).Build(); - } - - /// <summary>Creates service definition that can be registered with a server</summary> - #pragma warning disable 0618 public static ServerServiceDefinition BindService(TestServiceBase serviceImpl) - #pragma warning restore 0618 { - return ServerServiceDefinition.CreateBuilder(__ServiceName) + return ServerServiceDefinition.CreateBuilder() .AddMethod(__Method_EmptyCall, serviceImpl.EmptyCall) .AddMethod(__Method_UnaryCall, serviceImpl.UnaryCall) .AddMethod(__Method_StreamingOutputCall, serviceImpl.StreamingOutputCall) @@ -496,38 +357,6 @@ namespace Grpc.Testing { get { return global::Grpc.Testing.TestReflection.Descriptor.Services[1]; } } - /// <summary>Client for UnimplementedService</summary> - [System.Obsolete("Client side interfaced will be removed in the next release. Use client class directly.")] - public interface IUnimplementedServiceClient - { - /// <summary> - /// A call that no server should implement - /// </summary> - global::Grpc.Testing.Empty UnimplementedCall(global::Grpc.Testing.Empty request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)); - /// <summary> - /// A call that no server should implement - /// </summary> - global::Grpc.Testing.Empty UnimplementedCall(global::Grpc.Testing.Empty request, CallOptions options); - /// <summary> - /// A call that no server should implement - /// </summary> - AsyncUnaryCall<global::Grpc.Testing.Empty> UnimplementedCallAsync(global::Grpc.Testing.Empty request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)); - /// <summary> - /// A call that no server should implement - /// </summary> - AsyncUnaryCall<global::Grpc.Testing.Empty> UnimplementedCallAsync(global::Grpc.Testing.Empty request, CallOptions options); - } - - /// <summary>Interface of server-side implementations of UnimplementedService</summary> - [System.Obsolete("Service implementations should inherit from the generated abstract base class instead.")] - public interface IUnimplementedService - { - /// <summary> - /// A call that no server should implement - /// </summary> - global::System.Threading.Tasks.Task<global::Grpc.Testing.Empty> UnimplementedCall(global::Grpc.Testing.Empty request, ServerCallContext context); - } - /// <summary>Base class for server-side implementations of UnimplementedService</summary> public abstract class UnimplementedServiceBase { @@ -542,9 +371,7 @@ namespace Grpc.Testing { } /// <summary>Client for UnimplementedService</summary> - #pragma warning disable 0618 - public class UnimplementedServiceClient : ClientBase<UnimplementedServiceClient>, IUnimplementedServiceClient - #pragma warning restore 0618 + public class UnimplementedServiceClient : ClientBase<UnimplementedServiceClient> { public UnimplementedServiceClient(Channel channel) : base(channel) { @@ -602,20 +429,9 @@ namespace Grpc.Testing { } /// <summary>Creates service definition that can be registered with a server</summary> - #pragma warning disable 0618 - public static ServerServiceDefinition BindService(IUnimplementedService serviceImpl) - #pragma warning restore 0618 - { - return ServerServiceDefinition.CreateBuilder(__ServiceName) - .AddMethod(__Method_UnimplementedCall, serviceImpl.UnimplementedCall).Build(); - } - - /// <summary>Creates service definition that can be registered with a server</summary> - #pragma warning disable 0618 public static ServerServiceDefinition BindService(UnimplementedServiceBase serviceImpl) - #pragma warning restore 0618 { - return ServerServiceDefinition.CreateBuilder(__ServiceName) + return ServerServiceDefinition.CreateBuilder() .AddMethod(__Method_UnimplementedCall, serviceImpl.UnimplementedCall).Build(); } @@ -651,28 +467,6 @@ namespace Grpc.Testing { get { return global::Grpc.Testing.TestReflection.Descriptor.Services[2]; } } - /// <summary>Client for ReconnectService</summary> - [System.Obsolete("Client side interfaced will be removed in the next release. Use client class directly.")] - public interface IReconnectServiceClient - { - global::Grpc.Testing.Empty Start(global::Grpc.Testing.ReconnectParams request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)); - global::Grpc.Testing.Empty Start(global::Grpc.Testing.ReconnectParams request, CallOptions options); - AsyncUnaryCall<global::Grpc.Testing.Empty> StartAsync(global::Grpc.Testing.ReconnectParams request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)); - AsyncUnaryCall<global::Grpc.Testing.Empty> StartAsync(global::Grpc.Testing.ReconnectParams request, CallOptions options); - global::Grpc.Testing.ReconnectInfo Stop(global::Grpc.Testing.Empty request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)); - global::Grpc.Testing.ReconnectInfo Stop(global::Grpc.Testing.Empty request, CallOptions options); - AsyncUnaryCall<global::Grpc.Testing.ReconnectInfo> StopAsync(global::Grpc.Testing.Empty request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)); - AsyncUnaryCall<global::Grpc.Testing.ReconnectInfo> StopAsync(global::Grpc.Testing.Empty request, CallOptions options); - } - - /// <summary>Interface of server-side implementations of ReconnectService</summary> - [System.Obsolete("Service implementations should inherit from the generated abstract base class instead.")] - public interface IReconnectService - { - global::System.Threading.Tasks.Task<global::Grpc.Testing.Empty> Start(global::Grpc.Testing.ReconnectParams request, ServerCallContext context); - global::System.Threading.Tasks.Task<global::Grpc.Testing.ReconnectInfo> Stop(global::Grpc.Testing.Empty request, ServerCallContext context); - } - /// <summary>Base class for server-side implementations of ReconnectService</summary> public abstract class ReconnectServiceBase { @@ -689,9 +483,7 @@ namespace Grpc.Testing { } /// <summary>Client for ReconnectService</summary> - #pragma warning disable 0618 - public class ReconnectServiceClient : ClientBase<ReconnectServiceClient>, IReconnectServiceClient - #pragma warning restore 0618 + public class ReconnectServiceClient : ClientBase<ReconnectServiceClient> { public ReconnectServiceClient(Channel channel) : base(channel) { @@ -753,21 +545,9 @@ namespace Grpc.Testing { } /// <summary>Creates service definition that can be registered with a server</summary> - #pragma warning disable 0618 - public static ServerServiceDefinition BindService(IReconnectService serviceImpl) - #pragma warning restore 0618 - { - return ServerServiceDefinition.CreateBuilder(__ServiceName) - .AddMethod(__Method_Start, serviceImpl.Start) - .AddMethod(__Method_Stop, serviceImpl.Stop).Build(); - } - - /// <summary>Creates service definition that can be registered with a server</summary> - #pragma warning disable 0618 public static ServerServiceDefinition BindService(ReconnectServiceBase serviceImpl) - #pragma warning restore 0618 { - return ServerServiceDefinition.CreateBuilder(__ServiceName) + return ServerServiceDefinition.CreateBuilder() .AddMethod(__Method_Start, serviceImpl.Start) .AddMethod(__Method_Stop, serviceImpl.Stop).Build(); } diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c index 5b8ff9b819..4782654250 100644 --- a/src/csharp/ext/grpc_csharp_ext.c +++ b/src/csharp/ext/grpc_csharp_ext.c @@ -45,7 +45,7 @@ #include <string.h> -#ifdef GPR_WIN32 +#ifdef GPR_WINDOWS #define GPR_EXPORT __declspec(dllexport) #define GPR_CALLTYPE __stdcall #endif @@ -503,6 +503,7 @@ grpcsharp_call_start_unary(grpc_call *call, grpcsharp_batch_context *ctx, grpc_metadata_array *initial_metadata, uint32_t write_flags) { /* TODO: don't use magic number */ grpc_op ops[6]; + memset(ops, 0, sizeof(ops)); ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; grpcsharp_metadata_array_move(&(ctx->send_initial_metadata), initial_metadata); @@ -555,6 +556,7 @@ grpcsharp_call_start_client_streaming(grpc_call *call, grpc_metadata_array *initial_metadata) { /* TODO: don't use magic number */ grpc_op ops[4]; + memset(ops, 0, sizeof(ops)); ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; grpcsharp_metadata_array_move(&(ctx->send_initial_metadata), initial_metadata); @@ -596,6 +598,7 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming( size_t send_buffer_len, grpc_metadata_array *initial_metadata, uint32_t write_flags) { /* TODO: don't use magic number */ grpc_op ops[4]; + memset(ops, 0, sizeof(ops)); ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; grpcsharp_metadata_array_move(&(ctx->send_initial_metadata), initial_metadata); @@ -638,6 +641,7 @@ grpcsharp_call_start_duplex_streaming(grpc_call *call, grpc_metadata_array *initial_metadata) { /* TODO: don't use magic number */ grpc_op ops[2]; + memset(ops, 0, sizeof(ops)); ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; grpcsharp_metadata_array_move(&(ctx->send_initial_metadata), initial_metadata); @@ -684,6 +688,7 @@ grpcsharp_call_send_message(grpc_call *call, grpcsharp_batch_context *ctx, int32_t send_empty_initial_metadata) { /* TODO: don't use magic number */ grpc_op ops[2]; + memset(ops, 0, sizeof(ops)); size_t nops = send_empty_initial_metadata ? 2 : 1; ops[0].op = GRPC_OP_SEND_MESSAGE; ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len); @@ -691,8 +696,6 @@ grpcsharp_call_send_message(grpc_call *call, grpcsharp_batch_context *ctx, ops[0].flags = write_flags; ops[0].reserved = NULL; ops[1].op = GRPC_OP_SEND_INITIAL_METADATA; - ops[1].data.send_initial_metadata.count = 0; - ops[1].data.send_initial_metadata.metadata = NULL; ops[1].flags = 0; ops[1].reserved = NULL; @@ -719,6 +722,7 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server( size_t optional_send_buffer_len, uint32_t write_flags) { /* TODO: don't use magic number */ grpc_op ops[3]; + memset(ops, 0, sizeof(ops)); size_t nops = 1; ops[0].op = GRPC_OP_SEND_STATUS_FROM_SERVER; ops[0].data.send_status_from_server.status = status_code; @@ -743,8 +747,6 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server( } if (send_empty_initial_metadata) { ops[nops].op = GRPC_OP_SEND_INITIAL_METADATA; - ops[nops].data.send_initial_metadata.count = 0; - ops[nops].data.send_initial_metadata.metadata = NULL; ops[nops].flags = 0; ops[nops].reserved = NULL; nops++; @@ -784,6 +786,7 @@ grpcsharp_call_send_initial_metadata(grpc_call *call, grpc_metadata_array *initial_metadata) { /* TODO: don't use magic number */ grpc_op ops[1]; + memset(ops, 0, sizeof(ops)); ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; grpcsharp_metadata_array_move(&(ctx->send_initial_metadata), initial_metadata); @@ -806,11 +809,14 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_set_credentials( /* Server */ GPR_EXPORT grpc_server *GPR_CALLTYPE -grpcsharp_server_create(grpc_completion_queue *cq, - const grpc_channel_args *args) { - grpc_server *server = grpc_server_create(args, NULL); +grpcsharp_server_create(const grpc_channel_args *args) { + return grpc_server_create(args, NULL); +} + +GPR_EXPORT void GPR_CALLTYPE +grpcsharp_server_register_completion_queue(grpc_server *server, + grpc_completion_queue *cq) { grpc_server_register_completion_queue(server, cq, NULL); - return server; } GPR_EXPORT int32_t GPR_CALLTYPE diff --git a/src/csharp/tests.json b/src/csharp/tests.json index f6af3408d5..7e7aee1093 100644 --- a/src/csharp/tests.json +++ b/src/csharp/tests.json @@ -7,6 +7,7 @@ "Grpc.Core.Internal.Tests.CompletionQueueSafeHandleTest", "Grpc.Core.Internal.Tests.MetadataArraySafeHandleTest", "Grpc.Core.Internal.Tests.TimespecTest", + "Grpc.Core.Tests.AppDomainUnloadTest", "Grpc.Core.Tests.CallCredentialsTest", "Grpc.Core.Tests.CallOptionsTest", "Grpc.Core.Tests.ChannelCredentialsTest", @@ -25,6 +26,9 @@ "Grpc.Core.Tests.ResponseHeadersTest", "Grpc.Core.Tests.SanityTest", "Grpc.Core.Tests.ServerTest", + "Grpc.Core.Tests.ShutdownHookClientTest", + "Grpc.Core.Tests.ShutdownHookPendingCallTest", + "Grpc.Core.Tests.ShutdownHookServerTest", "Grpc.Core.Tests.ShutdownTest", "Grpc.Core.Tests.TimeoutsTest", "Grpc.Core.Tests.UserAgentStringTest" |