diff options
author | Jan Tattermusch <jtattermusch@users.noreply.github.com> | 2016-05-12 10:13:45 -0700 |
---|---|---|
committer | Jan Tattermusch <jtattermusch@users.noreply.github.com> | 2016-05-12 10:13:45 -0700 |
commit | 26dd2b8d6b298b7225f317b66a05646aaefb6a48 (patch) | |
tree | 30acc3c43865ba916655fb873237970e82613ae4 | |
parent | 1acbe3e8a8cfa8d35843b0007b9cd79d91c62c16 (diff) | |
parent | 528fb6651cf6c19032e30dacdf437a3b7caf05e3 (diff) |
Merge pull request #6560 from jtattermusch/csharp_channel_shutdown_improvements
Improve channel behavior in shutdown situations
-rw-r--r-- | src/csharp/Grpc.Core.Tests/ChannelTest.cs | 39 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Channel.cs | 34 |
2 files changed, 70 insertions, 3 deletions
diff --git a/src/csharp/Grpc.Core.Tests/ChannelTest.cs b/src/csharp/Grpc.Core.Tests/ChannelTest.cs index 6330f50fae..850d70ce92 100644 --- a/src/csharp/Grpc.Core.Tests/ChannelTest.cs +++ b/src/csharp/Grpc.Core.Tests/ChannelTest.cs @@ -32,6 +32,7 @@ #endregion using System; +using System.Threading.Tasks; using Grpc.Core; using Grpc.Core.Internal; using Grpc.Core.Utils; @@ -89,5 +90,43 @@ namespace Grpc.Core.Tests channel.ShutdownAsync().Wait(); Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await channel.ShutdownAsync()); } + + [Test] + public async Task ShutdownTokenCancelledAfterShutdown() + { + var channel = new Channel("localhost", ChannelCredentials.Insecure); + Assert.IsFalse(channel.ShutdownToken.IsCancellationRequested); + var shutdownTask = channel.ShutdownAsync(); + Assert.IsTrue(channel.ShutdownToken.IsCancellationRequested); + await shutdownTask; + } + + [Test] + public async Task StateIsFatalFailureAfterShutdown() + { + var channel = new Channel("localhost", ChannelCredentials.Insecure); + await channel.ShutdownAsync(); + Assert.AreEqual(ChannelState.FatalFailure, channel.State); + } + + [Test] + public async Task ShutdownFinishesWaitForStateChangedAsync() + { + var channel = new Channel("localhost", ChannelCredentials.Insecure); + var stateChangedTask = channel.WaitForStateChangedAsync(ChannelState.Idle); + var shutdownTask = channel.ShutdownAsync(); + await stateChangedTask; + await shutdownTask; + } + + [Test] + public async Task OperationsThrowAfterShutdown() + { + var channel = new Channel("localhost", ChannelCredentials.Insecure); + await channel.ShutdownAsync(); + Assert.ThrowsAsync(typeof(ObjectDisposedException), async () => await channel.WaitForStateChangedAsync(ChannelState.Idle)); + Assert.Throws(typeof(ObjectDisposedException), () => { var x = channel.ResolvedTarget; }); + Assert.ThrowsAsync(typeof(TaskCanceledException), async () => await channel.ConnectAsync()); + } } } diff --git a/src/csharp/Grpc.Core/Channel.cs b/src/csharp/Grpc.Core/Channel.cs index 89981b1849..93a6e6a3d9 100644 --- a/src/csharp/Grpc.Core/Channel.cs +++ b/src/csharp/Grpc.Core/Channel.cs @@ -32,6 +32,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Threading; using System.Threading.Tasks; using Grpc.Core.Internal; @@ -51,6 +52,7 @@ namespace Grpc.Core readonly object myLock = new object(); readonly AtomicCounter activeCallCounter = new AtomicCounter(); + readonly CancellationTokenSource shutdownTokenSource = new CancellationTokenSource(); readonly string target; readonly GrpcEnvironment environment; @@ -101,12 +103,13 @@ namespace Grpc.Core /// <summary> /// Gets current connectivity state of this channel. + /// After channel is has been shutdown, <c>ChannelState.FatalFailure</c> will be returned. /// </summary> public ChannelState State { get { - return handle.CheckConnectivityState(false); + return GetConnectivityState(false); } } @@ -155,6 +158,17 @@ namespace Grpc.Core } /// <summary> + /// Returns a token that gets cancelled once <c>ShutdownAsync</c> is invoked. + /// </summary> + public CancellationToken ShutdownToken + { + get + { + return this.shutdownTokenSource.Token; + } + } + + /// <summary> /// Allows explicitly requesting channel to connect without starting an RPC. /// Returned task completes once state Ready was seen. If the deadline is reached, /// or channel enters the FatalFailure state, the task is cancelled. @@ -164,7 +178,7 @@ namespace Grpc.Core /// <param name="deadline">The deadline. <c>null</c> indicates no deadline.</param> public async Task ConnectAsync(DateTime? deadline = null) { - var currentState = handle.CheckConnectivityState(true); + var currentState = GetConnectivityState(true); while (currentState != ChannelState.Ready) { if (currentState == ChannelState.FatalFailure) @@ -172,7 +186,7 @@ namespace Grpc.Core throw new OperationCanceledException("Channel has reached FatalFailure state."); } await WaitForStateChangedAsync(currentState, deadline).ConfigureAwait(false); - currentState = handle.CheckConnectivityState(false); + currentState = GetConnectivityState(false); } } @@ -188,6 +202,8 @@ namespace Grpc.Core shutdownRequested = true; } + shutdownTokenSource.Cancel(); + var activeCallCount = activeCallCounter.Count; if (activeCallCount > 0) { @@ -231,6 +247,18 @@ namespace Grpc.Core activeCallCounter.Decrement(); } + private ChannelState GetConnectivityState(bool tryToConnect) + { + try + { + return handle.CheckConnectivityState(tryToConnect); + } + catch (ObjectDisposedException) + { + return ChannelState.FatalFailure; + } + } + private static void EnsureUserAgentChannelOption(Dictionary<string, ChannelOption> options) { var key = ChannelOptions.PrimaryUserAgentString; |