From 6098848a3f7dade0691cb414c31007ef176ffca7 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Mon, 23 May 2016 14:56:14 -0400 Subject: allow short-circuiting the send operation --- src/csharp/Grpc.Core/Internal/AsyncCall.cs | 29 +++++++++++++++++------- src/csharp/Grpc.Core/Internal/AsyncCallBase.cs | 20 ++++++++-------- src/csharp/Grpc.Core/Internal/AsyncCallServer.cs | 20 +++++++++++++++- 3 files changed, 50 insertions(+), 19 deletions(-) diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index ad690bd2ec..dec6eafd46 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -252,7 +252,7 @@ namespace Grpc.Core.Internal lock (myLock) { GrpcPreconditions.CheckState(started); - CheckSendingAllowed(allowFinished: true); + CheckSendPreconditionsClientSide(); if (disposed || finished) { @@ -437,17 +437,30 @@ namespace Grpc.Core.Internal } } - protected override void CheckSendingAllowed(bool allowFinished) + protected override Task CheckSendAllowedOrEarlyResult() { - base.CheckSendingAllowed(true); + CheckSendPreconditionsClientSide(); - // throwing RpcException if we already received status on client - // side makes the most sense. - // Note that this throws even for StatusCode.OK. - if (!allowFinished && finishedStatus.HasValue) + if (finishedStatus.HasValue) { - throw new RpcException(finishedStatus.Value.Status); + // throwing RpcException if we already received status on client + // side makes the most sense. + // Note that this throws even for StatusCode.OK. + // Writing after the call has finished is not a programming error because server can close + // the call anytime, so don't throw directly, but let the write task finish with an error. + var tcs = new TaskCompletionSource(); + tcs.SetException(new RpcException(finishedStatus.Value.Status)); + return tcs.Task; } + + return null; + } + + private void CheckSendPreconditionsClientSide() + { + CheckNotCancelled(); + GrpcPreconditions.CheckState(!halfcloseRequested, "Request stream has already been completed."); + GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time."); } /// diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs index 13f6309f6e..d60876ddf3 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs @@ -136,7 +136,11 @@ namespace Grpc.Core.Internal lock (myLock) { GrpcPreconditions.CheckState(started); - CheckSendingAllowed(allowFinished: false); + var earlyResult = CheckSendAllowedOrEarlyResult(); + if (earlyResult != null) + { + return earlyResult; + } call.StartSendMessage(HandleSendFinished, payload, writeFlags, !initialMetadataSent); @@ -212,15 +216,11 @@ namespace Grpc.Core.Internal { } - protected virtual void CheckSendingAllowed(bool allowFinished) - { - CheckNotCancelled(); - GrpcPreconditions.CheckState(!disposed || allowFinished); - - GrpcPreconditions.CheckState(!halfcloseRequested, "Already halfclosed."); - GrpcPreconditions.CheckState(!finished || allowFinished, "Already finished."); - GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time"); - } + /// + /// Checks if sending is allowed and possibly returns a Task that allows short-circuiting the send + /// logic by directly returning the write operation result task. Normally, null is returned. + /// + protected abstract Task CheckSendAllowedOrEarlyResult(); protected void CheckNotCancelled() { diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs index b5dca4290f..a4f6e4d1b0 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs @@ -116,9 +116,15 @@ namespace Grpc.Core.Internal { GrpcPreconditions.CheckNotNull(headers, "metadata"); + GrpcPreconditions.CheckState(started); GrpcPreconditions.CheckState(!initialMetadataSent, "Response headers can only be sent once per call."); GrpcPreconditions.CheckState(streamingWritesCounter == 0, "Response headers can only be sent before the first write starts."); - CheckSendingAllowed(allowFinished: false); + + var earlyResult = CheckSendAllowedOrEarlyResult(); + if (earlyResult != null) + { + return earlyResult; + } using (var metadataArray = MetadataArraySafeHandle.Create(headers)) { @@ -192,6 +198,18 @@ namespace Grpc.Core.Internal server.RemoveCallReference(this); } + protected override Task CheckSendAllowedOrEarlyResult() + { + CheckNotCancelled(); + GrpcPreconditions.CheckState(!disposed); + + GrpcPreconditions.CheckState(!halfcloseRequested, "Response stream has already been completed."); + GrpcPreconditions.CheckState(!finished, "Already finished."); + GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time"); + + return null; + } + /// /// Handles the server side close completion. /// -- cgit v1.2.3