From 694cf8b0d266f4fadcc5b52f1bfc6f2c0dd1ccb5 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 15 Jan 2016 21:13:25 -0800 Subject: Shared subchannel sketch --- src/core/surface/init.c | 3 +++ 1 file changed, 3 insertions(+) (limited to 'src/core/surface') diff --git a/src/core/surface/init.c b/src/core/surface/init.c index 19cea4c4f6..8f1936227e 100644 --- a/src/core/surface/init.c +++ b/src/core/surface/init.c @@ -46,6 +46,7 @@ #include "src/core/client_config/resolver_registry.h" #include "src/core/client_config/resolvers/dns_resolver.h" #include "src/core/client_config/resolvers/sockaddr_resolver.h" +#include "src/core/client_config/subchannel.h" #include "src/core/debug/trace.h" #include "src/core/iomgr/executor.h" #include "src/core/iomgr/iomgr.h" @@ -125,6 +126,7 @@ void grpc_init(void) { } gpr_timers_global_init(); grpc_cq_global_init(); + grpc_subchannel_global_init(); for (i = 0; i < g_number_of_plugins; i++) { if (g_all_of_the_plugins[i].init != NULL) { g_all_of_the_plugins[i].init(); @@ -143,6 +145,7 @@ void grpc_shutdown(void) { grpc_executor_shutdown(); grpc_cq_global_shutdown(); grpc_iomgr_shutdown(); + grpc_subchannel_global_shutdown(); census_shutdown(); gpr_timers_global_destroy(); grpc_tracer_shutdown(); -- cgit v1.2.3 From 7391f133375f0840a4219db24f9a93a96887742e Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 22 Jan 2016 06:39:54 -0800 Subject: subchannel progress --- src/core/client_config/connector.c | 3 +- src/core/client_config/connector.h | 2 +- src/core/client_config/subchannel.c | 21 ++++-- src/core/client_config/subchannel.h | 8 ++- src/core/client_config/subchannel_index.c | 104 +++++++++++++++++++++--------- src/core/client_config/subchannel_index.h | 22 +++++++ src/core/surface/secure_channel_create.c | 2 +- 7 files changed, 123 insertions(+), 39 deletions(-) (limited to 'src/core/surface') diff --git a/src/core/client_config/connector.c b/src/core/client_config/connector.c index 1603ffb8be..eaa215fe8f 100644 --- a/src/core/client_config/connector.c +++ b/src/core/client_config/connector.c @@ -33,8 +33,9 @@ #include "src/core/client_config/connector.h" -void grpc_connector_ref(grpc_connector* connector) { +grpc_connector *grpc_connector_ref(grpc_connector* connector) { connector->vtable->ref(connector); + return connector; } void grpc_connector_unref(grpc_exec_ctx* exec_ctx, grpc_connector* connector) { diff --git a/src/core/client_config/connector.h b/src/core/client_config/connector.h index b4482fa2ee..b301e1bb19 100644 --- a/src/core/client_config/connector.h +++ b/src/core/client_config/connector.h @@ -81,7 +81,7 @@ struct grpc_connector_vtable { grpc_connect_out_args *out_args, grpc_closure *notify); }; -void grpc_connector_ref(grpc_connector *connector); +grpc_connector *grpc_connector_ref(grpc_connector *connector); void grpc_connector_unref(grpc_exec_ctx *exec_ctx, grpc_connector *connector); /** Connect using the connector: max one outstanding call at a time */ void grpc_connector_connect(grpc_exec_ctx *exec_ctx, grpc_connector *connector, diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 9f287c4b03..bf2f15a444 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -42,11 +42,11 @@ #include "src/core/channel/client_channel.h" #include "src/core/channel/connected_channel.h" #include "src/core/client_config/initial_connect_string.h" +#include "src/core/client_config/subchannel_index.h" #include "src/core/iomgr/timer.h" #include "src/core/profiling/timers.h" #include "src/core/surface/channel.h" #include "src/core/transport/connectivity_state.h" -#include "src/core/transport/connectivity_state.h" #define INTERNAL_REF_BITS 16 #define STRONG_REF_MASK (~(gpr_atm)((1 << INTERNAL_REF_BITS) - 1)) @@ -95,6 +95,8 @@ struct grpc_subchannel { struct sockaddr *addr; size_t addr_len; + grpc_subchannel_key *key; + /** initial string to send to peer */ gpr_slice initial_connect_string; @@ -239,6 +241,7 @@ void grpc_subchannel_weak_ref(grpc_subchannel *c static void disconnect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { grpc_connected_subchannel *con; + grpc_subchannel_index_unregister(exec_ctx, c->key, c); gpr_mu_lock(&c->mu); GPR_ASSERT(!c->disconnected); c->disconnected = 1; @@ -277,10 +280,19 @@ static uint32_t random_seed() { return (uint32_t)(gpr_time_to_millis(gpr_now(GPR_CLOCK_MONOTONIC))); } -grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, +grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, + grpc_connector *connector, grpc_subchannel_args *args) { - grpc_subchannel *c = gpr_malloc(sizeof(*c)); + grpc_subchannel_key *key = grpc_subchannel_key_create(connector, args); + grpc_subchannel *c = grpc_subchannel_index_find(exec_ctx, key); + if (c) { + grpc_subchannel_key_destroy(key); + return c; + } + + c = gpr_malloc(sizeof(*c)); memset(c, 0, sizeof(*c)); + c->key = key; gpr_atm_no_barrier_store(&c->ref_pair, 1 << INTERNAL_REF_BITS); c->connector = connector; grpc_connector_ref(c->connector); @@ -302,7 +314,8 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE, "subchannel"); gpr_mu_init(&c->mu); - return c; + + return grpc_subchannel_index_register(exec_ctx, key, c); } static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index 57c7c9dc67..8fd976276b 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -48,6 +48,8 @@ typedef struct grpc_subchannel_args grpc_subchannel_args; #ifdef GRPC_STREAM_REFCOUNT_DEBUG #define GRPC_SUBCHANNEL_REF(p, r) \ grpc_subchannel_ref((p), __FILE__, __LINE__, (r)) +#define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) \ + grpc_subchannel_ref_from_weak_ref((p), __FILE__, __LINE__, (r)) #define GRPC_SUBCHANNEL_UNREF(cl, p, r) \ grpc_subchannel_unref((cl), (p), __FILE__, __LINE__, (r)) #define GRPC_SUBCHANNEL_WEAK_REF(p, r) \ @@ -66,6 +68,7 @@ typedef struct grpc_subchannel_args grpc_subchannel_args; , const char *file, int line, const char *reason #else #define GRPC_SUBCHANNEL_REF(p, r) grpc_subchannel_ref((p)) +#define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) grpc_subchannel_ref_from_weak_ref((p)) #define GRPC_SUBCHANNEL_UNREF(cl, p, r) grpc_subchannel_unref((cl), (p)) #define GRPC_SUBCHANNEL_WEAK_REF(p, r) grpc_subchannel_weak_ref((p)) #define GRPC_SUBCHANNEL_WEAK_UNREF(cl, p, r) \ @@ -146,6 +149,8 @@ grpc_call_stack *grpc_subchannel_call_get_call_stack( grpc_subchannel_call *subchannel_call); struct grpc_subchannel_args { + /* When updating this struct, also update subchannel_index.c */ + /** Channel filters for this channel - wrapped factories will likely want to mutate this */ const grpc_channel_filter **filters; @@ -159,7 +164,8 @@ struct grpc_subchannel_args { }; /** create a subchannel given a connector */ -grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, +grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, + grpc_connector *connector, grpc_subchannel_args *args); #endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_H */ diff --git a/src/core/client_config/subchannel_index.c b/src/core/client_config/subchannel_index.c index ffe6c1fe93..9f6ecca295 100644 --- a/src/core/client_config/subchannel_index.c +++ b/src/core/client_config/subchannel_index.c @@ -31,49 +31,83 @@ * */ +#include "src/core/client_config/subchannel_index.h" + +#include + +#include +#include +#include + +#include "src/core/channel/channel_args.h" + /* a map of subchannel_key --> subchannel, used for detecting connections to the same destination in order to share them */ static gpr_avl g_subchannel_index; static gpr_mu g_mu; -struct subchannel_key { - size_t addr_len; - struct sockaddr *addr; - grpc_channel_args *normalized_args; +struct grpc_subchannel_key { + grpc_connector *connector; + grpc_subchannel_args args; }; GPR_TLS_DECL(subchannel_index_exec_ctx); -static subchannel_key *subchannel_key_create(struct sockaddr *sockaddr, size_t addr_len, grpc_channel_args *args) { - subchannel_key *k = gpr_malloc(sizeof(*k)); - k->addr_len = addr_len; - k->addr = gpr_malloc(addr_len); - memcpy(k->addr, addr, addr_len); - k->normalized_args = grpc_channel_args_normalize(args); - return k; +static void enter_ctx(grpc_exec_ctx *exec_ctx) { + GPR_ASSERT(gpr_tls_get(&subchannel_index_exec_ctx) == 0); + gpr_tls_set(&subchannel_index_exec_ctx, (intptr_t)exec_ctx); +} + +static void leave_ctx(grpc_exec_ctx *exec_ctx) { + GPR_ASSERT(gpr_tls_get(&subchannel_index_exec_ctx) == (intptr_t)exec_ctx); + gpr_tls_set(&subchannel_index_exec_ctx, 0); +} + +static grpc_exec_ctx *current_ctx() { + grpc_exec_ctx *c = (grpc_exec_ctx *)gpr_tls_get(&subchannel_index_exec_ctx); + GPR_ASSERT(c != NULL); + return c; } -static subchannel_key *subchannel_key_copy(subchannel_key *k) { - subchannel_key *k = gpr_malloc(sizeof(*k)); - k->addr_len = addr_len; - k->addr = gpr_malloc(addr_len); - memcpy(k->addr, addr, addr_len); - k->normalized_args = grpc_channel_args_copy(args); +static grpc_subchannel_key *create_key(grpc_connector *connector, grpc_subchannel_args *args, grpc_channel_args *(*copy_channel_args)(const grpc_channel_args *args)) { + grpc_subchannel_key *k = gpr_malloc(sizeof(*k)); + k->connector = grpc_connector_ref(connector); + k->args.filter_count = args->filter_count; + k->args.filters = gpr_malloc(sizeof(*k->args.filters) * k->args.filter_count); + memcpy(k->args.filters, args->filters, sizeof(*k->args.filters) * k->args.filter_count); + k->args.addr_len = args->addr_len; + k->args.addr = gpr_malloc(args->addr_len); + memcpy(k->args.addr, args->addr, k->args.addr_len); + k->args.args = copy_channel_args(args->args); return k; } -static int subchannel_key_compare(subchannel_key *a, subchannel_key *b) { - int c = GPR_ICMP(a->addr_len, b->addr_len); +grpc_subchannel_key *grpc_subchannel_key_create(grpc_connector *connector, grpc_subchannel_args *args) { + return create_key(connector, args, grpc_channel_args_normalize); +} + +static grpc_subchannel_key *subchannel_key_copy(grpc_subchannel_key *k) { + return create_key(k->connector, &k->args, grpc_channel_args_copy); +} + +static int subchannel_key_compare(grpc_subchannel_key *a, grpc_subchannel_key *b) { + int c = GPR_ICMP(a->connector, b->connector); + if (c != 0) return c; + c = GPR_ICMP(a->args.addr_len, b->args.addr_len); if (c != 0) return c; - c = memcmp(a->addr, b->addr, a->addr_len); + c = GPR_ICMP(a->args.filter_count, b->args.filter_count); if (c != 0) return c; - return grpc_channel_args_compare(a->normalized_args, b->normalized_args); + c = memcmp(a->args.addr, b->args.addr, a->args.addr_len); + if (c != 0) return c; + c = memcmp(a->args.filters, b->args.filters, a->args.filter_count * sizeof(*a->args.filters)); + return grpc_channel_args_compare(a->args.args, b->args.args); } -static void subchannel_key_destroy(subchannel_key *k) { - gpr_free(k->addr); - grpc_channel_args_destroy(k->normalized_args); +static void subchannel_key_destroy(grpc_subchannel_key *k) { + gpr_free(k->args.addr); + gpr_free(k->args.filters); + grpc_channel_args_destroy((grpc_channel_args*)k->args.args); gpr_free(k); } @@ -85,16 +119,17 @@ static void *sck_avl_copy(void *p) { return subchannel_key_copy(p); } -static void *sck_avl_compare(void *a, void *b) { +static long sck_avl_compare(void *a, void *b) { return subchannel_key_compare(a, b); } static void scv_avl_destroy(void *p) { - GRPC_SUBCHANNEL_UNREF(exec_ctx, p, "subchannel_index"); + grpc_exec_ctx *exec_ctx = current_ctx(); + GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, p, "subchannel_index"); } static void *scv_avl_copy(void *p) { - GRPC_SUBCHANNEL_REF(p, "subchannel_index"); + GRPC_SUBCHANNEL_WEAK_REF(p, "subchannel_index"); return p; } @@ -107,26 +142,31 @@ static const gpr_avl_vtable subchannel_avl_vtable = { }; grpc_subchannel *grpc_subchannel_index_find( - grpc_exec_ctx *ctx, + grpc_exec_ctx *exec_ctx, grpc_connector *connector, grpc_subchannel_args *args) { + enter_ctx(ctx); + gpr_mu_lock(&g_mu); gpr_avl index = gpr_avl_ref(g_subchannel_index); gpr_mu_unlock(&g_mu); subchannel_key *key = subchannel_key_create(connector, args); - grpc_subchannel *c = grpc_subchannel_ref(gpr_avl_get(index, key)); + grpc_subchannel *c = GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(gpr_avl_get(index, key)); subchannel_key_destroy(key); gpr_avl_unref(index); + leave_ctx(ctx); return c; } grpc_subchannel *grpc_subchannel_index_register( - grpc_exec_ctx *ctx, + grpc_exec_ctx *exec_ctx, grpc_connector *connector, grpc_subchannel_args *args, grpc_subchannel *constructed) { + enter_ctx(ctx); + subchannel_key *key = subchannel_key_create(connector, args); grpc_subchannel *c = NULL; @@ -137,7 +177,7 @@ grpc_subchannel *grpc_subchannel_index_register( c = gpr_avl_get(index, key); if (c != NULL) { - GRPC_SUBCHANNEL_UNREF(constructed); + GRPC_SUBCHANNEL_WEAK_UNREF(constructed); } else { gpr_avl updated = gpr_avl_add(index, key, constructed); @@ -151,5 +191,7 @@ grpc_subchannel *grpc_subchannel_index_register( gpr_avl_unref(index); } + leave_ctx(ctx); + return c; } diff --git a/src/core/client_config/subchannel_index.h b/src/core/client_config/subchannel_index.h index d501e121f1..dfbc3228d9 100644 --- a/src/core/client_config/subchannel_index.h +++ b/src/core/client_config/subchannel_index.h @@ -34,6 +34,28 @@ #ifndef GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_INDEX_H #define GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_INDEX_H +#include "src/core/client_config/connector.h" +#include "src/core/client_config/subchannel.h" +typedef struct grpc_subchannel_key grpc_subchannel_key; + +grpc_subchannel_key *grpc_subchannel_key_create( + grpc_connector *con, grpc_subchannel_args *args); + +void grpc_subchannel_key_destroy(grpc_subchannel_key *key); + +grpc_subchannel *grpc_subchannel_index_find( + grpc_exec_ctx *ctx, + grpc_subchannel_key *key); + +grpc_subchannel *grpc_subchannel_index_register( + grpc_exec_ctx *ctx, + grpc_subchannel_key *key, + grpc_subchannel *constructed); + +void grpc_subchannel_index_unregister( + grpc_exec_ctx *ctx, + grpc_subchannel_key *key, + grpc_subchannel *constructed); #endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_INDEX_H */ diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c index 552a570713..38f3e28e3d 100644 --- a/src/core/surface/secure_channel_create.c +++ b/src/core/surface/secure_channel_create.c @@ -238,7 +238,7 @@ static grpc_subchannel *subchannel_factory_create_subchannel( gpr_mu_init(&c->mu); gpr_ref_init(&c->refs, 1); args->args = final_args; - s = grpc_subchannel_create(&c->base, args); + s = grpc_subchannel_create(exec_ctx, &c->base, args); grpc_connector_unref(exec_ctx, &c->base); grpc_channel_args_destroy(final_args); return s; -- cgit v1.2.3 From 8cdba6644a83333e53b6188ca95b5a6c88968257 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 22 Jan 2016 20:01:55 -0800 Subject: Subchannel index compiles --- src/core/client_config/subchannel.c | 15 +++++++ src/core/client_config/subchannel.h | 2 + src/core/client_config/subchannel_index.c | 72 ++++++++++++++++++++++++------- src/core/client_config/subchannel_index.h | 9 ++-- src/core/surface/channel_create.c | 2 +- src/core/surface/init.c | 5 ++- test/core/end2end/fixtures/h2_uchannel.c | 2 +- 7 files changed, 84 insertions(+), 23 deletions(-) (limited to 'src/core/surface') diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index bf2f15a444..c704595ec7 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -239,6 +239,21 @@ void grpc_subchannel_weak_ref(grpc_subchannel *c GPR_ASSERT(old_refs != 0); } +grpc_subchannel *grpc_subchannel_ref_from_weak_ref(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { + if (!c) return NULL; + for (;;) { + gpr_atm old_refs = gpr_atm_acq_load(&c->ref_pair); + if (old_refs >= (1 << INTERNAL_REF_BITS)) { + gpr_atm new_refs = old_refs + (1 << INTERNAL_REF_BITS); + if (gpr_atm_rel_cas(&c->ref_pair, old_refs, new_refs)) { + return c; + } + } else { + return NULL; + } + } +} + static void disconnect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { grpc_connected_subchannel *con; grpc_subchannel_index_unregister(exec_ctx, c->key, c); diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index 8fd976276b..0d470f593c 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -84,6 +84,8 @@ typedef struct grpc_subchannel_args grpc_subchannel_args; void grpc_subchannel_ref(grpc_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); +grpc_subchannel *grpc_subchannel_ref_from_weak_ref(grpc_subchannel *channel + GRPC_SUBCHANNEL_REF_EXTRA_ARGS); void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx, grpc_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); diff --git a/src/core/client_config/subchannel_index.c b/src/core/client_config/subchannel_index.c index 9f6ecca295..575ccb4aad 100644 --- a/src/core/client_config/subchannel_index.c +++ b/src/core/client_config/subchannel_index.c @@ -33,6 +33,7 @@ #include "src/core/client_config/subchannel_index.h" +#include #include #include @@ -104,7 +105,7 @@ static int subchannel_key_compare(grpc_subchannel_key *a, grpc_subchannel_key *b return grpc_channel_args_compare(a->args.args, b->args.args); } -static void subchannel_key_destroy(grpc_subchannel_key *k) { +void grpc_subchannel_key_destroy(grpc_subchannel_key *k) { gpr_free(k->args.addr); gpr_free(k->args.filters); grpc_channel_args_destroy((grpc_channel_args*)k->args.args); @@ -112,7 +113,7 @@ static void subchannel_key_destroy(grpc_subchannel_key *k) { } static void sck_avl_destroy(void *p) { - subchannel_key_destroy(p); + grpc_subchannel_key_destroy(p); } static void *sck_avl_copy(void *p) { @@ -141,33 +142,38 @@ static const gpr_avl_vtable subchannel_avl_vtable = { .copy_value = scv_avl_copy }; +void grpc_subchannel_index_init(void) { + g_subchannel_index = gpr_avl_create(&subchannel_avl_vtable); + gpr_mu_init(&g_mu); +} + +void grpc_subchannel_index_shutdown(void) { + gpr_mu_destroy(&g_mu); + gpr_avl_unref(g_subchannel_index); +} + grpc_subchannel *grpc_subchannel_index_find( grpc_exec_ctx *exec_ctx, - grpc_connector *connector, - grpc_subchannel_args *args) { - enter_ctx(ctx); + grpc_subchannel_key *key) { + enter_ctx(exec_ctx); gpr_mu_lock(&g_mu); gpr_avl index = gpr_avl_ref(g_subchannel_index); gpr_mu_unlock(&g_mu); - subchannel_key *key = subchannel_key_create(connector, args); - grpc_subchannel *c = GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(gpr_avl_get(index, key)); - subchannel_key_destroy(key); + grpc_subchannel *c = GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(gpr_avl_get(index, key), "index_find"); gpr_avl_unref(index); - leave_ctx(ctx); + leave_ctx(exec_ctx); return c; } grpc_subchannel *grpc_subchannel_index_register( grpc_exec_ctx *exec_ctx, - grpc_connector *connector, - grpc_subchannel_args *args, + grpc_subchannel_key *key, grpc_subchannel *constructed) { - enter_ctx(ctx); + enter_ctx(exec_ctx); - subchannel_key *key = subchannel_key_create(connector, args); grpc_subchannel *c = NULL; while (c == NULL) { @@ -177,13 +183,13 @@ grpc_subchannel *grpc_subchannel_index_register( c = gpr_avl_get(index, key); if (c != NULL) { - GRPC_SUBCHANNEL_WEAK_UNREF(constructed); + GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, constructed, "index_register"); } else { gpr_avl updated = gpr_avl_add(index, key, constructed); gpr_mu_lock(&g_mu); if (index.root == g_subchannel_index.root) { - GPR_SWAP(index, g_subchannel_index); + GPR_SWAP(gpr_avl, updated, g_subchannel_index); c = constructed; } gpr_mu_unlock(&g_mu); @@ -191,7 +197,41 @@ grpc_subchannel *grpc_subchannel_index_register( gpr_avl_unref(index); } - leave_ctx(ctx); + leave_ctx(exec_ctx); return c; } + +void grpc_subchannel_index_unregister( + grpc_exec_ctx *exec_ctx, + grpc_subchannel_key *key, + grpc_subchannel *constructed) { + enter_ctx(exec_ctx); + + bool done = false; + while (!done) { + gpr_mu_lock(&g_mu); + gpr_avl index = gpr_avl_ref(g_subchannel_index); + gpr_mu_unlock(&g_mu); + + grpc_subchannel *c = gpr_avl_get(index, key); + if (c != constructed) { + break; + } + + gpr_avl updated = gpr_avl_remove(index, key); + + gpr_mu_lock(&g_mu); + if (index.root == g_subchannel_index.root) { + GPR_SWAP(gpr_avl, updated, g_subchannel_index); + done = true; + } else { + GPR_SWAP(gpr_avl, updated, index); + } + gpr_mu_unlock(&g_mu); + + gpr_avl_unref(index); + } + + leave_ctx(exec_ctx); +} diff --git a/src/core/client_config/subchannel_index.h b/src/core/client_config/subchannel_index.h index dfbc3228d9..69c19969a9 100644 --- a/src/core/client_config/subchannel_index.h +++ b/src/core/client_config/subchannel_index.h @@ -45,17 +45,20 @@ grpc_subchannel_key *grpc_subchannel_key_create( void grpc_subchannel_key_destroy(grpc_subchannel_key *key); grpc_subchannel *grpc_subchannel_index_find( - grpc_exec_ctx *ctx, + grpc_exec_ctx *exec_ctx, grpc_subchannel_key *key); grpc_subchannel *grpc_subchannel_index_register( - grpc_exec_ctx *ctx, + grpc_exec_ctx *exec_ctx, grpc_subchannel_key *key, grpc_subchannel *constructed); void grpc_subchannel_index_unregister( - grpc_exec_ctx *ctx, + grpc_exec_ctx *exec_ctx, grpc_subchannel_key *key, grpc_subchannel *constructed); +void grpc_subchannel_index_init(void); +void grpc_subchannel_index_shutdown(void); + #endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_INDEX_H */ diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c index 49083f0870..031ae1b543 100644 --- a/src/core/surface/channel_create.c +++ b/src/core/surface/channel_create.c @@ -172,7 +172,7 @@ static grpc_subchannel *subchannel_factory_create_subchannel( c->base.vtable = &connector_vtable; gpr_ref_init(&c->refs, 1); args->args = final_args; - s = grpc_subchannel_create(&c->base, args); + s = grpc_subchannel_create(exec_ctx, &c->base, args); grpc_connector_unref(exec_ctx, &c->base); grpc_channel_args_destroy(final_args); return s; diff --git a/src/core/surface/init.c b/src/core/surface/init.c index 8f1936227e..66c5a522a4 100644 --- a/src/core/surface/init.c +++ b/src/core/surface/init.c @@ -47,6 +47,7 @@ #include "src/core/client_config/resolvers/dns_resolver.h" #include "src/core/client_config/resolvers/sockaddr_resolver.h" #include "src/core/client_config/subchannel.h" +#include "src/core/client_config/subchannel_index.h" #include "src/core/debug/trace.h" #include "src/core/iomgr/executor.h" #include "src/core/iomgr/iomgr.h" @@ -126,7 +127,7 @@ void grpc_init(void) { } gpr_timers_global_init(); grpc_cq_global_init(); - grpc_subchannel_global_init(); + grpc_subchannel_index_init(); for (i = 0; i < g_number_of_plugins; i++) { if (g_all_of_the_plugins[i].init != NULL) { g_all_of_the_plugins[i].init(); @@ -145,7 +146,7 @@ void grpc_shutdown(void) { grpc_executor_shutdown(); grpc_cq_global_shutdown(); grpc_iomgr_shutdown(); - grpc_subchannel_global_shutdown(); + grpc_subchannel_index_shutdown(); census_shutdown(); gpr_timers_global_destroy(); grpc_tracer_shutdown(); diff --git a/test/core/end2end/fixtures/h2_uchannel.c b/test/core/end2end/fixtures/h2_uchannel.c index 9b622e80d6..ee6aac796d 100644 --- a/test/core/end2end/fixtures/h2_uchannel.c +++ b/test/core/end2end/fixtures/h2_uchannel.c @@ -159,7 +159,7 @@ static grpc_subchannel *subchannel_factory_create_subchannel( c->base.vtable = &connector_vtable; gpr_ref_init(&c->refs, 1); args->args = final_args; - s = grpc_subchannel_create(&c->base, args); + s = grpc_subchannel_create(exec_ctx, &c->base, args); grpc_connector_unref(exec_ctx, &c->base); grpc_channel_args_destroy(final_args); if (*f->sniffed_subchannel) { -- cgit v1.2.3 From 194824467c8d014123a8a3cc8f4ad65b6f9a9b85 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 25 Jan 2016 09:59:20 -0800 Subject: Get subchannel index working --- build.yaml | 4 +- grpc.gemspec | 2 + include/grpc/support/avl.h | 3 +- include/grpc/support/useful.h | 2 +- package.json | 2 + src/core/channel/channel_args.c | 4 +- src/core/channel/channel_args.h | 2 +- src/core/client_config/connector.c | 2 +- src/core/client_config/connector.h | 2 +- src/core/client_config/subchannel.c | 14 ++-- src/core/client_config/subchannel.h | 10 +-- src/core/client_config/subchannel_index.c | 109 ++++++++++++++++++------------ src/core/client_config/subchannel_index.h | 20 +++++- src/core/security/security_context.c | 2 +- src/core/surface/channel_create.c | 2 +- src/core/surface/secure_channel_create.c | 2 +- src/cpp/common/channel_arguments.cc | 2 +- test/core/end2end/fixtures/h2_uchannel.c | 2 +- tools/asan_suppressions.txt | 3 - tools/distrib/check_copyright.py | 6 +- tools/lsan_suppressions.txt | 3 + tools/run_tests/configs.json | 4 +- 22 files changed, 125 insertions(+), 77 deletions(-) delete mode 100644 tools/asan_suppressions.txt create mode 100644 tools/lsan_suppressions.txt (limited to 'src/core/surface') diff --git a/build.yaml b/build.yaml index 525035397c..bbc36a8b12 100644 --- a/build.yaml +++ b/build.yaml @@ -2505,8 +2505,8 @@ configs: LDXX: clang++ compile_the_world: true test_environ: - ASAN_OPTIONS: suppressions=tools/asan_suppressions.txt:detect_leaks=1:color=always - LSAN_OPTIONS: suppressions=tools/asan_suppressions.txt:report_objects=1 + ASAN_OPTIONS: detect_leaks=1:color=always + LSAN_OPTIONS: suppressions=tools/lsan_suppressions.txt:report_objects=1 timeout_multiplier: 1.5 asan-noleaks: CC: clang diff --git a/grpc.gemspec b/grpc.gemspec index 47b66ae535..afc4a890f4 100755 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -172,6 +172,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/client_config/resolvers/sockaddr_resolver.h ) s.files += %w( src/core/client_config/subchannel.h ) s.files += %w( src/core/client_config/subchannel_factory.h ) + s.files += %w( src/core/client_config/subchannel_index.h ) s.files += %w( src/core/client_config/uri_parser.h ) s.files += %w( src/core/compression/algorithm_metadata.h ) s.files += %w( src/core/compression/message_compress.h ) @@ -310,6 +311,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/client_config/resolvers/sockaddr_resolver.c ) s.files += %w( src/core/client_config/subchannel.c ) s.files += %w( src/core/client_config/subchannel_factory.c ) + s.files += %w( src/core/client_config/subchannel_index.c ) s.files += %w( src/core/client_config/uri_parser.c ) s.files += %w( src/core/compression/algorithm.c ) s.files += %w( src/core/compression/message_compress.c ) diff --git a/include/grpc/support/avl.h b/include/grpc/support/avl.h index d462f52bfe..54605ceb7c 100644 --- a/include/grpc/support/avl.h +++ b/include/grpc/support/avl.h @@ -81,7 +81,8 @@ void gpr_avl_unref(gpr_avl avl); if key exists in avl, the new tree's key entry updated (i.e. a duplicate is not created) */ gpr_avl gpr_avl_add(gpr_avl avl, void *key, void *value); -/** return a new tree with key deleted */ +/** return a new tree with key deleted + implicitly unrefs avl to allow easy chaining. */ gpr_avl gpr_avl_remove(gpr_avl avl, void *key); /** lookup key, and return the associated value. does not mutate avl. diff --git a/include/grpc/support/useful.h b/include/grpc/support/useful.h index 003e096cf9..a0e76da29e 100644 --- a/include/grpc/support/useful.h +++ b/include/grpc/support/useful.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without diff --git a/package.json b/package.json index 0b87f6a59d..74de532bc2 100644 --- a/package.json +++ b/package.json @@ -123,6 +123,7 @@ "src/core/client_config/resolvers/sockaddr_resolver.h", "src/core/client_config/subchannel.h", "src/core/client_config/subchannel_factory.h", + "src/core/client_config/subchannel_index.h", "src/core/client_config/uri_parser.h", "src/core/compression/algorithm_metadata.h", "src/core/compression/message_compress.h", @@ -261,6 +262,7 @@ "src/core/client_config/resolvers/sockaddr_resolver.c", "src/core/client_config/subchannel.c", "src/core/client_config/subchannel_factory.c", + "src/core/client_config/subchannel_index.c", "src/core/client_config/uri_parser.c", "src/core/compression/algorithm.c", "src/core/compression/message_compress.c", diff --git a/src/core/channel/channel_args.c b/src/core/channel/channel_args.c index 6ac6b99475..63e440f97b 100644 --- a/src/core/channel/channel_args.c +++ b/src/core/channel/channel_args.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -93,7 +93,7 @@ grpc_channel_args *grpc_channel_args_merge(const grpc_channel_args *a, } static int cmp_arg(const grpc_arg *a, const grpc_arg *b) { - int c = a->type - b->type; + int c = GPR_ICMP(a->type, b->type); if (c != 0) return c; c = strcmp(a->key, b->key); if (c != 0) return c; diff --git a/src/core/channel/channel_args.h b/src/core/channel/channel_args.h index c8eaacb87b..b3a7c9f434 100644 --- a/src/core/channel/channel_args.h +++ b/src/core/channel/channel_args.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without diff --git a/src/core/client_config/connector.c b/src/core/client_config/connector.c index 9aac02ae1c..aa34aa7fab 100644 --- a/src/core/client_config/connector.c +++ b/src/core/client_config/connector.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without diff --git a/src/core/client_config/connector.h b/src/core/client_config/connector.h index b301e1bb19..b91eb512c3 100644 --- a/src/core/client_config/connector.h +++ b/src/core/client_config/connector.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 32c718553d..145e146862 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -210,6 +210,7 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, grpc_connectivity_state_destroy(exec_ctx, &c->state_tracker); grpc_connector_unref(exec_ctx, c->connector); grpc_pollset_set_destroy(&c->pollset_set); + grpc_subchannel_key_destroy(exec_ctx, c->key); gpr_free(c); } @@ -225,18 +226,21 @@ static gpr_atm ref_mutate(grpc_subchannel *c, gpr_atm delta, return old_val; } -void grpc_subchannel_ref(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +grpc_subchannel *grpc_subchannel_ref(grpc_subchannel *c + GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { gpr_atm old_refs; old_refs = ref_mutate(c, (1 << INTERNAL_REF_BITS), 0 REF_MUTATE_PURPOSE("STRONG_REF")); GPR_ASSERT((old_refs & STRONG_REF_MASK) != 0); + return c; } -void grpc_subchannel_weak_ref(grpc_subchannel *c - GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +grpc_subchannel *grpc_subchannel_weak_ref(grpc_subchannel *c + GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { gpr_atm old_refs; old_refs = ref_mutate(c, 1, 0 REF_MUTATE_PURPOSE("WEAK_REF")); GPR_ASSERT(old_refs != 0); + return c; } grpc_subchannel *grpc_subchannel_ref_from_weak_ref( @@ -302,7 +306,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, grpc_subchannel_key *key = grpc_subchannel_key_create(connector, args); grpc_subchannel *c = grpc_subchannel_index_find(exec_ctx, key); if (c) { - grpc_subchannel_key_destroy(key); + grpc_subchannel_key_destroy(exec_ctx, key); return c; } diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index c48d7a3a8b..313e63c75c 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -83,15 +83,15 @@ typedef struct grpc_subchannel_args grpc_subchannel_args; #define GRPC_SUBCHANNEL_REF_EXTRA_ARGS #endif -void grpc_subchannel_ref(grpc_subchannel *channel - GRPC_SUBCHANNEL_REF_EXTRA_ARGS); +grpc_subchannel *grpc_subchannel_ref(grpc_subchannel *channel + GRPC_SUBCHANNEL_REF_EXTRA_ARGS); grpc_subchannel *grpc_subchannel_ref_from_weak_ref( grpc_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx, grpc_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); -void grpc_subchannel_weak_ref(grpc_subchannel *channel - GRPC_SUBCHANNEL_REF_EXTRA_ARGS); +grpc_subchannel *grpc_subchannel_weak_ref(grpc_subchannel *channel + GRPC_SUBCHANNEL_REF_EXTRA_ARGS); void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx, grpc_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); diff --git a/src/core/client_config/subchannel_index.c b/src/core/client_config/subchannel_index.c index 418ebd135f..c4b9730161 100644 --- a/src/core/client_config/subchannel_index.c +++ b/src/core/client_config/subchannel_index.c @@ -1,35 +1,35 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ +// +// +// Copyright 2016, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// +// #include "src/core/client_config/subchannel_index.h" @@ -42,8 +42,8 @@ #include "src/core/channel/channel_args.h" -/* a map of subchannel_key --> subchannel, used for detecting connections - to the same destination in order to share them */ +// a map of subchannel_key --> subchannel, used for detecting connections +// to the same destination in order to share them static gpr_avl g_subchannel_index; static gpr_mu g_mu; @@ -111,14 +111,18 @@ static int subchannel_key_compare(grpc_subchannel_key *a, return grpc_channel_args_compare(a->args.args, b->args.args); } -void grpc_subchannel_key_destroy(grpc_subchannel_key *k) { +void grpc_subchannel_key_destroy(grpc_exec_ctx *exec_ctx, + grpc_subchannel_key *k) { + grpc_connector_unref(exec_ctx, k->connector); gpr_free(k->args.addr); gpr_free(k->args.filters); grpc_channel_args_destroy((grpc_channel_args *)k->args.args); gpr_free(k); } -static void sck_avl_destroy(void *p) { grpc_subchannel_key_destroy(p); } +static void sck_avl_destroy(void *p) { + grpc_subchannel_key_destroy(current_ctx(), p); +} static void *sck_avl_copy(void *p) { return subchannel_key_copy(p); } @@ -127,8 +131,7 @@ static long sck_avl_compare(void *a, void *b) { } static void scv_avl_destroy(void *p) { - grpc_exec_ctx *exec_ctx = current_ctx(); - GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, p, "subchannel_index"); + GRPC_SUBCHANNEL_WEAK_UNREF(current_ctx(), p, "subchannel_index"); } static void *scv_avl_copy(void *p) { @@ -157,6 +160,8 @@ grpc_subchannel *grpc_subchannel_index_find(grpc_exec_ctx *exec_ctx, grpc_subchannel_key *key) { enter_ctx(exec_ctx); + // Lock, and take a reference to the subchannel index. + // We don't need to do the search under a lock as avl's are immutable. gpr_mu_lock(&g_mu); gpr_avl index = gpr_avl_ref(g_subchannel_index); gpr_mu_unlock(&g_mu); @@ -177,22 +182,34 @@ grpc_subchannel *grpc_subchannel_index_register(grpc_exec_ctx *exec_ctx, grpc_subchannel *c = NULL; while (c == NULL) { + // Compare and swap loop: + // - take a reference to the current index gpr_mu_lock(&g_mu); gpr_avl index = gpr_avl_ref(g_subchannel_index); gpr_mu_unlock(&g_mu); + // - Check to see if a subchannel already exists c = gpr_avl_get(index, key); if (c != NULL) { + // yes -> we're done GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, constructed, "index_register"); } else { - gpr_avl updated = gpr_avl_add(index, key, constructed); - + // no -> update the avl and compare/swap + gpr_avl updated = + gpr_avl_add(gpr_avl_ref(index), subchannel_key_copy(key), + GRPC_SUBCHANNEL_WEAK_REF(constructed, "index_register")); + + // it may happen (but it's expected to be unlikely) + // that some other thread has changed the index: + // compare/swap here to check that, and retry as necessary gpr_mu_lock(&g_mu); if (index.root == g_subchannel_index.root) { GPR_SWAP(gpr_avl, updated, g_subchannel_index); c = constructed; } gpr_mu_unlock(&g_mu); + + gpr_avl_unref(updated); } gpr_avl_unref(index); } @@ -209,26 +226,32 @@ void grpc_subchannel_index_unregister(grpc_exec_ctx *exec_ctx, bool done = false; while (!done) { + // Compare and swap loop: + // - take a reference to the current index gpr_mu_lock(&g_mu); gpr_avl index = gpr_avl_ref(g_subchannel_index); gpr_mu_unlock(&g_mu); + // Check to see if this key still refers to the previously + // registered subchannel grpc_subchannel *c = gpr_avl_get(index, key); if (c != constructed) { + gpr_avl_unref(index); break; } - gpr_avl updated = gpr_avl_remove(index, key); + // compare and swap the update (some other thread may have + // mutated the index behind us) + gpr_avl updated = gpr_avl_remove(gpr_avl_ref(index), key); gpr_mu_lock(&g_mu); if (index.root == g_subchannel_index.root) { GPR_SWAP(gpr_avl, updated, g_subchannel_index); done = true; - } else { - GPR_SWAP(gpr_avl, updated, index); } gpr_mu_unlock(&g_mu); + gpr_avl_unref(updated); gpr_avl_unref(index); } diff --git a/src/core/client_config/subchannel_index.h b/src/core/client_config/subchannel_index.h index 6c87081170..fc3187a758 100644 --- a/src/core/client_config/subchannel_index.h +++ b/src/core/client_config/subchannel_index.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -37,25 +37,41 @@ #include "src/core/client_config/connector.h" #include "src/core/client_config/subchannel.h" +/** \file Provides an index of active subchannels so that they can be + shared amongst channels */ + typedef struct grpc_subchannel_key grpc_subchannel_key; +/** Create a key that can be used to uniquely identify a subchannel */ grpc_subchannel_key *grpc_subchannel_key_create(grpc_connector *con, grpc_subchannel_args *args); -void grpc_subchannel_key_destroy(grpc_subchannel_key *key); +/** Destroy a subchannel key */ +void grpc_subchannel_key_destroy(grpc_exec_ctx *exec_ctx, + grpc_subchannel_key *key); +/** Given a subchannel key, find the subchannel registered for it. + Returns NULL if no such channel exists. + Thread-safe. */ grpc_subchannel *grpc_subchannel_index_find(grpc_exec_ctx *exec_ctx, grpc_subchannel_key *key); +/** Register a subchannel against a key. + Takes ownership of \a constructed. + Returns the registered subchannel. This may be different from + \a constructed in the case of a registration race. */ grpc_subchannel *grpc_subchannel_index_register(grpc_exec_ctx *exec_ctx, grpc_subchannel_key *key, grpc_subchannel *constructed); +/** Remove \a constructed as the registered subchannel for \a key. */ void grpc_subchannel_index_unregister(grpc_exec_ctx *exec_ctx, grpc_subchannel_key *key, grpc_subchannel *constructed); +/** Initialize the subchannel index (global) */ void grpc_subchannel_index_init(void); +/** Shutdown the subchannel index (global) */ void grpc_subchannel_index_shutdown(void); #endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_INDEX_H */ diff --git a/src/core/security/security_context.c b/src/core/security/security_context.c index d28664cad3..a71b3bc915 100644 --- a/src/core/security/security_context.c +++ b/src/core/security/security_context.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c index 031ae1b543..7d067b5863 100644 --- a/src/core/surface/channel_create.c +++ b/src/core/surface/channel_create.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c index 38f3e28e3d..6cbd5985ab 100644 --- a/src/core/surface/secure_channel_create.c +++ b/src/core/surface/secure_channel_create.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without diff --git a/src/cpp/common/channel_arguments.cc b/src/cpp/common/channel_arguments.cc index f1583de5a1..a495118ffc 100644 --- a/src/cpp/common/channel_arguments.cc +++ b/src/cpp/common/channel_arguments.cc @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without diff --git a/test/core/end2end/fixtures/h2_uchannel.c b/test/core/end2end/fixtures/h2_uchannel.c index ee6aac796d..24079c0b4b 100644 --- a/test/core/end2end/fixtures/h2_uchannel.c +++ b/test/core/end2end/fixtures/h2_uchannel.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without diff --git a/tools/asan_suppressions.txt b/tools/asan_suppressions.txt deleted file mode 100644 index d672cb6cae..0000000000 --- a/tools/asan_suppressions.txt +++ /dev/null @@ -1,3 +0,0 @@ -# this is busted in BoringSSL -leak:CRYPTO_set_thread_local -leak:err_get_state diff --git a/tools/distrib/check_copyright.py b/tools/distrib/check_copyright.py index 935acf525e..6123218c55 100755 --- a/tools/distrib/check_copyright.py +++ b/tools/distrib/check_copyright.py @@ -68,9 +68,9 @@ with open('LICENSE') as f: # that given a line of license text, returns what should # be in the file LICENSE_PREFIX = { - '.c': r'\s*\*\s*', - '.cc': r'\s*\*\s*', - '.h': r'\s*\*\s*', + '.c': r'\s*(//|\*)\s*', + '.cc': r'\s*(//|\*)\s*', + '.h': r'\s*(//|\*)\s*', '.m': r'\s*\*\s*', '.php': r'\s*\*\s*', '.js': r'\s*\*\s*', diff --git a/tools/lsan_suppressions.txt b/tools/lsan_suppressions.txt new file mode 100644 index 0000000000..d672cb6cae --- /dev/null +++ b/tools/lsan_suppressions.txt @@ -0,0 +1,3 @@ +# this is busted in BoringSSL +leak:CRYPTO_set_thread_local +leak:err_get_state diff --git a/tools/run_tests/configs.json b/tools/run_tests/configs.json index 769942df99..b21793d9cc 100644 --- a/tools/run_tests/configs.json +++ b/tools/run_tests/configs.json @@ -45,8 +45,8 @@ { "config": "asan", "environ": { - "ASAN_OPTIONS": "suppressions=tools/asan_suppressions.txt:detect_leaks=1:color=always", - "LSAN_OPTIONS": "suppressions=tools/asan_suppressions.txt:report_objects=1" + "ASAN_OPTIONS": "detect_leaks=1:color=always", + "LSAN_OPTIONS": "suppressions=tools/lsan_suppressions.txt:report_objects=1" }, "timeout_multiplier": 1.5 }, -- cgit v1.2.3 From a44cbfc11c7d018785ef5699b900453090df07e3 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 3 Feb 2016 16:02:49 -0800 Subject: Fix race condition in transport API Specifically: Receiving trailing and initial metadata had to be published in lock-step. => If we wanted trailing metadata, we might not get initial metadata processed until messages arrived. => Compression code had no idea what codec to use. To fix it, publish initial metadata as soon as it's ready (this is a transport API change). Requires changes to grpc_call to ensure ordering in processing initial metadata and messages (one may be delayed). Exposed at least some bugs in C++ where we never read initial metadata. I expect at least one more similar bug. --- include/grpc++/impl/codegen/sync_stream.h | 20 +++- src/core/census/grpc_filter.c | 4 +- src/core/channel/http_client_filter.c | 4 +- src/core/channel/http_server_filter.c | 4 +- src/core/channel/subchannel_call_holder.c | 6 +- src/core/security/server_auth_filter.c | 4 +- src/core/surface/call.c | 154 ++++++++++++++++++++---------- src/core/surface/lame_client.c | 3 +- src/core/surface/server.c | 4 +- src/core/transport/chttp2/internal.h | 2 +- src/core/transport/chttp2_transport.c | 15 +-- src/core/transport/transport.c | 1 + src/core/transport/transport.h | 5 +- test/core/fling/client.c | 12 ++- test/cpp/end2end/hybrid_end2end_test.cc | 2 +- 15 files changed, 156 insertions(+), 84 deletions(-) (limited to 'src/core/surface') diff --git a/include/grpc++/impl/codegen/sync_stream.h b/include/grpc++/impl/codegen/sync_stream.h index 33d25e837c..9ae48bd23d 100644 --- a/include/grpc++/impl/codegen/sync_stream.h +++ b/include/grpc++/impl/codegen/sync_stream.h @@ -193,6 +193,15 @@ class ClientWriter : public ClientWriterInterface { cq_.Pluck(&ops); } + void WaitForInitialMetadata() { + GPR_ASSERT(!context_->initial_metadata_received_); + + CallOpSet ops; + ops.RecvInitialMetadata(context_); + call_.PerformOps(&ops); + cq_.Pluck(&ops); // status ignored + } + using WriterInterface::Write; bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE { CallOpSet ops; @@ -213,6 +222,9 @@ class ClientWriter : public ClientWriterInterface { /// Read the final response and wait for the final status. Status Finish() GRPC_OVERRIDE { Status status; + if (!context_->initial_metadata_received_) { + finish_ops_.RecvInitialMetadata(context_); + } finish_ops_.ClientRecvStatus(context_, &status); call_.PerformOps(&finish_ops_); GPR_ASSERT(cq_.Pluck(&finish_ops_)); @@ -221,7 +233,8 @@ class ClientWriter : public ClientWriterInterface { private: ClientContext* context_; - CallOpSet finish_ops_; + CallOpSet finish_ops_; CompletionQueue cq_; Call call_; }; @@ -292,7 +305,10 @@ class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface { } Status Finish() GRPC_OVERRIDE { - CallOpSet ops; + CallOpSet ops; + if (!context_->initial_metadata_received_) { + ops.RecvInitialMetadata(context_); + } Status status; ops.ClientRecvStatus(context_, &status); call_.PerformOps(&ops); diff --git a/src/core/census/grpc_filter.c b/src/core/census/grpc_filter.c index a8db32b9d5..c8aaf31e2d 100644 --- a/src/core/census/grpc_filter.c +++ b/src/core/census/grpc_filter.c @@ -107,8 +107,8 @@ static void server_mutate_op(grpc_call_element *elem, if (op->recv_initial_metadata) { /* substitute our callback for the op callback */ calld->recv_initial_metadata = op->recv_initial_metadata; - calld->on_done_recv = op->on_complete; - op->on_complete = &calld->finish_recv; + calld->on_done_recv = op->recv_initial_metadata_ready; + op->recv_initial_metadata_ready = &calld->finish_recv; } } diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c index 43eee046b8..1aa27208c2 100644 --- a/src/core/channel/http_client_filter.c +++ b/src/core/channel/http_client_filter.c @@ -127,8 +127,8 @@ static void hc_mutate_op(grpc_call_element *elem, if (op->recv_initial_metadata != NULL) { /* substitute our callback for the higher callback */ calld->recv_initial_metadata = op->recv_initial_metadata; - calld->on_done_recv = op->on_complete; - op->on_complete = &calld->hc_on_recv; + calld->on_done_recv = op->recv_initial_metadata_ready; + op->recv_initial_metadata_ready = &calld->hc_on_recv; } } diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c index bb75323933..370f8dbe42 100644 --- a/src/core/channel/http_server_filter.c +++ b/src/core/channel/http_server_filter.c @@ -186,8 +186,8 @@ static void hs_mutate_op(grpc_call_element *elem, if (op->recv_initial_metadata) { /* substitute our callback for the higher callback */ calld->recv_initial_metadata = op->recv_initial_metadata; - calld->on_done_recv = op->on_complete; - op->on_complete = &calld->hs_on_recv; + calld->on_done_recv = op->recv_initial_metadata_ready; + op->recv_initial_metadata_ready = &calld->hs_on_recv; } } diff --git a/src/core/channel/subchannel_call_holder.c b/src/core/channel/subchannel_call_holder.c index 3ad9fd9efb..81297c8d44 100644 --- a/src/core/channel/subchannel_call_holder.c +++ b/src/core/channel/subchannel_call_holder.c @@ -241,10 +241,8 @@ static void fail_locked(grpc_exec_ctx *exec_ctx, grpc_subchannel_call_holder *holder) { size_t i; for (i = 0; i < holder->waiting_ops_count; i++) { - grpc_exec_ctx_enqueue(exec_ctx, holder->waiting_ops[i].on_complete, false, - NULL); - grpc_exec_ctx_enqueue(exec_ctx, holder->waiting_ops[i].recv_message_ready, - false, NULL); + grpc_transport_stream_op_finish_with_failure(exec_ctx, + &holder->waiting_ops[i]); } holder->waiting_ops_count = 0; } diff --git a/src/core/security/server_auth_filter.c b/src/core/security/server_auth_filter.c index 4c78711387..3d8e5e8d35 100644 --- a/src/core/security/server_auth_filter.c +++ b/src/core/security/server_auth_filter.c @@ -176,8 +176,8 @@ static void set_recv_ops_md_callbacks(grpc_call_element *elem, if (op->recv_initial_metadata != NULL) { /* substitute our callback for the higher callback */ calld->recv_initial_metadata = op->recv_initial_metadata; - calld->on_done_recv = op->on_complete; - op->on_complete = &calld->auth_on_recv; + calld->on_done_recv = op->recv_initial_metadata_ready; + op->recv_initial_metadata_ready = &calld->auth_on_recv; calld->transport_op = *op; } } diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 9495e748b5..1b117aa6b8 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -159,6 +159,9 @@ struct grpc_call { uint8_t receiving_message; uint8_t received_final_op; + /* have we received initial metadata */ + bool has_initial_md_been_received; + batch_control active_batches[MAX_CONCURRENT_BATCHES]; /* first idx: is_receiving, second idx: is_trailing */ @@ -200,6 +203,7 @@ struct grpc_call { gpr_slice receiving_slice; grpc_closure receiving_slice_ready; grpc_closure receiving_stream_ready; + grpc_closure receiving_initial_metadata_ready; uint32_t test_only_last_message_flags; union { @@ -212,6 +216,11 @@ struct grpc_call { int *cancelled; } server; } final_op; + + struct { + void *bctlp; + bool success; + } saved_receiving_stream_ready_ctx; }; #define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1)) @@ -993,6 +1002,94 @@ static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, } } +static void process_data_after_md(grpc_exec_ctx *exec_ctx, batch_control *bctl, + bool success) { + grpc_call *call = bctl->call; + if (call->receiving_stream == NULL) { + *call->receiving_buffer = NULL; + call->receiving_message = 0; + if (gpr_unref(&bctl->steps_to_complete)) { + post_batch_completion(exec_ctx, bctl); + } + } else if (call->receiving_stream->length > + grpc_channel_get_max_message_length(call->channel)) { + cancel_with_status(exec_ctx, call, GRPC_STATUS_INTERNAL, + "Max message size exceeded"); + grpc_byte_stream_destroy(exec_ctx, call->receiving_stream); + call->receiving_stream = NULL; + *call->receiving_buffer = NULL; + call->receiving_message = 0; + if (gpr_unref(&bctl->steps_to_complete)) { + post_batch_completion(exec_ctx, bctl); + } + } else { + call->test_only_last_message_flags = call->receiving_stream->flags; + if ((call->receiving_stream->flags & GRPC_WRITE_INTERNAL_COMPRESS) && + (call->compression_algorithm > GRPC_COMPRESS_NONE)) { + *call->receiving_buffer = grpc_raw_compressed_byte_buffer_create( + NULL, 0, call->compression_algorithm); + } else { + *call->receiving_buffer = grpc_raw_byte_buffer_create(NULL, 0); + } + grpc_closure_init(&call->receiving_slice_ready, receiving_slice_ready, + bctl); + continue_receiving_slices(exec_ctx, bctl); + /* early out */ + return; + } +} + +static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp, + bool success) { + batch_control *bctl = bctlp; + grpc_call *call = bctl->call; + + gpr_mu_lock(&bctl->call->mu); + if (bctl->call->has_initial_md_been_received) { + gpr_mu_unlock(&bctl->call->mu); + process_data_after_md(exec_ctx, bctlp, success); + } else { + call->saved_receiving_stream_ready_ctx.bctlp = bctlp; + call->saved_receiving_stream_ready_ctx.success = success; + gpr_mu_unlock(&bctl->call->mu); + } +} + +static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, + void *bctlp, bool success) { + batch_control *bctl = bctlp; + grpc_call *call = bctl->call; + + gpr_mu_lock(&call->mu); + + grpc_metadata_batch *md = + &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */]; + grpc_metadata_batch_filter(md, recv_initial_filter, call); + call->has_initial_md_been_received = true; + + if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) != + 0 && + !call->is_client) { + GPR_TIMER_BEGIN("set_deadline_alarm", 0); + set_deadline_alarm(exec_ctx, call, md->deadline); + GPR_TIMER_END("set_deadline_alarm", 0); + } + + if (call->saved_receiving_stream_ready_ctx.bctlp != NULL) { + grpc_closure *saved_rsr_closure = grpc_closure_create( + receiving_stream_ready, call->saved_receiving_stream_ready_ctx.bctlp); + grpc_exec_ctx_enqueue(exec_ctx, saved_rsr_closure, + call->saved_receiving_stream_ready_ctx.success, NULL); + call->saved_receiving_stream_ready_ctx.bctlp = NULL; + } + + gpr_mu_unlock(&call->mu); + + if (gpr_unref(&bctl->steps_to_complete)) { + post_batch_completion(exec_ctx, bctl); + } +} + static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) { batch_control *bctl = bctlp; grpc_call *call = bctl->call; @@ -1011,19 +1108,6 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) { grpc_metadata_batch_destroy( &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]); } - if (bctl->recv_initial_metadata) { - grpc_metadata_batch *md = - &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */]; - grpc_metadata_batch_filter(md, recv_initial_filter, call); - - if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) != - 0 && - !call->is_client) { - GPR_TIMER_BEGIN("set_deadline_alarm", 0); - set_deadline_alarm(exec_ctx, call, md->deadline); - GPR_TIMER_END("set_deadline_alarm", 0); - } - } if (bctl->recv_final_op) { grpc_metadata_batch *md = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; @@ -1065,45 +1149,6 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) { } } -static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp, - bool success) { - batch_control *bctl = bctlp; - grpc_call *call = bctl->call; - - if (call->receiving_stream == NULL) { - *call->receiving_buffer = NULL; - call->receiving_message = 0; - if (gpr_unref(&bctl->steps_to_complete)) { - post_batch_completion(exec_ctx, bctl); - } - } else if (call->receiving_stream->length > - grpc_channel_get_max_message_length(call->channel)) { - cancel_with_status(exec_ctx, call, GRPC_STATUS_INTERNAL, - "Max message size exceeded"); - grpc_byte_stream_destroy(exec_ctx, call->receiving_stream); - call->receiving_stream = NULL; - *call->receiving_buffer = NULL; - call->receiving_message = 0; - if (gpr_unref(&bctl->steps_to_complete)) { - post_batch_completion(exec_ctx, bctl); - } - } else { - call->test_only_last_message_flags = call->receiving_stream->flags; - if ((call->receiving_stream->flags & GRPC_WRITE_INTERNAL_COMPRESS) && - (call->compression_algorithm > GRPC_COMPRESS_NONE)) { - *call->receiving_buffer = grpc_raw_compressed_byte_buffer_create( - NULL, 0, call->compression_algorithm); - } else { - *call->receiving_buffer = grpc_raw_byte_buffer_create(NULL, 0); - } - grpc_closure_init(&call->receiving_slice_ready, receiving_slice_ready, - bctl); - continue_receiving_slices(exec_ctx, bctl); - /* early out */ - return; - } -} - static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, grpc_call *call, const grpc_op *ops, size_t nops, void *notify_tag, @@ -1273,9 +1318,14 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, } call->received_initial_metadata = 1; call->buffered_metadata[0] = op->data.recv_initial_metadata; + grpc_closure_init(&call->receiving_initial_metadata_ready, + receiving_initial_metadata_ready, bctl); bctl->recv_initial_metadata = 1; stream_op.recv_initial_metadata = &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */]; + stream_op.recv_initial_metadata_ready = + &call->receiving_initial_metadata_ready; + num_completion_callbacks_needed++; break; case GRPC_OP_RECV_MESSAGE: /* Flag validation: currently allow no flags */ diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index 705996cad3..537069e984 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -78,8 +78,7 @@ static void lame_start_transport_stream_op(grpc_exec_ctx *exec_ctx, } else if (op->recv_trailing_metadata != NULL) { fill_metadata(elem, op->recv_trailing_metadata); } - grpc_exec_ctx_enqueue(exec_ctx, op->on_complete, false, NULL); - grpc_exec_ctx_enqueue(exec_ctx, op->recv_message_ready, false, NULL); + grpc_transport_stream_op_finish_with_failure(exec_ctx, op); } static char *lame_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 42cffccb4c..fb5e0d4b9e 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -596,8 +596,8 @@ static void server_mutate_op(grpc_call_element *elem, if (op->recv_initial_metadata != NULL) { calld->recv_initial_metadata = op->recv_initial_metadata; - calld->on_done_recv_initial_metadata = op->on_complete; - op->on_complete = &calld->server_on_recv_initial_metadata; + calld->on_done_recv_initial_metadata = op->recv_initial_metadata_ready; + op->recv_initial_metadata_ready = &calld->server_on_recv_initial_metadata; } } diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index c611496e7e..0e1e2c4265 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -385,7 +385,7 @@ typedef struct { grpc_closure *send_trailing_metadata_finished; grpc_metadata_batch *recv_initial_metadata; - grpc_closure *recv_initial_metadata_finished; + grpc_closure *recv_initial_metadata_ready; grpc_byte_stream **recv_message; grpc_closure *recv_message_ready; grpc_metadata_batch *recv_trailing_metadata; diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 9298573c7f..617d98875c 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -544,7 +544,7 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, GPR_ASSERT(s->global.send_initial_metadata_finished == NULL); GPR_ASSERT(s->global.send_message_finished == NULL); GPR_ASSERT(s->global.send_trailing_metadata_finished == NULL); - GPR_ASSERT(s->global.recv_initial_metadata_finished == NULL); + GPR_ASSERT(s->global.recv_initial_metadata_ready == NULL); GPR_ASSERT(s->global.recv_message_ready == NULL); GPR_ASSERT(s->global.recv_trailing_metadata_finished == NULL); grpc_chttp2_data_parser_destroy(exec_ctx, &s->parsing.data_parser); @@ -863,9 +863,9 @@ static void perform_stream_op_locked( } if (op->recv_initial_metadata != NULL) { - GPR_ASSERT(stream_global->recv_initial_metadata_finished == NULL); - stream_global->recv_initial_metadata_finished = - add_closure_barrier(on_complete); + GPR_ASSERT(stream_global->recv_initial_metadata_ready == NULL); + stream_global->recv_initial_metadata_ready = + op->recv_initial_metadata_ready; stream_global->recv_initial_metadata = op->recv_initial_metadata; grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); } @@ -1009,13 +1009,14 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, grpc_byte_stream *bs; while ( grpc_chttp2_list_pop_check_read_ops(transport_global, &stream_global)) { - if (stream_global->recv_initial_metadata_finished != NULL && + if (stream_global->recv_initial_metadata_ready != NULL && stream_global->published_initial_metadata) { grpc_chttp2_incoming_metadata_buffer_publish( &stream_global->received_initial_metadata, stream_global->recv_initial_metadata); - grpc_chttp2_complete_closure_step( - exec_ctx, &stream_global->recv_initial_metadata_finished, 1); + grpc_exec_ctx_enqueue( + exec_ctx, stream_global->recv_initial_metadata_ready, true, NULL); + stream_global->recv_initial_metadata_ready = NULL; } if (stream_global->recv_message_ready != NULL) { if (stream_global->incoming_frames.head != NULL) { diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c index 08d685668c..6e154b629a 100644 --- a/src/core/transport/transport.c +++ b/src/core/transport/transport.c @@ -126,6 +126,7 @@ char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx, void grpc_transport_stream_op_finish_with_failure( grpc_exec_ctx *exec_ctx, grpc_transport_stream_op *op) { grpc_exec_ctx_enqueue(exec_ctx, op->recv_message_ready, false, NULL); + grpc_exec_ctx_enqueue(exec_ctx, op->recv_initial_metadata_ready, false, NULL); grpc_exec_ctx_enqueue(exec_ctx, op->on_complete, false, NULL); } diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index f5cac77adc..8902c5d2f6 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -92,6 +92,8 @@ typedef struct grpc_transport_stream_op { /** Receive initial metadata from the stream, into provided metadata batch. */ grpc_metadata_batch *recv_initial_metadata; + /** Should be enqueued when initial metadata is ready to be processed. */ + grpc_closure *recv_initial_metadata_ready; /** Receive message data from the stream, into provided byte stream. */ grpc_byte_stream **recv_message; @@ -103,7 +105,8 @@ typedef struct grpc_transport_stream_op { grpc_metadata_batch *recv_trailing_metadata; /** Should be enqueued when all requested operations (excluding recv_message - which has its own closure) in a given batch have been completed. */ + and recv_initial_metadata which have their own closures) in a given batch + have been completed. */ grpc_closure *on_complete; /** If != GRPC_STATUS_OK, cancel this stream */ diff --git a/test/core/fling/client.c b/test/core/fling/client.c index 02db681cfd..b36aef3093 100644 --- a/test/core/fling/client.c +++ b/test/core/fling/client.c @@ -51,7 +51,7 @@ static grpc_channel *channel; static grpc_completion_queue *cq; static grpc_call *call; static grpc_op ops[6]; -static grpc_op stream_init_op; +static grpc_op stream_init_ops[2]; static grpc_op stream_step_ops[2]; static grpc_metadata_array initial_metadata_recv; static grpc_metadata_array trailing_metadata_recv; @@ -105,13 +105,17 @@ static void step_ping_pong_request(void) { } static void init_ping_pong_stream(void) { + grpc_metadata_array_init(&initial_metadata_recv); + grpc_call_error error; call = grpc_channel_create_call(channel, NULL, GRPC_PROPAGATE_DEFAULTS, cq, "/Reflector/reflectStream", "localhost", gpr_inf_future(GPR_CLOCK_REALTIME), NULL); - stream_init_op.op = GRPC_OP_SEND_INITIAL_METADATA; - stream_init_op.data.send_initial_metadata.count = 0; - error = grpc_call_start_batch(call, &stream_init_op, 1, (void *)1, NULL); + stream_init_ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; + stream_init_ops[0].data.send_initial_metadata.count = 0; + stream_init_ops[1].op = GRPC_OP_RECV_INITIAL_METADATA; + stream_init_ops[1].data.recv_initial_metadata = &initial_metadata_recv; + error = grpc_call_start_batch(call, stream_init_ops, 2, (void *)1, NULL); GPR_ASSERT(GRPC_CALL_OK == error); grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME), NULL); diff --git a/test/cpp/end2end/hybrid_end2end_test.cc b/test/cpp/end2end/hybrid_end2end_test.cc index f8405627f9..c72e20628f 100644 --- a/test/cpp/end2end/hybrid_end2end_test.cc +++ b/test/cpp/end2end/hybrid_end2end_test.cc @@ -216,7 +216,7 @@ class HybridEnd2endTest : public ::testing::Test { } // Create a separate cq for each potential handler. for (int i = 0; i < 5; i++) { - cqs_.push_back(std::move(builder.AddCompletionQueue())); + cqs_.push_back(builder.AddCompletionQueue()); } server_ = builder.BuildAndStart(); } -- cgit v1.2.3 From 6f87164f3c3bfd918a5bf571e2da6ec73cb9ea57 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 3 Feb 2016 16:15:31 -0800 Subject: Properly handle "." in metadata --- src/core/surface/validate_metadata.c | 2 +- test/cpp/end2end/async_end2end_test.cc | 4 ++++ tools/codegen/core/gen_legal_metadata_characters.c | 2 +- 3 files changed, 6 insertions(+), 2 deletions(-) (limited to 'src/core/surface') diff --git a/src/core/surface/validate_metadata.c b/src/core/surface/validate_metadata.c index df2e80b4b7..bf4126867f 100644 --- a/src/core/surface/validate_metadata.c +++ b/src/core/surface/validate_metadata.c @@ -50,7 +50,7 @@ static int conforms_to(const char *s, size_t len, const uint8_t *legal_bits) { int grpc_header_key_is_legal(const char *key, size_t length) { static const uint8_t legal_header_bits[256 / 8] = { - 0x00, 0x00, 0x00, 0x00, 0x00, 0x20, 0xff, 0x03, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x60, 0xff, 0x03, 0x00, 0x00, 0x00, 0x80, 0xfe, 0xff, 0xff, 0x07, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}; if (length == 0) { diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index 252bda3798..a194c615cd 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -479,8 +479,10 @@ TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) { send_request.set_message("Hello"); std::pair meta1("key1", "val1"); std::pair meta2("key2", "val2"); + std::pair meta3("g.r.d-bin", "xyz"); cli_ctx.AddMetadata(meta1.first, meta1.second); cli_ctx.AddMetadata(meta2.first, meta2.second); + cli_ctx.AddMetadata(meta3.first, meta3.second); std::unique_ptr> response_reader( stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); @@ -494,6 +496,8 @@ TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) { ToString(client_initial_metadata.find(meta1.first)->second)); EXPECT_EQ(meta2.second, ToString(client_initial_metadata.find(meta2.first)->second)); + EXPECT_EQ(meta3.second, + ToString(client_initial_metadata.find(meta3.first)->second)); EXPECT_GE(client_initial_metadata.size(), static_cast(2)); send_response.set_message(recv_request.message()); diff --git a/tools/codegen/core/gen_legal_metadata_characters.c b/tools/codegen/core/gen_legal_metadata_characters.c index 3c9e1c7619..6ac32656cb 100644 --- a/tools/codegen/core/gen_legal_metadata_characters.c +++ b/tools/codegen/core/gen_legal_metadata_characters.c @@ -52,7 +52,7 @@ static void legal(int x) { static void dump(void) { int i; - printf("static const gpr_uint8 legal_header_bits[256/8] = "); + printf("static const uint8_t legal_header_bits[256/8] = "); for (i = 0; i < 256 / 8; i++) printf("%c 0x%02x", i ? ',' : '{', legal_bits[i]); printf(" };\n"); -- cgit v1.2.3 From 1935c3655c3cb92ac31c802fc613392888cb3482 Mon Sep 17 00:00:00 2001 From: "Nicolas \"Pixel\" Noble" Date: Tue, 9 Feb 2016 01:54:56 +0100 Subject: Flagging master as 0.14.0-dev. --- Makefile | 2 +- build.yaml | 2 +- package.json | 2 +- src/core/surface/version.c | 2 +- src/csharp/Grpc.Core/VersionInfo.cs | 4 ++-- src/csharp/build_packages.bat | 2 +- src/python/grpcio/grpc_version.py | 2 +- src/ruby/lib/grpc/version.rb | 2 +- tools/doxygen/Doxyfile.c++ | 2 +- tools/doxygen/Doxyfile.c++.internal | 2 +- tools/doxygen/Doxyfile.core | 2 +- tools/doxygen/Doxyfile.core.internal | 2 +- 12 files changed, 13 insertions(+), 13 deletions(-) (limited to 'src/core/surface') diff --git a/Makefile b/Makefile index e86e8f15e5..d9cc89d9d0 100644 --- a/Makefile +++ b/Makefile @@ -361,7 +361,7 @@ E = @echo Q = @ endif -VERSION = 0.13.0-pre1 +VERSION = 0.14.0-dev CPPFLAGS_NO_ARCH += $(addprefix -I, $(INCLUDES)) $(addprefix -D, $(DEFINES)) CPPFLAGS += $(CPPFLAGS_NO_ARCH) $(ARCH_FLAGS) diff --git a/build.yaml b/build.yaml index 0e9dc89d42..630a49410f 100644 --- a/build.yaml +++ b/build.yaml @@ -7,7 +7,7 @@ settings: '#3': Use "-preN" suffixes to identify pre-release versions '#4': Per-language overrides are possible with (eg) ruby_version tag here '#5': See the expand_version.py for all the quirks here - version: 0.13.0-pre1 + version: 0.14.0-dev filegroups: - name: census public_headers: diff --git a/package.json b/package.json index 8a5f13bc49..9a86f4d998 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "grpc", - "version": "0.13.0-pre1", + "version": "0.14.0-dev", "author": "Google Inc.", "description": "gRPC Library for Node", "homepage": "http://www.grpc.io/", diff --git a/src/core/surface/version.c b/src/core/surface/version.c index fc883da189..7723f39401 100644 --- a/src/core/surface/version.c +++ b/src/core/surface/version.c @@ -36,4 +36,4 @@ #include -const char *grpc_version_string(void) { return "0.13.0-pre1"; } +const char *grpc_version_string(void) { return "0.14.0-dev"; } diff --git a/src/csharp/Grpc.Core/VersionInfo.cs b/src/csharp/Grpc.Core/VersionInfo.cs index 1460995fc3..4bd4f204dd 100644 --- a/src/csharp/Grpc.Core/VersionInfo.cs +++ b/src/csharp/Grpc.Core/VersionInfo.cs @@ -41,11 +41,11 @@ namespace Grpc.Core /// /// Current version of gRPC C# assemblies /// - public const string CurrentAssemblyVersion = "0.13.0.0"; + public const string CurrentAssemblyVersion = "0.14.0.0"; /// /// Current version of gRPC C# /// - public const string CurrentVersion = "0.13.0-pre1"; + public const string CurrentVersion = "0.14.0-dev"; } } diff --git a/src/csharp/build_packages.bat b/src/csharp/build_packages.bat index 8ffd671011..b7768f7821 100644 --- a/src/csharp/build_packages.bat +++ b/src/csharp/build_packages.bat @@ -1,7 +1,7 @@ @rem Builds gRPC NuGet packages @rem Current package versions -set VERSION=0.13.0-pre1 +set VERSION=0.14.0-dev set PROTOBUF_VERSION=3.0.0-beta2 @rem Packages that depend on prerelease packages (like Google.Protobuf) need to have prerelease suffix as well. diff --git a/src/python/grpcio/grpc_version.py b/src/python/grpcio/grpc_version.py index b256871bd5..75b88cfd61 100644 --- a/src/python/grpcio/grpc_version.py +++ b/src/python/grpcio/grpc_version.py @@ -29,4 +29,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio/grpc_version.py.template`!!! -VERSION='0.13.0rc1' +VERSION='0.14.0.dev0' diff --git a/src/ruby/lib/grpc/version.rb b/src/ruby/lib/grpc/version.rb index e98073c057..9cbd36a355 100644 --- a/src/ruby/lib/grpc/version.rb +++ b/src/ruby/lib/grpc/version.rb @@ -29,5 +29,5 @@ # GRPC contains the General RPC module. module GRPC - VERSION = '0.13.0-pre1' + VERSION = '0.14.0-dev' end diff --git a/tools/doxygen/Doxyfile.c++ b/tools/doxygen/Doxyfile.c++ index 1a15e3972c..5de1a15604 100644 --- a/tools/doxygen/Doxyfile.c++ +++ b/tools/doxygen/Doxyfile.c++ @@ -40,7 +40,7 @@ PROJECT_NAME = "GRPC C++" # could be handy for archiving the generated documentation or if some version # control system is used. -PROJECT_NUMBER = 0.13.0-pre1 +PROJECT_NUMBER = 0.14.0-dev # Using the PROJECT_BRIEF tag one can provide an optional one line description # for a project that appears at the top of each page and should give viewer a diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 41a0547a5c..bd724d8bda 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -40,7 +40,7 @@ PROJECT_NAME = "GRPC C++" # could be handy for archiving the generated documentation or if some version # control system is used. -PROJECT_NUMBER = 0.13.0-pre1 +PROJECT_NUMBER = 0.14.0-dev # Using the PROJECT_BRIEF tag one can provide an optional one line description # for a project that appears at the top of each page and should give viewer a diff --git a/tools/doxygen/Doxyfile.core b/tools/doxygen/Doxyfile.core index a94970c65e..e00fcfc999 100644 --- a/tools/doxygen/Doxyfile.core +++ b/tools/doxygen/Doxyfile.core @@ -40,7 +40,7 @@ PROJECT_NAME = "GRPC Core" # could be handy for archiving the generated documentation or if some version # control system is used. -PROJECT_NUMBER = 0.13.0-pre1 +PROJECT_NUMBER = 0.14.0-dev # Using the PROJECT_BRIEF tag one can provide an optional one line description # for a project that appears at the top of each page and should give viewer a diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 0ab8208a9b..c7b5be758b 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -40,7 +40,7 @@ PROJECT_NAME = "GRPC Core" # could be handy for archiving the generated documentation or if some version # control system is used. -PROJECT_NUMBER = 0.13.0-pre1 +PROJECT_NUMBER = 0.14.0-dev # Using the PROJECT_BRIEF tag one can provide an optional one line description # for a project that appears at the top of each page and should give viewer a -- cgit v1.2.3 From 090c867e28c6898b5e5fe32d8fe5ef5178986721 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Thu, 11 Feb 2016 14:43:43 -0800 Subject: This testing exposed a race condition in alarm creation - the alarm needs to "begin" at the CQ before the timer should be inited. --- src/core/surface/alarm.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/core/surface') diff --git a/src/core/surface/alarm.c b/src/core/surface/alarm.c index d753023ca9..fb496f6c47 100644 --- a/src/core/surface/alarm.c +++ b/src/core/surface/alarm.c @@ -63,9 +63,9 @@ grpc_alarm *grpc_alarm_create(grpc_completion_queue *cq, gpr_timespec deadline, alarm->cq = cq; alarm->tag = tag; + grpc_cq_begin_op(cq, tag); grpc_timer_init(&exec_ctx, &alarm->alarm, deadline, alarm_cb, alarm, gpr_now(GPR_CLOCK_MONOTONIC)); - grpc_cq_begin_op(cq, tag); grpc_exec_ctx_finish(&exec_ctx); return alarm; } -- cgit v1.2.3