aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Jan Tattermusch <jtattermusch@google.com>2016-05-23 14:56:14 -0400
committerGravatar Jan Tattermusch <jtattermusch@google.com>2016-05-23 15:13:27 -0400
commit6098848a3f7dade0691cb414c31007ef176ffca7 (patch)
tree2287a590344899c1497f8d34b3e0c0d131764f25
parent239fce134426d73eb8d433f618f22aab10821826 (diff)
allow short-circuiting the send operation
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCall.cs29
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallBase.cs20
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallServer.cs20
3 files changed, 50 insertions, 19 deletions
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index ad690bd2ec..dec6eafd46 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -252,7 +252,7 @@ namespace Grpc.Core.Internal
lock (myLock)
{
GrpcPreconditions.CheckState(started);
- CheckSendingAllowed(allowFinished: true);
+ CheckSendPreconditionsClientSide();
if (disposed || finished)
{
@@ -437,17 +437,30 @@ namespace Grpc.Core.Internal
}
}
- protected override void CheckSendingAllowed(bool allowFinished)
+ protected override Task CheckSendAllowedOrEarlyResult()
{
- base.CheckSendingAllowed(true);
+ CheckSendPreconditionsClientSide();
- // throwing RpcException if we already received status on client
- // side makes the most sense.
- // Note that this throws even for StatusCode.OK.
- if (!allowFinished && finishedStatus.HasValue)
+ if (finishedStatus.HasValue)
{
- throw new RpcException(finishedStatus.Value.Status);
+ // throwing RpcException if we already received status on client
+ // side makes the most sense.
+ // Note that this throws even for StatusCode.OK.
+ // Writing after the call has finished is not a programming error because server can close
+ // the call anytime, so don't throw directly, but let the write task finish with an error.
+ var tcs = new TaskCompletionSource<object>();
+ tcs.SetException(new RpcException(finishedStatus.Value.Status));
+ return tcs.Task;
}
+
+ return null;
+ }
+
+ private void CheckSendPreconditionsClientSide()
+ {
+ CheckNotCancelled();
+ GrpcPreconditions.CheckState(!halfcloseRequested, "Request stream has already been completed.");
+ GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time.");
}
/// <summary>
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
index 13f6309f6e..d60876ddf3 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
@@ -136,7 +136,11 @@ namespace Grpc.Core.Internal
lock (myLock)
{
GrpcPreconditions.CheckState(started);
- CheckSendingAllowed(allowFinished: false);
+ var earlyResult = CheckSendAllowedOrEarlyResult();
+ if (earlyResult != null)
+ {
+ return earlyResult;
+ }
call.StartSendMessage(HandleSendFinished, payload, writeFlags, !initialMetadataSent);
@@ -212,15 +216,11 @@ namespace Grpc.Core.Internal
{
}
- protected virtual void CheckSendingAllowed(bool allowFinished)
- {
- CheckNotCancelled();
- GrpcPreconditions.CheckState(!disposed || allowFinished);
-
- GrpcPreconditions.CheckState(!halfcloseRequested, "Already halfclosed.");
- GrpcPreconditions.CheckState(!finished || allowFinished, "Already finished.");
- GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time");
- }
+ /// <summary>
+ /// Checks if sending is allowed and possibly returns a Task that allows short-circuiting the send
+ /// logic by directly returning the write operation result task. Normally, null is returned.
+ /// </summary>
+ protected abstract Task CheckSendAllowedOrEarlyResult();
protected void CheckNotCancelled()
{
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
index b5dca4290f..a4f6e4d1b0 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
@@ -116,9 +116,15 @@ namespace Grpc.Core.Internal
{
GrpcPreconditions.CheckNotNull(headers, "metadata");
+ GrpcPreconditions.CheckState(started);
GrpcPreconditions.CheckState(!initialMetadataSent, "Response headers can only be sent once per call.");
GrpcPreconditions.CheckState(streamingWritesCounter == 0, "Response headers can only be sent before the first write starts.");
- CheckSendingAllowed(allowFinished: false);
+
+ var earlyResult = CheckSendAllowedOrEarlyResult();
+ if (earlyResult != null)
+ {
+ return earlyResult;
+ }
using (var metadataArray = MetadataArraySafeHandle.Create(headers))
{
@@ -192,6 +198,18 @@ namespace Grpc.Core.Internal
server.RemoveCallReference(this);
}
+ protected override Task CheckSendAllowedOrEarlyResult()
+ {
+ CheckNotCancelled();
+ GrpcPreconditions.CheckState(!disposed);
+
+ GrpcPreconditions.CheckState(!halfcloseRequested, "Response stream has already been completed.");
+ GrpcPreconditions.CheckState(!finished, "Already finished.");
+ GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time");
+
+ return null;
+ }
+
/// <summary>
/// Handles the server side close completion.
/// </summary>