aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Julien Boeuf <jboeuf@google.com>2015-12-18 09:50:45 -0800
committerGravatar Julien Boeuf <jboeuf@google.com>2015-12-18 09:50:45 -0800
commit96d7c8e7c906f209052a3617a1f97ac3950bd0ba (patch)
tree5e2e1d50944945998602ad5c29355f849d789d51 /src
parentee9d78b960f5c26181c9499412829a95fc972745 (diff)
parent3f9fe46d4df9ee3be7a4248dabad0df6f875ceb4 (diff)
Merge branch 'master' of github.com:grpc/grpc into remove_peer_from_ssl_auth_context
Diffstat (limited to 'src')
-rw-r--r--src/core/security/credentials.c4
-rw-r--r--src/core/surface/init.c4
-rw-r--r--src/core/transport/chttp2/frame_settings.c6
-rw-r--r--src/core/transport/chttp2_transport.c4
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallServer.cs2
-rw-r--r--src/csharp/Grpc.Core/Internal/AtomicCounter.cs33
-rw-r--r--src/csharp/Grpc.Core/Internal/CallSafeHandle.cs105
-rw-r--r--src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs2
-rw-r--r--src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs54
-rw-r--r--src/python/grpcio/tests/_result.py2
10 files changed, 165 insertions, 51 deletions
diff --git a/src/core/security/credentials.c b/src/core/security/credentials.c
index 866c4b792f..1d1c3b098a 100644
--- a/src/core/security/credentials.c
+++ b/src/core/security/credentials.c
@@ -179,8 +179,8 @@ void grpc_server_credentials_set_auth_metadata_processor(
GRPC_API_TRACE(
"grpc_server_credentials_set_auth_metadata_processor("
"creds=%p, "
- "processor=grpc_auth_metadata_processor { process: %lx, state: %p })",
- 3, (creds, (unsigned long)processor.process, processor.state));
+ "processor=grpc_auth_metadata_processor { process: %p, state: %p })",
+ 3, (creds, (void*)(gpr_intptr)processor.process, processor.state));
if (creds == NULL) return;
if (creds->processor.destroy != NULL && creds->processor.state != NULL) {
creds->processor.destroy(creds->processor.state);
diff --git a/src/core/surface/init.c b/src/core/surface/init.c
index 82027af651..81166e8ec5 100644
--- a/src/core/surface/init.c
+++ b/src/core/surface/init.c
@@ -82,8 +82,8 @@ static grpc_plugin g_all_of_the_plugins[MAX_PLUGINS];
static int g_number_of_plugins = 0;
void grpc_register_plugin(void (*init)(void), void (*destroy)(void)) {
- GRPC_API_TRACE("grpc_register_plugin(init=%lx, destroy=%lx)", 2,
- ((unsigned long)init, (unsigned long)destroy));
+ GRPC_API_TRACE("grpc_register_plugin(init=%p, destroy=%p)", 2,
+ ((void*)(gpr_intptr)init, (void*)(gpr_intptr)destroy));
GPR_ASSERT(g_number_of_plugins != MAX_PLUGINS);
g_all_of_the_plugins[g_number_of_plugins].init = init;
g_all_of_the_plugins[g_number_of_plugins].destroy = destroy;
diff --git a/src/core/transport/chttp2/frame_settings.c b/src/core/transport/chttp2/frame_settings.c
index 383b6e7f93..8982d7169c 100644
--- a/src/core/transport/chttp2/frame_settings.c
+++ b/src/core/transport/chttp2/frame_settings.c
@@ -240,8 +240,10 @@ grpc_chttp2_parse_error grpc_chttp2_settings_parser_parse(
transport_parsing->initial_window_update =
(gpr_int64)parser->value -
parser->incoming_settings[parser->id];
- gpr_log(GPR_DEBUG, "adding %d for initial_window change",
- (int)transport_parsing->initial_window_update);
+ if (grpc_http_trace) {
+ gpr_log(GPR_DEBUG, "adding %d for initial_window change",
+ (int)transport_parsing->initial_window_update);
+ }
}
parser->incoming_settings[parser->id] = parser->value;
if (grpc_http_trace) {
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index c3847bb6c5..4561c0bfa9 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -348,7 +348,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
gpr_log(GPR_ERROR, "%s: must be an integer",
GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER);
} else if (channel_args->args[i].value.integer < 0) {
- gpr_log(GPR_DEBUG, "%s: must be non-negative",
+ gpr_log(GPR_ERROR, "%s: must be non-negative",
GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER);
} else {
push_setting(t, GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE,
@@ -360,7 +360,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
gpr_log(GPR_ERROR, "%s: must be an integer",
GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER);
} else if (channel_args->args[i].value.integer < 0) {
- gpr_log(GPR_DEBUG, "%s: must be non-negative",
+ gpr_log(GPR_ERROR, "%s: must be non-negative",
GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER);
} else {
grpc_chttp2_hpack_compressor_set_max_usable_size(
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
index 46ca459349..0c805097f9 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
@@ -58,7 +58,7 @@ namespace Grpc.Core.Internal
public void Initialize(CallSafeHandle call)
{
- call.SetCompletionRegistry(environment.CompletionRegistry);
+ call.Initialize(environment.CompletionRegistry, environment.CompletionQueue);
server.AddCallReference(this);
InitializeInternal(call);
diff --git a/src/csharp/Grpc.Core/Internal/AtomicCounter.cs b/src/csharp/Grpc.Core/Internal/AtomicCounter.cs
index 7ccda225dc..63bea44e0e 100644
--- a/src/csharp/Grpc.Core/Internal/AtomicCounter.cs
+++ b/src/csharp/Grpc.Core/Internal/AtomicCounter.cs
@@ -40,14 +40,39 @@ namespace Grpc.Core.Internal
{
long counter = 0;
- public void Increment()
+ public AtomicCounter(long initialCount = 0)
{
- Interlocked.Increment(ref counter);
+ this.counter = initialCount;
}
- public void Decrement()
+ public long Increment()
{
- Interlocked.Decrement(ref counter);
+ return Interlocked.Increment(ref counter);
+ }
+
+ public void IncrementIfNonzero(ref bool success)
+ {
+ long origValue = counter;
+ while (true)
+ {
+ if (origValue == 0)
+ {
+ success = false;
+ return;
+ }
+ long result = Interlocked.CompareExchange(ref counter, origValue + 1, origValue);
+ if (result == origValue)
+ {
+ success = true;
+ return;
+ };
+ origValue = result;
+ }
+ }
+
+ public long Decrement()
+ {
+ return Interlocked.Decrement(ref counter);
}
public long Count
diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
index ad2e2919b7..69dbdfea5e 100644
--- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
@@ -47,6 +47,7 @@ namespace Grpc.Core.Internal
const uint GRPC_WRITE_BUFFER_HINT = 1;
CompletionRegistry completionRegistry;
+ CompletionQueueSafeHandle completionQueue;
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_cancel(CallSafeHandle call);
@@ -112,9 +113,10 @@ namespace Grpc.Core.Internal
{
}
- public void SetCompletionRegistry(CompletionRegistry completionRegistry)
+ public void Initialize(CompletionRegistry completionRegistry, CompletionQueueSafeHandle completionQueue)
{
this.completionRegistry = completionRegistry;
+ this.completionQueue = completionQueue;
}
public void SetCredentials(CallCredentialsSafeHandle credentials)
@@ -124,10 +126,13 @@ namespace Grpc.Core.Internal
public void StartUnary(UnaryResponseClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
- var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata()));
- grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags)
- .CheckOk();
+ using (completionQueue.NewScope())
+ {
+ var ctx = BatchContextSafeHandle.Create();
+ completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata()));
+ grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags)
+ .CheckOk();
+ }
}
public void StartUnary(BatchContextSafeHandle ctx, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
@@ -141,72 +146,102 @@ namespace Grpc.Core.Internal
public void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray)
{
- var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata()));
- grpcsharp_call_start_client_streaming(this, ctx, metadataArray).CheckOk();
+ using (completionQueue.NewScope())
+ {
+ var ctx = BatchContextSafeHandle.Create();
+ completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata()));
+ grpcsharp_call_start_client_streaming(this, ctx, metadataArray).CheckOk();
+ }
}
public void StartServerStreaming(ReceivedStatusOnClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
- var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient()));
- grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags).CheckOk();
+ using (completionQueue.NewScope())
+ {
+ var ctx = BatchContextSafeHandle.Create();
+ completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient()));
+ grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags).CheckOk();
+ }
}
public void StartDuplexStreaming(ReceivedStatusOnClientHandler callback, MetadataArraySafeHandle metadataArray)
{
- var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient()));
- grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray).CheckOk();
+ using (completionQueue.NewScope())
+ {
+ var ctx = BatchContextSafeHandle.Create();
+ completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient()));
+ grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray).CheckOk();
+ }
}
public void StartSendMessage(SendCompletionHandler callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata)
{
- var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
- grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, sendEmptyInitialMetadata).CheckOk();
+ using (completionQueue.NewScope())
+ {
+ var ctx = BatchContextSafeHandle.Create();
+ completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
+ grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, sendEmptyInitialMetadata).CheckOk();
+ }
}
public void StartSendCloseFromClient(SendCompletionHandler callback)
{
- var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
- grpcsharp_call_send_close_from_client(this, ctx).CheckOk();
+ using (completionQueue.NewScope())
+ {
+ var ctx = BatchContextSafeHandle.Create();
+ completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
+ grpcsharp_call_send_close_from_client(this, ctx).CheckOk();
+ }
}
public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata)
{
- var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
- grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, sendEmptyInitialMetadata).CheckOk();
+ using (completionQueue.NewScope())
+ {
+ var ctx = BatchContextSafeHandle.Create();
+ completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
+ grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, sendEmptyInitialMetadata).CheckOk();
+ }
}
public void StartReceiveMessage(ReceivedMessageHandler callback)
{
- var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedMessage()));
- grpcsharp_call_recv_message(this, ctx).CheckOk();
+ using (completionQueue.NewScope())
+ {
+ var ctx = BatchContextSafeHandle.Create();
+ completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedMessage()));
+ grpcsharp_call_recv_message(this, ctx).CheckOk();
+ }
}
public void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback)
{
- var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedInitialMetadata()));
- grpcsharp_call_recv_initial_metadata(this, ctx).CheckOk();
+ using (completionQueue.NewScope())
+ {
+ var ctx = BatchContextSafeHandle.Create();
+ completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedInitialMetadata()));
+ grpcsharp_call_recv_initial_metadata(this, ctx).CheckOk();
+ }
}
public void StartServerSide(ReceivedCloseOnServerHandler callback)
{
- var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedCloseOnServerCancelled()));
- grpcsharp_call_start_serverside(this, ctx).CheckOk();
+ using (completionQueue.NewScope())
+ {
+ var ctx = BatchContextSafeHandle.Create();
+ completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedCloseOnServerCancelled()));
+ grpcsharp_call_start_serverside(this, ctx).CheckOk();
+ }
}
public void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray)
{
- var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
- grpcsharp_call_send_initial_metadata(this, ctx, metadataArray).CheckOk();
+ using (completionQueue.NewScope())
+ {
+ var ctx = BatchContextSafeHandle.Create();
+ completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
+ grpcsharp_call_send_initial_metadata(this, ctx, metadataArray).CheckOk();
+ }
}
public void Cancel()
diff --git a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
index b3aa27c40f..4a5584121e 100644
--- a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
@@ -92,7 +92,7 @@ namespace Grpc.Core.Internal
{
result.SetCredentials(credentials);
}
- result.SetCompletionRegistry(registry);
+ result.Initialize(registry, cq);
return result;
}
}
diff --git a/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs
index 9de2bc7950..3754ad382e 100644
--- a/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs
@@ -33,6 +33,8 @@ using System.Runtime.InteropServices;
using System.Threading.Tasks;
using Grpc.Core.Profiling;
+using Grpc.Core.Utils;
+
namespace Grpc.Core.Internal
{
/// <summary>
@@ -40,6 +42,8 @@ namespace Grpc.Core.Internal
/// </summary>
internal class CompletionQueueSafeHandle : SafeHandleZeroIsInvalid
{
+ AtomicCounter shutdownRefcount = new AtomicCounter(1);
+
[DllImport("grpc_csharp_ext.dll")]
static extern CompletionQueueSafeHandle grpcsharp_completion_queue_create();
@@ -62,6 +66,7 @@ namespace Grpc.Core.Internal
public static CompletionQueueSafeHandle Create()
{
return grpcsharp_completion_queue_create();
+
}
public CompletionQueueEvent Next()
@@ -77,9 +82,18 @@ namespace Grpc.Core.Internal
}
}
+ /// <summary>
+ /// Creates a new usage scope for this completion queue. Once successfully created,
+ /// the completion queue won't be shutdown before scope.Dispose() is called.
+ /// </summary>
+ public UsageScope NewScope()
+ {
+ return new UsageScope(this);
+ }
+
public void Shutdown()
{
- grpcsharp_completion_queue_shutdown(this);
+ DecrementShutdownRefcount();
}
protected override bool ReleaseHandle()
@@ -87,5 +101,43 @@ namespace Grpc.Core.Internal
grpcsharp_completion_queue_destroy(handle);
return true;
}
+
+ private void DecrementShutdownRefcount()
+ {
+ if (shutdownRefcount.Decrement() == 0)
+ {
+ grpcsharp_completion_queue_shutdown(this);
+ }
+ }
+
+ private void BeginOp()
+ {
+ bool success = false;
+ shutdownRefcount.IncrementIfNonzero(ref success);
+ Preconditions.CheckState(success, "Shutdown has already been called");
+ }
+
+ private void EndOp()
+ {
+ DecrementShutdownRefcount();
+ }
+
+ // Allows declaring BeginOp and EndOp of a completion queue with a using statement.
+ // Declared as struct for better performance.
+ public struct UsageScope : IDisposable
+ {
+ readonly CompletionQueueSafeHandle cq;
+
+ public UsageScope(CompletionQueueSafeHandle cq)
+ {
+ this.cq = cq;
+ this.cq.BeginOp();
+ }
+
+ public void Dispose()
+ {
+ cq.EndOp();
+ }
+ }
}
}
diff --git a/src/python/grpcio/tests/_result.py b/src/python/grpcio/tests/_result.py
index 5a570f4279..0670be921f 100644
--- a/src/python/grpcio/tests/_result.py
+++ b/src/python/grpcio/tests/_result.py
@@ -406,7 +406,7 @@ def summary(result):
unexpected_successful=len(unexpected_successes),
interrupted=str(running_names)))
tracebacks = '\n\n'.join([
- (_Colors.FAIL + '{test_name}' + _Colors.END + + '\n' +
+ (_Colors.FAIL + '{test_name}' + _Colors.END + '\n' +
_Colors.BOLD + 'traceback:' + _Colors.END + '\n' +
'{traceback}\n' +
_Colors.BOLD + 'stdout:' + _Colors.END + '\n' +