#region Copyright notice and license // Copyright 2015-2016, Google Inc. // All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions are // met: // // * Redistributions of source code must retain the above copyright // notice, this list of conditions and the following disclaimer. // * Redistributions in binary form must reproduce the above // copyright notice, this list of conditions and the following disclaimer // in the documentation and/or other materials provided with the // distribution. // * Neither the name of Google Inc. nor the names of its // contributors may be used to endorse or promote products derived from // this software without specific prior written permission. // // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #endregion using System; using System.Diagnostics; using System.IO; using System.Runtime.CompilerServices; using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; using Grpc.Core.Internal; using Grpc.Core.Logging; using Grpc.Core.Profiling; using Grpc.Core.Utils; namespace Grpc.Core.Internal { /// /// Base for handling both client side and server side calls. /// Manages native call lifecycle and provides convenience methods. /// internal abstract class AsyncCallBase { static readonly ILogger Logger = GrpcEnvironment.Logger.ForType>(); protected static readonly Status DeserializeResponseFailureStatus = new Status(StatusCode.Internal, "Failed to deserialize response message."); readonly Func serializer; readonly Func deserializer; protected readonly GrpcEnvironment environment; protected readonly object myLock = new object(); protected INativeCall call; protected bool disposed; protected bool started; protected bool cancelRequested; protected AsyncCompletionDelegate sendCompletionDelegate; // Completion of a pending send or sendclose if not null. protected AsyncCompletionDelegate readCompletionDelegate; // Completion of a pending send or sendclose if not null. protected bool readingDone; // True if last read (i.e. read with null payload) was already received. protected bool halfcloseRequested; // True if send close have been initiated. protected bool finished; // True if close has been received from the peer. protected bool initialMetadataSent; protected long streamingWritesCounter; // Number of streaming send operations started so far. public AsyncCallBase(Func serializer, Func deserializer, GrpcEnvironment environment) { this.serializer = GrpcPreconditions.CheckNotNull(serializer); this.deserializer = GrpcPreconditions.CheckNotNull(deserializer); this.environment = GrpcPreconditions.CheckNotNull(environment); } /// /// Requests cancelling the call. /// public void Cancel() { lock (myLock) { GrpcPreconditions.CheckState(started); cancelRequested = true; if (!disposed) { call.Cancel(); } } } /// /// Requests cancelling the call with given status. /// protected void CancelWithStatus(Status status) { lock (myLock) { cancelRequested = true; if (!disposed) { call.CancelWithStatus(status); } } } protected void InitializeInternal(INativeCall call) { lock (myLock) { this.call = call; } } /// /// Initiates sending a message. Only one send operation can be active at a time. /// completionDelegate is invoked upon completion. /// protected void StartSendMessageInternal(TWrite msg, WriteFlags writeFlags, AsyncCompletionDelegate completionDelegate) { byte[] payload = UnsafeSerialize(msg); lock (myLock) { GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); CheckSendingAllowed(); call.StartSendMessage(HandleSendFinished, payload, writeFlags, !initialMetadataSent); sendCompletionDelegate = completionDelegate; initialMetadataSent = true; streamingWritesCounter++; } } /// /// Initiates reading a message. Only one read operation can be active at a time. /// completionDelegate is invoked upon completion. /// protected void StartReadMessageInternal(AsyncCompletionDelegate completionDelegate) { lock (myLock) { GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); CheckReadingAllowed(); call.StartReceiveMessage(HandleReadFinished); readCompletionDelegate = completionDelegate; } } /// /// If there are no more pending actions and no new actions can be started, releases /// the underlying native resources. /// protected bool ReleaseResourcesIfPossible() { using (Profilers.ForCurrentThread().NewScope("AsyncCallBase.ReleaseResourcesIfPossible")) { if (!disposed && call != null) { bool noMoreSendCompletions = sendCompletionDelegate == null && (halfcloseRequested || cancelRequested || finished); if (noMoreSendCompletions && readingDone && finished) { ReleaseResources(); return true; } } return false; } } protected abstract bool IsClient { get; } private void ReleaseResources() { if (call != null) { call.Dispose(); } disposed = true; OnAfterReleaseResources(); } protected virtual void OnAfterReleaseResources() { } protected void CheckSendingAllowed() { GrpcPreconditions.CheckState(started); CheckNotCancelled(); GrpcPreconditions.CheckState(!disposed); GrpcPreconditions.CheckState(!halfcloseRequested, "Already halfclosed."); GrpcPreconditions.CheckState(!finished, "Already finished."); GrpcPreconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time"); } protected virtual void CheckReadingAllowed() { GrpcPreconditions.CheckState(started); GrpcPreconditions.CheckState(!disposed); GrpcPreconditions.CheckState(!readingDone, "Stream has already been closed."); GrpcPreconditions.CheckState(readCompletionDelegate == null, "Only one read can be pending at a time"); } protected void CheckNotCancelled() { if (cancelRequested) { throw new OperationCanceledException("Remote call has been cancelled."); } } protected byte[] UnsafeSerialize(TWrite msg) { using (Profilers.ForCurrentThread().NewScope("AsyncCallBase.UnsafeSerialize")) { return serializer(msg); } } protected Exception TryDeserialize(byte[] payload, out TRead msg) { using (Profilers.ForCurrentThread().NewScope("AsyncCallBase.TryDeserialize")) { try { msg = deserializer(payload); return null; } catch (Exception e) { msg = default(TRead); return e; } } } protected void FireCompletion(AsyncCompletionDelegate completionDelegate, T value, Exception error) { try { completionDelegate(value, error); } catch (Exception e) { Logger.Error(e, "Exception occured while invoking completion delegate."); } } /// /// Handles send completion. /// protected void HandleSendFinished(bool success) { AsyncCompletionDelegate origCompletionDelegate = null; lock (myLock) { origCompletionDelegate = sendCompletionDelegate; sendCompletionDelegate = null; ReleaseResourcesIfPossible(); } if (!success) { FireCompletion(origCompletionDelegate, null, new InvalidOperationException("Send failed")); } else { FireCompletion(origCompletionDelegate, null, null); } } /// /// Handles halfclose completion. /// protected void HandleHalfclosed(bool success) { AsyncCompletionDelegate origCompletionDelegate = null; lock (myLock) { origCompletionDelegate = sendCompletionDelegate; sendCompletionDelegate = null; ReleaseResourcesIfPossible(); } if (!success) { FireCompletion(origCompletionDelegate, null, new InvalidOperationException("Halfclose failed")); } else { FireCompletion(origCompletionDelegate, null, null); } } /// /// Handles streaming read completion. /// protected void HandleReadFinished(bool success, byte[] receivedMessage) { TRead msg = default(TRead); var deserializeException = (success && receivedMessage != null) ? TryDeserialize(receivedMessage, out msg) : null; AsyncCompletionDelegate origCompletionDelegate = null; lock (myLock) { origCompletionDelegate = readCompletionDelegate; readCompletionDelegate = null; if (receivedMessage == null) { // This was the last read. readingDone = true; } if (deserializeException != null && IsClient) { readingDone = true; CancelWithStatus(DeserializeResponseFailureStatus); } ReleaseResourcesIfPossible(); } // TODO: handle the case when success==false if (deserializeException != null && !IsClient) { FireCompletion(origCompletionDelegate, default(TRead), new IOException("Failed to deserialize request message.", deserializeException)); return; } FireCompletion(origCompletionDelegate, msg, null); } } }