diff options
author | 2015-05-04 09:20:43 -0700 | |
---|---|---|
committer | 2015-05-04 14:56:51 -0700 | |
commit | 8c2dd9d864cb874f8fbe577faf8c3f72e6a077e4 (patch) | |
tree | fc4141f5561aa05f2d0bcce70f9f98296731a1b4 /src/csharp/Grpc.Core | |
parent | 1b54fcf31b32ae8c7f07ae733e781c184791a7c2 (diff) |
Fixes for C# cancellation support
Diffstat (limited to 'src/csharp/Grpc.Core')
-rw-r--r-- | src/csharp/Grpc.Core/Internal/AsyncCallBase.cs | 17 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/AsyncCallServer.cs | 11 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/BatchContextSafeHandleNotOwned.cs | 8 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/ServerCallHandler.cs | 51 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Status.cs | 5 |
5 files changed, 79 insertions, 13 deletions
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs index b911cdcc87..7cf0f6ff84 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs @@ -180,7 +180,8 @@ namespace Grpc.Core.Internal { if (!disposed && call != null) { - if (halfclosed && readingDone && finished) + bool noMoreSendCompletions = halfclosed || (cancelRequested && sendCompletionDelegate == null); + if (noMoreSendCompletions && readingDone && finished) { ReleaseResources(); return true; @@ -207,8 +208,9 @@ namespace Grpc.Core.Internal protected void CheckSendingAllowed() { Preconditions.CheckState(started); - Preconditions.CheckState(!disposed); Preconditions.CheckState(!errorOccured); + CheckNotCancelled(); + Preconditions.CheckState(!disposed); Preconditions.CheckState(!halfcloseRequested, "Already halfclosed."); Preconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time"); @@ -221,7 +223,14 @@ namespace Grpc.Core.Internal Preconditions.CheckState(!errorOccured); Preconditions.CheckState(!readingDone, "Stream has already been closed."); - Preconditions.CheckState(readCompletionDelegate == null, "Only one write can be pending at a time"); + Preconditions.CheckState(readCompletionDelegate == null, "Only one read can be pending at a time"); + } + + protected void CheckNotCancelled() { + if (cancelRequested) + { + throw new OperationCanceledException("Remote call has been cancelled."); + } } protected byte[] UnsafeSerialize(TWrite msg) @@ -292,6 +301,8 @@ namespace Grpc.Core.Internal }); } + + /// <summary> /// Handles send completion. /// </summary> diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs index 4775f2d07b..3c66c67dcc 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs @@ -123,18 +123,23 @@ namespace Grpc.Core.Internal /// </summary> private void HandleFinishedServerside(bool wasError, BatchContextSafeHandleNotOwned ctx) { + bool cancelled = ctx.GetReceivedCloseOnServerCancelled(); + lock (myLock) { finished = true; - if (readCompletionDelegate == null) + if (cancelled) { - // allow disposal of native call - readingDone = true; + // Once we cancel, we don't have to care that much + // about reads and writes. + Cancel(); } ReleaseResourcesIfPossible(); } + // TODO(jtattermusch): check if call was cancelled. + // TODO: handle error ... finishedServersideTcs.SetResult(null); diff --git a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandleNotOwned.cs b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandleNotOwned.cs index 3c54753756..b562abaa7a 100644 --- a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandleNotOwned.cs +++ b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandleNotOwned.cs @@ -61,6 +61,9 @@ namespace Grpc.Core.Internal [DllImport("grpc_csharp_ext.dll")] static extern IntPtr grpcsharp_batch_context_server_rpc_new_method(BatchContextSafeHandleNotOwned ctx); // returns const char* + [DllImport("grpc_csharp_ext.dll")] + static extern int grpcsharp_batch_context_recv_close_on_server_cancelled(BatchContextSafeHandleNotOwned ctx); + public BatchContextSafeHandleNotOwned(IntPtr handle) : base(false) { SetHandle(handle); @@ -94,5 +97,10 @@ namespace Grpc.Core.Internal { return Marshal.PtrToStringAnsi(grpcsharp_batch_context_server_rpc_new_method(this)); } + + public bool GetReceivedCloseOnServerCancelled() + { + return grpcsharp_batch_context_recv_close_on_server_cancelled(this) != 0; + } } }
\ No newline at end of file diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs index 0416eada34..01b2a11369 100644 --- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs +++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs @@ -80,7 +80,14 @@ namespace Grpc.Core.Internal Console.WriteLine("Exception occured in handler: " + e); status = HandlerUtils.StatusFromException(e); } - await responseStream.WriteStatus(status); + try + { + await responseStream.WriteStatus(status); + } + catch (OperationCanceledException) + { + // Call has been already cancelled. + } await finishedTask; } } @@ -121,7 +128,15 @@ namespace Grpc.Core.Internal Console.WriteLine("Exception occured in handler: " + e); status = HandlerUtils.StatusFromException(e); } - await responseStream.WriteStatus(status); + + try + { + await responseStream.WriteStatus(status); + } + catch (OperationCanceledException) + { + // Call has been already cancelled. + } await finishedTask; } } @@ -151,15 +166,30 @@ namespace Grpc.Core.Internal Status status = Status.DefaultSuccess; try { - var result = await handler(requestStream); - await responseStream.Write(result); - } + var result = await handler(requestStream); + try + { + await responseStream.Write(result); + } + catch (OperationCanceledException) + { + status = Status.DefaultCancelled; + } + } catch (Exception e) { Console.WriteLine("Exception occured in handler: " + e); status = HandlerUtils.StatusFromException(e); } - await responseStream.WriteStatus(status); + + try + { + await responseStream.WriteStatus(status); + } + catch (OperationCanceledException) + { + // Call has been already cancelled. + } await finishedTask; } } @@ -196,7 +226,14 @@ namespace Grpc.Core.Internal Console.WriteLine("Exception occured in handler: " + e); status = HandlerUtils.StatusFromException(e); } - await responseStream.WriteStatus(status); + try + { + await responseStream.WriteStatus(status); + } + catch (OperationCanceledException) + { + // Call has been already cancelled. + } await finishedTask; } } diff --git a/src/csharp/Grpc.Core/Status.cs b/src/csharp/Grpc.Core/Status.cs index b588170694..754f6cb3ca 100644 --- a/src/csharp/Grpc.Core/Status.cs +++ b/src/csharp/Grpc.Core/Status.cs @@ -44,6 +44,11 @@ namespace Grpc.Core /// </summary> public static readonly Status DefaultSuccess = new Status(StatusCode.OK, ""); + /// <summary> + /// Default result of a cancelled RPC. StatusCode=Cancelled, empty details message. + /// </summary> + public static readonly Status DefaultCancelled = new Status(StatusCode.Cancelled, ""); + readonly StatusCode statusCode; readonly string detail; |