diff options
author | 2017-09-25 12:38:17 -0700 | |
---|---|---|
committer | 2017-09-25 12:38:17 -0700 | |
commit | 1c285b98122ef8fe31aa3325a4c10f5b05107ca8 (patch) | |
tree | f811ca6879a76bb526a7af7a35e987780d1bcba8 /src/core/ext | |
parent | d4bb9bddd6cd1bfa4829deee9c3163c6065dafcb (diff) | |
parent | 008a173a7e2ba1d5c0933aa7a77395945ba2024d (diff) |
Merge remote-tracking branch 'upstream/master' into fix-stream-compression-config-interface
Diffstat (limited to 'src/core/ext')
45 files changed, 1120 insertions, 742 deletions
diff --git a/src/core/ext/census/base_resources.c b/src/core/ext/census/base_resources.c index 2114bf04cd..1f2bb39fe0 100644 --- a/src/core/ext/census/base_resources.c +++ b/src/core/ext/census/base_resources.c @@ -37,20 +37,20 @@ void define_base_resources() { google_census_Resource_BasicUnit numerator = google_census_Resource_BasicUnit_SECS; - resource r = {"client_rpc_latency", // name - "Client RPC latency in seconds", // description - 0, // prefix - 1, // n_numerators - &numerator, // numerators - 0, // n_denominators - NULL}; // denominators + resource r = {(char *)"client_rpc_latency", // name + (char *)"Client RPC latency in seconds", // description + 0, // prefix + 1, // n_numerators + &numerator, // numerators + 0, // n_denominators + NULL}; // denominators define_resource(&r); - r = (resource){"server_rpc_latency", // name - "Server RPC latency in seconds", // description - 0, // prefix - 1, // n_numerators - &numerator, // numerators - 0, // n_denominators - NULL}; // denominators + r = (resource){(char *)"server_rpc_latency", // name + (char *)"Server RPC latency in seconds", // description + 0, // prefix + 1, // n_numerators + &numerator, // numerators + 0, // n_denominators + NULL}; // denominators define_resource(&r); } diff --git a/src/core/ext/filters/client_channel/channel_connectivity.c b/src/core/ext/filters/client_channel/channel_connectivity.c index e5f6fa76ae..3844b98021 100644 --- a/src/core/ext/filters/client_channel/channel_connectivity.c +++ b/src/core/ext/filters/client_channel/channel_connectivity.c @@ -86,7 +86,7 @@ static void delete_state_watcher(grpc_exec_ctx *exec_ctx, state_watcher *w) { static void finished_completion(grpc_exec_ctx *exec_ctx, void *pw, grpc_cq_completion *ignored) { - int delete = 0; + bool should_delete = false; state_watcher *w = (state_watcher *)pw; gpr_mu_lock(&w->mu); switch (w->phase) { @@ -94,12 +94,12 @@ static void finished_completion(grpc_exec_ctx *exec_ctx, void *pw, case READY_TO_CALL_BACK: GPR_UNREACHABLE_CODE(return ); case CALLING_BACK_AND_FINISHED: - delete = 1; + should_delete = true; break; } gpr_mu_unlock(&w->mu); - if (delete) { + if (should_delete) { delete_state_watcher(exec_ctx, w); } } @@ -161,12 +161,12 @@ static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w, static void watch_complete(grpc_exec_ctx *exec_ctx, void *pw, grpc_error *error) { - partly_done(exec_ctx, pw, true, GRPC_ERROR_REF(error)); + partly_done(exec_ctx, (state_watcher *)pw, true, GRPC_ERROR_REF(error)); } static void timeout_complete(grpc_exec_ctx *exec_ctx, void *pw, grpc_error *error) { - partly_done(exec_ctx, pw, false, GRPC_ERROR_REF(error)); + partly_done(exec_ctx, (state_watcher *)pw, false, GRPC_ERROR_REF(error)); } int grpc_channel_num_external_connectivity_watchers(grpc_channel *channel) { diff --git a/src/core/ext/filters/client_channel/client_channel.c b/src/core/ext/filters/client_channel/client_channel.c index dad5c4fce5..016199b1f4 100644 --- a/src/core/ext/filters/client_channel/client_channel.c +++ b/src/core/ext/filters/client_channel/client_channel.c @@ -85,7 +85,7 @@ static void method_parameters_unref(method_parameters *method_params) { } static void method_parameters_free(grpc_exec_ctx *exec_ctx, void *value) { - method_parameters_unref(value); + method_parameters_unref((method_parameters *)value); } static bool parse_wait_for_ready(grpc_json *field, @@ -375,7 +375,7 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, } // Extract the following fields from the resolver result, if non-NULL. bool lb_policy_updated = false; - char *lb_policy_name = NULL; + char *lb_policy_name_dup = NULL; bool lb_policy_name_changed = false; grpc_lb_policy *new_lb_policy = NULL; char *service_config_json = NULL; @@ -383,6 +383,7 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, grpc_slice_hash_table *method_params_table = NULL; if (chand->resolver_result != NULL) { // Find LB policy name. + const char *lb_policy_name = NULL; const grpc_arg *channel_arg = grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_POLICY_NAME); if (channel_arg != NULL) { @@ -473,7 +474,7 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, // Before we clean up, save a copy of lb_policy_name, since it might // be pointing to data inside chand->resolver_result. // The copy will be saved in chand->lb_policy_name below. - lb_policy_name = gpr_strdup(lb_policy_name); + lb_policy_name_dup = gpr_strdup(lb_policy_name); grpc_channel_args_destroy(exec_ctx, chand->resolver_result); chand->resolver_result = NULL; } @@ -481,8 +482,8 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, gpr_log(GPR_DEBUG, "chand=%p: resolver result: lb_policy_name=\"%s\"%s, " "service_config=\"%s\"", - chand, lb_policy_name, lb_policy_name_changed ? " (changed)" : "", - service_config_json); + chand, lb_policy_name_dup, + lb_policy_name_changed ? " (changed)" : "", service_config_json); } // Now swap out fields in chand. Note that the new values may still // be NULL if (e.g.) the resolver failed to return results or the @@ -490,9 +491,9 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, // // First, swap out the data used by cc_get_channel_info(). gpr_mu_lock(&chand->info_mu); - if (lb_policy_name != NULL) { + if (lb_policy_name_dup != NULL) { gpr_free(chand->info_lb_policy_name); - chand->info_lb_policy_name = lb_policy_name; + chand->info_lb_policy_name = lb_policy_name_dup; } if (service_config_json != NULL) { gpr_free(chand->info_service_config_json); @@ -717,7 +718,8 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx, return GRPC_ERROR_CREATE_FROM_STATIC_STRING( "client channel factory arg must be a pointer"); } - grpc_client_channel_factory_ref(arg->value.pointer.p); + grpc_client_channel_factory_ref( + (grpc_client_channel_factory *)arg->value.pointer.p); chand->client_channel_factory = (grpc_client_channel_factory *)arg->value.pointer.p; // Get server name to resolve, using proxy mapper if needed. @@ -1016,13 +1018,11 @@ static void create_subchannel_call_locked(grpc_exec_ctx *exec_ctx, GRPC_ERROR_UNREF(error); } -static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_error *error) { +// Invoked when a pick is completed, on both success or failure. +static void pick_done_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_error *error) { call_data *calld = (call_data *)elem->call_data; channel_data *chand = (channel_data *)elem->channel_data; - grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent, - chand->interested_parties); if (calld->connected_subchannel == NULL) { // Failed to create subchannel. GRPC_ERROR_UNREF(calld->error); @@ -1044,12 +1044,116 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, GRPC_ERROR_UNREF(error); } -/** Return true if subchannel is available immediately (in which case - subchannel_ready_locked() should not be called), or false otherwise (in - which case subchannel_ready_locked() should be called when the subchannel - is available). */ -static bool pick_subchannel_locked(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem); +// A wrapper around pick_done_locked() that is used in cases where +// either (a) the pick was deferred pending a resolver result or (b) the +// pick was done asynchronously. Removes the call's polling entity from +// chand->interested_parties before invoking pick_done_locked(). +static void async_pick_done_locked(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, grpc_error *error) { + channel_data *chand = (channel_data *)elem->channel_data; + call_data *calld = (call_data *)elem->call_data; + grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent, + chand->interested_parties); + pick_done_locked(exec_ctx, elem, error); +} + +// Note: This runs under the client_channel combiner, but will NOT be +// holding the call combiner. +static void pick_callback_cancel_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_call_element *elem = (grpc_call_element *)arg; + channel_data *chand = (channel_data *)elem->channel_data; + call_data *calld = (call_data *)elem->call_data; + if (calld->lb_policy != NULL) { + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: cancelling pick from LB policy %p", + chand, calld, calld->lb_policy); + } + grpc_lb_policy_cancel_pick_locked(exec_ctx, calld->lb_policy, + &calld->connected_subchannel, + GRPC_ERROR_REF(error)); + } + GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_callback_cancel"); +} + +// Callback invoked by grpc_lb_policy_pick_locked() for async picks. +// Unrefs the LB policy and invokes async_pick_done_locked(). +static void pick_callback_done_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_call_element *elem = (grpc_call_element *)arg; + channel_data *chand = (channel_data *)elem->channel_data; + call_data *calld = (call_data *)elem->call_data; + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed asynchronously", + chand, calld); + } + GPR_ASSERT(calld->lb_policy != NULL); + GRPC_LB_POLICY_UNREF(exec_ctx, calld->lb_policy, "pick_subchannel"); + calld->lb_policy = NULL; + async_pick_done_locked(exec_ctx, elem, GRPC_ERROR_REF(error)); +} + +// Takes a ref to chand->lb_policy and calls grpc_lb_policy_pick_locked(). +// If the pick was completed synchronously, unrefs the LB policy and +// returns true. +static bool pick_callback_start_locked(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem) { + channel_data *chand = (channel_data *)elem->channel_data; + call_data *calld = (call_data *)elem->call_data; + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: starting pick on lb_policy=%p", + chand, calld, chand->lb_policy); + } + apply_service_config_to_call_locked(exec_ctx, elem); + // If the application explicitly set wait_for_ready, use that. + // Otherwise, if the service config specified a value for this + // method, use that. + uint32_t initial_metadata_flags = + calld->initial_metadata_batch->payload->send_initial_metadata + .send_initial_metadata_flags; + const bool wait_for_ready_set_from_api = + initial_metadata_flags & + GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET; + const bool wait_for_ready_set_from_service_config = + calld->method_params != NULL && + calld->method_params->wait_for_ready != WAIT_FOR_READY_UNSET; + if (!wait_for_ready_set_from_api && wait_for_ready_set_from_service_config) { + if (calld->method_params->wait_for_ready == WAIT_FOR_READY_TRUE) { + initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY; + } else { + initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY; + } + } + const grpc_lb_policy_pick_args inputs = { + calld->initial_metadata_batch->payload->send_initial_metadata + .send_initial_metadata, + initial_metadata_flags, &calld->lb_token_mdelem}; + // Keep a ref to the LB policy in calld while the pick is pending. + GRPC_LB_POLICY_REF(chand->lb_policy, "pick_subchannel"); + calld->lb_policy = chand->lb_policy; + GRPC_CLOSURE_INIT(&calld->lb_pick_closure, pick_callback_done_locked, elem, + grpc_combiner_scheduler(chand->combiner)); + const bool pick_done = grpc_lb_policy_pick_locked( + exec_ctx, chand->lb_policy, &inputs, &calld->connected_subchannel, + calld->subchannel_call_context, NULL, &calld->lb_pick_closure); + if (pick_done) { + /* synchronous grpc_lb_policy_pick call. Unref the LB policy. */ + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed synchronously", + chand, calld); + } + GRPC_LB_POLICY_UNREF(exec_ctx, calld->lb_policy, "pick_subchannel"); + calld->lb_policy = NULL; + } else { + GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback_cancel"); + grpc_call_combiner_set_notify_on_cancel( + exec_ctx, calld->call_combiner, + GRPC_CLOSURE_INIT(&calld->lb_pick_cancel_closure, + pick_callback_cancel_locked, elem, + grpc_combiner_scheduler(chand->combiner))); + } + return pick_done; +} typedef struct { grpc_call_element *elem; @@ -1069,17 +1173,17 @@ static void pick_after_resolver_result_cancel_locked(grpc_exec_ctx *exec_ctx, gpr_free(args); return; } - args->finished = true; - grpc_call_element *elem = args->elem; - channel_data *chand = (channel_data *)elem->channel_data; - call_data *calld = (call_data *)elem->call_data; // If we don't yet have a resolver result, then a closure for // pick_after_resolver_result_done_locked() will have been added to // chand->waiting_for_resolver_result_closures, and it may not be invoked // until after this call has been destroyed. We mark the operation as // finished, so that when pick_after_resolver_result_done_locked() // is called, it will be a no-op. We also immediately invoke - // subchannel_ready_locked() to propagate the error back to the caller. + // async_pick_done_locked() to propagate the error back to the caller. + args->finished = true; + grpc_call_element *elem = args->elem; + channel_data *chand = (channel_data *)elem->channel_data; + call_data *calld = (call_data *)elem->call_data; if (GRPC_TRACER_ON(grpc_client_channel_trace)) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: cancelling pick waiting for resolver result", @@ -1087,12 +1191,12 @@ static void pick_after_resolver_result_cancel_locked(grpc_exec_ctx *exec_ctx, } // Note: Although we are not in the call combiner here, we are // basically stealing the call combiner from the pending pick, so - // it's safe to call subchannel_ready_locked() here -- we are + // it's safe to call async_pick_done_locked() here -- we are // essentially calling it here instead of calling it in // pick_after_resolver_result_done_locked(). - subchannel_ready_locked(exec_ctx, elem, - GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Pick cancelled", &error, 1)); + async_pick_done_locked(exec_ctx, elem, + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Pick cancelled", &error, 1)); } static void pick_after_resolver_result_done_locked(grpc_exec_ctx *exec_ctx, @@ -1117,14 +1221,19 @@ static void pick_after_resolver_result_done_locked(grpc_exec_ctx *exec_ctx, gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver failed to return data", chand, calld); } - subchannel_ready_locked(exec_ctx, elem, GRPC_ERROR_REF(error)); + async_pick_done_locked(exec_ctx, elem, GRPC_ERROR_REF(error)); } else { if (GRPC_TRACER_ON(grpc_client_channel_trace)) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver returned, doing pick", chand, calld); } - if (pick_subchannel_locked(exec_ctx, elem)) { - subchannel_ready_locked(exec_ctx, elem, GRPC_ERROR_NONE); + if (pick_callback_start_locked(exec_ctx, elem)) { + // Even if the LB policy returns a result synchronously, we have + // already added our polling entity to chand->interested_parties + // in order to wait for the resolver result, so we need to + // remove it here. Therefore, we call async_pick_done_locked() + // instead of pick_done_locked(). + async_pick_done_locked(exec_ctx, elem, GRPC_ERROR_NONE); } } } @@ -1152,154 +1261,38 @@ static void pick_after_resolver_result_start_locked(grpc_exec_ctx *exec_ctx, grpc_combiner_scheduler(chand->combiner))); } -// Note: This runs under the client_channel combiner, but will NOT be -// holding the call combiner. -static void pick_callback_cancel_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - grpc_call_element *elem = (grpc_call_element *)arg; - channel_data *chand = (channel_data *)elem->channel_data; - call_data *calld = (call_data *)elem->call_data; - if (error != GRPC_ERROR_NONE && calld->lb_policy != NULL) { - if (GRPC_TRACER_ON(grpc_client_channel_trace)) { - gpr_log(GPR_DEBUG, "chand=%p calld=%p: cancelling pick from LB policy %p", - chand, calld, calld->lb_policy); - } - grpc_lb_policy_cancel_pick_locked(exec_ctx, calld->lb_policy, - &calld->connected_subchannel, - GRPC_ERROR_REF(error)); - } - GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_callback_cancel"); -} - -// Callback invoked by grpc_lb_policy_pick_locked() for async picks. -// Unrefs the LB policy and invokes subchannel_ready_locked(). -static void pick_callback_done_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void start_pick_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *ignored) { grpc_call_element *elem = (grpc_call_element *)arg; - channel_data *chand = (channel_data *)elem->channel_data; call_data *calld = (call_data *)elem->call_data; - if (GRPC_TRACER_ON(grpc_client_channel_trace)) { - gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed asynchronously", - chand, calld); - } - GPR_ASSERT(calld->lb_policy != NULL); - GRPC_LB_POLICY_UNREF(exec_ctx, calld->lb_policy, "pick_subchannel"); - calld->lb_policy = NULL; - subchannel_ready_locked(exec_ctx, elem, GRPC_ERROR_REF(error)); -} - -// Takes a ref to chand->lb_policy and calls grpc_lb_policy_pick_locked(). -// If the pick was completed synchronously, unrefs the LB policy and -// returns true. -static bool pick_callback_start_locked(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - const grpc_lb_policy_pick_args *inputs) { channel_data *chand = (channel_data *)elem->channel_data; - call_data *calld = (call_data *)elem->call_data; - if (GRPC_TRACER_ON(grpc_client_channel_trace)) { - gpr_log(GPR_DEBUG, "chand=%p calld=%p: starting pick on lb_policy=%p", - chand, calld, chand->lb_policy); - } - // Keep a ref to the LB policy in calld while the pick is pending. - GRPC_LB_POLICY_REF(chand->lb_policy, "pick_subchannel"); - calld->lb_policy = chand->lb_policy; - GRPC_CLOSURE_INIT(&calld->lb_pick_closure, pick_callback_done_locked, elem, - grpc_combiner_scheduler(chand->combiner)); - const bool pick_done = grpc_lb_policy_pick_locked( - exec_ctx, chand->lb_policy, inputs, &calld->connected_subchannel, - calld->subchannel_call_context, NULL, &calld->lb_pick_closure); - if (pick_done) { - /* synchronous grpc_lb_policy_pick call. Unref the LB policy. */ - if (GRPC_TRACER_ON(grpc_client_channel_trace)) { - gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed synchronously", - chand, calld); + GPR_ASSERT(calld->connected_subchannel == NULL); + if (chand->lb_policy != NULL) { + // We already have an LB policy, so ask it for a pick. + if (pick_callback_start_locked(exec_ctx, elem)) { + // Pick completed synchronously. + pick_done_locked(exec_ctx, elem, GRPC_ERROR_NONE); + return; } - GRPC_LB_POLICY_UNREF(exec_ctx, calld->lb_policy, "pick_subchannel"); - calld->lb_policy = NULL; } else { - GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback_cancel"); - grpc_call_combiner_set_notify_on_cancel( - exec_ctx, calld->call_combiner, - GRPC_CLOSURE_INIT(&calld->lb_pick_cancel_closure, - pick_callback_cancel_locked, elem, - grpc_combiner_scheduler(chand->combiner))); - } - return pick_done; -} - -static bool pick_subchannel_locked(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) { - GPR_TIMER_BEGIN("pick_subchannel", 0); - channel_data *chand = (channel_data *)elem->channel_data; - call_data *calld = (call_data *)elem->call_data; - bool pick_done = false; - if (chand->lb_policy != NULL) { - apply_service_config_to_call_locked(exec_ctx, elem); - // If the application explicitly set wait_for_ready, use that. - // Otherwise, if the service config specified a value for this - // method, use that. - uint32_t initial_metadata_flags = - calld->initial_metadata_batch->payload->send_initial_metadata - .send_initial_metadata_flags; - const bool wait_for_ready_set_from_api = - initial_metadata_flags & - GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET; - const bool wait_for_ready_set_from_service_config = - calld->method_params != NULL && - calld->method_params->wait_for_ready != WAIT_FOR_READY_UNSET; - if (!wait_for_ready_set_from_api && - wait_for_ready_set_from_service_config) { - if (calld->method_params->wait_for_ready == WAIT_FOR_READY_TRUE) { - initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY; - } else { - initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY; - } + // We do not yet have an LB policy, so wait for a resolver result. + if (chand->resolver == NULL) { + pick_done_locked(exec_ctx, elem, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected")); + return; } - const grpc_lb_policy_pick_args inputs = { - calld->initial_metadata_batch->payload->send_initial_metadata - .send_initial_metadata, - initial_metadata_flags, &calld->lb_token_mdelem}; - pick_done = pick_callback_start_locked(exec_ctx, elem, &inputs); - } else if (chand->resolver != NULL) { if (!chand->started_resolving) { start_resolving_locked(exec_ctx, chand); } pick_after_resolver_result_start_locked(exec_ctx, elem); - } else { - subchannel_ready_locked( - exec_ctx, elem, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected")); - } - GPR_TIMER_END("pick_subchannel", 0); - return pick_done; -} - -static void start_pick_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error_ignored) { - GPR_TIMER_BEGIN("start_pick_locked", 0); - grpc_call_element *elem = (grpc_call_element *)arg; - call_data *calld = (call_data *)elem->call_data; - channel_data *chand = (channel_data *)elem->channel_data; - GPR_ASSERT(calld->connected_subchannel == NULL); - if (pick_subchannel_locked(exec_ctx, elem)) { - // Pick was returned synchronously. - if (calld->connected_subchannel == NULL) { - GRPC_ERROR_UNREF(calld->error); - calld->error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "Call dropped by load balancing policy"); - waiting_for_pick_batches_fail(exec_ctx, elem, - GRPC_ERROR_REF(calld->error)); - } else { - // Create subchannel call. - create_subchannel_call_locked(exec_ctx, elem, GRPC_ERROR_NONE); - } - } else { - // Pick will be done asynchronously. Add the call's polling entity to - // the channel's interested_parties, so that I/O for the resolver - // and LB policy can be done under it. - grpc_polling_entity_add_to_pollset_set(exec_ctx, calld->pollent, - chand->interested_parties); } - GPR_TIMER_END("start_pick_locked", 0); + // We need to wait for either a resolver result or for an async result + // from the LB policy. Add the polling entity from call_data to the + // channel_data's interested_parties, so that the I/O of the LB policy + // and resolver can be done under it. The polling entity will be + // removed in async_pick_done_locked(). + grpc_polling_entity_add_to_pollset_set(exec_ctx, calld->pollent, + chand->interested_parties); } static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { @@ -1394,7 +1387,8 @@ static void cc_start_transport_stream_op_batch( // combiner to start a pick. if (batch->send_initial_metadata) { if (GRPC_TRACER_ON(grpc_client_channel_trace)) { - gpr_log(GPR_DEBUG, "chand=%p calld=%p: entering combiner", chand, calld); + gpr_log(GPR_DEBUG, "chand=%p calld=%p: entering client_channel combiner", + chand, calld); } GRPC_CLOSURE_SCHED( exec_ctx, diff --git a/src/core/ext/filters/client_channel/client_channel_factory.c b/src/core/ext/filters/client_channel/client_channel_factory.c index 7220a8639e..57eac8f875 100644 --- a/src/core/ext/filters/client_channel/client_channel_factory.c +++ b/src/core/ext/filters/client_channel/client_channel_factory.c @@ -43,14 +43,13 @@ grpc_channel* grpc_client_channel_factory_create_channel( } static void* factory_arg_copy(void* factory) { - grpc_client_channel_factory_ref(factory); + grpc_client_channel_factory_ref((grpc_client_channel_factory*)factory); return factory; } static void factory_arg_destroy(grpc_exec_ctx* exec_ctx, void* factory) { - // TODO(roth): Remove local exec_ctx when - // https://github.com/grpc/grpc/pull/8705 is merged. - grpc_client_channel_factory_unref(exec_ctx, factory); + grpc_client_channel_factory_unref(exec_ctx, + (grpc_client_channel_factory*)factory); } static int factory_arg_cmp(void* factory1, void* factory2) { @@ -64,6 +63,6 @@ static const grpc_arg_pointer_vtable factory_arg_vtable = { grpc_arg grpc_client_channel_factory_create_channel_arg( grpc_client_channel_factory* factory) { - return grpc_channel_arg_pointer_create(GRPC_ARG_CLIENT_CHANNEL_FACTORY, + return grpc_channel_arg_pointer_create((char*)GRPC_ARG_CLIENT_CHANNEL_FACTORY, factory, &factory_arg_vtable); } diff --git a/src/core/ext/filters/client_channel/client_channel_plugin.c b/src/core/ext/filters/client_channel/client_channel_plugin.c index c32e83d012..1f71c5a7f9 100644 --- a/src/core/ext/filters/client_channel/client_channel_plugin.c +++ b/src/core/ext/filters/client_channel/client_channel_plugin.c @@ -54,8 +54,8 @@ static bool set_default_host_if_unset(grpc_exec_ctx *exec_ctx, char *default_authority = grpc_get_default_authority( exec_ctx, grpc_channel_stack_builder_get_target(builder)); if (default_authority != NULL) { - grpc_arg arg = grpc_channel_arg_string_create(GRPC_ARG_DEFAULT_AUTHORITY, - default_authority); + grpc_arg arg = grpc_channel_arg_string_create( + (char *)GRPC_ARG_DEFAULT_AUTHORITY, default_authority); grpc_channel_args *new_args = grpc_channel_args_copy_and_add(args, &arg, 1); grpc_channel_stack_builder_set_channel_arguments(exec_ctx, builder, new_args); diff --git a/src/core/ext/filters/client_channel/http_proxy.c b/src/core/ext/filters/client_channel/http_proxy.c index ef3512ed83..c507a2750e 100644 --- a/src/core/ext/filters/client_channel/http_proxy.c +++ b/src/core/ext/filters/client_channel/http_proxy.c @@ -44,6 +44,8 @@ static char* get_http_proxy_server(grpc_exec_ctx* exec_ctx, char** user_cred) { GPR_ASSERT(user_cred != NULL); char* proxy_name = NULL; char* uri_str = gpr_getenv("http_proxy"); + char** authority_strs = NULL; + size_t authority_nstrs; if (uri_str == NULL) return NULL; grpc_uri* uri = grpc_uri_parse(exec_ctx, uri_str, false /* suppress_errors */); @@ -56,8 +58,6 @@ static char* get_http_proxy_server(grpc_exec_ctx* exec_ctx, char** user_cred) { goto done; } /* Split on '@' to separate user credentials from host */ - char** authority_strs = NULL; - size_t authority_nstrs; gpr_string_split(uri->authority, "@", &authority_strs, &authority_nstrs); GPR_ASSERT(authority_nstrs != 0); /* should have at least 1 string */ if (authority_nstrs == 1) { @@ -157,7 +157,7 @@ static bool proxy_mapper_map_name(grpc_exec_ctx* exec_ctx, } grpc_arg args_to_add[2]; args_to_add[0] = grpc_channel_arg_string_create( - GRPC_ARG_HTTP_CONNECT_SERVER, + (char*)GRPC_ARG_HTTP_CONNECT_SERVER, uri->path[0] == '/' ? uri->path + 1 : uri->path); if (user_cred != NULL) { /* Use base64 encoding for user credentials as stated in RFC 7617 */ @@ -166,8 +166,8 @@ static bool proxy_mapper_map_name(grpc_exec_ctx* exec_ctx, char* header; gpr_asprintf(&header, "Proxy-Authorization:Basic %s", encoded_user_cred); gpr_free(encoded_user_cred); - args_to_add[1] = - grpc_channel_arg_string_create(GRPC_ARG_HTTP_CONNECT_HEADERS, header); + args_to_add[1] = grpc_channel_arg_string_create( + (char*)GRPC_ARG_HTTP_CONNECT_HEADERS, header); *new_args = grpc_channel_args_copy_and_add(args, args_to_add, 2); gpr_free(header); } else { diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c index bd290464c8..7ad322902b 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c @@ -75,7 +75,8 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, GPR_ASSERT(args->context != NULL); GPR_ASSERT(args->context[GRPC_GRPCLB_CLIENT_STATS].value != NULL); calld->client_stats = grpc_grpclb_client_stats_ref( - args->context[GRPC_GRPCLB_CLIENT_STATS].value); + (grpc_grpclb_client_stats *)args->context[GRPC_GRPCLB_CLIENT_STATS] + .value); // Record call started. grpc_grpclb_client_stats_add_call_started(calld->client_stats); return GRPC_ERROR_NONE; diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c index 5aafed1374..85ef7894ea 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c @@ -101,6 +101,7 @@ #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/parse_address.h" #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" +#include "src/core/ext/filters/client_channel/subchannel_index.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/iomgr/combiner.h" @@ -137,7 +138,7 @@ static grpc_error *initial_metadata_add_lb_token( } static void destroy_client_stats(void *arg) { - grpc_grpclb_client_stats_unref(arg); + grpc_grpclb_client_stats_unref((grpc_grpclb_client_stats *)arg); } typedef struct wrapped_rr_closure_arg { @@ -285,7 +286,7 @@ static void add_pending_ping(pending_ping **root, grpc_closure *notify) { * glb_lb_policy */ typedef struct rr_connectivity_data rr_connectivity_data; -static const grpc_lb_policy_vtable glb_lb_policy_vtable; + typedef struct glb_lb_policy { /** base policy: must be first */ grpc_lb_policy base; @@ -727,7 +728,7 @@ static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, /* Allocate the data for the tracking of the new RR policy's connectivity. * It'll be deallocated in glb_rr_connectivity_changed() */ rr_connectivity_data *rr_connectivity = - gpr_zalloc(sizeof(rr_connectivity_data)); + (rr_connectivity_data *)gpr_zalloc(sizeof(rr_connectivity_data)); GRPC_CLOSURE_INIT(&rr_connectivity->on_change, glb_rr_connectivity_changed_locked, rr_connectivity, grpc_combiner_scheduler(glb_policy->base.combiner)); @@ -869,7 +870,8 @@ static grpc_channel_args *build_lb_channel_args( grpc_lb_addresses *lb_addresses = grpc_lb_addresses_create(num_grpclb_addrs, NULL); grpc_slice_hash_table_entry *targets_info_entries = - gpr_zalloc(sizeof(*targets_info_entries) * num_grpclb_addrs); + (grpc_slice_hash_table_entry *)gpr_zalloc(sizeof(*targets_info_entries) * + num_grpclb_addrs); size_t lb_addresses_idx = 0; for (size_t i = 0; i < addresses->num_addresses; ++i) { @@ -911,92 +913,6 @@ static grpc_channel_args *build_lb_channel_args( return result; } -static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx, - void *arg, - grpc_error *error); -static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, - grpc_lb_policy_factory *factory, - grpc_lb_policy_args *args) { - /* Count the number of gRPC-LB addresses. There must be at least one. - * TODO(roth): For now, we ignore non-balancer addresses, but in the - * future, we may change the behavior such that we fall back to using - * the non-balancer addresses if we cannot reach any balancers. In the - * fallback case, we should use the LB policy indicated by - * GRPC_ARG_LB_POLICY_NAME (although if that specifies grpclb or is - * unset, we should default to pick_first). */ - const grpc_arg *arg = - grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES); - if (arg == NULL || arg->type != GRPC_ARG_POINTER) { - return NULL; - } - grpc_lb_addresses *addresses = (grpc_lb_addresses *)arg->value.pointer.p; - size_t num_grpclb_addrs = 0; - for (size_t i = 0; i < addresses->num_addresses; ++i) { - if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs; - } - if (num_grpclb_addrs == 0) return NULL; - - glb_lb_policy *glb_policy = (glb_lb_policy *)gpr_zalloc(sizeof(*glb_policy)); - - /* Get server name. */ - arg = grpc_channel_args_find(args->args, GRPC_ARG_SERVER_URI); - GPR_ASSERT(arg != NULL); - GPR_ASSERT(arg->type == GRPC_ARG_STRING); - grpc_uri *uri = grpc_uri_parse(exec_ctx, arg->value.string, true); - GPR_ASSERT(uri->path[0] != '\0'); - glb_policy->server_name = - gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path); - if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { - gpr_log(GPR_INFO, "Will use '%s' as the server name for LB request.", - glb_policy->server_name); - } - grpc_uri_destroy(uri); - - glb_policy->cc_factory = args->client_channel_factory; - GPR_ASSERT(glb_policy->cc_factory != NULL); - - arg = grpc_channel_args_find(args->args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS); - glb_policy->lb_call_timeout_ms = - grpc_channel_arg_get_integer(arg, (grpc_integer_options){0, 0, INT_MAX}); - - // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args, - // since we use this to trigger the client_load_reporting filter. - grpc_arg new_arg = - grpc_channel_arg_string_create(GRPC_ARG_LB_POLICY_NAME, "grpclb"); - static const char *args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME}; - glb_policy->args = grpc_channel_args_copy_and_add_and_remove( - args->args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1); - - /* Create a client channel over them to communicate with a LB service */ - glb_policy->response_generator = - grpc_fake_resolver_response_generator_create(); - grpc_channel_args *lb_channel_args = build_lb_channel_args( - exec_ctx, addresses, glb_policy->response_generator, args->args); - char *uri_str; - gpr_asprintf(&uri_str, "fake:///%s", glb_policy->server_name); - glb_policy->lb_channel = grpc_lb_policy_grpclb_create_lb_channel( - exec_ctx, uri_str, args->client_channel_factory, lb_channel_args); - - /* Propagate initial resolution */ - grpc_fake_resolver_response_generator_set_response( - exec_ctx, glb_policy->response_generator, lb_channel_args); - grpc_channel_args_destroy(exec_ctx, lb_channel_args); - gpr_free(uri_str); - if (glb_policy->lb_channel == NULL) { - gpr_free((void *)glb_policy->server_name); - grpc_channel_args_destroy(exec_ctx, glb_policy->args); - gpr_free(glb_policy); - return NULL; - } - GRPC_CLOSURE_INIT(&glb_policy->lb_channel_on_connectivity_changed, - glb_lb_channel_on_connectivity_changed_cb, glb_policy, - grpc_combiner_scheduler(args->combiner)); - grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable, args->combiner); - grpc_connectivity_state_init(&glb_policy->state_tracker, GRPC_CHANNEL_IDLE, - "grpclb"); - return &glb_policy->base; -} - static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { glb_lb_policy *glb_policy = (glb_lb_policy *)pol; GPR_ASSERT(glb_policy->pending_picks == NULL); @@ -1011,6 +927,7 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { grpc_grpclb_destroy_serverlist(glb_policy->serverlist); } grpc_fake_resolver_response_generator_unref(glb_policy->response_generator); + grpc_subchannel_index_unref(); if (glb_policy->pending_update_args != NULL) { grpc_channel_args_destroy(exec_ctx, glb_policy->pending_update_args->args); gpr_free(glb_policy->pending_update_args); @@ -1303,7 +1220,8 @@ static void do_send_client_load_report_locked(grpc_exec_ctx *exec_ctx, static bool load_report_counters_are_zero(grpc_grpclb_request *request) { grpc_grpclb_dropped_call_counts *drop_entries = - request->client_stats.calls_finished_with_drop.arg; + (grpc_grpclb_dropped_call_counts *) + request->client_stats.calls_finished_with_drop.arg; return request->client_stats.num_calls_started == 0 && request->client_stats.num_calls_finished == 0 && request->client_stats.num_calls_finished_with_client_failed_to_send == @@ -1642,6 +1560,9 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops), &glb_policy->lb_on_response_received); /* loop */ GPR_ASSERT(GRPC_CALL_OK == call_error); + } else { + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, + "lb_on_response_received_locked_shutdown"); } } else { /* empty payload: call cancelled. */ /* dispose of the "lb_on_response_received_locked" weak ref taken in @@ -1865,6 +1786,90 @@ static const grpc_lb_policy_vtable glb_lb_policy_vtable = { glb_notify_on_state_change_locked, glb_update_locked}; +static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, + grpc_lb_policy_factory *factory, + grpc_lb_policy_args *args) { + /* Count the number of gRPC-LB addresses. There must be at least one. + * TODO(roth): For now, we ignore non-balancer addresses, but in the + * future, we may change the behavior such that we fall back to using + * the non-balancer addresses if we cannot reach any balancers. In the + * fallback case, we should use the LB policy indicated by + * GRPC_ARG_LB_POLICY_NAME (although if that specifies grpclb or is + * unset, we should default to pick_first). */ + const grpc_arg *arg = + grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES); + if (arg == NULL || arg->type != GRPC_ARG_POINTER) { + return NULL; + } + grpc_lb_addresses *addresses = (grpc_lb_addresses *)arg->value.pointer.p; + size_t num_grpclb_addrs = 0; + for (size_t i = 0; i < addresses->num_addresses; ++i) { + if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs; + } + if (num_grpclb_addrs == 0) return NULL; + + glb_lb_policy *glb_policy = (glb_lb_policy *)gpr_zalloc(sizeof(*glb_policy)); + + /* Get server name. */ + arg = grpc_channel_args_find(args->args, GRPC_ARG_SERVER_URI); + GPR_ASSERT(arg != NULL); + GPR_ASSERT(arg->type == GRPC_ARG_STRING); + grpc_uri *uri = grpc_uri_parse(exec_ctx, arg->value.string, true); + GPR_ASSERT(uri->path[0] != '\0'); + glb_policy->server_name = + gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path); + if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { + gpr_log(GPR_INFO, "Will use '%s' as the server name for LB request.", + glb_policy->server_name); + } + grpc_uri_destroy(uri); + + glb_policy->cc_factory = args->client_channel_factory; + GPR_ASSERT(glb_policy->cc_factory != NULL); + + arg = grpc_channel_args_find(args->args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS); + glb_policy->lb_call_timeout_ms = + grpc_channel_arg_get_integer(arg, (grpc_integer_options){0, 0, INT_MAX}); + + // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args, + // since we use this to trigger the client_load_reporting filter. + grpc_arg new_arg = grpc_channel_arg_string_create( + (char *)GRPC_ARG_LB_POLICY_NAME, (char *)"grpclb"); + static const char *args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME}; + glb_policy->args = grpc_channel_args_copy_and_add_and_remove( + args->args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1); + + /* Create a client channel over them to communicate with a LB service */ + glb_policy->response_generator = + grpc_fake_resolver_response_generator_create(); + grpc_channel_args *lb_channel_args = build_lb_channel_args( + exec_ctx, addresses, glb_policy->response_generator, args->args); + char *uri_str; + gpr_asprintf(&uri_str, "fake:///%s", glb_policy->server_name); + glb_policy->lb_channel = grpc_lb_policy_grpclb_create_lb_channel( + exec_ctx, uri_str, args->client_channel_factory, lb_channel_args); + + /* Propagate initial resolution */ + grpc_fake_resolver_response_generator_set_response( + exec_ctx, glb_policy->response_generator, lb_channel_args); + grpc_channel_args_destroy(exec_ctx, lb_channel_args); + gpr_free(uri_str); + if (glb_policy->lb_channel == NULL) { + gpr_free((void *)glb_policy->server_name); + grpc_channel_args_destroy(exec_ctx, glb_policy->args); + gpr_free(glb_policy); + return NULL; + } + grpc_subchannel_index_ref(); + GRPC_CLOSURE_INIT(&glb_policy->lb_channel_on_connectivity_changed, + glb_lb_channel_on_connectivity_changed_cb, glb_policy, + grpc_combiner_scheduler(args->combiner)); + grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable, args->combiner); + grpc_connectivity_state_init(&glb_policy->state_tracker, GRPC_CHANNEL_IDLE, + "grpclb"); + return &glb_policy->base; +} + static void glb_factory_ref(grpc_lb_policy_factory *factory) {} static void glb_factory_unref(grpc_lb_policy_factory *factory) {} diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c index 407bd18adb..8ef6dfc6f4 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c @@ -148,7 +148,8 @@ grpc_slice grpc_grpclb_request_encode(const grpc_grpclb_request *request) { void grpc_grpclb_request_destroy(grpc_grpclb_request *request) { if (request->has_client_stats) { grpc_grpclb_dropped_call_counts *drop_entries = - request->client_stats.calls_finished_with_drop.arg; + (grpc_grpclb_dropped_call_counts *) + request->client_stats.calls_finished_with_drop.arg; grpc_grpclb_dropped_call_counts_destroy(drop_entries); } gpr_free(request); @@ -170,7 +171,8 @@ grpc_grpclb_initial_response *grpc_grpclb_initial_response_parse( if (!res.has_initial_response) return NULL; grpc_grpclb_initial_response *initial_res = - gpr_malloc(sizeof(grpc_grpclb_initial_response)); + (grpc_grpclb_initial_response *)gpr_malloc( + sizeof(grpc_grpclb_initial_response)); memcpy(initial_res, &res.initial_response, sizeof(grpc_grpclb_initial_response)); diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c index fab3073eb9..d20cbb8388 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c @@ -89,6 +89,7 @@ static void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { "picked_first_destroy"); } grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); + grpc_subchannel_index_unref(); if (p->pending_update_args != NULL) { grpc_channel_args_destroy(exec_ctx, p->pending_update_args->args); gpr_free(p->pending_update_args); @@ -330,8 +331,8 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, gpr_log(GPR_INFO, "Pick First %p received update with %lu addresses", (void *)p, (unsigned long)addresses->num_addresses); } - grpc_subchannel_args *sc_args = - gpr_zalloc(sizeof(*sc_args) * addresses->num_addresses); + grpc_subchannel_args *sc_args = (grpc_subchannel_args *)gpr_zalloc( + sizeof(*sc_args) * addresses->num_addresses); /* We remove the following keys in order for subchannel keys belonging to * subchannels point to the same address to match. */ static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS, @@ -403,7 +404,7 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, } /* Create the subchannels for the new subchannel args/addresses. */ grpc_subchannel **new_subchannels = - gpr_zalloc(sizeof(*new_subchannels) * sc_args_count); + (grpc_subchannel **)gpr_zalloc(sizeof(*new_subchannels) * sc_args_count); size_t num_new_subchannels = 0; for (size_t i = 0; i < sc_args_count; i++) { grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel( @@ -686,6 +687,7 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx, } pf_update_locked(exec_ctx, &p->base, args); grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable, args->combiner); + grpc_subchannel_index_ref(); GRPC_CLOSURE_INIT(&p->connectivity_changed, pf_connectivity_changed_locked, p, grpc_combiner_scheduler(args->combiner)); return &p->base; diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c index be91d3d651..a3a62e9f3c 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c @@ -30,6 +30,7 @@ #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/subchannel.h" +#include "src/core/ext/filters/client_channel/subchannel_index.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/iomgr/combiner.h" @@ -310,6 +311,7 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { (void *)pol, (void *)pol); } grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); + grpc_subchannel_index_unref(); gpr_free(p); } @@ -587,7 +589,7 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, // Dispose of outdated subchannel lists. if (sd->subchannel_list != p->subchannel_list && sd->subchannel_list != p->latest_pending_subchannel_list) { - char *reason = NULL; + const char *reason = NULL; if (sd->subchannel_list->shutting_down) { reason = "sl_outdated_straggler"; rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, reason); @@ -890,6 +892,7 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, GPR_ASSERT(args->client_channel_factory != NULL); round_robin_lb_policy *p = (round_robin_lb_policy *)gpr_zalloc(sizeof(*p)); grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable, args->combiner); + grpc_subchannel_index_ref(); grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, "round_robin"); rr_update_locked(exec_ctx, &p->base, args); diff --git a/src/core/ext/filters/client_channel/lb_policy_factory.c b/src/core/ext/filters/client_channel/lb_policy_factory.c index cdcaf17544..4d1405454c 100644 --- a/src/core/ext/filters/client_channel/lb_policy_factory.c +++ b/src/core/ext/filters/client_channel/lb_policy_factory.c @@ -126,13 +126,14 @@ void grpc_lb_addresses_destroy(grpc_exec_ctx* exec_ctx, } static void* lb_addresses_copy(void* addresses) { - return grpc_lb_addresses_copy(addresses); + return grpc_lb_addresses_copy((grpc_lb_addresses*)addresses); } static void lb_addresses_destroy(grpc_exec_ctx* exec_ctx, void* addresses) { - grpc_lb_addresses_destroy(exec_ctx, addresses); + grpc_lb_addresses_destroy(exec_ctx, (grpc_lb_addresses*)addresses); } static int lb_addresses_cmp(void* addresses1, void* addresses2) { - return grpc_lb_addresses_cmp(addresses1, addresses2); + return grpc_lb_addresses_cmp((grpc_lb_addresses*)addresses1, + (grpc_lb_addresses*)addresses2); } static const grpc_arg_pointer_vtable lb_addresses_arg_vtable = { lb_addresses_copy, lb_addresses_destroy, lb_addresses_cmp}; @@ -140,7 +141,7 @@ static const grpc_arg_pointer_vtable lb_addresses_arg_vtable = { grpc_arg grpc_lb_addresses_create_channel_arg( const grpc_lb_addresses* addresses) { return grpc_channel_arg_pointer_create( - GRPC_ARG_LB_ADDRESSES, (void*)addresses, &lb_addresses_arg_vtable); + (char*)GRPC_ARG_LB_ADDRESSES, (void*)addresses, &lb_addresses_arg_vtable); } grpc_lb_addresses* grpc_lb_addresses_find_channel_arg( @@ -149,7 +150,7 @@ grpc_lb_addresses* grpc_lb_addresses_find_channel_arg( grpc_channel_args_find(channel_args, GRPC_ARG_LB_ADDRESSES); if (lb_addresses_arg == NULL || lb_addresses_arg->type != GRPC_ARG_POINTER) return NULL; - return lb_addresses_arg->value.pointer.p; + return (grpc_lb_addresses*)lb_addresses_arg->value.pointer.p; } void grpc_lb_policy_factory_ref(grpc_lb_policy_factory* factory) { diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c index b87a3b7082..9bb229ad95 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c @@ -204,7 +204,7 @@ static char *choose_service_config(char *service_config_choice_json) { int random_pct = rand() % 100; int percentage; if (sscanf(field->value, "%d", &percentage) != 1 || - random_pct > percentage) { + random_pct > percentage || percentage == 0) { service_config_json = NULL; break; } @@ -249,7 +249,7 @@ static void dns_ares_on_resolved_locked(grpc_exec_ctx *exec_ctx, void *arg, service_config_string); args_to_remove[num_args_to_remove++] = GRPC_ARG_SERVICE_CONFIG; new_args[num_args_to_add++] = grpc_channel_arg_string_create( - GRPC_ARG_SERVICE_CONFIG, service_config_string); + (char *)GRPC_ARG_SERVICE_CONFIG, service_config_string); service_config = grpc_service_config_create(service_config_string); if (service_config != NULL) { const char *lb_policy_name = @@ -257,7 +257,7 @@ static void dns_ares_on_resolved_locked(grpc_exec_ctx *exec_ctx, void *arg, if (lb_policy_name != NULL) { args_to_remove[num_args_to_remove++] = GRPC_ARG_LB_POLICY_NAME; new_args[num_args_to_add++] = grpc_channel_arg_string_create( - GRPC_ARG_LB_POLICY_NAME, (char *)lb_policy_name); + (char *)GRPC_ARG_LB_POLICY_NAME, (char *)lb_policy_name); } } } diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c index 9747d39a16..c30cc93b6f 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c @@ -20,6 +20,7 @@ #if GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET) #include <ares.h> +#include <sys/ioctl.h> #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h" @@ -37,8 +38,6 @@ typedef struct fd_node { /** the owner of this fd node */ grpc_ares_ev_driver *ev_driver; - /** the grpc_fd owned by this fd node */ - grpc_fd *grpc_fd; /** a closure wrapping on_readable_cb, which should be invoked when the grpc_fd in this node becomes readable. */ grpc_closure read_closure; @@ -50,10 +49,14 @@ typedef struct fd_node { /** mutex guarding the rest of the state */ gpr_mu mu; + /** the grpc_fd owned by this fd node */ + grpc_fd *fd; /** if the readable closure has been registered */ bool readable_registered; /** if the writable closure has been registered */ bool writable_registered; + /** if the fd is being shut down */ + bool shutting_down; } fd_node; struct grpc_ares_ev_driver { @@ -96,19 +99,31 @@ static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver *ev_driver) { } static void fd_node_destroy(grpc_exec_ctx *exec_ctx, fd_node *fdn) { - gpr_log(GPR_DEBUG, "delete fd: %d", grpc_fd_wrapped_fd(fdn->grpc_fd)); + gpr_log(GPR_DEBUG, "delete fd: %d", grpc_fd_wrapped_fd(fdn->fd)); GPR_ASSERT(!fdn->readable_registered); GPR_ASSERT(!fdn->writable_registered); gpr_mu_destroy(&fdn->mu); - grpc_pollset_set_del_fd(exec_ctx, fdn->ev_driver->pollset_set, fdn->grpc_fd); /* c-ares library has closed the fd inside grpc_fd. This fd may be picked up immediately by another thread, and should not be closed by the following grpc_fd_orphan. */ - grpc_fd_orphan(exec_ctx, fdn->grpc_fd, NULL, NULL, true /* already_closed */, + grpc_fd_orphan(exec_ctx, fdn->fd, NULL, NULL, true /* already_closed */, "c-ares query finished"); gpr_free(fdn); } +static void fd_node_shutdown(grpc_exec_ctx *exec_ctx, fd_node *fdn) { + gpr_mu_lock(&fdn->mu); + fdn->shutting_down = true; + if (!fdn->readable_registered && !fdn->writable_registered) { + gpr_mu_unlock(&fdn->mu); + fd_node_destroy(exec_ctx, fdn); + } else { + grpc_fd_shutdown(exec_ctx, fdn->fd, GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "c-ares fd shutdown")); + gpr_mu_unlock(&fdn->mu); + } +} + grpc_error *grpc_ares_ev_driver_create(grpc_ares_ev_driver **ev_driver, grpc_pollset_set *pollset_set) { *ev_driver = (grpc_ares_ev_driver *)gpr_malloc(sizeof(grpc_ares_ev_driver)); @@ -150,9 +165,8 @@ void grpc_ares_ev_driver_shutdown(grpc_exec_ctx *exec_ctx, ev_driver->shutting_down = true; fd_node *fn = ev_driver->fds; while (fn != NULL) { - grpc_fd_shutdown( - exec_ctx, fn->grpc_fd, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("grpc_ares_ev_driver_shutdown")); + grpc_fd_shutdown(exec_ctx, fn->fd, GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "grpc_ares_ev_driver_shutdown")); fn = fn->next; } gpr_mu_unlock(&ev_driver->mu); @@ -165,7 +179,7 @@ static fd_node *pop_fd_node(fd_node **head, int fd) { dummy_head.next = *head; fd_node *node = &dummy_head; while (node->next != NULL) { - if (grpc_fd_wrapped_fd(node->next->grpc_fd) == fd) { + if (grpc_fd_wrapped_fd(node->next->fd) == fd) { fd_node *ret = node->next; node->next = node->next->next; *head = dummy_head.next; @@ -176,18 +190,33 @@ static fd_node *pop_fd_node(fd_node **head, int fd) { return NULL; } +/* Check if \a fd is still readable */ +static bool grpc_ares_is_fd_still_readable(grpc_ares_ev_driver *ev_driver, + int fd) { + size_t bytes_available = 0; + return ioctl(fd, FIONREAD, &bytes_available) == 0 && bytes_available > 0; +} + static void on_readable_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { fd_node *fdn = (fd_node *)arg; grpc_ares_ev_driver *ev_driver = fdn->ev_driver; gpr_mu_lock(&fdn->mu); + const int fd = grpc_fd_wrapped_fd(fdn->fd); fdn->readable_registered = false; + if (fdn->shutting_down && !fdn->writable_registered) { + gpr_mu_unlock(&fdn->mu); + fd_node_destroy(exec_ctx, fdn); + grpc_ares_ev_driver_unref(ev_driver); + return; + } gpr_mu_unlock(&fdn->mu); - gpr_log(GPR_DEBUG, "readable on %d", grpc_fd_wrapped_fd(fdn->grpc_fd)); + gpr_log(GPR_DEBUG, "readable on %d", fd); if (error == GRPC_ERROR_NONE) { - ares_process_fd(ev_driver->channel, grpc_fd_wrapped_fd(fdn->grpc_fd), - ARES_SOCKET_BAD); + do { + ares_process_fd(ev_driver->channel, fd, ARES_SOCKET_BAD); + } while (grpc_ares_is_fd_still_readable(ev_driver, fd)); } else { // If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or // timed out. The pending lookups made on this ev_driver will be cancelled @@ -208,13 +237,19 @@ static void on_writable_cb(grpc_exec_ctx *exec_ctx, void *arg, fd_node *fdn = (fd_node *)arg; grpc_ares_ev_driver *ev_driver = fdn->ev_driver; gpr_mu_lock(&fdn->mu); + const int fd = grpc_fd_wrapped_fd(fdn->fd); fdn->writable_registered = false; + if (fdn->shutting_down && !fdn->readable_registered) { + gpr_mu_unlock(&fdn->mu); + fd_node_destroy(exec_ctx, fdn); + grpc_ares_ev_driver_unref(ev_driver); + return; + } gpr_mu_unlock(&fdn->mu); - gpr_log(GPR_DEBUG, "writable on %d", grpc_fd_wrapped_fd(fdn->grpc_fd)); + gpr_log(GPR_DEBUG, "writable on %d", fd); if (error == GRPC_ERROR_NONE) { - ares_process_fd(ev_driver->channel, ARES_SOCKET_BAD, - grpc_fd_wrapped_fd(fdn->grpc_fd)); + ares_process_fd(ev_driver->channel, ARES_SOCKET_BAD, fd); } else { // If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or // timed out. The pending lookups made on this ev_driver will be cancelled @@ -253,17 +288,17 @@ static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx, gpr_asprintf(&fd_name, "ares_ev_driver-%" PRIuPTR, i); fdn = (fd_node *)gpr_malloc(sizeof(fd_node)); gpr_log(GPR_DEBUG, "new fd: %d", socks[i]); - fdn->grpc_fd = grpc_fd_create(socks[i], fd_name); + fdn->fd = grpc_fd_create(socks[i], fd_name); fdn->ev_driver = ev_driver; fdn->readable_registered = false; fdn->writable_registered = false; + fdn->shutting_down = false; gpr_mu_init(&fdn->mu); GRPC_CLOSURE_INIT(&fdn->read_closure, on_readable_cb, fdn, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&fdn->write_closure, on_writable_cb, fdn, grpc_schedule_on_exec_ctx); - grpc_pollset_set_add_fd(exec_ctx, ev_driver->pollset_set, - fdn->grpc_fd); + grpc_pollset_set_add_fd(exec_ctx, ev_driver->pollset_set, fdn->fd); gpr_free(fd_name); } fdn->next = new_list; @@ -274,9 +309,8 @@ static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx, if (ARES_GETSOCK_READABLE(socks_bitmask, i) && !fdn->readable_registered) { grpc_ares_ev_driver_ref(ev_driver); - gpr_log(GPR_DEBUG, "notify read on: %d", - grpc_fd_wrapped_fd(fdn->grpc_fd)); - grpc_fd_notify_on_read(exec_ctx, fdn->grpc_fd, &fdn->read_closure); + gpr_log(GPR_DEBUG, "notify read on: %d", grpc_fd_wrapped_fd(fdn->fd)); + grpc_fd_notify_on_read(exec_ctx, fdn->fd, &fdn->read_closure); fdn->readable_registered = true; } // Register write_closure if the socket is writable and write_closure @@ -284,9 +318,9 @@ static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx, if (ARES_GETSOCK_WRITABLE(socks_bitmask, i) && !fdn->writable_registered) { gpr_log(GPR_DEBUG, "notify write on: %d", - grpc_fd_wrapped_fd(fdn->grpc_fd)); + grpc_fd_wrapped_fd(fdn->fd)); grpc_ares_ev_driver_ref(ev_driver); - grpc_fd_notify_on_write(exec_ctx, fdn->grpc_fd, &fdn->write_closure); + grpc_fd_notify_on_write(exec_ctx, fdn->fd, &fdn->write_closure); fdn->writable_registered = true; } gpr_mu_unlock(&fdn->mu); @@ -299,7 +333,7 @@ static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx, while (ev_driver->fds != NULL) { fd_node *cur = ev_driver->fds; ev_driver->fds = ev_driver->fds->next; - fd_node_destroy(exec_ctx, cur); + fd_node_shutdown(exec_ctx, cur); } ev_driver->fds = new_list; // If the ev driver has no working fd, all the tasks are done. diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c index 0d71f3560e..04379975e1 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c @@ -123,8 +123,8 @@ static void grpc_ares_request_unref(grpc_exec_ctx *exec_ctx, static grpc_ares_hostbyname_request *create_hostbyname_request( grpc_ares_request *parent_request, char *host, uint16_t port, bool is_balancer) { - grpc_ares_hostbyname_request *hr = - gpr_zalloc(sizeof(grpc_ares_hostbyname_request)); + grpc_ares_hostbyname_request *hr = (grpc_ares_hostbyname_request *)gpr_zalloc( + sizeof(grpc_ares_hostbyname_request)); hr->parent_request = parent_request; hr->host = gpr_strdup(host); hr->port = port; @@ -174,7 +174,7 @@ static void on_hostbyname_done_cb(void *arg, int status, int timeouts, grpc_lb_addresses_set_address( *lb_addresses, i, &addr, addr_len, hr->is_balancer /* is_balancer */, - hr->is_balancer ? strdup(hr->host) : NULL /* balancer_name */, + hr->is_balancer ? hr->host : NULL /* balancer_name */, NULL /* user_data */); char output[INET6_ADDRSTRLEN]; ares_inet_ntop(AF_INET6, &addr.sin6_addr, output, INET6_ADDRSTRLEN); @@ -195,7 +195,7 @@ static void on_hostbyname_done_cb(void *arg, int status, int timeouts, grpc_lb_addresses_set_address( *lb_addresses, i, &addr, addr_len, hr->is_balancer /* is_balancer */, - hr->is_balancer ? strdup(hr->host) : NULL /* balancer_name */, + hr->is_balancer ? hr->host : NULL /* balancer_name */, NULL /* user_data */); char output[INET_ADDRSTRLEN]; ares_inet_ntop(AF_INET, &addr.sin_addr, output, INET_ADDRSTRLEN); @@ -275,14 +275,15 @@ static void on_txt_done_cb(void *arg, int status, int timeouts, gpr_log(GPR_DEBUG, "on_txt_done_cb"); char *error_msg; grpc_ares_request *r = (grpc_ares_request *)arg; + const size_t prefix_len = sizeof(g_service_config_attribute_prefix) - 1; + struct ares_txt_ext *result = NULL; + struct ares_txt_ext *reply = NULL; + grpc_error *error = GRPC_ERROR_NONE; gpr_mu_lock(&r->mu); if (status != ARES_SUCCESS) goto fail; - struct ares_txt_ext *reply = NULL; status = ares_parse_txt_reply_ext(buf, len, &reply); if (status != ARES_SUCCESS) goto fail; // Find service config in TXT record. - const size_t prefix_len = sizeof(g_service_config_attribute_prefix) - 1; - struct ares_txt_ext *result; for (result = reply; result != NULL; result = result->next) { if (result->record_start && memcmp(result->txt, g_service_config_attribute_prefix, prefix_len) == @@ -313,7 +314,7 @@ static void on_txt_done_cb(void *arg, int status, int timeouts, fail: gpr_asprintf(&error_msg, "C-ares TXT lookup status is not ARES_SUCCESS: %s", ares_strerror(status)); - grpc_error *error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_msg); + error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_msg); gpr_free(error_msg); if (r->error == GRPC_ERROR_NONE) { r->error = error; @@ -331,6 +332,9 @@ static grpc_ares_request *grpc_dns_lookup_ares_impl( grpc_closure *on_done, grpc_lb_addresses **addrs, bool check_grpclb, char **service_config_json) { grpc_error *error = GRPC_ERROR_NONE; + grpc_ares_hostbyname_request *hr = NULL; + grpc_ares_request *r = NULL; + ares_channel *channel = NULL; /* TODO(zyc): Enable tracing after #9603 is checked in */ /* if (grpc_dns_trace) { gpr_log(GPR_DEBUG, "resolve_address (blocking): name=%s, default_port=%s", @@ -360,8 +364,7 @@ static grpc_ares_request *grpc_dns_lookup_ares_impl( error = grpc_ares_ev_driver_create(&ev_driver, interested_parties); if (error != GRPC_ERROR_NONE) goto error_cleanup; - grpc_ares_request *r = - (grpc_ares_request *)gpr_zalloc(sizeof(grpc_ares_request)); + r = (grpc_ares_request *)gpr_zalloc(sizeof(grpc_ares_request)); gpr_mu_init(&r->mu); r->ev_driver = ev_driver; r->on_done = on_done; @@ -369,7 +372,7 @@ static grpc_ares_request *grpc_dns_lookup_ares_impl( r->service_config_json_out = service_config_json; r->success = false; r->error = GRPC_ERROR_NONE; - ares_channel *channel = grpc_ares_ev_driver_get_channel(r->ev_driver); + channel = grpc_ares_ev_driver_get_channel(r->ev_driver); // If dns_server is specified, use it. if (dns_server != NULL) { @@ -410,12 +413,12 @@ static grpc_ares_request *grpc_dns_lookup_ares_impl( } gpr_ref_init(&r->pending_queries, 1); if (grpc_ipv6_loopback_available()) { - grpc_ares_hostbyname_request *hr = create_hostbyname_request( - r, host, strhtons(port), false /* is_balancer */); + hr = create_hostbyname_request(r, host, strhtons(port), + false /* is_balancer */); ares_gethostbyname(*channel, hr->host, AF_INET6, on_hostbyname_done_cb, hr); } - grpc_ares_hostbyname_request *hr = create_hostbyname_request( - r, host, strhtons(port), false /* is_balancer */); + hr = create_hostbyname_request(r, host, strhtons(port), + false /* is_balancer */); ares_gethostbyname(*channel, hr->host, AF_INET, on_hostbyname_done_cb, hr); if (check_grpclb) { /* Query the SRV record */ @@ -527,7 +530,8 @@ static void grpc_resolve_address_ares_impl(grpc_exec_ctx *exec_ctx, grpc_closure *on_done, grpc_resolved_addresses **addrs) { grpc_resolve_address_ares_request *r = - gpr_zalloc(sizeof(grpc_resolve_address_ares_request)); + (grpc_resolve_address_ares_request *)gpr_zalloc( + sizeof(grpc_resolve_address_ares_request)); r->addrs_out = addrs; r->on_resolve_address_done = on_done; GRPC_CLOSURE_INIT(&r->on_dns_lookup_done, on_dns_lookup_done_cb, r, diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c index c4676e0299..69ea440ae6 100644 --- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c +++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c @@ -210,7 +210,7 @@ grpc_arg grpc_fake_resolver_response_generator_arg( grpc_fake_resolver_response_generator* generator) { grpc_arg arg; arg.type = GRPC_ARG_POINTER; - arg.key = GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR; + arg.key = (char*)GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR; arg.value.pointer.p = generator; arg.value.pointer.vtable = &response_generator_arg_vtable; return arg; diff --git a/src/core/ext/filters/client_channel/retry_throttle.c b/src/core/ext/filters/client_channel/retry_throttle.c index 6cd6654b6f..09dcade089 100644 --- a/src/core/ext/filters/client_channel/retry_throttle.c +++ b/src/core/ext/filters/client_channel/retry_throttle.c @@ -99,7 +99,7 @@ static grpc_server_retry_throttle_data* grpc_server_retry_throttle_data_create( int max_milli_tokens, int milli_token_ratio, grpc_server_retry_throttle_data* old_throttle_data) { grpc_server_retry_throttle_data* throttle_data = - gpr_malloc(sizeof(*throttle_data)); + (grpc_server_retry_throttle_data*)gpr_malloc(sizeof(*throttle_data)); memset(throttle_data, 0, sizeof(*throttle_data)); gpr_ref_init(&throttle_data->refs, 1); throttle_data->max_milli_tokens = max_milli_tokens; @@ -131,11 +131,11 @@ static grpc_server_retry_throttle_data* grpc_server_retry_throttle_data_create( // static void* copy_server_name(void* key, void* unused) { - return gpr_strdup(key); + return gpr_strdup((const char*)key); } static long compare_server_name(void* key1, void* key2, void* unused) { - return strcmp(key1, key2); + return strcmp((const char*)key1, (const char*)key2); } static void destroy_server_retry_throttle_data(void* value, void* unused) { @@ -177,7 +177,8 @@ grpc_server_retry_throttle_data* grpc_retry_throttle_map_get_data_for_server( const char* server_name, int max_milli_tokens, int milli_token_ratio) { gpr_mu_lock(&g_mu); grpc_server_retry_throttle_data* throttle_data = - gpr_avl_get(g_avl, (char*)server_name, NULL); + (grpc_server_retry_throttle_data*)gpr_avl_get(g_avl, (char*)server_name, + NULL); if (throttle_data == NULL) { // Entry not found. Create a new one. throttle_data = grpc_server_retry_throttle_data_create( diff --git a/src/core/ext/filters/client_channel/subchannel.c b/src/core/ext/filters/client_channel/subchannel.c index 05c55aaa89..40a51c72d6 100644 --- a/src/core/ext/filters/client_channel/subchannel.c +++ b/src/core/ext/filters/client_channel/subchannel.c @@ -32,6 +32,7 @@ #include "src/core/ext/filters/client_channel/uri_parser.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/connected_channel.h" +#include "src/core/lib/debug/stats.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/profiling/timers.h" @@ -290,6 +291,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, return c; } + GRPC_STATS_INC_CLIENT_SUBCHANNELS_CREATED(exec_ctx); c = (grpc_subchannel *)gpr_zalloc(sizeof(*c)); c->key = key; gpr_atm_no_barrier_store(&c->ref_pair, 1 << INTERNAL_REF_BITS); @@ -809,6 +811,6 @@ const char *grpc_get_subchannel_address_uri_arg(const grpc_channel_args *args) { grpc_arg grpc_create_subchannel_address_arg(const grpc_resolved_address *addr) { return grpc_channel_arg_string_create( - GRPC_ARG_SUBCHANNEL_ADDRESS, + (char *)GRPC_ARG_SUBCHANNEL_ADDRESS, addr->len > 0 ? grpc_sockaddr_to_uri(addr) : gpr_strdup("")); } diff --git a/src/core/ext/filters/client_channel/subchannel_index.c b/src/core/ext/filters/client_channel/subchannel_index.c index f57b631c41..d7a51f3899 100644 --- a/src/core/ext/filters/client_channel/subchannel_index.c +++ b/src/core/ext/filters/client_channel/subchannel_index.c @@ -34,6 +34,8 @@ static gpr_avl g_subchannel_index; static gpr_mu g_mu; +static gpr_refcount g_refcount; + struct grpc_subchannel_key { grpc_subchannel_args args; }; @@ -88,24 +90,26 @@ void grpc_subchannel_key_destroy(grpc_exec_ctx *exec_ctx, static void sck_avl_destroy(void *p, void *user_data) { grpc_exec_ctx *exec_ctx = (grpc_exec_ctx *)user_data; - grpc_subchannel_key_destroy(exec_ctx, p); + grpc_subchannel_key_destroy(exec_ctx, (grpc_subchannel_key *)p); } static void *sck_avl_copy(void *p, void *unused) { - return subchannel_key_copy(p); + return subchannel_key_copy((grpc_subchannel_key *)p); } static long sck_avl_compare(void *a, void *b, void *unused) { - return grpc_subchannel_key_compare(a, b); + return grpc_subchannel_key_compare((grpc_subchannel_key *)a, + (grpc_subchannel_key *)b); } static void scv_avl_destroy(void *p, void *user_data) { grpc_exec_ctx *exec_ctx = (grpc_exec_ctx *)user_data; - GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, p, "subchannel_index"); + GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, (grpc_subchannel *)p, + "subchannel_index"); } static void *scv_avl_copy(void *p, void *unused) { - GRPC_SUBCHANNEL_WEAK_REF(p, "subchannel_index"); + GRPC_SUBCHANNEL_WEAK_REF((grpc_subchannel *)p, "subchannel_index"); return p; } @@ -119,15 +123,27 @@ static const gpr_avl_vtable subchannel_avl_vtable = { void grpc_subchannel_index_init(void) { g_subchannel_index = gpr_avl_create(&subchannel_avl_vtable); gpr_mu_init(&g_mu); + gpr_ref_init(&g_refcount, 1); } void grpc_subchannel_index_shutdown(void) { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - gpr_mu_destroy(&g_mu); - gpr_avl_unref(g_subchannel_index, &exec_ctx); - grpc_exec_ctx_finish(&exec_ctx); + // TODO(juanlishen): This refcounting mechanism may lead to memory leackage. + // To solve that, we should force polling to flush any pending callbacks, then + // shutdown safely. + grpc_subchannel_index_unref(); +} + +void grpc_subchannel_index_unref(void) { + if (gpr_unref(&g_refcount)) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + gpr_mu_destroy(&g_mu); + gpr_avl_unref(g_subchannel_index, &exec_ctx); + grpc_exec_ctx_finish(&exec_ctx); + } } +void grpc_subchannel_index_ref(void) { gpr_ref_non_zero(&g_refcount); } + grpc_subchannel *grpc_subchannel_index_find(grpc_exec_ctx *exec_ctx, grpc_subchannel_key *key) { // Lock, and take a reference to the subchannel index. diff --git a/src/core/ext/filters/client_channel/subchannel_index.h b/src/core/ext/filters/client_channel/subchannel_index.h index 98d882a453..92e36d5283 100644 --- a/src/core/ext/filters/client_channel/subchannel_index.h +++ b/src/core/ext/filters/client_channel/subchannel_index.h @@ -59,6 +59,13 @@ void grpc_subchannel_index_init(void); /** Shutdown the subchannel index (global) */ void grpc_subchannel_index_shutdown(void); +/** Increment the refcount (non-zero) of subchannel index (global). */ +void grpc_subchannel_index_ref(void); + +/** Decrement the refcount of subchannel index (global). If the refcount drops + to zero, unref the subchannel index and destroy its mutex. */ +void grpc_subchannel_index_unref(void); + /** \em TEST ONLY. * If \a force_creation is true, all key comparisons will be false, resulting in * new subchannels always being created. Otherwise, the keys will be compared as diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.c b/src/core/ext/filters/http/message_compress/message_compress_filter.c index a66916fbe1..f4c01f75a6 100644 --- a/src/core/ext/filters/http/message_compress/message_compress_filter.c +++ b/src/core/ext/filters/http/message_compress/message_compress_filter.c @@ -225,7 +225,7 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx, exec_ctx, calld->message_compression_algorithm, &calld->slices, &tmp); if (did_compress) { if (GRPC_TRACER_ON(grpc_compression_trace)) { - char *algo_name; + const char *algo_name; const size_t before_size = calld->slices.length; const size_t after_size = tmp.length; const float savings_ratio = 1.0f - (float)after_size / (float)before_size; @@ -239,7 +239,7 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx, send_flags |= GRPC_WRITE_INTERNAL_COMPRESS; } else { if (GRPC_TRACER_ON(grpc_compression_trace)) { - char *algo_name; + const char *algo_name; GPR_ASSERT(grpc_message_compression_algorithm_name( calld->message_compression_algorithm, &algo_name)); gpr_log(GPR_DEBUG, diff --git a/src/core/ext/filters/http/server/http_server_filter.c b/src/core/ext/filters/http/server/http_server_filter.c index 554a7f530d..03958136b4 100644 --- a/src/core/ext/filters/http/server/http_server_filter.c +++ b/src/core/ext/filters/http/server/http_server_filter.c @@ -83,12 +83,12 @@ static grpc_error *server_filter_outgoing_metadata(grpc_exec_ctx *exec_ctx, } static void add_error(const char *error_name, grpc_error **cumulative, - grpc_error *new) { - if (new == GRPC_ERROR_NONE) return; + grpc_error *new_err) { + if (new_err == GRPC_ERROR_NONE) return; if (*cumulative == GRPC_ERROR_NONE) { *cumulative = GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_name); } - *cumulative = grpc_error_add_child(*cumulative, new); + *cumulative = grpc_error_add_child(*cumulative, new_err); } static grpc_error *server_filter_incoming_metadata(grpc_exec_ctx *exec_ctx, diff --git a/src/core/ext/filters/load_reporting/server_load_reporting_plugin.c b/src/core/ext/filters/load_reporting/server_load_reporting_plugin.c index f56afd27d2..2486ead427 100644 --- a/src/core/ext/filters/load_reporting/server_load_reporting_plugin.c +++ b/src/core/ext/filters/load_reporting/server_load_reporting_plugin.c @@ -55,7 +55,8 @@ static bool maybe_add_server_load_reporting_filter( } grpc_arg grpc_load_reporting_enable_arg() { - return grpc_channel_arg_integer_create(GRPC_ARG_ENABLE_LOAD_REPORTING, 1); + return grpc_channel_arg_integer_create((char *)GRPC_ARG_ENABLE_LOAD_REPORTING, + 1); } /* Plugin registration */ diff --git a/src/core/ext/filters/max_age/max_age_filter.c b/src/core/ext/filters/max_age/max_age_filter.c index 450f67746f..0ac803ed41 100644 --- a/src/core/ext/filters/max_age/max_age_filter.c +++ b/src/core/ext/filters/max_age/max_age_filter.c @@ -402,7 +402,7 @@ static bool maybe_add_max_age_filter(grpc_exec_ctx* exec_ctx, bool enable = grpc_channel_arg_get_integer( grpc_channel_args_find(channel_args, GRPC_ARG_MAX_CONNECTION_AGE_MS), - MAX_CONNECTION_AGE_INTEGER_OPTIONS) != INT_MAX && + MAX_CONNECTION_AGE_INTEGER_OPTIONS) != INT_MAX || grpc_channel_arg_get_integer( grpc_channel_args_find(channel_args, GRPC_ARG_MAX_CONNECTION_IDLE_MS), MAX_CONNECTION_IDLE_INTEGER_OPTIONS) != INT_MAX; diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.c b/src/core/ext/transport/chttp2/client/chttp2_connector.c index 0ec9353c04..202bcd47f5 100644 --- a/src/core/ext/transport/chttp2/client/chttp2_connector.c +++ b/src/core/ext/transport/chttp2/client/chttp2_connector.c @@ -161,7 +161,7 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_endpoint_shutdown(exec_ctx, c->endpoint, GRPC_ERROR_REF(error)); } gpr_mu_unlock(&c->mu); - chttp2_connector_unref(exec_ctx, arg); + chttp2_connector_unref(exec_ctx, (grpc_connector *)arg); } else { GPR_ASSERT(c->endpoint != NULL); start_handshake_locked(exec_ctx, c); diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c index cccb347bf1..6410a6043d 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c @@ -55,7 +55,7 @@ static grpc_channel *client_channel_factory_create_channel( } // Add channel arg containing the server URI. grpc_arg arg = grpc_channel_arg_string_create( - GRPC_ARG_SERVER_URI, + (char *)GRPC_ARG_SERVER_URI, grpc_resolver_factory_add_default_prefix_if_needed(exec_ctx, target)); const char *to_remove[] = {GRPC_ARG_SERVER_URI}; grpc_channel_args *new_args = diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c index 0346d50b6c..dd88136f7b 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c @@ -42,7 +42,7 @@ grpc_channel *grpc_insecure_channel_create_from_fd( (target, fd, args)); grpc_arg default_authority_arg = grpc_channel_arg_string_create( - GRPC_ARG_DEFAULT_AUTHORITY, "test.authority"); + (char *)GRPC_ARG_DEFAULT_AUTHORITY, (char *)"test.authority"); grpc_channel_args *final_args = grpc_channel_args_copy_and_add(args, &default_authority_arg, 1); diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.c b/src/core/ext/transport/chttp2/server/chttp2_server.c index d7add0538b..60244e163b 100644 --- a/src/core/ext/transport/chttp2/server/chttp2_server.c +++ b/src/core/ext/transport/chttp2/server/chttp2_server.c @@ -52,7 +52,7 @@ typedef struct { } server_state; typedef struct { - server_state *server_state; + server_state *svr_state; grpc_pollset *accepting_pollset; grpc_tcp_server_acceptor *acceptor; grpc_handshake_manager *handshake_mgr; @@ -63,8 +63,8 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_handshaker_args *args = (grpc_handshaker_args *)arg; server_connection_state *connection_state = (server_connection_state *)args->user_data; - gpr_mu_lock(&connection_state->server_state->mu); - if (error != GRPC_ERROR_NONE || connection_state->server_state->shutdown) { + gpr_mu_lock(&connection_state->svr_state->mu); + if (error != GRPC_ERROR_NONE || connection_state->svr_state->shutdown) { const char *error_str = grpc_error_string(error); gpr_log(GPR_DEBUG, "Handshaking failed: %s", error_str); @@ -89,7 +89,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_transport *transport = grpc_create_chttp2_transport(exec_ctx, args->args, args->endpoint, 0); grpc_server_setup_transport( - exec_ctx, connection_state->server_state->server, transport, + exec_ctx, connection_state->svr_state->server, transport, connection_state->accepting_pollset, args->args); grpc_chttp2_transport_start_reading(exec_ctx, transport, args->read_buffer); @@ -97,11 +97,11 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, } } grpc_handshake_manager_pending_list_remove( - &connection_state->server_state->pending_handshake_mgrs, + &connection_state->svr_state->pending_handshake_mgrs, connection_state->handshake_mgr); - gpr_mu_unlock(&connection_state->server_state->mu); + gpr_mu_unlock(&connection_state->svr_state->mu); grpc_handshake_manager_destroy(exec_ctx, connection_state->handshake_mgr); - grpc_tcp_server_unref(exec_ctx, connection_state->server_state->tcp_server); + grpc_tcp_server_unref(exec_ctx, connection_state->svr_state->tcp_server); gpr_free(connection_state->acceptor); gpr_free(connection_state); } @@ -124,8 +124,8 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, gpr_mu_unlock(&state->mu); grpc_tcp_server_ref(state->tcp_server); server_connection_state *connection_state = - gpr_malloc(sizeof(*connection_state)); - connection_state->server_state = state; + (server_connection_state *)gpr_malloc(sizeof(*connection_state)); + connection_state->svr_state = state; connection_state->accepting_pollset = accepting_pollset; connection_state->acceptor = acceptor; connection_state->handshake_mgr = handshake_mgr; @@ -201,6 +201,7 @@ grpc_error *grpc_chttp2_server_add_port(grpc_exec_ctx *exec_ctx, grpc_error *err = GRPC_ERROR_NONE; server_state *state = NULL; grpc_error **errors = NULL; + size_t naddrs = 0; *port_num = -1; @@ -225,7 +226,7 @@ grpc_error *grpc_chttp2_server_add_port(grpc_exec_ctx *exec_ctx, state->shutdown = true; gpr_mu_init(&state->mu); - const size_t naddrs = resolved->naddrs; + naddrs = resolved->naddrs; errors = (grpc_error **)gpr_malloc(sizeof(*errors) * naddrs); for (i = 0; i < naddrs; i++) { errors[i] = diff --git a/src/core/ext/transport/chttp2/transport/chttp2_plugin.c b/src/core/ext/transport/chttp2/transport/chttp2_plugin.c index 78551df9c3..6d09953830 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_plugin.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_plugin.c @@ -23,6 +23,7 @@ void grpc_chttp2_plugin_init(void) { grpc_register_tracer(&grpc_http_trace); grpc_register_tracer(&grpc_flowctl_trace); + grpc_register_tracer(&grpc_trace_http2_stream_state); #ifndef NDEBUG grpc_register_tracer(&grpc_trace_chttp2_refcount); #endif diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 3fd701fe2f..acf49632ff 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -64,6 +64,11 @@ #define DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS false #define KEEPALIVE_TIME_BACKOFF_MULTIPLIER 2 +#define DEFAULT_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS 300000 /* 5 minutes */ +#define DEFAULT_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS 300000 /* 5 minutes */ +#define DEFAULT_MAX_PINGS_BETWEEN_DATA 0 /* unlimited */ +#define DEFAULT_MAX_PING_STRIKES 2 + static int g_default_client_keepalive_time_ms = DEFAULT_CLIENT_KEEPALIVE_TIME_MS; static int g_default_client_keepalive_timeout_ms = @@ -75,6 +80,13 @@ static int g_default_server_keepalive_timeout_ms = static bool g_default_keepalive_permit_without_calls = DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS; +static int g_default_min_sent_ping_interval_without_data_ms = + DEFAULT_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS; +static int g_default_min_recv_ping_interval_without_data_ms = + DEFAULT_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS; +static int g_default_max_pings_without_data = DEFAULT_MAX_PINGS_BETWEEN_DATA; +static int g_default_max_ping_strikes = DEFAULT_MAX_PING_STRIKES; + #define MAX_CLIENT_STREAM_ID 0x7fffffffu grpc_tracer_flag grpc_http_trace = GRPC_TRACER_INITIALIZER(false, "http"); grpc_tracer_flag grpc_flowctl_trace = GRPC_TRACER_INITIALIZER(false, "flowctl"); @@ -144,18 +156,14 @@ static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, static void cancel_pings(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_error *error); -static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_chttp2_ping_type ping_type, - grpc_closure *on_initiate, - grpc_closure *on_complete); +static void send_ping_locked( + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + grpc_chttp2_ping_type ping_type, grpc_closure *on_initiate, + grpc_closure *on_complete, + grpc_chttp2_initiate_write_reason initiate_write_reason); static void retry_initiate_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, grpc_error *error); -#define DEFAULT_MIN_TIME_BETWEEN_PINGS_MS 0 -#define DEFAULT_MAX_PINGS_BETWEEN_DATA 3 -#define DEFAULT_MAX_PING_STRIKES 2 -#define DEFAULT_MIN_PING_INTERVAL_WITHOUT_DATA_MS 300000 /* 5 minutes */ - /** keepalive-relevant functions */ static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); @@ -346,7 +354,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, if (is_client) { grpc_slice_buffer_add(&t->outbuf, grpc_slice_from_copied_string( GRPC_CHTTP2_CLIENT_CONNECT_STRING)); - grpc_chttp2_initiate_write(exec_ctx, t, "initial_write"); } /* configure http2 the way we like it */ @@ -362,14 +369,12 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, queue_setting_update(exec_ctx, t, GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA, 1); - t->ping_policy = (grpc_chttp2_repeated_ping_policy){ - .max_pings_without_data = DEFAULT_MAX_PINGS_BETWEEN_DATA, - .min_time_between_pings = - gpr_time_from_millis(DEFAULT_MIN_TIME_BETWEEN_PINGS_MS, GPR_TIMESPAN), - .max_ping_strikes = DEFAULT_MAX_PING_STRIKES, - .min_ping_interval_without_data = gpr_time_from_millis( - DEFAULT_MIN_PING_INTERVAL_WITHOUT_DATA_MS, GPR_TIMESPAN), - }; + t->ping_policy.max_pings_without_data = g_default_max_pings_without_data; + t->ping_policy.min_sent_ping_interval_without_data = gpr_time_from_millis( + g_default_min_sent_ping_interval_without_data_ms, GPR_TIMESPAN); + t->ping_policy.max_ping_strikes = g_default_max_ping_strikes; + t->ping_policy.min_recv_ping_interval_without_data = gpr_time_from_millis( + g_default_min_recv_ping_interval_without_data_ms, GPR_TIMESPAN); /* Keepalive setting */ if (t->is_client) { @@ -428,29 +433,37 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA)) { t->ping_policy.max_pings_without_data = grpc_channel_arg_get_integer( &channel_args->args[i], - (grpc_integer_options){DEFAULT_MAX_PINGS_BETWEEN_DATA, 0, INT_MAX}); + (grpc_integer_options){g_default_max_pings_without_data, 0, + INT_MAX}); } else if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_MAX_PING_STRIKES)) { t->ping_policy.max_ping_strikes = grpc_channel_arg_get_integer( &channel_args->args[i], - (grpc_integer_options){DEFAULT_MAX_PING_STRIKES, 0, INT_MAX}); - } else if (0 == strcmp(channel_args->args[i].key, - GRPC_ARG_HTTP2_MIN_TIME_BETWEEN_PINGS_MS)) { - t->ping_policy.min_time_between_pings = gpr_time_from_millis( - grpc_channel_arg_get_integer( - &channel_args->args[i], - (grpc_integer_options){DEFAULT_MIN_TIME_BETWEEN_PINGS_MS, 0, - INT_MAX}), - GPR_TIMESPAN); + (grpc_integer_options){g_default_max_ping_strikes, 0, INT_MAX}); } else if (0 == - strcmp(channel_args->args[i].key, - GRPC_ARG_HTTP2_MIN_PING_INTERVAL_WITHOUT_DATA_MS)) { - t->ping_policy.min_ping_interval_without_data = gpr_time_from_millis( - grpc_channel_arg_get_integer( - &channel_args->args[i], - (grpc_integer_options){ - DEFAULT_MIN_PING_INTERVAL_WITHOUT_DATA_MS, 0, INT_MAX}), - GPR_TIMESPAN); + strcmp( + channel_args->args[i].key, + GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS)) { + t->ping_policy.min_sent_ping_interval_without_data = + gpr_time_from_millis( + grpc_channel_arg_get_integer( + &channel_args->args[i], + (grpc_integer_options){ + g_default_min_sent_ping_interval_without_data_ms, 0, + INT_MAX}), + GPR_TIMESPAN); + } else if (0 == + strcmp( + channel_args->args[i].key, + GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS)) { + t->ping_policy.min_recv_ping_interval_without_data = + gpr_time_from_millis( + grpc_channel_arg_get_integer( + &channel_args->args[i], + (grpc_integer_options){ + g_default_min_recv_ping_interval_without_data_ms, 0, + INT_MAX}), + GPR_TIMESPAN); } else if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE)) { t->write_buffer_size = (uint32_t)grpc_channel_arg_get_integer( @@ -557,8 +570,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } } - t->ping_state.pings_before_data_required = - t->ping_policy.max_pings_without_data; + /* No pings allowed before receiving a header or data frame. */ + t->ping_state.pings_before_data_required = 0; t->ping_state.is_delayed_ping_timer_set = false; t->ping_recv_state.last_ping_recv_time = gpr_inf_past(GPR_CLOCK_MONOTONIC); @@ -578,7 +591,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED; } - grpc_chttp2_initiate_write(exec_ctx, t, "init"); + grpc_chttp2_initiate_write(exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE); post_benign_reclaimer(exec_ctx, t); } @@ -624,6 +638,9 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, connectivity_state_set(exec_ctx, t, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "close_transport"); grpc_endpoint_shutdown(exec_ctx, t->ep, GRPC_ERROR_REF(error)); + if (t->ping_state.is_delayed_ping_timer_set) { + grpc_timer_cancel(exec_ctx, &t->ping_state.delayed_ping_timer); + } switch (t->keepalive_state) { case GRPC_CHTTP2_KEEPALIVE_STATE_WAITING: grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer); @@ -689,7 +706,10 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_schedule_on_exec_ctx); grpc_slice_buffer_init(&s->unprocessed_incoming_frames_buffer); grpc_slice_buffer_init(&s->frame_storage); + grpc_slice_buffer_init(&s->compressed_data_buffer); + grpc_slice_buffer_init(&s->decompressed_data_buffer); s->pending_byte_stream = false; + s->decompressed_header_bytes = 0; GRPC_CLOSURE_INIT(&s->reset_byte_stream, reset_byte_stream, s, grpc_combiner_scheduler(t->combiner)); @@ -723,14 +743,8 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, grpc_slice_buffer_destroy_internal(exec_ctx, &s->unprocessed_incoming_frames_buffer); grpc_slice_buffer_destroy_internal(exec_ctx, &s->frame_storage); - if (s->compressed_data_buffer) { - grpc_slice_buffer_destroy_internal(exec_ctx, s->compressed_data_buffer); - gpr_free(s->compressed_data_buffer); - } - if (s->decompressed_data_buffer) { - grpc_slice_buffer_destroy_internal(exec_ctx, s->decompressed_data_buffer); - gpr_free(s->decompressed_data_buffer); - } + grpc_slice_buffer_destroy_internal(exec_ctx, &s->compressed_data_buffer); + grpc_slice_buffer_destroy_internal(exec_ctx, &s->decompressed_data_buffer); grpc_chttp2_list_remove_stalled_by_transport(t, s); grpc_chttp2_list_remove_stalled_by_stream(t, s); @@ -846,13 +860,91 @@ static void set_write_state(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } } +static void inc_initiate_write_reason( + grpc_exec_ctx *exec_ctx, grpc_chttp2_initiate_write_reason reason) { + switch (reason) { + case GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_INITIAL_WRITE(exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_START_NEW_STREAM(exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_MESSAGE(exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_INITIAL_METADATA( + exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_TRAILING_METADATA( + exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_RETRY_SEND_PING(exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_CONTINUE_PINGS(exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_GOAWAY_SENT(exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_RST_STREAM(exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_CLOSE_FROM_API(exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_STREAM_FLOW_CONTROL(exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL( + exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_SEND_SETTINGS(exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_BDP_ESTIMATOR_PING: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_BDP_ESTIMATOR_PING(exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_SETTING( + exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FLOW_CONTROL_UNSTALLED_BY_UPDATE( + exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_APPLICATION_PING(exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_KEEPALIVE_PING(exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_TRANSPORT_FLOW_CONTROL_UNSTALLED( + exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_PING_RESPONSE(exec_ctx); + break; + case GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM: + GRPC_STATS_INC_HTTP2_INITIATE_WRITE_DUE_TO_FORCE_RST_STREAM(exec_ctx); + break; + } +} + void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, const char *reason) { + grpc_chttp2_transport *t, + grpc_chttp2_initiate_write_reason reason) { GPR_TIMER_BEGIN("grpc_chttp2_initiate_write", 0); switch (t->write_state) { case GRPC_CHTTP2_WRITE_STATE_IDLE: - set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING, reason); + inc_initiate_write_reason(exec_ctx, reason); + set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING, + grpc_chttp2_initiate_write_reason_string(reason)); t->is_first_write_in_batch = true; GRPC_CHTTP2_REF_TRANSPORT(t, "writing"); GRPC_CLOSURE_SCHED( @@ -864,7 +956,7 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, break; case GRPC_CHTTP2_WRITE_STATE_WRITING: set_write_state(exec_ctx, t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE, - reason); + grpc_chttp2_initiate_write_reason_string(reason)); break; case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE: break; @@ -872,16 +964,12 @@ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, GPR_TIMER_END("grpc_chttp2_initiate_write", 0); } -void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s, - bool also_initiate_write, const char *reason) { +void grpc_chttp2_mark_stream_writable(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s) { if (!t->closed && grpc_chttp2_list_add_writable_stream(t, s)) { GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:become"); } - if (also_initiate_write) { - grpc_chttp2_initiate_write(exec_ctx, t, reason); - } } static grpc_closure_scheduler *write_scheduler(grpc_chttp2_transport *t, @@ -1105,7 +1193,9 @@ static void maybe_start_some_streams(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream_map_add(&t->stream_map, s->id, s); post_destructive_reclaimer(exec_ctx, t); - grpc_chttp2_become_writable(exec_ctx, t, s, true, "new_stream"); + grpc_chttp2_mark_stream_writable(exec_ctx, t, s); + grpc_chttp2_initiate_write(exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM); } /* cancel out streams that will never be started */ while (t->next_stream_id >= MAX_CLIENT_STREAM_ID && @@ -1202,7 +1292,9 @@ static void maybe_become_writable_due_to_send_msg(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream *s) { if (s->id != 0 && (!s->write_buffering || s->flow_controlled_buffer.length > t->write_buffer_size)) { - grpc_chttp2_become_writable(exec_ctx, t, s, true, "op.send_message"); + grpc_chttp2_mark_stream_writable(exec_ctx, t, s); + grpc_chttp2_initiate_write(exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE); } } @@ -1353,12 +1445,14 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE; /* Identify stream compression */ - if ((s->stream_compression_send_enabled = - (op_payload->send_initial_metadata.send_initial_metadata->idx.named - .content_encoding != NULL)) == true) { - s->compressed_data_buffer = - (grpc_slice_buffer *)gpr_malloc(sizeof(grpc_slice_buffer)); - grpc_slice_buffer_init(s->compressed_data_buffer); + if (op_payload->send_initial_metadata.send_initial_metadata->idx.named + .content_encoding == NULL || + grpc_stream_compression_method_parse( + GRPC_MDVALUE( + op_payload->send_initial_metadata.send_initial_metadata->idx + .named.content_encoding->md), + true, &s->stream_compression_method) == 0) { + s->stream_compression_method = GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS; } s->send_initial_metadata_finished = add_closure_barrier(on_complete); @@ -1404,14 +1498,13 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, } } else { GPR_ASSERT(s->id != 0); - bool initiate_write = true; - if (op->send_message && - (op->payload->send_message.send_message->flags & - GRPC_WRITE_BUFFER_HINT)) { - initiate_write = false; + grpc_chttp2_mark_stream_writable(exec_ctx, t, s); + if (!(op->send_message && + (op->payload->send_message.send_message->flags & + GRPC_WRITE_BUFFER_HINT))) { + grpc_chttp2_initiate_write( + exec_ctx, t, GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA); } - grpc_chttp2_become_writable(exec_ctx, t, s, initiate_write, - "op.send_initial_metadata"); } } else { s->send_initial_metadata = NULL; @@ -1519,8 +1612,9 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, } else if (s->id != 0) { /* TODO(ctiller): check if there's flow control for any outstanding bytes before going writable */ - grpc_chttp2_become_writable(exec_ctx, t, s, true, - "op.send_trailing_metadata"); + grpc_chttp2_mark_stream_writable(exec_ctx, t, s); + grpc_chttp2_initiate_write( + exec_ctx, t, GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA); } } } @@ -1632,15 +1726,17 @@ static void cancel_pings(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, GRPC_ERROR_UNREF(error); } -static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_chttp2_ping_type ping_type, - grpc_closure *on_initiate, grpc_closure *on_ack) { +static void send_ping_locked( + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + grpc_chttp2_ping_type ping_type, grpc_closure *on_initiate, + grpc_closure *on_ack, + grpc_chttp2_initiate_write_reason initiate_write_reason) { grpc_chttp2_ping_queue *pq = &t->ping_queues[ping_type]; grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INITIATE], on_initiate, GRPC_ERROR_NONE); if (grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_NEXT], on_ack, GRPC_ERROR_NONE)) { - grpc_chttp2_initiate_write(exec_ctx, t, "send_ping"); + grpc_chttp2_initiate_write(exec_ctx, t, initiate_write_reason); } } @@ -1648,7 +1744,10 @@ static void retry_initiate_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, grpc_error *error) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)tp; t->ping_state.is_delayed_ping_timer_set = false; - grpc_chttp2_initiate_write(exec_ctx, t, "retry_send_ping"); + if (error == GRPC_ERROR_NONE) { + grpc_chttp2_initiate_write(exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING); + } } void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, @@ -1663,7 +1762,8 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } GRPC_CLOSURE_LIST_SCHED(exec_ctx, &pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]); if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_NEXT])) { - grpc_chttp2_initiate_write(exec_ctx, t, "continue_pings"); + grpc_chttp2_initiate_write(exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS); } } @@ -1676,7 +1776,8 @@ static void send_goaway(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, &slice, &http_error); grpc_chttp2_goaway_append(t->last_new_stream_id, (uint32_t)http_error, grpc_slice_ref_internal(slice), &t->qbuf); - grpc_chttp2_initiate_write(exec_ctx, t, "goaway_sent"); + grpc_chttp2_initiate_write(exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT); GRPC_ERROR_UNREF(error); } @@ -1723,7 +1824,8 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, if (op->send_ping) { send_ping_locked(exec_ctx, t, GRPC_CHTTP2_PING_ON_NEXT_WRITE, NULL, - op->send_ping); + op->send_ping, + GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING); } if (op->on_connectivity_state_change != NULL) { @@ -1799,20 +1901,20 @@ void grpc_chttp2_maybe_complete_recv_message(grpc_exec_ctx *exec_ctx, &s->frame_storage); s->unprocessed_incoming_frames_decompressed = false; } - if (s->stream_compression_recv_enabled && - !s->unprocessed_incoming_frames_decompressed) { - GPR_ASSERT(s->decompressed_data_buffer->length == 0); + if (!s->unprocessed_incoming_frames_decompressed) { + GPR_ASSERT(s->decompressed_data_buffer.length == 0); bool end_of_context; if (!s->stream_decompression_ctx) { s->stream_decompression_ctx = grpc_stream_compression_context_create( - GRPC_STREAM_COMPRESSION_DECOMPRESS); + s->stream_decompression_method); } - if (!grpc_stream_decompress(s->stream_decompression_ctx, - &s->unprocessed_incoming_frames_buffer, - s->decompressed_data_buffer, NULL, - GRPC_HEADER_SIZE_IN_BYTES, - &end_of_context)) { + if (!grpc_stream_decompress( + s->stream_decompression_ctx, + &s->unprocessed_incoming_frames_buffer, + &s->decompressed_data_buffer, NULL, + GRPC_HEADER_SIZE_IN_BYTES - s->decompressed_header_bytes, + &end_of_context)) { grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &s->frame_storage); grpc_slice_buffer_reset_and_unref_internal( @@ -1820,9 +1922,13 @@ void grpc_chttp2_maybe_complete_recv_message(grpc_exec_ctx *exec_ctx, error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Stream decompression error."); } else { + s->decompressed_header_bytes += s->decompressed_data_buffer.length; + if (s->decompressed_header_bytes == GRPC_HEADER_SIZE_IN_BYTES) { + s->decompressed_header_bytes = 0; + } error = grpc_deframe_unprocessed_incoming_frames( - exec_ctx, &s->data_parser, s, s->decompressed_data_buffer, NULL, - s->recv_message); + exec_ctx, &s->data_parser, s, &s->decompressed_data_buffer, + NULL, s->recv_message); if (end_of_context) { grpc_stream_compression_context_destroy( s->stream_decompression_ctx); @@ -1871,15 +1977,14 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx, } bool pending_data = s->pending_byte_stream || s->unprocessed_incoming_frames_buffer.length > 0; - if (s->stream_compression_recv_enabled && s->read_closed && - s->frame_storage.length > 0 && !pending_data && !s->seen_error && - s->recv_trailing_metadata_finished != NULL) { + if (s->read_closed && s->frame_storage.length > 0 && !pending_data && + !s->seen_error && s->recv_trailing_metadata_finished != NULL) { /* Maybe some SYNC_FLUSH data is left in frame_storage. Consume them and * maybe decompress the next 5 bytes in the stream. */ bool end_of_context; if (!s->stream_decompression_ctx) { s->stream_decompression_ctx = grpc_stream_compression_context_create( - GRPC_STREAM_COMPRESSION_DECOMPRESS); + s->stream_decompression_method); } if (!grpc_stream_decompress(s->stream_decompression_ctx, &s->frame_storage, @@ -1892,6 +1997,7 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx, } else { if (s->unprocessed_incoming_frames_buffer.length > 0) { s->unprocessed_incoming_frames_decompressed = true; + pending_data = true; } if (end_of_context) { grpc_stream_compression_context_destroy(s->stream_decompression_ctx); @@ -1968,7 +2074,8 @@ void grpc_chttp2_cancel_stream(grpc_exec_ctx *exec_ctx, grpc_slice_buffer_add( &t->qbuf, grpc_chttp2_rst_stream_create(s->id, (uint32_t)http_error, &s->stats.outgoing)); - grpc_chttp2_initiate_write(exec_ctx, t, "rst_stream"); + grpc_chttp2_initiate_write(exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM); } } if (due_to_error != GRPC_ERROR_NONE && !s->seen_error) { @@ -2289,7 +2396,8 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, &s->stats.outgoing)); grpc_chttp2_mark_stream_closed(exec_ctx, t, s, 1, 1, error); - grpc_chttp2_initiate_write(exec_ctx, t, "close_from_api"); + grpc_chttp2_initiate_write(exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API); } typedef struct { @@ -2324,19 +2432,20 @@ void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx, case GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED: break; case GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY: - grpc_chttp2_become_writable(exec_ctx, t, s, true, - "immediate stream flowctl"); + grpc_chttp2_mark_stream_writable(exec_ctx, t, s); + grpc_chttp2_initiate_write( + exec_ctx, t, GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL); break; case GRPC_CHTTP2_FLOWCTL_QUEUE_UPDATE: - grpc_chttp2_become_writable(exec_ctx, t, s, false, - "queue stream flowctl"); + grpc_chttp2_mark_stream_writable(exec_ctx, t, s); break; } switch (action.send_transport_update) { case GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED: break; case GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY: - grpc_chttp2_initiate_write(exec_ctx, t, "immediate transport flowctl"); + grpc_chttp2_initiate_write( + exec_ctx, t, GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL); break; // this is the same as no action b/c every time the transport enters the // writing path it will maybe do an update @@ -2354,7 +2463,8 @@ void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx, (uint32_t)action.max_frame_size); } if (action.send_setting_update == GRPC_CHTTP2_FLOWCTL_UPDATE_IMMEDIATELY) { - grpc_chttp2_initiate_write(exec_ctx, t, "immediate setting update"); + grpc_chttp2_initiate_write(exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS); } } if (action.need_ping) { @@ -2362,7 +2472,8 @@ void grpc_chttp2_act_on_flowctl_action(grpc_exec_ctx *exec_ctx, grpc_bdp_estimator_schedule_ping(&t->flow_control.bdp_estimator); send_ping_locked(exec_ctx, t, GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE, - &t->start_bdp_ping_locked, &t->finish_bdp_ping_locked); + &t->start_bdp_ping_locked, &t->finish_bdp_ping_locked, + GRPC_CHTTP2_INITIATE_WRITE_BDP_ESTIMATOR_PING); } } @@ -2441,7 +2552,10 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp, if (t->flow_control.initial_window_update > 0) { grpc_chttp2_stream *s; while (grpc_chttp2_list_pop_stalled_by_stream(t, &s)) { - grpc_chttp2_become_writable(exec_ctx, t, s, true, "unstalled"); + grpc_chttp2_mark_stream_writable(exec_ctx, t, s); + grpc_chttp2_initiate_write( + exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING); } } t->flow_control.initial_window_update = 0; @@ -2538,6 +2652,36 @@ void grpc_chttp2_config_default_keepalive_args(grpc_channel_args *args, &args->args[i], (grpc_integer_options){g_default_keepalive_permit_without_calls, 0, 1}); + } else if (0 == + strcmp(args->args[i].key, GRPC_ARG_HTTP2_MAX_PING_STRIKES)) { + g_default_max_ping_strikes = grpc_channel_arg_get_integer( + &args->args[i], + (grpc_integer_options){g_default_max_ping_strikes, 0, INT_MAX}); + } else if (0 == strcmp(args->args[i].key, + GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA)) { + g_default_max_pings_without_data = grpc_channel_arg_get_integer( + &args->args[i], (grpc_integer_options){ + g_default_max_pings_without_data, 0, INT_MAX}); + } else if (0 == + strcmp( + args->args[i].key, + GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS)) { + g_default_min_sent_ping_interval_without_data_ms = + grpc_channel_arg_get_integer( + &args->args[i], + (grpc_integer_options){ + g_default_min_sent_ping_interval_without_data_ms, 0, + INT_MAX}); + } else if (0 == + strcmp( + args->args[i].key, + GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS)) { + g_default_min_recv_ping_interval_without_data_ms = + grpc_channel_arg_get_integer( + &args->args[i], + (grpc_integer_options){ + g_default_min_recv_ping_interval_without_data_ms, 0, + INT_MAX}); } } } @@ -2556,7 +2700,8 @@ static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end"); send_ping_locked(exec_ctx, t, GRPC_CHTTP2_PING_ON_NEXT_WRITE, &t->start_keepalive_ping_locked, - &t->finish_keepalive_ping_locked); + &t->finish_keepalive_ping_locked, + GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING); } else { GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); grpc_timer_init( @@ -2671,7 +2816,7 @@ static void reset_byte_stream(grpc_exec_ctx *exec_ctx, void *arg, GRPC_ERROR_UNREF(s->byte_stream_error); s->byte_stream_error = GRPC_ERROR_NONE; grpc_chttp2_cancel_stream(exec_ctx, s->t, s, GRPC_ERROR_REF(error)); - s->byte_stream_error = error; + s->byte_stream_error = GRPC_ERROR_REF(error); } } @@ -2769,24 +2914,23 @@ static grpc_error *incoming_byte_stream_pull(grpc_exec_ctx *exec_ctx, grpc_error *error; if (s->unprocessed_incoming_frames_buffer.length > 0) { - if (s->stream_compression_recv_enabled && - !s->unprocessed_incoming_frames_decompressed) { + if (!s->unprocessed_incoming_frames_decompressed) { bool end_of_context; if (!s->stream_decompression_ctx) { s->stream_decompression_ctx = grpc_stream_compression_context_create( - GRPC_STREAM_COMPRESSION_DECOMPRESS); + s->stream_decompression_method); } if (!grpc_stream_decompress(s->stream_decompression_ctx, &s->unprocessed_incoming_frames_buffer, - s->decompressed_data_buffer, NULL, MAX_SIZE_T, - &end_of_context)) { + &s->decompressed_data_buffer, NULL, + MAX_SIZE_T, &end_of_context)) { error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Stream decompression error."); return error; } GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0); grpc_slice_buffer_swap(&s->unprocessed_incoming_frames_buffer, - s->decompressed_data_buffer); + &s->decompressed_data_buffer); s->unprocessed_incoming_frames_decompressed = true; if (end_of_context) { grpc_stream_compression_context_destroy(s->stream_decompression_ctx); @@ -2912,7 +3056,8 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, uint32_t frame_size, uint32_t flags) { grpc_chttp2_incoming_byte_stream *incoming_byte_stream = - gpr_malloc(sizeof(*incoming_byte_stream)); + (grpc_chttp2_incoming_byte_stream *)gpr_malloc( + sizeof(*incoming_byte_stream)); incoming_byte_stream->base.length = frame_size; incoming_byte_stream->remaining_bytes = frame_size; incoming_byte_stream->base.flags = flags; @@ -3016,6 +3161,56 @@ static void destructive_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg, /******************************************************************************* * MONITORING */ + +const char *grpc_chttp2_initiate_write_reason_string( + grpc_chttp2_initiate_write_reason reason) { + switch (reason) { + case GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE: + return "INITIAL_WRITE"; + case GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM: + return "START_NEW_STREAM"; + case GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE: + return "SEND_MESSAGE"; + case GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA: + return "SEND_INITIAL_METADATA"; + case GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA: + return "SEND_TRAILING_METADATA"; + case GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING: + return "RETRY_SEND_PING"; + case GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS: + return "CONTINUE_PINGS"; + case GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT: + return "GOAWAY_SENT"; + case GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM: + return "RST_STREAM"; + case GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API: + return "CLOSE_FROM_API"; + case GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL: + return "STREAM_FLOW_CONTROL"; + case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL: + return "TRANSPORT_FLOW_CONTROL"; + case GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS: + return "SEND_SETTINGS"; + case GRPC_CHTTP2_INITIATE_WRITE_BDP_ESTIMATOR_PING: + return "BDP_ESTIMATOR_PING"; + case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING: + return "FLOW_CONTROL_UNSTALLED_BY_SETTING"; + case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE: + return "FLOW_CONTROL_UNSTALLED_BY_UPDATE"; + case GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING: + return "APPLICATION_PING"; + case GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING: + return "KEEPALIVE_PING"; + case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED: + return "TRANSPORT_FLOW_CONTROL_UNSTALLED"; + case GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE: + return "PING_RESPONSE"; + case GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM: + return "FORCE_RST_STREAM"; + } + GPR_UNREACHABLE_CODE(return "unknown"); +} + static grpc_endpoint *chttp2_get_endpoint(grpc_exec_ctx *exec_ctx, grpc_transport *t) { return ((grpc_chttp2_transport *)t)->ep; diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.h b/src/core/ext/transport/chttp2/transport/chttp2_transport.h index 0c4e2a91c0..55fb1a8343 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.h +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.h @@ -25,6 +25,7 @@ extern grpc_tracer_flag grpc_http_trace; extern grpc_tracer_flag grpc_flowctl_trace; +extern grpc_tracer_flag grpc_trace_http2_stream_state; #ifndef NDEBUG extern grpc_tracer_flag grpc_trace_chttp2_refcount; diff --git a/src/core/ext/transport/chttp2/transport/flow_control.c b/src/core/ext/transport/chttp2/transport/flow_control.c index cec99f6fb6..569a6349d3 100644 --- a/src/core/ext/transport/chttp2/transport/flow_control.c +++ b/src/core/ext/transport/chttp2/transport/flow_control.c @@ -18,6 +18,7 @@ #include "src/core/ext/transport/chttp2/transport/internal.h" +#include <limits.h> #include <math.h> #include <string.h> @@ -59,24 +60,24 @@ static void pretrace(shadow_flow_control* shadow_fc, #define TRACE_PADDING 30 -static char* fmt_int64_diff_str(int64_t old, int64_t new) { +static char* fmt_int64_diff_str(int64_t old_val, int64_t new_val) { char* str; - if (old != new) { - gpr_asprintf(&str, "%" PRId64 " -> %" PRId64 "", old, new); + if (old_val != new_val) { + gpr_asprintf(&str, "%" PRId64 " -> %" PRId64 "", old_val, new_val); } else { - gpr_asprintf(&str, "%" PRId64 "", old); + gpr_asprintf(&str, "%" PRId64 "", old_val); } char* str_lp = gpr_leftpad(str, ' ', TRACE_PADDING); gpr_free(str); return str_lp; } -static char* fmt_uint32_diff_str(uint32_t old, uint32_t new) { +static char* fmt_uint32_diff_str(uint32_t old_val, uint32_t new_val) { char* str; - if (new > 0 && old != new) { - gpr_asprintf(&str, "%" PRIu32 " -> %" PRIu32 "", old, new); + if (new_val > 0 && old_val != new_val) { + gpr_asprintf(&str, "%" PRIu32 " -> %" PRIu32 "", old_val, new_val); } else { - gpr_asprintf(&str, "%" PRIu32 "", old); + gpr_asprintf(&str, "%" PRIu32 "", old_val); } char* str_lp = gpr_leftpad(str, ' ', TRACE_PADDING); gpr_free(str); @@ -483,7 +484,8 @@ grpc_chttp2_flowctl_action grpc_chttp2_flowctl_get_bdp_action( if (grpc_bdp_estimator_get_bw(&tfc->bdp_estimator, &bw_dbl)) { // we target the max of BDP or bandwidth in microseconds. int32_t frame_size = (int32_t)GPR_CLAMP( - GPR_MAX((int32_t)bw_dbl / 1000, bdp), 16384, 16777215); + GPR_MAX((int32_t)GPR_CLAMP(bw_dbl, 0, INT_MAX) / 1000, bdp), 16384, + 16777215); grpc_chttp2_flowctl_urgency frame_size_urgency = delta_is_significant( tfc, frame_size, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE); if (frame_size_urgency != GRPC_CHTTP2_FLOWCTL_NO_ACTION_NEEDED) { diff --git a/src/core/ext/transport/chttp2/transport/frame_data.c b/src/core/ext/transport/chttp2/transport/frame_data.c index 222d2177b2..73aaab1802 100644 --- a/src/core/ext/transport/chttp2/transport/frame_data.c +++ b/src/core/ext/transport/chttp2/transport/frame_data.c @@ -210,7 +210,7 @@ grpc_error *grpc_deframe_unprocessed_incoming_frames( if (cur != end) { grpc_slice_buffer_undo_take_first( - &s->unprocessed_incoming_frames_buffer, + slices, grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg))); } grpc_slice_unref_internal(exec_ctx, slice); @@ -277,7 +277,7 @@ grpc_error *grpc_deframe_unprocessed_incoming_frames( p->state = GRPC_CHTTP2_DATA_FH_0; cur += p->frame_size; grpc_slice_buffer_undo_take_first( - &s->unprocessed_incoming_frames_buffer, + slices, grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg))); grpc_slice_unref_internal(exec_ctx, slice); return GRPC_ERROR_NONE; diff --git a/src/core/ext/transport/chttp2/transport/frame_ping.c b/src/core/ext/transport/chttp2/transport/frame_ping.c index 582fd7bfaa..d431d6b2df 100644 --- a/src/core/ext/transport/chttp2/transport/frame_ping.c +++ b/src/core/ext/transport/chttp2/transport/frame_ping.c @@ -92,7 +92,7 @@ grpc_error *grpc_chttp2_ping_parser_parse(grpc_exec_ctx *exec_ctx, void *parser, gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); gpr_timespec next_allowed_ping = gpr_time_add(t->ping_recv_state.last_ping_recv_time, - t->ping_policy.min_ping_interval_without_data); + t->ping_policy.min_recv_ping_interval_without_data); if (t->keepalive_permit_without_calls == 0 && grpc_chttp2_stream_map_size(&t->stream_map) == 0) { @@ -117,7 +117,8 @@ grpc_error *grpc_chttp2_ping_parser_parse(grpc_exec_ctx *exec_ctx, void *parser, t->ping_acks, t->ping_ack_capacity * sizeof(*t->ping_acks)); } t->ping_acks[t->ping_ack_count++] = p->opaque_8bytes; - grpc_chttp2_initiate_write(exec_ctx, t, "ping response"); + grpc_chttp2_initiate_write(exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE); } } } diff --git a/src/core/ext/transport/chttp2/transport/frame_settings.c b/src/core/ext/transport/chttp2/transport/frame_settings.c index 806100adaa..2995bf7310 100644 --- a/src/core/ext/transport/chttp2/transport/frame_settings.c +++ b/src/core/ext/transport/chttp2/transport/frame_settings.c @@ -44,7 +44,8 @@ static uint8_t *fill_header(uint8_t *out, uint32_t length, uint8_t flags) { return out; } -grpc_slice grpc_chttp2_settings_create(uint32_t *old, const uint32_t *new, +grpc_slice grpc_chttp2_settings_create(uint32_t *old_settings, + const uint32_t *new_settings, uint32_t force_mask, size_t count) { size_t i; uint32_t n = 0; @@ -52,21 +53,21 @@ grpc_slice grpc_chttp2_settings_create(uint32_t *old, const uint32_t *new, uint8_t *p; for (i = 0; i < count; i++) { - n += (new[i] != old[i] || (force_mask & (1u << i)) != 0); + n += (new_settings[i] != old_settings[i] || (force_mask & (1u << i)) != 0); } output = GRPC_SLICE_MALLOC(9 + 6 * n); p = fill_header(GRPC_SLICE_START_PTR(output), 6 * n, 0); for (i = 0; i < count; i++) { - if (new[i] != old[i] || (force_mask & (1u << i)) != 0) { + if (new_settings[i] != old_settings[i] || (force_mask & (1u << i)) != 0) { *p++ = (uint8_t)(grpc_setting_id_to_wire_id[i] >> 8); *p++ = (uint8_t)(grpc_setting_id_to_wire_id[i]); - *p++ = (uint8_t)(new[i] >> 24); - *p++ = (uint8_t)(new[i] >> 16); - *p++ = (uint8_t)(new[i] >> 8); - *p++ = (uint8_t)(new[i]); - old[i] = new[i]; + *p++ = (uint8_t)(new_settings[i] >> 24); + *p++ = (uint8_t)(new_settings[i] >> 16); + *p++ = (uint8_t)(new_settings[i] >> 8); + *p++ = (uint8_t)(new_settings[i]); + old_settings[i] = new_settings[i]; } } diff --git a/src/core/ext/transport/chttp2/transport/frame_window_update.c b/src/core/ext/transport/chttp2/transport/frame_window_update.c index c94f7725bf..c9ab8d1b50 100644 --- a/src/core/ext/transport/chttp2/transport/frame_window_update.c +++ b/src/core/ext/transport/chttp2/transport/frame_window_update.c @@ -99,8 +99,10 @@ grpc_error *grpc_chttp2_window_update_parser_parse( grpc_chttp2_flowctl_recv_stream_update( &t->flow_control, &s->flow_control, received_update); if (grpc_chttp2_list_remove_stalled_by_stream(t, s)) { - grpc_chttp2_become_writable(exec_ctx, t, s, true, - "stream.read_flow_control"); + grpc_chttp2_mark_stream_writable(exec_ctx, t, s); + grpc_chttp2_initiate_write( + exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE); } } } else { @@ -109,7 +111,9 @@ grpc_error *grpc_chttp2_window_update_parser_parse( received_update); bool is_zero = t->flow_control.remote_window <= 0; if (was_zero && !is_zero) { - grpc_chttp2_initiate_write(exec_ctx, t, "new_global_flow_control"); + grpc_chttp2_initiate_write( + exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED); } } } diff --git a/src/core/ext/transport/chttp2/transport/hpack_encoder.c b/src/core/ext/transport/chttp2/transport/hpack_encoder.c index 3cd1a7ee5c..a404b664e3 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_encoder.c +++ b/src/core/ext/transport/chttp2/transport/hpack_encoder.c @@ -33,6 +33,7 @@ #include "src/core/ext/transport/chttp2/transport/bin_encoder.h" #include "src/core/ext/transport/chttp2/transport/hpack_table.h" #include "src/core/ext/transport/chttp2/transport/varint.h" +#include "src/core/lib/debug/stats.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" #include "src/core/lib/transport/metadata.h" @@ -51,8 +52,10 @@ #define MAX_DECODER_SPACE_USAGE 512 static grpc_slice_refcount terminal_slice_refcount = {NULL, NULL}; -static const grpc_slice terminal_slice = {&terminal_slice_refcount, - .data.refcounted = {0, 0}}; +static const grpc_slice terminal_slice = { + &terminal_slice_refcount, /* refcount */ + {{0, 0}} /* data.refcounted */ +}; extern grpc_tracer_flag grpc_http_trace; @@ -269,8 +272,10 @@ static void add_elem(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_compressor *c, } } -static void emit_indexed(grpc_chttp2_hpack_compressor *c, uint32_t elem_index, +static void emit_indexed(grpc_exec_ctx *exec_ctx, + grpc_chttp2_hpack_compressor *c, uint32_t elem_index, framer_state *st) { + GRPC_STATS_INC_HPACK_SEND_INDEXED(exec_ctx); uint32_t len = GRPC_CHTTP2_VARINT_LENGTH(elem_index, 1); GRPC_CHTTP2_WRITE_VARINT(elem_index, 1, 0x80, add_tiny_header_data(st, len), len); @@ -282,30 +287,31 @@ typedef struct { bool insert_null_before_wire_value; } wire_value; -static wire_value get_wire_value(grpc_mdelem elem, bool true_binary_enabled) { +static wire_value get_wire_value(grpc_exec_ctx *exec_ctx, grpc_mdelem elem, + bool true_binary_enabled) { + wire_value wire_val; if (grpc_is_binary_header(GRPC_MDKEY(elem))) { if (true_binary_enabled) { - return (wire_value){ - .huffman_prefix = 0x00, - .insert_null_before_wire_value = true, - .data = grpc_slice_ref_internal(GRPC_MDVALUE(elem)), - }; + GRPC_STATS_INC_HPACK_SEND_BINARY(exec_ctx); + wire_val.huffman_prefix = 0x00; + wire_val.insert_null_before_wire_value = true; + wire_val.data = grpc_slice_ref_internal(GRPC_MDVALUE(elem)); + } else { - return (wire_value){ - .huffman_prefix = 0x80, - .insert_null_before_wire_value = false, - .data = grpc_chttp2_base64_encode_and_huffman_compress( - GRPC_MDVALUE(elem)), - }; + GRPC_STATS_INC_HPACK_SEND_BINARY_BASE64(exec_ctx); + wire_val.huffman_prefix = 0x80; + wire_val.insert_null_before_wire_value = false; + wire_val.data = + grpc_chttp2_base64_encode_and_huffman_compress(GRPC_MDVALUE(elem)); } } else { /* TODO(ctiller): opportunistically compress non-binary headers */ - return (wire_value){ - .huffman_prefix = 0x00, - .insert_null_before_wire_value = false, - .data = grpc_slice_ref_internal(GRPC_MDVALUE(elem)), - }; + GRPC_STATS_INC_HPACK_SEND_UNCOMPRESSED(exec_ctx); + wire_val.huffman_prefix = 0x00; + wire_val.insert_null_before_wire_value = false; + wire_val.data = grpc_slice_ref_internal(GRPC_MDVALUE(elem)); } + return wire_val; } static size_t wire_value_length(wire_value v) { @@ -317,11 +323,14 @@ static void add_wire_value(framer_state *st, wire_value v) { add_header_data(st, v.data); } -static void emit_lithdr_incidx(grpc_chttp2_hpack_compressor *c, +static void emit_lithdr_incidx(grpc_exec_ctx *exec_ctx, + grpc_chttp2_hpack_compressor *c, uint32_t key_index, grpc_mdelem elem, framer_state *st) { + GRPC_STATS_INC_HPACK_SEND_LITHDR_INCIDX(exec_ctx); uint32_t len_pfx = GRPC_CHTTP2_VARINT_LENGTH(key_index, 2); - wire_value value = get_wire_value(elem, st->use_true_binary_metadata); + wire_value value = + get_wire_value(exec_ctx, elem, st->use_true_binary_metadata); size_t len_val = wire_value_length(value); uint32_t len_val_len; GPR_ASSERT(len_val <= UINT32_MAX); @@ -333,11 +342,14 @@ static void emit_lithdr_incidx(grpc_chttp2_hpack_compressor *c, add_wire_value(st, value); } -static void emit_lithdr_noidx(grpc_chttp2_hpack_compressor *c, +static void emit_lithdr_noidx(grpc_exec_ctx *exec_ctx, + grpc_chttp2_hpack_compressor *c, uint32_t key_index, grpc_mdelem elem, framer_state *st) { + GRPC_STATS_INC_HPACK_SEND_LITHDR_NOTIDX(exec_ctx); uint32_t len_pfx = GRPC_CHTTP2_VARINT_LENGTH(key_index, 4); - wire_value value = get_wire_value(elem, st->use_true_binary_metadata); + wire_value value = + get_wire_value(exec_ctx, elem, st->use_true_binary_metadata); size_t len_val = wire_value_length(value); uint32_t len_val_len; GPR_ASSERT(len_val <= UINT32_MAX); @@ -349,10 +361,14 @@ static void emit_lithdr_noidx(grpc_chttp2_hpack_compressor *c, add_wire_value(st, value); } -static void emit_lithdr_incidx_v(grpc_chttp2_hpack_compressor *c, +static void emit_lithdr_incidx_v(grpc_exec_ctx *exec_ctx, + grpc_chttp2_hpack_compressor *c, grpc_mdelem elem, framer_state *st) { + GRPC_STATS_INC_HPACK_SEND_LITHDR_INCIDX_V(exec_ctx); + GRPC_STATS_INC_HPACK_SEND_UNCOMPRESSED(exec_ctx); uint32_t len_key = (uint32_t)GRPC_SLICE_LENGTH(GRPC_MDKEY(elem)); - wire_value value = get_wire_value(elem, st->use_true_binary_metadata); + wire_value value = + get_wire_value(exec_ctx, elem, st->use_true_binary_metadata); uint32_t len_val = (uint32_t)wire_value_length(value); uint32_t len_key_len = GRPC_CHTTP2_VARINT_LENGTH(len_key, 1); uint32_t len_val_len = GRPC_CHTTP2_VARINT_LENGTH(len_val, 1); @@ -367,10 +383,14 @@ static void emit_lithdr_incidx_v(grpc_chttp2_hpack_compressor *c, add_wire_value(st, value); } -static void emit_lithdr_noidx_v(grpc_chttp2_hpack_compressor *c, +static void emit_lithdr_noidx_v(grpc_exec_ctx *exec_ctx, + grpc_chttp2_hpack_compressor *c, grpc_mdelem elem, framer_state *st) { + GRPC_STATS_INC_HPACK_SEND_LITHDR_NOTIDX_V(exec_ctx); + GRPC_STATS_INC_HPACK_SEND_UNCOMPRESSED(exec_ctx); uint32_t len_key = (uint32_t)GRPC_SLICE_LENGTH(GRPC_MDKEY(elem)); - wire_value value = get_wire_value(elem, st->use_true_binary_metadata); + wire_value value = + get_wire_value(exec_ctx, elem, st->use_true_binary_metadata); uint32_t len_val = (uint32_t)wire_value_length(value); uint32_t len_key_len = GRPC_CHTTP2_VARINT_LENGTH(len_key, 1); uint32_t len_val_len = GRPC_CHTTP2_VARINT_LENGTH(len_val, 1); @@ -423,7 +443,7 @@ static void hpack_enc(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_compressor *c, gpr_free(v); } if (!GRPC_MDELEM_IS_INTERNED(elem)) { - emit_lithdr_noidx_v(c, elem, st); + emit_lithdr_noidx_v(exec_ctx, c, elem, st); return; } @@ -445,16 +465,16 @@ static void hpack_enc(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_compressor *c, if (grpc_mdelem_eq(c->entries_elems[HASH_FRAGMENT_2(elem_hash)], elem) && c->indices_elems[HASH_FRAGMENT_2(elem_hash)] > c->tail_remote_index) { /* HIT: complete element (first cuckoo hash) */ - emit_indexed(c, dynidx(c, c->indices_elems[HASH_FRAGMENT_2(elem_hash)]), - st); + emit_indexed(exec_ctx, c, + dynidx(c, c->indices_elems[HASH_FRAGMENT_2(elem_hash)]), st); return; } if (grpc_mdelem_eq(c->entries_elems[HASH_FRAGMENT_3(elem_hash)], elem) && c->indices_elems[HASH_FRAGMENT_3(elem_hash)] > c->tail_remote_index) { /* HIT: complete element (second cuckoo hash) */ - emit_indexed(c, dynidx(c, c->indices_elems[HASH_FRAGMENT_3(elem_hash)]), - st); + emit_indexed(exec_ctx, c, + dynidx(c, c->indices_elems[HASH_FRAGMENT_3(elem_hash)]), st); return; } @@ -472,11 +492,11 @@ static void hpack_enc(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_compressor *c, indices_key > c->tail_remote_index) { /* HIT: key (first cuckoo hash) */ if (should_add_elem) { - emit_lithdr_incidx(c, dynidx(c, indices_key), elem, st); + emit_lithdr_incidx(exec_ctx, c, dynidx(c, indices_key), elem, st); add_elem(exec_ctx, c, elem); return; } else { - emit_lithdr_noidx(c, dynidx(c, indices_key), elem, st); + emit_lithdr_noidx(exec_ctx, c, dynidx(c, indices_key), elem, st); return; } GPR_UNREACHABLE_CODE(return ); @@ -488,11 +508,11 @@ static void hpack_enc(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_compressor *c, indices_key > c->tail_remote_index) { /* HIT: key (first cuckoo hash) */ if (should_add_elem) { - emit_lithdr_incidx(c, dynidx(c, indices_key), elem, st); + emit_lithdr_incidx(exec_ctx, c, dynidx(c, indices_key), elem, st); add_elem(exec_ctx, c, elem); return; } else { - emit_lithdr_noidx(c, dynidx(c, indices_key), elem, st); + emit_lithdr_noidx(exec_ctx, c, dynidx(c, indices_key), elem, st); return; } GPR_UNREACHABLE_CODE(return ); @@ -501,11 +521,11 @@ static void hpack_enc(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_compressor *c, /* no elem, key in the table... fall back to literal emission */ if (should_add_elem) { - emit_lithdr_incidx_v(c, elem, st); + emit_lithdr_incidx_v(exec_ctx, c, elem, st); add_elem(exec_ctx, c, elem); return; } else { - emit_lithdr_noidx_v(c, elem, st); + emit_lithdr_noidx_v(exec_ctx, c, elem, st); return; } GPR_UNREACHABLE_CODE(return ); diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.c b/src/core/ext/transport/chttp2/transport/hpack_parser.c index 82ff2c8e2c..3d1df19bc3 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_parser.c +++ b/src/core/ext/transport/chttp2/transport/hpack_parser.c @@ -30,6 +30,7 @@ #include <grpc/support/useful.h> #include "src/core/ext/transport/chttp2/transport/bin_encoder.h" +#include "src/core/lib/debug/stats.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/support/string.h" @@ -777,8 +778,7 @@ static grpc_error *parse_stream_dep0(grpc_exec_ctx *exec_ctx, return parse_stream_dep1(exec_ctx, p, cur + 1, end); } -/* emit an indexed field; for now just logs it to console; jumps to - begin the next field on completion */ +/* emit an indexed field; jumps to begin the next field on completion */ static grpc_error *finish_indexed_field(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_parser *p, const uint8_t *cur, @@ -792,6 +792,7 @@ static grpc_error *finish_indexed_field(grpc_exec_ctx *exec_ctx, GRPC_ERROR_INT_SIZE, (intptr_t)p->table.num_ents); } GRPC_MDELEM_REF(md); + GRPC_STATS_INC_HPACK_RECV_INDEXED(exec_ctx); grpc_error *err = on_hdr(exec_ctx, p, md, 0); if (err != GRPC_ERROR_NONE) return err; return parse_begin(exec_ctx, p, cur, end); @@ -820,14 +821,14 @@ static grpc_error *parse_indexed_field_x(grpc_exec_ctx *exec_ctx, return parse_value0(exec_ctx, p, cur + 1, end); } -/* finish a literal header with incremental indexing: just log, and jump to ' - begin */ +/* finish a literal header with incremental indexing */ static grpc_error *finish_lithdr_incidx(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_parser *p, const uint8_t *cur, const uint8_t *end) { grpc_mdelem md = grpc_chttp2_hptbl_lookup(&p->table, p->index); GPR_ASSERT(!GRPC_MDISNULL(md)); /* handled in string parsing */ + GRPC_STATS_INC_HPACK_RECV_LITHDR_INCIDX(exec_ctx); grpc_error *err = on_hdr( exec_ctx, p, grpc_mdelem_from_slices(exec_ctx, grpc_slice_ref_internal(GRPC_MDKEY(md)), @@ -842,6 +843,7 @@ static grpc_error *finish_lithdr_incidx_v(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_parser *p, const uint8_t *cur, const uint8_t *end) { + GRPC_STATS_INC_HPACK_RECV_LITHDR_INCIDX_V(exec_ctx); grpc_error *err = on_hdr( exec_ctx, p, grpc_mdelem_from_slices(exec_ctx, take_string(exec_ctx, p, &p->key, true), @@ -898,6 +900,7 @@ static grpc_error *finish_lithdr_notidx(grpc_exec_ctx *exec_ctx, const uint8_t *end) { grpc_mdelem md = grpc_chttp2_hptbl_lookup(&p->table, p->index); GPR_ASSERT(!GRPC_MDISNULL(md)); /* handled in string parsing */ + GRPC_STATS_INC_HPACK_RECV_LITHDR_NOTIDX(exec_ctx); grpc_error *err = on_hdr( exec_ctx, p, grpc_mdelem_from_slices(exec_ctx, grpc_slice_ref_internal(GRPC_MDKEY(md)), @@ -912,6 +915,7 @@ static grpc_error *finish_lithdr_notidx_v(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_parser *p, const uint8_t *cur, const uint8_t *end) { + GRPC_STATS_INC_HPACK_RECV_LITHDR_NOTIDX_V(exec_ctx); grpc_error *err = on_hdr( exec_ctx, p, grpc_mdelem_from_slices(exec_ctx, take_string(exec_ctx, p, &p->key, true), @@ -968,6 +972,7 @@ static grpc_error *finish_lithdr_nvridx(grpc_exec_ctx *exec_ctx, const uint8_t *end) { grpc_mdelem md = grpc_chttp2_hptbl_lookup(&p->table, p->index); GPR_ASSERT(!GRPC_MDISNULL(md)); /* handled in string parsing */ + GRPC_STATS_INC_HPACK_RECV_LITHDR_NVRIDX(exec_ctx); grpc_error *err = on_hdr( exec_ctx, p, grpc_mdelem_from_slices(exec_ctx, grpc_slice_ref_internal(GRPC_MDKEY(md)), @@ -982,6 +987,7 @@ static grpc_error *finish_lithdr_nvridx_v(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_parser *p, const uint8_t *cur, const uint8_t *end) { + GRPC_STATS_INC_HPACK_RECV_LITHDR_NVRIDX_V(exec_ctx); grpc_error *err = on_hdr( exec_ctx, p, grpc_mdelem_from_slices(exec_ctx, take_string(exec_ctx, p, &p->key, true), @@ -1310,9 +1316,11 @@ static grpc_error *append_string(grpc_exec_ctx *exec_ctx, /* 'true-binary' case */ ++cur; p->binary = NOT_BINARY; + GRPC_STATS_INC_HPACK_RECV_BINARY(exec_ctx); append_bytes(str, cur, (size_t)(end - cur)); return GRPC_ERROR_NONE; } + GRPC_STATS_INC_HPACK_RECV_BINARY_BASE64(exec_ctx); /* fallthrough */ b64_byte0: case B64_BYTE0: @@ -1510,6 +1518,7 @@ static grpc_error *begin_parse_string(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_parser_string *str) { if (!p->huff && binary == NOT_BINARY && (end - cur) >= (intptr_t)p->strlen && p->current_slice_refcount != NULL) { + GRPC_STATS_INC_HPACK_RECV_UNCOMPRESSED(exec_ctx); str->copied = false; str->data.referenced.refcount = p->current_slice_refcount; str->data.referenced.data.refcounted.bytes = (uint8_t *)cur; @@ -1523,6 +1532,20 @@ static grpc_error *begin_parse_string(grpc_exec_ctx *exec_ctx, p->parsing.str = str; p->huff_state = 0; p->binary = binary; + switch (p->binary) { + case NOT_BINARY: + if (p->huff) { + GRPC_STATS_INC_HPACK_RECV_HUFFMAN(exec_ctx); + } else { + GRPC_STATS_INC_HPACK_RECV_UNCOMPRESSED(exec_ctx); + } + break; + case BINARY_BEGIN: + /* stats incremented later: don't know true binary or not */ + break; + default: + abort(); + } return parse_string(exec_ctx, p, cur, end); } @@ -1649,7 +1672,8 @@ static void force_client_rst_stream(grpc_exec_ctx *exec_ctx, void *sp, grpc_slice_buffer_add( &t->qbuf, grpc_chttp2_rst_stream_create(s->id, GRPC_HTTP2_NO_ERROR, &s->stats.outgoing)); - grpc_chttp2_initiate_write(exec_ctx, t, "force_rst_stream"); + grpc_chttp2_initiate_write(exec_ctx, t, + GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM); grpc_chttp2_mark_stream_closed(exec_ctx, t, s, true, true, GRPC_ERROR_NONE); } GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "final_rst"); @@ -1659,17 +1683,12 @@ static void parse_stream_compression_md(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_metadata_batch *initial_metadata) { - if (initial_metadata->idx.named.content_encoding != NULL) { - grpc_slice content_encoding = - GRPC_MDVALUE(initial_metadata->idx.named.content_encoding->md); - if (!grpc_slice_eq(content_encoding, GRPC_MDSTR_IDENTITY)) { - if (grpc_slice_eq(content_encoding, GRPC_MDSTR_GZIP)) { - s->stream_compression_recv_enabled = true; - s->decompressed_data_buffer = - (grpc_slice_buffer *)gpr_malloc(sizeof(grpc_slice_buffer)); - grpc_slice_buffer_init(s->decompressed_data_buffer); - } - } + if (initial_metadata->idx.named.content_encoding == NULL || + grpc_stream_compression_method_parse( + GRPC_MDVALUE(initial_metadata->idx.named.content_encoding->md), false, + &s->stream_decompression_method) == 0) { + s->stream_decompression_method = + GRPC_STREAM_COMPRESSION_IDENTITY_DECOMPRESS; } } diff --git a/src/core/ext/transport/chttp2/transport/incoming_metadata.c b/src/core/ext/transport/chttp2/transport/incoming_metadata.c index cf0a9ca920..ba680a89db 100644 --- a/src/core/ext/transport/chttp2/transport/incoming_metadata.c +++ b/src/core/ext/transport/chttp2/transport/incoming_metadata.c @@ -42,8 +42,9 @@ grpc_error *grpc_chttp2_incoming_metadata_buffer_add( grpc_mdelem elem) { buffer->size += GRPC_MDELEM_LENGTH(elem); return grpc_metadata_batch_add_tail( - exec_ctx, &buffer->batch, - gpr_arena_alloc(buffer->arena, sizeof(grpc_linked_mdelem)), elem); + exec_ctx, &buffer->batch, (grpc_linked_mdelem *)gpr_arena_alloc( + buffer->arena, sizeof(grpc_linked_mdelem)), + elem); } grpc_error *grpc_chttp2_incoming_metadata_buffer_replace_or_add( diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 0fbedd1e56..49022155aa 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -79,16 +79,43 @@ typedef enum { GRPC_CHTTP2_PCL_COUNT /* must be last */ } grpc_chttp2_ping_closure_list; +typedef enum { + GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE, + GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM, + GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE, + GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA, + GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA, + GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING, + GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS, + GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT, + GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM, + GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API, + GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL, + GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL, + GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, + GRPC_CHTTP2_INITIATE_WRITE_BDP_ESTIMATOR_PING, + GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING, + GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE, + GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING, + GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING, + GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED, + GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE, + GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM, +} grpc_chttp2_initiate_write_reason; + +const char *grpc_chttp2_initiate_write_reason_string( + grpc_chttp2_initiate_write_reason reason); + typedef struct { grpc_closure_list lists[GRPC_CHTTP2_PCL_COUNT]; uint64_t inflight_id; } grpc_chttp2_ping_queue; typedef struct { - gpr_timespec min_time_between_pings; int max_pings_without_data; int max_ping_strikes; - gpr_timespec min_ping_interval_without_data; + gpr_timespec min_sent_ping_interval_without_data; + gpr_timespec min_recv_ping_interval_without_data; } grpc_chttp2_repeated_ping_policy; typedef struct { @@ -565,25 +592,27 @@ struct grpc_chttp2_stream { grpc_chttp2_write_cb *finish_after_write; size_t sending_bytes; - /** Whether stream compression send is enabled */ - bool stream_compression_recv_enabled; - /** Whether stream compression recv is enabled */ - bool stream_compression_send_enabled; - /** Whether bytes stored in unprocessed_incoming_byte_stream is decompressed - */ - bool unprocessed_incoming_frames_decompressed; + /* Stream compression method to be used. */ + grpc_stream_compression_method stream_compression_method; + /* Stream decompression method to be used. */ + grpc_stream_compression_method stream_decompression_method; /** Stream compression decompress context */ grpc_stream_compression_context *stream_decompression_ctx; /** Stream compression compress context */ grpc_stream_compression_context *stream_compression_ctx; /** Buffer storing data that is compressed but not sent */ - grpc_slice_buffer *compressed_data_buffer; + grpc_slice_buffer compressed_data_buffer; /** Amount of uncompressed bytes sent out when compressed_data_buffer is * emptied */ size_t uncompressed_data_size; /** Temporary buffer storing decompressed data */ - grpc_slice_buffer *decompressed_data_buffer; + grpc_slice_buffer decompressed_data_buffer; + /** Whether bytes stored in unprocessed_incoming_byte_stream is decompressed + */ + bool unprocessed_incoming_frames_decompressed; + /** gRPC header bytes that are already decompressed */ + size_t decompressed_header_bytes; }; /** Transport writing call flow: @@ -599,7 +628,8 @@ struct grpc_chttp2_stream { The actual call chain is documented in the implementation of this function. */ void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, const char *reason); + grpc_chttp2_transport *t, + grpc_chttp2_initiate_write_reason reason); typedef struct { /** are we writing? */ @@ -851,10 +881,9 @@ void grpc_chttp2_add_ping_strike(grpc_exec_ctx *exec_ctx, /** add a ref to the stream and add it to the writable list; ref will be dropped in writing.c */ -void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s, - bool also_initiate_write, const char *reason); +void grpc_chttp2_mark_stream_writable(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s); void grpc_chttp2_cancel_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c index 6c12c91365..3db1ad4123 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.c +++ b/src/core/ext/transport/chttp2/transport/parsing.c @@ -383,6 +383,9 @@ error_handler: /* t->parser = grpc_chttp2_data_parser_parse;*/ t->parser = grpc_chttp2_data_parser_parse; t->parser_data = &s->data_parser; + t->ping_state.pings_before_data_required = + t->ping_policy.max_pings_without_data; + t->ping_state.last_ping_sent_time = gpr_inf_past(GPR_CLOCK_MONOTONIC); return GRPC_ERROR_NONE; } else if (grpc_error_get_int(err, GRPC_ERROR_INT_STREAM_ID, NULL)) { /* handle stream errors by closing the stream */ @@ -559,6 +562,10 @@ static grpc_error *init_header_frame_parser(grpc_exec_ctx *exec_ctx, (t->incoming_frame_flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) != 0; } + t->ping_state.pings_before_data_required = + t->ping_policy.max_pings_without_data; + t->ping_state.last_ping_sent_time = gpr_inf_past(GPR_CLOCK_MONOTONIC); + /* could be a new grpc_chttp2_stream or an existing grpc_chttp2_stream */ s = grpc_chttp2_parsing_lookup_stream(t, t->incoming_stream_id); if (s == NULL) { diff --git a/src/core/ext/transport/chttp2/transport/stream_lists.c b/src/core/ext/transport/chttp2/transport/stream_lists.c index 7cc85dea9c..47cd22d177 100644 --- a/src/core/ext/transport/chttp2/transport/stream_lists.c +++ b/src/core/ext/transport/chttp2/transport/stream_lists.c @@ -20,6 +20,27 @@ #include <grpc/support/log.h> +static char *stream_list_id_string(grpc_chttp2_stream_list_id id) { + switch (id) { + case GRPC_CHTTP2_LIST_WRITABLE: + return "writable"; + case GRPC_CHTTP2_LIST_WRITING: + return "writing"; + case GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT: + return "stalled_by_transport"; + case GRPC_CHTTP2_LIST_STALLED_BY_STREAM: + return "stalled_by_stream"; + case GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY: + return "waiting_for_concurrency"; + case STREAM_LIST_COUNT: + GPR_UNREACHABLE_CODE(return "unknown"); + } + GPR_UNREACHABLE_CODE(return "unknown"); +} + +grpc_tracer_flag grpc_trace_http2_stream_state = + GRPC_TRACER_INITIALIZER(false, "http2_stream_state"); + /* core list management */ static bool stream_list_empty(grpc_chttp2_transport *t, @@ -44,6 +65,10 @@ static bool stream_list_pop(grpc_chttp2_transport *t, s->included[id] = 0; } *stream = s; + if (s && GRPC_TRACER_ON(grpc_trace_http2_stream_state)) { + gpr_log(GPR_DEBUG, "%p[%d][%s]: pop from %s", t, s->id, + t->is_client ? "cli" : "svr", stream_list_id_string(id)); + } return s != 0; } @@ -62,6 +87,10 @@ static void stream_list_remove(grpc_chttp2_transport *t, grpc_chttp2_stream *s, } else { t->lists[id].tail = s->links[id].prev; } + if (GRPC_TRACER_ON(grpc_trace_http2_stream_state)) { + gpr_log(GPR_DEBUG, "%p[%d][%s]: remove from %s", t, s->id, + t->is_client ? "cli" : "svr", stream_list_id_string(id)); + } } static bool stream_list_maybe_remove(grpc_chttp2_transport *t, @@ -90,6 +119,10 @@ static void stream_list_add_tail(grpc_chttp2_transport *t, } t->lists[id].tail = s; s->included[id] = 1; + if (GRPC_TRACER_ON(grpc_trace_http2_stream_state)) { + gpr_log(GPR_DEBUG, "%p[%d][%s]: add to %s", t, s->id, + t->is_client ? "cli" : "svr", stream_list_id_string(id)); + } } static bool stream_list_add(grpc_chttp2_transport *t, grpc_chttp2_stream *s, @@ -150,17 +183,12 @@ void grpc_chttp2_list_remove_waiting_for_concurrency(grpc_chttp2_transport *t, void grpc_chttp2_list_add_stalled_by_transport(grpc_chttp2_transport *t, grpc_chttp2_stream *s) { - GRPC_FLOW_CONTROL_IF_TRACING( - gpr_log(GPR_DEBUG, "stream %u stalled by transport", s->id)); stream_list_add(t, s, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT); } bool grpc_chttp2_list_pop_stalled_by_transport(grpc_chttp2_transport *t, grpc_chttp2_stream **s) { - bool ret = stream_list_pop(t, s, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT); - GRPC_FLOW_CONTROL_IF_TRACING(if (ret) gpr_log( - GPR_DEBUG, "stream %u un-stalled by transport", (*s)->id)); - return ret; + return stream_list_pop(t, s, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT); } void grpc_chttp2_list_remove_stalled_by_transport(grpc_chttp2_transport *t, @@ -170,23 +198,15 @@ void grpc_chttp2_list_remove_stalled_by_transport(grpc_chttp2_transport *t, void grpc_chttp2_list_add_stalled_by_stream(grpc_chttp2_transport *t, grpc_chttp2_stream *s) { - GRPC_FLOW_CONTROL_IF_TRACING( - gpr_log(GPR_DEBUG, "stream %u stalled by stream", s->id)); stream_list_add(t, s, GRPC_CHTTP2_LIST_STALLED_BY_STREAM); } bool grpc_chttp2_list_pop_stalled_by_stream(grpc_chttp2_transport *t, grpc_chttp2_stream **s) { - bool ret = stream_list_pop(t, s, GRPC_CHTTP2_LIST_STALLED_BY_STREAM); - GRPC_FLOW_CONTROL_IF_TRACING( - if (ret) gpr_log(GPR_DEBUG, "stream %u un-stalled by stream", (*s)->id)); - return ret; + return stream_list_pop(t, s, GRPC_CHTTP2_LIST_STALLED_BY_STREAM); } bool grpc_chttp2_list_remove_stalled_by_stream(grpc_chttp2_transport *t, grpc_chttp2_stream *s) { - bool ret = stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_STALLED_BY_STREAM); - GRPC_FLOW_CONTROL_IF_TRACING( - if (ret) gpr_log(GPR_DEBUG, "stream %u un-stalled by stream", s->id)); - return ret; + return stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_STALLED_BY_STREAM); } diff --git a/src/core/ext/transport/chttp2/transport/stream_map.c b/src/core/ext/transport/chttp2/transport/stream_map.c index 650090d8f0..d6079a9a33 100644 --- a/src/core/ext/transport/chttp2/transport/stream_map.c +++ b/src/core/ext/transport/chttp2/transport/stream_map.c @@ -72,8 +72,10 @@ void grpc_chttp2_stream_map_add(grpc_chttp2_stream_map *map, uint32_t key, /* resize when less than 25% of the table is free, because compaction won't help much */ map->capacity = capacity = 3 * capacity / 2; - map->keys = keys = gpr_realloc(keys, capacity * sizeof(uint32_t)); - map->values = values = gpr_realloc(values, capacity * sizeof(void *)); + map->keys = keys = + (uint32_t *)gpr_realloc(keys, capacity * sizeof(uint32_t)); + map->values = values = + (void **)gpr_realloc(values, capacity * sizeof(void *)); } } diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index fa224a49a4..be1af16019 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -68,7 +68,7 @@ static void maybe_initiate_ping(grpc_exec_ctx *exec_ctx, } if (t->ping_state.pings_before_data_required == 0 && t->ping_policy.max_pings_without_data != 0) { - /* need to send something of substance before sending a ping again */ + /* need to receive something of substance before sending a ping again */ if (GRPC_TRACER_ON(grpc_http_trace) || GRPC_TRACER_ON(grpc_bdp_estimator_trace)) { gpr_log(GPR_DEBUG, "Ping delayed [%p]: too many recent pings: %d/%d", @@ -78,11 +78,18 @@ static void maybe_initiate_ping(grpc_exec_ctx *exec_ctx, return; } gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); - gpr_timespec elapsed = gpr_time_sub(now, t->ping_state.last_ping_sent_time); - /*gpr_log(GPR_DEBUG, "elapsed:%d.%09d min:%d.%09d", (int)elapsed.tv_sec, - elapsed.tv_nsec, (int)t->ping_policy.min_time_between_pings.tv_sec, - (int)t->ping_policy.min_time_between_pings.tv_nsec);*/ - if (gpr_time_cmp(elapsed, t->ping_policy.min_time_between_pings) < 0) { + gpr_timespec next_allowed_ping = + gpr_time_add(t->ping_state.last_ping_sent_time, + t->ping_policy.min_sent_ping_interval_without_data); + if (t->keepalive_permit_without_calls == 0 && + grpc_chttp2_stream_map_size(&t->stream_map) == 0) { + next_allowed_ping = gpr_time_add(t->ping_recv_state.last_ping_recv_time, + gpr_time_from_seconds(7200, GPR_TIMESPAN)); + } + /* gpr_log(GPR_DEBUG, "next_allowed_ping:%d.%09d now:%d.%09d", + (int)next_allowed_ping.tv_sec, (int)next_allowed_ping.tv_nsec, + (int)now.tv_sec, (int)now.tv_nsec); */ + if (gpr_time_cmp(next_allowed_ping, now) > 0) { /* not enough elapsed time between successive pings */ if (GRPC_TRACER_ON(grpc_http_trace) || GRPC_TRACER_ON(grpc_bdp_estimator_trace)) { @@ -93,9 +100,7 @@ static void maybe_initiate_ping(grpc_exec_ctx *exec_ctx, if (!t->ping_state.is_delayed_ping_timer_set) { t->ping_state.is_delayed_ping_timer_set = true; grpc_timer_init(exec_ctx, &t->ping_state.delayed_ping_timer, - gpr_time_add(t->ping_state.last_ping_sent_time, - t->ping_policy.min_time_between_pings), - &t->retry_initiate_ping_locked, + next_allowed_ping, &t->retry_initiate_ping_locked, gpr_now(GPR_CLOCK_MONOTONIC)); } return; @@ -119,6 +124,12 @@ static void maybe_initiate_ping(grpc_exec_ctx *exec_ctx, grpc_chttp2_ping_create(false, pq->inflight_id)); GRPC_STATS_INC_HTTP2_PINGS_SENT(exec_ctx); t->ping_state.last_ping_sent_time = now; + if (GRPC_TRACER_ON(grpc_http_trace) || + GRPC_TRACER_ON(grpc_bdp_estimator_trace)) { + gpr_log(GPR_DEBUG, "Ping sent [%p]: %d/%d", t->peer_string, + t->ping_state.pings_before_data_required, + t->ping_policy.max_pings_without_data); + } t->ping_state.pings_before_data_required -= (t->ping_state.pings_before_data_required != 0); } @@ -201,9 +212,8 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( if (t->flow_control.remote_window > 0) { while (grpc_chttp2_list_pop_stalled_by_transport(t, &s)) { - if (!t->closed && grpc_chttp2_list_add_writable_stream(t, s) && - stream_ref_if_not_destroyed(&s->refcount->refs)) { - grpc_chttp2_initiate_write(exec_ctx, t, "transport.read_flow_control"); + if (!t->closed && grpc_chttp2_list_add_writable_stream(t, s)) { + stream_ref_if_not_destroyed(&s->refcount->refs); } } } @@ -258,8 +268,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( .stats = &s->stats.outgoing}; grpc_chttp2_encode_header(exec_ctx, &t->hpack_compressor, NULL, 0, s->send_initial_metadata, &hopt, &t->outbuf); - t->ping_state.pings_before_data_required = - t->ping_policy.max_pings_without_data; + now_writing = true; if (!t->is_client) { t->ping_recv_state.last_ping_recv_time = gpr_inf_past(GPR_CLOCK_MONOTONIC); @@ -298,8 +307,6 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( grpc_slice_buffer_add( &t->outbuf, grpc_chttp2_window_update_create(s->id, stream_announce, &s->stats.outgoing)); - t->ping_state.pings_before_data_required = - t->ping_policy.max_pings_without_data; if (!t->is_client) { t->ping_recv_state.last_ping_recv_time = gpr_inf_past(GPR_CLOCK_MONOTONIC); @@ -310,8 +317,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( if (sent_initial_metadata) { /* send any body bytes, if allowed by flow control */ if (s->flow_controlled_buffer.length > 0 || - (s->stream_compression_send_enabled && - s->compressed_data_buffer->length > 0)) { + s->compressed_data_buffer.length > 0) { uint32_t stream_remote_window = (uint32_t)GPR_MAX( 0, s->flow_control.remote_window_delta + @@ -325,59 +331,60 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( bool is_last_data_frame = false; bool is_last_frame = false; size_t sending_bytes_before = s->sending_bytes; - if (s->stream_compression_send_enabled) { - while ((s->flow_controlled_buffer.length > 0 || - s->compressed_data_buffer->length > 0) && - max_outgoing > 0) { - if (s->compressed_data_buffer->length > 0) { - uint32_t send_bytes = (uint32_t)GPR_MIN( - max_outgoing, s->compressed_data_buffer->length); - is_last_data_frame = - (send_bytes == s->compressed_data_buffer->length && - s->flow_controlled_buffer.length == 0 && - s->fetching_send_message == NULL); - is_last_frame = - is_last_data_frame && s->send_trailing_metadata != NULL && - grpc_metadata_batch_is_empty(s->send_trailing_metadata); - grpc_chttp2_encode_data(s->id, s->compressed_data_buffer, - send_bytes, is_last_frame, - &s->stats.outgoing, &t->outbuf); - grpc_chttp2_flowctl_sent_data(&t->flow_control, - &s->flow_control, send_bytes); - max_outgoing -= send_bytes; - if (s->compressed_data_buffer->length == 0) { - s->sending_bytes += s->uncompressed_data_size; - } - } else { - if (s->stream_compression_ctx == NULL) { - s->stream_compression_ctx = - grpc_stream_compression_context_create( - GRPC_STREAM_COMPRESSION_COMPRESS); + while ((s->flow_controlled_buffer.length > 0 || + s->compressed_data_buffer.length > 0) && + max_outgoing > 0) { + if (s->compressed_data_buffer.length > 0) { + uint32_t send_bytes = (uint32_t)GPR_MIN( + max_outgoing, s->compressed_data_buffer.length); + is_last_data_frame = + (send_bytes == s->compressed_data_buffer.length && + s->flow_controlled_buffer.length == 0 && + s->fetching_send_message == NULL); + if (is_last_data_frame && s->send_trailing_metadata != NULL && + s->stream_compression_ctx != NULL) { + if (!grpc_stream_compress( + s->stream_compression_ctx, &s->flow_controlled_buffer, + &s->compressed_data_buffer, NULL, MAX_SIZE_T, + GRPC_STREAM_COMPRESSION_FLUSH_FINISH)) { + gpr_log(GPR_ERROR, "Stream compression failed."); } - s->uncompressed_data_size = s->flow_controlled_buffer.length; - GPR_ASSERT(grpc_stream_compress( - s->stream_compression_ctx, &s->flow_controlled_buffer, - s->compressed_data_buffer, NULL, MAX_SIZE_T, - GRPC_STREAM_COMPRESSION_FLUSH_SYNC)); + grpc_stream_compression_context_destroy( + s->stream_compression_ctx); + s->stream_compression_ctx = NULL; + /* After finish, bytes in s->compressed_data_buffer may be + * more than max_outgoing. Start another round of the current + * while loop so that send_bytes and is_last_data_frame are + * recalculated. */ + continue; + } + is_last_frame = + is_last_data_frame && s->send_trailing_metadata != NULL && + grpc_metadata_batch_is_empty(s->send_trailing_metadata); + grpc_chttp2_encode_data(s->id, &s->compressed_data_buffer, + send_bytes, is_last_frame, + &s->stats.outgoing, &t->outbuf); + grpc_chttp2_flowctl_sent_data(&t->flow_control, &s->flow_control, + send_bytes); + max_outgoing -= send_bytes; + if (s->compressed_data_buffer.length == 0) { + s->sending_bytes += s->uncompressed_data_size; + } + } else { + if (s->stream_compression_ctx == NULL) { + s->stream_compression_ctx = + grpc_stream_compression_context_create( + s->stream_compression_method); + } + s->uncompressed_data_size = s->flow_controlled_buffer.length; + if (!grpc_stream_compress( + s->stream_compression_ctx, &s->flow_controlled_buffer, + &s->compressed_data_buffer, NULL, MAX_SIZE_T, + GRPC_STREAM_COMPRESSION_FLUSH_SYNC)) { + gpr_log(GPR_ERROR, "Stream compression failed."); } } - } else { - uint32_t send_bytes = (uint32_t)GPR_MIN( - max_outgoing, s->flow_controlled_buffer.length); - is_last_data_frame = s->fetching_send_message == NULL && - send_bytes == s->flow_controlled_buffer.length; - is_last_frame = - is_last_data_frame && s->send_trailing_metadata != NULL && - grpc_metadata_batch_is_empty(s->send_trailing_metadata); - grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer, - send_bytes, is_last_frame, - &s->stats.outgoing, &t->outbuf); - grpc_chttp2_flowctl_sent_data(&t->flow_control, &s->flow_control, - send_bytes); - s->sending_bytes += send_bytes; } - t->ping_state.pings_before_data_required = - t->ping_policy.max_pings_without_data; if (!t->is_client) { t->ping_recv_state.last_ping_recv_time = gpr_inf_past(GPR_CLOCK_MONOTONIC); @@ -391,6 +398,8 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( s->id, GRPC_HTTP2_NO_ERROR, &s->stats.outgoing)); } + grpc_chttp2_mark_stream_closed(exec_ctx, t, s, !t->is_client, 1, + GRPC_ERROR_NONE); } result.early_results_scheduled |= update_list(exec_ctx, t, s, @@ -399,8 +408,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( &s->flow_controlled_bytes_flowed, GRPC_ERROR_NONE); now_writing = true; if (s->flow_controlled_buffer.length > 0 || - (s->stream_compression_send_enabled && - s->compressed_data_buffer->length > 0)) { + s->compressed_data_buffer.length > 0) { GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:fork"); grpc_chttp2_list_add_writable_stream(t, s); } @@ -416,8 +424,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( if (s->send_trailing_metadata != NULL && s->fetching_send_message == NULL && s->flow_controlled_buffer.length == 0 && - (!s->stream_compression_send_enabled || - s->compressed_data_buffer->length == 0)) { + s->compressed_data_buffer.length == 0) { GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "sending trailing_metadata")); if (grpc_metadata_batch_is_empty(s->send_trailing_metadata)) { grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer, 0, true, @@ -449,6 +456,8 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( &t->outbuf, grpc_chttp2_rst_stream_create( s->id, GRPC_HTTP2_NO_ERROR, &s->stats.outgoing)); } + grpc_chttp2_mark_stream_closed(exec_ctx, t, s, !t->is_client, 1, + GRPC_ERROR_NONE); now_writing = true; result.early_results_scheduled = true; grpc_chttp2_complete_closure_step( @@ -484,8 +493,6 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write( grpc_slice_buffer_add( &t->outbuf, grpc_chttp2_window_update_create(0, transport_announce, &throwaway_stats)); - t->ping_state.pings_before_data_required = - t->ping_policy.max_pings_without_data; if (!t->is_client) { t->ping_recv_state.last_ping_recv_time = gpr_inf_past(GPR_CLOCK_MONOTONIC); @@ -519,10 +526,6 @@ void grpc_chttp2_end_write(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, GRPC_ERROR_REF(error)); s->sending_bytes = 0; } - if (s->sent_trailing_metadata) { - grpc_chttp2_mark_stream_closed(exec_ctx, t, s, !t->is_client, 1, - GRPC_ERROR_REF(error)); - } GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:end"); } grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &t->outbuf); diff --git a/src/core/ext/transport/inproc/inproc_transport.c b/src/core/ext/transport/inproc/inproc_transport.c index 036853a53b..31739d07dd 100644 --- a/src/core/ext/transport/inproc/inproc_transport.c +++ b/src/core/ext/transport/inproc/inproc_transport.c @@ -37,7 +37,6 @@ if (GRPC_TRACER_ON(grpc_inproc_trace)) gpr_log(__VA_ARGS__); \ } while (0) -static const grpc_transport_vtable inproc_vtable; static grpc_slice g_empty_slice; static grpc_slice g_fake_path_key; static grpc_slice g_fake_path_value; @@ -1167,6 +1166,55 @@ static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) { } /******************************************************************************* + * INTEGRATION GLUE + */ + +static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_transport *gt, + grpc_stream *gs, grpc_pollset *pollset) { + // Nothing to do here +} + +static void set_pollset_set(grpc_exec_ctx *exec_ctx, grpc_transport *gt, + grpc_stream *gs, grpc_pollset_set *pollset_set) { + // Nothing to do here +} + +static grpc_endpoint *get_endpoint(grpc_exec_ctx *exec_ctx, grpc_transport *t) { + return NULL; +} + +/******************************************************************************* + * GLOBAL INIT AND DESTROY + */ +static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {} + +void grpc_inproc_transport_init(void) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + GRPC_CLOSURE_INIT(&do_nothing_closure, do_nothing, NULL, + grpc_schedule_on_exec_ctx); + g_empty_slice = grpc_slice_from_static_buffer(NULL, 0); + + grpc_slice key_tmp = grpc_slice_from_static_string(":path"); + g_fake_path_key = grpc_slice_intern(key_tmp); + grpc_slice_unref_internal(&exec_ctx, key_tmp); + + g_fake_path_value = grpc_slice_from_static_string("/"); + + grpc_slice auth_tmp = grpc_slice_from_static_string(":authority"); + g_fake_auth_key = grpc_slice_intern(auth_tmp); + grpc_slice_unref_internal(&exec_ctx, auth_tmp); + + g_fake_auth_value = grpc_slice_from_static_string("inproc-fail"); + grpc_exec_ctx_finish(&exec_ctx); +} + +static const grpc_transport_vtable inproc_vtable = { + sizeof(inproc_stream), "inproc", init_stream, + set_pollset, set_pollset_set, perform_stream_op, + perform_transport_op, destroy_stream, destroy_transport, + get_endpoint}; + +/******************************************************************************* * Main inproc transport functions */ static void inproc_transports_create(grpc_exec_ctx *exec_ctx, @@ -1178,7 +1226,7 @@ static void inproc_transports_create(grpc_exec_ctx *exec_ctx, inproc_transport *st = (inproc_transport *)gpr_zalloc(sizeof(*st)); inproc_transport *ct = (inproc_transport *)gpr_zalloc(sizeof(*ct)); // Share one lock between both sides since both sides get affected - st->mu = ct->mu = gpr_malloc(sizeof(*st->mu)); + st->mu = ct->mu = (shared_mu *)gpr_malloc(sizeof(*st->mu)); gpr_mu_init(&st->mu->mu); gpr_ref_init(&st->mu->refs, 2); st->base.vtable = &inproc_vtable; @@ -1215,8 +1263,8 @@ grpc_channel *grpc_inproc_channel_create(grpc_server *server, grpc_arg default_authority_arg; default_authority_arg.type = GRPC_ARG_STRING; - default_authority_arg.key = GRPC_ARG_DEFAULT_AUTHORITY; - default_authority_arg.value.string = "inproc.authority"; + default_authority_arg.key = (char *)GRPC_ARG_DEFAULT_AUTHORITY; + default_authority_arg.value.string = (char *)"inproc.authority"; grpc_channel_args *client_args = grpc_channel_args_copy_and_add(args, &default_authority_arg, 1); @@ -1240,55 +1288,6 @@ grpc_channel *grpc_inproc_channel_create(grpc_server *server, return channel; } -/******************************************************************************* - * INTEGRATION GLUE - */ - -static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, grpc_pollset *pollset) { - // Nothing to do here -} - -static void set_pollset_set(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, grpc_pollset_set *pollset_set) { - // Nothing to do here -} - -static grpc_endpoint *get_endpoint(grpc_exec_ctx *exec_ctx, grpc_transport *t) { - return NULL; -} - -static const grpc_transport_vtable inproc_vtable = { - sizeof(inproc_stream), "inproc", init_stream, - set_pollset, set_pollset_set, perform_stream_op, - perform_transport_op, destroy_stream, destroy_transport, - get_endpoint}; - -/******************************************************************************* - * GLOBAL INIT AND DESTROY - */ -static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {} - -void grpc_inproc_transport_init(void) { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - GRPC_CLOSURE_INIT(&do_nothing_closure, do_nothing, NULL, - grpc_schedule_on_exec_ctx); - g_empty_slice = grpc_slice_from_static_buffer(NULL, 0); - - grpc_slice key_tmp = grpc_slice_from_static_string(":path"); - g_fake_path_key = grpc_slice_intern(key_tmp); - grpc_slice_unref_internal(&exec_ctx, key_tmp); - - g_fake_path_value = grpc_slice_from_static_string("/"); - - grpc_slice auth_tmp = grpc_slice_from_static_string(":authority"); - g_fake_auth_key = grpc_slice_intern(auth_tmp); - grpc_slice_unref_internal(&exec_ctx, auth_tmp); - - g_fake_auth_value = grpc_slice_from_static_string("inproc-fail"); - grpc_exec_ctx_finish(&exec_ctx); -} - void grpc_inproc_transport_shutdown(void) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_slice_unref_internal(&exec_ctx, g_empty_slice); |