diff options
author | David Garcia Quintas <dgq@google.com> | 2015-05-07 17:20:24 -0700 |
---|---|---|
committer | David Garcia Quintas <dgq@google.com> | 2015-05-07 17:20:24 -0700 |
commit | 4251aad3eb7bbc9220c8b4a8064da82416409ab3 (patch) | |
tree | cfbf2db24e0f40ab79fb535923873130b944c4b3 /src/csharp/Grpc.Core/Internal/AsyncCallBase.cs | |
parent | d4f10c03209e731cf5d1793a23d3823ea2b05493 (diff) | |
parent | 442918f225797e28e1bfee023046af4f441a50ae (diff) |
Merge branch 'master' of github.com:grpc/grpc into standalone_benchmarks
Diffstat (limited to 'src/csharp/Grpc.Core/Internal/AsyncCallBase.cs')
-rw-r--r-- | src/csharp/Grpc.Core/Internal/AsyncCallBase.cs | 135 |
1 files changed, 62 insertions, 73 deletions
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs index 15b0cfe249..fc5bee40e2 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs @@ -44,7 +44,7 @@ namespace Grpc.Core.Internal { /// <summary> /// Base for handling both client side and server side calls. - /// Handles native call lifecycle and provides convenience methods. + /// Manages native call lifecycle and provides convenience methods. /// </summary> internal abstract class AsyncCallBase<TWrite, TRead> { @@ -65,16 +65,14 @@ namespace Grpc.Core.Internal protected bool errorOccured; protected bool cancelRequested; - protected AsyncCompletionDelegate sendCompletionDelegate; // Completion of a pending send or sendclose if not null. - protected bool readPending; // True if there is a read in progress. + protected AsyncCompletionDelegate<object> sendCompletionDelegate; // Completion of a pending send or sendclose if not null. + protected AsyncCompletionDelegate<TRead> readCompletionDelegate; // Completion of a pending send or sendclose if not null. + protected bool readingDone; protected bool halfcloseRequested; protected bool halfclosed; protected bool finished; // True if close has been received from the peer. - // Streaming reads will be delivered to this observer. For a call that only does unary read it may remain null. - protected IObserver<TRead> readObserver; - public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer) { this.serializer = Preconditions.CheckNotNull(serializer); @@ -131,10 +129,10 @@ namespace Grpc.Core.Internal } /// <summary> - /// Initiates sending a message. Only once send operation can be active at a time. + /// Initiates sending a message. Only one send operation can be active at a time. /// completionDelegate is invoked upon completion. /// </summary> - protected void StartSendMessageInternal(TWrite msg, AsyncCompletionDelegate completionDelegate) + protected void StartSendMessageInternal(TWrite msg, AsyncCompletionDelegate<object> completionDelegate) { byte[] payload = UnsafeSerialize(msg); @@ -149,31 +147,29 @@ namespace Grpc.Core.Internal } /// <summary> - /// Requests receiving a next message. + /// Initiates reading a message. Only one read operation can be active at a time. + /// completionDelegate is invoked upon completion. /// </summary> - protected void StartReceiveMessage() + protected void StartReadMessageInternal(AsyncCompletionDelegate<TRead> completionDelegate) { lock (myLock) { - Preconditions.CheckState(started); - Preconditions.CheckState(!disposed); - Preconditions.CheckState(!errorOccured); - - Preconditions.CheckState(!readingDone); - Preconditions.CheckState(!readPending); + Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); + CheckReadingAllowed(); call.StartReceiveMessage(readFinishedHandler); - readPending = true; + readCompletionDelegate = completionDelegate; } } + // TODO(jtattermusch): find more fitting name for this method. /// <summary> /// Default behavior just completes the read observer, but more sofisticated behavior might be required /// by subclasses. /// </summary> - protected virtual void CompleteReadObserver() + protected virtual void ProcessLastRead(AsyncCompletionDelegate<TRead> completionDelegate) { - FireReadObserverOnCompleted(); + FireCompletion(completionDelegate, default(TRead), null); } /// <summary> @@ -184,7 +180,8 @@ namespace Grpc.Core.Internal { if (!disposed && call != null) { - if (halfclosed && readingDone && finished) + bool noMoreSendCompletions = halfclosed || (cancelRequested && sendCompletionDelegate == null); + if (noMoreSendCompletions && readingDone && finished) { ReleaseResources(); return true; @@ -195,6 +192,7 @@ namespace Grpc.Core.Internal private void ReleaseResources() { + OnReleaseResources(); if (call != null) { call.Dispose(); @@ -203,16 +201,39 @@ namespace Grpc.Core.Internal disposed = true; } + protected virtual void OnReleaseResources() + { + } + protected void CheckSendingAllowed() { Preconditions.CheckState(started); - Preconditions.CheckState(!disposed); Preconditions.CheckState(!errorOccured); + CheckNotCancelled(); + Preconditions.CheckState(!disposed); Preconditions.CheckState(!halfcloseRequested, "Already halfclosed."); Preconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time"); } + protected void CheckReadingAllowed() + { + Preconditions.CheckState(started); + Preconditions.CheckState(!disposed); + Preconditions.CheckState(!errorOccured); + + Preconditions.CheckState(!readingDone, "Stream has already been closed."); + Preconditions.CheckState(readCompletionDelegate == null, "Only one read can be pending at a time"); + } + + protected void CheckNotCancelled() + { + if (cancelRequested) + { + throw new OperationCanceledException("Remote call has been cancelled."); + } + } + protected byte[] UnsafeSerialize(TWrite msg) { return serializer(msg); @@ -248,47 +269,11 @@ namespace Grpc.Core.Internal } } - protected void FireReadObserverOnNext(TRead value) + protected void FireCompletion<T>(AsyncCompletionDelegate<T> completionDelegate, T value, Exception error) { try { - readObserver.OnNext(value); - } - catch (Exception e) - { - Console.WriteLine("Exception occured while invoking readObserver.OnNext: " + e); - } - } - - protected void FireReadObserverOnCompleted() - { - try - { - readObserver.OnCompleted(); - } - catch (Exception e) - { - Console.WriteLine("Exception occured while invoking readObserver.OnCompleted: " + e); - } - } - - protected void FireReadObserverOnError(Exception error) - { - try - { - readObserver.OnError(error); - } - catch (Exception e) - { - Console.WriteLine("Exception occured while invoking readObserver.OnError: " + e); - } - } - - protected void FireCompletion(AsyncCompletionDelegate completionDelegate, Exception error) - { - try - { - completionDelegate(error); + completionDelegate(value, error); } catch (Exception e) { @@ -322,7 +307,7 @@ namespace Grpc.Core.Internal /// </summary> private void HandleSendFinished(bool wasError, BatchContextSafeHandleNotOwned ctx) { - AsyncCompletionDelegate origCompletionDelegate = null; + AsyncCompletionDelegate<object> origCompletionDelegate = null; lock (myLock) { origCompletionDelegate = sendCompletionDelegate; @@ -333,11 +318,11 @@ namespace Grpc.Core.Internal if (wasError) { - FireCompletion(origCompletionDelegate, new OperationFailedException("Send failed")); + FireCompletion(origCompletionDelegate, null, new OperationFailedException("Send failed")); } else { - FireCompletion(origCompletionDelegate, null); + FireCompletion(origCompletionDelegate, null, null); } } @@ -346,7 +331,7 @@ namespace Grpc.Core.Internal /// </summary> private void HandleHalfclosed(bool wasError, BatchContextSafeHandleNotOwned ctx) { - AsyncCompletionDelegate origCompletionDelegate = null; + AsyncCompletionDelegate<object> origCompletionDelegate = null; lock (myLock) { halfclosed = true; @@ -358,11 +343,11 @@ namespace Grpc.Core.Internal if (wasError) { - FireCompletion(origCompletionDelegate, new OperationFailedException("Halfclose failed")); + FireCompletion(origCompletionDelegate, null, new OperationFailedException("Halfclose failed")); } else { - FireCompletion(origCompletionDelegate, null); + FireCompletion(origCompletionDelegate, null, null); } } @@ -373,11 +358,19 @@ namespace Grpc.Core.Internal { var payload = ctx.GetReceivedMessage(); + AsyncCompletionDelegate<TRead> origCompletionDelegate = null; lock (myLock) { - readPending = false; - if (payload == null) + origCompletionDelegate = readCompletionDelegate; + if (payload != null) { + readCompletionDelegate = null; + } + else + { + // This was the last read. Keeping the readCompletionDelegate + // to be either fired by this handler or by client-side finished + // handler. readingDone = true; } @@ -392,15 +385,11 @@ namespace Grpc.Core.Internal TRead msg; TryDeserialize(payload, out msg); - FireReadObserverOnNext(msg); - - // Start a new read. The current one has already been delivered, - // so correct ordering of reads is assured. - StartReceiveMessage(); + FireCompletion(origCompletionDelegate, msg, null); } else { - CompleteReadObserver(); + ProcessLastRead(origCompletionDelegate); } } } |