diff options
Diffstat (limited to 'src/csharp/Grpc.Core/Internal/AsyncCall.cs')
-rw-r--r-- | src/csharp/Grpc.Core/Internal/AsyncCall.cs | 234 |
1 files changed, 172 insertions, 62 deletions
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index 9946d1a6cf..4cdf0ee6a7 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -17,6 +17,7 @@ #endregion using System; +using System.Threading; using System.Threading.Tasks; using Grpc.Core.Logging; using Grpc.Core.Profiling; @@ -34,6 +35,8 @@ namespace Grpc.Core.Internal readonly CallInvocationDetails<TRequest, TResponse> details; readonly INativeCall injectedNativeCall; // for testing + bool registeredWithChannel; + // Dispose of to de-register cancellation token registration IDisposable cancellationTokenRegistration; @@ -77,43 +80,59 @@ namespace Grpc.Core.Internal using (profiler.NewScope("AsyncCall.UnaryCall")) using (CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.CreateSync()) { - byte[] payload = UnsafeSerialize(msg); + bool callStartedOk = false; + try + { + unaryResponseTcs = new TaskCompletionSource<TResponse>(); - unaryResponseTcs = new TaskCompletionSource<TResponse>(); + lock (myLock) + { + GrpcPreconditions.CheckState(!started); + started = true; + Initialize(cq); - lock (myLock) - { - GrpcPreconditions.CheckState(!started); - started = true; - Initialize(cq); + halfcloseRequested = true; + readingDone = true; + } - halfcloseRequested = true; - readingDone = true; - } + byte[] payload = UnsafeSerialize(msg); - using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) - { - var ctx = details.Channel.Environment.BatchContextPool.Lease(); - try + using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) { - call.StartUnary(ctx, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags); - var ev = cq.Pluck(ctx.Handle); - bool success = (ev.success != 0); + var ctx = details.Channel.Environment.BatchContextPool.Lease(); try { - using (profiler.NewScope("AsyncCall.UnaryCall.HandleBatch")) + call.StartUnary(ctx, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags); + callStartedOk = true; + + var ev = cq.Pluck(ctx.Handle); + bool success = (ev.success != 0); + try { - HandleUnaryResponse(success, ctx.GetReceivedStatusOnClient(), ctx.GetReceivedMessage(), ctx.GetReceivedInitialMetadata()); + using (profiler.NewScope("AsyncCall.UnaryCall.HandleBatch")) + { + HandleUnaryResponse(success, ctx.GetReceivedStatusOnClient(), ctx.GetReceivedMessage(), ctx.GetReceivedInitialMetadata()); + } + } + catch (Exception e) + { + Logger.Error(e, "Exception occurred while invoking completion delegate."); } } - catch (Exception e) + finally { - Logger.Error(e, "Exception occured while invoking completion delegate."); + ctx.Recycle(); } } - finally + } + finally + { + if (!callStartedOk) { - ctx.Recycle(); + lock (myLock) + { + OnFailedToStartCallLocked(); + } } } @@ -130,22 +149,35 @@ namespace Grpc.Core.Internal { lock (myLock) { - GrpcPreconditions.CheckState(!started); - started = true; + bool callStartedOk = false; + try + { + GrpcPreconditions.CheckState(!started); + started = true; - Initialize(details.Channel.CompletionQueue); + Initialize(details.Channel.CompletionQueue); - halfcloseRequested = true; - readingDone = true; + halfcloseRequested = true; + readingDone = true; - byte[] payload = UnsafeSerialize(msg); + byte[] payload = UnsafeSerialize(msg); - unaryResponseTcs = new TaskCompletionSource<TResponse>(); - using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) + unaryResponseTcs = new TaskCompletionSource<TResponse>(); + using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) + { + call.StartUnary(UnaryResponseClientCallback, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags); + callStartedOk = true; + } + + return unaryResponseTcs.Task; + } + finally { - call.StartUnary(UnaryResponseClientCallback, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags); + if (!callStartedOk) + { + OnFailedToStartCallLocked(); + } } - return unaryResponseTcs.Task; } } @@ -157,20 +189,32 @@ namespace Grpc.Core.Internal { lock (myLock) { - GrpcPreconditions.CheckState(!started); - started = true; + bool callStartedOk = false; + try + { + GrpcPreconditions.CheckState(!started); + started = true; - Initialize(details.Channel.CompletionQueue); + Initialize(details.Channel.CompletionQueue); - readingDone = true; + readingDone = true; + + unaryResponseTcs = new TaskCompletionSource<TResponse>(); + using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) + { + call.StartClientStreaming(UnaryResponseClientCallback, metadataArray, details.Options.Flags); + callStartedOk = true; + } - unaryResponseTcs = new TaskCompletionSource<TResponse>(); - using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) + return unaryResponseTcs.Task; + } + finally { - call.StartClientStreaming(UnaryResponseClientCallback, metadataArray, details.Options.Flags); + if (!callStartedOk) + { + OnFailedToStartCallLocked(); + } } - - return unaryResponseTcs.Task; } } @@ -181,21 +225,33 @@ namespace Grpc.Core.Internal { lock (myLock) { - GrpcPreconditions.CheckState(!started); - started = true; + bool callStartedOk = false; + try + { + GrpcPreconditions.CheckState(!started); + started = true; - Initialize(details.Channel.CompletionQueue); + Initialize(details.Channel.CompletionQueue); - halfcloseRequested = true; + halfcloseRequested = true; - byte[] payload = UnsafeSerialize(msg); + byte[] payload = UnsafeSerialize(msg); - streamingResponseCallFinishedTcs = new TaskCompletionSource<object>(); - using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) + streamingResponseCallFinishedTcs = new TaskCompletionSource<object>(); + using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) + { + call.StartServerStreaming(ReceivedStatusOnClientCallback, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags); + callStartedOk = true; + } + call.StartReceiveInitialMetadata(ReceivedResponseHeadersCallback); + } + finally { - call.StartServerStreaming(ReceivedStatusOnClientCallback, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags); + if (!callStartedOk) + { + OnFailedToStartCallLocked(); + } } - call.StartReceiveInitialMetadata(ReceivedResponseHeadersCallback); } } @@ -207,17 +263,29 @@ namespace Grpc.Core.Internal { lock (myLock) { - GrpcPreconditions.CheckState(!started); - started = true; + bool callStartedOk = false; + try + { + GrpcPreconditions.CheckState(!started); + started = true; - Initialize(details.Channel.CompletionQueue); + Initialize(details.Channel.CompletionQueue); - streamingResponseCallFinishedTcs = new TaskCompletionSource<object>(); - using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) + streamingResponseCallFinishedTcs = new TaskCompletionSource<object>(); + using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) + { + call.StartDuplexStreaming(ReceivedStatusOnClientCallback, metadataArray, details.Options.Flags); + callStartedOk = true; + } + call.StartReceiveInitialMetadata(ReceivedResponseHeadersCallback); + } + finally { - call.StartDuplexStreaming(ReceivedStatusOnClientCallback, metadataArray, details.Options.Flags); + if (!callStartedOk) + { + OnFailedToStartCallLocked(); + } } - call.StartReceiveInitialMetadata(ReceivedResponseHeadersCallback); } } @@ -325,9 +393,22 @@ namespace Grpc.Core.Internal } } - protected override void OnAfterReleaseResources() + protected override void OnAfterReleaseResourcesLocked() { - details.Channel.RemoveCallReference(this); + if (registeredWithChannel) + { + details.Channel.RemoveCallReference(this); + registeredWithChannel = false; + } + } + + protected override void OnAfterReleaseResourcesUnlocked() + { + // If cancellation callback is in progress, this can block + // so we need to do this outside of call's lock to prevent + // deadlock. + // See https://github.com/grpc/grpc/issues/14777 + // See https://github.com/dotnet/corefx/issues/14903 cancellationTokenRegistration?.Dispose(); } @@ -385,10 +466,27 @@ namespace Grpc.Core.Internal var call = CreateNativeCall(cq); details.Channel.AddCallReference(this); + registeredWithChannel = true; InitializeInternal(call); + RegisterCancellationCallback(); } + private void OnFailedToStartCallLocked() + { + ReleaseResources(); + + // We need to execute the hook that disposes the cancellation token + // registration, but it cannot be done from under a lock. + // To make things simple, we just schedule the unregistering + // on a threadpool. + // - Once the native call is disposed, the Cancel() calls are ignored anyway + // - We don't care about the overhead as OnFailedToStartCallLocked() only happens + // when something goes very bad when initializing a call and that should + // never happen when gRPC is used correctly. + ThreadPool.QueueUserWorkItem((state) => OnAfterReleaseResourcesUnlocked()); + } + private INativeCall CreateNativeCall(CompletionQueueSafeHandle cq) { if (injectedNativeCall != null) @@ -448,6 +546,7 @@ namespace Grpc.Core.Internal TResponse msg = default(TResponse); var deserializeException = TryDeserialize(receivedMessage, out msg); + bool releasedResources; lock (myLock) { finished = true; @@ -464,7 +563,12 @@ namespace Grpc.Core.Internal streamingWriteTcs = null; } - ReleaseResourcesIfPossible(); + releasedResources = ReleaseResourcesIfPossible(); + } + + if (releasedResources) + { + OnAfterReleaseResourcesUnlocked(); } responseHeadersTcs.SetResult(responseHeaders); @@ -494,6 +598,7 @@ namespace Grpc.Core.Internal TaskCompletionSource<object> delayedStreamingWriteTcs = null; + bool releasedResources; lock (myLock) { finished = true; @@ -504,7 +609,12 @@ namespace Grpc.Core.Internal streamingWriteTcs = null; } - ReleaseResourcesIfPossible(); + releasedResources = ReleaseResourcesIfPossible(); + } + + if (releasedResources) + { + OnAfterReleaseResourcesUnlocked(); } if (delayedStreamingWriteTcs != null) |