diff options
author | 2016-09-16 17:19:11 +0200 | |
---|---|---|
committer | 2016-09-16 17:19:11 +0200 | |
commit | 6eb987780a80d2ba83feafb3b0a98a9c60e0153a (patch) | |
tree | 6c9ab95a710759758c873a7e84059a6534654bea /src/csharp/Grpc.Core/Internal/AsyncCall.cs | |
parent | 7a73bec0e5cb11cf57b2f3ee77cee1249988ca4f (diff) |
simplify delayed streaming write logic
Diffstat (limited to 'src/csharp/Grpc.Core/Internal/AsyncCall.cs')
-rw-r--r-- | src/csharp/Grpc.Core/Internal/AsyncCall.cs | 28 |
1 files changed, 18 insertions, 10 deletions
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index 264c28ae7a..9abaf1120f 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -372,8 +372,7 @@ namespace Grpc.Core.Internal private Task CheckSendPreconditionsClientSide() { GrpcPreconditions.CheckState(!halfcloseRequested, "Request stream has already been completed."); - // if there is a delayed streaming write, we will treat that as if the write was still in progress until the call finishes. - GrpcPreconditions.CheckState(streamingWriteTcs == null && (finished || delayedStreamingWriteTcs == null), "Only one write can be pending at a time."); + GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time."); if (cancelRequested) { @@ -458,7 +457,7 @@ namespace Grpc.Core.Internal using (Profilers.ForCurrentThread().NewScope("AsyncCall.HandleUnaryResponse")) { - TaskCompletionSource<object> delayedTcs; + TaskCompletionSource<object> delayedStreamingWriteTcs = null; TResponse msg = default(TResponse); var deserializeException = TryDeserialize(receivedMessage, out msg); @@ -471,16 +470,21 @@ namespace Grpc.Core.Internal receivedStatus = new ClientSideStatus(DeserializeResponseFailureStatus, receivedStatus.Trailers); } finishedStatus = receivedStatus; - delayedTcs = delayedStreamingWriteTcs; + + if (isStreamingWriteCompletionDelayed) + { + delayedStreamingWriteTcs = streamingWriteTcs; + streamingWriteTcs = null; + } ReleaseResourcesIfPossible(); } responseHeadersTcs.SetResult(responseHeaders); - if (delayedTcs != null) + if (delayedStreamingWriteTcs != null) { - delayedTcs.SetException(GetRpcExceptionClientOnly()); + delayedStreamingWriteTcs.SetException(GetRpcExceptionClientOnly()); } var status = receivedStatus.Status; @@ -502,20 +506,24 @@ namespace Grpc.Core.Internal // NOTE: because this event is a result of batch containing GRPC_OP_RECV_STATUS_ON_CLIENT, // success will be always set to true. - TaskCompletionSource<object> delayedTcs; + TaskCompletionSource<object> delayedStreamingWriteTcs = null; lock (myLock) { finished = true; finishedStatus = receivedStatus; - delayedTcs = delayedStreamingWriteTcs; + if (isStreamingWriteCompletionDelayed) + { + delayedStreamingWriteTcs = streamingWriteTcs; + streamingWriteTcs = null; + } ReleaseResourcesIfPossible(); } - if (delayedTcs != null) + if (delayedStreamingWriteTcs != null) { - delayedTcs.SetException(GetRpcExceptionClientOnly()); + delayedStreamingWriteTcs.SetException(GetRpcExceptionClientOnly()); } var status = receivedStatus.Status; |