aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/csharp/Grpc.Core
diff options
context:
space:
mode:
authorGravatar Jan Tattermusch <jtattermusch@google.com>2015-08-21 13:48:52 -0700
committerGravatar Jan Tattermusch <jtattermusch@google.com>2015-08-21 16:13:55 -0700
commit3af838a2d719c65c360f28ee8551c6012f408401 (patch)
tree3ab2a1158c1c1fa4e05e8cdb79cd2e9412d5c21e /src/csharp/Grpc.Core
parentfb34a99d9810cf4cac2c1d20813379d5ea976adf (diff)
simplify stream reads on client side
Diffstat (limited to 'src/csharp/Grpc.Core')
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCall.cs66
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallBase.cs37
-rw-r--r--src/csharp/Grpc.Core/Internal/ClientResponseStream.cs8
3 files changed, 40 insertions, 71 deletions
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index 132b426424..d687bb6283 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -56,14 +56,15 @@ namespace Grpc.Core.Internal
// Completion of a pending unary response if not null.
TaskCompletionSource<TResponse> unaryResponseTcs;
+ // Indicates that steaming call has finished.
+ TaskCompletionSource<object> streamingCallFinishedTcs = new TaskCompletionSource<object>();
+
// Response headers set here once received.
TaskCompletionSource<Metadata> responseHeadersTcs = new TaskCompletionSource<Metadata>();
// Set after status is received. Used for both unary and streaming response calls.
ClientSideStatus? finishedStatus;
- bool readObserverCompleted; // True if readObserver has already been completed.
-
public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails)
: base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer, callDetails.Channel.Environment)
{
@@ -74,8 +75,7 @@ namespace Grpc.Core.Internal
/// <summary>
/// This constructor should only be used for testing.
/// </summary>
- public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails, INativeCall injectedNativeCall)
- : this(callDetails)
+ public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails, INativeCall injectedNativeCall) : this(callDetails)
{
this.injectedNativeCall = injectedNativeCall;
}
@@ -192,7 +192,6 @@ namespace Grpc.Core.Internal
Initialize(environment.CompletionQueue);
halfcloseRequested = true;
- halfclosed = true; // halfclose not confirmed yet, but it will be once finishedHandler is called.
byte[] payload = UnsafeSerialize(msg);
@@ -261,6 +260,17 @@ namespace Grpc.Core.Internal
}
/// <summary>
+ /// Get the task that completes once if streaming call finishes with ok status and throws RpcException with given status otherwise.
+ /// </summary>
+ public Task StreamingCallFinishedTask
+ {
+ get
+ {
+ return streamingCallFinishedTcs.Task;
+ }
+ }
+
+ /// <summary>
/// Get the task that completes once response headers are received.
/// </summary>
public Task<Metadata> ResponseHeadersAsync
@@ -305,36 +315,6 @@ namespace Grpc.Core.Internal
}
}
- /// <summary>
- /// On client-side, we only fire readCompletionDelegate once all messages have been read
- /// and status has been received.
- /// </summary>
- protected override void ProcessLastRead(AsyncCompletionDelegate<TResponse> completionDelegate)
- {
- if (completionDelegate != null && readingDone && finishedStatus.HasValue)
- {
- bool shouldComplete;
- lock (myLock)
- {
- shouldComplete = !readObserverCompleted;
- readObserverCompleted = true;
- }
-
- if (shouldComplete)
- {
- var status = finishedStatus.Value.Status;
- if (status.StatusCode != StatusCode.OK)
- {
- FireCompletion(completionDelegate, default(TResponse), new RpcException(status));
- }
- else
- {
- FireCompletion(completionDelegate, default(TResponse), null);
- }
- }
- }
- }
-
protected override void OnAfterReleaseResources()
{
details.Channel.RemoveCallReference(this);
@@ -392,8 +372,6 @@ namespace Grpc.Core.Internal
finished = true;
finishedStatus = receivedStatus;
- halfclosed = true;
-
ReleaseResourcesIfPossible();
}
@@ -403,7 +381,6 @@ namespace Grpc.Core.Internal
if (!success || status.StatusCode != StatusCode.OK)
{
-
unaryResponseTcs.SetException(new RpcException(status));
return;
}
@@ -420,18 +397,23 @@ namespace Grpc.Core.Internal
/// </summary>
private void HandleFinished(bool success, ClientSideStatus receivedStatus)
{
- AsyncCompletionDelegate<TResponse> origReadCompletionDelegate = null;
lock (myLock)
{
finished = true;
finishedStatus = receivedStatus;
- origReadCompletionDelegate = readCompletionDelegate;
-
ReleaseResourcesIfPossible();
}
- ProcessLastRead(origReadCompletionDelegate);
+ var status = receivedStatus.Status;
+
+ if (!success || status.StatusCode != StatusCode.OK)
+ {
+ streamingCallFinishedTcs.SetException(new RpcException(status));
+ return;
+ }
+
+ streamingCallFinishedTcs.SetResult(null);
}
}
} \ No newline at end of file
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
index 7744dbec00..4d20394644 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
@@ -61,19 +61,17 @@ namespace Grpc.Core.Internal
protected bool disposed;
protected bool started;
- protected bool errorOccured;
protected bool cancelRequested;
protected AsyncCompletionDelegate<object> sendCompletionDelegate; // Completion of a pending send or sendclose if not null.
protected AsyncCompletionDelegate<TRead> readCompletionDelegate; // Completion of a pending send or sendclose if not null.
- protected bool readingDone;
- protected bool halfcloseRequested;
- protected bool halfclosed;
+ protected bool readingDone; // True if last read (i.e. read with null payload) was already received.
+ protected bool halfcloseRequested; // True if send close have been initiated.
protected bool finished; // True if close has been received from the peer.
protected bool initialMetadataSent;
- protected long streamingWritesCounter;
+ protected long streamingWritesCounter; // Number of streaming send operations started so far.
public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer, GrpcEnvironment environment)
{
@@ -161,16 +159,6 @@ namespace Grpc.Core.Internal
}
}
- // TODO(jtattermusch): find more fitting name for this method.
- /// <summary>
- /// Default behavior just completes the read observer, but more sofisticated behavior might be required
- /// by subclasses.
- /// </summary>
- protected virtual void ProcessLastRead(AsyncCompletionDelegate<TRead> completionDelegate)
- {
- FireCompletion(completionDelegate, default(TRead), null);
- }
-
/// <summary>
/// If there are no more pending actions and no new actions can be started, releases
/// the underlying native resources.
@@ -179,7 +167,7 @@ namespace Grpc.Core.Internal
{
if (!disposed && call != null)
{
- bool noMoreSendCompletions = halfclosed || ((cancelRequested || finished) && sendCompletionDelegate == null);
+ bool noMoreSendCompletions = sendCompletionDelegate == null && (halfcloseRequested || cancelRequested || finished);
if (noMoreSendCompletions && readingDone && finished)
{
ReleaseResources();
@@ -206,7 +194,6 @@ namespace Grpc.Core.Internal
protected void CheckSendingAllowed()
{
Preconditions.CheckState(started);
- Preconditions.CheckState(!errorOccured);
CheckNotCancelled();
Preconditions.CheckState(!disposed);
@@ -218,7 +205,6 @@ namespace Grpc.Core.Internal
protected virtual void CheckReadingAllowed()
{
Preconditions.CheckState(started);
- Preconditions.CheckState(!errorOccured);
Preconditions.CheckState(!disposed);
Preconditions.CheckState(!readingDone, "Stream has already been closed.");
@@ -312,7 +298,6 @@ namespace Grpc.Core.Internal
AsyncCompletionDelegate<object> origCompletionDelegate = null;
lock (myLock)
{
- halfclosed = true;
origCompletionDelegate = sendCompletionDelegate;
sendCompletionDelegate = null;
@@ -338,15 +323,11 @@ namespace Grpc.Core.Internal
lock (myLock)
{
origCompletionDelegate = readCompletionDelegate;
- if (receivedMessage != null)
- {
- readCompletionDelegate = null;
- }
- else
+ readCompletionDelegate = null;
+
+ if (receivedMessage == null)
{
- // This was the last read. Keeping the readCompletionDelegate
- // to be either fired by this handler or by client-side finished
- // handler.
+ // This was the last read.
readingDone = true;
}
@@ -365,7 +346,7 @@ namespace Grpc.Core.Internal
}
else
{
- ProcessLastRead(origCompletionDelegate);
+ FireCompletion(origCompletionDelegate, default(TRead), null);
}
}
}
diff --git a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs
index 6c44521038..b4a7335c7c 100644
--- a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs
@@ -72,7 +72,13 @@ namespace Grpc.Core.Internal
call.StartReadMessage(taskSource.CompletionDelegate);
var result = await taskSource.Task;
this.current = result;
- return result != null;
+
+ if (result == null)
+ {
+ await call.StreamingCallFinishedTask;
+ return false;
+ }
+ return true;
}
public void Dispose()