#region Copyright notice and license

// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
//     * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
//     * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
//     * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

#endregion

using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core.Internal;
using Grpc.Core.Logging;
using Grpc.Core.Utils;

namespace Grpc.Core.Internal
{
    /// <summary>
    /// Base for handling both client side and server side calls.
    /// Manages native call lifecycle and provides convenience methods.
    /// </summary>
    /// <typeparam name="TWrite">Type of the messages passed to the serializer and sent.</typeparam>
    /// <typeparam name="TRead">Type of the messages produced by the deserializer from received payloads.</typeparam>
    internal abstract class AsyncCallBase<TWrite, TRead>
    {
        static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCallBase<TWrite, TRead>>();

        readonly Func<TWrite, byte[]> serializer;
        readonly Func<byte[], TRead> deserializer;

        // Guards the mutable call state below.
        protected readonly object myLock = new object();

        protected CallSafeHandle call;
        protected bool disposed;

        protected bool started;
        protected bool errorOccured;
        protected bool cancelRequested;

        protected AsyncCompletionDelegate<object> sendCompletionDelegate;  // Completion of a pending send or sendclose if not null.
        protected AsyncCompletionDelegate<TRead> readCompletionDelegate;  // Completion of a pending read if not null.

        protected bool readingDone;
        protected bool halfcloseRequested;
        protected bool halfclosed;
        protected bool finished;  // True if close has been received from the peer.

        protected bool initialMetadataSent;
        protected long streamingWritesCounter;

        /// <summary>
        /// Creates the call base with the functions used to convert messages to and from byte arrays.
        /// Both arguments are checked for null by <c>Preconditions.CheckNotNull</c>.
        /// </summary>
        /// <param name="serializer">Converts an outgoing message to its payload.</param>
        /// <param name="deserializer">Converts a received payload to a message.</param>
        public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer)
        {
            this.serializer = Preconditions.CheckNotNull(serializer);
            this.deserializer = Preconditions.CheckNotNull(deserializer);
        }

        /// <summary>
        /// Requests cancelling the call.
        /// The call must have been started. The native cancel is only issued
        /// while the call has not been disposed yet.
        /// </summary>
        public void Cancel()
        {
            lock (myLock)
            {
                Preconditions.CheckState(started);
                cancelRequested = true;

                if (!disposed)
                {
                    call.Cancel();
                }
            }
        }

        /// <summary>
        /// Requests cancelling the call with given status.
        /// The call must have been started. The native cancel is only issued
        /// while the call has not been disposed yet.
        /// </summary>
        /// <param name="status">Status passed to the native call's cancel operation.</param>
        public void CancelWithStatus(Status status)
        {
            lock (myLock)
            {
                Preconditions.CheckState(started);
                cancelRequested = true;

                if (!disposed)
                {
                    call.CancelWithStatus(status);
                }
            }
        }

        /// <summary>
        /// Stores the native call handle this object operates on.
        /// </summary>
        /// <param name="call">The native call handle.</param>
        protected void InitializeInternal(CallSafeHandle call)
        {
            lock (myLock)
            {
                this.call = call;
            }
        }

        /// <summary>
        /// Initiates sending a message. Only one send operation can be active at a time.
        /// completionDelegate is invoked upon completion.
        /// </summary>
        /// <param name="msg">Message to serialize and send.</param>
        /// <param name="writeFlags">Flags forwarded to the native send operation.</param>
        /// <param name="completionDelegate">Invoked when the send finishes; must not be null.</param>
        protected void StartSendMessageInternal(TWrite msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
        {
            // Serialization runs outside the lock; an exception thrown by the
            // serializer propagates to the caller before any state is touched.
            byte[] payload = UnsafeSerialize(msg);

            lock (myLock)
            {
                Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
                CheckSendingAllowed();

                // The last argument is true only for the first send of the call
                // (presumably requesting that initial metadata is sent along - confirm
                // against CallSafeHandle.StartSendMessage).
                call.StartSendMessage(HandleSendFinished, payload, writeFlags, !initialMetadataSent);

                sendCompletionDelegate = completionDelegate;
                initialMetadataSent = true;
                streamingWritesCounter++;
            }
        }

        /// <summary>
        /// Initiates reading a message. Only one read operation can be active at a time.
        /// completionDelegate is invoked upon completion.
        /// </summary>
        /// <param name="completionDelegate">Invoked when the read finishes; must not be null.</param>
        protected void StartReadMessageInternal(AsyncCompletionDelegate<TRead> completionDelegate)
        {
            lock (myLock)
            {
                Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
                CheckReadingAllowed();

                call.StartReceiveMessage(HandleReadFinished);
                readCompletionDelegate = completionDelegate;
            }
        }

        // TODO(jtattermusch): find more fitting name for this method.
        /// <summary>
        /// Default behavior just completes the read observer, but more sophisticated behavior might be required
        /// by subclasses.
        /// </summary>
        /// <param name="completionDelegate">Completion of the read that found no more messages.</param>
        protected virtual void ProcessLastRead(AsyncCompletionDelegate<TRead> completionDelegate)
        {
            FireCompletion(completionDelegate, default(TRead), null);
        }

        /// <summary>
        /// If there are no more pending actions and no new actions can be started, releases
        /// the underlying native resources.
        /// This method does not take <c>myLock</c> itself; the handlers in this class
        /// call it while holding the lock.
        /// </summary>
        /// <returns>True if the resources were released by this invocation.</returns>
        protected bool ReleaseResourcesIfPossible()
        {
            if (!disposed && call != null)
            {
                // No further send completion can arrive once the call is halfclosed,
                // or once it was cancelled and no send is pending.
                bool noMoreSendCompletions = halfclosed || (cancelRequested && sendCompletionDelegate == null);
                if (noMoreSendCompletions && readingDone && finished)
                {
                    ReleaseResources();
                    return true;
                }
            }
            return false;
        }

        /// <summary>
        /// Runs the subclass hook and disposes the native call.
        /// The native call is disposed and marked as such even when the hook throws,
        /// so that a failing hook cannot leak the handle.
        /// </summary>
        private void ReleaseResources()
        {
            try
            {
                OnReleaseResources();
            }
            finally
            {
                if (call != null)
                {
                    call.Dispose();
                }
                disposed = true;
            }
        }

        /// <summary>
        /// Hook invoked right before the native call is disposed. Does nothing by default.
        /// </summary>
        protected virtual void OnReleaseResources()
        {
        }

        /// <summary>
        /// Verifies that a new send operation can be started in the current state.
        /// Throws <see cref="OperationCanceledException"/> if cancellation was requested;
        /// the other conditions are enforced through <c>Preconditions.CheckState</c>.
        /// </summary>
        protected void CheckSendingAllowed()
        {
            Preconditions.CheckState(started);
            Preconditions.CheckState(!errorOccured);
            CheckNotCancelled();
            Preconditions.CheckState(!disposed);

            Preconditions.CheckState(!halfcloseRequested, "Already halfclosed.");
            Preconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time");
        }

        /// <summary>
        /// Verifies that a new read operation can be started in the current state.
        /// All conditions are enforced through <c>Preconditions.CheckState</c>.
        /// </summary>
        protected void CheckReadingAllowed()
        {
            Preconditions.CheckState(started);
            Preconditions.CheckState(!disposed);
            Preconditions.CheckState(!errorOccured);

            Preconditions.CheckState(!readingDone, "Stream has already been closed.");
            Preconditions.CheckState(readCompletionDelegate == null, "Only one read can be pending at a time");
        }

        /// <summary>
        /// Throws <see cref="OperationCanceledException"/> if cancellation of the call has been requested.
        /// </summary>
        protected void CheckNotCancelled()
        {
            if (cancelRequested)
            {
                throw new OperationCanceledException("Remote call has been cancelled.");
            }
        }

        /// <summary>
        /// Serializes the message. Exceptions thrown by the serializer are not caught.
        /// </summary>
        /// <param name="msg">Message to serialize.</param>
        /// <returns>The payload produced by the serializer.</returns>
        protected byte[] UnsafeSerialize(TWrite msg)
        {
            return serializer(msg);
        }

        /// <summary>
        /// Serializes the message, logging instead of propagating any exception thrown by the serializer.
        /// </summary>
        /// <param name="msg">Message to serialize.</param>
        /// <param name="payload">The serialized payload, or null on failure.</param>
        /// <returns>True if serialization succeeded.</returns>
        protected bool TrySerialize(TWrite msg, out byte[] payload)
        {
            try
            {
                payload = serializer(msg);
                return true;
            }
            catch (Exception e)
            {
                Logger.Error(e, "Exception occured while trying to serialize message");
                payload = null;
                return false;
            }
        }

        /// <summary>
        /// Deserializes the payload, logging instead of propagating any exception thrown by the deserializer.
        /// </summary>
        /// <param name="payload">Payload to deserialize.</param>
        /// <param name="msg">The deserialized message, or default(TRead) on failure.</param>
        /// <returns>True if deserialization succeeded.</returns>
        protected bool TryDeserialize(byte[] payload, out TRead msg)
        {
            try
            {
                msg = deserializer(payload);
                return true;
            }
            catch (Exception e)
            {
                Logger.Error(e, "Exception occured while trying to deserialize message.");
                msg = default(TRead);
                return false;
            }
        }

        /// <summary>
        /// Invokes the completion delegate. Any exception thrown during the invocation
        /// is logged and not propagated to the caller.
        /// </summary>
        /// <param name="completionDelegate">Delegate to invoke.</param>
        /// <param name="value">Result value passed to the delegate.</param>
        /// <param name="error">Error passed to the delegate, or null.</param>
        protected void FireCompletion<T>(AsyncCompletionDelegate<T> completionDelegate, T value, Exception error)
        {
            try
            {
                completionDelegate(value, error);
            }
            catch (Exception e)
            {
                Logger.Error(e, "Exception occured while invoking completion delegate.");
            }
        }

        /// <summary>
        /// Handles send completion.
        /// The pending send delegate is detached under the lock and invoked outside of it.
        /// </summary>
        /// <param name="success">False if the send failed; the delegate then receives an error.</param>
        /// <param name="ctx">Batch context of the finished operation (not used here).</param>
        protected void HandleSendFinished(bool success, BatchContextSafeHandle ctx)
        {
            AsyncCompletionDelegate<object> origCompletionDelegate = null;
            lock (myLock)
            {
                origCompletionDelegate = sendCompletionDelegate;
                sendCompletionDelegate = null;

                ReleaseResourcesIfPossible();
            }

            if (!success)
            {
                FireCompletion(origCompletionDelegate, null, new InvalidOperationException("Send failed"));
            }
            else
            {
                FireCompletion(origCompletionDelegate, null, null);
            }
        }

        /// <summary>
        /// Handles halfclose completion.
        /// Marks the call as halfclosed regardless of <paramref name="success"/>; the pending
        /// send delegate is detached under the lock and invoked outside of it.
        /// </summary>
        /// <param name="success">False if the halfclose failed; the delegate then receives an error.</param>
        /// <param name="ctx">Batch context of the finished operation (not used here).</param>
        protected void HandleHalfclosed(bool success, BatchContextSafeHandle ctx)
        {
            AsyncCompletionDelegate<object> origCompletionDelegate = null;
            lock (myLock)
            {
                halfclosed = true;
                origCompletionDelegate = sendCompletionDelegate;
                sendCompletionDelegate = null;

                ReleaseResourcesIfPossible();
            }

            if (!success)
            {
                FireCompletion(origCompletionDelegate, null, new InvalidOperationException("Halfclose failed"));
            }
            else
            {
                FireCompletion(origCompletionDelegate, null, null);
            }
        }

        /// <summary>
        /// Handles streaming read completion.
        /// A null received message is treated as the end of the stream.
        /// </summary>
        /// <param name="success">Currently ignored (see TODO below).</param>
        /// <param name="ctx">Batch context from which the received message is obtained.</param>
        protected void HandleReadFinished(bool success, BatchContextSafeHandle ctx)
        {
            var payload = ctx.GetReceivedMessage();

            AsyncCompletionDelegate<TRead> origCompletionDelegate = null;
            lock (myLock)
            {
                origCompletionDelegate = readCompletionDelegate;
                if (payload != null)
                {
                    readCompletionDelegate = null;
                }
                else
                {
                    // This was the last read. Keeping the readCompletionDelegate
                    // to be either fired by this handler or by client-side finished
                    // handler.
                    readingDone = true;
                }

                ReleaseResourcesIfPossible();
            }

            // TODO: handle the case when success is false.

            if (payload != null)
            {
                TRead msg;
                if (TryDeserialize(payload, out msg))
                {
                    FireCompletion(origCompletionDelegate, msg, null);
                }
                else
                {
                    // Report the failure instead of handing default(TRead) to the reader
                    // as if it were a valid message. The underlying exception has already
                    // been logged by TryDeserialize.
                    FireCompletion(origCompletionDelegate, default(TRead), new InvalidOperationException("Failed to deserialize message."));
                }
            }
            else
            {
                ProcessLastRead(origCompletionDelegate);
            }
        }
    }
}