aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Jan Tattermusch <jtattermusch@google.com>2016-05-02 17:17:18 -0700
committerGravatar Jan Tattermusch <jtattermusch@google.com>2016-05-03 07:59:59 -0700
commit96f21a27cb7975d52aa92197536eef0ad8dca455 (patch)
tree9ab394f79d21761528842ffae7e62f32ddea5e08
parenta83ad2aa65a04d16b484e7c4373b7db5575221a4 (diff)
make end-of-stream idempotent
-rw-r--r--src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs39
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCall.cs5
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallBase.cs39
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallServer.cs5
-rw-r--r--src/csharp/Grpc.Core/Internal/ClientResponseStream.cs4
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerRequestStream.cs4
6 files changed, 58 insertions, 38 deletions
diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
index 324b682510..96749cda14 100644
--- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
+++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
@@ -76,8 +76,8 @@ namespace Grpc.Core.Internal.Tests
public void AsyncUnary_StreamingOperationsNotAllowed()
{
asyncCall.UnaryCallAsync("request1");
- Assert.Throws(typeof(InvalidOperationException),
- () => asyncCall.StartReadMessage((x,y) => {}));
+ Assert.ThrowsAsync(typeof(InvalidOperationException),
+ async () => await asyncCall.ReadMessageAsync());
Assert.Throws(typeof(InvalidOperationException),
() => asyncCall.StartSendMessage("abc", new WriteFlags(), (x,y) => {}));
}
@@ -123,8 +123,8 @@ namespace Grpc.Core.Internal.Tests
public void ClientStreaming_StreamingReadNotAllowed()
{
asyncCall.ClientStreamingCallAsync();
- Assert.Throws(typeof(InvalidOperationException),
- () => asyncCall.StartReadMessage((x,y) => {}));
+ Assert.ThrowsAsync(typeof(InvalidOperationException),
+ async () => await asyncCall.ReadMessageAsync());
}
[Test]
@@ -182,9 +182,6 @@ namespace Grpc.Core.Internal.Tests
var responseStream = new ClientResponseStream<string, string>(asyncCall);
var readTask = responseStream.MoveNext();
- fakeCall.ReceivedResponseHeadersHandler(true, new Metadata());
- Assert.AreEqual(0, asyncCall.ResponseHeadersAsync.Result.Count);
-
// try alternative order of completions
fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
fakeCall.ReceivedMessageHandler(true, null);
@@ -199,15 +196,35 @@ namespace Grpc.Core.Internal.Tests
var responseStream = new ClientResponseStream<string, string>(asyncCall);
var readTask = responseStream.MoveNext();
- fakeCall.ReceivedResponseHeadersHandler(true, new Metadata());
- Assert.AreEqual(0, asyncCall.ResponseHeadersAsync.Result.Count);
-
fakeCall.ReceivedMessageHandler(false, null); // after a failed read, we rely on C core to deliver appropriate status code.
fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.Internal));
AssertStreamingResponseError(asyncCall, fakeCall, readTask, StatusCode.Internal);
}
+ [Test]
+ public void ServerStreaming_MoreResponses_Success()
+ {
+ asyncCall.StartServerStreamingCall("request1");
+ var responseStream = new ClientResponseStream<string, string>(asyncCall);
+
+ var readTask1 = responseStream.MoveNext();
+ fakeCall.ReceivedMessageHandler(true, CreateResponsePayload());
+ Assert.IsTrue(readTask1.Result);
+ Assert.AreEqual("response1", responseStream.Current);
+
+ var readTask2 = responseStream.MoveNext();
+ fakeCall.ReceivedMessageHandler(true, CreateResponsePayload());
+ Assert.IsTrue(readTask2.Result);
+ Assert.AreEqual("response1", responseStream.Current);
+
+ var readTask3 = responseStream.MoveNext();
+ fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
+ fakeCall.ReceivedMessageHandler(true, null);
+
+ AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask3);
+ }
+
ClientSideStatus CreateClientSideStatus(StatusCode statusCode)
{
return new ClientSideStatus(new Status(statusCode, ""), new Metadata());
@@ -236,7 +253,6 @@ namespace Grpc.Core.Internal.Tests
Assert.IsFalse(moveNextTask.Result);
Assert.AreEqual(Status.DefaultSuccess, asyncCall.GetStatus());
- Assert.AreEqual(0, asyncCall.ResponseHeadersAsync.Result.Count);
Assert.AreEqual(0, asyncCall.GetTrailers().Count);
}
@@ -259,7 +275,6 @@ namespace Grpc.Core.Internal.Tests
var ex = Assert.ThrowsAsync<RpcException>(async () => await moveNextTask);
Assert.AreEqual(expectedStatusCode, asyncCall.GetStatus().StatusCode);
- Assert.AreEqual(0, asyncCall.ResponseHeadersAsync.Result.Count);
Assert.AreEqual(0, asyncCall.GetTrailers().Count);
}
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index 50ba617cdb..f522174bd0 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -241,11 +241,10 @@ namespace Grpc.Core.Internal
/// <summary>
/// Receives a streaming response. Only one pending read action is allowed at any given time.
- /// completionDelegate is called when the operation finishes.
/// </summary>
- public void StartReadMessage(AsyncCompletionDelegate<TResponse> completionDelegate)
+ public Task<TResponse> ReadMessageAsync()
{
- StartReadMessageInternal(completionDelegate);
+ return ReadMessageInternalAsync();
}
/// <summary>
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
index 877b997aba..abacfabadb 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
@@ -68,7 +68,7 @@ namespace Grpc.Core.Internal
protected bool cancelRequested;
protected AsyncCompletionDelegate<object> sendCompletionDelegate; // Completion of a pending send or sendclose if not null.
- protected AsyncCompletionDelegate<TRead> readCompletionDelegate; // Completion of a pending send or sendclose if not null.
+ protected TaskCompletionSource<TRead> streamingReadTcs; // Completion of a pending streaming read if not null.
protected bool readingDone; // True if last read (i.e. read with null payload) was already received.
protected bool halfcloseRequested; // True if send close have been initiated.
@@ -150,15 +150,25 @@ namespace Grpc.Core.Internal
/// Initiates reading a message. Only one read operation can be active at a time.
/// completionDelegate is invoked upon completion.
/// </summary>
- protected void StartReadMessageInternal(AsyncCompletionDelegate<TRead> completionDelegate)
+ protected Task<TRead> ReadMessageInternalAsync()
{
lock (myLock)
{
- GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
CheckReadingAllowed();
+ if (readingDone)
+ {
+ // the last read that returns null or throws an exception is idempotent
+ // and maintain its state.
+ GrpcPreconditions.CheckState(streamingReadTcs != null, "Call does not support streaming reads.");
+ return streamingReadTcs.Task;
+ }
+
+ GrpcPreconditions.CheckState(streamingReadTcs == null, "Only one read can be pending at a time");
+ GrpcPreconditions.CheckState(!disposed);
call.StartReceiveMessage(HandleReadFinished);
- readCompletionDelegate = completionDelegate;
+ streamingReadTcs = new TaskCompletionSource<TRead>();
+ return streamingReadTcs.Task;
}
}
@@ -216,10 +226,6 @@ namespace Grpc.Core.Internal
protected virtual void CheckReadingAllowed()
{
GrpcPreconditions.CheckState(started);
- GrpcPreconditions.CheckState(!disposed);
-
- GrpcPreconditions.CheckState(!readingDone, "Stream has already been closed.");
- GrpcPreconditions.CheckState(readCompletionDelegate == null, "Only one read can be pending at a time");
}
protected void CheckNotCancelled()
@@ -353,12 +359,10 @@ namespace Grpc.Core.Internal
TRead msg = default(TRead);
var deserializeException = (success && receivedMessage != null) ? TryDeserialize(receivedMessage, out msg) : null;
- AsyncCompletionDelegate<TRead> origCompletionDelegate = null;
+ TaskCompletionSource<TRead> origTcs = null;
lock (myLock)
{
- origCompletionDelegate = readCompletionDelegate;
- readCompletionDelegate = null;
-
+ origTcs = streamingReadTcs;
if (receivedMessage == null)
{
// This was the last read.
@@ -368,18 +372,25 @@ namespace Grpc.Core.Internal
if (deserializeException != null && IsClient)
{
readingDone = true;
+
+ // TODO(jtattermusch): it might be too late to set the status
CancelWithStatus(DeserializeResponseFailureStatus);
}
+ if (!readingDone)
+ {
+ streamingReadTcs = null;
+ }
+
ReleaseResourcesIfPossible();
}
if (deserializeException != null && !IsClient)
{
- FireCompletion(origCompletionDelegate, default(TRead), new IOException("Failed to deserialize request message.", deserializeException));
+ origTcs.SetException(new IOException("Failed to deserialize request message.", deserializeException));
return;
}
- FireCompletion(origCompletionDelegate, msg, null);
+ origTcs.SetResult(msg);
}
}
}
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
index bea2b3660c..cce480b2c4 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
@@ -91,11 +91,10 @@ namespace Grpc.Core.Internal
/// <summary>
/// Receives a streaming request. Only one pending read action is allowed at any given time.
- /// completionDelegate is called when the operation finishes.
/// </summary>
- public void StartReadMessage(AsyncCompletionDelegate<TRequest> completionDelegate)
+ public Task<TRequest> ReadMessageAsync()
{
- StartReadMessageInternal(completionDelegate);
+ return ReadMessageInternalAsync();
}
/// <summary>
diff --git a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs
index d6e34a0f04..ad9423ff58 100644
--- a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs
@@ -68,9 +68,7 @@ namespace Grpc.Core.Internal
{
throw new InvalidOperationException("Cancellation of individual reads is not supported.");
}
- var taskSource = new AsyncCompletionTaskSource<TResponse>();
- call.StartReadMessage(taskSource.CompletionDelegate);
- var result = await taskSource.Task.ConfigureAwait(false);
+ var result = await call.ReadMessageAsync().ConfigureAwait(false);
this.current = result;
if (result == null)
diff --git a/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs b/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs
index e7be82c318..d76030d1ad 100644
--- a/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs
@@ -68,9 +68,7 @@ namespace Grpc.Core.Internal
{
throw new InvalidOperationException("Cancellation of individual reads is not supported.");
}
- var taskSource = new AsyncCompletionTaskSource<TRequest>();
- call.StartReadMessage(taskSource.CompletionDelegate);
- var result = await taskSource.Task.ConfigureAwait(false);
+ var result = await call.ReadMessageAsync().ConfigureAwait(false);
this.current = result;
return result != null;
}