aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Jan Tattermusch <jtattermusch@google.com>2016-09-16 17:19:11 +0200
committerGravatar Jan Tattermusch <jtattermusch@google.com>2016-09-16 17:19:11 +0200
commit6eb987780a80d2ba83feafb3b0a98a9c60e0153a (patch)
tree6c9ab95a710759758c873a7e84059a6534654bea /src
parent7a73bec0e5cb11cf57b2f3ee77cee1249988ca4f (diff)
simplify delayed streaming write logic
Diffstat (limited to 'src')
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCall.cs28
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallBase.cs20
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallServer.cs2
3 files changed, 31 insertions, 19 deletions
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index 264c28ae7a..9abaf1120f 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -372,8 +372,7 @@ namespace Grpc.Core.Internal
private Task CheckSendPreconditionsClientSide()
{
GrpcPreconditions.CheckState(!halfcloseRequested, "Request stream has already been completed.");
- // if there is a delayed streaming write, we will treat that as if the write was still in progress until the call finishes.
- GrpcPreconditions.CheckState(streamingWriteTcs == null && (finished || delayedStreamingWriteTcs == null), "Only one write can be pending at a time.");
+ GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time.");
if (cancelRequested)
{
@@ -458,7 +457,7 @@ namespace Grpc.Core.Internal
using (Profilers.ForCurrentThread().NewScope("AsyncCall.HandleUnaryResponse"))
{
- TaskCompletionSource<object> delayedTcs;
+ TaskCompletionSource<object> delayedStreamingWriteTcs = null;
TResponse msg = default(TResponse);
var deserializeException = TryDeserialize(receivedMessage, out msg);
@@ -471,16 +470,21 @@ namespace Grpc.Core.Internal
receivedStatus = new ClientSideStatus(DeserializeResponseFailureStatus, receivedStatus.Trailers);
}
finishedStatus = receivedStatus;
- delayedTcs = delayedStreamingWriteTcs;
+
+ if (isStreamingWriteCompletionDelayed)
+ {
+ delayedStreamingWriteTcs = streamingWriteTcs;
+ streamingWriteTcs = null;
+ }
ReleaseResourcesIfPossible();
}
responseHeadersTcs.SetResult(responseHeaders);
- if (delayedTcs != null)
+ if (delayedStreamingWriteTcs != null)
{
- delayedTcs.SetException(GetRpcExceptionClientOnly());
+ delayedStreamingWriteTcs.SetException(GetRpcExceptionClientOnly());
}
var status = receivedStatus.Status;
@@ -502,20 +506,24 @@ namespace Grpc.Core.Internal
// NOTE: because this event is a result of batch containing GRPC_OP_RECV_STATUS_ON_CLIENT,
// success will be always set to true.
- TaskCompletionSource<object> delayedTcs;
+ TaskCompletionSource<object> delayedStreamingWriteTcs = null;
lock (myLock)
{
finished = true;
finishedStatus = receivedStatus;
- delayedTcs = delayedStreamingWriteTcs;
+ if (isStreamingWriteCompletionDelayed)
+ {
+ delayedStreamingWriteTcs = streamingWriteTcs;
+ streamingWriteTcs = null;
+ }
ReleaseResourcesIfPossible();
}
- if (delayedTcs != null)
+ if (delayedStreamingWriteTcs != null)
{
- delayedTcs.SetException(GetRpcExceptionClientOnly());
+ delayedStreamingWriteTcs.SetException(GetRpcExceptionClientOnly());
}
var status = receivedStatus.Status;
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
index b27baba942..9f9d260e7e 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
@@ -68,8 +68,8 @@ namespace Grpc.Core.Internal
protected TaskCompletionSource<TRead> streamingReadTcs; // Completion of a pending streaming read if not null.
protected TaskCompletionSource<object> streamingWriteTcs; // Completion of a pending streaming write or send close from client if not null.
- protected TaskCompletionSource<object> delayedStreamingWriteTcs;
protected TaskCompletionSource<object> sendStatusFromServerTcs;
+ protected bool isStreamingWriteCompletionDelayed; // Only used for the client side.
protected bool readingDone; // True if last read (i.e. read with null payload) was already received.
protected bool halfcloseRequested; // True if send close have been initiated.
@@ -263,16 +263,20 @@ namespace Grpc.Core.Internal
TaskCompletionSource<object> origTcs = null;
lock (myLock)
{
- origTcs = streamingWriteTcs;
- streamingWriteTcs = null;
+ if (!success && !finished && IsClient) {
+ // We should be setting this only once per call, following writes will be short circuited
+ // because they cannot start until the entire call finishes.
+ GrpcPreconditions.CheckState(!isStreamingWriteCompletionDelayed);
- if (!success && !finished && IsClient)
- {
- // We should be setting this only once per call, following writes will be short circuited.
- GrpcPreconditions.CheckState (delayedStreamingWriteTcs == null);
- delayedStreamingWriteTcs = origTcs;
+ // leave streamingWriteTcs set, it will be completed once call finished.
+ isStreamingWriteCompletionDelayed = true;
delayCompletion = true;
}
+ else
+ {
+ origTcs = streamingWriteTcs;
+ streamingWriteTcs = null;
+ }
ReleaseResourcesIfPossible();
}
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
index 8d9f548d62..50fdfa9006 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
@@ -208,7 +208,7 @@ namespace Grpc.Core.Internal
{
GrpcPreconditions.CheckState(!halfcloseRequested, "Response stream has already been completed.");
GrpcPreconditions.CheckState(!finished, "Already finished.");
- GrpcPreconditions.CheckState(streamingWriteTcs == null && delayedStreamingWriteTcs == null, "Only one write can be pending at a time");
+ GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time");
GrpcPreconditions.CheckState(!disposed);
return null;