aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/csharp/Grpc.Core/Internal/AsyncCall.cs
diff options
context:
space:
mode:
Diffstat (limited to 'src/csharp/Grpc.Core/Internal/AsyncCall.cs')
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCall.cs234
1 files changed, 172 insertions, 62 deletions
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index 3c9e090ba4..4cdf0ee6a7 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -17,6 +17,7 @@
#endregion
using System;
+using System.Threading;
using System.Threading.Tasks;
using Grpc.Core.Logging;
using Grpc.Core.Profiling;
@@ -34,6 +35,8 @@ namespace Grpc.Core.Internal
readonly CallInvocationDetails<TRequest, TResponse> details;
readonly INativeCall injectedNativeCall; // for testing
+ bool registeredWithChannel;
+
// Dispose of to de-register cancellation token registration
IDisposable cancellationTokenRegistration;
@@ -77,43 +80,59 @@ namespace Grpc.Core.Internal
using (profiler.NewScope("AsyncCall.UnaryCall"))
using (CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.CreateSync())
{
- byte[] payload = UnsafeSerialize(msg);
+ bool callStartedOk = false;
+ try
+ {
+ unaryResponseTcs = new TaskCompletionSource<TResponse>();
- unaryResponseTcs = new TaskCompletionSource<TResponse>();
+ lock (myLock)
+ {
+ GrpcPreconditions.CheckState(!started);
+ started = true;
+ Initialize(cq);
- lock (myLock)
- {
- GrpcPreconditions.CheckState(!started);
- started = true;
- Initialize(cq);
+ halfcloseRequested = true;
+ readingDone = true;
+ }
- halfcloseRequested = true;
- readingDone = true;
- }
+ byte[] payload = UnsafeSerialize(msg);
- using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
- {
- var ctx = details.Channel.Environment.BatchContextPool.Lease();
- try
+ using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
- call.StartUnary(ctx, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags);
- var ev = cq.Pluck(ctx.Handle);
- bool success = (ev.success != 0);
+ var ctx = details.Channel.Environment.BatchContextPool.Lease();
try
{
- using (profiler.NewScope("AsyncCall.UnaryCall.HandleBatch"))
+ call.StartUnary(ctx, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags);
+ callStartedOk = true;
+
+ var ev = cq.Pluck(ctx.Handle);
+ bool success = (ev.success != 0);
+ try
{
- HandleUnaryResponse(success, ctx.GetReceivedStatusOnClient(), ctx.GetReceivedMessage(), ctx.GetReceivedInitialMetadata());
+ using (profiler.NewScope("AsyncCall.UnaryCall.HandleBatch"))
+ {
+ HandleUnaryResponse(success, ctx.GetReceivedStatusOnClient(), ctx.GetReceivedMessage(), ctx.GetReceivedInitialMetadata());
+ }
+ }
+ catch (Exception e)
+ {
+ Logger.Error(e, "Exception occurred while invoking completion delegate.");
}
}
- catch (Exception e)
+ finally
{
- Logger.Error(e, "Exception occurred while invoking completion delegate.");
+ ctx.Recycle();
}
}
- finally
+ }
+ finally
+ {
+ if (!callStartedOk)
{
- ctx.Recycle();
+ lock (myLock)
+ {
+ OnFailedToStartCallLocked();
+ }
}
}
@@ -130,22 +149,35 @@ namespace Grpc.Core.Internal
{
lock (myLock)
{
- GrpcPreconditions.CheckState(!started);
- started = true;
+ bool callStartedOk = false;
+ try
+ {
+ GrpcPreconditions.CheckState(!started);
+ started = true;
- Initialize(details.Channel.CompletionQueue);
+ Initialize(details.Channel.CompletionQueue);
- halfcloseRequested = true;
- readingDone = true;
+ halfcloseRequested = true;
+ readingDone = true;
- byte[] payload = UnsafeSerialize(msg);
+ byte[] payload = UnsafeSerialize(msg);
- unaryResponseTcs = new TaskCompletionSource<TResponse>();
- using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
+ unaryResponseTcs = new TaskCompletionSource<TResponse>();
+ using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
+ {
+ call.StartUnary(UnaryResponseClientCallback, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags);
+ callStartedOk = true;
+ }
+
+ return unaryResponseTcs.Task;
+ }
+ finally
{
- call.StartUnary(UnaryResponseClientCallback, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags);
+ if (!callStartedOk)
+ {
+ OnFailedToStartCallLocked();
+ }
}
- return unaryResponseTcs.Task;
}
}
@@ -157,20 +189,32 @@ namespace Grpc.Core.Internal
{
lock (myLock)
{
- GrpcPreconditions.CheckState(!started);
- started = true;
+ bool callStartedOk = false;
+ try
+ {
+ GrpcPreconditions.CheckState(!started);
+ started = true;
- Initialize(details.Channel.CompletionQueue);
+ Initialize(details.Channel.CompletionQueue);
- readingDone = true;
+ readingDone = true;
+
+ unaryResponseTcs = new TaskCompletionSource<TResponse>();
+ using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
+ {
+ call.StartClientStreaming(UnaryResponseClientCallback, metadataArray, details.Options.Flags);
+ callStartedOk = true;
+ }
- unaryResponseTcs = new TaskCompletionSource<TResponse>();
- using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
+ return unaryResponseTcs.Task;
+ }
+ finally
{
- call.StartClientStreaming(UnaryResponseClientCallback, metadataArray, details.Options.Flags);
+ if (!callStartedOk)
+ {
+ OnFailedToStartCallLocked();
+ }
}
-
- return unaryResponseTcs.Task;
}
}
@@ -181,21 +225,33 @@ namespace Grpc.Core.Internal
{
lock (myLock)
{
- GrpcPreconditions.CheckState(!started);
- started = true;
+ bool callStartedOk = false;
+ try
+ {
+ GrpcPreconditions.CheckState(!started);
+ started = true;
- Initialize(details.Channel.CompletionQueue);
+ Initialize(details.Channel.CompletionQueue);
- halfcloseRequested = true;
+ halfcloseRequested = true;
- byte[] payload = UnsafeSerialize(msg);
+ byte[] payload = UnsafeSerialize(msg);
- streamingResponseCallFinishedTcs = new TaskCompletionSource<object>();
- using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
+ streamingResponseCallFinishedTcs = new TaskCompletionSource<object>();
+ using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
+ {
+ call.StartServerStreaming(ReceivedStatusOnClientCallback, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags);
+ callStartedOk = true;
+ }
+ call.StartReceiveInitialMetadata(ReceivedResponseHeadersCallback);
+ }
+ finally
{
- call.StartServerStreaming(ReceivedStatusOnClientCallback, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags);
+ if (!callStartedOk)
+ {
+ OnFailedToStartCallLocked();
+ }
}
- call.StartReceiveInitialMetadata(ReceivedResponseHeadersCallback);
}
}
@@ -207,17 +263,29 @@ namespace Grpc.Core.Internal
{
lock (myLock)
{
- GrpcPreconditions.CheckState(!started);
- started = true;
+ bool callStartedOk = false;
+ try
+ {
+ GrpcPreconditions.CheckState(!started);
+ started = true;
- Initialize(details.Channel.CompletionQueue);
+ Initialize(details.Channel.CompletionQueue);
- streamingResponseCallFinishedTcs = new TaskCompletionSource<object>();
- using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
+ streamingResponseCallFinishedTcs = new TaskCompletionSource<object>();
+ using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
+ {
+ call.StartDuplexStreaming(ReceivedStatusOnClientCallback, metadataArray, details.Options.Flags);
+ callStartedOk = true;
+ }
+ call.StartReceiveInitialMetadata(ReceivedResponseHeadersCallback);
+ }
+ finally
{
- call.StartDuplexStreaming(ReceivedStatusOnClientCallback, metadataArray, details.Options.Flags);
+ if (!callStartedOk)
+ {
+ OnFailedToStartCallLocked();
+ }
}
- call.StartReceiveInitialMetadata(ReceivedResponseHeadersCallback);
}
}
@@ -325,9 +393,22 @@ namespace Grpc.Core.Internal
}
}
- protected override void OnAfterReleaseResources()
+ protected override void OnAfterReleaseResourcesLocked()
{
- details.Channel.RemoveCallReference(this);
+ if (registeredWithChannel)
+ {
+ details.Channel.RemoveCallReference(this);
+ registeredWithChannel = false;
+ }
+ }
+
+ protected override void OnAfterReleaseResourcesUnlocked()
+ {
+ // If cancellation callback is in progress, this can block
+ // so we need to do this outside of call's lock to prevent
+ // deadlock.
+ // See https://github.com/grpc/grpc/issues/14777
+ // See https://github.com/dotnet/corefx/issues/14903
cancellationTokenRegistration?.Dispose();
}
@@ -385,10 +466,27 @@ namespace Grpc.Core.Internal
var call = CreateNativeCall(cq);
details.Channel.AddCallReference(this);
+ registeredWithChannel = true;
InitializeInternal(call);
+
RegisterCancellationCallback();
}
+ private void OnFailedToStartCallLocked()
+ {
+ ReleaseResources();
+
+ // We need to execute the hook that disposes the cancellation token
+ // registration, but it cannot be done from under a lock.
+ // To make things simple, we just schedule the unregistering
+ // on a threadpool.
+ // - Once the native call is disposed, the Cancel() calls are ignored anyway
+ // - We don't care about the overhead as OnFailedToStartCallLocked() only happens
+ // when something goes very bad when initializing a call and that should
+ // never happen when gRPC is used correctly.
+ ThreadPool.QueueUserWorkItem((state) => OnAfterReleaseResourcesUnlocked());
+ }
+
private INativeCall CreateNativeCall(CompletionQueueSafeHandle cq)
{
if (injectedNativeCall != null)
@@ -448,6 +546,7 @@ namespace Grpc.Core.Internal
TResponse msg = default(TResponse);
var deserializeException = TryDeserialize(receivedMessage, out msg);
+ bool releasedResources;
lock (myLock)
{
finished = true;
@@ -464,7 +563,12 @@ namespace Grpc.Core.Internal
streamingWriteTcs = null;
}
- ReleaseResourcesIfPossible();
+ releasedResources = ReleaseResourcesIfPossible();
+ }
+
+ if (releasedResources)
+ {
+ OnAfterReleaseResourcesUnlocked();
}
responseHeadersTcs.SetResult(responseHeaders);
@@ -494,6 +598,7 @@ namespace Grpc.Core.Internal
TaskCompletionSource<object> delayedStreamingWriteTcs = null;
+ bool releasedResources;
lock (myLock)
{
finished = true;
@@ -504,7 +609,12 @@ namespace Grpc.Core.Internal
streamingWriteTcs = null;
}
- ReleaseResourcesIfPossible();
+ releasedResources = ReleaseResourcesIfPossible();
+ }
+
+ if (releasedResources)
+ {
+ OnAfterReleaseResourcesUnlocked();
}
if (delayedStreamingWriteTcs != null)