diff options
author | Craig Tiller <ctiller@google.com> | 2017-09-18 13:56:51 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2017-09-18 13:56:51 -0700 |
commit | 50448beab3d24bf4e097967d53aed27b90fc4858 (patch) | |
tree | e79f698063749e881b8a1eae61af8a190f6d37bb /src/core | |
parent | 1c3fd1a4bf68417b0118e369e78c4db267f3191e (diff) | |
parent | 63f31e8cd16b185fd735bd753afbc0375cd573aa (diff) |
Merge github.com:grpc/grpc into grpc_millis
Diffstat (limited to 'src/core')
53 files changed, 1020 insertions, 526 deletions
diff --git a/src/core/ext/filters/client_channel/channel_connectivity.c b/src/core/ext/filters/client_channel/channel_connectivity.c index 416bdefa08..f79d3bdb56 100644 --- a/src/core/ext/filters/client_channel/channel_connectivity.c +++ b/src/core/ext/filters/client_channel/channel_connectivity.c @@ -86,7 +86,7 @@ static void delete_state_watcher(grpc_exec_ctx *exec_ctx, state_watcher *w) { static void finished_completion(grpc_exec_ctx *exec_ctx, void *pw, grpc_cq_completion *ignored) { - int delete = 0; + bool should_delete = false; state_watcher *w = (state_watcher *)pw; gpr_mu_lock(&w->mu); switch (w->phase) { @@ -94,12 +94,12 @@ static void finished_completion(grpc_exec_ctx *exec_ctx, void *pw, case READY_TO_CALL_BACK: GPR_UNREACHABLE_CODE(return ); case CALLING_BACK_AND_FINISHED: - delete = 1; + should_delete = true; break; } gpr_mu_unlock(&w->mu); - if (delete) { + if (should_delete) { delete_state_watcher(exec_ctx, w); } } @@ -161,12 +161,12 @@ static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w, static void watch_complete(grpc_exec_ctx *exec_ctx, void *pw, grpc_error *error) { - partly_done(exec_ctx, pw, true, GRPC_ERROR_REF(error)); + partly_done(exec_ctx, (state_watcher *)pw, true, GRPC_ERROR_REF(error)); } static void timeout_complete(grpc_exec_ctx *exec_ctx, void *pw, grpc_error *error) { - partly_done(exec_ctx, pw, false, GRPC_ERROR_REF(error)); + partly_done(exec_ctx, (state_watcher *)pw, false, GRPC_ERROR_REF(error)); } int grpc_channel_num_external_connectivity_watchers(grpc_channel *channel) { diff --git a/src/core/ext/filters/client_channel/client_channel.c b/src/core/ext/filters/client_channel/client_channel.c index 93ba8684ed..c1daaf28c9 100644 --- a/src/core/ext/filters/client_channel/client_channel.c +++ b/src/core/ext/filters/client_channel/client_channel.c @@ -85,7 +85,7 @@ static void method_parameters_unref(method_parameters *method_params) { } static void method_parameters_free(grpc_exec_ctx *exec_ctx, void *value) { - method_parameters_unref(value); + method_parameters_unref((method_parameters *)value); } static bool parse_wait_for_ready(grpc_json *field, @@ -719,7 +719,8 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx, return GRPC_ERROR_CREATE_FROM_STATIC_STRING( "client channel factory arg must be a pointer"); } - grpc_client_channel_factory_ref(arg->value.pointer.p); + grpc_client_channel_factory_ref( + (grpc_client_channel_factory *)arg->value.pointer.p); chand->client_channel_factory = (grpc_client_channel_factory *)arg->value.pointer.p; // Get server name to resolve, using proxy mapper if needed. diff --git a/src/core/ext/filters/client_channel/client_channel_factory.c b/src/core/ext/filters/client_channel/client_channel_factory.c index 7220a8639e..e8aa4cda29 100644 --- a/src/core/ext/filters/client_channel/client_channel_factory.c +++ b/src/core/ext/filters/client_channel/client_channel_factory.c @@ -43,14 +43,13 @@ grpc_channel* grpc_client_channel_factory_create_channel( } static void* factory_arg_copy(void* factory) { - grpc_client_channel_factory_ref(factory); + grpc_client_channel_factory_ref((grpc_client_channel_factory*)factory); return factory; } static void factory_arg_destroy(grpc_exec_ctx* exec_ctx, void* factory) { - // TODO(roth): Remove local exec_ctx when - // https://github.com/grpc/grpc/pull/8705 is merged. - grpc_client_channel_factory_unref(exec_ctx, factory); + grpc_client_channel_factory_unref(exec_ctx, + (grpc_client_channel_factory*)factory); } static int factory_arg_cmp(void* factory1, void* factory2) { diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c index bd290464c8..7ad322902b 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c @@ -75,7 +75,8 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, GPR_ASSERT(args->context != NULL); GPR_ASSERT(args->context[GRPC_GRPCLB_CLIENT_STATS].value != NULL); calld->client_stats = grpc_grpclb_client_stats_ref( - args->context[GRPC_GRPCLB_CLIENT_STATS].value); + (grpc_grpclb_client_stats *)args->context[GRPC_GRPCLB_CLIENT_STATS] + .value); // Record call started. grpc_grpclb_client_stats_add_call_started(calld->client_stats); return GRPC_ERROR_NONE; diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c index 137642613c..cd2bd02bd2 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c @@ -101,6 +101,7 @@ #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/parse_address.h" #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" +#include "src/core/ext/filters/client_channel/subchannel_index.h" #include "src/core/lib/backoff/backoff.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_stack.h" @@ -137,7 +138,7 @@ static grpc_error *initial_metadata_add_lb_token( } static void destroy_client_stats(void *arg) { - grpc_grpclb_client_stats_unref(arg); + grpc_grpclb_client_stats_unref((grpc_grpclb_client_stats *)arg); } typedef struct wrapped_rr_closure_arg { @@ -285,7 +286,7 @@ static void add_pending_ping(pending_ping **root, grpc_closure *notify) { * glb_lb_policy */ typedef struct rr_connectivity_data rr_connectivity_data; -static const grpc_lb_policy_vtable glb_lb_policy_vtable; + typedef struct glb_lb_policy { /** base policy: must be first */ grpc_lb_policy base; @@ -727,7 +728,7 @@ static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, /* Allocate the data for the tracking of the new RR policy's connectivity. * It'll be deallocated in glb_rr_connectivity_changed() */ rr_connectivity_data *rr_connectivity = - gpr_zalloc(sizeof(rr_connectivity_data)); + (rr_connectivity_data *)gpr_zalloc(sizeof(rr_connectivity_data)); GRPC_CLOSURE_INIT(&rr_connectivity->on_change, glb_rr_connectivity_changed_locked, rr_connectivity, grpc_combiner_scheduler(glb_policy->base.combiner)); @@ -869,7 +870,8 @@ static grpc_channel_args *build_lb_channel_args( grpc_lb_addresses *lb_addresses = grpc_lb_addresses_create(num_grpclb_addrs, NULL); grpc_slice_hash_table_entry *targets_info_entries = - gpr_zalloc(sizeof(*targets_info_entries) * num_grpclb_addrs); + (grpc_slice_hash_table_entry *)gpr_zalloc(sizeof(*targets_info_entries) * + num_grpclb_addrs); size_t lb_addresses_idx = 0; for (size_t i = 0; i < addresses->num_addresses; ++i) { @@ -911,92 +913,6 @@ static grpc_channel_args *build_lb_channel_args( return result; } -static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx, - void *arg, - grpc_error *error); -static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, - grpc_lb_policy_factory *factory, - grpc_lb_policy_args *args) { - /* Count the number of gRPC-LB addresses. There must be at least one. - * TODO(roth): For now, we ignore non-balancer addresses, but in the - * future, we may change the behavior such that we fall back to using - * the non-balancer addresses if we cannot reach any balancers. In the - * fallback case, we should use the LB policy indicated by - * GRPC_ARG_LB_POLICY_NAME (although if that specifies grpclb or is - * unset, we should default to pick_first). */ - const grpc_arg *arg = - grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES); - if (arg == NULL || arg->type != GRPC_ARG_POINTER) { - return NULL; - } - grpc_lb_addresses *addresses = (grpc_lb_addresses *)arg->value.pointer.p; - size_t num_grpclb_addrs = 0; - for (size_t i = 0; i < addresses->num_addresses; ++i) { - if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs; - } - if (num_grpclb_addrs == 0) return NULL; - - glb_lb_policy *glb_policy = (glb_lb_policy *)gpr_zalloc(sizeof(*glb_policy)); - - /* Get server name. */ - arg = grpc_channel_args_find(args->args, GRPC_ARG_SERVER_URI); - GPR_ASSERT(arg != NULL); - GPR_ASSERT(arg->type == GRPC_ARG_STRING); - grpc_uri *uri = grpc_uri_parse(exec_ctx, arg->value.string, true); - GPR_ASSERT(uri->path[0] != '\0'); - glb_policy->server_name = - gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path); - if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { - gpr_log(GPR_INFO, "Will use '%s' as the server name for LB request.", - glb_policy->server_name); - } - grpc_uri_destroy(uri); - - glb_policy->cc_factory = args->client_channel_factory; - GPR_ASSERT(glb_policy->cc_factory != NULL); - - arg = grpc_channel_args_find(args->args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS); - glb_policy->lb_call_timeout_ms = - grpc_channel_arg_get_integer(arg, (grpc_integer_options){0, 0, INT_MAX}); - - // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args, - // since we use this to trigger the client_load_reporting filter. - grpc_arg new_arg = - grpc_channel_arg_string_create(GRPC_ARG_LB_POLICY_NAME, "grpclb"); - static const char *args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME}; - glb_policy->args = grpc_channel_args_copy_and_add_and_remove( - args->args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1); - - /* Create a client channel over them to communicate with a LB service */ - glb_policy->response_generator = - grpc_fake_resolver_response_generator_create(); - grpc_channel_args *lb_channel_args = build_lb_channel_args( - exec_ctx, addresses, glb_policy->response_generator, args->args); - char *uri_str; - gpr_asprintf(&uri_str, "fake:///%s", glb_policy->server_name); - glb_policy->lb_channel = grpc_lb_policy_grpclb_create_lb_channel( - exec_ctx, uri_str, args->client_channel_factory, lb_channel_args); - - /* Propagate initial resolution */ - grpc_fake_resolver_response_generator_set_response( - exec_ctx, glb_policy->response_generator, lb_channel_args); - grpc_channel_args_destroy(exec_ctx, lb_channel_args); - gpr_free(uri_str); - if (glb_policy->lb_channel == NULL) { - gpr_free((void *)glb_policy->server_name); - grpc_channel_args_destroy(exec_ctx, glb_policy->args); - gpr_free(glb_policy); - return NULL; - } - GRPC_CLOSURE_INIT(&glb_policy->lb_channel_on_connectivity_changed, - glb_lb_channel_on_connectivity_changed_cb, glb_policy, - grpc_combiner_scheduler(args->combiner)); - grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable, args->combiner); - grpc_connectivity_state_init(&glb_policy->state_tracker, GRPC_CHANNEL_IDLE, - "grpclb"); - return &glb_policy->base; -} - static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { glb_lb_policy *glb_policy = (glb_lb_policy *)pol; GPR_ASSERT(glb_policy->pending_picks == NULL); @@ -1011,6 +927,7 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { grpc_grpclb_destroy_serverlist(glb_policy->serverlist); } grpc_fake_resolver_response_generator_unref(glb_policy->response_generator); + grpc_subchannel_index_unref(); if (glb_policy->pending_update_args != NULL) { grpc_channel_args_destroy(exec_ctx, glb_policy->pending_update_args->args); gpr_free(glb_policy->pending_update_args); @@ -1302,7 +1219,8 @@ static void do_send_client_load_report_locked(grpc_exec_ctx *exec_ctx, static bool load_report_counters_are_zero(grpc_grpclb_request *request) { grpc_grpclb_dropped_call_counts *drop_entries = - request->client_stats.calls_finished_with_drop.arg; + (grpc_grpclb_dropped_call_counts *) + request->client_stats.calls_finished_with_drop.arg; return request->client_stats.num_calls_started == 0 && request->client_stats.num_calls_finished == 0 && request->client_stats.num_calls_finished_with_client_failed_to_send == @@ -1859,6 +1777,90 @@ static const grpc_lb_policy_vtable glb_lb_policy_vtable = { glb_notify_on_state_change_locked, glb_update_locked}; +static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, + grpc_lb_policy_factory *factory, + grpc_lb_policy_args *args) { + /* Count the number of gRPC-LB addresses. There must be at least one. + * TODO(roth): For now, we ignore non-balancer addresses, but in the + * future, we may change the behavior such that we fall back to using + * the non-balancer addresses if we cannot reach any balancers. In the + * fallback case, we should use the LB policy indicated by + * GRPC_ARG_LB_POLICY_NAME (although if that specifies grpclb or is + * unset, we should default to pick_first). */ + const grpc_arg *arg = + grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES); + if (arg == NULL || arg->type != GRPC_ARG_POINTER) { + return NULL; + } + grpc_lb_addresses *addresses = (grpc_lb_addresses *)arg->value.pointer.p; + size_t num_grpclb_addrs = 0; + for (size_t i = 0; i < addresses->num_addresses; ++i) { + if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs; + } + if (num_grpclb_addrs == 0) return NULL; + + glb_lb_policy *glb_policy = (glb_lb_policy *)gpr_zalloc(sizeof(*glb_policy)); + + /* Get server name. */ + arg = grpc_channel_args_find(args->args, GRPC_ARG_SERVER_URI); + GPR_ASSERT(arg != NULL); + GPR_ASSERT(arg->type == GRPC_ARG_STRING); + grpc_uri *uri = grpc_uri_parse(exec_ctx, arg->value.string, true); + GPR_ASSERT(uri->path[0] != '\0'); + glb_policy->server_name = + gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path); + if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { + gpr_log(GPR_INFO, "Will use '%s' as the server name for LB request.", + glb_policy->server_name); + } + grpc_uri_destroy(uri); + + glb_policy->cc_factory = args->client_channel_factory; + GPR_ASSERT(glb_policy->cc_factory != NULL); + + arg = grpc_channel_args_find(args->args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS); + glb_policy->lb_call_timeout_ms = + grpc_channel_arg_get_integer(arg, (grpc_integer_options){0, 0, INT_MAX}); + + // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args, + // since we use this to trigger the client_load_reporting filter. + grpc_arg new_arg = + grpc_channel_arg_string_create(GRPC_ARG_LB_POLICY_NAME, "grpclb"); + static const char *args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME}; + glb_policy->args = grpc_channel_args_copy_and_add_and_remove( + args->args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1); + + /* Create a client channel over them to communicate with a LB service */ + glb_policy->response_generator = + grpc_fake_resolver_response_generator_create(); + grpc_channel_args *lb_channel_args = build_lb_channel_args( + exec_ctx, addresses, glb_policy->response_generator, args->args); + char *uri_str; + gpr_asprintf(&uri_str, "fake:///%s", glb_policy->server_name); + glb_policy->lb_channel = grpc_lb_policy_grpclb_create_lb_channel( + exec_ctx, uri_str, args->client_channel_factory, lb_channel_args); + + /* Propagate initial resolution */ + grpc_fake_resolver_response_generator_set_response( + exec_ctx, glb_policy->response_generator, lb_channel_args); + grpc_channel_args_destroy(exec_ctx, lb_channel_args); + gpr_free(uri_str); + if (glb_policy->lb_channel == NULL) { + gpr_free((void *)glb_policy->server_name); + grpc_channel_args_destroy(exec_ctx, glb_policy->args); + gpr_free(glb_policy); + return NULL; + } + grpc_subchannel_index_ref(); + GRPC_CLOSURE_INIT(&glb_policy->lb_channel_on_connectivity_changed, + glb_lb_channel_on_connectivity_changed_cb, glb_policy, + grpc_combiner_scheduler(args->combiner)); + grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable, args->combiner); + grpc_connectivity_state_init(&glb_policy->state_tracker, GRPC_CHANNEL_IDLE, + "grpclb"); + return &glb_policy->base; +} + static void glb_factory_ref(grpc_lb_policy_factory *factory) {} static void glb_factory_unref(grpc_lb_policy_factory *factory) {} diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c index 393f6256bf..9c6fb94bf1 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c @@ -148,7 +148,8 @@ grpc_slice grpc_grpclb_request_encode(const grpc_grpclb_request *request) { void grpc_grpclb_request_destroy(grpc_grpclb_request *request) { if (request->has_client_stats) { grpc_grpclb_dropped_call_counts *drop_entries = - request->client_stats.calls_finished_with_drop.arg; + (grpc_grpclb_dropped_call_counts *) + request->client_stats.calls_finished_with_drop.arg; grpc_grpclb_dropped_call_counts_destroy(drop_entries); } gpr_free(request); @@ -170,7 +171,8 @@ grpc_grpclb_initial_response *grpc_grpclb_initial_response_parse( if (!res.has_initial_response) return NULL; grpc_grpclb_initial_response *initial_res = - gpr_malloc(sizeof(grpc_grpclb_initial_response)); + (grpc_grpclb_initial_response *)gpr_malloc( + sizeof(grpc_grpclb_initial_response)); memcpy(initial_res, &res.initial_response, sizeof(grpc_grpclb_initial_response)); diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c index fab3073eb9..d20cbb8388 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c @@ -89,6 +89,7 @@ static void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { "picked_first_destroy"); } grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); + grpc_subchannel_index_unref(); if (p->pending_update_args != NULL) { grpc_channel_args_destroy(exec_ctx, p->pending_update_args->args); gpr_free(p->pending_update_args); @@ -330,8 +331,8 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, gpr_log(GPR_INFO, "Pick First %p received update with %lu addresses", (void *)p, (unsigned long)addresses->num_addresses); } - grpc_subchannel_args *sc_args = - gpr_zalloc(sizeof(*sc_args) * addresses->num_addresses); + grpc_subchannel_args *sc_args = (grpc_subchannel_args *)gpr_zalloc( + sizeof(*sc_args) * addresses->num_addresses); /* We remove the following keys in order for subchannel keys belonging to * subchannels point to the same address to match. */ static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS, @@ -403,7 +404,7 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, } /* Create the subchannels for the new subchannel args/addresses. */ grpc_subchannel **new_subchannels = - gpr_zalloc(sizeof(*new_subchannels) * sc_args_count); + (grpc_subchannel **)gpr_zalloc(sizeof(*new_subchannels) * sc_args_count); size_t num_new_subchannels = 0; for (size_t i = 0; i < sc_args_count; i++) { grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel( @@ -686,6 +687,7 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx, } pf_update_locked(exec_ctx, &p->base, args); grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable, args->combiner); + grpc_subchannel_index_ref(); GRPC_CLOSURE_INIT(&p->connectivity_changed, pf_connectivity_changed_locked, p, grpc_combiner_scheduler(args->combiner)); return &p->base; diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c index be91d3d651..8ac1a46abd 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c @@ -30,6 +30,7 @@ #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/subchannel.h" +#include "src/core/ext/filters/client_channel/subchannel_index.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/iomgr/combiner.h" @@ -310,6 +311,7 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { (void *)pol, (void *)pol); } grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); + grpc_subchannel_index_unref(); gpr_free(p); } @@ -890,6 +892,7 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, GPR_ASSERT(args->client_channel_factory != NULL); round_robin_lb_policy *p = (round_robin_lb_policy *)gpr_zalloc(sizeof(*p)); grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable, args->combiner); + grpc_subchannel_index_ref(); grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, "round_robin"); rr_update_locked(exec_ctx, &p->base, args); diff --git a/src/core/ext/filters/client_channel/lb_policy_factory.c b/src/core/ext/filters/client_channel/lb_policy_factory.c index cdcaf17544..acf5929746 100644 --- a/src/core/ext/filters/client_channel/lb_policy_factory.c +++ b/src/core/ext/filters/client_channel/lb_policy_factory.c @@ -126,13 +126,14 @@ void grpc_lb_addresses_destroy(grpc_exec_ctx* exec_ctx, } static void* lb_addresses_copy(void* addresses) { - return grpc_lb_addresses_copy(addresses); + return grpc_lb_addresses_copy((grpc_lb_addresses*)addresses); } static void lb_addresses_destroy(grpc_exec_ctx* exec_ctx, void* addresses) { - grpc_lb_addresses_destroy(exec_ctx, addresses); + grpc_lb_addresses_destroy(exec_ctx, (grpc_lb_addresses*)addresses); } static int lb_addresses_cmp(void* addresses1, void* addresses2) { - return grpc_lb_addresses_cmp(addresses1, addresses2); + return grpc_lb_addresses_cmp((grpc_lb_addresses*)addresses1, + (grpc_lb_addresses*)addresses2); } static const grpc_arg_pointer_vtable lb_addresses_arg_vtable = { lb_addresses_copy, lb_addresses_destroy, lb_addresses_cmp}; @@ -149,7 +150,7 @@ grpc_lb_addresses* grpc_lb_addresses_find_channel_arg( grpc_channel_args_find(channel_args, GRPC_ARG_LB_ADDRESSES); if (lb_addresses_arg == NULL || lb_addresses_arg->type != GRPC_ARG_POINTER) return NULL; - return lb_addresses_arg->value.pointer.p; + return (grpc_lb_addresses*)lb_addresses_arg->value.pointer.p; } void grpc_lb_policy_factory_ref(grpc_lb_policy_factory* factory) { diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c index c32b5c5a75..075c275e42 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c @@ -204,7 +204,7 @@ static char *choose_service_config(char *service_config_choice_json) { int random_pct = rand() % 100; int percentage; if (sscanf(field->value, "%d", &percentage) != 1 || - random_pct > percentage) { + random_pct > percentage || percentage == 0) { service_config_json = NULL; break; } diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c index 9747d39a16..7f1f57259a 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c @@ -38,7 +38,7 @@ typedef struct fd_node { /** the owner of this fd node */ grpc_ares_ev_driver *ev_driver; /** the grpc_fd owned by this fd node */ - grpc_fd *grpc_fd; + grpc_fd *fd; /** a closure wrapping on_readable_cb, which should be invoked when the grpc_fd in this node becomes readable. */ grpc_closure read_closure; @@ -96,15 +96,15 @@ static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver *ev_driver) { } static void fd_node_destroy(grpc_exec_ctx *exec_ctx, fd_node *fdn) { - gpr_log(GPR_DEBUG, "delete fd: %d", grpc_fd_wrapped_fd(fdn->grpc_fd)); + gpr_log(GPR_DEBUG, "delete fd: %d", grpc_fd_wrapped_fd(fdn->fd)); GPR_ASSERT(!fdn->readable_registered); GPR_ASSERT(!fdn->writable_registered); gpr_mu_destroy(&fdn->mu); - grpc_pollset_set_del_fd(exec_ctx, fdn->ev_driver->pollset_set, fdn->grpc_fd); + grpc_pollset_set_del_fd(exec_ctx, fdn->ev_driver->pollset_set, fdn->fd); /* c-ares library has closed the fd inside grpc_fd. This fd may be picked up immediately by another thread, and should not be closed by the following grpc_fd_orphan. */ - grpc_fd_orphan(exec_ctx, fdn->grpc_fd, NULL, NULL, true /* already_closed */, + grpc_fd_orphan(exec_ctx, fdn->fd, NULL, NULL, true /* already_closed */, "c-ares query finished"); gpr_free(fdn); } @@ -150,9 +150,8 @@ void grpc_ares_ev_driver_shutdown(grpc_exec_ctx *exec_ctx, ev_driver->shutting_down = true; fd_node *fn = ev_driver->fds; while (fn != NULL) { - grpc_fd_shutdown( - exec_ctx, fn->grpc_fd, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("grpc_ares_ev_driver_shutdown")); + grpc_fd_shutdown(exec_ctx, fn->fd, GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "grpc_ares_ev_driver_shutdown")); fn = fn->next; } gpr_mu_unlock(&ev_driver->mu); @@ -165,7 +164,7 @@ static fd_node *pop_fd_node(fd_node **head, int fd) { dummy_head.next = *head; fd_node *node = &dummy_head; while (node->next != NULL) { - if (grpc_fd_wrapped_fd(node->next->grpc_fd) == fd) { + if (grpc_fd_wrapped_fd(node->next->fd) == fd) { fd_node *ret = node->next; node->next = node->next->next; *head = dummy_head.next; @@ -184,9 +183,9 @@ static void on_readable_cb(grpc_exec_ctx *exec_ctx, void *arg, fdn->readable_registered = false; gpr_mu_unlock(&fdn->mu); - gpr_log(GPR_DEBUG, "readable on %d", grpc_fd_wrapped_fd(fdn->grpc_fd)); + gpr_log(GPR_DEBUG, "readable on %d", grpc_fd_wrapped_fd(fdn->fd)); if (error == GRPC_ERROR_NONE) { - ares_process_fd(ev_driver->channel, grpc_fd_wrapped_fd(fdn->grpc_fd), + ares_process_fd(ev_driver->channel, grpc_fd_wrapped_fd(fdn->fd), ARES_SOCKET_BAD); } else { // If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or @@ -211,10 +210,10 @@ static void on_writable_cb(grpc_exec_ctx *exec_ctx, void *arg, fdn->writable_registered = false; gpr_mu_unlock(&fdn->mu); - gpr_log(GPR_DEBUG, "writable on %d", grpc_fd_wrapped_fd(fdn->grpc_fd)); + gpr_log(GPR_DEBUG, "writable on %d", grpc_fd_wrapped_fd(fdn->fd)); if (error == GRPC_ERROR_NONE) { ares_process_fd(ev_driver->channel, ARES_SOCKET_BAD, - grpc_fd_wrapped_fd(fdn->grpc_fd)); + grpc_fd_wrapped_fd(fdn->fd)); } else { // If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or // timed out. The pending lookups made on this ev_driver will be cancelled @@ -253,7 +252,7 @@ static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx, gpr_asprintf(&fd_name, "ares_ev_driver-%" PRIuPTR, i); fdn = (fd_node *)gpr_malloc(sizeof(fd_node)); gpr_log(GPR_DEBUG, "new fd: %d", socks[i]); - fdn->grpc_fd = grpc_fd_create(socks[i], fd_name); + fdn->fd = grpc_fd_create(socks[i], fd_name); fdn->ev_driver = ev_driver; fdn->readable_registered = false; fdn->writable_registered = false; @@ -262,8 +261,7 @@ static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&fdn->write_closure, on_writable_cb, fdn, grpc_schedule_on_exec_ctx); - grpc_pollset_set_add_fd(exec_ctx, ev_driver->pollset_set, - fdn->grpc_fd); + grpc_pollset_set_add_fd(exec_ctx, ev_driver->pollset_set, fdn->fd); gpr_free(fd_name); } fdn->next = new_list; @@ -274,9 +272,8 @@ static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx, if (ARES_GETSOCK_READABLE(socks_bitmask, i) && !fdn->readable_registered) { grpc_ares_ev_driver_ref(ev_driver); - gpr_log(GPR_DEBUG, "notify read on: %d", - grpc_fd_wrapped_fd(fdn->grpc_fd)); - grpc_fd_notify_on_read(exec_ctx, fdn->grpc_fd, &fdn->read_closure); + gpr_log(GPR_DEBUG, "notify read on: %d", grpc_fd_wrapped_fd(fdn->fd)); + grpc_fd_notify_on_read(exec_ctx, fdn->fd, &fdn->read_closure); fdn->readable_registered = true; } // Register write_closure if the socket is writable and write_closure @@ -284,9 +281,9 @@ static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx, if (ARES_GETSOCK_WRITABLE(socks_bitmask, i) && !fdn->writable_registered) { gpr_log(GPR_DEBUG, "notify write on: %d", - grpc_fd_wrapped_fd(fdn->grpc_fd)); + grpc_fd_wrapped_fd(fdn->fd)); grpc_ares_ev_driver_ref(ev_driver); - grpc_fd_notify_on_write(exec_ctx, fdn->grpc_fd, &fdn->write_closure); + grpc_fd_notify_on_write(exec_ctx, fdn->fd, &fdn->write_closure); fdn->writable_registered = true; } gpr_mu_unlock(&fdn->mu); diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c index 2e2b411ab8..0ffb38518a 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c @@ -123,8 +123,8 @@ static void grpc_ares_request_unref(grpc_exec_ctx *exec_ctx, static grpc_ares_hostbyname_request *create_hostbyname_request( grpc_ares_request *parent_request, char *host, uint16_t port, bool is_balancer) { - grpc_ares_hostbyname_request *hr = - gpr_zalloc(sizeof(grpc_ares_hostbyname_request)); + grpc_ares_hostbyname_request *hr = (grpc_ares_hostbyname_request *)gpr_zalloc( + sizeof(grpc_ares_hostbyname_request)); hr->parent_request = parent_request; hr->host = gpr_strdup(host); hr->port = port; @@ -527,7 +527,8 @@ static void grpc_resolve_address_ares_impl(grpc_exec_ctx *exec_ctx, grpc_closure *on_done, grpc_resolved_addresses **addrs) { grpc_resolve_address_ares_request *r = - gpr_zalloc(sizeof(grpc_resolve_address_ares_request)); + (grpc_resolve_address_ares_request *)gpr_zalloc( + sizeof(grpc_resolve_address_ares_request)); r->addrs_out = addrs; r->on_resolve_address_done = on_done; GRPC_CLOSURE_INIT(&r->on_dns_lookup_done, on_dns_lookup_done_cb, r, diff --git a/src/core/ext/filters/client_channel/retry_throttle.c b/src/core/ext/filters/client_channel/retry_throttle.c index 6cd6654b6f..09dcade089 100644 --- a/src/core/ext/filters/client_channel/retry_throttle.c +++ b/src/core/ext/filters/client_channel/retry_throttle.c @@ -99,7 +99,7 @@ static grpc_server_retry_throttle_data* grpc_server_retry_throttle_data_create( int max_milli_tokens, int milli_token_ratio, grpc_server_retry_throttle_data* old_throttle_data) { grpc_server_retry_throttle_data* throttle_data = - gpr_malloc(sizeof(*throttle_data)); + (grpc_server_retry_throttle_data*)gpr_malloc(sizeof(*throttle_data)); memset(throttle_data, 0, sizeof(*throttle_data)); gpr_ref_init(&throttle_data->refs, 1); throttle_data->max_milli_tokens = max_milli_tokens; @@ -131,11 +131,11 @@ static grpc_server_retry_throttle_data* grpc_server_retry_throttle_data_create( // static void* copy_server_name(void* key, void* unused) { - return gpr_strdup(key); + return gpr_strdup((const char*)key); } static long compare_server_name(void* key1, void* key2, void* unused) { - return strcmp(key1, key2); + return strcmp((const char*)key1, (const char*)key2); } static void destroy_server_retry_throttle_data(void* value, void* unused) { @@ -177,7 +177,8 @@ grpc_server_retry_throttle_data* grpc_retry_throttle_map_get_data_for_server( const char* server_name, int max_milli_tokens, int milli_token_ratio) { gpr_mu_lock(&g_mu); grpc_server_retry_throttle_data* throttle_data = - gpr_avl_get(g_avl, (char*)server_name, NULL); + (grpc_server_retry_throttle_data*)gpr_avl_get(g_avl, (char*)server_name, + NULL); if (throttle_data == NULL) { // Entry not found. Create a new one. throttle_data = grpc_server_retry_throttle_data_create( diff --git a/src/core/ext/filters/client_channel/subchannel.c b/src/core/ext/filters/client_channel/subchannel.c index 2c466f595a..9bd35f83e8 100644 --- a/src/core/ext/filters/client_channel/subchannel.c +++ b/src/core/ext/filters/client_channel/subchannel.c @@ -33,6 +33,7 @@ #include "src/core/lib/backoff/backoff.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/connected_channel.h" +#include "src/core/lib/debug/stats.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/profiling/timers.h" @@ -290,6 +291,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, return c; } + GRPC_STATS_INC_CLIENT_SUBCHANNELS_CREATED(exec_ctx); c = (grpc_subchannel *)gpr_zalloc(sizeof(*c)); c->key = key; gpr_atm_no_barrier_store(&c->ref_pair, 1 << INTERNAL_REF_BITS); diff --git a/src/core/ext/filters/client_channel/subchannel_index.c b/src/core/ext/filters/client_channel/subchannel_index.c index f57b631c41..d7a51f3899 100644 --- a/src/core/ext/filters/client_channel/subchannel_index.c +++ b/src/core/ext/filters/client_channel/subchannel_index.c @@ -34,6 +34,8 @@ static gpr_avl g_subchannel_index; static gpr_mu g_mu; +static gpr_refcount g_refcount; + struct grpc_subchannel_key { grpc_subchannel_args args; }; @@ -88,24 +90,26 @@ void grpc_subchannel_key_destroy(grpc_exec_ctx *exec_ctx, static void sck_avl_destroy(void *p, void *user_data) { grpc_exec_ctx *exec_ctx = (grpc_exec_ctx *)user_data; - grpc_subchannel_key_destroy(exec_ctx, p); + grpc_subchannel_key_destroy(exec_ctx, (grpc_subchannel_key *)p); } static void *sck_avl_copy(void *p, void *unused) { - return subchannel_key_copy(p); + return subchannel_key_copy((grpc_subchannel_key *)p); } static long sck_avl_compare(void *a, void *b, void *unused) { - return grpc_subchannel_key_compare(a, b); + return grpc_subchannel_key_compare((grpc_subchannel_key *)a, + (grpc_subchannel_key *)b); } static void scv_avl_destroy(void *p, void *user_data) { grpc_exec_ctx *exec_ctx = (grpc_exec_ctx *)user_data; - GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, p, "subchannel_index"); + GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, (grpc_subchannel *)p, + "subchannel_index"); } static void *scv_avl_copy(void *p, void *unused) { - GRPC_SUBCHANNEL_WEAK_REF(p, "subchannel_index"); + GRPC_SUBCHANNEL_WEAK_REF((grpc_subchannel *)p, "subchannel_index"); return p; } @@ -119,15 +123,27 @@ static const gpr_avl_vtable subchannel_avl_vtable = { void grpc_subchannel_index_init(void) { g_subchannel_index = gpr_avl_create(&subchannel_avl_vtable); gpr_mu_init(&g_mu); + gpr_ref_init(&g_refcount, 1); } void grpc_subchannel_index_shutdown(void) { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - gpr_mu_destroy(&g_mu); - gpr_avl_unref(g_subchannel_index, &exec_ctx); - grpc_exec_ctx_finish(&exec_ctx); + // TODO(juanlishen): This refcounting mechanism may lead to memory leackage. + // To solve that, we should force polling to flush any pending callbacks, then + // shutdown safely. + grpc_subchannel_index_unref(); +} + +void grpc_subchannel_index_unref(void) { + if (gpr_unref(&g_refcount)) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + gpr_mu_destroy(&g_mu); + gpr_avl_unref(g_subchannel_index, &exec_ctx); + grpc_exec_ctx_finish(&exec_ctx); + } } +void grpc_subchannel_index_ref(void) { gpr_ref_non_zero(&g_refcount); } + grpc_subchannel *grpc_subchannel_index_find(grpc_exec_ctx *exec_ctx, grpc_subchannel_key *key) { // Lock, and take a reference to the subchannel index. diff --git a/src/core/ext/filters/client_channel/subchannel_index.h b/src/core/ext/filters/client_channel/subchannel_index.h index 98d882a453..92e36d5283 100644 --- a/src/core/ext/filters/client_channel/subchannel_index.h +++ b/src/core/ext/filters/client_channel/subchannel_index.h @@ -59,6 +59,13 @@ void grpc_subchannel_index_init(void); /** Shutdown the subchannel index (global) */ void grpc_subchannel_index_shutdown(void); +/** Increment the refcount (non-zero) of subchannel index (global). */ +void grpc_subchannel_index_ref(void); + +/** Decrement the refcount of subchannel index (global). If the refcount drops + to zero, unref the subchannel index and destroy its mutex. */ +void grpc_subchannel_index_unref(void); + /** \em TEST ONLY. * If \a force_creation is true, all key comparisons will be false, resulting in * new subchannels always being created. Otherwise, the keys will be compared as diff --git a/src/core/ext/filters/http/server/http_server_filter.c b/src/core/ext/filters/http/server/http_server_filter.c index 554a7f530d..03958136b4 100644 --- a/src/core/ext/filters/http/server/http_server_filter.c +++ b/src/core/ext/filters/http/server/http_server_filter.c @@ -83,12 +83,12 @@ static grpc_error *server_filter_outgoing_metadata(grpc_exec_ctx *exec_ctx, } static void add_error(const char *error_name, grpc_error **cumulative, - grpc_error *new) { - if (new == GRPC_ERROR_NONE) return; + grpc_error *new_err) { + if (new_err == GRPC_ERROR_NONE) return; if (*cumulative == GRPC_ERROR_NONE) { *cumulative = GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_name); } - *cumulative = grpc_error_add_child(*cumulative, new); + *cumulative = grpc_error_add_child(*cumulative, new_err); } static grpc_error *server_filter_incoming_metadata(grpc_exec_ctx *exec_ctx, diff --git a/src/core/ext/filters/max_age/max_age_filter.c b/src/core/ext/filters/max_age/max_age_filter.c index 1580efa063..b7032f288d 100644 --- a/src/core/ext/filters/max_age/max_age_filter.c +++ b/src/core/ext/filters/max_age/max_age_filter.c @@ -393,7 +393,7 @@ static bool maybe_add_max_age_filter(grpc_exec_ctx* exec_ctx, bool enable = grpc_channel_arg_get_integer( grpc_channel_args_find(channel_args, GRPC_ARG_MAX_CONNECTION_AGE_MS), - MAX_CONNECTION_AGE_INTEGER_OPTIONS) != INT_MAX && + MAX_CONNECTION_AGE_INTEGER_OPTIONS) != INT_MAX || grpc_channel_arg_get_integer( grpc_channel_args_find(channel_args, GRPC_ARG_MAX_CONNECTION_IDLE_MS), MAX_CONNECTION_IDLE_INTEGER_OPTIONS) != INT_MAX; diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.c b/src/core/ext/transport/chttp2/client/chttp2_connector.c index 0ec9353c04..202bcd47f5 100644 --- a/src/core/ext/transport/chttp2/client/chttp2_connector.c +++ b/src/core/ext/transport/chttp2/client/chttp2_connector.c @@ -161,7 +161,7 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_endpoint_shutdown(exec_ctx, c->endpoint, GRPC_ERROR_REF(error)); } gpr_mu_unlock(&c->mu); - chttp2_connector_unref(exec_ctx, arg); + chttp2_connector_unref(exec_ctx, (grpc_connector *)arg); } else { GPR_ASSERT(c->endpoint != NULL); start_handshake_locked(exec_ctx, c); diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.c b/src/core/ext/transport/chttp2/server/chttp2_server.c index c2b2fc03aa..b7ffd16eac 100644 --- a/src/core/ext/transport/chttp2/server/chttp2_server.c +++ b/src/core/ext/transport/chttp2/server/chttp2_server.c @@ -52,7 +52,7 @@ typedef struct { } server_state; typedef struct { - server_state *server_state; + server_state *svr_state; grpc_pollset *accepting_pollset; grpc_tcp_server_acceptor *acceptor; grpc_handshake_manager *handshake_mgr; @@ -63,8 +63,8 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_handshaker_args *args = (grpc_handshaker_args *)arg; server_connection_state *connection_state = (server_connection_state *)args->user_data; - gpr_mu_lock(&connection_state->server_state->mu); - if (error != GRPC_ERROR_NONE || connection_state->server_state->shutdown) { + gpr_mu_lock(&connection_state->svr_state->mu); + if (error != GRPC_ERROR_NONE || connection_state->svr_state->shutdown) { const char *error_str = grpc_error_string(error); gpr_log(GPR_DEBUG, "Handshaking failed: %s", error_str); @@ -89,7 +89,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_transport *transport = grpc_create_chttp2_transport(exec_ctx, args->args, args->endpoint, 0); grpc_server_setup_transport( - exec_ctx, connection_state->server_state->server, transport, + exec_ctx, connection_state->svr_state->server, transport, connection_state->accepting_pollset, args->args); grpc_chttp2_transport_start_reading(exec_ctx, transport, args->read_buffer); @@ -97,11 +97,11 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, } } grpc_handshake_manager_pending_list_remove( - &connection_state->server_state->pending_handshake_mgrs, + &connection_state->svr_state->pending_handshake_mgrs, connection_state->handshake_mgr); - gpr_mu_unlock(&connection_state->server_state->mu); + gpr_mu_unlock(&connection_state->svr_state->mu); grpc_handshake_manager_destroy(exec_ctx, connection_state->handshake_mgr); - grpc_tcp_server_unref(exec_ctx, connection_state->server_state->tcp_server); + grpc_tcp_server_unref(exec_ctx, connection_state->svr_state->tcp_server); gpr_free(connection_state->acceptor); gpr_free(connection_state); } @@ -124,8 +124,8 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, gpr_mu_unlock(&state->mu); grpc_tcp_server_ref(state->tcp_server); server_connection_state *connection_state = - gpr_malloc(sizeof(*connection_state)); - connection_state->server_state = state; + (server_connection_state *)gpr_malloc(sizeof(*connection_state)); + connection_state->svr_state = state; connection_state->accepting_pollset = accepting_pollset; connection_state->acceptor = acceptor; connection_state->handshake_mgr = handshake_mgr; diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index d77047cb9e..30c2494185 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -144,10 +144,11 @@ static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, static void cancel_pings(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_error *error); -static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_chttp2_ping_type ping_type, - grpc_closure *on_initiate, - grpc_closure *on_complete); +static void send_ping_locked( + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + grpc_chttp2_ping_type ping_type, grpc_closure *on_initiate, + grpc_closure *on_complete, + grpc_chttp2_initiate_write_reason initiate_write_reason); static void retry_initiate_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, grpc_error *error); @@ -346,7 +347,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, if (is_client) { grpc_slice_buffer_add(&t->outbuf, grpc_slice_from_copied_string( GRPC_CHTTP2_CLIENT_CONNECT_STRING)); - grpc_chttp2_initiate_write(exec_ctx, t, "initial_write"); } /* configure http2 the way we like it */ @@ -562,7 +562,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED; } - grpc_chttp2_initiate_write(exec_ctx, t, "init"); + grpc_chttp2_initiate_write(exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE); post_benign_reclaimer(exec_ctx, t); } @@ -830,13 +831,91 @@ static void set_write_state(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } } +static void inc_initiate_write_reason( + grpc_exec_ctx *exec_ctx, grpc_chttp2_initiate_write_reason reason) { + switch (reason) { + case GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_INITIAL_WRITE(exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_START_NEW_STREAM(exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_MESSAGE(exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_INITIAL_METADATA( + exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_TRAILING_METADATA( + exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_RETRY_SEND_PING(exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_CONTINUE_PINGS(exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_GOAWAY_SENT(exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_RST_STREAM(exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_CLOSE_FROM_API(exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_STREAM_FLOW_CONTROL(exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL( + exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_SETTINGS(exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_BDP_ESTIMATOR_PING: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_BDP_ESTIMATOR_PING(exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_SETTING( + exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_UPDATE( + exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_APPLICATION_PING(exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_KEEPALIVE_PING(exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL_UNSTALLED( + exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_PING_RESPONSE(exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FORCE_RST_STREAM(exec_ctx); + break; + } +} + void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, const char *reason) { + grpc_chttp2_transport *t, + grpc_chttp2_initiate_write_reason reason) { GPR_TIMER_BEGIN("grpc_chttp2_initiate_write", 0); switch (t->write_state) { case GRPC_CHTTP2_WRITE_STATE_IDLE: - set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING, reason); + inc_initiate_write_reason(exec_ctx, reason); + set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING, + grpc_chttp2_initiate_write_reason_string(reason)); t->is_first_write_in_batch = true; GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); GRPC_CLOSURE_SCHED( @@ -848,7 +927,7 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, break; case GRPC_CHTTP2_WRITE_STATE_WRITING: set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE, - reason); + grpc_chttp2_initiate_write_reason_string(reason)); break; case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE: break; @@ -856,16 +935,12 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, GPR_TIMER_END("grpc_chttp2_initiate_write", 0); } -void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s, - bool also_initiate_write, const char *reason) { +void grpc_chttp2_mark_stream_writable(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s) { if (!t->closed && grpc_chttp2_list_add_writable_stream(t, s)) { GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:become"); } - if (also_initiate_write) { - grpc_chttp2_initiate_write(exec_ctx, t, reason); - } } static grpc_closure_scheduler *write_scheduler(grpc_chttp2_transport *t, @@ -1087,7 +1162,9 @@ static void maybe_start_some_streams(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream_map_add(&t->stream_map, s->id, s); post_destructive_reclaimer(exec_ctx, t); - grpc_chttp2_become_writable(exec_ctx, t, s, true, "new_stream"); + grpc_chttp2_mark_stream_writable(exec_ctx, t, s); + grpc_chttp2_initiate_write(exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM); } /* cancel out streams that will never be started */ while (t->next_stream_id >= MAX_CLIENT_STREAM_ID && @@ -1184,7 +1261,9 @@ static void maybe_become_writable_due_to_send_msg(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream *s) { if (s->id != 0 && (!s->write_buffering || s->flow_controlled_buffer.length > t->write_buffer_size)) { - grpc_chttp2_become_writable(exec_ctx, t, s, true, "op.send_message"); + grpc_chttp2_mark_stream_writable(exec_ctx, t, s); + grpc_chttp2_initiate_write(exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE); } } @@ -1385,14 +1464,13 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, } } else { GPR_ASSERT(s->id != 0); - bool initiate_write = true; - if (op->send_message && - (op->payload->send_message.send_message->flags & - GRPC_WRITE_BUFFER_HINT)) { - initiate_write = false; + grpc_chttp2_mark_stream_writable(exec_ctx, t, s); + if (!(op->send_message && + (op->payload->send_message.send_message->flags & + GRPC_WRITE_BUFFER_HINT))) { + grpc_chttp2_initiate_write( + exec_ctx, t, GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA); } - grpc_chttp2_become_writable(exec_ctx, t, s, initiate_write, - "op.send_initial_metadata"); } } else { s->send_initial_metadata = NULL; @@ -1500,8 +1578,9 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, } else if (s->id != 0) { /* TODO(ctiller): check if there's flow control for any outstanding bytes before going writable */ - grpc_chttp2_become_writable(exec_ctx, t, s, true, - "op.send_trailing_metadata"); + grpc_chttp2_mark_stream_writable(exec_ctx, t, s); + grpc_chttp2_initiate_write( + exec_ctx, t, GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA); } } } @@ -1611,15 +1690,17 @@ static void cancel_pings(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, GRPC_ERROR_UNREF(error); } -static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_chttp2_ping_type ping_type, - grpc_closure *on_initiate, grpc_closure *on_ack) { +static void send_ping_locked( + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + grpc_chttp2_ping_type ping_type, grpc_closure *on_initiate, + grpc_closure *on_ack, + grpc_chttp2_initiate_write_reason initiate_write_reason) { grpc_chttp2_ping_queue *pq = &t->ping_queues[ping_type]; grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INITIATE], on_initiate, GRPC_ERROR_NONE); if (grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_NEXT], on_ack, GRPC_ERROR_NONE)) { - grpc_chttp2_initiate_write(exec_ctx, t, "send_ping"); + grpc_chttp2_initiate_write(exec_ctx, t, initiate_write_reason); } } @@ -1627,7 +1708,8 @@ static void retry_initiate_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, grpc_error *error) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)tp; t->ping_state.is_delayed_ping_timer_set = false; - grpc_chttp2_initiate_write(exec_ctx, t, "retry_send_ping"); + grpc_chttp2_initiate_write(exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING); } void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, @@ -1642,7 +1724,8 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } GRPC_CLOSURE_LIST_SCHED(exec_ctx, &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]); if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) { - grpc_chttp2_initiate_write(exec_ctx, t, "continue_pings"); + grpc_chttp2_initiate_write(exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS); } } @@ -1655,7 +1738,8 @@ static void send_goaway(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, &http_error); grpc_chttp2_goaway_append(t->last_new_stream_id, (uint32_t)http_error, grpc_slice_ref_internal(slice), &t->qbuf); - grpc_chttp2_initiate_write(exec_ctx, t, "goaway_sent"); + grpc_chttp2_initiate_write(exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT); GRPC_ERROR_UNREF(error); } @@ -1706,7 +1790,8 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, if (op->send_ping) { send_ping_locked(exec_ctx, t, GRPC_CHTTP2_PING_ON_NEXT_WRITE, NULL, - op->send_ping); + op->send_ping, + GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING); } if (op->on_connectivity_state_change != NULL) { @@ -1952,7 +2037,8 @@ void grpc_chttp2_cancel_stream(grpc_exec_ctx *exec_ctx, grpc_slice_buffer_add( &t->qbuf, grpc_chttp2_rst_stream_create(s->id, (uint32_t)http_error, &s->stats.outgoing)); - grpc_chttp2_initiate_write(exec_ctx, t, "rst_stream"); + grpc_chttp2_initiate_write(exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM); } } if (due_to_error != GRPC_ERROR_NONE && !s->seen_error) { @@ -2274,7 +2360,8 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, &s->stats.outgoing)); grpc_chttp2_mark_stream_closed(exec_ctx, t, s, 1, 1, error); - grpc_chttp2_initiate_write(exec_ctx, t, "close_from_api"); + grpc_chttp2_initiate_write(exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API); } typedef struct { @@ -2309,19 +2396,20 @@ void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx, case GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED: break; case GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY: - grpc_chttp2_become_writable(exec_ctx, t, s, true, - "immediate stream flowctl"); + grpc_chttp2_mark_stream_writable(exec_ctx, t, s); + grpc_chttp2_initiate_write( + exec_ctx, t, GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL); break; case GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE: - grpc_chttp2_become_writable(exec_ctx, t, s, false, - "queue stream flowctl"); + grpc_chttp2_mark_stream_writable(exec_ctx, t, s); break; } switch (action.send_transport_update) { case GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED: break; case GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY: - grpc_chttp2_initiate_write(exec_ctx, t, "immediate transport flowctl"); + grpc_chttp2_initiate_write( + exec_ctx, t, GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL); break; // this is the same as no action b/c every time the transport enters the // writing path it will maybe do an update @@ -2339,7 +2427,8 @@ void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx, (uint32_t)action.max_frame_size); } if (action.send_setting_update == GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY) { - grpc_chttp2_initiate_write(exec_ctx, t, "immediate setting update"); + grpc_chttp2_initiate_write(exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS); } } if (action.need_ping) { @@ -2347,7 +2436,8 @@ void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx, grpc_bdp_estimator_schedule_ping(&t->flow_control.bdp_estimator); send_ping_locked(exec_ctx, t, GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE, - &t->start_bdp_ping_locked, &t->finish_bdp_ping_locked); + &t->start_bdp_ping_locked, &t->finish_bdp_ping_locked, + GRPC_CHTTP2_INITIATE_WRITE_BDP_ESTIMATOR_PING); } } @@ -2426,7 +2516,10 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, if (t->flow_control.initial_window_update > 0) { grpc_chttp2_stream *s; while (grpc_chttp2_list_pop_stalled_by_stream(t, &s)) { - grpc_chttp2_become_writable(exec_ctx, t, s, true, "unstalled"); + grpc_chttp2_mark_stream_writable(exec_ctx, t, s); + grpc_chttp2_initiate_write( + exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING); } } t->flow_control.initial_window_update = 0; @@ -2541,7 +2634,8 @@ static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end"); send_ping_locked(exec_ctx, t, GRPC_CHTTP2_PING_ON_NEXT_WRITE, &t->start_keepalive_ping_locked, - &t->finish_keepalive_ping_locked); + &t->finish_keepalive_ping_locked, + GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING); } else { GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); grpc_timer_init(exec_ctx, &t->keepalive_ping_timer, @@ -2893,7 +2987,8 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, uint32_t frame_size, uint32_t flags) { grpc_chttp2_incoming_byte_stream *incoming_byte_stream = - gpr_malloc(sizeof(*incoming_byte_stream)); + (grpc_chttp2_incoming_byte_stream *)gpr_malloc( + sizeof(*incoming_byte_stream)); incoming_byte_stream->base.length = frame_size; incoming_byte_stream->remaining_bytes = frame_size; incoming_byte_stream->base.flags = flags; @@ -2997,6 +3092,56 @@ static void destructive_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg, /******************************************************************************* * MONITORING */ + +const char *grpc_chttp2_initiate_write_reason_string( + grpc_chttp2_initiate_write_reason reason) { + switch (reason) { + case GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE: + return "INITIAL_WRITE"; + case GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM: + return "START_NEW_STREAM"; + case GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE: + return "SEND_MESSAGE"; + case GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA: + return "SEND_INITIAL_METADATA"; + case GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA: + return "SEND_TRAILING_METADATA"; + case GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING: + return "RETRY_SEND_PING"; + case GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS: + return "CONTINUE_PINGS"; + case GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT: + return "GOAWAY_SENT"; + case GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM: + return "RST_STREAM"; + case GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API: + return "CLOSE_FROM_API"; + case GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL: + return "STREAM_FLOW_CONTROL"; + case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL: + return "TRANSPORT_FLOW_CONTROL"; + case GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS: + return "SEND_SETTINGS"; + case GRPC_CHTTP2_INITIATE_WRITE_BDP_ESTIMATOR_PING: + return "BDP_ESTIMATOR_PING"; + case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING: + return "FLOW_CONTROL_UNSTALLED_BY_SETTING"; + case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE: + return "FLOW_CONTROL_UNSTALLED_BY_UPDATE"; + case GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING: + return "APPLICATION_PING"; + case GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING: + return "KEEPALIVE_PING"; + case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED: + return "TRANSPORT_FLOW_CONTROL_UNSTALLED"; + case GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE: + return "PING_RESPONSE"; + case GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM: + return "FORCE_RST_STREAM"; + } + GPR_UNREACHABLE_CODE(return "unknown"); +} + static grpc_endpoint *chttp2_get_endpoint(grpc_exec_ctx *exec_ctx, grpc_transport *t) { return ((grpc_chttp2_transport *)t)->ep; diff --git a/src/core/ext/transport/chttp2/transport/flow_control.c b/src/core/ext/transport/chttp2/transport/flow_control.c index 0f078e79e9..569a6349d3 100644 --- a/src/core/ext/transport/chttp2/transport/flow_control.c +++ b/src/core/ext/transport/chttp2/transport/flow_control.c @@ -60,24 +60,24 @@ static void pretrace(shadow_flow_control* shadow_fc, #define TRACE_PADDING 30 -static char* fmt_int64_diff_str(int64_t old, int64_t new) { +static char* fmt_int64_diff_str(int64_t old_val, int64_t new_val) { char* str; - if (old != new) { - gpr_asprintf(&str, "%" PRId64 " -> %" PRId64 "", old, new); + if (old_val != new_val) { + gpr_asprintf(&str, "%" PRId64 " -> %" PRId64 "", old_val, new_val); } else { - gpr_asprintf(&str, "%" PRId64 "", old); + gpr_asprintf(&str, "%" PRId64 "", old_val); } char* str_lp = gpr_leftpad(str, ' ', TRACE_PADDING); gpr_free(str); return str_lp; } -static char* fmt_uint32_diff_str(uint32_t old, uint32_t new) { +static char* fmt_uint32_diff_str(uint32_t old_val, uint32_t new_val) { char* str; - if (new > 0 && old != new) { - gpr_asprintf(&str, "%" PRIu32 " -> %" PRIu32 "", old, new); + if (new_val > 0 && old_val != new_val) { + gpr_asprintf(&str, "%" PRIu32 " -> %" PRIu32 "", old_val, new_val); } else { - gpr_asprintf(&str, "%" PRIu32 "", old); + gpr_asprintf(&str, "%" PRIu32 "", old_val); } char* str_lp = gpr_leftpad(str, ' ', TRACE_PADDING); gpr_free(str); diff --git a/src/core/ext/transport/chttp2/transport/frame_ping.c b/src/core/ext/transport/chttp2/transport/frame_ping.c index e99bc2eb4e..7fc2ce1bd9 100644 --- a/src/core/ext/transport/chttp2/transport/frame_ping.c +++ b/src/core/ext/transport/chttp2/transport/frame_ping.c @@ -115,7 +115,8 @@ grpc_error *grpc_chttp2_ping_parser_parse(grpc_exec_ctx *exec_ctx, void *parser, t->ping_acks, t->ping_ack_capacity * sizeof(*t->ping_acks)); } t->ping_acks[t->ping_ack_count++] = p->opaque_8bytes; - grpc_chttp2_initiate_write(exec_ctx, t, "ping response"); + grpc_chttp2_initiate_write(exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE); } } } diff --git a/src/core/ext/transport/chttp2/transport/frame_settings.c b/src/core/ext/transport/chttp2/transport/frame_settings.c index 806100adaa..2995bf7310 100644 --- a/src/core/ext/transport/chttp2/transport/frame_settings.c +++ b/src/core/ext/transport/chttp2/transport/frame_settings.c @@ -44,7 +44,8 @@ static uint8_t *fill_header(uint8_t *out, uint32_t length, uint8_t flags) { return out; } -grpc_slice grpc_chttp2_settings_create(uint32_t *old, const uint32_t *new, +grpc_slice grpc_chttp2_settings_create(uint32_t *old_settings, + const uint32_t *new_settings, uint32_t force_mask, size_t count) { size_t i; uint32_t n = 0; @@ -52,21 +53,21 @@ grpc_slice grpc_chttp2_settings_create(uint32_t *old, const uint32_t *new, uint8_t *p; for (i = 0; i < count; i++) { - n += (new[i] != old[i] || (force_mask & (1u << i)) != 0); + n += (new_settings[i] != old_settings[i] || (force_mask & (1u << i)) != 0); } output = GRPC_SLICE_MALLOC(9 + 6 * n); p = fill_header(GRPC_SLICE_START_PTR(output), 6 * n, 0); for (i = 0; i < count; i++) { - if (new[i] != old[i] || (force_mask & (1u << i)) != 0) { + if (new_settings[i] != old_settings[i] || (force_mask & (1u << i)) != 0) { *p++ = (uint8_t)(grpc_setting_id_to_wire_id[i] >> 8); *p++ = (uint8_t)(grpc_setting_id_to_wire_id[i]); - *p++ = (uint8_t)(new[i] >> 24); - *p++ = (uint8_t)(new[i] >> 16); - *p++ = (uint8_t)(new[i] >> 8); - *p++ = (uint8_t)(new[i]); - old[i] = new[i]; + *p++ = (uint8_t)(new_settings[i] >> 24); + *p++ = (uint8_t)(new_settings[i] >> 16); + *p++ = (uint8_t)(new_settings[i] >> 8); + *p++ = (uint8_t)(new_settings[i]); + old_settings[i] = new_settings[i]; } } diff --git a/src/core/ext/transport/chttp2/transport/frame_window_update.c b/src/core/ext/transport/chttp2/transport/frame_window_update.c index c94f7725bf..c9ab8d1b50 100644 --- a/src/core/ext/transport/chttp2/transport/frame_window_update.c +++ b/src/core/ext/transport/chttp2/transport/frame_window_update.c @@ -99,8 +99,10 @@ grpc_error *grpc_chttp2_window_update_parser_parse( grpc_chttp2_flowctl_recv_stream_update( &t->flow_control, &s->flow_control, received_update); if (grpc_chttp2_list_remove_stalled_by_stream(t, s)) { - grpc_chttp2_become_writable(exec_ctx, t, s, true, - "stream.read_flow_control"); + grpc_chttp2_mark_stream_writable(exec_ctx, t, s); + grpc_chttp2_initiate_write( + exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE); } } } else { @@ -109,7 +111,9 @@ grpc_error *grpc_chttp2_window_update_parser_parse( received_update); bool is_zero = t->flow_control.remote_window <= 0; if (was_zero && !is_zero) { - grpc_chttp2_initiate_write(exec_ctx, t, "new_global_flow_control"); + grpc_chttp2_initiate_write( + exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED); } } } diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.c b/src/core/ext/transport/chttp2/transport/hpack_parser.c index 82ff2c8e2c..901e7413c0 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_parser.c +++ b/src/core/ext/transport/chttp2/transport/hpack_parser.c @@ -1649,7 +1649,8 @@ static void force_client_rst_stream(grpc_exec_ctx *exec_ctx, void *sp, grpc_slice_buffer_add( &t->qbuf, grpc_chttp2_rst_stream_create(s->id, GRPC_HTTP2_NO_ERROR, &s->stats.outgoing)); - grpc_chttp2_initiate_write(exec_ctx, t, "force_rst_stream"); + grpc_chttp2_initiate_write(exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM); grpc_chttp2_mark_stream_closed(exec_ctx, t, s, true, true, GRPC_ERROR_NONE); } GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "final_rst"); diff --git a/src/core/ext/transport/chttp2/transport/incoming_metadata.c b/src/core/ext/transport/chttp2/transport/incoming_metadata.c index 089b360ecf..187ce0ea87 100644 --- a/src/core/ext/transport/chttp2/transport/incoming_metadata.c +++ b/src/core/ext/transport/chttp2/transport/incoming_metadata.c @@ -42,8 +42,9 @@ grpc_error *grpc_chttp2_incoming_metadata_buffer_add( grpc_mdelem elem) { buffer->size += GRPC_MDELEM_LENGTH(elem); return grpc_metadata_batch_add_tail( - exec_ctx, &buffer->batch, - gpr_arena_alloc(buffer->arena, sizeof(grpc_linked_mdelem)), elem); + exec_ctx, &buffer->batch, (grpc_linked_mdelem *)gpr_arena_alloc( + buffer->arena, sizeof(grpc_linked_mdelem)), + elem); } grpc_error *grpc_chttp2_incoming_metadata_buffer_replace_or_add( diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 46930dd22c..4382e30fcd 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -79,6 +79,33 @@ typedef enum { GRPC_CHTTP2_PCL_COUNT /* must be last */ } grpc_chttp2_ping_closure_list; +typedef enum { + GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE, + GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM, + GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE, + GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA, + GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA, + GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING, + GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS, + GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT, + GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM, + GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API, + GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL, + GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL, + GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, + GRPC_CHTTP2_INITIATE_WRITE_BDP_ESTIMATOR_PING, + GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING, + GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE, + GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING, + GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING, + GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED, + GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE, + GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM, +} grpc_chttp2_initiate_write_reason; + +const char *grpc_chttp2_initiate_write_reason_string( + grpc_chttp2_initiate_write_reason reason); + typedef struct { grpc_closure_list lists[GRPC_CHTTP2_PCL_COUNT]; uint64_t inflight_id; @@ -599,7 +626,8 @@ struct grpc_chttp2_stream { The actual call chain is documented in the implementation of this function. */ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, const char *reason); + grpc_chttp2_transport *t, + grpc_chttp2_initiate_write_reason reason); typedef struct { /** are we writing? */ @@ -851,10 +879,9 @@ void grpc_chttp2_add_ping_strike(grpc_exec_ctx *exec_ctx, /** add a ref to the stream and add it to the writable list; ref will be dropped in writing.c */ -void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s, - bool also_initiate_write, const char *reason); +void grpc_chttp2_mark_stream_writable(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s); void grpc_chttp2_cancel_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, diff --git a/src/core/ext/transport/chttp2/transport/stream_map.c b/src/core/ext/transport/chttp2/transport/stream_map.c index 650090d8f0..d6079a9a33 100644 --- a/src/core/ext/transport/chttp2/transport/stream_map.c +++ b/src/core/ext/transport/chttp2/transport/stream_map.c @@ -72,8 +72,10 @@ void grpc_chttp2_stream_map_add(grpc_chttp2_stream_map *map, uint32_t key, /* resize when less than 25% of the table is free, because compaction won't help much */ map->capacity = capacity = 3 * capacity / 2; - map->keys = keys = gpr_realloc(keys, capacity * sizeof(uint32_t)); - map->values = values = gpr_realloc(values, capacity * sizeof(void *)); + map->keys = keys = + (uint32_t *)gpr_realloc(keys, capacity * sizeof(uint32_t)); + map->values = values = + (void **)gpr_realloc(values, capacity * sizeof(void *)); } } diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index eb818fab2f..c413905f5c 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -200,9 +200,8 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( if (t->flow_control.remote_window > 0) { while (grpc_chttp2_list_pop_stalled_by_transport(t, &s)) { - if (!t->closed && grpc_chttp2_list_add_writable_stream(t, s) && - stream_ref_if_not_destroyed(&s->refcount->refs)) { - grpc_chttp2_initiate_write(exec_ctx, t, "transport.read_flow_control"); + if (!t->closed && grpc_chttp2_list_add_writable_stream(t, s)) { + stream_ref_if_not_destroyed(&s->refcount->refs); } } } diff --git a/src/core/ext/transport/inproc/inproc_transport.c b/src/core/ext/transport/inproc/inproc_transport.c index 49e507cc78..72406aee2c 100644 --- a/src/core/ext/transport/inproc/inproc_transport.c +++ b/src/core/ext/transport/inproc/inproc_transport.c @@ -37,7 +37,6 @@ if (GRPC_TRACER_ON(grpc_inproc_trace)) gpr_log(__VA_ARGS__); \ } while (0) -static const grpc_transport_vtable inproc_vtable; static grpc_slice g_empty_slice; static grpc_slice g_fake_path_key; static grpc_slice g_fake_path_value; @@ -1167,6 +1166,55 @@ static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) { } /******************************************************************************* + * INTEGRATION GLUE + */ + +static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_transport *gt, + grpc_stream *gs, grpc_pollset *pollset) { + // Nothing to do here +} + +static void set_pollset_set(grpc_exec_ctx *exec_ctx, grpc_transport *gt, + grpc_stream *gs, grpc_pollset_set *pollset_set) { + // Nothing to do here +} + +static grpc_endpoint *get_endpoint(grpc_exec_ctx *exec_ctx, grpc_transport *t) { + return NULL; +} + +/******************************************************************************* + * GLOBAL INIT AND DESTROY + */ +static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {} + +void grpc_inproc_transport_init(void) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + GRPC_CLOSURE_INIT(&do_nothing_closure, do_nothing, NULL, + grpc_schedule_on_exec_ctx); + g_empty_slice = grpc_slice_from_static_buffer(NULL, 0); + + grpc_slice key_tmp = grpc_slice_from_static_string(":path"); + g_fake_path_key = grpc_slice_intern(key_tmp); + grpc_slice_unref_internal(&exec_ctx, key_tmp); + + g_fake_path_value = grpc_slice_from_static_string("/"); + + grpc_slice auth_tmp = grpc_slice_from_static_string(":authority"); + g_fake_auth_key = grpc_slice_intern(auth_tmp); + grpc_slice_unref_internal(&exec_ctx, auth_tmp); + + g_fake_auth_value = grpc_slice_from_static_string("inproc-fail"); + grpc_exec_ctx_finish(&exec_ctx); +} + +static const grpc_transport_vtable inproc_vtable = { + sizeof(inproc_stream), "inproc", init_stream, + set_pollset, set_pollset_set, perform_stream_op, + perform_transport_op, destroy_stream, destroy_transport, + get_endpoint}; + +/******************************************************************************* * Main inproc transport functions */ static void inproc_transports_create(grpc_exec_ctx *exec_ctx, @@ -1178,7 +1226,7 @@ static void inproc_transports_create(grpc_exec_ctx *exec_ctx, inproc_transport *st = (inproc_transport *)gpr_zalloc(sizeof(*st)); inproc_transport *ct = (inproc_transport *)gpr_zalloc(sizeof(*ct)); // Share one lock between both sides since both sides get affected - st->mu = ct->mu = gpr_malloc(sizeof(*st->mu)); + st->mu = ct->mu = (shared_mu *)gpr_malloc(sizeof(*st->mu)); gpr_mu_init(&st->mu->mu); gpr_ref_init(&st->mu->refs, 2); st->base.vtable = &inproc_vtable; @@ -1240,55 +1288,6 @@ grpc_channel *grpc_inproc_channel_create(grpc_server *server, return channel; } -/******************************************************************************* - * INTEGRATION GLUE - */ - -static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, grpc_pollset *pollset) { - // Nothing to do here -} - -static void set_pollset_set(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, grpc_pollset_set *pollset_set) { - // Nothing to do here -} - -static grpc_endpoint *get_endpoint(grpc_exec_ctx *exec_ctx, grpc_transport *t) { - return NULL; -} - -static const grpc_transport_vtable inproc_vtable = { - sizeof(inproc_stream), "inproc", init_stream, - set_pollset, set_pollset_set, perform_stream_op, - perform_transport_op, destroy_stream, destroy_transport, - get_endpoint}; - -/******************************************************************************* - * GLOBAL INIT AND DESTROY - */ -static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {} - -void grpc_inproc_transport_init(void) { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - GRPC_CLOSURE_INIT(&do_nothing_closure, do_nothing, NULL, - grpc_schedule_on_exec_ctx); - g_empty_slice = grpc_slice_from_static_buffer(NULL, 0); - - grpc_slice key_tmp = grpc_slice_from_static_string(":path"); - g_fake_path_key = grpc_slice_intern(key_tmp); - grpc_slice_unref_internal(&exec_ctx, key_tmp); - - g_fake_path_value = grpc_slice_from_static_string("/"); - - grpc_slice auth_tmp = grpc_slice_from_static_string(":authority"); - g_fake_auth_key = grpc_slice_intern(auth_tmp); - grpc_slice_unref_internal(&exec_ctx, auth_tmp); - - g_fake_auth_value = grpc_slice_from_static_string("inproc-fail"); - grpc_exec_ctx_finish(&exec_ctx); -} - void grpc_inproc_transport_shutdown(void) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_slice_unref_internal(&exec_ctx, g_empty_slice); diff --git a/src/core/lib/debug/stats_data.c b/src/core/lib/debug/stats_data.c index a18efcb524..b3e1ee9b4e 100644 --- a/src/core/lib/debug/stats_data.c +++ b/src/core/lib/debug/stats_data.c @@ -25,6 +25,10 @@ const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT] = { "client_calls_created", "server_calls_created", + "cqs_created", + "client_channels_created", + "client_subchannels_created", + "server_channels_created", "syscall_poll", "syscall_wait", "histogram_slow_lookups", @@ -46,6 +50,27 @@ const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT] = { "http2_writes_offloaded", "http2_writes_continued", "http2_partial_writes", + "http2_initiate_write_due_to_initial_write", + "http2_initiate_write_due_to_start_new_stream", + "http2_initiate_write_due_to_send_message", + "http2_initiate_write_due_to_send_initial_metadata", + "http2_initiate_write_due_to_send_trailing_metadata", + "http2_initiate_write_due_to_retry_send_ping", + "http2_initiate_write_due_to_continue_pings", + "http2_initiate_write_due_to_goaway_sent", + "http2_initiate_write_due_to_rst_stream", + "http2_initiate_write_due_to_close_from_api", + "http2_initiate_write_due_to_stream_flow_control", + "http2_initiate_write_due_to_transport_flow_control", + "http2_initiate_write_due_to_send_settings", + "http2_initiate_write_due_to_bdp_estimator_ping", + "http2_initiate_write_due_to_flow_control_unstalled_by_setting", + "http2_initiate_write_due_to_flow_control_unstalled_by_update", + "http2_initiate_write_due_to_application_ping", + "http2_initiate_write_due_to_keepalive_ping", + "http2_initiate_write_due_to_transport_flow_control_unstalled", + "http2_initiate_write_due_to_ping_response", + "http2_initiate_write_due_to_force_rst_stream", "combiner_locks_initiated", "combiner_locks_scheduled_items", "combiner_locks_scheduled_final_items", @@ -62,6 +87,8 @@ const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT] = { const char *grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT] = { "Number of client side calls created by this process", "Number of server side calls created by this process", + "Number of completion queues created", "Number of client channels created", + "Number of client subchannels created", "Number of server channels created", "Number of polling syscalls (epoll_wait, poll, etc) made by this process", "Number of sleeping syscalls made by this process", "Number of times histogram increments went through the slow (binary " @@ -86,6 +113,30 @@ const char *grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT] = { "written", "Number of HTTP2 writes that were made knowing there was still more data " "to be written (we cap maximum write size to syscall_write)", + "Number of HTTP2 writes initiated due to 'initial_write'", + "Number of HTTP2 writes initiated due to 'start_new_stream'", + "Number of HTTP2 writes initiated due to 'send_message'", + "Number of HTTP2 writes initiated due to 'send_initial_metadata'", + "Number of HTTP2 writes initiated due to 'send_trailing_metadata'", + "Number of HTTP2 writes initiated due to 'retry_send_ping'", + "Number of HTTP2 writes initiated due to 'continue_pings'", + "Number of HTTP2 writes initiated due to 'goaway_sent'", + "Number of HTTP2 writes initiated due to 'rst_stream'", + "Number of HTTP2 writes initiated due to 'close_from_api'", + "Number of HTTP2 writes initiated due to 'stream_flow_control'", + "Number of HTTP2 writes initiated due to 'transport_flow_control'", + "Number of HTTP2 writes initiated due to 'send_settings'", + "Number of HTTP2 writes initiated due to 'bdp_estimator_ping'", + "Number of HTTP2 writes initiated due to " + "'flow_control_unstalled_by_setting'", + "Number of HTTP2 writes initiated due to " + "'flow_control_unstalled_by_update'", + "Number of HTTP2 writes initiated due to 'application_ping'", + "Number of HTTP2 writes initiated due to 'keepalive_ping'", + "Number of HTTP2 writes initiated due to " + "'transport_flow_control_unstalled'", + "Number of HTTP2 writes initiated due to 'ping_response'", + "Number of HTTP2 writes initiated due to 'force_rst_stream'", "Number of combiner lock entries by process (first items queued to a " "combiner)", "Number of items scheduled against combiner locks", diff --git a/src/core/lib/debug/stats_data.h b/src/core/lib/debug/stats_data.h index 479c9520b6..c9871c4a56 100644 --- a/src/core/lib/debug/stats_data.h +++ b/src/core/lib/debug/stats_data.h @@ -27,6 +27,10 @@ typedef enum { GRPC_STATS_COUNTER_CLIENT_CALLS_CREATED, GRPC_STATS_COUNTER_SERVER_CALLS_CREATED, + GRPC_STATS_COUNTER_CQS_CREATED, + GRPC_STATS_COUNTER_CLIENT_CHANNELS_CREATED, + GRPC_STATS_COUNTER_CLIENT_SUBCHANNELS_CREATED, + GRPC_STATS_COUNTER_SERVER_CHANNELS_CREATED, GRPC_STATS_COUNTER_SYSCALL_POLL, GRPC_STATS_COUNTER_SYSCALL_WAIT, GRPC_STATS_COUNTER_HISTOGRAM_SLOW_LOOKUPS, @@ -48,6 +52,27 @@ typedef enum { GRPC_STATS_COUNTER_HTTP2_WRITES_OFFLOADED, GRPC_STATS_COUNTER_HTTP2_WRITES_CONTINUED, GRPC_STATS_COUNTER_HTTP2_PARTIAL_WRITES, + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_INITIAL_WRITE, + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_START_NEW_STREAM, + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_SEND_MESSAGE, + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_SEND_INITIAL_METADATA, + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_SEND_TRAILING_METADATA, + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_RETRY_SEND_PING, + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_CONTINUE_PINGS, + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_GOAWAY_SENT, + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_RST_STREAM, + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_CLOSE_FROM_API, + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_STREAM_FLOW_CONTROL, + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL, + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_SEND_SETTINGS, + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_BDP_ESTIMATOR_PING, + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_SETTING, + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_UPDATE, + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_APPLICATION_PING, + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_KEEPALIVE_PING, + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL_UNSTALLED, + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_PING_RESPONSE, + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_FORCE_RST_STREAM, GRPC_STATS_COUNTER_COMBINER_LOCKS_INITIATED, GRPC_STATS_COUNTER_COMBINER_LOCKS_SCHEDULED_ITEMS, GRPC_STATS_COUNTER_COMBINER_LOCKS_SCHEDULED_FINAL_ITEMS, @@ -109,6 +134,15 @@ typedef enum { GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_CLIENT_CALLS_CREATED) #define GRPC_STATS_INC_SERVER_CALLS_CREATED(exec_ctx) \ GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_SERVER_CALLS_CREATED) +#define GRPC_STATS_INC_CQS_CREATED(exec_ctx) \ + GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_CQS_CREATED) +#define GRPC_STATS_INC_CLIENT_CHANNELS_CREATED(exec_ctx) \ + GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_CLIENT_CHANNELS_CREATED) +#define GRPC_STATS_INC_CLIENT_SUBCHANNELS_CREATED(exec_ctx) \ + GRPC_STATS_INC_COUNTER((exec_ctx), \ + GRPC_STATS_COUNTER_CLIENT_SUBCHANNELS_CREATED) +#define GRPC_STATS_INC_SERVER_CHANNELS_CREATED(exec_ctx) \ + GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_SERVER_CHANNELS_CREATED) #define GRPC_STATS_INC_SYSCALL_POLL(exec_ctx) \ GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_SYSCALL_POLL) #define GRPC_STATS_INC_SYSCALL_WAIT(exec_ctx) \ @@ -156,6 +190,95 @@ typedef enum { GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_HTTP2_WRITES_CONTINUED) #define GRPC_STATS_INC_HTTP2_PARTIAL_WRITES(exec_ctx) \ GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_HTTP2_PARTIAL_WRITES) +#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_INITIAL_WRITE(exec_ctx) \ + GRPC_STATS_INC_COUNTER( \ + (exec_ctx), \ + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_INITIAL_WRITE) +#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_START_NEW_STREAM(exec_ctx) \ + GRPC_STATS_INC_COUNTER( \ + (exec_ctx), \ + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_START_NEW_STREAM) +#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_MESSAGE(exec_ctx) \ + GRPC_STATS_INC_COUNTER( \ + (exec_ctx), GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_SEND_MESSAGE) +#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_INITIAL_METADATA( \ + exec_ctx) \ + GRPC_STATS_INC_COUNTER( \ + (exec_ctx), \ + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_SEND_INITIAL_METADATA) +#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_TRAILING_METADATA( \ + exec_ctx) \ + GRPC_STATS_INC_COUNTER( \ + (exec_ctx), \ + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_SEND_TRAILING_METADATA) +#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_RETRY_SEND_PING(exec_ctx) \ + GRPC_STATS_INC_COUNTER( \ + (exec_ctx), \ + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_RETRY_SEND_PING) +#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_CONTINUE_PINGS(exec_ctx) \ + GRPC_STATS_INC_COUNTER( \ + (exec_ctx), \ + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_CONTINUE_PINGS) +#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_GOAWAY_SENT(exec_ctx) \ + GRPC_STATS_INC_COUNTER( \ + (exec_ctx), GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_GOAWAY_SENT) +#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_RST_STREAM(exec_ctx) \ + GRPC_STATS_INC_COUNTER( \ + (exec_ctx), GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_RST_STREAM) +#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_CLOSE_FROM_API(exec_ctx) \ + GRPC_STATS_INC_COUNTER( \ + (exec_ctx), \ + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_CLOSE_FROM_API) +#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_STREAM_FLOW_CONTROL( \ + exec_ctx) \ + GRPC_STATS_INC_COUNTER( \ + (exec_ctx), \ + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_STREAM_FLOW_CONTROL) +#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL( \ + exec_ctx) \ + GRPC_STATS_INC_COUNTER( \ + (exec_ctx), \ + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL) +#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_SETTINGS(exec_ctx) \ + GRPC_STATS_INC_COUNTER( \ + (exec_ctx), \ + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_SEND_SETTINGS) +#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_BDP_ESTIMATOR_PING( \ + exec_ctx) \ + GRPC_STATS_INC_COUNTER( \ + (exec_ctx), \ + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_BDP_ESTIMATOR_PING) +#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_SETTING( \ + exec_ctx) \ + GRPC_STATS_INC_COUNTER( \ + (exec_ctx), \ + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_SETTING) +#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_UPDATE( \ + exec_ctx) \ + GRPC_STATS_INC_COUNTER( \ + (exec_ctx), \ + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_UPDATE) +#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_APPLICATION_PING(exec_ctx) \ + GRPC_STATS_INC_COUNTER( \ + (exec_ctx), \ + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_APPLICATION_PING) +#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_KEEPALIVE_PING(exec_ctx) \ + GRPC_STATS_INC_COUNTER( \ + (exec_ctx), \ + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_KEEPALIVE_PING) +#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL_UNSTALLED( \ + exec_ctx) \ + GRPC_STATS_INC_COUNTER( \ + (exec_ctx), \ + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL_UNSTALLED) +#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_PING_RESPONSE(exec_ctx) \ + GRPC_STATS_INC_COUNTER( \ + (exec_ctx), \ + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_PING_RESPONSE) +#define GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FORCE_RST_STREAM(exec_ctx) \ + GRPC_STATS_INC_COUNTER( \ + (exec_ctx), \ + GRPC_STATS_COUNTER_HTTP2_INITIATE_WRITE_DUE_TO_FORCE_RST_STREAM) #define GRPC_STATS_INC_COMBINER_LOCKS_INITIATED(exec_ctx) \ GRPC_STATS_INC_COUNTER((exec_ctx), \ GRPC_STATS_COUNTER_COMBINER_LOCKS_INITIATED) diff --git a/src/core/lib/debug/stats_data.yaml b/src/core/lib/debug/stats_data.yaml index 7cf82de96c..84727fe6c4 100644 --- a/src/core/lib/debug/stats_data.yaml +++ b/src/core/lib/debug/stats_data.yaml @@ -20,6 +20,14 @@ doc: Number of client side calls created by this process - counter: server_calls_created doc: Number of server side calls created by this process +- counter: cqs_created + doc: Number of completion queues created +- counter: client_channels_created + doc: Number of client channels created +- counter: client_subchannels_created + doc: Number of client subchannels created +- counter: server_channels_created + doc: Number of server channels created # polling - counter: syscall_poll doc: Number of polling syscalls (epoll_wait, poll, etc) made by this process @@ -109,6 +117,48 @@ - counter: http2_partial_writes doc: Number of HTTP2 writes that were made knowing there was still more data to be written (we cap maximum write size to syscall_write) +- counter: http2_initiate_write_due_to_initial_write + doc: Number of HTTP2 writes initiated due to 'initial_write' +- counter: http2_initiate_write_due_to_start_new_stream + doc: Number of HTTP2 writes initiated due to 'start_new_stream' +- counter: http2_initiate_write_due_to_send_message + doc: Number of HTTP2 writes initiated due to 'send_message' +- counter: http2_initiate_write_due_to_send_initial_metadata + doc: Number of HTTP2 writes initiated due to 'send_initial_metadata' +- counter: http2_initiate_write_due_to_send_trailing_metadata + doc: Number of HTTP2 writes initiated due to 'send_trailing_metadata' +- counter: http2_initiate_write_due_to_retry_send_ping + doc: Number of HTTP2 writes initiated due to 'retry_send_ping' +- counter: http2_initiate_write_due_to_continue_pings + doc: Number of HTTP2 writes initiated due to 'continue_pings' +- counter: http2_initiate_write_due_to_goaway_sent + doc: Number of HTTP2 writes initiated due to 'goaway_sent' +- counter: http2_initiate_write_due_to_rst_stream + doc: Number of HTTP2 writes initiated due to 'rst_stream' +- counter: http2_initiate_write_due_to_close_from_api + doc: Number of HTTP2 writes initiated due to 'close_from_api' +- counter: http2_initiate_write_due_to_stream_flow_control + doc: Number of HTTP2 writes initiated due to 'stream_flow_control' +- counter: http2_initiate_write_due_to_transport_flow_control + doc: Number of HTTP2 writes initiated due to 'transport_flow_control' +- counter: http2_initiate_write_due_to_send_settings + doc: Number of HTTP2 writes initiated due to 'send_settings' +- counter: http2_initiate_write_due_to_bdp_estimator_ping + doc: Number of HTTP2 writes initiated due to 'bdp_estimator_ping' +- counter: http2_initiate_write_due_to_flow_control_unstalled_by_setting + doc: Number of HTTP2 writes initiated due to 'flow_control_unstalled_by_setting' +- counter: http2_initiate_write_due_to_flow_control_unstalled_by_update + doc: Number of HTTP2 writes initiated due to 'flow_control_unstalled_by_update' +- counter: http2_initiate_write_due_to_application_ping + doc: Number of HTTP2 writes initiated due to 'application_ping' +- counter: http2_initiate_write_due_to_keepalive_ping + doc: Number of HTTP2 writes initiated due to 'keepalive_ping' +- counter: http2_initiate_write_due_to_transport_flow_control_unstalled + doc: Number of HTTP2 writes initiated due to 'transport_flow_control_unstalled' +- counter: http2_initiate_write_due_to_ping_response + doc: Number of HTTP2 writes initiated due to 'ping_response' +- counter: http2_initiate_write_due_to_force_rst_stream + doc: Number of HTTP2 writes initiated due to 'force_rst_stream' # combiner locks - counter: combiner_locks_initiated doc: Number of combiner lock entries by process diff --git a/src/core/lib/debug/stats_data_bq_schema.sql b/src/core/lib/debug/stats_data_bq_schema.sql index 5354769291..d21afbbfe4 100644 --- a/src/core/lib/debug/stats_data_bq_schema.sql +++ b/src/core/lib/debug/stats_data_bq_schema.sql @@ -1,5 +1,9 @@ client_calls_created_per_iteration:FLOAT, server_calls_created_per_iteration:FLOAT, +cqs_created_per_iteration:FLOAT, +client_channels_created_per_iteration:FLOAT, +client_subchannels_created_per_iteration:FLOAT, +server_channels_created_per_iteration:FLOAT, syscall_poll_per_iteration:FLOAT, syscall_wait_per_iteration:FLOAT, histogram_slow_lookups_per_iteration:FLOAT, @@ -21,6 +25,27 @@ http2_writes_begun_per_iteration:FLOAT, http2_writes_offloaded_per_iteration:FLOAT, http2_writes_continued_per_iteration:FLOAT, http2_partial_writes_per_iteration:FLOAT, +http2_initiate_write_due_to_initial_write_per_iteration:FLOAT, +http2_initiate_write_due_to_start_new_stream_per_iteration:FLOAT, +http2_initiate_write_due_to_send_message_per_iteration:FLOAT, +http2_initiate_write_due_to_send_initial_metadata_per_iteration:FLOAT, +http2_initiate_write_due_to_send_trailing_metadata_per_iteration:FLOAT, +http2_initiate_write_due_to_retry_send_ping_per_iteration:FLOAT, +http2_initiate_write_due_to_continue_pings_per_iteration:FLOAT, +http2_initiate_write_due_to_goaway_sent_per_iteration:FLOAT, +http2_initiate_write_due_to_rst_stream_per_iteration:FLOAT, +http2_initiate_write_due_to_close_from_api_per_iteration:FLOAT, +http2_initiate_write_due_to_stream_flow_control_per_iteration:FLOAT, +http2_initiate_write_due_to_transport_flow_control_per_iteration:FLOAT, +http2_initiate_write_due_to_send_settings_per_iteration:FLOAT, +http2_initiate_write_due_to_bdp_estimator_ping_per_iteration:FLOAT, +http2_initiate_write_due_to_flow_control_unstalled_by_setting_per_iteration:FLOAT, +http2_initiate_write_due_to_flow_control_unstalled_by_update_per_iteration:FLOAT, +http2_initiate_write_due_to_application_ping_per_iteration:FLOAT, +http2_initiate_write_due_to_keepalive_ping_per_iteration:FLOAT, +http2_initiate_write_due_to_transport_flow_control_unstalled_per_iteration:FLOAT, +http2_initiate_write_due_to_ping_response_per_iteration:FLOAT, +http2_initiate_write_due_to_force_rst_stream_per_iteration:FLOAT, combiner_locks_initiated_per_iteration:FLOAT, combiner_locks_scheduled_items_per_iteration:FLOAT, combiner_locks_scheduled_final_items_per_iteration:FLOAT, diff --git a/src/core/lib/iomgr/ev_epoll1_linux.c b/src/core/lib/iomgr/ev_epoll1_linux.c index 7fa2899092..4e2746ad6c 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.c +++ b/src/core/lib/iomgr/ev_epoll1_linux.c @@ -131,9 +131,9 @@ static void fd_global_shutdown(void); * Pollset Declarations */ -typedef enum { UNKICKED, KICKED, DESIGNATED_POLLER } kick_state_t; +typedef enum { UNKICKED, KICKED, DESIGNATED_POLLER } kick_state; -static const char *kick_state_string(kick_state_t st) { +static const char *kick_state_string(kick_state st) { switch (st) { case UNKICKED: return "UNKICKED"; @@ -146,7 +146,7 @@ static const char *kick_state_string(kick_state_t st) { } struct grpc_pollset_worker { - kick_state_t kick_state; + kick_state state; int kick_state_mutator; // which line of code last changed kick state bool initialized_cv; grpc_pollset_worker *next; @@ -155,9 +155,9 @@ struct grpc_pollset_worker { grpc_closure_list schedule_on_end_work; }; -#define SET_KICK_STATE(worker, state) \ +#define SET_KICK_STATE(worker, kick_state) \ do { \ - (worker)->kick_state = (state); \ + (worker)->state = (kick_state); \ (worker)->kick_state_mutator = __LINE__; \ } while (false) @@ -509,7 +509,7 @@ static grpc_error *pollset_kick_all(grpc_pollset *pollset) { if (pollset->root_worker != NULL) { grpc_pollset_worker *worker = pollset->root_worker; do { - switch (worker->kick_state) { + switch (worker->state) { case KICKED: break; case UNKICKED: @@ -685,7 +685,7 @@ static bool begin_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, gpr_mu_lock(&pollset->mu); if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_ERROR, "PS:%p BEGIN_REORG:%p kick_state=%s is_reassigning=%d", - pollset, worker, kick_state_string(worker->kick_state), + pollset, worker, kick_state_string(worker->state), is_reassigning); } if (pollset->seen_inactive) { @@ -705,12 +705,12 @@ static bool begin_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, at this point is if it were "kicked specifically". Since the worker has not added itself to the pollset yet (by calling worker_insert()), it is not visible in the "kick any" path yet */ - if (worker->kick_state == UNKICKED) { + if (worker->state == UNKICKED) { pollset->seen_inactive = false; if (neighborhood->active_root == NULL) { neighborhood->active_root = pollset->next = pollset->prev = pollset; /* Make this the designated poller if there isn't one already */ - if (worker->kick_state == UNKICKED && + if (worker->state == UNKICKED && gpr_atm_no_barrier_cas(&g_active_poller, 0, (gpr_atm)worker)) { SET_KICK_STATE(worker, DESIGNATED_POLLER); } @@ -730,20 +730,20 @@ static bool begin_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, worker_insert(pollset, worker); pollset->begin_refs--; - if (worker->kick_state == UNKICKED && !pollset->kicked_without_poller) { + if (worker->state == UNKICKED && !pollset->kicked_without_poller) { GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker); worker->initialized_cv = true; gpr_cv_init(&worker->cv); - while (worker->kick_state == UNKICKED && !pollset->shutting_down) { + while (worker->state == UNKICKED && !pollset->shutting_down) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_ERROR, "PS:%p BEGIN_WAIT:%p kick_state=%s shutdown=%d", - pollset, worker, kick_state_string(worker->kick_state), + pollset, worker, kick_state_string(worker->state), pollset->shutting_down); } if (gpr_cv_wait(&worker->cv, &pollset->mu, grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME)) && - worker->kick_state == UNKICKED) { + worker->state == UNKICKED) { /* If gpr_cv_wait returns true (i.e a timeout), pretend that the worker received a kick */ SET_KICK_STATE(worker, KICKED); @@ -756,7 +756,7 @@ static bool begin_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, gpr_log(GPR_ERROR, "PS:%p BEGIN_DONE:%p kick_state=%s shutdown=%d " "kicked_without_poller: %d", - pollset, worker, kick_state_string(worker->kick_state), + pollset, worker, kick_state_string(worker->state), pollset->shutting_down, pollset->kicked_without_poller); } @@ -776,7 +776,7 @@ static bool begin_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } GPR_TIMER_END("begin_worker", 0); - return worker->kick_state == DESIGNATED_POLLER && !pollset->shutting_down; + return worker->state == DESIGNATED_POLLER && !pollset->shutting_down; } static bool check_neighborhood_for_available_poller( @@ -793,7 +793,7 @@ static bool check_neighborhood_for_available_poller( grpc_pollset_worker *inspect_worker = inspect->root_worker; if (inspect_worker != NULL) { do { - switch (inspect_worker->kick_state) { + switch (inspect_worker->state) { case UNKICKED: if (gpr_atm_no_barrier_cas(&g_active_poller, 0, (gpr_atm)inspect_worker)) { @@ -856,7 +856,7 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_closure_list_move(&worker->schedule_on_end_work, &exec_ctx->closure_list); if (gpr_atm_no_barrier_load(&g_active_poller) == (gpr_atm)worker) { - if (worker->next != worker && worker->next->kick_state == UNKICKED) { + if (worker->next != worker && worker->next->state == UNKICKED) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, " .. choose next poller to be peer %p", worker); } @@ -990,14 +990,14 @@ static grpc_error *pollset_kick(grpc_pollset *pollset, gpr_strvec_add(&log, tmp); if (pollset->root_worker != NULL) { gpr_asprintf(&tmp, " {kick_state=%s next=%p {kick_state=%s}}", - kick_state_string(pollset->root_worker->kick_state), + kick_state_string(pollset->root_worker->state), pollset->root_worker->next, - kick_state_string(pollset->root_worker->next->kick_state)); + kick_state_string(pollset->root_worker->next->state)); gpr_strvec_add(&log, tmp); } if (specific_worker != NULL) { gpr_asprintf(&tmp, " worker_kick_state=%s", - kick_state_string(specific_worker->kick_state)); + kick_state_string(specific_worker->state)); gpr_strvec_add(&log, tmp); } tmp = gpr_strvec_flatten(&log, NULL); @@ -1017,13 +1017,13 @@ static grpc_error *pollset_kick(grpc_pollset *pollset, goto done; } grpc_pollset_worker *next_worker = root_worker->next; - if (root_worker->kick_state == KICKED) { + if (root_worker->state == KICKED) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_ERROR, " .. already kicked %p", root_worker); } SET_KICK_STATE(root_worker, KICKED); goto done; - } else if (next_worker->kick_state == KICKED) { + } else if (next_worker->state == KICKED) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_ERROR, " .. already kicked %p", next_worker); } @@ -1040,7 +1040,7 @@ static grpc_error *pollset_kick(grpc_pollset *pollset, SET_KICK_STATE(root_worker, KICKED); ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd); goto done; - } else if (next_worker->kick_state == UNKICKED) { + } else if (next_worker->state == UNKICKED) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_ERROR, " .. kicked %p", next_worker); } @@ -1048,8 +1048,8 @@ static grpc_error *pollset_kick(grpc_pollset *pollset, SET_KICK_STATE(next_worker, KICKED); gpr_cv_signal(&next_worker->cv); goto done; - } else if (next_worker->kick_state == DESIGNATED_POLLER) { - if (root_worker->kick_state != DESIGNATED_POLLER) { + } else if (next_worker->state == DESIGNATED_POLLER) { + if (root_worker->state != DESIGNATED_POLLER) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log( GPR_ERROR, @@ -1071,7 +1071,7 @@ static grpc_error *pollset_kick(grpc_pollset *pollset, goto done; } } else { - GPR_ASSERT(next_worker->kick_state == KICKED); + GPR_ASSERT(next_worker->state == KICKED); SET_KICK_STATE(next_worker, KICKED); goto done; } @@ -1085,7 +1085,7 @@ static grpc_error *pollset_kick(grpc_pollset *pollset, GPR_UNREACHABLE_CODE(goto done); } - if (specific_worker->kick_state == KICKED) { + if (specific_worker->state == KICKED) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_ERROR, " .. specific worker already kicked"); } diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c index 91164de13e..959a8f4b4f 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.c +++ b/src/core/lib/iomgr/ev_epollex_linux.c @@ -96,12 +96,12 @@ static void pg_join(grpc_exec_ctx *exec_ctx, polling_group *pg, * pollable Declarations */ -typedef struct pollable_t { +typedef struct pollable { polling_obj po; int epfd; grpc_wakeup_fd wakeup; grpc_pollset_worker *root_worker; -} pollable_t; +} pollable; static const char *polling_obj_type_string(polling_obj_type t) { switch (t) { @@ -121,7 +121,7 @@ static const char *polling_obj_type_string(polling_obj_type t) { return "<invalid>"; } -static char *pollable_desc(pollable_t *p) { +static char *pollable_desc(pollable *p) { char *out; gpr_asprintf(&out, "type=%s group=%p epfd=%d wakeup=%d", polling_obj_type_string(p->po.type), p->po.group, p->epfd, @@ -129,19 +129,19 @@ static char *pollable_desc(pollable_t *p) { return out; } -static pollable_t g_empty_pollable; +static pollable g_empty_pollable; -static void pollable_init(pollable_t *p, polling_obj_type type); -static void pollable_destroy(pollable_t *p); +static void pollable_init(pollable *p, polling_obj_type type); +static void pollable_destroy(pollable *p); /* ensure that p->epfd, p->wakeup are initialized; p->po.mu must be held */ -static grpc_error *pollable_materialize(pollable_t *p); +static grpc_error *pollable_materialize(pollable *p); /******************************************************************************* * Fd Declarations */ struct grpc_fd { - pollable_t pollable; + pollable pollable_obj; int fd; /* refst format: bit 0 : 1=Active / 0=Orphaned @@ -192,15 +192,15 @@ struct grpc_pollset_worker { pollset_worker_link links[POLLSET_WORKER_LINK_COUNT]; gpr_cv cv; grpc_pollset *pollset; - pollable_t *pollable; + pollable *pollable_obj; }; #define MAX_EPOLL_EVENTS 100 #define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 5 struct grpc_pollset { - pollable_t pollable; - pollable_t *current_pollable; + pollable pollable_obj; + pollable *current_pollable_obj; int kick_alls_pending; bool kicked_without_poller; grpc_closure *shutdown_closure; @@ -281,7 +281,7 @@ static void fd_destroy(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_fd *fd = (grpc_fd *)arg; /* Add the fd to the freelist */ grpc_iomgr_unregister_object(&fd->iomgr_object); - pollable_destroy(&fd->pollable); + pollable_destroy(&fd->pollable_obj); gpr_mu_destroy(&fd->orphaned_mu); gpr_mu_lock(&fd_freelist_mu); fd->freelist_next = fd_freelist; @@ -342,7 +342,7 @@ static grpc_fd *fd_create(int fd, const char *name) { new_fd = (grpc_fd *)gpr_malloc(sizeof(grpc_fd)); } - pollable_init(&new_fd->pollable, PO_FD); + pollable_init(&new_fd->pollable_obj, PO_FD); gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1); new_fd->fd = fd; @@ -384,7 +384,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, bool is_fd_closed = already_closed; grpc_error *error = GRPC_ERROR_NONE; - gpr_mu_lock(&fd->pollable.po.mu); + gpr_mu_lock(&fd->pollable_obj.po.mu); gpr_mu_lock(&fd->orphaned_mu); fd->on_done_closure = on_done; @@ -410,7 +410,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, GRPC_CLOSURE_SCHED(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error)); gpr_mu_unlock(&fd->orphaned_mu); - gpr_mu_unlock(&fd->pollable.po.mu); + gpr_mu_unlock(&fd->pollable_obj.po.mu); UNREF_BY(exec_ctx, fd, 2, reason); /* Drop the reference */ GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error)); GRPC_ERROR_UNREF(error); @@ -450,13 +450,13 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, * Pollable Definitions */ -static void pollable_init(pollable_t *p, polling_obj_type type) { +static void pollable_init(pollable *p, polling_obj_type type) { po_init(&p->po, type); p->root_worker = NULL; p->epfd = -1; } -static void pollable_destroy(pollable_t *p) { +static void pollable_destroy(pollable *p) { po_destroy(&p->po); if (p->epfd != -1) { close(p->epfd); @@ -465,7 +465,7 @@ static void pollable_destroy(pollable_t *p) { } /* ensure that p->epfd, p->wakeup are initialized; p->po.mu must be held */ -static grpc_error *pollable_materialize(pollable_t *p) { +static grpc_error *pollable_materialize(pollable *p) { if (p->epfd == -1) { int new_epfd = epoll_create1(EPOLL_CLOEXEC); if (new_epfd < 0) { @@ -491,7 +491,7 @@ static grpc_error *pollable_materialize(pollable_t *p) { } /* pollable must be materialized */ -static grpc_error *pollable_add_fd(pollable_t *p, grpc_fd *fd) { +static grpc_error *pollable_add_fd(pollable *p, grpc_fd *fd) { grpc_error *error = GRPC_ERROR_NONE; static const char *err_desc = "pollable_add_fd"; const int epfd = p->epfd; @@ -556,30 +556,33 @@ static void do_kick_all(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error_unused) { grpc_error *error = GRPC_ERROR_NONE; grpc_pollset *pollset = (grpc_pollset *)arg; - gpr_mu_lock(&pollset->pollable.po.mu); + gpr_mu_lock(&pollset->pollable_obj.po.mu); if (pollset->root_worker != NULL) { grpc_pollset_worker *worker = pollset->root_worker; do { - if (worker->pollable != &pollset->pollable) { - gpr_mu_lock(&worker->pollable->po.mu); + if (worker->pollable_obj != &pollset->pollable_obj) { + gpr_mu_lock(&worker->pollable_obj->po.mu); } if (worker->initialized_cv && worker != pollset->root_worker) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p kickall_via_cv %p (pollable %p vs %p)", - pollset, worker, &pollset->pollable, worker->pollable); + pollset, worker, &pollset->pollable_obj, + worker->pollable_obj); } worker->kicked = true; gpr_cv_signal(&worker->cv); } else { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p kickall_via_wakeup %p (pollable %p vs %p)", - pollset, worker, &pollset->pollable, worker->pollable); + pollset, worker, &pollset->pollable_obj, + worker->pollable_obj); } - append_error(&error, grpc_wakeup_fd_wakeup(&worker->pollable->wakeup), + append_error(&error, + grpc_wakeup_fd_wakeup(&worker->pollable_obj->wakeup), "pollset_shutdown"); } - if (worker->pollable != &pollset->pollable) { - gpr_mu_unlock(&worker->pollable->po.mu); + if (worker->pollable_obj != &pollset->pollable_obj) { + gpr_mu_unlock(&worker->pollable_obj->po.mu); } worker = worker->links[PWL_POLLSET].next; @@ -587,7 +590,7 @@ static void do_kick_all(grpc_exec_ctx *exec_ctx, void *arg, } pollset->kick_alls_pending--; pollset_maybe_finish_shutdown(exec_ctx, pollset); - gpr_mu_unlock(&pollset->pollable.po.mu); + gpr_mu_unlock(&pollset->pollable_obj.po.mu); GRPC_LOG_IF_ERROR("kick_all", error); } @@ -598,7 +601,7 @@ static void pollset_kick_all(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { GRPC_ERROR_NONE); } -static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable_t *p, +static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable *p, grpc_pollset_worker *specific_worker) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, @@ -663,24 +666,24 @@ static grpc_error *pollset_kick_inner(grpc_pollset *pollset, pollable_t *p, /* p->po.mu must be held before calling this function */ static grpc_error *pollset_kick(grpc_pollset *pollset, grpc_pollset_worker *specific_worker) { - pollable_t *p = pollset->current_pollable; - if (p != &pollset->pollable) { + pollable *p = pollset->current_pollable_obj; + if (p != &pollset->pollable_obj) { gpr_mu_lock(&p->po.mu); } grpc_error *error = pollset_kick_inner(pollset, p, specific_worker); - if (p != &pollset->pollable) { + if (p != &pollset->pollable_obj) { gpr_mu_unlock(&p->po.mu); } return error; } static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { - pollable_init(&pollset->pollable, PO_POLLSET); - pollset->current_pollable = &g_empty_pollable; + pollable_init(&pollset->pollable_obj, PO_POLLSET); + pollset->current_pollable_obj = &g_empty_pollable; pollset->kicked_without_poller = false; pollset->shutdown_closure = NULL; pollset->root_worker = NULL; - *mu = &pollset->pollable.po.mu; + *mu = &pollset->pollable_obj.po.mu; } static int poll_deadline_to_millis_timeout(grpc_exec_ctx *exec_ctx, @@ -715,8 +718,8 @@ static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { static grpc_error *fd_become_pollable_locked(grpc_fd *fd) { grpc_error *error = GRPC_ERROR_NONE; static const char *err_desc = "fd_become_pollable"; - if (append_error(&error, pollable_materialize(&fd->pollable), err_desc)) { - append_error(&error, pollable_add_fd(&fd->pollable, fd), err_desc); + if (append_error(&error, pollable_materialize(&fd->pollable_obj), err_desc)) { + append_error(&error, pollable_add_fd(&fd->pollable_obj, fd), err_desc); } return error; } @@ -730,8 +733,8 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, pollset_maybe_finish_shutdown(exec_ctx, pollset); } -static bool pollset_is_pollable_fd(grpc_pollset *pollset, pollable_t *p) { - return p != &g_empty_pollable && p != &pollset->pollable; +static bool pollset_is_pollable_fd(grpc_pollset *pollset, pollable *p) { + return p != &g_empty_pollable && p != &pollset->pollable_obj; } static grpc_error *pollset_process_events(grpc_exec_ctx *exec_ctx, @@ -777,9 +780,9 @@ static grpc_error *pollset_process_events(grpc_exec_ctx *exec_ctx, /* pollset_shutdown is guaranteed to be called before pollset_destroy. */ static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { - pollable_destroy(&pollset->pollable); - if (pollset_is_pollable_fd(pollset, pollset->current_pollable)) { - UNREF_BY(exec_ctx, (grpc_fd *)pollset->current_pollable, 2, + pollable_destroy(&pollset->pollable_obj); + if (pollset_is_pollable_fd(pollset, pollset->current_pollable_obj)) { + UNREF_BY(exec_ctx, (grpc_fd *)pollset->current_pollable_obj, 2, "pollset_pollable"); } GRPC_LOG_IF_ERROR("pollset_process_events", @@ -787,7 +790,7 @@ static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { } static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - pollable_t *p, grpc_millis deadline) { + pollable *p, grpc_millis deadline) { int timeout = poll_deadline_to_millis_timeout(exec_ctx, deadline); if (GRPC_TRACER_ON(grpc_polling_trace)) { @@ -869,69 +872,70 @@ static bool begin_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, worker->initialized_cv = false; worker->kicked = false; worker->pollset = pollset; - worker->pollable = pollset->current_pollable; + worker->pollable_obj = pollset->current_pollable_obj; - if (pollset_is_pollable_fd(pollset, worker->pollable)) { - REF_BY((grpc_fd *)worker->pollable, 2, "one_poll"); + if (pollset_is_pollable_fd(pollset, worker->pollable_obj)) { + REF_BY((grpc_fd *)worker->pollable_obj, 2, "one_poll"); } worker_insert(&pollset->root_worker, PWL_POLLSET, worker); - if (!worker_insert(&worker->pollable->root_worker, PWL_POLLABLE, worker)) { + if (!worker_insert(&worker->pollable_obj->root_worker, PWL_POLLABLE, + worker)) { worker->initialized_cv = true; gpr_cv_init(&worker->cv); - if (worker->pollable != &pollset->pollable) { - gpr_mu_unlock(&pollset->pollable.po.mu); + if (worker->pollable_obj != &pollset->pollable_obj) { + gpr_mu_unlock(&pollset->pollable_obj.po.mu); } if (GRPC_TRACER_ON(grpc_polling_trace) && - worker->pollable->root_worker != worker) { + worker->pollable_obj->root_worker != worker) { gpr_log(GPR_DEBUG, "PS:%p wait %p w=%p for %dms", pollset, - worker->pollable, worker, + worker->pollable_obj, worker, poll_deadline_to_millis_timeout(exec_ctx, deadline)); } - while (do_poll && worker->pollable->root_worker != worker) { - if (gpr_cv_wait(&worker->cv, &worker->pollable->po.mu, + while (do_poll && worker->pollable_obj->root_worker != worker) { + if (gpr_cv_wait(&worker->cv, &worker->pollable_obj->po.mu, grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME))) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p timeout_wait %p w=%p", pollset, - worker->pollable, worker); + worker->pollable_obj, worker); } do_poll = false; } else if (worker->kicked) { if (GRPC_TRACER_ON(grpc_polling_trace)) { - gpr_log(GPR_DEBUG, "PS:%p wakeup %p w=%p", pollset, worker->pollable, - worker); + gpr_log(GPR_DEBUG, "PS:%p wakeup %p w=%p", pollset, + worker->pollable_obj, worker); } do_poll = false; } else if (GRPC_TRACER_ON(grpc_polling_trace) && - worker->pollable->root_worker != worker) { + worker->pollable_obj->root_worker != worker) { gpr_log(GPR_DEBUG, "PS:%p spurious_wakeup %p w=%p", pollset, - worker->pollable, worker); + worker->pollable_obj, worker); } } - if (worker->pollable != &pollset->pollable) { - gpr_mu_unlock(&worker->pollable->po.mu); - gpr_mu_lock(&pollset->pollable.po.mu); - gpr_mu_lock(&worker->pollable->po.mu); + if (worker->pollable_obj != &pollset->pollable_obj) { + gpr_mu_unlock(&worker->pollable_obj->po.mu); + gpr_mu_lock(&pollset->pollable_obj.po.mu); + gpr_mu_lock(&worker->pollable_obj->po.mu); } grpc_exec_ctx_invalidate_now(exec_ctx); } return do_poll && pollset->shutdown_closure == NULL && - pollset->current_pollable == worker->pollable; + pollset->current_pollable_obj == worker->pollable_obj; } static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, grpc_pollset_worker **worker_hdl) { if (NEW_ROOT == - worker_remove(&worker->pollable->root_worker, PWL_POLLABLE, worker)) { - gpr_cv_signal(&worker->pollable->root_worker->cv); + worker_remove(&worker->pollable_obj->root_worker, PWL_POLLABLE, worker)) { + gpr_cv_signal(&worker->pollable_obj->root_worker->cv); } if (worker->initialized_cv) { gpr_cv_destroy(&worker->cv); } - if (pollset_is_pollable_fd(pollset, worker->pollable)) { - UNREF_BY(exec_ctx, (grpc_fd *)worker->pollable, 2, "one_poll"); + if (pollset_is_pollable_fd(pollset, worker->pollable_obj)) { + UNREF_BY(exec_ctx, (grpc_fd *)worker->pollable_obj, 2, "one_poll"); } if (EMPTIED == worker_remove(&pollset->root_worker, PWL_POLLSET, worker)) { pollset_maybe_finish_shutdown(exec_ctx, pollset); @@ -958,41 +962,41 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, pollset->kicked_without_poller = false; return GRPC_ERROR_NONE; } - if (pollset->current_pollable != &pollset->pollable) { - gpr_mu_lock(&pollset->current_pollable->po.mu); + if (pollset->current_pollable_obj != &pollset->pollable_obj) { + gpr_mu_lock(&pollset->current_pollable_obj->po.mu); } if (begin_worker(exec_ctx, pollset, &worker, worker_hdl, deadline)) { gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset); gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker); GPR_ASSERT(!pollset->shutdown_closure); - append_error(&error, pollable_materialize(worker.pollable), err_desc); - if (worker.pollable != &pollset->pollable) { - gpr_mu_unlock(&worker.pollable->po.mu); + append_error(&error, pollable_materialize(worker.pollable_obj), err_desc); + if (worker.pollable_obj != &pollset->pollable_obj) { + gpr_mu_unlock(&worker.pollable_obj->po.mu); } - gpr_mu_unlock(&pollset->pollable.po.mu); + gpr_mu_unlock(&pollset->pollable_obj.po.mu); if (pollset->event_cursor == pollset->event_count) { - append_error(&error, - pollset_epoll(exec_ctx, pollset, worker.pollable, deadline), + append_error(&error, pollset_epoll(exec_ctx, pollset, worker.pollable_obj, + deadline), err_desc); } append_error(&error, pollset_process_events(exec_ctx, pollset, false), err_desc); - gpr_mu_lock(&pollset->pollable.po.mu); - if (worker.pollable != &pollset->pollable) { - gpr_mu_lock(&worker.pollable->po.mu); + gpr_mu_lock(&pollset->pollable_obj.po.mu); + if (worker.pollable_obj != &pollset->pollable_obj) { + gpr_mu_lock(&worker.pollable_obj->po.mu); } gpr_tls_set(&g_current_thread_pollset, 0); gpr_tls_set(&g_current_thread_worker, 0); pollset_maybe_finish_shutdown(exec_ctx, pollset); } end_worker(exec_ctx, pollset, &worker, worker_hdl); - if (worker.pollable != &pollset->pollable) { - gpr_mu_unlock(&worker.pollable->po.mu); + if (worker.pollable_obj != &pollset->pollable_obj) { + gpr_mu_unlock(&worker.pollable_obj->po.mu); } if (grpc_exec_ctx_has_work(exec_ctx)) { - gpr_mu_unlock(&pollset->pollable.po.mu); + gpr_mu_unlock(&pollset->pollable_obj.po.mu); grpc_exec_ctx_flush(exec_ctx); - gpr_mu_lock(&pollset->pollable.po.mu); + gpr_mu_lock(&pollset->pollable_obj.po.mu); } return error; } @@ -1009,27 +1013,27 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx, bool fd_locked) { static const char *err_desc = "pollset_add_fd"; grpc_error *error = GRPC_ERROR_NONE; - if (pollset->current_pollable == &g_empty_pollable) { + if (pollset->current_pollable_obj == &g_empty_pollable) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p add fd %p; transition pollable from empty to fd", pollset, fd); } - /* empty pollable --> single fd pollable_t */ + /* empty pollable --> single fd pollable */ pollset_kick_all(exec_ctx, pollset); - pollset->current_pollable = &fd->pollable; - if (!fd_locked) gpr_mu_lock(&fd->pollable.po.mu); + pollset->current_pollable_obj = &fd->pollable_obj; + if (!fd_locked) gpr_mu_lock(&fd->pollable_obj.po.mu); append_error(&error, fd_become_pollable_locked(fd), err_desc); - if (!fd_locked) gpr_mu_unlock(&fd->pollable.po.mu); + if (!fd_locked) gpr_mu_unlock(&fd->pollable_obj.po.mu); REF_BY(fd, 2, "pollset_pollable"); - } else if (pollset->current_pollable == &pollset->pollable) { + } else if (pollset->current_pollable_obj == &pollset->pollable_obj) { if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p add fd %p; already multipolling", pollset, fd); } - append_error(&error, pollable_add_fd(pollset->current_pollable, fd), + append_error(&error, pollable_add_fd(pollset->current_pollable_obj, fd), err_desc); - } else if (pollset->current_pollable != &fd->pollable) { - grpc_fd *had_fd = (grpc_fd *)pollset->current_pollable; + } else if (pollset->current_pollable_obj != &fd->pollable_obj) { + grpc_fd *had_fd = (grpc_fd *)pollset->current_pollable_obj; if (GRPC_TRACER_ON(grpc_polling_trace)) { gpr_log(GPR_DEBUG, "PS:%p add fd %p; transition pollable from fd %p to multipoller", @@ -1041,11 +1045,11 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx, grpc_lfev_set_ready(exec_ctx, &had_fd->read_closure, "read"); grpc_lfev_set_ready(exec_ctx, &had_fd->write_closure, "write"); pollset_kick_all(exec_ctx, pollset); - pollset->current_pollable = &pollset->pollable; - if (append_error(&error, pollable_materialize(&pollset->pollable), + pollset->current_pollable_obj = &pollset->pollable_obj; + if (append_error(&error, pollable_materialize(&pollset->pollable_obj), err_desc)) { - pollable_add_fd(&pollset->pollable, had_fd); - pollable_add_fd(&pollset->pollable, fd); + pollable_add_fd(&pollset->pollable_obj, had_fd); + pollable_add_fd(&pollset->pollable_obj, fd); } GRPC_CLOSURE_SCHED(exec_ctx, GRPC_CLOSURE_CREATE(unref_fd_no_longer_poller, had_fd, @@ -1057,9 +1061,9 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx, static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_fd *fd) { - gpr_mu_lock(&pollset->pollable.po.mu); + gpr_mu_lock(&pollset->pollable_obj.po.mu); grpc_error *error = pollset_add_fd_locked(exec_ctx, pollset, fd, false); - gpr_mu_unlock(&pollset->pollable.po.mu); + gpr_mu_unlock(&pollset->pollable_obj.po.mu); GRPC_LOG_IF_ERROR("pollset_add_fd", error); } @@ -1081,7 +1085,7 @@ static void pollset_set_destroy(grpc_exec_ctx *exec_ctx, static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss, grpc_fd *fd) { - po_join(exec_ctx, &pss->po, &fd->pollable.po); + po_join(exec_ctx, &pss->po, &fd->pollable_obj.po); } static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss, @@ -1089,7 +1093,7 @@ static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss, static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss, grpc_pollset *ps) { - po_join(exec_ctx, &pss->po, &ps->pollable.po); + po_join(exec_ctx, &pss->po, &ps->pollable_obj.po); } static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index bf5193de5c..7a5abf4650 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -1528,7 +1528,7 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) { for (i = 0; i < nfds; i++) { fds[i].revents = 0; if (fds[i].fd < 0 && (fds[i].events & POLLIN)) { - idx = FD_TO_IDX(fds[i].fd); + idx = GRPC_FD_TO_IDX(fds[i].fd); fd_cvs[i].cv = &pollcv_cv; fd_cvs[i].prev = NULL; fd_cvs[i].next = g_cvfds.cvfds[idx].cvs; @@ -1591,8 +1591,8 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) { idx = 0; for (i = 0; i < nfds; i++) { if (fds[i].fd < 0 && (fds[i].events & POLLIN)) { - remove_cvn(&g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].cvs, &(fd_cvs[i])); - if (g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].is_set) { + remove_cvn(&g_cvfds.cvfds[GRPC_FD_TO_IDX(fds[i].fd)].cvs, &(fd_cvs[i])); + if (g_cvfds.cvfds[GRPC_FD_TO_IDX(fds[i].fd)].is_set) { fds[i].revents = POLLIN; if (res >= 0) res++; } diff --git a/src/core/lib/iomgr/wakeup_fd_cv.c b/src/core/lib/iomgr/wakeup_fd_cv.c index 5e0b1d1704..268e0175dd 100644 --- a/src/core/lib/iomgr/wakeup_fd_cv.c +++ b/src/core/lib/iomgr/wakeup_fd_cv.c @@ -57,7 +57,7 @@ static grpc_error* cv_fd_init(grpc_wakeup_fd* fd_info) { g_cvfds.free_fds = g_cvfds.free_fds->next_free; g_cvfds.cvfds[idx].cvs = NULL; g_cvfds.cvfds[idx].is_set = 0; - fd_info->read_fd = IDX_TO_FD(idx); + fd_info->read_fd = GRPC_IDX_TO_FD(idx); fd_info->write_fd = -1; gpr_mu_unlock(&g_cvfds.mu); return GRPC_ERROR_NONE; @@ -66,8 +66,8 @@ static grpc_error* cv_fd_init(grpc_wakeup_fd* fd_info) { static grpc_error* cv_fd_wakeup(grpc_wakeup_fd* fd_info) { cv_node* cvn; gpr_mu_lock(&g_cvfds.mu); - g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)].is_set = 1; - cvn = g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)].cvs; + g_cvfds.cvfds[GRPC_FD_TO_IDX(fd_info->read_fd)].is_set = 1; + cvn = g_cvfds.cvfds[GRPC_FD_TO_IDX(fd_info->read_fd)].cvs; while (cvn) { gpr_cv_signal(cvn->cv); cvn = cvn->next; @@ -78,7 +78,7 @@ static grpc_error* cv_fd_wakeup(grpc_wakeup_fd* fd_info) { static grpc_error* cv_fd_consume(grpc_wakeup_fd* fd_info) { gpr_mu_lock(&g_cvfds.mu); - g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)].is_set = 0; + g_cvfds.cvfds[GRPC_FD_TO_IDX(fd_info->read_fd)].is_set = 0; gpr_mu_unlock(&g_cvfds.mu); return GRPC_ERROR_NONE; } @@ -89,9 +89,9 @@ static void cv_fd_destroy(grpc_wakeup_fd* fd_info) { } gpr_mu_lock(&g_cvfds.mu); // Assert that there are no active pollers - GPR_ASSERT(!g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)].cvs); - g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)].next_free = g_cvfds.free_fds; - g_cvfds.free_fds = &g_cvfds.cvfds[FD_TO_IDX(fd_info->read_fd)]; + GPR_ASSERT(!g_cvfds.cvfds[GRPC_FD_TO_IDX(fd_info->read_fd)].cvs); + g_cvfds.cvfds[GRPC_FD_TO_IDX(fd_info->read_fd)].next_free = g_cvfds.free_fds; + g_cvfds.free_fds = &g_cvfds.cvfds[GRPC_FD_TO_IDX(fd_info->read_fd)]; gpr_mu_unlock(&g_cvfds.mu); } diff --git a/src/core/lib/iomgr/wakeup_fd_cv.h b/src/core/lib/iomgr/wakeup_fd_cv.h index 46e84f5843..dc170ad5b4 100644 --- a/src/core/lib/iomgr/wakeup_fd_cv.h +++ b/src/core/lib/iomgr/wakeup_fd_cv.h @@ -37,8 +37,8 @@ #include "src/core/lib/iomgr/ev_posix.h" -#define FD_TO_IDX(fd) (-(fd)-1) -#define IDX_TO_FD(idx) (-(idx)-1) +#define GRPC_FD_TO_IDX(fd) (-(fd)-1) +#define GRPC_IDX_TO_FD(idx) (-(idx)-1) typedef struct cv_node { gpr_cv* cv; diff --git a/src/core/lib/security/transport/security_handshaker.c b/src/core/lib/security/transport/security_handshaker.c index 975d599523..3d19605617 100644 --- a/src/core/lib/security/transport/security_handshaker.c +++ b/src/core/lib/security/transport/security_handshaker.c @@ -137,7 +137,7 @@ static void on_peer_checked_inner(grpc_exec_ctx *exec_ctx, // Create zero-copy frame protector, if implemented. tsi_zero_copy_grpc_protector *zero_copy_protector = NULL; tsi_result result = tsi_handshaker_result_create_zero_copy_grpc_protector( - h->handshaker_result, NULL, &zero_copy_protector); + exec_ctx, h->handshaker_result, NULL, &zero_copy_protector); if (result != TSI_OK && result != TSI_UNIMPLEMENTED) { error = grpc_set_tsi_error_result( GRPC_ERROR_CREATE_FROM_STATIC_STRING( diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index ed5138006e..b2146cf211 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -135,7 +135,7 @@ typedef struct batch_control { typedef struct { gpr_mu child_list_mu; grpc_call *first_child; -} parent_call_t; +} parent_call; typedef struct { grpc_call *parent; @@ -144,7 +144,7 @@ typedef struct { parent->mu */ grpc_call *sibling_next; grpc_call *sibling_prev; -} child_call_t; +} child_call; #define RECV_NONE ((gpr_atm)0) #define RECV_INITIAL_METADATA_FIRST ((gpr_atm)1) @@ -157,8 +157,8 @@ struct grpc_call { grpc_polling_entity pollent; grpc_channel *channel; gpr_timespec start_time; - /* parent_call_t* */ gpr_atm parent_call_atm; - child_call_t *child_call; + /* parent_call* */ gpr_atm parent_call_atm; + child_call *child; /* client or server call */ bool is_client; @@ -304,21 +304,21 @@ void *grpc_call_arena_alloc(grpc_call *call, size_t size) { return gpr_arena_alloc(call->arena, size); } -static parent_call_t *get_or_create_parent_call(grpc_call *call) { - parent_call_t *p = (parent_call_t *)gpr_atm_acq_load(&call->parent_call_atm); +static parent_call *get_or_create_parent_call(grpc_call *call) { + parent_call *p = (parent_call *)gpr_atm_acq_load(&call->parent_call_atm); if (p == NULL) { - p = (parent_call_t *)gpr_arena_alloc(call->arena, sizeof(*p)); + p = (parent_call *)gpr_arena_alloc(call->arena, sizeof(*p)); gpr_mu_init(&p->child_list_mu); if (!gpr_atm_rel_cas(&call->parent_call_atm, (gpr_atm)NULL, (gpr_atm)p)) { gpr_mu_destroy(&p->child_list_mu); - p = (parent_call_t *)gpr_atm_acq_load(&call->parent_call_atm); + p = (parent_call *)gpr_atm_acq_load(&call->parent_call_atm); } } return p; } -static parent_call_t *get_parent_call(grpc_call *call) { - return (parent_call_t *)gpr_atm_acq_load(&call->parent_call_atm); +static parent_call *get_parent_call(grpc_call *call) { + return (parent_call *)gpr_atm_acq_load(&call->parent_call_atm); } grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, @@ -376,21 +376,21 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, bool immediately_cancel = false; - if (args->parent_call != NULL) { - child_call_t *cc = call->child_call = - (child_call_t *)gpr_arena_alloc(arena, sizeof(child_call_t)); - call->child_call->parent = args->parent_call; + if (args->parent != NULL) { + child_call *cc = call->child = + (child_call *)gpr_arena_alloc(arena, sizeof(child_call)); + call->child->parent = args->parent; - GRPC_CALL_INTERNAL_REF(args->parent_call, "child"); + GRPC_CALL_INTERNAL_REF(args->parent, "child"); GPR_ASSERT(call->is_client); - GPR_ASSERT(!args->parent_call->is_client); + GPR_ASSERT(!args->parent->is_client); - parent_call_t *pc = get_or_create_parent_call(args->parent_call); + parent_call *pc = get_or_create_parent_call(args->parent); gpr_mu_lock(&pc->child_list_mu); if (args->propagation_mask & GRPC_PROPAGATE_DEADLINE) { - send_deadline = GPR_MIN(send_deadline, args->parent_call->send_deadline); + send_deadline = GPR_MIN(send_deadline, args->parent->send_deadline); } /* for now GRPC_PROPAGATE_TRACING_CONTEXT *MUST* be passed with * GRPC_PROPAGATE_STATS_CONTEXT */ @@ -402,9 +402,9 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, "Census tracing propagation requested " "without Census context propagation")); } - grpc_call_context_set( - call, GRPC_CONTEXT_TRACING, - args->parent_call->context[GRPC_CONTEXT_TRACING].value, NULL); + grpc_call_context_set(call, GRPC_CONTEXT_TRACING, + args->parent->context[GRPC_CONTEXT_TRACING].value, + NULL); } else if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT) { add_init_error(&error, GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Census context propagation requested " @@ -412,7 +412,7 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, } if (args->propagation_mask & GRPC_PROPAGATE_CANCELLATION) { call->cancellation_is_inherited = 1; - if (gpr_atm_acq_load(&args->parent_call->received_final_op_atm)) { + if (gpr_atm_acq_load(&args->parent->received_final_op_atm)) { immediately_cancel = true; } } @@ -422,9 +422,9 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, cc->sibling_next = cc->sibling_prev = call; } else { cc->sibling_next = pc->first_child; - cc->sibling_prev = pc->first_child->child_call->sibling_prev; - cc->sibling_next->child_call->sibling_prev = - cc->sibling_prev->child_call->sibling_next = call; + cc->sibling_prev = pc->first_child->child->sibling_prev; + cc->sibling_next->child->sibling_prev = + cc->sibling_prev->child->sibling_next = call; } gpr_mu_unlock(&pc->child_list_mu); @@ -529,7 +529,7 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, if (c->receiving_stream != NULL) { grpc_byte_stream_destroy(exec_ctx, c->receiving_stream); } - parent_call_t *pc = get_parent_call(c); + parent_call *pc = get_parent_call(c); if (pc != NULL) { gpr_mu_destroy(&pc->child_list_mu); } @@ -566,14 +566,14 @@ void grpc_call_ref(grpc_call *c) { gpr_ref(&c->ext_ref); } void grpc_call_unref(grpc_call *c) { if (!gpr_unref(&c->ext_ref)) return; - child_call_t *cc = c->child_call; + child_call *cc = c->child; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; GPR_TIMER_BEGIN("grpc_call_unref", 0); GRPC_API_TRACE("grpc_call_unref(c=%p)", 1, (c)); if (cc) { - parent_call_t *pc = get_parent_call(cc->parent); + parent_call *pc = get_parent_call(cc->parent); gpr_mu_lock(&pc->child_list_mu); if (c == pc->first_child) { pc->first_child = cc->sibling_next; @@ -581,8 +581,8 @@ void grpc_call_unref(grpc_call *c) { pc->first_child = NULL; } } - cc->sibling_prev->child_call->sibling_next = cc->sibling_next; - cc->sibling_next->child_call->sibling_prev = cc->sibling_prev; + cc->sibling_prev->child->sibling_next = cc->sibling_next; + cc->sibling_next->child->sibling_prev = cc->sibling_prev; gpr_mu_unlock(&pc->child_list_mu); GRPC_CALL_INTERNAL_UNREF(&exec_ctx, cc->parent, "child"); } @@ -1310,14 +1310,14 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, /* propagate cancellation to any interested children */ gpr_atm_rel_store(&call->received_final_op_atm, 1); - parent_call_t *pc = get_parent_call(call); + parent_call *pc = get_parent_call(call); if (pc != NULL) { grpc_call *child; gpr_mu_lock(&pc->child_list_mu); child = pc->first_child; if (child != NULL) { do { - next_child_call = child->child_call->sibling_next; + next_child_call = child->child->sibling_next; if (child->cancellation_is_inherited) { GRPC_CALL_INTERNAL_REF(child, "propagate_cancel"); cancel_with_error(exec_ctx, child, STATUS_FROM_API_OVERRIDE, diff --git a/src/core/lib/surface/call.h b/src/core/lib/surface/call.h index be362c8000..27c2f5243c 100644 --- a/src/core/lib/surface/call.h +++ b/src/core/lib/surface/call.h @@ -37,7 +37,7 @@ typedef void (*grpc_ioreq_completion_func)(grpc_exec_ctx *exec_ctx, typedef struct grpc_call_create_args { grpc_channel *channel; - grpc_call *parent_call; + grpc_call *parent; uint32_t propagation_mask; grpc_completion_queue *cq; diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c index 89760291c2..0b5ca17cc4 100644 --- a/src/core/lib/surface/channel.c +++ b/src/core/lib/surface/channel.c @@ -27,6 +27,7 @@ #include <grpc/support/string_util.h> #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/debug/stats.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/support/string.h" @@ -77,6 +78,11 @@ grpc_channel *grpc_channel_create_with_builder( grpc_channel_args *args = grpc_channel_args_copy( grpc_channel_stack_builder_get_channel_arguments(builder)); grpc_channel *channel; + if (channel_stack_type == GRPC_SERVER_CHANNEL) { + GRPC_STATS_INC_SERVER_CHANNELS_CREATED(exec_ctx); + } else { + GRPC_STATS_INC_CLIENT_CHANNELS_CREATED(exec_ctx); + } grpc_error *error = grpc_channel_stack_builder_finish( exec_ctx, builder, sizeof(grpc_channel), 1, destroy_channel, NULL, (void **)&channel); @@ -276,7 +282,7 @@ static grpc_call *grpc_channel_create_call_internal( grpc_call_create_args args; memset(&args, 0, sizeof(args)); args.channel = channel; - args.parent_call = parent_call; + args.parent = parent_call; args.propagation_mask = propagation_mask; args.cq = cq; args.pollset_set_alternative = pollset_set_alternative; diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index b0da5f2364..5335cf111e 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -26,6 +26,7 @@ #include <grpc/support/string_util.h> #include <grpc/support/time.h> +#include "src/core/lib/debug/stats.h" #include "src/core/lib/iomgr/pollset.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/profiling/timers.h" @@ -421,6 +422,10 @@ grpc_completion_queue *grpc_completion_queue_create_internal( const cq_poller_vtable *poller_vtable = &g_poller_vtable_by_poller_type[polling_type]; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + GRPC_STATS_INC_CQS_CREATED(&exec_ctx); + grpc_exec_ctx_finish(&exec_ctx); + cq = (grpc_completion_queue *)gpr_zalloc(sizeof(grpc_completion_queue) + vtable->data_size + poller_vtable->size()); @@ -576,12 +581,12 @@ static bool atm_inc_if_nonzero(gpr_atm *counter) { } static bool cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) { - cq_next_data *cqd = DATA_FROM_CQ(cq); + cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq); return atm_inc_if_nonzero(&cqd->pending_events); } static bool cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) { - cq_pluck_data *cqd = DATA_FROM_CQ(cq); + cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq); return atm_inc_if_nonzero(&cqd->pending_events); } @@ -626,7 +631,7 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, } } - cq_next_data *cqd = DATA_FROM_CQ(cq); + cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq); int is_success = (error == GRPC_ERROR_NONE); storage->tag = tag; @@ -687,7 +692,7 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx, void *done_arg, grpc_cq_completion *storage), void *done_arg, grpc_cq_completion *storage) { - cq_pluck_data *cqd = DATA_FROM_CQ(cq); + cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq); int is_success = (error == GRPC_ERROR_NONE); GPR_TIMER_BEGIN("cq_end_op_for_pluck", 0); @@ -770,7 +775,7 @@ typedef struct { static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) { cq_is_finished_arg *a = (cq_is_finished_arg *)arg; grpc_completion_queue *cq = a->cq; - cq_next_data *cqd = DATA_FROM_CQ(cq); + cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq); GPR_ASSERT(a->stolen_completion == NULL); gpr_atm current_last_seen_things_queued_ever = @@ -819,7 +824,7 @@ static void dump_pending_tags(grpc_completion_queue *cq) {} static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, void *reserved) { grpc_event ret; - cq_next_data *cqd = DATA_FROM_CQ(cq); + cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq); GPR_TIMER_BEGIN("grpc_completion_queue_next", 0); @@ -950,7 +955,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, this function */ static void cq_finish_shutdown_next(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq) { - cq_next_data *cqd = DATA_FROM_CQ(cq); + cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq); GPR_ASSERT(cqd->shutdown_called); GPR_ASSERT(gpr_atm_no_barrier_load(&cqd->pending_events) == 0); @@ -961,7 +966,7 @@ static void cq_finish_shutdown_next(grpc_exec_ctx *exec_ctx, static void cq_shutdown_next(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq) { - cq_next_data *cqd = DATA_FROM_CQ(cq); + cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq); /* Need an extra ref for cq here because: * We call cq_finish_shutdown_next() below, that would call pollset shutdown. @@ -991,7 +996,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cq, static int add_plucker(grpc_completion_queue *cq, void *tag, grpc_pollset_worker **worker) { - cq_pluck_data *cqd = DATA_FROM_CQ(cq); + cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq); if (cqd->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) { return 0; } @@ -1003,7 +1008,7 @@ static int add_plucker(grpc_completion_queue *cq, void *tag, static void del_plucker(grpc_completion_queue *cq, void *tag, grpc_pollset_worker **worker) { - cq_pluck_data *cqd = DATA_FROM_CQ(cq); + cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq); for (int i = 0; i < cqd->num_pluckers; i++) { if (cqd->pluckers[i].tag == tag && cqd->pluckers[i].worker == worker) { cqd->num_pluckers--; @@ -1017,7 +1022,7 @@ static void del_plucker(grpc_completion_queue *cq, void *tag, static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) { cq_is_finished_arg *a = (cq_is_finished_arg *)arg; grpc_completion_queue *cq = a->cq; - cq_pluck_data *cqd = DATA_FROM_CQ(cq); + cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq); GPR_ASSERT(a->stolen_completion == NULL); gpr_atm current_last_seen_things_queued_ever = @@ -1052,7 +1057,7 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag, grpc_cq_completion *c; grpc_cq_completion *prev; grpc_pollset_worker *worker = NULL; - cq_pluck_data *cqd = DATA_FROM_CQ(cq); + cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq); GPR_TIMER_BEGIN("grpc_completion_queue_pluck", 0); @@ -1174,7 +1179,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cq, void *tag, static void cq_finish_shutdown_pluck(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq) { - cq_pluck_data *cqd = DATA_FROM_CQ(cq); + cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq); GPR_ASSERT(cqd->shutdown_called); GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown)); @@ -1188,7 +1193,7 @@ static void cq_finish_shutdown_pluck(grpc_exec_ctx *exec_ctx, * merging them is a bit tricky and probably not worth it */ static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq) { - cq_pluck_data *cqd = DATA_FROM_CQ(cq); + cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq); /* Need an extra ref for cq here because: * We call cq_finish_shutdown_pluck() below, that would call pollset shutdown. diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c index 280315036f..b089da2c54 100644 --- a/src/core/lib/surface/init.c +++ b/src/core/lib/surface/init.c @@ -36,6 +36,7 @@ #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/iomgr/resource_quota.h" +#include "src/core/lib/iomgr/timer_manager.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/surface/alarm_internal.h" @@ -179,14 +180,16 @@ void grpc_shutdown(void) { GRPC_EXEC_CTX_INITIALIZER(0, grpc_never_ready_to_finish, NULL); gpr_mu_lock(&g_init_mu); if (--g_initializations == 0) { - grpc_iomgr_shutdown(&exec_ctx); - gpr_timers_global_destroy(); - grpc_tracer_shutdown(); + grpc_executor_shutdown(&exec_ctx); + grpc_timer_manager_set_threading(false); // shutdown timer_manager thread for (i = g_number_of_plugins; i >= 0; i--) { if (g_all_of_the_plugins[i].destroy != NULL) { g_all_of_the_plugins[i].destroy(); } } + grpc_iomgr_shutdown(&exec_ctx); + gpr_timers_global_destroy(); + grpc_tracer_shutdown(); grpc_mdctx_global_shutdown(&exec_ctx); grpc_handshaker_factory_registry_shutdown(&exec_ctx); grpc_slice_intern_shutdown(); diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 64e5ce78e7..1d3980eb23 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -76,7 +76,7 @@ typedef struct requested_call { grpc_call_details *details; } batch; struct { - registered_method *registered_method; + registered_method *method; gpr_timespec *deadline; grpc_byte_buffer **optional_payload; } registered; @@ -145,7 +145,7 @@ struct call_data { uint32_t recv_initial_metadata_flags; grpc_metadata_array initial_metadata; - request_matcher *request_matcher; + request_matcher *matcher; grpc_byte_buffer *payload; grpc_closure got_initial_metadata; @@ -171,7 +171,7 @@ struct registered_method { grpc_server_register_method_payload_handling payload_handling; uint32_t flags; /* one request matcher per method */ - request_matcher request_matcher; + request_matcher matcher; registered_method *next; }; @@ -334,7 +334,7 @@ static void request_matcher_destroy(request_matcher *rm) { static void kill_zombie(grpc_exec_ctx *exec_ctx, void *elem, grpc_error *error) { - grpc_call_unref(grpc_call_from_top_element(elem)); + grpc_call_unref(grpc_call_from_top_element((grpc_call_element *)elem)); } static void request_matcher_zombify_all_pending_calls(grpc_exec_ctx *exec_ctx, @@ -387,7 +387,7 @@ static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) { while ((rm = server->registered_methods) != NULL) { server->registered_methods = rm->next; if (server->started) { - request_matcher_destroy(&rm->request_matcher); + request_matcher_destroy(&rm->matcher); } gpr_free(rm->method); gpr_free(rm->host); @@ -521,7 +521,7 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, grpc_call_element *call_elem = (grpc_call_element *)arg; call_data *calld = (call_data *)call_elem->call_data; channel_data *chand = (channel_data *)call_elem->channel_data; - request_matcher *rm = calld->request_matcher; + request_matcher *rm = calld->matcher; grpc_server *server = rm->server; if (error != GRPC_ERROR_NONE || gpr_atm_acq_load(&server->shutdown_flag)) { @@ -585,7 +585,7 @@ static void finish_start_new_rpc( return; } - calld->request_matcher = rm; + calld->matcher = rm; switch (payload_handling) { case GRPC_SRM_PAYLOAD_NONE: @@ -631,7 +631,7 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { continue; } finish_start_new_rpc(exec_ctx, server, elem, - &rm->server_registered_method->request_matcher, + &rm->server_registered_method->matcher, rm->server_registered_method->payload_handling); return; } @@ -649,7 +649,7 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { continue; } finish_start_new_rpc(exec_ctx, server, elem, - &rm->server_registered_method->request_matcher, + &rm->server_registered_method->matcher, rm->server_registered_method->payload_handling); return; } @@ -670,7 +670,7 @@ static int num_listeners(grpc_server *server) { static void done_shutdown_event(grpc_exec_ctx *exec_ctx, void *server, grpc_cq_completion *completion) { - server_unref(exec_ctx, server); + server_unref(exec_ctx, (grpc_server *)server); } static int num_channels(grpc_server *server) { @@ -693,9 +693,9 @@ static void kill_pending_work_locked(grpc_exec_ctx *exec_ctx, exec_ctx, &server->unregistered_request_matcher); for (registered_method *rm = server->registered_methods; rm; rm = rm->next) { - request_matcher_kill_requests(exec_ctx, server, &rm->request_matcher, + request_matcher_kill_requests(exec_ctx, server, &rm->matcher, GRPC_ERROR_REF(error)); - request_matcher_zombify_all_pending_calls(exec_ctx, &rm->request_matcher); + request_matcher_zombify_all_pending_calls(exec_ctx, &rm->matcher); } } GRPC_ERROR_UNREF(error); @@ -1116,7 +1116,7 @@ void grpc_server_start(grpc_server *server) { request_matcher_init(&server->unregistered_request_matcher, (size_t)server->max_requested_calls_per_cq, server); for (registered_method *rm = server->registered_methods; rm; rm = rm->next) { - request_matcher_init(&rm->request_matcher, + request_matcher_init(&rm->matcher, (size_t)server->max_requested_calls_per_cq, server); } @@ -1269,8 +1269,9 @@ void grpc_server_shutdown_and_notify(grpc_server *server, /* stay locked, and gather up some stuff to do */ GPR_ASSERT(grpc_cq_begin_op(cq, tag)); if (server->shutdown_published) { - grpc_cq_end_op(&exec_ctx, cq, tag, GRPC_ERROR_NONE, done_published_shutdown, - NULL, gpr_malloc(sizeof(grpc_cq_completion))); + grpc_cq_end_op( + &exec_ctx, cq, tag, GRPC_ERROR_NONE, done_published_shutdown, NULL, + (grpc_cq_completion *)gpr_malloc(sizeof(grpc_cq_completion))); gpr_mu_unlock(&server->mu_global); goto done; } @@ -1392,7 +1393,7 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, rm = &server->unregistered_request_matcher; break; case REGISTERED_CALL: - rm = &rc->data.registered.registered_method->request_matcher; + rm = &rc->data.registered.method->matcher; break; } server->requested_calls_per_cq[cq_idx][request_id] = *rc; @@ -1521,7 +1522,7 @@ grpc_call_error grpc_server_request_registered_call( rc->tag = tag; rc->cq_bound_to_call = cq_bound_to_call; rc->call = call; - rc->data.registered.registered_method = rm; + rc->data.registered.method = rm; rc->data.registered.deadline = deadline; rc->initial_metadata = initial_metadata; rc->data.registered.optional_payload = optional_payload; diff --git a/src/core/lib/transport/metadata_batch.c b/src/core/lib/transport/metadata_batch.c index b7c46e09bb..2df9c9189c 100644 --- a/src/core/lib/transport/metadata_batch.c +++ b/src/core/lib/transport/metadata_batch.c @@ -233,32 +233,32 @@ void grpc_metadata_batch_remove(grpc_exec_ctx *exec_ctx, void grpc_metadata_batch_set_value(grpc_exec_ctx *exec_ctx, grpc_linked_mdelem *storage, grpc_slice value) { - grpc_mdelem old = storage->md; - grpc_mdelem new = grpc_mdelem_from_slices( - exec_ctx, grpc_slice_ref_internal(GRPC_MDKEY(old)), value); - storage->md = new; - GRPC_MDELEM_UNREF(exec_ctx, old); + grpc_mdelem old_mdelem = storage->md; + grpc_mdelem new_mdelem = grpc_mdelem_from_slices( + exec_ctx, grpc_slice_ref_internal(GRPC_MDKEY(old_mdelem)), value); + storage->md = new_mdelem; + GRPC_MDELEM_UNREF(exec_ctx, old_mdelem); } grpc_error *grpc_metadata_batch_substitute(grpc_exec_ctx *exec_ctx, grpc_metadata_batch *batch, grpc_linked_mdelem *storage, - grpc_mdelem new) { + grpc_mdelem new_mdelem) { assert_valid_callouts(exec_ctx, batch); grpc_error *error = GRPC_ERROR_NONE; - grpc_mdelem old = storage->md; - if (!grpc_slice_eq(GRPC_MDKEY(new), GRPC_MDKEY(old))) { + grpc_mdelem old_mdelem = storage->md; + if (!grpc_slice_eq(GRPC_MDKEY(new_mdelem), GRPC_MDKEY(old_mdelem))) { maybe_unlink_callout(batch, storage); - storage->md = new; + storage->md = new_mdelem; error = maybe_link_callout(batch, storage); if (error != GRPC_ERROR_NONE) { unlink_storage(&batch->list, storage); GRPC_MDELEM_UNREF(exec_ctx, storage->md); } } else { - storage->md = new; + storage->md = new_mdelem; } - GRPC_MDELEM_UNREF(exec_ctx, old); + GRPC_MDELEM_UNREF(exec_ctx, old_mdelem); assert_valid_callouts(exec_ctx, batch); return error; } @@ -300,12 +300,12 @@ grpc_error *grpc_metadata_batch_filter(grpc_exec_ctx *exec_ctx, grpc_error *error = GRPC_ERROR_NONE; while (l) { grpc_linked_mdelem *next = l->next; - grpc_filtered_mdelem new = func(exec_ctx, user_data, l->md); - add_error(&error, new.error, composite_error_string); - if (GRPC_MDISNULL(new.md)) { + grpc_filtered_mdelem new_mdelem = func(exec_ctx, user_data, l->md); + add_error(&error, new_mdelem.error, composite_error_string); + if (GRPC_MDISNULL(new_mdelem.md)) { grpc_metadata_batch_remove(exec_ctx, batch, l); - } else if (new.md.payload != l->md.payload) { - grpc_metadata_batch_substitute(exec_ctx, batch, l, new.md); + } else if (new_mdelem.md.payload != l->md.payload) { + grpc_metadata_batch_substitute(exec_ctx, batch, l, new_mdelem.md); } l = next; } diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c index caa11a956e..ae705195f3 100644 --- a/src/core/lib/transport/transport.c +++ b/src/core/lib/transport/transport.c @@ -102,8 +102,9 @@ static void slice_stream_unref(grpc_exec_ctx *exec_ctx, void *p) { grpc_slice grpc_slice_from_stream_owned_buffer(grpc_stream_refcount *refcount, void *buffer, size_t length) { slice_stream_ref(&refcount->slice_refcount); - return (grpc_slice){.refcount = &refcount->slice_refcount, - .data.refcounted = {.bytes = buffer, .length = length}}; + return (grpc_slice){ + .refcount = &refcount->slice_refcount, + .data.refcounted = {.bytes = (uint8_t *)buffer, .length = length}}; } static const grpc_slice_refcount_vtable stream_ref_slice_vtable = { diff --git a/src/core/tsi/fake_transport_security.c b/src/core/tsi/fake_transport_security.c index e7b3be3d86..64043fea08 100644 --- a/src/core/tsi/fake_transport_security.c +++ b/src/core/tsi/fake_transport_security.c @@ -493,7 +493,8 @@ static tsi_result fake_handshaker_result_extract_peer( } static tsi_result fake_handshaker_result_create_zero_copy_grpc_protector( - const tsi_handshaker_result *self, size_t *max_output_protected_frame_size, + void *exec_ctx, const tsi_handshaker_result *self, + size_t *max_output_protected_frame_size, tsi_zero_copy_grpc_protector **protector) { *protector = tsi_create_fake_zero_copy_grpc_protector(max_output_protected_frame_size); diff --git a/src/core/tsi/transport_security.h b/src/core/tsi/transport_security.h index b0d7039850..3bba38149c 100644 --- a/src/core/tsi/transport_security.h +++ b/src/core/tsi/transport_security.h @@ -84,11 +84,17 @@ struct tsi_handshaker { }; /* Base for tsi_handshaker_result implementations. - See transport_security_interface.h for documentation. */ + See transport_security_interface.h for documentation. + The exec_ctx parameter in create_zero_copy_grpc_protector is supposed to be + of type grpc_exec_ctx*, but we're using void* instead to avoid making the TSI + API depend on grpc. The create_zero_copy_grpc_protector() method is only used + in grpc, where we do need the exec_ctx passed through, but the API still + needs to compile in other applications, where grpc_exec_ctx is not defined. +*/ typedef struct { tsi_result (*extract_peer)(const tsi_handshaker_result *self, tsi_peer *peer); tsi_result (*create_zero_copy_grpc_protector)( - const tsi_handshaker_result *self, + void *exec_ctx, const tsi_handshaker_result *self, size_t *max_output_protected_frame_size, tsi_zero_copy_grpc_protector **protector); tsi_result (*create_frame_protector)(const tsi_handshaker_result *self, diff --git a/src/core/tsi/transport_security_grpc.c b/src/core/tsi/transport_security_grpc.c index 773b35e717..affd995230 100644 --- a/src/core/tsi/transport_security_grpc.c +++ b/src/core/tsi/transport_security_grpc.c @@ -20,16 +20,18 @@ /* This method creates a tsi_zero_copy_grpc_protector object. */ tsi_result tsi_handshaker_result_create_zero_copy_grpc_protector( - const tsi_handshaker_result *self, size_t *max_output_protected_frame_size, + grpc_exec_ctx *exec_ctx, const tsi_handshaker_result *self, + size_t *max_output_protected_frame_size, tsi_zero_copy_grpc_protector **protector) { - if (self == NULL || self->vtable == NULL || protector == NULL) { + if (exec_ctx == NULL || self == NULL || self->vtable == NULL || + protector == NULL) { return TSI_INVALID_ARGUMENT; } if (self->vtable->create_zero_copy_grpc_protector == NULL) { return TSI_UNIMPLEMENTED; } return self->vtable->create_zero_copy_grpc_protector( - self, max_output_protected_frame_size, protector); + exec_ctx, self, max_output_protected_frame_size, protector); } /* --- tsi_zero_copy_grpc_protector common implementation. --- diff --git a/src/core/tsi/transport_security_grpc.h b/src/core/tsi/transport_security_grpc.h index 375a758888..ca6755c12f 100644 --- a/src/core/tsi/transport_security_grpc.h +++ b/src/core/tsi/transport_security_grpc.h @@ -30,7 +30,8 @@ extern "C" { assuming there is no fatal error. The caller is responsible for destroying the protector. */ tsi_result tsi_handshaker_result_create_zero_copy_grpc_protector( - const tsi_handshaker_result *self, size_t *max_output_protected_frame_size, + grpc_exec_ctx *exec_ctx, const tsi_handshaker_result *self, + size_t *max_output_protected_frame_size, tsi_zero_copy_grpc_protector **protector); /* -- tsi_zero_copy_grpc_protector object -- */ |