diff options
Diffstat (limited to 'src/csharp/Grpc.Core/Internal')
10 files changed, 320 insertions, 79 deletions
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index 9946d1a6cf..4cdf0ee6a7 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -17,6 +17,7 @@ #endregion using System; +using System.Threading; using System.Threading.Tasks; using Grpc.Core.Logging; using Grpc.Core.Profiling; @@ -34,6 +35,8 @@ namespace Grpc.Core.Internal readonly CallInvocationDetails<TRequest, TResponse> details; readonly INativeCall injectedNativeCall; // for testing + bool registeredWithChannel; + // Dispose of to de-register cancellation token registration IDisposable cancellationTokenRegistration; @@ -77,43 +80,59 @@ namespace Grpc.Core.Internal using (profiler.NewScope("AsyncCall.UnaryCall")) using (CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.CreateSync()) { - byte[] payload = UnsafeSerialize(msg); + bool callStartedOk = false; + try + { + unaryResponseTcs = new TaskCompletionSource<TResponse>(); - unaryResponseTcs = new TaskCompletionSource<TResponse>(); + lock (myLock) + { + GrpcPreconditions.CheckState(!started); + started = true; + Initialize(cq); - lock (myLock) - { - GrpcPreconditions.CheckState(!started); - started = true; - Initialize(cq); + halfcloseRequested = true; + readingDone = true; + } - halfcloseRequested = true; - readingDone = true; - } + byte[] payload = UnsafeSerialize(msg); - using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) - { - var ctx = details.Channel.Environment.BatchContextPool.Lease(); - try + using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) { - call.StartUnary(ctx, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags); - var ev = cq.Pluck(ctx.Handle); - bool success = (ev.success != 0); + var ctx = details.Channel.Environment.BatchContextPool.Lease(); try { - using (profiler.NewScope("AsyncCall.UnaryCall.HandleBatch")) + call.StartUnary(ctx, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags); + callStartedOk = true; + + var ev = cq.Pluck(ctx.Handle); + bool success = (ev.success != 0); + try { - HandleUnaryResponse(success, ctx.GetReceivedStatusOnClient(), ctx.GetReceivedMessage(), ctx.GetReceivedInitialMetadata()); + using (profiler.NewScope("AsyncCall.UnaryCall.HandleBatch")) + { + HandleUnaryResponse(success, ctx.GetReceivedStatusOnClient(), ctx.GetReceivedMessage(), ctx.GetReceivedInitialMetadata()); + } + } + catch (Exception e) + { + Logger.Error(e, "Exception occurred while invoking completion delegate."); } } - catch (Exception e) + finally { - Logger.Error(e, "Exception occured while invoking completion delegate."); + ctx.Recycle(); } } - finally + } + finally + { + if (!callStartedOk) { - ctx.Recycle(); + lock (myLock) + { + OnFailedToStartCallLocked(); + } } } @@ -130,22 +149,35 @@ namespace Grpc.Core.Internal { lock (myLock) { - GrpcPreconditions.CheckState(!started); - started = true; + bool callStartedOk = false; + try + { + GrpcPreconditions.CheckState(!started); + started = true; - Initialize(details.Channel.CompletionQueue); + Initialize(details.Channel.CompletionQueue); - halfcloseRequested = true; - readingDone = true; + halfcloseRequested = true; + readingDone = true; - byte[] payload = UnsafeSerialize(msg); + byte[] payload = UnsafeSerialize(msg); - unaryResponseTcs = new TaskCompletionSource<TResponse>(); - using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) + unaryResponseTcs = new TaskCompletionSource<TResponse>(); + using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) + { + call.StartUnary(UnaryResponseClientCallback, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags); + callStartedOk = true; + } + + return unaryResponseTcs.Task; + } + finally { - call.StartUnary(UnaryResponseClientCallback, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags); + if (!callStartedOk) + { + OnFailedToStartCallLocked(); + } } - return unaryResponseTcs.Task; } } @@ -157,20 +189,32 @@ namespace Grpc.Core.Internal { lock (myLock) { - GrpcPreconditions.CheckState(!started); - started = true; + bool callStartedOk = false; + try + { + GrpcPreconditions.CheckState(!started); + started = true; - Initialize(details.Channel.CompletionQueue); + Initialize(details.Channel.CompletionQueue); - readingDone = true; + readingDone = true; + + unaryResponseTcs = new TaskCompletionSource<TResponse>(); + using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) + { + call.StartClientStreaming(UnaryResponseClientCallback, metadataArray, details.Options.Flags); + callStartedOk = true; + } - unaryResponseTcs = new TaskCompletionSource<TResponse>(); - using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) + return unaryResponseTcs.Task; + } + finally { - call.StartClientStreaming(UnaryResponseClientCallback, metadataArray, details.Options.Flags); + if (!callStartedOk) + { + OnFailedToStartCallLocked(); + } } - - return unaryResponseTcs.Task; } } @@ -181,21 +225,33 @@ namespace Grpc.Core.Internal { lock (myLock) { - GrpcPreconditions.CheckState(!started); - started = true; + bool callStartedOk = false; + try + { + GrpcPreconditions.CheckState(!started); + started = true; - Initialize(details.Channel.CompletionQueue); + Initialize(details.Channel.CompletionQueue); - halfcloseRequested = true; + halfcloseRequested = true; - byte[] payload = UnsafeSerialize(msg); + byte[] payload = UnsafeSerialize(msg); - streamingResponseCallFinishedTcs = new TaskCompletionSource<object>(); - using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) + streamingResponseCallFinishedTcs = new TaskCompletionSource<object>(); + using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) + { + call.StartServerStreaming(ReceivedStatusOnClientCallback, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags); + callStartedOk = true; + } + call.StartReceiveInitialMetadata(ReceivedResponseHeadersCallback); + } + finally { - call.StartServerStreaming(ReceivedStatusOnClientCallback, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags); + if (!callStartedOk) + { + OnFailedToStartCallLocked(); + } } - call.StartReceiveInitialMetadata(ReceivedResponseHeadersCallback); } } @@ -207,17 +263,29 @@ namespace Grpc.Core.Internal { lock (myLock) { - GrpcPreconditions.CheckState(!started); - started = true; + bool callStartedOk = false; + try + { + GrpcPreconditions.CheckState(!started); + started = true; - Initialize(details.Channel.CompletionQueue); + Initialize(details.Channel.CompletionQueue); - streamingResponseCallFinishedTcs = new TaskCompletionSource<object>(); - using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) + streamingResponseCallFinishedTcs = new TaskCompletionSource<object>(); + using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) + { + call.StartDuplexStreaming(ReceivedStatusOnClientCallback, metadataArray, details.Options.Flags); + callStartedOk = true; + } + call.StartReceiveInitialMetadata(ReceivedResponseHeadersCallback); + } + finally { - call.StartDuplexStreaming(ReceivedStatusOnClientCallback, metadataArray, details.Options.Flags); + if (!callStartedOk) + { + OnFailedToStartCallLocked(); + } } - call.StartReceiveInitialMetadata(ReceivedResponseHeadersCallback); } } @@ -325,9 +393,22 @@ namespace Grpc.Core.Internal } } - protected override void OnAfterReleaseResources() + protected override void OnAfterReleaseResourcesLocked() { - details.Channel.RemoveCallReference(this); + if (registeredWithChannel) + { + details.Channel.RemoveCallReference(this); + registeredWithChannel = false; + } + } + + protected override void OnAfterReleaseResourcesUnlocked() + { + // If cancellation callback is in progress, this can block + // so we need to do this outside of call's lock to prevent + // deadlock. + // See https://github.com/grpc/grpc/issues/14777 + // See https://github.com/dotnet/corefx/issues/14903 cancellationTokenRegistration?.Dispose(); } @@ -385,10 +466,27 @@ namespace Grpc.Core.Internal var call = CreateNativeCall(cq); details.Channel.AddCallReference(this); + registeredWithChannel = true; InitializeInternal(call); + RegisterCancellationCallback(); } + private void OnFailedToStartCallLocked() + { + ReleaseResources(); + + // We need to execute the hook that disposes the cancellation token + // registration, but it cannot be done from under a lock. + // To make things simple, we just schedule the unregistering + // on a threadpool. + // - Once the native call is disposed, the Cancel() calls are ignored anyway + // - We don't care about the overhead as OnFailedToStartCallLocked() only happens + // when something goes very bad when initializing a call and that should + // never happen when gRPC is used correctly. + ThreadPool.QueueUserWorkItem((state) => OnAfterReleaseResourcesUnlocked()); + } + private INativeCall CreateNativeCall(CompletionQueueSafeHandle cq) { if (injectedNativeCall != null) @@ -448,6 +546,7 @@ namespace Grpc.Core.Internal TResponse msg = default(TResponse); var deserializeException = TryDeserialize(receivedMessage, out msg); + bool releasedResources; lock (myLock) { finished = true; @@ -464,7 +563,12 @@ namespace Grpc.Core.Internal streamingWriteTcs = null; } - ReleaseResourcesIfPossible(); + releasedResources = ReleaseResourcesIfPossible(); + } + + if (releasedResources) + { + OnAfterReleaseResourcesUnlocked(); } responseHeadersTcs.SetResult(responseHeaders); @@ -494,6 +598,7 @@ namespace Grpc.Core.Internal TaskCompletionSource<object> delayedStreamingWriteTcs = null; + bool releasedResources; lock (myLock) { finished = true; @@ -504,7 +609,12 @@ namespace Grpc.Core.Internal streamingWriteTcs = null; } - ReleaseResourcesIfPossible(); + releasedResources = ReleaseResourcesIfPossible(); + } + + if (releasedResources) + { + OnAfterReleaseResourcesUnlocked(); } if (delayedStreamingWriteTcs != null) diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs index 3273c26b88..a93dc34620 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs @@ -189,17 +189,21 @@ namespace Grpc.Core.Internal /// </summary> protected abstract Exception GetRpcExceptionClientOnly(); - private void ReleaseResources() + protected void ReleaseResources() { if (call != null) { call.Dispose(); } disposed = true; - OnAfterReleaseResources(); + OnAfterReleaseResourcesLocked(); } - protected virtual void OnAfterReleaseResources() + protected virtual void OnAfterReleaseResourcesLocked() + { + } + + protected virtual void OnAfterReleaseResourcesUnlocked() { } @@ -235,6 +239,7 @@ namespace Grpc.Core.Internal { bool delayCompletion = false; TaskCompletionSource<object> origTcs = null; + bool releasedResources; lock (myLock) { if (!success && !finished && IsClient) { @@ -252,7 +257,12 @@ namespace Grpc.Core.Internal streamingWriteTcs = null; } - ReleaseResourcesIfPossible(); + releasedResources = ReleaseResourcesIfPossible(); + } + + if (releasedResources) + { + OnAfterReleaseResourcesUnlocked(); } if (!success) @@ -282,9 +292,15 @@ namespace Grpc.Core.Internal /// </summary> protected void HandleSendStatusFromServerFinished(bool success) { + bool releasedResources; lock (myLock) { - ReleaseResourcesIfPossible(); + releasedResources = ReleaseResourcesIfPossible(); + } + + if (releasedResources) + { + OnAfterReleaseResourcesUnlocked(); } if (!success) @@ -310,6 +326,7 @@ namespace Grpc.Core.Internal var deserializeException = (success && receivedMessage != null) ? TryDeserialize(receivedMessage, out msg) : null; TaskCompletionSource<TRead> origTcs = null; + bool releasedResources; lock (myLock) { origTcs = streamingReadTcs; @@ -332,7 +349,12 @@ namespace Grpc.Core.Internal streamingReadTcs = null; } - ReleaseResourcesIfPossible(); + releasedResources = ReleaseResourcesIfPossible(); + } + + if (releasedResources) + { + OnAfterReleaseResourcesUnlocked(); } if (deserializeException != null && !IsClient) diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs index 11acb27533..0ceca4abb8 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs @@ -184,7 +184,7 @@ namespace Grpc.Core.Internal throw new InvalidOperationException("Call be only called for client calls"); } - protected override void OnAfterReleaseResources() + protected override void OnAfterReleaseResourcesLocked() { server.RemoveCallReference(this); } @@ -206,6 +206,7 @@ namespace Grpc.Core.Internal { // NOTE: because this event is a result of batch containing GRPC_OP_RECV_CLOSE_ON_SERVER, // success will be always set to true. + bool releasedResources; lock (myLock) { finished = true; @@ -217,7 +218,12 @@ namespace Grpc.Core.Internal streamingReadTcs = new TaskCompletionSource<TRequest>(); streamingReadTcs.SetResult(default(TRequest)); } - ReleaseResourcesIfPossible(); + releasedResources = ReleaseResourcesIfPossible(); + } + + if (releasedResources) + { + OnAfterReleaseResourcesUnlocked(); } if (cancelled) diff --git a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs index 53a859d18f..085e7faf59 100644 --- a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs @@ -144,7 +144,7 @@ namespace Grpc.Core.Internal } catch (Exception e) { - Logger.Error(e, "Exception occured while invoking batch completion delegate."); + Logger.Error(e, "Exception occurred while invoking batch completion delegate."); } finally { diff --git a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs index 8ddda9be5c..622cfde9e7 100644 --- a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs +++ b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs @@ -189,7 +189,7 @@ namespace Grpc.Core.Internal } catch (Exception e) { - Logger.Error(e, "Exception occured while extracting event from completion registry."); + Logger.Error(e, "Exception occurred while extracting event from completion registry."); } } } @@ -233,7 +233,7 @@ namespace Grpc.Core.Internal } catch (Exception e) { - Logger.Error(e, "Exception occured while invoking completion delegate"); + Logger.Error(e, "Exception occurred while invoking completion delegate"); } finally { diff --git a/src/csharp/Grpc.Core/Internal/NativeExtension.cs b/src/csharp/Grpc.Core/Internal/NativeExtension.cs index d5ec998bbd..f526b913af 100644 --- a/src/csharp/Grpc.Core/Internal/NativeExtension.cs +++ b/src/csharp/Grpc.Core/Internal/NativeExtension.cs @@ -106,7 +106,15 @@ namespace Grpc.Core.Internal /// </summary> private static NativeMethods LoadNativeMethods() { - return PlatformApis.IsUnity ? LoadNativeMethodsUnity() : new NativeMethods(LoadUnmanagedLibrary()); + if (PlatformApis.IsUnity) + { + return LoadNativeMethodsUnity(); + } + if (PlatformApis.IsXamarin) + { + return LoadNativeMethodsXamarin(); + } + return new NativeMethods(LoadUnmanagedLibrary()); } /// <summary> @@ -128,6 +136,20 @@ namespace Grpc.Core.Internal } } + /// <summary> + /// Return native method delegates when running on the Xamarin platform. + /// WARNING: Xamarin support is experimental and work-in-progress. Don't expect it to work. + /// </summary> + private static NativeMethods LoadNativeMethodsXamarin() + { + if (PlatformApis.IsXamarinAndroid) + { + return new NativeMethods(new NativeMethods.DllImportsFromSharedLib()); + } + // not tested yet + return new NativeMethods(new NativeMethods.DllImportsFromStaticLib()); + } + private static string GetAssemblyPath() { var assembly = typeof(NativeExtension).GetTypeInfo().Assembly; diff --git a/src/csharp/Grpc.Core/Internal/NativeLogRedirector.cs b/src/csharp/Grpc.Core/Internal/NativeLogRedirector.cs index bf6440123a..30264acb10 100644 --- a/src/csharp/Grpc.Core/Internal/NativeLogRedirector.cs +++ b/src/csharp/Grpc.Core/Internal/NativeLogRedirector.cs @@ -51,6 +51,7 @@ namespace Grpc.Core.Internal } } + [MonoPInvokeCallback(typeof(GprLogDelegate))] private static void HandleWrite(IntPtr fileStringPtr, int line, ulong threadId, IntPtr severityStringPtr, IntPtr msgPtr) { try @@ -86,4 +87,22 @@ namespace Grpc.Core.Internal } } } + + /// <summary> + /// Use this attribute to mark methods that will be called back from P/Invoke calls. + /// iOS (and probably other AOT platforms) needs to have delegates registered. + /// Instead of depending on Xamarin.iOS for this, we can just create our own, + /// the iOS runtime just checks for the type name. + /// See: https://docs.microsoft.com/en-gb/xamarin/ios/internals/limitations#reverse-callbacks + /// </summary> + [AttributeUsage(AttributeTargets.Method)] + internal sealed class MonoPInvokeCallbackAttribute : Attribute + { + public MonoPInvokeCallbackAttribute(Type type) + { + Type = type; + } + + public Type Type { get; private set; } + } } diff --git a/src/csharp/Grpc.Core/Internal/NativeMetadataCredentialsPlugin.cs b/src/csharp/Grpc.Core/Internal/NativeMetadataCredentialsPlugin.cs index 4d695e8850..faeb51e6f7 100644 --- a/src/csharp/Grpc.Core/Internal/NativeMetadataCredentialsPlugin.cs +++ b/src/csharp/Grpc.Core/Internal/NativeMetadataCredentialsPlugin.cs @@ -68,8 +68,8 @@ namespace Grpc.Core.Internal } catch (Exception e) { - Native.grpcsharp_metadata_credentials_notify_from_plugin(callbackPtr, userDataPtr, MetadataArraySafeHandle.Create(Metadata.Empty), StatusCode.Unknown, GetMetadataExceptionStatusMsg); - Logger.Error(e, GetMetadataExceptionLogMsg); + // eat the exception, we must not throw when inside callback from native code. + Logger.Error(e, "Exception occurred while invoking native metadata interceptor handler."); } } @@ -87,7 +87,8 @@ namespace Grpc.Core.Internal } catch (Exception e) { - Native.grpcsharp_metadata_credentials_notify_from_plugin(callbackPtr, userDataPtr, MetadataArraySafeHandle.Create(Metadata.Empty), StatusCode.Unknown, GetMetadataExceptionStatusMsg); + string detail = GetMetadataExceptionStatusMsg + " " + e.ToString(); + Native.grpcsharp_metadata_credentials_notify_from_plugin(callbackPtr, userDataPtr, MetadataArraySafeHandle.Create(Metadata.Empty), StatusCode.Unknown, detail); Logger.Error(e, GetMetadataExceptionLogMsg); } } diff --git a/src/csharp/Grpc.Core/Internal/PlatformApis.cs b/src/csharp/Grpc.Core/Internal/PlatformApis.cs index b90fbccb2b..a8f147545b 100644 --- a/src/csharp/Grpc.Core/Internal/PlatformApis.cs +++ b/src/csharp/Grpc.Core/Internal/PlatformApis.cs @@ -33,12 +33,19 @@ namespace Grpc.Core.Internal internal static class PlatformApis { const string UnityEngineApplicationClassName = "UnityEngine.Application, UnityEngine"; + const string XamarinAndroidObjectClassName = "Java.Lang.Object, Mono.Android"; + const string XamarinIOSObjectClassName = "Foundation.NSObject, Xamarin.iOS"; + static readonly bool isLinux; static readonly bool isMacOSX; static readonly bool isWindows; static readonly bool isMono; static readonly bool isNetCore; static readonly bool isUnity; + static readonly bool isUnityIOS; + static readonly bool isXamarin; + static readonly bool isXamarinIOS; + static readonly bool isXamarinAndroid; static PlatformApis() { @@ -57,7 +64,28 @@ namespace Grpc.Core.Internal isNetCore = false; #endif isMono = Type.GetType("Mono.Runtime") != null; - isUnity = Type.GetType(UnityEngineApplicationClassName) != null; + + // Unity + var unityApplicationClass = Type.GetType(UnityEngineApplicationClassName); + if (unityApplicationClass != null) + { + isUnity = true; + // Consult value of Application.platform via reflection + // https://docs.unity3d.com/ScriptReference/Application-platform.html + var platformProperty = unityApplicationClass.GetTypeInfo().GetProperty("platform"); + var unityRuntimePlatform = platformProperty?.GetValue(null)?.ToString(); + isUnityIOS = (unityRuntimePlatform == "IPhonePlayer"); + } + else + { + isUnity = false; + isUnityIOS = false; + } + + // Xamarin + isXamarinIOS = Type.GetType(XamarinIOSObjectClassName) != null; + isXamarinAndroid = Type.GetType(XamarinAndroidObjectClassName) != null; + isXamarin = isXamarinIOS || isXamarinAndroid; } public static bool IsLinux @@ -89,6 +117,39 @@ namespace Grpc.Core.Internal } /// <summary> + /// true if running on Unity iOS, false otherwise. + /// </summary> + public static bool IsUnityIOS + { + get { return isUnityIOS; } + } + + /// <summary> + /// true if running on a Xamarin platform (either Xamarin.Android or Xamarin.iOS), + /// false otherwise. + /// </summary> + public static bool IsXamarin + { + get { return isXamarin; } + } + + /// <summary> + /// true if running on Xamarin.iOS, false otherwise. + /// </summary> + public static bool IsXamarinIOS + { + get { return isXamarinIOS; } + } + + /// <summary> + /// true if running on Xamarin.Android, false otherwise. + /// </summary> + public static bool IsXamarinAndroid + { + get { return isXamarinAndroid; } + } + + /// <summary> /// true if running on .NET Core (CoreCLR), false otherwise. /// </summary> public static bool IsNetCore diff --git a/src/csharp/Grpc.Core/Internal/RequestCallContextSafeHandle.cs b/src/csharp/Grpc.Core/Internal/RequestCallContextSafeHandle.cs index ebc2d6d8d6..24fde75e5a 100644 --- a/src/csharp/Grpc.Core/Internal/RequestCallContextSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/RequestCallContextSafeHandle.cs @@ -112,7 +112,7 @@ namespace Grpc.Core.Internal } catch (Exception e) { - Logger.Error(e, "Exception occured while invoking request call completion delegate."); + Logger.Error(e, "Exception occurred while invoking request call completion delegate."); } finally { |