aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Jan Tattermusch <jtattermusch@users.noreply.github.com>2016-05-26 18:30:28 -0400
committerGravatar Jan Tattermusch <jtattermusch@users.noreply.github.com>2016-05-26 18:30:28 -0400
commit55ebba12055f810b019d8f9c1561e9c0da6b81a8 (patch)
treedd42c1c2605e4f8771536d1516a59e06051ca44f
parent22ca12b83fe63b49c787f2a56999a782d39177e2 (diff)
parentf581659215916398b9a304fe44787d145104030d (diff)
Merge pull request #6699 from jtattermusch/csharp_streaming_api_improvements
C# streaming api improvements
-rw-r--r--src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs16
-rw-r--r--src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs39
-rw-r--r--src/csharp/Grpc.Core/Grpc.Core.csproj1
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCall.cs90
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallBase.cs77
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallServer.cs30
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCompletion.cs94
-rw-r--r--src/csharp/Grpc.Core/Internal/ClientRequestStream.cs8
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerResponseStream.cs8
9 files changed, 149 insertions, 214 deletions
diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs
index 0e204761f6..4e5a23b3c2 100644
--- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs
+++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs
@@ -136,7 +136,6 @@ namespace Grpc.Core.Internal.Tests
public void WriteAfterCancelNotificationFails()
{
var finishedTask = asyncCallServer.ServerSideCallAsync();
- var requestStream = new ServerRequestStream<string, string>(asyncCallServer);
var responseStream = new ServerResponseStream<string, string>(asyncCallServer);
fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
@@ -181,6 +180,21 @@ namespace Grpc.Core.Internal.Tests
AssertFinished(asyncCallServer, fakeCall, finishedTask);
}
+ [Test]
+ public void WriteAfterWriteStatusThrowsInvalidOperationException()
+ {
+ var finishedTask = asyncCallServer.ServerSideCallAsync();
+ var responseStream = new ServerResponseStream<string, string>(asyncCallServer);
+
+ asyncCallServer.SendStatusFromServerAsync(Status.DefaultSuccess, new Metadata(), null);
+ Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await responseStream.WriteAsync("request1"));
+
+ fakeCall.SendStatusFromServerHandler(true);
+ fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
+
+ AssertFinished(asyncCallServer, fakeCall, finishedTask);
+ }
+
static void AssertFinished(AsyncCallServer<string, string> asyncCallServer, FakeNativeCall fakeCall, Task finishedTask)
{
Assert.IsTrue(fakeCall.IsDisposed);
diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
index 777a1c8c50..81897f8c77 100644
--- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
+++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
@@ -33,7 +33,6 @@
using System;
using System.Collections.Generic;
-using System.Runtime.InteropServices;
using System.Threading.Tasks;
using Grpc.Core.Internal;
@@ -82,7 +81,7 @@ namespace Grpc.Core.Internal.Tests
Assert.ThrowsAsync(typeof(InvalidOperationException),
async () => await asyncCall.ReadMessageAsync());
Assert.Throws(typeof(InvalidOperationException),
- () => asyncCall.StartSendMessage("abc", new WriteFlags(), (x,y) => {}));
+ () => asyncCall.SendMessageAsync("abc", new WriteFlags()));
}
[Test]
@@ -103,7 +102,7 @@ namespace Grpc.Core.Internal.Tests
var resultTask = asyncCall.UnaryCallAsync("request1");
fakeCall.UnaryResponseClientHandler(true,
CreateClientSideStatus(StatusCode.InvalidArgument),
- CreateResponsePayload(),
+ null,
new Metadata());
AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.InvalidArgument);
@@ -148,7 +147,7 @@ namespace Grpc.Core.Internal.Tests
var resultTask = asyncCall.ClientStreamingCallAsync();
fakeCall.UnaryResponseClientHandler(true,
CreateClientSideStatus(StatusCode.InvalidArgument),
- CreateResponsePayload(),
+ null,
new Metadata());
AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.InvalidArgument);
@@ -193,7 +192,7 @@ namespace Grpc.Core.Internal.Tests
fakeCall.UnaryResponseClientHandler(true,
CreateClientSideStatus(StatusCode.Internal),
- CreateResponsePayload(),
+ null,
new Metadata());
AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Internal);
@@ -211,7 +210,9 @@ namespace Grpc.Core.Internal.Tests
new Metadata());
AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask);
- var ex = Assert.Throws<RpcException>(() => requestStream.WriteAsync("request1"));
+
+ var writeTask = requestStream.WriteAsync("request1");
+ var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
Assert.AreEqual(Status.DefaultSuccess, ex.Status);
}
@@ -223,11 +224,13 @@ namespace Grpc.Core.Internal.Tests
fakeCall.UnaryResponseClientHandler(true,
new ClientSideStatus(new Status(StatusCode.OutOfRange, ""), new Metadata()),
- CreateResponsePayload(),
+ null,
new Metadata());
AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.OutOfRange);
- var ex = Assert.Throws<RpcException>(() => requestStream.WriteAsync("request1"));
+
+ var writeTask = requestStream.WriteAsync("request1");
+ var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
Assert.AreEqual(StatusCode.OutOfRange, ex.Status.StatusCode);
}
@@ -267,7 +270,7 @@ namespace Grpc.Core.Internal.Tests
}
[Test]
- public void ClientStreaming_WriteAfterCancellationRequestThrowsOperationCancelledException()
+ public void ClientStreaming_WriteAfterCancellationRequestThrowsTaskCanceledException()
{
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
@@ -275,11 +278,12 @@ namespace Grpc.Core.Internal.Tests
asyncCall.Cancel();
Assert.IsTrue(fakeCall.IsCancelled);
- Assert.Throws(typeof(OperationCanceledException), () => requestStream.WriteAsync("request1"));
+ var writeTask = requestStream.WriteAsync("request1");
+ Assert.ThrowsAsync(typeof(TaskCanceledException), async () => await writeTask);
fakeCall.UnaryResponseClientHandler(true,
CreateClientSideStatus(StatusCode.Cancelled),
- CreateResponsePayload(),
+ null,
new Metadata());
AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Cancelled);
@@ -290,7 +294,7 @@ namespace Grpc.Core.Internal.Tests
{
asyncCall.StartServerStreamingCall("request1");
Assert.Throws(typeof(InvalidOperationException),
- () => asyncCall.StartSendMessage("abc", new WriteFlags(), (x,y) => {}));
+ () => asyncCall.SendMessageAsync("abc", new WriteFlags()));
}
[Test]
@@ -390,12 +394,13 @@ namespace Grpc.Core.Internal.Tests
AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
- var ex = Assert.ThrowsAsync<RpcException>(async () => await requestStream.WriteAsync("request1"));
+ var writeTask = requestStream.WriteAsync("request1");
+ var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
Assert.AreEqual(Status.DefaultSuccess, ex.Status);
}
[Test]
- public void DuplexStreaming_CompleteAfterReceivingStatusFails()
+ public void DuplexStreaming_CompleteAfterReceivingStatusSuceeds()
{
asyncCall.StartDuplexStreamingCall();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
@@ -411,7 +416,7 @@ namespace Grpc.Core.Internal.Tests
}
[Test]
- public void DuplexStreaming_WriteAfterCancellationRequestThrowsOperationCancelledException()
+ public void DuplexStreaming_WriteAfterCancellationRequestThrowsTaskCanceledException()
{
asyncCall.StartDuplexStreamingCall();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
@@ -419,7 +424,9 @@ namespace Grpc.Core.Internal.Tests
asyncCall.Cancel();
Assert.IsTrue(fakeCall.IsCancelled);
- Assert.Throws(typeof(OperationCanceledException), () => requestStream.WriteAsync("request1"));
+
+ var writeTask = requestStream.WriteAsync("request1");
+ Assert.ThrowsAsync(typeof(TaskCanceledException), async () => await writeTask);
var readTask = responseStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, null);
diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj
index 4bf30e83c1..a8b7b5f00d 100644
--- a/src/csharp/Grpc.Core/Grpc.Core.csproj
+++ b/src/csharp/Grpc.Core/Grpc.Core.csproj
@@ -86,7 +86,6 @@
<Compile Include="Utils\BenchmarkUtil.cs" />
<Compile Include="ChannelCredentials.cs" />
<Compile Include="Internal\ChannelArgsSafeHandle.cs" />
- <Compile Include="Internal\AsyncCompletion.cs" />
<Compile Include="Internal\AsyncCallBase.cs" />
<Compile Include="Internal\AsyncCallServer.cs" />
<Compile Include="Internal\AsyncCall.cs" />
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index 55351869b5..8669f0f702 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -32,12 +32,7 @@
#endregion
using System;
-using System.Diagnostics;
-using System.Runtime.CompilerServices;
-using System.Runtime.InteropServices;
-using System.Threading;
using System.Threading.Tasks;
-using Grpc.Core.Internal;
using Grpc.Core.Logging;
using Grpc.Core.Profiling;
using Grpc.Core.Utils;
@@ -57,9 +52,11 @@ namespace Grpc.Core.Internal
// Completion of a pending unary response if not null.
TaskCompletionSource<TResponse> unaryResponseTcs;
+ // TODO(jtattermusch): this field doesn't need to be initialized for unary response calls.
// Indicates that response streaming call has finished.
TaskCompletionSource<object> streamingCallFinishedTcs = new TaskCompletionSource<object>();
+ // TODO(jtattermusch): this field could be lazy-initialized (only if someone requests the response headers).
// Response headers set here once received.
TaskCompletionSource<Metadata> responseHeadersTcs = new TaskCompletionSource<Metadata>();
@@ -232,11 +229,10 @@ namespace Grpc.Core.Internal
/// <summary>
/// Sends a streaming request. Only one pending send action is allowed at any given time.
- /// completionDelegate is called when the operation finishes.
/// </summary>
- public void StartSendMessage(TRequest msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
+ public Task SendMessageAsync(TRequest msg, WriteFlags writeFlags)
{
- StartSendMessageInternal(msg, writeFlags, completionDelegate);
+ return SendMessageInternalAsync(msg, writeFlags);
}
/// <summary>
@@ -250,29 +246,32 @@ namespace Grpc.Core.Internal
/// <summary>
/// Sends halfclose, indicating client is done with streaming requests.
/// Only one pending send action is allowed at any given time.
- /// completionDelegate is called when the operation finishes.
/// </summary>
- public void StartSendCloseFromClient(AsyncCompletionDelegate<object> completionDelegate)
+ public Task SendCloseFromClientAsync()
{
lock (myLock)
{
- GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
- CheckSendingAllowed(allowFinished: true);
+ GrpcPreconditions.CheckState(started);
- if (!disposed && !finished)
+ var earlyResult = CheckSendPreconditionsClientSide();
+ if (earlyResult != null)
{
- call.StartSendCloseFromClient(HandleSendCloseFromClientFinished);
+ return earlyResult;
}
- else
+
+ if (disposed || finished)
{
// In case the call has already been finished by the serverside,
- // the halfclose has already been done implicitly, so we only
- // emit the notification for the completion delegate.
- Task.Run(() => HandleSendCloseFromClientFinished(true));
+ // the halfclose has already been done implicitly, so just return
+ // completed task here.
+ halfcloseRequested = true;
+ return Task.FromResult<object>(null);
}
+ call.StartSendCloseFromClient(HandleSendCloseFromClientFinished);
halfcloseRequested = true;
- sendCompletionDelegate = completionDelegate;
+ streamingWriteTcs = new TaskCompletionSource<object>();
+ return streamingWriteTcs.Task;
}
}
@@ -342,6 +341,45 @@ namespace Grpc.Core.Internal
get { return true; }
}
+ protected override Task CheckSendAllowedOrEarlyResult()
+ {
+ var earlyResult = CheckSendPreconditionsClientSide();
+ if (earlyResult != null)
+ {
+ return earlyResult;
+ }
+
+ if (finishedStatus.HasValue)
+ {
+ // throwing RpcException if we already received status on client
+ // side makes the most sense.
+ // Note that this throws even for StatusCode.OK.
+ // Writing after the call has finished is not a programming error because server can close
+ // the call anytime, so don't throw directly, but let the write task finish with an error.
+ var tcs = new TaskCompletionSource<object>();
+ tcs.SetException(new RpcException(finishedStatus.Value.Status));
+ return tcs.Task;
+ }
+
+ return null;
+ }
+
+ private Task CheckSendPreconditionsClientSide()
+ {
+ GrpcPreconditions.CheckState(!halfcloseRequested, "Request stream has already been completed.");
+ GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time.");
+
+ if (cancelRequested)
+ {
+ // Return a cancelled task.
+ var tcs = new TaskCompletionSource<object>();
+ tcs.SetCanceled();
+ return tcs.Task;
+ }
+
+ return null;
+ }
+
private void Initialize(CompletionQueueSafeHandle cq)
{
using (Profilers.ForCurrentThread().NewScope("AsyncCall.Initialize"))
@@ -400,6 +438,7 @@ namespace Grpc.Core.Internal
/// </summary>
private void HandleReceivedResponseHeaders(bool success, Metadata responseHeaders)
{
+ // TODO(jtattermusch): handle success==false
responseHeadersTcs.SetResult(responseHeaders);
}
@@ -443,19 +482,6 @@ namespace Grpc.Core.Internal
}
}
- protected override void CheckSendingAllowed(bool allowFinished)
- {
- base.CheckSendingAllowed(true);
-
- // throwing RpcException if we already received status on client
- // side makes the most sense.
- // Note that this throws even for StatusCode.OK.
- if (!allowFinished && finishedStatus.HasValue)
- {
- throw new RpcException(finishedStatus.Value.Status);
- }
- }
-
/// <summary>
/// Handles receive status completion for calls with streaming response.
/// </summary>
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
index 4de23706b2..5f561daedd 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
@@ -67,8 +67,8 @@ namespace Grpc.Core.Internal
protected bool started;
protected bool cancelRequested;
- protected AsyncCompletionDelegate<object> sendCompletionDelegate; // Completion of a pending send or sendclose if not null.
protected TaskCompletionSource<TRead> streamingReadTcs; // Completion of a pending streaming read if not null.
+ protected TaskCompletionSource<object> streamingWriteTcs; // Completion of a pending streaming write or send close from client if not null.
protected TaskCompletionSource<object> sendStatusFromServerTcs;
protected bool readingDone; // True if last read (i.e. read with null payload) was already received.
@@ -128,28 +128,31 @@ namespace Grpc.Core.Internal
/// <summary>
/// Initiates sending a message. Only one send operation can be active at a time.
- /// completionDelegate is invoked upon completion.
/// </summary>
- protected void StartSendMessageInternal(TWrite msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
+ protected Task SendMessageInternalAsync(TWrite msg, WriteFlags writeFlags)
{
byte[] payload = UnsafeSerialize(msg);
lock (myLock)
{
- GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
- CheckSendingAllowed(allowFinished: false);
+ GrpcPreconditions.CheckState(started);
+ var earlyResult = CheckSendAllowedOrEarlyResult();
+ if (earlyResult != null)
+ {
+ return earlyResult;
+ }
call.StartSendMessage(HandleSendFinished, payload, writeFlags, !initialMetadataSent);
- sendCompletionDelegate = completionDelegate;
initialMetadataSent = true;
streamingWritesCounter++;
+ streamingWriteTcs = new TaskCompletionSource<object>();
+ return streamingWriteTcs.Task;
}
}
/// <summary>
/// Initiates reading a message. Only one read operation can be active at a time.
- /// completionDelegate is invoked upon completion.
/// </summary>
protected Task<TRead> ReadMessageInternalAsync()
{
@@ -159,7 +162,7 @@ namespace Grpc.Core.Internal
if (readingDone)
{
// the last read that returns null or throws an exception is idempotent
- // and maintain its state.
+ // and maintains its state.
GrpcPreconditions.CheckState(streamingReadTcs != null, "Call does not support streaming reads.");
return streamingReadTcs.Task;
}
@@ -183,7 +186,7 @@ namespace Grpc.Core.Internal
{
if (!disposed && call != null)
{
- bool noMoreSendCompletions = sendCompletionDelegate == null && (halfcloseRequested || cancelRequested || finished);
+ bool noMoreSendCompletions = streamingWriteTcs == null && (halfcloseRequested || cancelRequested || finished);
if (noMoreSendCompletions && readingDone && finished)
{
ReleaseResources();
@@ -213,24 +216,11 @@ namespace Grpc.Core.Internal
{
}
- protected virtual void CheckSendingAllowed(bool allowFinished)
- {
- GrpcPreconditions.CheckState(started);
- CheckNotCancelled();
- GrpcPreconditions.CheckState(!disposed || allowFinished);
-
- GrpcPreconditions.CheckState(!halfcloseRequested, "Already halfclosed.");
- GrpcPreconditions.CheckState(!finished || allowFinished, "Already finished.");
- GrpcPreconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time");
- }
-
- protected void CheckNotCancelled()
- {
- if (cancelRequested)
- {
- throw new OperationCanceledException("Remote call has been cancelled.");
- }
- }
+ /// <summary>
+ /// Checks if sending is allowed and possibly returns a Task that allows short-circuiting the send
+ /// logic by directly returning the write operation result task. Normally, null is returned.
+ /// </summary>
+ protected abstract Task CheckSendAllowedOrEarlyResult();
protected byte[] UnsafeSerialize(TWrite msg)
{
@@ -259,39 +249,27 @@ namespace Grpc.Core.Internal
}
}
- protected void FireCompletion<T>(AsyncCompletionDelegate<T> completionDelegate, T value, Exception error)
- {
- try
- {
- completionDelegate(value, error);
- }
- catch (Exception e)
- {
- Logger.Error(e, "Exception occured while invoking completion delegate.");
- }
- }
-
/// <summary>
/// Handles send completion.
/// </summary>
protected void HandleSendFinished(bool success)
{
- AsyncCompletionDelegate<object> origCompletionDelegate = null;
+ TaskCompletionSource<object> origTcs = null;
lock (myLock)
{
- origCompletionDelegate = sendCompletionDelegate;
- sendCompletionDelegate = null;
+ origTcs = streamingWriteTcs;
+ streamingWriteTcs = null;
ReleaseResourcesIfPossible();
}
if (!success)
{
- FireCompletion(origCompletionDelegate, null, new InvalidOperationException("Send failed"));
+ origTcs.SetException(new InvalidOperationException("Send failed"));
}
else
{
- FireCompletion(origCompletionDelegate, null, null);
+ origTcs.SetResult(null);
}
}
@@ -300,22 +278,23 @@ namespace Grpc.Core.Internal
/// </summary>
protected void HandleSendCloseFromClientFinished(bool success)
{
- AsyncCompletionDelegate<object> origCompletionDelegate = null;
+ TaskCompletionSource<object> origTcs = null;
lock (myLock)
{
- origCompletionDelegate = sendCompletionDelegate;
- sendCompletionDelegate = null;
+ origTcs = streamingWriteTcs;
+ streamingWriteTcs = null;
ReleaseResourcesIfPossible();
}
if (!success)
{
- FireCompletion(origCompletionDelegate, null, new InvalidOperationException("Sending close from client has failed."));
+ // TODO(jtattermusch): this method is same as HandleSendFinished (only the error message differs).
+ origTcs.SetException(new InvalidOperationException("Sending close from client has failed."));
}
else
{
- FireCompletion(origCompletionDelegate, null, null);
+ origTcs.SetResult(null);
}
}
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
index b1566b44a7..d1bb80762e 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
@@ -91,11 +91,10 @@ namespace Grpc.Core.Internal
/// <summary>
/// Sends a streaming response. Only one pending send action is allowed at any given time.
- /// completionDelegate is called when the operation finishes.
/// </summary>
- public void StartSendMessage(TResponse msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
+ public Task SendMessageAsync(TResponse msg, WriteFlags writeFlags)
{
- StartSendMessageInternal(msg, writeFlags, completionDelegate);
+ return SendMessageInternalAsync(msg, writeFlags);
}
/// <summary>
@@ -110,20 +109,22 @@ namespace Grpc.Core.Internal
/// Initiates sending a initial metadata.
/// Even though C-core allows sending metadata in parallel to sending messages, we will treat sending metadata as a send message operation
/// to make things simpler.
- /// completionDelegate is invoked upon completion.
/// </summary>
- public void StartSendInitialMetadata(Metadata headers, AsyncCompletionDelegate<object> completionDelegate)
+ public Task SendInitialMetadataAsync(Metadata headers)
{
lock (myLock)
{
GrpcPreconditions.CheckNotNull(headers, "metadata");
- GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
+ GrpcPreconditions.CheckState(started);
GrpcPreconditions.CheckState(!initialMetadataSent, "Response headers can only be sent once per call.");
GrpcPreconditions.CheckState(streamingWritesCounter == 0, "Response headers can only be sent before the first write starts.");
- CheckSendingAllowed(allowFinished: false);
- GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
+ var earlyResult = CheckSendAllowedOrEarlyResult();
+ if (earlyResult != null)
+ {
+ return earlyResult;
+ }
using (var metadataArray = MetadataArraySafeHandle.Create(headers))
{
@@ -131,7 +132,8 @@ namespace Grpc.Core.Internal
}
this.initialMetadataSent = true;
- sendCompletionDelegate = completionDelegate;
+ streamingWriteTcs = new TaskCompletionSource<object>();
+ return streamingWriteTcs.Task;
}
}
@@ -196,6 +198,16 @@ namespace Grpc.Core.Internal
server.RemoveCallReference(this);
}
+ protected override Task CheckSendAllowedOrEarlyResult()
+ {
+ GrpcPreconditions.CheckState(!halfcloseRequested, "Response stream has already been completed.");
+ GrpcPreconditions.CheckState(!finished, "Already finished.");
+ GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time");
+ GrpcPreconditions.CheckState(!disposed);
+
+ return null;
+ }
+
/// <summary>
/// Handles the server side close completion.
/// </summary>
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCompletion.cs b/src/csharp/Grpc.Core/Internal/AsyncCompletion.cs
deleted file mode 100644
index 7e86fddb4d..0000000000
--- a/src/csharp/Grpc.Core/Internal/AsyncCompletion.cs
+++ /dev/null
@@ -1,94 +0,0 @@
-#region Copyright notice and license
-
-// Copyright 2015, Google Inc.
-// All rights reserved.
-//
-// Redistribution and use in source and binary forms, with or without
-// modification, are permitted provided that the following conditions are
-// met:
-//
-// * Redistributions of source code must retain the above copyright
-// notice, this list of conditions and the following disclaimer.
-// * Redistributions in binary form must reproduce the above
-// copyright notice, this list of conditions and the following disclaimer
-// in the documentation and/or other materials provided with the
-// distribution.
-// * Neither the name of Google Inc. nor the names of its
-// contributors may be used to endorse or promote products derived from
-// this software without specific prior written permission.
-//
-// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-#endregion
-
-using System;
-using System.Diagnostics;
-using System.Runtime.CompilerServices;
-using System.Runtime.InteropServices;
-using System.Threading;
-using System.Threading.Tasks;
-using Grpc.Core.Internal;
-using Grpc.Core.Utils;
-
-namespace Grpc.Core.Internal
-{
- /// <summary>
- /// If error != null, there's been an error or operation has been cancelled.
- /// </summary>
- internal delegate void AsyncCompletionDelegate<T>(T result, Exception error);
-
- /// <summary>
- /// Helper for transforming AsyncCompletionDelegate into full-fledged Task.
- /// </summary>
- internal class AsyncCompletionTaskSource<T>
- {
- readonly TaskCompletionSource<T> tcs = new TaskCompletionSource<T>();
- readonly AsyncCompletionDelegate<T> completionDelegate;
-
- public AsyncCompletionTaskSource()
- {
- completionDelegate = new AsyncCompletionDelegate<T>(HandleCompletion);
- }
-
- public Task<T> Task
- {
- get
- {
- return tcs.Task;
- }
- }
-
- public AsyncCompletionDelegate<T> CompletionDelegate
- {
- get
- {
- return completionDelegate;
- }
- }
-
- private void HandleCompletion(T value, Exception error)
- {
- if (error == null)
- {
- tcs.SetResult(value);
- return;
- }
- if (error is OperationCanceledException)
- {
- tcs.SetCanceled();
- return;
- }
- tcs.SetException(error);
- }
- }
-}
diff --git a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
index 013f00ff6f..924de028f5 100644
--- a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
@@ -50,16 +50,12 @@ namespace Grpc.Core.Internal
public Task WriteAsync(TRequest message)
{
- var taskSource = new AsyncCompletionTaskSource<object>();
- call.StartSendMessage(message, GetWriteFlags(), taskSource.CompletionDelegate);
- return taskSource.Task;
+ return call.SendMessageAsync(message, GetWriteFlags());
}
public Task CompleteAsync()
{
- var taskSource = new AsyncCompletionTaskSource<object>();
- call.StartSendCloseFromClient(taskSource.CompletionDelegate);
- return taskSource.Task;
+ return call.SendCloseFromClientAsync();
}
public WriteOptions WriteOptions
diff --git a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
index ecfee0bfdd..25b79b4398 100644
--- a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
@@ -52,16 +52,12 @@ namespace Grpc.Core.Internal
public Task WriteAsync(TResponse message)
{
- var taskSource = new AsyncCompletionTaskSource<object>();
- call.StartSendMessage(message, GetWriteFlags(), taskSource.CompletionDelegate);
- return taskSource.Task;
+ return call.SendMessageAsync(message, GetWriteFlags());
}
public Task WriteResponseHeadersAsync(Metadata responseHeaders)
{
- var taskSource = new AsyncCompletionTaskSource<object>();
- call.StartSendInitialMetadata(responseHeaders, taskSource.CompletionDelegate);
- return taskSource.Task;
+ return call.SendInitialMetadataAsync(responseHeaders);
}
public WriteOptions WriteOptions