diff options
Diffstat (limited to 'src/csharp')
-rw-r--r-- | src/csharp/Grpc.Core.Tests/ChannelTest.cs | 39 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Channel.cs | 34 | ||||
-rw-r--r-- | src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs | 2 | ||||
-rw-r--r-- | src/csharp/Grpc.Examples/MathExamples.cs | 38 | ||||
-rw-r--r-- | src/csharp/Grpc.Examples/MathServiceImpl.cs | 29 | ||||
-rw-r--r-- | src/csharp/Grpc.IntegrationTesting/StressTestClient.cs | 2 |
6 files changed, 124 insertions, 20 deletions
diff --git a/src/csharp/Grpc.Core.Tests/ChannelTest.cs b/src/csharp/Grpc.Core.Tests/ChannelTest.cs index 6330f50fae..850d70ce92 100644 --- a/src/csharp/Grpc.Core.Tests/ChannelTest.cs +++ b/src/csharp/Grpc.Core.Tests/ChannelTest.cs @@ -32,6 +32,7 @@ #endregion using System; +using System.Threading.Tasks; using Grpc.Core; using Grpc.Core.Internal; using Grpc.Core.Utils; @@ -89,5 +90,43 @@ namespace Grpc.Core.Tests channel.ShutdownAsync().Wait(); Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await channel.ShutdownAsync()); } + + [Test] + public async Task ShutdownTokenCancelledAfterShutdown() + { + var channel = new Channel("localhost", ChannelCredentials.Insecure); + Assert.IsFalse(channel.ShutdownToken.IsCancellationRequested); + var shutdownTask = channel.ShutdownAsync(); + Assert.IsTrue(channel.ShutdownToken.IsCancellationRequested); + await shutdownTask; + } + + [Test] + public async Task StateIsFatalFailureAfterShutdown() + { + var channel = new Channel("localhost", ChannelCredentials.Insecure); + await channel.ShutdownAsync(); + Assert.AreEqual(ChannelState.FatalFailure, channel.State); + } + + [Test] + public async Task ShutdownFinishesWaitForStateChangedAsync() + { + var channel = new Channel("localhost", ChannelCredentials.Insecure); + var stateChangedTask = channel.WaitForStateChangedAsync(ChannelState.Idle); + var shutdownTask = channel.ShutdownAsync(); + await stateChangedTask; + await shutdownTask; + } + + [Test] + public async Task OperationsThrowAfterShutdown() + { + var channel = new Channel("localhost", ChannelCredentials.Insecure); + await channel.ShutdownAsync(); + Assert.ThrowsAsync(typeof(ObjectDisposedException), async () => await channel.WaitForStateChangedAsync(ChannelState.Idle)); + Assert.Throws(typeof(ObjectDisposedException), () => { var x = channel.ResolvedTarget; }); + Assert.ThrowsAsync(typeof(TaskCanceledException), async () => await channel.ConnectAsync()); + } } } diff --git a/src/csharp/Grpc.Core/Channel.cs b/src/csharp/Grpc.Core/Channel.cs index 89981b1849..93a6e6a3d9 100644 --- a/src/csharp/Grpc.Core/Channel.cs +++ b/src/csharp/Grpc.Core/Channel.cs @@ -32,6 +32,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Threading; using System.Threading.Tasks; using Grpc.Core.Internal; @@ -51,6 +52,7 @@ namespace Grpc.Core readonly object myLock = new object(); readonly AtomicCounter activeCallCounter = new AtomicCounter(); + readonly CancellationTokenSource shutdownTokenSource = new CancellationTokenSource(); readonly string target; readonly GrpcEnvironment environment; @@ -101,12 +103,13 @@ namespace Grpc.Core /// <summary> /// Gets current connectivity state of this channel. + /// After channel is has been shutdown, <c>ChannelState.FatalFailure</c> will be returned. /// </summary> public ChannelState State { get { - return handle.CheckConnectivityState(false); + return GetConnectivityState(false); } } @@ -155,6 +158,17 @@ namespace Grpc.Core } /// <summary> + /// Returns a token that gets cancelled once <c>ShutdownAsync</c> is invoked. + /// </summary> + public CancellationToken ShutdownToken + { + get + { + return this.shutdownTokenSource.Token; + } + } + + /// <summary> /// Allows explicitly requesting channel to connect without starting an RPC. /// Returned task completes once state Ready was seen. If the deadline is reached, /// or channel enters the FatalFailure state, the task is cancelled. @@ -164,7 +178,7 @@ namespace Grpc.Core /// <param name="deadline">The deadline. <c>null</c> indicates no deadline.</param> public async Task ConnectAsync(DateTime? deadline = null) { - var currentState = handle.CheckConnectivityState(true); + var currentState = GetConnectivityState(true); while (currentState != ChannelState.Ready) { if (currentState == ChannelState.FatalFailure) @@ -172,7 +186,7 @@ namespace Grpc.Core throw new OperationCanceledException("Channel has reached FatalFailure state."); } await WaitForStateChangedAsync(currentState, deadline).ConfigureAwait(false); - currentState = handle.CheckConnectivityState(false); + currentState = GetConnectivityState(false); } } @@ -188,6 +202,8 @@ namespace Grpc.Core shutdownRequested = true; } + shutdownTokenSource.Cancel(); + var activeCallCount = activeCallCounter.Count; if (activeCallCount > 0) { @@ -231,6 +247,18 @@ namespace Grpc.Core activeCallCounter.Decrement(); } + private ChannelState GetConnectivityState(bool tryToConnect) + { + try + { + return handle.CheckConnectivityState(tryToConnect); + } + catch (ObjectDisposedException) + { + return ChannelState.FatalFailure; + } + } + private static void EnsureUserAgentChannelOption(Dictionary<string, ChannelOption> options) { var key = ChannelOptions.PrimaryUserAgentString; diff --git a/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs b/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs index 875202b950..ee11105efe 100644 --- a/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs +++ b/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs @@ -92,7 +92,7 @@ namespace Math.Tests public void DivByZero() { var ex = Assert.Throws<RpcException>(() => client.Div(new DivArgs { Dividend = 0, Divisor = 0 })); - Assert.AreEqual(StatusCode.Unknown, ex.Status.StatusCode); + Assert.AreEqual(StatusCode.InvalidArgument, ex.Status.StatusCode); } [Test] diff --git a/src/csharp/Grpc.Examples/MathExamples.cs b/src/csharp/Grpc.Examples/MathExamples.cs index 6075420974..d260830b94 100644 --- a/src/csharp/Grpc.Examples/MathExamples.cs +++ b/src/csharp/Grpc.Examples/MathExamples.cs @@ -32,6 +32,7 @@ using System; using System.Collections.Generic; using System.Threading.Tasks; +using Grpc.Core; using Grpc.Core.Utils; namespace Math @@ -109,5 +110,42 @@ namespace Math DivReply result = await client.DivAsync(new DivArgs { Dividend = sum.Num_, Divisor = numbers.Count }); Console.WriteLine("Avg Result: " + result); } + + /// <summary> + /// Shows how to handle a call ending with non-OK status. + /// </summary> + public static async Task HandleErrorExample(Math.MathClient client) + { + try + { + DivReply result = await client.DivAsync(new DivArgs { Dividend = 5, Divisor = 0 }); + } + catch (RpcException ex) + { + Console.WriteLine(string.Format("RPC ended with status {0}", ex.Status)); + } + } + + /// <summary> + /// Shows how to send request headers and how to access response headers + /// and response trailers. + /// </summary> + public static async Task MetadataExample(Math.MathClient client) + { + var requestHeaders = new Metadata + { + { "custom-header", "custom-value" } + }; + + var call = client.DivAsync(new DivArgs { Dividend = 5, Divisor = 0 }, requestHeaders); + + // Get response headers + Metadata responseHeaders = await call.ResponseHeadersAsync; + + var result = await call; + + // Get response trailers after the call has finished. + Metadata responseTrailers = call.GetTrailers(); + } } } diff --git a/src/csharp/Grpc.Examples/MathServiceImpl.cs b/src/csharp/Grpc.Examples/MathServiceImpl.cs index 79c56e57a8..a28020f62f 100644 --- a/src/csharp/Grpc.Examples/MathServiceImpl.cs +++ b/src/csharp/Grpc.Examples/MathServiceImpl.cs @@ -52,23 +52,15 @@ namespace Math public override async Task Fib(FibArgs request, IServerStreamWriter<Num> responseStream, ServerCallContext context) { - if (request.Limit <= 0) - { - // keep streaming the sequence until cancelled. - IEnumerator<Num> fibEnumerator = FibInternal(long.MaxValue).GetEnumerator(); - while (!context.CancellationToken.IsCancellationRequested && fibEnumerator.MoveNext()) - { - await responseStream.WriteAsync(fibEnumerator.Current); - await Task.Delay(100); - } - } + var limit = request.Limit > 0 ? request.Limit : long.MaxValue; + var fibEnumerator = FibInternal(limit).GetEnumerator(); - if (request.Limit > 0) + // Keep streaming the sequence until the call is cancelled. + // Use CancellationToken from ServerCallContext to detect the cancellation. + while (!context.CancellationToken.IsCancellationRequested && fibEnumerator.MoveNext()) { - foreach (var num in FibInternal(request.Limit)) - { - await responseStream.WriteAsync(num); - } + await responseStream.WriteAsync(fibEnumerator.Current); + await Task.Delay(100); } } @@ -89,6 +81,13 @@ namespace Math static DivReply DivInternal(DivArgs args) { + if (args.Divisor == 0) + { + // One can finish the RPC with non-ok status by throwing RpcException instance. + // Alternatively, resulting status can be set using ServerCallContext.Status + throw new RpcException(new Status(StatusCode.InvalidArgument, "Division by zero")); + } + long quotient = args.Dividend / args.Divisor; long remainder = args.Dividend % args.Divisor; return new DivReply { Quotient = quotient, Remainder = remainder }; diff --git a/src/csharp/Grpc.IntegrationTesting/StressTestClient.cs b/src/csharp/Grpc.IntegrationTesting/StressTestClient.cs index 8db691cb04..4d6ca7ece5 100644 --- a/src/csharp/Grpc.IntegrationTesting/StressTestClient.cs +++ b/src/csharp/Grpc.IntegrationTesting/StressTestClient.cs @@ -311,7 +311,7 @@ namespace Grpc.IntegrationTesting var snapshot = histogram.GetSnapshot(true); var elapsedSnapshot = wallClockStopwatch.GetElapsedSnapshot(true); - return (long) (snapshot.Count / elapsedSnapshot.Seconds); + return (long) (snapshot.Count / elapsedSnapshot.TotalSeconds); } } } |