aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/csharp/Grpc.Core/Internal
diff options
context:
space:
mode:
Diffstat (limited to 'src/csharp/Grpc.Core/Internal')
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCall.cs22
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallBase.cs32
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallServer.cs6
3 files changed, 56 insertions, 4 deletions
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index f549c52876..db40b553ce 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -341,6 +341,11 @@ namespace Grpc.Core.Internal
get { return true; }
}
+ protected override Exception GetRpcExceptionClientOnly()
+ {
+ return new RpcException(finishedStatus.Value.Status);
+ }
+
protected override Task CheckSendAllowedOrEarlyResult()
{
var earlyResult = CheckSendPreconditionsClientSide();
@@ -452,6 +457,7 @@ namespace Grpc.Core.Internal
using (Profilers.ForCurrentThread().NewScope("AsyncCall.HandleUnaryResponse"))
{
+ TaskCompletionSource<object> delayedTcs;
TResponse msg = default(TResponse);
var deserializeException = TryDeserialize(receivedMessage, out msg);
@@ -464,14 +470,19 @@ namespace Grpc.Core.Internal
receivedStatus = new ClientSideStatus(DeserializeResponseFailureStatus, receivedStatus.Trailers);
}
finishedStatus = receivedStatus;
+ delayedTcs = delayedStreamingWriteTcs;
ReleaseResourcesIfPossible();
}
responseHeadersTcs.SetResult(responseHeaders);
- var status = receivedStatus.Status;
+ if (delayedTcs != null)
+ {
+ delayedTcs.SetException(GetRpcExceptionClientOnly());
+ }
+ var status = receivedStatus.Status;
if (status.StatusCode != StatusCode.OK)
{
unaryResponseTcs.SetException(new RpcException(status));
@@ -490,16 +501,23 @@ namespace Grpc.Core.Internal
// NOTE: because this event is a result of batch containing GRPC_OP_RECV_STATUS_ON_CLIENT,
// success will be always set to true.
+ TaskCompletionSource<object> delayedTcs;
+
lock (myLock)
{
finished = true;
finishedStatus = receivedStatus;
+ delayedTcs = delayedStreamingWriteTcs;
ReleaseResourcesIfPossible();
}
- var status = receivedStatus.Status;
+ if (delayedTcs != null)
+ {
+ delayedTcs.SetException(GetRpcExceptionClientOnly());
+ }
+ var status = receivedStatus.Status;
if (status.StatusCode != StatusCode.OK)
{
streamingCallFinishedTcs.SetException(new RpcException(status));
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
index eb9c3ea62d..b27baba942 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
@@ -68,6 +68,7 @@ namespace Grpc.Core.Internal
protected TaskCompletionSource<TRead> streamingReadTcs; // Completion of a pending streaming read if not null.
protected TaskCompletionSource<object> streamingWriteTcs; // Completion of a pending streaming write or send close from client if not null.
+ protected TaskCompletionSource<object> delayedStreamingWriteTcs;
protected TaskCompletionSource<object> sendStatusFromServerTcs;
protected bool readingDone; // True if last read (i.e. read with null payload) was already received.
@@ -200,6 +201,12 @@ namespace Grpc.Core.Internal
get;
}
+ /// <summary>
+ /// Returns an exception to throw for a failed send operation.
+ /// It is only allowed to call this method for a call that has already finished.
+ /// </summary>
+ protected abstract Exception GetRpcExceptionClientOnly();
+
private void ReleaseResources()
{
if (call != null)
@@ -252,18 +259,39 @@ namespace Grpc.Core.Internal
/// </summary>
protected void HandleSendFinished(bool success)
{
+ bool delayCompletion = false;
TaskCompletionSource<object> origTcs = null;
lock (myLock)
{
origTcs = streamingWriteTcs;
streamingWriteTcs = null;
+ if (!success && !finished && IsClient)
+ {
+ // We should be setting this only once per call, following writes will be short circuited.
+ GrpcPreconditions.CheckState (delayedStreamingWriteTcs == null);
+ delayedStreamingWriteTcs = origTcs;
+ delayCompletion = true;
+ }
+
ReleaseResourcesIfPossible();
}
if (!success)
{
- origTcs.SetException(new InvalidOperationException("Send failed"));
+ if (!delayCompletion)
+ {
+ if (IsClient)
+ {
+ GrpcPreconditions.CheckState(finished); // implied by !success && !delayCompletion && IsClient
+ origTcs.SetException(GetRpcExceptionClientOnly());
+ }
+ else
+ {
+ origTcs.SetException (new IOException("Error sending from server."));
+ }
+ }
+ // if delayCompletion == true, postpone SetException until call finishes.
}
else
{
@@ -283,7 +311,7 @@ namespace Grpc.Core.Internal
if (!success)
{
- sendStatusFromServerTcs.SetException(new InvalidOperationException("Error sending status from server."));
+ sendStatusFromServerTcs.SetException(new IOException("Error sending status from server."));
}
else
{
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
index 56c23ba3ef..50fdfa9006 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
@@ -33,6 +33,7 @@
using System;
using System.Diagnostics;
+using System.IO;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
@@ -193,6 +194,11 @@ namespace Grpc.Core.Internal
get { return false; }
}
+ protected override Exception GetRpcExceptionClientOnly()
+ {
+ throw new InvalidOperationException("Call be only called for client calls");
+ }
+
protected override void OnAfterReleaseResources()
{
server.RemoveCallReference(this);