From a610e32e4b07d860048d478e2f2c851c7c9bb4d6 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Tue, 13 Sep 2016 16:53:05 +0200 Subject: throw correct exception failed writes --- src/csharp/Grpc.Core/Internal/AsyncCallBase.cs | 32 ++++++++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) (limited to 'src/csharp/Grpc.Core/Internal/AsyncCallBase.cs') diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs index eb9c3ea62d..b27baba942 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs @@ -68,6 +68,7 @@ namespace Grpc.Core.Internal protected TaskCompletionSource streamingReadTcs; // Completion of a pending streaming read if not null. protected TaskCompletionSource streamingWriteTcs; // Completion of a pending streaming write or send close from client if not null. + protected TaskCompletionSource delayedStreamingWriteTcs; protected TaskCompletionSource sendStatusFromServerTcs; protected bool readingDone; // True if last read (i.e. read with null payload) was already received. @@ -200,6 +201,12 @@ namespace Grpc.Core.Internal get; } + /// + /// Returns an exception to throw for a failed send operation. + /// It is only allowed to call this method for a call that has already finished. + /// + protected abstract Exception GetRpcExceptionClientOnly(); + private void ReleaseResources() { if (call != null) @@ -252,18 +259,39 @@ namespace Grpc.Core.Internal /// protected void HandleSendFinished(bool success) { + bool delayCompletion = false; TaskCompletionSource origTcs = null; lock (myLock) { origTcs = streamingWriteTcs; streamingWriteTcs = null; + if (!success && !finished && IsClient) + { + // We should be setting this only once per call, following writes will be short circuited. + GrpcPreconditions.CheckState (delayedStreamingWriteTcs == null); + delayedStreamingWriteTcs = origTcs; + delayCompletion = true; + } + ReleaseResourcesIfPossible(); } if (!success) { - origTcs.SetException(new InvalidOperationException("Send failed")); + if (!delayCompletion) + { + if (IsClient) + { + GrpcPreconditions.CheckState(finished); // implied by !success && !delayCompletion && IsClient + origTcs.SetException(GetRpcExceptionClientOnly()); + } + else + { + origTcs.SetException (new IOException("Error sending from server.")); + } + } + // if delayCompletion == true, postpone SetException until call finishes. } else { @@ -283,7 +311,7 @@ namespace Grpc.Core.Internal if (!success) { - sendStatusFromServerTcs.SetException(new InvalidOperationException("Error sending status from server.")); + sendStatusFromServerTcs.SetException(new IOException("Error sending status from server.")); } else { -- cgit v1.2.3 From 6eb987780a80d2ba83feafb3b0a98a9c60e0153a Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Fri, 16 Sep 2016 17:19:11 +0200 Subject: simplify delayed streaming write logic --- src/csharp/Grpc.Core/Internal/AsyncCall.cs | 28 +++++++++++++++--------- src/csharp/Grpc.Core/Internal/AsyncCallBase.cs | 20 ++++++++++------- src/csharp/Grpc.Core/Internal/AsyncCallServer.cs | 2 +- 3 files changed, 31 insertions(+), 19 deletions(-) (limited to 'src/csharp/Grpc.Core/Internal/AsyncCallBase.cs') diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index 264c28ae7a..9abaf1120f 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -372,8 +372,7 @@ namespace Grpc.Core.Internal private Task CheckSendPreconditionsClientSide() { GrpcPreconditions.CheckState(!halfcloseRequested, "Request stream has already been completed."); - // if there is a delayed streaming write, we will treat that as if the write was still in progress until the call finishes. - GrpcPreconditions.CheckState(streamingWriteTcs == null && (finished || delayedStreamingWriteTcs == null), "Only one write can be pending at a time."); + GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time."); if (cancelRequested) { @@ -458,7 +457,7 @@ namespace Grpc.Core.Internal using (Profilers.ForCurrentThread().NewScope("AsyncCall.HandleUnaryResponse")) { - TaskCompletionSource delayedTcs; + TaskCompletionSource delayedStreamingWriteTcs = null; TResponse msg = default(TResponse); var deserializeException = TryDeserialize(receivedMessage, out msg); @@ -471,16 +470,21 @@ namespace Grpc.Core.Internal receivedStatus = new ClientSideStatus(DeserializeResponseFailureStatus, receivedStatus.Trailers); } finishedStatus = receivedStatus; - delayedTcs = delayedStreamingWriteTcs; + + if (isStreamingWriteCompletionDelayed) + { + delayedStreamingWriteTcs = streamingWriteTcs; + streamingWriteTcs = null; + } ReleaseResourcesIfPossible(); } responseHeadersTcs.SetResult(responseHeaders); - if (delayedTcs != null) + if (delayedStreamingWriteTcs != null) { - delayedTcs.SetException(GetRpcExceptionClientOnly()); + delayedStreamingWriteTcs.SetException(GetRpcExceptionClientOnly()); } var status = receivedStatus.Status; @@ -502,20 +506,24 @@ namespace Grpc.Core.Internal // NOTE: because this event is a result of batch containing GRPC_OP_RECV_STATUS_ON_CLIENT, // success will be always set to true. - TaskCompletionSource delayedTcs; + TaskCompletionSource delayedStreamingWriteTcs = null; lock (myLock) { finished = true; finishedStatus = receivedStatus; - delayedTcs = delayedStreamingWriteTcs; + if (isStreamingWriteCompletionDelayed) + { + delayedStreamingWriteTcs = streamingWriteTcs; + streamingWriteTcs = null; + } ReleaseResourcesIfPossible(); } - if (delayedTcs != null) + if (delayedStreamingWriteTcs != null) { - delayedTcs.SetException(GetRpcExceptionClientOnly()); + delayedStreamingWriteTcs.SetException(GetRpcExceptionClientOnly()); } var status = receivedStatus.Status; diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs index b27baba942..9f9d260e7e 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs @@ -68,8 +68,8 @@ namespace Grpc.Core.Internal protected TaskCompletionSource streamingReadTcs; // Completion of a pending streaming read if not null. protected TaskCompletionSource streamingWriteTcs; // Completion of a pending streaming write or send close from client if not null. - protected TaskCompletionSource delayedStreamingWriteTcs; protected TaskCompletionSource sendStatusFromServerTcs; + protected bool isStreamingWriteCompletionDelayed; // Only used for the client side. protected bool readingDone; // True if last read (i.e. read with null payload) was already received. protected bool halfcloseRequested; // True if send close have been initiated. @@ -263,16 +263,20 @@ namespace Grpc.Core.Internal TaskCompletionSource origTcs = null; lock (myLock) { - origTcs = streamingWriteTcs; - streamingWriteTcs = null; + if (!success && !finished && IsClient) { + // We should be setting this only once per call, following writes will be short circuited + // because they cannot start until the entire call finishes. + GrpcPreconditions.CheckState(!isStreamingWriteCompletionDelayed); - if (!success && !finished && IsClient) - { - // We should be setting this only once per call, following writes will be short circuited. - GrpcPreconditions.CheckState (delayedStreamingWriteTcs == null); - delayedStreamingWriteTcs = origTcs; + // leave streamingWriteTcs set, it will be completed once call finished. + isStreamingWriteCompletionDelayed = true; delayCompletion = true; } + else + { + origTcs = streamingWriteTcs; + streamingWriteTcs = null; + } ReleaseResourcesIfPossible(); } diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs index 8d9f548d62..50fdfa9006 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs @@ -208,7 +208,7 @@ namespace Grpc.Core.Internal { GrpcPreconditions.CheckState(!halfcloseRequested, "Response stream has already been completed."); GrpcPreconditions.CheckState(!finished, "Already finished."); - GrpcPreconditions.CheckState(streamingWriteTcs == null && delayedStreamingWriteTcs == null, "Only one write can be pending at a time"); + GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time"); GrpcPreconditions.CheckState(!disposed); return null; -- cgit v1.2.3