diff options
author | Jan Tattermusch <jtattermusch@google.com> | 2016-09-16 17:19:11 +0200 |
---|---|---|
committer | Jan Tattermusch <jtattermusch@google.com> | 2016-09-16 17:19:11 +0200 |
commit | 6eb987780a80d2ba83feafb3b0a98a9c60e0153a (patch) | |
tree | 6c9ab95a710759758c873a7e84059a6534654bea /src | |
parent | 7a73bec0e5cb11cf57b2f3ee77cee1249988ca4f (diff) |
simplify delayed streaming write logic
Diffstat (limited to 'src')
-rw-r--r-- | src/csharp/Grpc.Core/Internal/AsyncCall.cs | 28 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/AsyncCallBase.cs | 20 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/AsyncCallServer.cs | 2 |
3 files changed, 31 insertions, 19 deletions
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index 264c28ae7a..9abaf1120f 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -372,8 +372,7 @@ namespace Grpc.Core.Internal private Task CheckSendPreconditionsClientSide() { GrpcPreconditions.CheckState(!halfcloseRequested, "Request stream has already been completed."); - // if there is a delayed streaming write, we will treat that as if the write was still in progress until the call finishes. - GrpcPreconditions.CheckState(streamingWriteTcs == null && (finished || delayedStreamingWriteTcs == null), "Only one write can be pending at a time."); + GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time."); if (cancelRequested) { @@ -458,7 +457,7 @@ namespace Grpc.Core.Internal using (Profilers.ForCurrentThread().NewScope("AsyncCall.HandleUnaryResponse")) { - TaskCompletionSource<object> delayedTcs; + TaskCompletionSource<object> delayedStreamingWriteTcs = null; TResponse msg = default(TResponse); var deserializeException = TryDeserialize(receivedMessage, out msg); @@ -471,16 +470,21 @@ namespace Grpc.Core.Internal receivedStatus = new ClientSideStatus(DeserializeResponseFailureStatus, receivedStatus.Trailers); } finishedStatus = receivedStatus; - delayedTcs = delayedStreamingWriteTcs; + + if (isStreamingWriteCompletionDelayed) + { + delayedStreamingWriteTcs = streamingWriteTcs; + streamingWriteTcs = null; + } ReleaseResourcesIfPossible(); } responseHeadersTcs.SetResult(responseHeaders); - if (delayedTcs != null) + if (delayedStreamingWriteTcs != null) { - delayedTcs.SetException(GetRpcExceptionClientOnly()); + delayedStreamingWriteTcs.SetException(GetRpcExceptionClientOnly()); } var status = receivedStatus.Status; @@ -502,20 +506,24 @@ namespace Grpc.Core.Internal // NOTE: because this event is a result of batch containing GRPC_OP_RECV_STATUS_ON_CLIENT, // success will be always set to true. - TaskCompletionSource<object> delayedTcs; + TaskCompletionSource<object> delayedStreamingWriteTcs = null; lock (myLock) { finished = true; finishedStatus = receivedStatus; - delayedTcs = delayedStreamingWriteTcs; + if (isStreamingWriteCompletionDelayed) + { + delayedStreamingWriteTcs = streamingWriteTcs; + streamingWriteTcs = null; + } ReleaseResourcesIfPossible(); } - if (delayedTcs != null) + if (delayedStreamingWriteTcs != null) { - delayedTcs.SetException(GetRpcExceptionClientOnly()); + delayedStreamingWriteTcs.SetException(GetRpcExceptionClientOnly()); } var status = receivedStatus.Status; diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs index b27baba942..9f9d260e7e 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs @@ -68,8 +68,8 @@ namespace Grpc.Core.Internal protected TaskCompletionSource<TRead> streamingReadTcs; // Completion of a pending streaming read if not null. protected TaskCompletionSource<object> streamingWriteTcs; // Completion of a pending streaming write or send close from client if not null. - protected TaskCompletionSource<object> delayedStreamingWriteTcs; protected TaskCompletionSource<object> sendStatusFromServerTcs; + protected bool isStreamingWriteCompletionDelayed; // Only used for the client side. protected bool readingDone; // True if last read (i.e. read with null payload) was already received. protected bool halfcloseRequested; // True if send close have been initiated. @@ -263,16 +263,20 @@ namespace Grpc.Core.Internal TaskCompletionSource<object> origTcs = null; lock (myLock) { - origTcs = streamingWriteTcs; - streamingWriteTcs = null; + if (!success && !finished && IsClient) { + // We should be setting this only once per call, following writes will be short circuited + // because they cannot start until the entire call finishes. + GrpcPreconditions.CheckState(!isStreamingWriteCompletionDelayed); - if (!success && !finished && IsClient) - { - // We should be setting this only once per call, following writes will be short circuited. - GrpcPreconditions.CheckState (delayedStreamingWriteTcs == null); - delayedStreamingWriteTcs = origTcs; + // leave streamingWriteTcs set, it will be completed once call finished. + isStreamingWriteCompletionDelayed = true; delayCompletion = true; } + else + { + origTcs = streamingWriteTcs; + streamingWriteTcs = null; + } ReleaseResourcesIfPossible(); } diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs index 8d9f548d62..50fdfa9006 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs @@ -208,7 +208,7 @@ namespace Grpc.Core.Internal { GrpcPreconditions.CheckState(!halfcloseRequested, "Response stream has already been completed."); GrpcPreconditions.CheckState(!finished, "Already finished."); - GrpcPreconditions.CheckState(streamingWriteTcs == null && delayedStreamingWriteTcs == null, "Only one write can be pending at a time"); + GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time"); GrpcPreconditions.CheckState(!disposed); return null; |