diff options
author | Michael Lumish <mlumish@google.com> | 2015-08-21 11:52:03 -0400 |
---|---|---|
committer | Michael Lumish <mlumish@google.com> | 2015-08-21 11:52:03 -0400 |
commit | a753ba130996dcb2a80a468eb7e42dea7d55622e (patch) | |
tree | bb2506df03494df7f41853c26c7c863f8dae4ca1 | |
parent | 10cab1396fd6fce4539f6368bda6ea908e1ce27b (diff) | |
parent | e4134ddf6c64bbd4bdb6084b7295eedcf9fffcef (diff) |
Merge pull request #3018 from jtattermusch/get_rid_of_shutdown
C# fixes and beta API polishing
31 files changed, 329 insertions, 180 deletions
diff --git a/src/csharp/Grpc.Core.Tests/ChannelTest.cs b/src/csharp/Grpc.Core.Tests/ChannelTest.cs index 2787572924..dfbd92879e 100644 --- a/src/csharp/Grpc.Core.Tests/ChannelTest.cs +++ b/src/csharp/Grpc.Core.Tests/ChannelTest.cs @@ -41,12 +41,6 @@ namespace Grpc.Core.Tests { public class ChannelTest { - [TestFixtureTearDown] - public void CleanupClass() - { - GrpcEnvironment.Shutdown(); - } - [Test] public void Constructor_RejectsInvalidParams() { @@ -56,36 +50,33 @@ namespace Grpc.Core.Tests [Test] public void State_IdleAfterCreation() { - using (var channel = new Channel("localhost", Credentials.Insecure)) - { - Assert.AreEqual(ChannelState.Idle, channel.State); - } + var channel = new Channel("localhost", Credentials.Insecure); + Assert.AreEqual(ChannelState.Idle, channel.State); + channel.ShutdownAsync().Wait(); } [Test] public void WaitForStateChangedAsync_InvalidArgument() { - using (var channel = new Channel("localhost", Credentials.Insecure)) - { - Assert.Throws(typeof(ArgumentException), () => channel.WaitForStateChangedAsync(ChannelState.FatalFailure)); - } + var channel = new Channel("localhost", Credentials.Insecure); + Assert.Throws(typeof(ArgumentException), () => channel.WaitForStateChangedAsync(ChannelState.FatalFailure)); + channel.ShutdownAsync().Wait(); } [Test] public void ResolvedTarget() { - using (var channel = new Channel("127.0.0.1", Credentials.Insecure)) - { - Assert.IsTrue(channel.ResolvedTarget.Contains("127.0.0.1")); - } + var channel = new Channel("127.0.0.1", Credentials.Insecure); + Assert.IsTrue(channel.ResolvedTarget.Contains("127.0.0.1")); + channel.ShutdownAsync().Wait(); } [Test] - public void Dispose_IsIdempotent() + public void Shutdown_AllowedOnlyOnce() { var channel = new Channel("localhost", Credentials.Insecure); - channel.Dispose(); - channel.Dispose(); + channel.ShutdownAsync().Wait(); + Assert.Throws(typeof(InvalidOperationException), () => channel.ShutdownAsync().GetAwaiter().GetResult()); } } } diff --git a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs index e49fdb5268..68279a2007 100644 --- a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs +++ b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs @@ -63,16 +63,10 @@ namespace Grpc.Core.Tests [TearDown] public void Cleanup() { - channel.Dispose(); + channel.ShutdownAsync().Wait(); server.ShutdownAsync().Wait(); } - [TestFixtureTearDown] - public void CleanupClass() - { - GrpcEnvironment.Shutdown(); - } - [Test] public async Task UnaryCall() { @@ -208,13 +202,6 @@ namespace Grpc.Core.Tests } [Test] - public void UnaryCall_DisposedChannel() - { - channel.Dispose(); - Assert.Throws(typeof(ObjectDisposedException), () => Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "ABC")); - } - - [Test] public void UnaryCallPerformance() { helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) => diff --git a/src/csharp/Grpc.Core.Tests/CompressionTest.cs b/src/csharp/Grpc.Core.Tests/CompressionTest.cs index 9547683f60..378c81851c 100644 --- a/src/csharp/Grpc.Core.Tests/CompressionTest.cs +++ b/src/csharp/Grpc.Core.Tests/CompressionTest.cs @@ -62,16 +62,10 @@ namespace Grpc.Core.Tests [TearDown] public void Cleanup() { - channel.Dispose(); + channel.ShutdownAsync().Wait(); server.ShutdownAsync().Wait(); } - [TestFixtureTearDown] - public void CleanupClass() - { - GrpcEnvironment.Shutdown(); - } - [Test] public void WriteOptions_Unary() { diff --git a/src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs b/src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs index db5f953b0e..2db3f286f7 100644 --- a/src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs +++ b/src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs @@ -62,16 +62,10 @@ namespace Grpc.Core.Tests [TearDown] public void Cleanup() { - channel.Dispose(); + channel.ShutdownAsync().Wait(); server.ShutdownAsync().Wait(); } - [TestFixtureTearDown] - public void CleanupClass() - { - GrpcEnvironment.Shutdown(); - } - [Test] public async Task PropagateCancellation() { diff --git a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj index 829effc9a2..ad4e94a695 100644 --- a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj +++ b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj @@ -64,6 +64,7 @@ <Link>Version.cs</Link> </Compile> <Compile Include="ClientBaseTest.cs" /> + <Compile Include="ShutdownTest.cs" /> <Compile Include="Properties\AssemblyInfo.cs" /> <Compile Include="ClientServerTest.cs" /> <Compile Include="ServerTest.cs" /> diff --git a/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs b/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs index 4ed93c7eca..4fdfab5a99 100644 --- a/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs +++ b/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs @@ -43,34 +43,40 @@ namespace Grpc.Core.Tests [Test] public void InitializeAndShutdownGrpcEnvironment() { - var env = GrpcEnvironment.GetInstance(); + var env = GrpcEnvironment.AddRef(); Assert.IsNotNull(env.CompletionQueue); - GrpcEnvironment.Shutdown(); + GrpcEnvironment.Release(); } [Test] public void SubsequentInvocations() { - var env1 = GrpcEnvironment.GetInstance(); - var env2 = GrpcEnvironment.GetInstance(); + var env1 = GrpcEnvironment.AddRef(); + var env2 = GrpcEnvironment.AddRef(); Assert.IsTrue(object.ReferenceEquals(env1, env2)); - GrpcEnvironment.Shutdown(); - GrpcEnvironment.Shutdown(); + GrpcEnvironment.Release(); + GrpcEnvironment.Release(); } [Test] public void InitializeAfterShutdown() { - var env1 = GrpcEnvironment.GetInstance(); - GrpcEnvironment.Shutdown(); + var env1 = GrpcEnvironment.AddRef(); + GrpcEnvironment.Release(); - var env2 = GrpcEnvironment.GetInstance(); - GrpcEnvironment.Shutdown(); + var env2 = GrpcEnvironment.AddRef(); + GrpcEnvironment.Release(); Assert.IsFalse(object.ReferenceEquals(env1, env2)); } [Test] + public void ReleaseWithoutAddRef() + { + Assert.Throws(typeof(InvalidOperationException), () => GrpcEnvironment.Release()); + } + + [Test] public void GetCoreVersionString() { var coreVersion = GrpcEnvironment.GetCoreVersionString(); diff --git a/src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs b/src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs index 981b8ea3c8..706006702e 100644 --- a/src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs +++ b/src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs @@ -69,16 +69,10 @@ namespace Grpc.Core.Tests [TearDown] public void Cleanup() { - channel.Dispose(); + channel.ShutdownAsync().Wait(); server.ShutdownAsync().Wait(); } - [TestFixtureTearDown] - public void CleanupClass() - { - GrpcEnvironment.Shutdown(); - } - [Test] public void WriteResponseHeaders_NullNotAllowed() { diff --git a/src/csharp/Grpc.Core.Tests/ServerTest.cs b/src/csharp/Grpc.Core.Tests/ServerTest.cs index 485006ebac..e7193c843b 100644 --- a/src/csharp/Grpc.Core.Tests/ServerTest.cs +++ b/src/csharp/Grpc.Core.Tests/ServerTest.cs @@ -51,7 +51,6 @@ namespace Grpc.Core.Tests }; server.Start(); server.ShutdownAsync().Wait(); - GrpcEnvironment.Shutdown(); } [Test] @@ -67,8 +66,7 @@ namespace Grpc.Core.Tests Assert.Greater(boundPort.BoundPort, 0); server.Start(); - server.ShutdownAsync(); - GrpcEnvironment.Shutdown(); + server.ShutdownAsync().Wait(); } [Test] @@ -83,7 +81,6 @@ namespace Grpc.Core.Tests Assert.Throws(typeof(InvalidOperationException), () => server.Services.Add(ServerServiceDefinition.CreateBuilder("serviceName").Build())); server.ShutdownAsync().Wait(); - GrpcEnvironment.Shutdown(); } } } diff --git a/src/csharp/Grpc.Core.Tests/ShutdownTest.cs b/src/csharp/Grpc.Core.Tests/ShutdownTest.cs new file mode 100644 index 0000000000..a2be7ddd5e --- /dev/null +++ b/src/csharp/Grpc.Core.Tests/ShutdownTest.cs @@ -0,0 +1,77 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Diagnostics; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core; +using Grpc.Core.Internal; +using Grpc.Core.Utils; +using NUnit.Framework; + +namespace Grpc.Core.Tests +{ + public class ShutdownTest + { + const string Host = "127.0.0.1"; + + MockServiceHelper helper; + Server server; + Channel channel; + + [SetUp] + public void Init() + { + helper = new MockServiceHelper(Host); + server = helper.GetServer(); + server.Start(); + channel = helper.GetChannel(); + } + + [Test] + public async Task AbandonedCall() + { + helper.DuplexStreamingHandler = new DuplexStreamingServerMethod<string, string>(async (requestStream, responseStream, context) => + { + await requestStream.ToListAsync(); + }); + + var call = Calls.AsyncDuplexStreamingCall(helper.CreateDuplexStreamingCall(new CallOptions(deadline: DateTime.UtcNow.AddMilliseconds(1)))); + + channel.ShutdownAsync().Wait(); + server.ShutdownAsync().Wait(); + } + } +} diff --git a/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs b/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs index d875d601b9..41f661f62d 100644 --- a/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs +++ b/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs @@ -65,16 +65,10 @@ namespace Grpc.Core.Tests [TearDown] public void Cleanup() { - channel.Dispose(); + channel.ShutdownAsync().Wait(); server.ShutdownAsync().Wait(); } - [TestFixtureTearDown] - public void CleanupClass() - { - GrpcEnvironment.Shutdown(); - } - [Test] public void InfiniteDeadline() { diff --git a/src/csharp/Grpc.Core/Channel.cs b/src/csharp/Grpc.Core/Channel.cs index 64c6adf2bf..2f8519dfa3 100644 --- a/src/csharp/Grpc.Core/Channel.cs +++ b/src/csharp/Grpc.Core/Channel.cs @@ -45,14 +45,19 @@ namespace Grpc.Core /// <summary> /// gRPC Channel /// </summary> - public class Channel : IDisposable + public class Channel { static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Channel>(); + readonly object myLock = new object(); + readonly AtomicCounter activeCallCounter = new AtomicCounter(); + readonly string target; readonly GrpcEnvironment environment; readonly ChannelSafeHandle handle; readonly List<ChannelOption> options; + + bool shutdownRequested; bool disposed; /// <summary> @@ -65,7 +70,7 @@ namespace Grpc.Core public Channel(string target, Credentials credentials, IEnumerable<ChannelOption> options = null) { this.target = Preconditions.CheckNotNull(target, "target"); - this.environment = GrpcEnvironment.GetInstance(); + this.environment = GrpcEnvironment.AddRef(); this.options = options != null ? new List<ChannelOption>(options) : new List<ChannelOption>(); EnsureUserAgentChannelOption(this.options); @@ -172,12 +177,26 @@ namespace Grpc.Core } /// <summary> - /// Destroys the underlying channel. + /// Waits until there are no more active calls for this channel and then cleans up + /// resources used by this channel. /// </summary> - public void Dispose() + public async Task ShutdownAsync() { - Dispose(true); - GC.SuppressFinalize(this); + lock (myLock) + { + Preconditions.CheckState(!shutdownRequested); + shutdownRequested = true; + } + + var activeCallCount = activeCallCounter.Count; + if (activeCallCount > 0) + { + Logger.Warning("Channel shutdown was called but there are still {0} active calls for that channel.", activeCallCount); + } + + handle.Dispose(); + + await Task.Run(() => GrpcEnvironment.Release()); } internal ChannelSafeHandle Handle @@ -196,13 +215,20 @@ namespace Grpc.Core } } - protected virtual void Dispose(bool disposing) + internal void AddCallReference(object call) { - if (disposing && handle != null && !disposed) - { - disposed = true; - handle.Dispose(); - } + activeCallCounter.Increment(); + + bool success = false; + handle.DangerousAddRef(ref success); + Preconditions.CheckState(success); + } + + internal void RemoveCallReference(object call) + { + handle.DangerousRelease(); + + activeCallCounter.Decrement(); } private static void EnsureUserAgentChannelOption(List<ChannelOption> options) diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs index 30d8c80235..0a44eead74 100644 --- a/src/csharp/Grpc.Core/GrpcEnvironment.cs +++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs @@ -58,6 +58,7 @@ namespace Grpc.Core static object staticLock = new object(); static GrpcEnvironment instance; + static int refCount; static ILogger logger = new ConsoleLogger(); @@ -67,13 +68,14 @@ namespace Grpc.Core bool isClosed; /// <summary> - /// Returns an instance of initialized gRPC environment. - /// Subsequent invocations return the same instance unless Shutdown has been called first. + /// Returns a reference-counted instance of initialized gRPC environment. + /// Subsequent invocations return the same instance unless reference count has dropped to zero previously. /// </summary> - internal static GrpcEnvironment GetInstance() + internal static GrpcEnvironment AddRef() { lock (staticLock) { + refCount++; if (instance == null) { instance = new GrpcEnvironment(); @@ -83,14 +85,16 @@ namespace Grpc.Core } /// <summary> - /// Shuts down the gRPC environment if it was initialized before. - /// Blocks until the environment has been fully shutdown. + /// Decrements the reference count for currently active environment and shuts down the gRPC environment if reference count drops to zero. + /// (and blocks until the environment has been fully shutdown). /// </summary> - public static void Shutdown() + internal static void Release() { lock (staticLock) { - if (instance != null) + Preconditions.CheckState(refCount > 0); + refCount--; + if (refCount == 0) { instance.Close(); instance = null; @@ -125,12 +129,10 @@ namespace Grpc.Core private GrpcEnvironment() { NativeLogRedirector.Redirect(); - grpcsharp_init(); + GrpcNativeInit(); completionRegistry = new CompletionRegistry(this); threadPool = new GrpcThreadPool(this, THREAD_POOL_SIZE); threadPool.Start(); - // TODO: use proper logging here - Logger.Info("gRPC initialized."); } /// <summary> @@ -175,6 +177,17 @@ namespace Grpc.Core return Marshal.PtrToStringAnsi(ptr); } + + internal static void GrpcNativeInit() + { + grpcsharp_init(); + } + + internal static void GrpcNativeShutdown() + { + grpcsharp_shutdown(); + } + /// <summary> /// Shuts down this environment. /// </summary> @@ -185,12 +198,10 @@ namespace Grpc.Core throw new InvalidOperationException("Close has already been called"); } threadPool.Stop(); - grpcsharp_shutdown(); + GrpcNativeShutdown(); isClosed = true; debugStats.CheckOK(); - - Logger.Info("gRPC shutdown."); } } } diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index 2c3e3d75ea..bb9ba5b8dd 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -311,9 +311,9 @@ namespace Grpc.Core.Internal } } - protected override void OnReleaseResources() + protected override void OnAfterReleaseResources() { - details.Channel.Environment.DebugStats.ActiveClientCalls.Decrement(); + details.Channel.RemoveCallReference(this); } private void Initialize(CompletionQueueSafeHandle cq) @@ -323,7 +323,9 @@ namespace Grpc.Core.Internal var call = details.Channel.Handle.CreateCall(details.Channel.Environment.CompletionRegistry, parentCall, ContextPropagationToken.DefaultMask, cq, details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline.Value)); - details.Channel.Environment.DebugStats.ActiveClientCalls.Increment(); + + details.Channel.AddCallReference(this); + InitializeInternal(call); RegisterCancellationCallback(); } diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs index 6ca4bbdafc..1808294f43 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs @@ -189,15 +189,15 @@ namespace Grpc.Core.Internal private void ReleaseResources() { - OnReleaseResources(); if (call != null) { call.Dispose(); } disposed = true; + OnAfterReleaseResources(); } - protected virtual void OnReleaseResources() + protected virtual void OnAfterReleaseResources() { } @@ -212,7 +212,7 @@ namespace Grpc.Core.Internal Preconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time"); } - protected void CheckReadingAllowed() + protected virtual void CheckReadingAllowed() { Preconditions.CheckState(started); Preconditions.CheckState(!disposed); diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs index 3710a65d6b..6278c0191e 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs @@ -50,16 +50,19 @@ namespace Grpc.Core.Internal readonly TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>(); readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); readonly GrpcEnvironment environment; + readonly Server server; - public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, GrpcEnvironment environment) : base(serializer, deserializer) + public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, GrpcEnvironment environment, Server server) : base(serializer, deserializer) { this.environment = Preconditions.CheckNotNull(environment); + this.server = Preconditions.CheckNotNull(server); } public void Initialize(CallSafeHandle call) { call.SetCompletionRegistry(environment.CompletionRegistry); - environment.DebugStats.ActiveServerCalls.Increment(); + + server.AddCallReference(this); InitializeInternal(call); } @@ -168,9 +171,15 @@ namespace Grpc.Core.Internal } } - protected override void OnReleaseResources() + protected override void CheckReadingAllowed() + { + base.CheckReadingAllowed(); + Preconditions.CheckArgument(!cancelRequested); + } + + protected override void OnAfterReleaseResources() { - environment.DebugStats.ActiveServerCalls.Decrement(); + server.RemoveCallReference(this); } /// <summary> diff --git a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs index 6a2add54db..3a96414bea 100644 --- a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs @@ -134,7 +134,7 @@ namespace Grpc.Core.Internal } // Gets data of server_rpc_new completion. - public ServerRpcNew GetServerRpcNew() + public ServerRpcNew GetServerRpcNew(Server server) { var call = grpcsharp_batch_context_server_rpc_new_call(this); @@ -145,7 +145,7 @@ namespace Grpc.Core.Internal IntPtr metadataArrayPtr = grpcsharp_batch_context_server_rpc_new_request_metadata(this); var metadata = MetadataArraySafeHandle.ReadMetadataFromPtrUnsafe(metadataArrayPtr); - return new ServerRpcNew(call, method, host, deadline, metadata); + return new ServerRpcNew(server, call, method, host, deadline, metadata); } // Gets data of receive_close_on_server completion. @@ -198,14 +198,16 @@ namespace Grpc.Core.Internal /// </summary> internal struct ServerRpcNew { + readonly Server server; readonly CallSafeHandle call; readonly string method; readonly string host; readonly Timespec deadline; readonly Metadata requestMetadata; - public ServerRpcNew(CallSafeHandle call, string method, string host, Timespec deadline, Metadata requestMetadata) + public ServerRpcNew(Server server, CallSafeHandle call, string method, string host, Timespec deadline, Metadata requestMetadata) { + this.server = server; this.call = call; this.method = method; this.host = host; @@ -213,6 +215,14 @@ namespace Grpc.Core.Internal this.requestMetadata = requestMetadata; } + public Server Server + { + get + { + return this.server; + } + } + public CallSafeHandle Call { get diff --git a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs index 7f03bf4ea5..8cef566c14 100644 --- a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs @@ -68,11 +68,17 @@ namespace Grpc.Core.Internal public static ChannelSafeHandle CreateInsecure(string target, ChannelArgsSafeHandle channelArgs) { + // Increment reference count for the native gRPC environment to make sure we don't do grpc_shutdown() before destroying the server handle. + // Doing so would make object finalizer crash if we end up abandoning the handle. + GrpcEnvironment.GrpcNativeInit(); return grpcsharp_insecure_channel_create(target, channelArgs); } public static ChannelSafeHandle CreateSecure(CredentialsSafeHandle credentials, string target, ChannelArgsSafeHandle channelArgs) { + // Increment reference count for the native gRPC environment to make sure we don't do grpc_shutdown() before destroying the server handle. + // Doing so would make object finalizer crash if we end up abandoning the handle. + GrpcEnvironment.GrpcNativeInit(); return grpcsharp_secure_channel_create(credentials, target, channelArgs); } @@ -107,6 +113,7 @@ namespace Grpc.Core.Internal protected override bool ReleaseHandle() { grpcsharp_channel_destroy(handle); + GrpcEnvironment.GrpcNativeShutdown(); return true; } } diff --git a/src/csharp/Grpc.Core/Internal/DebugStats.cs b/src/csharp/Grpc.Core/Internal/DebugStats.cs index 8793450ff3..1bea1adf9e 100644 --- a/src/csharp/Grpc.Core/Internal/DebugStats.cs +++ b/src/csharp/Grpc.Core/Internal/DebugStats.cs @@ -38,10 +38,6 @@ namespace Grpc.Core.Internal { internal class DebugStats { - public readonly AtomicCounter ActiveClientCalls = new AtomicCounter(); - - public readonly AtomicCounter ActiveServerCalls = new AtomicCounter(); - public readonly AtomicCounter PendingBatchCompletions = new AtomicCounter(); /// <summary> @@ -49,16 +45,6 @@ namespace Grpc.Core.Internal /// </summary> public void CheckOK() { - var remainingClientCalls = ActiveClientCalls.Count; - if (remainingClientCalls != 0) - { - DebugWarning(string.Format("Detected {0} client calls that weren't disposed properly.", remainingClientCalls)); - } - var remainingServerCalls = ActiveServerCalls.Count; - if (remainingServerCalls != 0) - { - DebugWarning(string.Format("Detected {0} server calls that weren't disposed properly.", remainingServerCalls)); - } var pendingBatchCompletions = PendingBatchCompletions.Count; if (pendingBatchCompletions != 0) { diff --git a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs index cb4c7c821e..4b7124ee74 100644 --- a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs +++ b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs @@ -83,8 +83,6 @@ namespace Grpc.Core.Internal lock (myLock) { cq.Shutdown(); - - Logger.Info("Waiting for GRPC threads to finish."); foreach (var thread in threads) { thread.Join(); @@ -136,7 +134,6 @@ namespace Grpc.Core.Internal } } while (ev.type != GRPCCompletionType.Shutdown); - Logger.Info("Completion queue has shutdown successfully, thread {0} exiting.", Thread.CurrentThread.Name); } } } diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs index 688f9f6fec..59f4c5727c 100644 --- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs +++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs @@ -67,7 +67,7 @@ namespace Grpc.Core.Internal var asyncCall = new AsyncCallServer<TRequest, TResponse>( method.ResponseMarshaller.Serializer, method.RequestMarshaller.Deserializer, - environment); + environment, newRpc.Server); asyncCall.Initialize(newRpc.Call); var finishedTask = asyncCall.ServerSideCallAsync(); @@ -123,7 +123,7 @@ namespace Grpc.Core.Internal var asyncCall = new AsyncCallServer<TRequest, TResponse>( method.ResponseMarshaller.Serializer, method.RequestMarshaller.Deserializer, - environment); + environment, newRpc.Server); asyncCall.Initialize(newRpc.Call); var finishedTask = asyncCall.ServerSideCallAsync(); @@ -179,7 +179,7 @@ namespace Grpc.Core.Internal var asyncCall = new AsyncCallServer<TRequest, TResponse>( method.ResponseMarshaller.Serializer, method.RequestMarshaller.Deserializer, - environment); + environment, newRpc.Server); asyncCall.Initialize(newRpc.Call); var finishedTask = asyncCall.ServerSideCallAsync(); @@ -239,7 +239,7 @@ namespace Grpc.Core.Internal var asyncCall = new AsyncCallServer<TRequest, TResponse>( method.ResponseMarshaller.Serializer, method.RequestMarshaller.Deserializer, - environment); + environment, newRpc.Server); asyncCall.Initialize(newRpc.Call); var finishedTask = asyncCall.ServerSideCallAsync(); @@ -278,7 +278,7 @@ namespace Grpc.Core.Internal { // We don't care about the payload type here. var asyncCall = new AsyncCallServer<byte[], byte[]>( - (payload) => payload, (payload) => payload, environment); + (payload) => payload, (payload) => payload, environment, newRpc.Server); asyncCall.Initialize(newRpc.Call); var finishedTask = asyncCall.ServerSideCallAsync(); diff --git a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs index f9b44b1acf..5ee7ac14e8 100644 --- a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs @@ -74,6 +74,9 @@ namespace Grpc.Core.Internal public static ServerSafeHandle NewServer(CompletionQueueSafeHandle cq, ChannelArgsSafeHandle args) { + // Increment reference count for the native gRPC environment to make sure we don't do grpc_shutdown() before destroying the server handle. + // Doing so would make object finalizer crash if we end up abandoning the handle. + GrpcEnvironment.GrpcNativeInit(); return grpcsharp_server_create(cq, args); } @@ -109,6 +112,7 @@ namespace Grpc.Core.Internal protected override bool ReleaseHandle() { grpcsharp_server_destroy(handle); + GrpcEnvironment.GrpcNativeShutdown(); return true; } diff --git a/src/csharp/Grpc.Core/Logging/ConsoleLogger.cs b/src/csharp/Grpc.Core/Logging/ConsoleLogger.cs index 382481d871..35561d25d8 100644 --- a/src/csharp/Grpc.Core/Logging/ConsoleLogger.cs +++ b/src/csharp/Grpc.Core/Logging/ConsoleLogger.cs @@ -51,7 +51,19 @@ namespace Grpc.Core.Logging private ConsoleLogger(Type forType) { this.forType = forType; - this.forTypeString = forType != null ? forType.FullName + " " : ""; + if (forType != null) + { + var namespaceStr = forType.Namespace ?? ""; + if (namespaceStr.Length > 0) + { + namespaceStr += "."; + } + this.forTypeString = namespaceStr + forType.Name + " "; + } + else + { + this.forTypeString = ""; + } } /// <summary> diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs index c76f126026..28f1686e20 100644 --- a/src/csharp/Grpc.Core/Server.cs +++ b/src/csharp/Grpc.Core/Server.cs @@ -50,6 +50,8 @@ namespace Grpc.Core { static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Server>(); + readonly AtomicCounter activeCallCounter = new AtomicCounter(); + readonly ServiceDefinitionCollection serviceDefinitions; readonly ServerPortCollection ports; readonly GrpcEnvironment environment; @@ -73,7 +75,7 @@ namespace Grpc.Core { this.serviceDefinitions = new ServiceDefinitionCollection(this); this.ports = new ServerPortCollection(this); - this.environment = GrpcEnvironment.GetInstance(); + this.environment = GrpcEnvironment.AddRef(); this.options = options != null ? new List<ChannelOption>(options) : new List<ChannelOption>(); using (var channelArgs = ChannelOptions.CreateChannelArgs(this.options)) { @@ -106,6 +108,17 @@ namespace Grpc.Core } /// <summary> + /// To allow awaiting termination of the server. + /// </summary> + public Task ShutdownTask + { + get + { + return shutdownTcs.Task; + } + } + + /// <summary> /// Starts the server. /// </summary> public void Start() @@ -136,18 +149,9 @@ namespace Grpc.Core handle.ShutdownAndNotify(HandleServerShutdown, environment); await shutdownTcs.Task; - handle.Dispose(); - } + DisposeHandle(); - /// <summary> - /// To allow awaiting termination of the server. - /// </summary> - public Task ShutdownTask - { - get - { - return shutdownTcs.Task; - } + await Task.Run(() => GrpcEnvironment.Release()); } /// <summary> @@ -166,7 +170,22 @@ namespace Grpc.Core handle.ShutdownAndNotify(HandleServerShutdown, environment); handle.CancelAllCalls(); await shutdownTcs.Task; - handle.Dispose(); + DisposeHandle(); + } + + internal void AddCallReference(object call) + { + activeCallCounter.Increment(); + + bool success = false; + handle.DangerousAddRef(ref success); + Preconditions.CheckState(success); + } + + internal void RemoveCallReference(object call) + { + handle.DangerousRelease(); + activeCallCounter.Decrement(); } /// <summary> @@ -227,6 +246,16 @@ namespace Grpc.Core } } + private void DisposeHandle() + { + var activeCallCount = activeCallCounter.Count; + if (activeCallCount > 0) + { + Logger.Warning("Server shutdown has finished but there are still {0} active calls for that server.", activeCallCount); + } + handle.Dispose(); + } + /// <summary> /// Selects corresponding handler for given call and handles the call. /// </summary> @@ -254,7 +283,7 @@ namespace Grpc.Core { if (success) { - ServerRpcNew newRpc = ctx.GetServerRpcNew(); + ServerRpcNew newRpc = ctx.GetServerRpcNew(this); // after server shutdown, the callback returns with null call if (!newRpc.Call.IsInvalid) diff --git a/src/csharp/Grpc.Examples.MathClient/MathClient.cs b/src/csharp/Grpc.Examples.MathClient/MathClient.cs index f9839d99f1..abd95cb905 100644 --- a/src/csharp/Grpc.Examples.MathClient/MathClient.cs +++ b/src/csharp/Grpc.Examples.MathClient/MathClient.cs @@ -39,23 +39,21 @@ namespace math { public static void Main(string[] args) { - using (Channel channel = new Channel("127.0.0.1", 23456, Credentials.Insecure)) - { - Math.IMathClient client = new Math.MathClient(channel); - MathExamples.DivExample(client); + var channel = new Channel("127.0.0.1", 23456, Credentials.Insecure); + Math.IMathClient client = new Math.MathClient(channel); + MathExamples.DivExample(client); - MathExamples.DivAsyncExample(client).Wait(); + MathExamples.DivAsyncExample(client).Wait(); - MathExamples.FibExample(client).Wait(); + MathExamples.FibExample(client).Wait(); - MathExamples.SumExample(client).Wait(); + MathExamples.SumExample(client).Wait(); - MathExamples.DivManyExample(client).Wait(); + MathExamples.DivManyExample(client).Wait(); - MathExamples.DependendRequestsExample(client).Wait(); - } + MathExamples.DependendRequestsExample(client).Wait(); - GrpcEnvironment.Shutdown(); + channel.ShutdownAsync().Wait(); } } } diff --git a/src/csharp/Grpc.Examples.MathServer/MathServer.cs b/src/csharp/Grpc.Examples.MathServer/MathServer.cs index 5f7e717b0c..26bef646ec 100644 --- a/src/csharp/Grpc.Examples.MathServer/MathServer.cs +++ b/src/csharp/Grpc.Examples.MathServer/MathServer.cs @@ -56,7 +56,6 @@ namespace math Console.ReadKey(); server.ShutdownAsync().Wait(); - GrpcEnvironment.Shutdown(); } } } diff --git a/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs b/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs index fdef950f09..36c1c947bd 100644 --- a/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs +++ b/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs @@ -68,9 +68,8 @@ namespace math.Tests [TestFixtureTearDown] public void Cleanup() { - channel.Dispose(); + channel.ShutdownAsync().Wait(); server.ShutdownAsync().Wait(); - GrpcEnvironment.Shutdown(); } [Test] diff --git a/src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs b/src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs index 024377e216..80c35fb197 100644 --- a/src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs +++ b/src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs @@ -71,10 +71,9 @@ namespace Grpc.HealthCheck.Tests [TestFixtureTearDown] public void Cleanup() { - channel.Dispose(); + channel.ShutdownAsync().Wait(); server.ShutdownAsync().Wait(); - GrpcEnvironment.Shutdown(); } [Test] diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs index 423da2801e..24c22273fb 100644 --- a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs +++ b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs @@ -120,12 +120,10 @@ namespace Grpc.IntegrationTesting }; } - using (Channel channel = new Channel(options.serverHost, options.serverPort.Value, credentials, channelOptions)) - { - TestService.TestServiceClient client = new TestService.TestServiceClient(channel); - await RunTestCaseAsync(options.testCase, client); - } - GrpcEnvironment.Shutdown(); + var channel = new Channel(options.serverHost, options.serverPort.Value, credentials, channelOptions); + TestService.TestServiceClient client = new TestService.TestServiceClient(channel); + await RunTestCaseAsync(options.testCase, client); + channel.ShutdownAsync().Wait(); } private async Task RunTestCaseAsync(string testCase, TestService.TestServiceClient client) @@ -171,6 +169,9 @@ namespace Grpc.IntegrationTesting case "cancel_after_first_response": await RunCancelAfterFirstResponseAsync(client); break; + case "timeout_on_sleeping_server": + await RunTimeoutOnSleepingServerAsync(client); + break; case "benchmark_empty_unary": RunBenchmarkEmptyUnary(client); break; @@ -460,6 +461,29 @@ namespace Grpc.IntegrationTesting Console.WriteLine("Passed!"); } + public static async Task RunTimeoutOnSleepingServerAsync(TestService.ITestServiceClient client) + { + Console.WriteLine("running timeout_on_sleeping_server"); + + var deadline = DateTime.UtcNow.AddMilliseconds(1); + using (var call = client.FullDuplexCall(deadline: deadline)) + { + try + { + await call.RequestStream.WriteAsync(StreamingOutputCallRequest.CreateBuilder() + .SetPayload(CreateZerosPayload(27182)).Build()); + } + catch (InvalidOperationException) + { + // Deadline was reached before write has started. Eat the exception and continue. + } + + var ex = Assert.Throws<RpcException>(async () => await call.ResponseStream.MoveNext()); + Assert.AreEqual(StatusCode.DeadlineExceeded, ex.Status.StatusCode); + } + Console.WriteLine("Passed!"); + } + // This is not an official interop test, but it's useful. public static void RunBenchmarkEmptyUnary(TestService.ITestServiceClient client) { diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs b/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs index 6fa721bc1c..f3158aeb45 100644 --- a/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs +++ b/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs @@ -75,9 +75,8 @@ namespace Grpc.IntegrationTesting [TestFixtureTearDown] public void Cleanup() { - channel.Dispose(); + channel.ShutdownAsync().Wait(); server.ShutdownAsync().Wait(); - GrpcEnvironment.Shutdown(); } [Test] @@ -127,5 +126,11 @@ namespace Grpc.IntegrationTesting { await InteropClient.RunCancelAfterFirstResponseAsync(client); } + + [Test] + public async Task TimeoutOnSleepingServerAsync() + { + await InteropClient.RunTimeoutOnSleepingServerAsync(client); + } } } diff --git a/src/csharp/Grpc.IntegrationTesting/InteropServer.cs b/src/csharp/Grpc.IntegrationTesting/InteropServer.cs index 504fd11857..0cc8b2cde1 100644 --- a/src/csharp/Grpc.IntegrationTesting/InteropServer.cs +++ b/src/csharp/Grpc.IntegrationTesting/InteropServer.cs @@ -107,8 +107,6 @@ namespace Grpc.IntegrationTesting server.Start(); server.ShutdownTask.Wait(); - - GrpcEnvironment.Shutdown(); } private static ServerOptions ParseArguments(string[] args) diff --git a/src/csharp/Grpc.IntegrationTesting/SslCredentialsTest.cs b/src/csharp/Grpc.IntegrationTesting/SslCredentialsTest.cs index 1c398eb84e..842795374f 100644 --- a/src/csharp/Grpc.IntegrationTesting/SslCredentialsTest.cs +++ b/src/csharp/Grpc.IntegrationTesting/SslCredentialsTest.cs @@ -85,9 +85,8 @@ namespace Grpc.IntegrationTesting [TestFixtureTearDown] public void Cleanup() { - channel.Dispose(); + channel.ShutdownAsync().Wait(); server.ShutdownAsync().Wait(); - GrpcEnvironment.Shutdown(); } [Test] |