diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/core/security/credentials.c | 4 | ||||
-rw-r--r-- | src/core/surface/init.c | 4 | ||||
-rw-r--r-- | src/core/transport/chttp2/frame_settings.c | 6 | ||||
-rw-r--r-- | src/core/transport/chttp2_transport.c | 4 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/AsyncCallServer.cs | 2 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/AtomicCounter.cs | 33 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/CallSafeHandle.cs | 105 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs | 2 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs | 54 | ||||
-rw-r--r-- | src/python/grpcio/tests/_result.py | 2 |
10 files changed, 165 insertions, 51 deletions
diff --git a/src/core/security/credentials.c b/src/core/security/credentials.c index a0054741ad..3013fac465 100644 --- a/src/core/security/credentials.c +++ b/src/core/security/credentials.c @@ -179,8 +179,8 @@ void grpc_server_credentials_set_auth_metadata_processor( GRPC_API_TRACE( "grpc_server_credentials_set_auth_metadata_processor(" "creds=%p, " - "processor=grpc_auth_metadata_processor { process: %lx, state: %p })", - 3, (creds, (unsigned long)processor.process, processor.state)); + "processor=grpc_auth_metadata_processor { process: %p, state: %p })", + 3, (creds, (void*)(gpr_intptr)processor.process, processor.state)); if (creds == NULL) return; if (creds->processor.destroy != NULL && creds->processor.state != NULL) { creds->processor.destroy(creds->processor.state); diff --git a/src/core/surface/init.c b/src/core/surface/init.c index 82027af651..81166e8ec5 100644 --- a/src/core/surface/init.c +++ b/src/core/surface/init.c @@ -82,8 +82,8 @@ static grpc_plugin g_all_of_the_plugins[MAX_PLUGINS]; static int g_number_of_plugins = 0; void grpc_register_plugin(void (*init)(void), void (*destroy)(void)) { - GRPC_API_TRACE("grpc_register_plugin(init=%lx, destroy=%lx)", 2, - ((unsigned long)init, (unsigned long)destroy)); + GRPC_API_TRACE("grpc_register_plugin(init=%p, destroy=%p)", 2, + ((void*)(gpr_intptr)init, (void*)(gpr_intptr)destroy)); GPR_ASSERT(g_number_of_plugins != MAX_PLUGINS); g_all_of_the_plugins[g_number_of_plugins].init = init; g_all_of_the_plugins[g_number_of_plugins].destroy = destroy; diff --git a/src/core/transport/chttp2/frame_settings.c b/src/core/transport/chttp2/frame_settings.c index 383b6e7f93..8982d7169c 100644 --- a/src/core/transport/chttp2/frame_settings.c +++ b/src/core/transport/chttp2/frame_settings.c @@ -240,8 +240,10 @@ grpc_chttp2_parse_error grpc_chttp2_settings_parser_parse( transport_parsing->initial_window_update = (gpr_int64)parser->value - parser->incoming_settings[parser->id]; - gpr_log(GPR_DEBUG, "adding %d for initial_window change", - (int)transport_parsing->initial_window_update); + if (grpc_http_trace) { + gpr_log(GPR_DEBUG, "adding %d for initial_window change", + (int)transport_parsing->initial_window_update); + } } parser->incoming_settings[parser->id] = parser->value; if (grpc_http_trace) { diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index c3847bb6c5..4561c0bfa9 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -348,7 +348,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, gpr_log(GPR_ERROR, "%s: must be an integer", GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER); } else if (channel_args->args[i].value.integer < 0) { - gpr_log(GPR_DEBUG, "%s: must be non-negative", + gpr_log(GPR_ERROR, "%s: must be non-negative", GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER); } else { push_setting(t, GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE, @@ -360,7 +360,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, gpr_log(GPR_ERROR, "%s: must be an integer", GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER); } else if (channel_args->args[i].value.integer < 0) { - gpr_log(GPR_DEBUG, "%s: must be non-negative", + gpr_log(GPR_ERROR, "%s: must be non-negative", GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER); } else { grpc_chttp2_hpack_compressor_set_max_usable_size( diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs index 46ca459349..0c805097f9 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs @@ -58,7 +58,7 @@ namespace Grpc.Core.Internal public void Initialize(CallSafeHandle call) { - call.SetCompletionRegistry(environment.CompletionRegistry); + call.Initialize(environment.CompletionRegistry, environment.CompletionQueue); server.AddCallReference(this); InitializeInternal(call); diff --git a/src/csharp/Grpc.Core/Internal/AtomicCounter.cs b/src/csharp/Grpc.Core/Internal/AtomicCounter.cs index 7ccda225dc..63bea44e0e 100644 --- a/src/csharp/Grpc.Core/Internal/AtomicCounter.cs +++ b/src/csharp/Grpc.Core/Internal/AtomicCounter.cs @@ -40,14 +40,39 @@ namespace Grpc.Core.Internal { long counter = 0; - public void Increment() + public AtomicCounter(long initialCount = 0) { - Interlocked.Increment(ref counter); + this.counter = initialCount; } - public void Decrement() + public long Increment() { - Interlocked.Decrement(ref counter); + return Interlocked.Increment(ref counter); + } + + public void IncrementIfNonzero(ref bool success) + { + long origValue = counter; + while (true) + { + if (origValue == 0) + { + success = false; + return; + } + long result = Interlocked.CompareExchange(ref counter, origValue + 1, origValue); + if (result == origValue) + { + success = true; + return; + }; + origValue = result; + } + } + + public long Decrement() + { + return Interlocked.Decrement(ref counter); } public long Count diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs index ad2e2919b7..69dbdfea5e 100644 --- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs @@ -47,6 +47,7 @@ namespace Grpc.Core.Internal const uint GRPC_WRITE_BUFFER_HINT = 1; CompletionRegistry completionRegistry; + CompletionQueueSafeHandle completionQueue; [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_cancel(CallSafeHandle call); @@ -112,9 +113,10 @@ namespace Grpc.Core.Internal { } - public void SetCompletionRegistry(CompletionRegistry completionRegistry) + public void Initialize(CompletionRegistry completionRegistry, CompletionQueueSafeHandle completionQueue) { this.completionRegistry = completionRegistry; + this.completionQueue = completionQueue; } public void SetCredentials(CallCredentialsSafeHandle credentials) @@ -124,10 +126,13 @@ namespace Grpc.Core.Internal public void StartUnary(UnaryResponseClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) { - var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata())); - grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags) - .CheckOk(); + using (completionQueue.NewScope()) + { + var ctx = BatchContextSafeHandle.Create(); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata())); + grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags) + .CheckOk(); + } } public void StartUnary(BatchContextSafeHandle ctx, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) @@ -141,72 +146,102 @@ namespace Grpc.Core.Internal public void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray) { - var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata())); - grpcsharp_call_start_client_streaming(this, ctx, metadataArray).CheckOk(); + using (completionQueue.NewScope()) + { + var ctx = BatchContextSafeHandle.Create(); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata())); + grpcsharp_call_start_client_streaming(this, ctx, metadataArray).CheckOk(); + } } public void StartServerStreaming(ReceivedStatusOnClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) { - var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient())); - grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags).CheckOk(); + using (completionQueue.NewScope()) + { + var ctx = BatchContextSafeHandle.Create(); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient())); + grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags).CheckOk(); + } } public void StartDuplexStreaming(ReceivedStatusOnClientHandler callback, MetadataArraySafeHandle metadataArray) { - var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient())); - grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray).CheckOk(); + using (completionQueue.NewScope()) + { + var ctx = BatchContextSafeHandle.Create(); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient())); + grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray).CheckOk(); + } } public void StartSendMessage(SendCompletionHandler callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata) { - var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); - grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, sendEmptyInitialMetadata).CheckOk(); + using (completionQueue.NewScope()) + { + var ctx = BatchContextSafeHandle.Create(); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); + grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, sendEmptyInitialMetadata).CheckOk(); + } } public void StartSendCloseFromClient(SendCompletionHandler callback) { - var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); - grpcsharp_call_send_close_from_client(this, ctx).CheckOk(); + using (completionQueue.NewScope()) + { + var ctx = BatchContextSafeHandle.Create(); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); + grpcsharp_call_send_close_from_client(this, ctx).CheckOk(); + } } public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata) { - var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); - grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, sendEmptyInitialMetadata).CheckOk(); + using (completionQueue.NewScope()) + { + var ctx = BatchContextSafeHandle.Create(); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); + grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, sendEmptyInitialMetadata).CheckOk(); + } } public void StartReceiveMessage(ReceivedMessageHandler callback) { - var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedMessage())); - grpcsharp_call_recv_message(this, ctx).CheckOk(); + using (completionQueue.NewScope()) + { + var ctx = BatchContextSafeHandle.Create(); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedMessage())); + grpcsharp_call_recv_message(this, ctx).CheckOk(); + } } public void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback) { - var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedInitialMetadata())); - grpcsharp_call_recv_initial_metadata(this, ctx).CheckOk(); + using (completionQueue.NewScope()) + { + var ctx = BatchContextSafeHandle.Create(); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedInitialMetadata())); + grpcsharp_call_recv_initial_metadata(this, ctx).CheckOk(); + } } public void StartServerSide(ReceivedCloseOnServerHandler callback) { - var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedCloseOnServerCancelled())); - grpcsharp_call_start_serverside(this, ctx).CheckOk(); + using (completionQueue.NewScope()) + { + var ctx = BatchContextSafeHandle.Create(); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedCloseOnServerCancelled())); + grpcsharp_call_start_serverside(this, ctx).CheckOk(); + } } public void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray) { - var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); - grpcsharp_call_send_initial_metadata(this, ctx, metadataArray).CheckOk(); + using (completionQueue.NewScope()) + { + var ctx = BatchContextSafeHandle.Create(); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); + grpcsharp_call_send_initial_metadata(this, ctx, metadataArray).CheckOk(); + } } public void Cancel() diff --git a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs index b3aa27c40f..4a5584121e 100644 --- a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs @@ -92,7 +92,7 @@ namespace Grpc.Core.Internal { result.SetCredentials(credentials); } - result.SetCompletionRegistry(registry); + result.Initialize(registry, cq); return result; } } diff --git a/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs index 9de2bc7950..3754ad382e 100644 --- a/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs @@ -33,6 +33,8 @@ using System.Runtime.InteropServices; using System.Threading.Tasks; using Grpc.Core.Profiling; +using Grpc.Core.Utils; + namespace Grpc.Core.Internal { /// <summary> @@ -40,6 +42,8 @@ namespace Grpc.Core.Internal /// </summary> internal class CompletionQueueSafeHandle : SafeHandleZeroIsInvalid { + AtomicCounter shutdownRefcount = new AtomicCounter(1); + [DllImport("grpc_csharp_ext.dll")] static extern CompletionQueueSafeHandle grpcsharp_completion_queue_create(); @@ -62,6 +66,7 @@ namespace Grpc.Core.Internal public static CompletionQueueSafeHandle Create() { return grpcsharp_completion_queue_create(); + } public CompletionQueueEvent Next() @@ -77,9 +82,18 @@ namespace Grpc.Core.Internal } } + /// <summary> + /// Creates a new usage scope for this completion queue. Once successfully created, + /// the completion queue won't be shutdown before scope.Dispose() is called. + /// </summary> + public UsageScope NewScope() + { + return new UsageScope(this); + } + public void Shutdown() { - grpcsharp_completion_queue_shutdown(this); + DecrementShutdownRefcount(); } protected override bool ReleaseHandle() @@ -87,5 +101,43 @@ namespace Grpc.Core.Internal grpcsharp_completion_queue_destroy(handle); return true; } + + private void DecrementShutdownRefcount() + { + if (shutdownRefcount.Decrement() == 0) + { + grpcsharp_completion_queue_shutdown(this); + } + } + + private void BeginOp() + { + bool success = false; + shutdownRefcount.IncrementIfNonzero(ref success); + Preconditions.CheckState(success, "Shutdown has already been called"); + } + + private void EndOp() + { + DecrementShutdownRefcount(); + } + + // Allows declaring BeginOp and EndOp of a completion queue with a using statement. + // Declared as struct for better performance. + public struct UsageScope : IDisposable + { + readonly CompletionQueueSafeHandle cq; + + public UsageScope(CompletionQueueSafeHandle cq) + { + this.cq = cq; + this.cq.BeginOp(); + } + + public void Dispose() + { + cq.EndOp(); + } + } } } diff --git a/src/python/grpcio/tests/_result.py b/src/python/grpcio/tests/_result.py index 5a570f4279..0670be921f 100644 --- a/src/python/grpcio/tests/_result.py +++ b/src/python/grpcio/tests/_result.py @@ -406,7 +406,7 @@ def summary(result): unexpected_successful=len(unexpected_successes), interrupted=str(running_names))) tracebacks = '\n\n'.join([ - (_Colors.FAIL + '{test_name}' + _Colors.END + + '\n' + + (_Colors.FAIL + '{test_name}' + _Colors.END + '\n' + _Colors.BOLD + 'traceback:' + _Colors.END + '\n' + '{traceback}\n' + _Colors.BOLD + 'stdout:' + _Colors.END + '\n' + |