aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Jan Tattermusch <jtattermusch@google.com>2015-09-01 12:27:15 -0700
committerGravatar Jan Tattermusch <jtattermusch@google.com>2015-12-16 17:15:14 -0800
commitea7469d62e92b9c54973ea3f9a7e892360c29336 (patch)
treee33ecc0705708bd89598768cdfa64b88b662a534 /src
parent416000f43fa93533e71d3180f6705cde013ae34d (diff)
refcounting completion queue shutdown
Diffstat (limited to 'src')
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallServer.cs2
-rw-r--r--src/csharp/Grpc.Core/Internal/AtomicCounter.cs33
-rw-r--r--src/csharp/Grpc.Core/Internal/CallSafeHandle.cs105
-rw-r--r--src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs2
-rw-r--r--src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs54
5 files changed, 154 insertions, 42 deletions
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
index 46ca459349..0c805097f9 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
@@ -58,7 +58,7 @@ namespace Grpc.Core.Internal
public void Initialize(CallSafeHandle call)
{
- call.SetCompletionRegistry(environment.CompletionRegistry);
+ call.Initialize(environment.CompletionRegistry, environment.CompletionQueue);
server.AddCallReference(this);
InitializeInternal(call);
diff --git a/src/csharp/Grpc.Core/Internal/AtomicCounter.cs b/src/csharp/Grpc.Core/Internal/AtomicCounter.cs
index 7ccda225dc..63bea44e0e 100644
--- a/src/csharp/Grpc.Core/Internal/AtomicCounter.cs
+++ b/src/csharp/Grpc.Core/Internal/AtomicCounter.cs
@@ -40,14 +40,39 @@ namespace Grpc.Core.Internal
{
long counter = 0;
- public void Increment()
+ public AtomicCounter(long initialCount = 0)
{
- Interlocked.Increment(ref counter);
+ this.counter = initialCount;
}
- public void Decrement()
+ public long Increment()
{
- Interlocked.Decrement(ref counter);
+ return Interlocked.Increment(ref counter);
+ }
+
+ public void IncrementIfNonzero(ref bool success)
+ {
+ long origValue = counter;
+ while (true)
+ {
+ if (origValue == 0)
+ {
+ success = false;
+ return;
+ }
+ long result = Interlocked.CompareExchange(ref counter, origValue + 1, origValue);
+ if (result == origValue)
+ {
+ success = true;
+ return;
+ };
+ origValue = result;
+ }
+ }
+
+ public long Decrement()
+ {
+ return Interlocked.Decrement(ref counter);
}
public long Count
diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
index ad2e2919b7..69dbdfea5e 100644
--- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
@@ -47,6 +47,7 @@ namespace Grpc.Core.Internal
const uint GRPC_WRITE_BUFFER_HINT = 1;
CompletionRegistry completionRegistry;
+ CompletionQueueSafeHandle completionQueue;
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_cancel(CallSafeHandle call);
@@ -112,9 +113,10 @@ namespace Grpc.Core.Internal
{
}
- public void SetCompletionRegistry(CompletionRegistry completionRegistry)
+ public void Initialize(CompletionRegistry completionRegistry, CompletionQueueSafeHandle completionQueue)
{
this.completionRegistry = completionRegistry;
+ this.completionQueue = completionQueue;
}
public void SetCredentials(CallCredentialsSafeHandle credentials)
@@ -124,10 +126,13 @@ namespace Grpc.Core.Internal
public void StartUnary(UnaryResponseClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
- var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata()));
- grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags)
- .CheckOk();
+ using (completionQueue.NewScope())
+ {
+ var ctx = BatchContextSafeHandle.Create();
+ completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata()));
+ grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags)
+ .CheckOk();
+ }
}
public void StartUnary(BatchContextSafeHandle ctx, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
@@ -141,72 +146,102 @@ namespace Grpc.Core.Internal
public void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray)
{
- var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata()));
- grpcsharp_call_start_client_streaming(this, ctx, metadataArray).CheckOk();
+ using (completionQueue.NewScope())
+ {
+ var ctx = BatchContextSafeHandle.Create();
+ completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata()));
+ grpcsharp_call_start_client_streaming(this, ctx, metadataArray).CheckOk();
+ }
}
public void StartServerStreaming(ReceivedStatusOnClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
- var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient()));
- grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags).CheckOk();
+ using (completionQueue.NewScope())
+ {
+ var ctx = BatchContextSafeHandle.Create();
+ completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient()));
+ grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags).CheckOk();
+ }
}
public void StartDuplexStreaming(ReceivedStatusOnClientHandler callback, MetadataArraySafeHandle metadataArray)
{
- var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient()));
- grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray).CheckOk();
+ using (completionQueue.NewScope())
+ {
+ var ctx = BatchContextSafeHandle.Create();
+ completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient()));
+ grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray).CheckOk();
+ }
}
public void StartSendMessage(SendCompletionHandler callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata)
{
- var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
- grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, sendEmptyInitialMetadata).CheckOk();
+ using (completionQueue.NewScope())
+ {
+ var ctx = BatchContextSafeHandle.Create();
+ completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
+ grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, sendEmptyInitialMetadata).CheckOk();
+ }
}
public void StartSendCloseFromClient(SendCompletionHandler callback)
{
- var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
- grpcsharp_call_send_close_from_client(this, ctx).CheckOk();
+ using (completionQueue.NewScope())
+ {
+ var ctx = BatchContextSafeHandle.Create();
+ completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
+ grpcsharp_call_send_close_from_client(this, ctx).CheckOk();
+ }
}
public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata)
{
- var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
- grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, sendEmptyInitialMetadata).CheckOk();
+ using (completionQueue.NewScope())
+ {
+ var ctx = BatchContextSafeHandle.Create();
+ completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
+ grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, sendEmptyInitialMetadata).CheckOk();
+ }
}
public void StartReceiveMessage(ReceivedMessageHandler callback)
{
- var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedMessage()));
- grpcsharp_call_recv_message(this, ctx).CheckOk();
+ using (completionQueue.NewScope())
+ {
+ var ctx = BatchContextSafeHandle.Create();
+ completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedMessage()));
+ grpcsharp_call_recv_message(this, ctx).CheckOk();
+ }
}
public void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback)
{
- var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedInitialMetadata()));
- grpcsharp_call_recv_initial_metadata(this, ctx).CheckOk();
+ using (completionQueue.NewScope())
+ {
+ var ctx = BatchContextSafeHandle.Create();
+ completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedInitialMetadata()));
+ grpcsharp_call_recv_initial_metadata(this, ctx).CheckOk();
+ }
}
public void StartServerSide(ReceivedCloseOnServerHandler callback)
{
- var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedCloseOnServerCancelled()));
- grpcsharp_call_start_serverside(this, ctx).CheckOk();
+ using (completionQueue.NewScope())
+ {
+ var ctx = BatchContextSafeHandle.Create();
+ completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedCloseOnServerCancelled()));
+ grpcsharp_call_start_serverside(this, ctx).CheckOk();
+ }
}
public void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray)
{
- var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
- grpcsharp_call_send_initial_metadata(this, ctx, metadataArray).CheckOk();
+ using (completionQueue.NewScope())
+ {
+ var ctx = BatchContextSafeHandle.Create();
+ completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
+ grpcsharp_call_send_initial_metadata(this, ctx, metadataArray).CheckOk();
+ }
}
public void Cancel()
diff --git a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
index b3aa27c40f..4a5584121e 100644
--- a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
@@ -92,7 +92,7 @@ namespace Grpc.Core.Internal
{
result.SetCredentials(credentials);
}
- result.SetCompletionRegistry(registry);
+ result.Initialize(registry, cq);
return result;
}
}
diff --git a/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs
index 9de2bc7950..3754ad382e 100644
--- a/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs
@@ -33,6 +33,8 @@ using System.Runtime.InteropServices;
using System.Threading.Tasks;
using Grpc.Core.Profiling;
+using Grpc.Core.Utils;
+
namespace Grpc.Core.Internal
{
/// <summary>
@@ -40,6 +42,8 @@ namespace Grpc.Core.Internal
/// </summary>
internal class CompletionQueueSafeHandle : SafeHandleZeroIsInvalid
{
+ AtomicCounter shutdownRefcount = new AtomicCounter(1);
+
[DllImport("grpc_csharp_ext.dll")]
static extern CompletionQueueSafeHandle grpcsharp_completion_queue_create();
@@ -62,6 +66,7 @@ namespace Grpc.Core.Internal
public static CompletionQueueSafeHandle Create()
{
return grpcsharp_completion_queue_create();
+
}
public CompletionQueueEvent Next()
@@ -77,9 +82,18 @@ namespace Grpc.Core.Internal
}
}
+ /// <summary>
+ /// Creates a new usage scope for this completion queue. Once successfully created,
+ /// the completion queue won't be shutdown before scope.Dispose() is called.
+ /// </summary>
+ public UsageScope NewScope()
+ {
+ return new UsageScope(this);
+ }
+
public void Shutdown()
{
- grpcsharp_completion_queue_shutdown(this);
+ DecrementShutdownRefcount();
}
protected override bool ReleaseHandle()
@@ -87,5 +101,43 @@ namespace Grpc.Core.Internal
grpcsharp_completion_queue_destroy(handle);
return true;
}
+
+ private void DecrementShutdownRefcount()
+ {
+ if (shutdownRefcount.Decrement() == 0)
+ {
+ grpcsharp_completion_queue_shutdown(this);
+ }
+ }
+
+ private void BeginOp()
+ {
+ bool success = false;
+ shutdownRefcount.IncrementIfNonzero(ref success);
+ Preconditions.CheckState(success, "Shutdown has already been called");
+ }
+
+ private void EndOp()
+ {
+ DecrementShutdownRefcount();
+ }
+
+ // Allows declaring BeginOp and EndOp of a completion queue with a using statement.
+ // Declared as struct for better performance.
+ public struct UsageScope : IDisposable
+ {
+ readonly CompletionQueueSafeHandle cq;
+
+ public UsageScope(CompletionQueueSafeHandle cq)
+ {
+ this.cq = cq;
+ this.cq.BeginOp();
+ }
+
+ public void Dispose()
+ {
+ cq.EndOp();
+ }
+ }
}
}