aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/csharp/Grpc.Core/Internal
diff options
context:
space:
mode:
authorGravatar Jan Tattermusch <jtattermusch@google.com>2016-05-03 12:55:42 -0700
committerGravatar Jan Tattermusch <jtattermusch@google.com>2016-05-03 12:58:05 -0700
commitb32e29f0a2f2cbe858e040190f2fd69237007f9a (patch)
tree74f64827cade60544d3cac9948ef6f633632b255 /src/csharp/Grpc.Core/Internal
parentce60d8e7a4f9a23495d0d2edb22dadbb78ca8089 (diff)
make SendStatusFromServer independent on WriteAsync
Diffstat (limited to 'src/csharp/Grpc.Core/Internal')
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallBase.cs9
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallServer.cs19
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerCallHandler.cs12
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerResponseStream.cs7
4 files changed, 18 insertions, 29 deletions
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
index abacfabadb..18dbe87734 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
@@ -69,6 +69,7 @@ namespace Grpc.Core.Internal
protected AsyncCompletionDelegate<object> sendCompletionDelegate; // Completion of a pending send or sendclose if not null.
protected TaskCompletionSource<TRead> streamingReadTcs; // Completion of a pending streaming read if not null.
+ protected TaskCompletionSource<object> sendStatusFromServerTcs;
protected bool readingDone; // True if last read (i.e. read with null payload) was already received.
protected bool halfcloseRequested; // True if send close have been initiated.
@@ -328,22 +329,18 @@ namespace Grpc.Core.Internal
/// </summary>
protected void HandleSendStatusFromServerFinished(bool success)
{
- AsyncCompletionDelegate<object> origCompletionDelegate = null;
lock (myLock)
{
- origCompletionDelegate = sendCompletionDelegate;
- sendCompletionDelegate = null;
-
ReleaseResourcesIfPossible();
}
if (!success)
{
- FireCompletion(origCompletionDelegate, null, new InvalidOperationException("Error sending status from server."));
+ sendStatusFromServerTcs.SetException(new InvalidOperationException("Error sending status from server."));
}
else
{
- FireCompletion(origCompletionDelegate, null, null);
+ sendStatusFromServerTcs.SetResult(null);
}
}
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
index efcf4ea7fe..94f49bd8f2 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
@@ -136,24 +136,24 @@ namespace Grpc.Core.Internal
}
/// <summary>
- /// Sends call result status, also indicating server is done with streaming responses.
- /// Only one pending send action is allowed at any given time.
- /// completionDelegate is called when the operation finishes.
+ /// Sends call result status, indicating we are done with writes.
+ /// Sending a status different from StatusCode.OK will also implicitly cancel the call.
/// </summary>
- public void StartSendStatusFromServer(Status status, Metadata trailers, AsyncCompletionDelegate<object> completionDelegate)
+ public Task SendStatusFromServerAsync(Status status, Metadata trailers)
{
lock (myLock)
{
- GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
- CheckSendingAllowed(allowFinished: false);
+ GrpcPreconditions.CheckState(started);
+ GrpcPreconditions.CheckState(!disposed);
+ GrpcPreconditions.CheckState(!halfcloseRequested, "Can only send status from server once.");
using (var metadataArray = MetadataArraySafeHandle.Create(trailers))
{
call.StartSendStatusFromServer(HandleSendStatusFromServerFinished, status, metadataArray, !initialMetadataSent);
}
halfcloseRequested = true;
- readingDone = true;
- sendCompletionDelegate = completionDelegate;
+ sendStatusFromServerTcs = new TaskCompletionSource<object>();
+ return sendStatusFromServerTcs.Task;
}
}
@@ -198,12 +198,13 @@ namespace Grpc.Core.Internal
/// </summary>
private void HandleFinishedServerside(bool success, bool cancelled)
{
+ // NOTE: because this event is a result of batch containing GRPC_OP_RECV_CLOSE_ON_SERVER,
+ // success will be always set to true.
lock (myLock)
{
finished = true;
ReleaseResourcesIfPossible();
}
- // TODO(jtattermusch): handle error
if (cancelled)
{
diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
index 1f83e51548..bf9df9f783 100644
--- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
@@ -93,7 +93,7 @@ namespace Grpc.Core.Internal
}
try
{
- await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false);
+ await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
@@ -149,7 +149,7 @@ namespace Grpc.Core.Internal
try
{
- await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false);
+ await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
@@ -209,7 +209,7 @@ namespace Grpc.Core.Internal
try
{
- await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false);
+ await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
@@ -260,7 +260,7 @@ namespace Grpc.Core.Internal
}
try
{
- await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false);
+ await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
@@ -282,9 +282,7 @@ namespace Grpc.Core.Internal
asyncCall.Initialize(newRpc.Call);
var finishedTask = asyncCall.ServerSideCallAsync();
- var responseStream = new ServerResponseStream<byte[], byte[]>(asyncCall);
-
- await responseStream.WriteStatusAsync(new Status(StatusCode.Unimplemented, ""), Metadata.Empty).ConfigureAwait(false);
+ await asyncCall.SendStatusFromServerAsync(new Status(StatusCode.Unimplemented, ""), Metadata.Empty).ConfigureAwait(false);
await finishedTask.ConfigureAwait(false);
}
}
diff --git a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
index 03e39efc02..ecfee0bfdd 100644
--- a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
@@ -57,13 +57,6 @@ namespace Grpc.Core.Internal
return taskSource.Task;
}
- public Task WriteStatusAsync(Status status, Metadata trailers)
- {
- var taskSource = new AsyncCompletionTaskSource<object>();
- call.StartSendStatusFromServer(status, trailers, taskSource.CompletionDelegate);
- return taskSource.Task;
- }
-
public Task WriteResponseHeadersAsync(Metadata responseHeaders)
{
var taskSource = new AsyncCompletionTaskSource<object>();