diff options
Diffstat (limited to 'src/csharp/Grpc.Core/Internal/AsyncCallBase.cs')
-rw-r--r-- | src/csharp/Grpc.Core/Internal/AsyncCallBase.cs | 81 |
1 files changed, 29 insertions, 52 deletions
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs index 4de23706b2..cb8366c216 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs @@ -58,7 +58,6 @@ namespace Grpc.Core.Internal readonly Func<TWrite, byte[]> serializer; readonly Func<byte[], TRead> deserializer; - protected readonly GrpcEnvironment environment; protected readonly object myLock = new object(); protected INativeCall call; @@ -67,8 +66,8 @@ namespace Grpc.Core.Internal protected bool started; protected bool cancelRequested; - protected AsyncCompletionDelegate<object> sendCompletionDelegate; // Completion of a pending send or sendclose if not null. protected TaskCompletionSource<TRead> streamingReadTcs; // Completion of a pending streaming read if not null. + protected TaskCompletionSource<object> streamingWriteTcs; // Completion of a pending streaming write or send close from client if not null. protected TaskCompletionSource<object> sendStatusFromServerTcs; protected bool readingDone; // True if last read (i.e. read with null payload) was already received. @@ -78,11 +77,10 @@ namespace Grpc.Core.Internal protected bool initialMetadataSent; protected long streamingWritesCounter; // Number of streaming send operations started so far. - public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer, GrpcEnvironment environment) + public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer) { this.serializer = GrpcPreconditions.CheckNotNull(serializer); this.deserializer = GrpcPreconditions.CheckNotNull(deserializer); - this.environment = GrpcPreconditions.CheckNotNull(environment); } /// <summary> @@ -128,28 +126,31 @@ namespace Grpc.Core.Internal /// <summary> /// Initiates sending a message. Only one send operation can be active at a time. - /// completionDelegate is invoked upon completion. /// </summary> - protected void StartSendMessageInternal(TWrite msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate) + protected Task SendMessageInternalAsync(TWrite msg, WriteFlags writeFlags) { byte[] payload = UnsafeSerialize(msg); lock (myLock) { - GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); - CheckSendingAllowed(allowFinished: false); + GrpcPreconditions.CheckState(started); + var earlyResult = CheckSendAllowedOrEarlyResult(); + if (earlyResult != null) + { + return earlyResult; + } call.StartSendMessage(HandleSendFinished, payload, writeFlags, !initialMetadataSent); - sendCompletionDelegate = completionDelegate; initialMetadataSent = true; streamingWritesCounter++; + streamingWriteTcs = new TaskCompletionSource<object>(); + return streamingWriteTcs.Task; } } /// <summary> /// Initiates reading a message. Only one read operation can be active at a time. - /// completionDelegate is invoked upon completion. /// </summary> protected Task<TRead> ReadMessageInternalAsync() { @@ -159,7 +160,7 @@ namespace Grpc.Core.Internal if (readingDone) { // the last read that returns null or throws an exception is idempotent - // and maintain its state. + // and maintains its state. GrpcPreconditions.CheckState(streamingReadTcs != null, "Call does not support streaming reads."); return streamingReadTcs.Task; } @@ -183,7 +184,7 @@ namespace Grpc.Core.Internal { if (!disposed && call != null) { - bool noMoreSendCompletions = sendCompletionDelegate == null && (halfcloseRequested || cancelRequested || finished); + bool noMoreSendCompletions = streamingWriteTcs == null && (halfcloseRequested || cancelRequested || finished); if (noMoreSendCompletions && readingDone && finished) { ReleaseResources(); @@ -213,24 +214,11 @@ namespace Grpc.Core.Internal { } - protected virtual void CheckSendingAllowed(bool allowFinished) - { - GrpcPreconditions.CheckState(started); - CheckNotCancelled(); - GrpcPreconditions.CheckState(!disposed || allowFinished); - - GrpcPreconditions.CheckState(!halfcloseRequested, "Already halfclosed."); - GrpcPreconditions.CheckState(!finished || allowFinished, "Already finished."); - GrpcPreconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time"); - } - - protected void CheckNotCancelled() - { - if (cancelRequested) - { - throw new OperationCanceledException("Remote call has been cancelled."); - } - } + /// <summary> + /// Checks if sending is allowed and possibly returns a Task that allows short-circuiting the send + /// logic by directly returning the write operation result task. Normally, null is returned. + /// </summary> + protected abstract Task CheckSendAllowedOrEarlyResult(); protected byte[] UnsafeSerialize(TWrite msg) { @@ -259,39 +247,27 @@ namespace Grpc.Core.Internal } } - protected void FireCompletion<T>(AsyncCompletionDelegate<T> completionDelegate, T value, Exception error) - { - try - { - completionDelegate(value, error); - } - catch (Exception e) - { - Logger.Error(e, "Exception occured while invoking completion delegate."); - } - } - /// <summary> /// Handles send completion. /// </summary> protected void HandleSendFinished(bool success) { - AsyncCompletionDelegate<object> origCompletionDelegate = null; + TaskCompletionSource<object> origTcs = null; lock (myLock) { - origCompletionDelegate = sendCompletionDelegate; - sendCompletionDelegate = null; + origTcs = streamingWriteTcs; + streamingWriteTcs = null; ReleaseResourcesIfPossible(); } if (!success) { - FireCompletion(origCompletionDelegate, null, new InvalidOperationException("Send failed")); + origTcs.SetException(new InvalidOperationException("Send failed")); } else { - FireCompletion(origCompletionDelegate, null, null); + origTcs.SetResult(null); } } @@ -300,22 +276,23 @@ namespace Grpc.Core.Internal /// </summary> protected void HandleSendCloseFromClientFinished(bool success) { - AsyncCompletionDelegate<object> origCompletionDelegate = null; + TaskCompletionSource<object> origTcs = null; lock (myLock) { - origCompletionDelegate = sendCompletionDelegate; - sendCompletionDelegate = null; + origTcs = streamingWriteTcs; + streamingWriteTcs = null; ReleaseResourcesIfPossible(); } if (!success) { - FireCompletion(origCompletionDelegate, null, new InvalidOperationException("Sending close from client has failed.")); + // TODO(jtattermusch): this method is same as HandleSendFinished (only the error message differs). + origTcs.SetException(new InvalidOperationException("Sending close from client has failed.")); } else { - FireCompletion(origCompletionDelegate, null, null); + origTcs.SetResult(null); } } |