diff options
Diffstat (limited to 'src/core/ext/filters/client_channel/client_channel.cc')
-rw-r--r-- | src/core/ext/filters/client_channel/client_channel.cc | 490 |
1 files changed, 257 insertions, 233 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 973cc5f703..5bb5fde985 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -31,6 +31,7 @@ #include <grpc/support/sync.h> #include <grpc/support/useful.h> +#include "src/core/ext/filters/client_channel/backup_poller.h" #include "src/core/ext/filters/client_channel/http_connect_handshaker.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/proxy_mapper_registry.h" @@ -75,24 +76,28 @@ typedef struct { wait_for_ready_value wait_for_ready; } method_parameters; -static method_parameters *method_parameters_ref( - method_parameters *method_params) { +static method_parameters* method_parameters_ref( + method_parameters* method_params) { gpr_ref(&method_params->refs); return method_params; } -static void method_parameters_unref(method_parameters *method_params) { +static void method_parameters_unref(method_parameters* method_params) { if (gpr_unref(&method_params->refs)) { gpr_free(method_params); } } -static void method_parameters_free(void *value) { - method_parameters_unref((method_parameters *)value); +// Wrappers to pass to grpc_service_config_create_method_config_table(). +static void* method_parameters_ref_wrapper(void* value) { + return method_parameters_ref((method_parameters*)value); +} +static void method_parameters_unref_wrapper(void* value) { + method_parameters_unref((method_parameters*)value); } -static bool parse_wait_for_ready(grpc_json *field, - wait_for_ready_value *wait_for_ready) { +static bool parse_wait_for_ready(grpc_json* field, + wait_for_ready_value* wait_for_ready) { if (field->type != GRPC_JSON_TRUE && field->type != GRPC_JSON_FALSE) { return false; } @@ -101,13 +106,13 @@ static bool parse_wait_for_ready(grpc_json *field, return true; } -static bool parse_timeout(grpc_json *field, grpc_millis *timeout) { +static bool parse_timeout(grpc_json* field, grpc_millis* timeout) { if (field->type != GRPC_JSON_STRING) return false; size_t len = strlen(field->value); if (field->value[len - 1] != 's') return false; - char *buf = gpr_strdup(field->value); + char* buf = gpr_strdup(field->value); buf[len - 1] = '\0'; // Remove trailing 's'. - char *decimal_point = strchr(buf, '.'); + char* decimal_point = strchr(buf, '.'); int nanos = 0; if (decimal_point != NULL) { *decimal_point = '\0'; @@ -116,34 +121,26 @@ static bool parse_timeout(grpc_json *field, grpc_millis *timeout) { gpr_free(buf); return false; } - // There should always be exactly 3, 6, or 9 fractional digits. - int multiplier = 1; - switch (strlen(decimal_point + 1)) { - case 9: - break; - case 6: - multiplier *= 1000; - break; - case 3: - multiplier *= 1000000; - break; - default: // Unsupported number of digits. - gpr_free(buf); - return false; + int num_digits = (int)strlen(decimal_point + 1); + if (num_digits > 9) { // We don't accept greater precision than nanos. + gpr_free(buf); + return false; + } + for (int i = 0; i < (9 - num_digits); ++i) { + nanos *= 10; } - nanos *= multiplier; } - int seconds = gpr_parse_nonnegative_int(buf); + int seconds = decimal_point == buf ? 0 : gpr_parse_nonnegative_int(buf); gpr_free(buf); if (seconds == -1) return false; *timeout = seconds * GPR_MS_PER_SEC + nanos / GPR_NS_PER_MS; return true; } -static void *method_parameters_create_from_json(const grpc_json *json) { +static void* method_parameters_create_from_json(const grpc_json* json) { wait_for_ready_value wait_for_ready = WAIT_FOR_READY_UNSET; grpc_millis timeout = 0; - for (grpc_json *field = json->child; field != NULL; field = field->next) { + for (grpc_json* field = json->child; field != NULL; field = field->next) { if (field->key == NULL) continue; if (strcmp(field->key, "waitForReady") == 0) { if (wait_for_ready != WAIT_FOR_READY_UNSET) return NULL; // Duplicate. @@ -153,8 +150,8 @@ static void *method_parameters_create_from_json(const grpc_json *json) { if (!parse_timeout(field, &timeout)) return NULL; } } - method_parameters *value = - (method_parameters *)gpr_malloc(sizeof(method_parameters)); + method_parameters* value = + (method_parameters*)gpr_malloc(sizeof(method_parameters)); gpr_ref_init(&value->refs, 1); value->timeout = timeout; value->wait_for_ready = wait_for_ready; @@ -169,24 +166,24 @@ struct external_connectivity_watcher; typedef struct client_channel_channel_data { /** resolver for this channel */ - grpc_resolver *resolver; + grpc_resolver* resolver; /** have we started resolving this channel */ bool started_resolving; /** is deadline checking enabled? */ bool deadline_checking_enabled; /** client channel factory */ - grpc_client_channel_factory *client_channel_factory; + grpc_client_channel_factory* client_channel_factory; /** combiner protecting all variables below in this data structure */ - grpc_combiner *combiner; + grpc_combiner* combiner; /** currently active load balancer */ - grpc_lb_policy *lb_policy; + grpc_lb_policy* lb_policy; /** retry throttle data */ - grpc_server_retry_throttle_data *retry_throttle_data; + grpc_server_retry_throttle_data* retry_throttle_data; /** maps method names to method_parameters structs */ - grpc_slice_hash_table *method_params_table; + grpc_slice_hash_table* method_params_table; /** incoming resolver result - set by resolver.next() */ - grpc_channel_args *resolver_result; + grpc_channel_args* resolver_result; /** a list of closures that are all waiting for resolver result to come in */ grpc_closure_list waiting_for_resolver_result_closures; /** resolver callback */ @@ -196,41 +193,41 @@ typedef struct client_channel_channel_data { /** when an lb_policy arrives, should we try to exit idle */ bool exit_idle_when_lb_policy_arrives; /** owning stack */ - grpc_channel_stack *owning_stack; + grpc_channel_stack* owning_stack; /** interested parties (owned) */ - grpc_pollset_set *interested_parties; + grpc_pollset_set* interested_parties; /* external_connectivity_watcher_list head is guarded by its own mutex, since * counts need to be grabbed immediately without polling on a cq */ gpr_mu external_connectivity_watcher_list_mu; - struct external_connectivity_watcher *external_connectivity_watcher_list_head; + struct external_connectivity_watcher* external_connectivity_watcher_list_head; /* the following properties are guarded by a mutex since API's require them to be instantaneously available */ gpr_mu info_mu; - char *info_lb_policy_name; + char* info_lb_policy_name; /** service config in JSON form */ - char *info_service_config_json; + char* info_service_config_json; } channel_data; /** We create one watcher for each new lb_policy that is returned from a resolver, to watch for state changes from the lb_policy. When a state change is seen, we update the channel, and create a new watcher. */ typedef struct { - channel_data *chand; + channel_data* chand; grpc_closure on_changed; grpc_connectivity_state state; - grpc_lb_policy *lb_policy; + grpc_lb_policy* lb_policy; } lb_policy_connectivity_watcher; -static void watch_lb_policy_locked(channel_data *chand, - grpc_lb_policy *lb_policy, +static void watch_lb_policy_locked(channel_data* chand, + grpc_lb_policy* lb_policy, grpc_connectivity_state current_state); -static void set_channel_connectivity_state_locked(channel_data *chand, +static void set_channel_connectivity_state_locked(channel_data* chand, grpc_connectivity_state state, - grpc_error *error, - const char *reason) { + grpc_error* error, + const char* reason) { /* TODO: Improve failure handling: * - Make it possible for policies to return GRPC_CHANNEL_TRANSIENT_FAILURE. * - Hand over pending picks from old policies during the switch that happens @@ -256,8 +253,8 @@ static void set_channel_connectivity_state_locked(channel_data *chand, grpc_connectivity_state_set(&chand->state_tracker, state, error, reason); } -static void on_lb_policy_state_changed_locked(void *arg, grpc_error *error) { - lb_policy_connectivity_watcher *w = (lb_policy_connectivity_watcher *)arg; +static void on_lb_policy_state_changed_locked(void* arg, grpc_error* error) { + lb_policy_connectivity_watcher* w = (lb_policy_connectivity_watcher*)arg; grpc_connectivity_state publish_state = w->state; /* check if the notification is for the latest policy */ if (w->lb_policy == w->chand->lb_policy) { @@ -281,11 +278,11 @@ static void on_lb_policy_state_changed_locked(void *arg, grpc_error *error) { gpr_free(w); } -static void watch_lb_policy_locked(channel_data *chand, - grpc_lb_policy *lb_policy, +static void watch_lb_policy_locked(channel_data* chand, + grpc_lb_policy* lb_policy, grpc_connectivity_state current_state) { - lb_policy_connectivity_watcher *w = - (lb_policy_connectivity_watcher *)gpr_malloc(sizeof(*w)); + lb_policy_connectivity_watcher* w = + (lb_policy_connectivity_watcher*)gpr_malloc(sizeof(*w)); GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy"); w->chand = chand; GRPC_CLOSURE_INIT(&w->on_changed, on_lb_policy_state_changed_locked, w, @@ -296,7 +293,7 @@ static void watch_lb_policy_locked(channel_data *chand, &w->on_changed); } -static void start_resolving_locked(channel_data *chand) { +static void start_resolving_locked(channel_data* chand) { if (GRPC_TRACER_ON(grpc_client_channel_trace)) { gpr_log(GPR_DEBUG, "chand=%p: starting name resolution", chand); } @@ -308,19 +305,19 @@ static void start_resolving_locked(channel_data *chand) { } typedef struct { - char *server_name; - grpc_server_retry_throttle_data *retry_throttle_data; + char* server_name; + grpc_server_retry_throttle_data* retry_throttle_data; } service_config_parsing_state; -static void parse_retry_throttle_params(const grpc_json *field, void *arg) { - service_config_parsing_state *parsing_state = - (service_config_parsing_state *)arg; +static void parse_retry_throttle_params(const grpc_json* field, void* arg) { + service_config_parsing_state* parsing_state = + (service_config_parsing_state*)arg; if (strcmp(field->key, "retryThrottling") == 0) { if (parsing_state->retry_throttle_data != NULL) return; // Duplicate. if (field->type != GRPC_JSON_OBJECT) return; int max_milli_tokens = 0; int milli_token_ratio = 0; - for (grpc_json *sub_field = field->child; sub_field != NULL; + for (grpc_json* sub_field = field->child; sub_field != NULL; sub_field = sub_field->next) { if (sub_field->key == NULL) return; if (strcmp(sub_field->key, "maxTokens") == 0) { @@ -336,7 +333,7 @@ static void parse_retry_throttle_params(const grpc_json *field, void *arg) { size_t whole_len = strlen(sub_field->value); uint32_t multiplier = 1; uint32_t decimal_value = 0; - const char *decimal_point = strchr(sub_field->value, '.'); + const char* decimal_point = strchr(sub_field->value, '.'); if (decimal_point != NULL) { whole_len = (size_t)(decimal_point - sub_field->value); multiplier = 1000; @@ -367,24 +364,24 @@ static void parse_retry_throttle_params(const grpc_json *field, void *arg) { } } -static void on_resolver_result_changed_locked(void *arg, grpc_error *error) { - channel_data *chand = (channel_data *)arg; +static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { + channel_data* chand = (channel_data*)arg; if (GRPC_TRACER_ON(grpc_client_channel_trace)) { gpr_log(GPR_DEBUG, "chand=%p: got resolver result: error=%s", chand, grpc_error_string(error)); } // Extract the following fields from the resolver result, if non-NULL. bool lb_policy_updated = false; - char *lb_policy_name_dup = NULL; + char* lb_policy_name_dup = NULL; bool lb_policy_name_changed = false; - grpc_lb_policy *new_lb_policy = NULL; - char *service_config_json = NULL; - grpc_server_retry_throttle_data *retry_throttle_data = NULL; - grpc_slice_hash_table *method_params_table = NULL; + grpc_lb_policy* new_lb_policy = NULL; + char* service_config_json = NULL; + grpc_server_retry_throttle_data* retry_throttle_data = NULL; + grpc_slice_hash_table* method_params_table = NULL; if (chand->resolver_result != NULL) { // Find LB policy name. - const char *lb_policy_name = NULL; - const grpc_arg *channel_arg = + const char* lb_policy_name = NULL; + const grpc_arg* channel_arg = grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_POLICY_NAME); if (channel_arg != NULL) { GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING); @@ -395,8 +392,8 @@ static void on_resolver_result_changed_locked(void *arg, grpc_error *error) { channel_arg = grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES); if (channel_arg != NULL && channel_arg->type == GRPC_ARG_POINTER) { - grpc_lb_addresses *addresses = - (grpc_lb_addresses *)channel_arg->value.pointer.p; + grpc_lb_addresses* addresses = + (grpc_lb_addresses*)channel_arg->value.pointer.p; bool found_balancer_address = false; for (size_t i = 0; i < addresses->num_addresses; ++i) { if (addresses->addresses[i].is_balancer) { @@ -446,14 +443,14 @@ static void on_resolver_result_changed_locked(void *arg, grpc_error *error) { if (channel_arg != NULL) { GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING); service_config_json = gpr_strdup(channel_arg->value.string); - grpc_service_config *service_config = + grpc_service_config* service_config = grpc_service_config_create(service_config_json); if (service_config != NULL) { channel_arg = grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVER_URI); GPR_ASSERT(channel_arg != NULL); GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING); - grpc_uri *uri = grpc_uri_parse(channel_arg->value.string, true); + grpc_uri* uri = grpc_uri_parse(channel_arg->value.string, true); GPR_ASSERT(uri->path[0] != '\0'); service_config_parsing_state parsing_state; memset(&parsing_state, 0, sizeof(parsing_state)); @@ -465,7 +462,7 @@ static void on_resolver_result_changed_locked(void *arg, grpc_error *error) { retry_throttle_data = parsing_state.retry_throttle_data; method_params_table = grpc_service_config_create_method_config_table( service_config, method_parameters_create_from_json, - method_parameters_free); + method_parameters_ref_wrapper, method_parameters_unref_wrapper); grpc_service_config_destroy(service_config); } } @@ -553,7 +550,7 @@ static void on_resolver_result_changed_locked(void *arg, grpc_error *error) { GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures); } else { // Not shutting down. grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE; - grpc_error *state_error = + grpc_error* state_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy"); if (new_lb_policy != NULL) { if (GRPC_TRACER_ON(grpc_client_channel_trace)) { @@ -581,11 +578,11 @@ static void on_resolver_result_changed_locked(void *arg, grpc_error *error) { } } -static void start_transport_op_locked(void *arg, grpc_error *error_ignored) { - grpc_transport_op *op = (grpc_transport_op *)arg; - grpc_channel_element *elem = - (grpc_channel_element *)op->handler_private.extra_arg; - channel_data *chand = (channel_data *)elem->channel_data; +static void start_transport_op_locked(void* arg, grpc_error* error_ignored) { + grpc_transport_op* op = (grpc_transport_op*)arg; + grpc_channel_element* elem = + (grpc_channel_element*)op->handler_private.extra_arg; + channel_data* chand = (channel_data*)elem->channel_data; if (op->on_connectivity_state_change != NULL) { grpc_connectivity_state_notify_on_state_change( @@ -633,9 +630,9 @@ static void start_transport_op_locked(void *arg, grpc_error *error_ignored) { GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE); } -static void cc_start_transport_op(grpc_channel_element *elem, - grpc_transport_op *op) { - channel_data *chand = (channel_data *)elem->channel_data; +static void cc_start_transport_op(grpc_channel_element* elem, + grpc_transport_op* op) { + channel_data* chand = (channel_data*)elem->channel_data; GPR_ASSERT(op->set_accept_stream == false); if (op->bind_pollset != NULL) { @@ -650,9 +647,9 @@ static void cc_start_transport_op(grpc_channel_element *elem, GRPC_ERROR_NONE); } -static void cc_get_channel_info(grpc_channel_element *elem, - const grpc_channel_info *info) { - channel_data *chand = (channel_data *)elem->channel_data; +static void cc_get_channel_info(grpc_channel_element* elem, + const grpc_channel_info* info) { + channel_data* chand = (channel_data*)elem->channel_data; gpr_mu_lock(&chand->info_mu); if (info->lb_policy_name != NULL) { *info->lb_policy_name = chand->info_lb_policy_name == NULL @@ -669,9 +666,9 @@ static void cc_get_channel_info(grpc_channel_element *elem, } /* Constructor for channel_data */ -static grpc_error *cc_init_channel_elem(grpc_channel_element *elem, - grpc_channel_element_args *args) { - channel_data *chand = (channel_data *)elem->channel_data; +static grpc_error* cc_init_channel_elem(grpc_channel_element* elem, + grpc_channel_element_args* args) { + channel_data* chand = (channel_data*)elem->channel_data; GPR_ASSERT(args->is_last); GPR_ASSERT(elem->filter == &grpc_client_channel_filter); // Initialize data members. @@ -690,8 +687,9 @@ static grpc_error *cc_init_channel_elem(grpc_channel_element *elem, chand->interested_parties = grpc_pollset_set_create(); grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, "client_channel"); + grpc_client_channel_start_backup_polling(chand->interested_parties); // Record client channel factory. - const grpc_arg *arg = grpc_channel_args_find(args->channel_args, + const grpc_arg* arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_CLIENT_CHANNEL_FACTORY); if (arg == NULL) { return GRPC_ERROR_CREATE_FROM_STATIC_STRING( @@ -702,9 +700,9 @@ static grpc_error *cc_init_channel_elem(grpc_channel_element *elem, "client channel factory arg must be a pointer"); } grpc_client_channel_factory_ref( - (grpc_client_channel_factory *)arg->value.pointer.p); + (grpc_client_channel_factory*)arg->value.pointer.p); chand->client_channel_factory = - (grpc_client_channel_factory *)arg->value.pointer.p; + (grpc_client_channel_factory*)arg->value.pointer.p; // Get server name to resolve, using proxy mapper if needed. arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI); if (arg == NULL) { @@ -715,8 +713,8 @@ static grpc_error *cc_init_channel_elem(grpc_channel_element *elem, return GRPC_ERROR_CREATE_FROM_STATIC_STRING( "server uri arg must be a string"); } - char *proxy_name = NULL; - grpc_channel_args *new_args = NULL; + char* proxy_name = NULL; + grpc_channel_args* new_args = NULL; grpc_proxy_mappers_map_name(arg->value.string, args->channel_args, &proxy_name, &new_args); // Instantiate resolver. @@ -734,15 +732,15 @@ static grpc_error *cc_init_channel_elem(grpc_channel_element *elem, return GRPC_ERROR_NONE; } -static void shutdown_resolver_locked(void *arg, grpc_error *error) { - grpc_resolver *resolver = (grpc_resolver *)arg; +static void shutdown_resolver_locked(void* arg, grpc_error* error) { + grpc_resolver* resolver = (grpc_resolver*)arg; grpc_resolver_shutdown_locked(resolver); GRPC_RESOLVER_UNREF(resolver, "channel"); } /* Destructor for channel_data */ -static void cc_destroy_channel_elem(grpc_channel_element *elem) { - channel_data *chand = (channel_data *)elem->channel_data; +static void cc_destroy_channel_elem(grpc_channel_element* elem) { + channel_data* chand = (channel_data*)elem->channel_data; if (chand->resolver != NULL) { GRPC_CLOSURE_SCHED( GRPC_CLOSURE_CREATE(shutdown_resolver_locked, chand->resolver, @@ -765,6 +763,7 @@ static void cc_destroy_channel_elem(grpc_channel_element *elem) { if (chand->method_params_table != NULL) { grpc_slice_hash_table_unref(chand->method_params_table); } + grpc_client_channel_stop_backup_polling(chand->interested_parties); grpc_connectivity_state_destroy(&chand->state_tracker); grpc_pollset_set_destroy(chand->interested_parties); GRPC_COMBINER_UNREF(chand->combiner, "client_channel"); @@ -804,45 +803,45 @@ typedef struct client_channel_call_data { grpc_slice path; // Request path. gpr_timespec call_start_time; grpc_millis deadline; - gpr_arena *arena; - grpc_call_stack *owning_call; - grpc_call_combiner *call_combiner; + gpr_arena* arena; + grpc_call_stack* owning_call; + grpc_call_combiner* call_combiner; - grpc_server_retry_throttle_data *retry_throttle_data; - method_parameters *method_params; + grpc_server_retry_throttle_data* retry_throttle_data; + method_parameters* method_params; - grpc_subchannel_call *subchannel_call; - grpc_error *error; + grpc_subchannel_call* subchannel_call; + grpc_error* error; - grpc_lb_policy *lb_policy; // Holds ref while LB pick is pending. + grpc_lb_policy* lb_policy; // Holds ref while LB pick is pending. grpc_closure lb_pick_closure; grpc_closure lb_pick_cancel_closure; - grpc_connected_subchannel *connected_subchannel; + grpc_connected_subchannel* connected_subchannel; grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT]; - grpc_polling_entity *pollent; + grpc_polling_entity* pollent; - grpc_transport_stream_op_batch *waiting_for_pick_batches[MAX_WAITING_BATCHES]; + grpc_transport_stream_op_batch* waiting_for_pick_batches[MAX_WAITING_BATCHES]; size_t waiting_for_pick_batches_count; grpc_closure handle_pending_batch_in_call_combiner[MAX_WAITING_BATCHES]; - grpc_transport_stream_op_batch *initial_metadata_batch; + grpc_transport_stream_op_batch* initial_metadata_batch; grpc_linked_mdelem lb_token_mdelem; grpc_closure on_complete; - grpc_closure *original_on_complete; + grpc_closure* original_on_complete; } call_data; -grpc_subchannel_call *grpc_client_channel_get_subchannel_call( - grpc_call_element *elem) { - call_data *calld = (call_data *)elem->call_data; +grpc_subchannel_call* grpc_client_channel_get_subchannel_call( + grpc_call_element* elem) { + call_data* calld = (call_data*)elem->call_data; return calld->subchannel_call; } // This is called via the call combiner, so access to calld is synchronized. static void waiting_for_pick_batches_add( - call_data *calld, grpc_transport_stream_op_batch *batch) { + call_data* calld, grpc_transport_stream_op_batch* batch) { if (batch->send_initial_metadata) { GPR_ASSERT(calld->initial_metadata_batch == NULL); calld->initial_metadata_batch = batch; @@ -854,8 +853,8 @@ static void waiting_for_pick_batches_add( } // This is called via the call combiner, so access to calld is synchronized. -static void fail_pending_batch_in_call_combiner(void *arg, grpc_error *error) { - call_data *calld = (call_data *)arg; +static void fail_pending_batch_in_call_combiner(void* arg, grpc_error* error) { + call_data* calld = (call_data*)arg; if (calld->waiting_for_pick_batches_count > 0) { --calld->waiting_for_pick_batches_count; grpc_transport_stream_op_batch_finish_with_failure( @@ -865,12 +864,12 @@ static void fail_pending_batch_in_call_combiner(void *arg, grpc_error *error) { } // This is called via the call combiner, so access to calld is synchronized. -static void waiting_for_pick_batches_fail(grpc_call_element *elem, - grpc_error *error) { - call_data *calld = (call_data *)elem->call_data; +static void waiting_for_pick_batches_fail(grpc_call_element* elem, + grpc_error* error) { + call_data* calld = (call_data*)elem->call_data; if (GRPC_TRACER_ON(grpc_client_channel_trace)) { gpr_log(GPR_DEBUG, - "chand=%p calld=%p: failing %" PRIdPTR " pending batches: %s", + "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s", elem->channel_data, calld, calld->waiting_for_pick_batches_count, grpc_error_string(error)); } @@ -894,8 +893,8 @@ static void waiting_for_pick_batches_fail(grpc_call_element *elem, } // This is called via the call combiner, so access to calld is synchronized. -static void run_pending_batch_in_call_combiner(void *arg, grpc_error *ignored) { - call_data *calld = (call_data *)arg; +static void run_pending_batch_in_call_combiner(void* arg, grpc_error* ignored) { + call_data* calld = (call_data*)arg; if (calld->waiting_for_pick_batches_count > 0) { --calld->waiting_for_pick_batches_count; grpc_subchannel_call_process_op( @@ -905,12 +904,13 @@ static void run_pending_batch_in_call_combiner(void *arg, grpc_error *ignored) { } // This is called via the call combiner, so access to calld is synchronized. -static void waiting_for_pick_batches_resume(grpc_call_element *elem) { - channel_data *chand = (channel_data *)elem->channel_data; - call_data *calld = (call_data *)elem->call_data; +static void waiting_for_pick_batches_resume(grpc_call_element* elem) { + channel_data* chand = (channel_data*)elem->channel_data; + call_data* calld = (call_data*)elem->call_data; if (GRPC_TRACER_ON(grpc_client_channel_trace)) { - gpr_log(GPR_DEBUG, "chand=%p calld=%p: sending %" PRIdPTR - " pending batches to subchannel_call=%p", + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: sending %" PRIuPTR + " pending batches to subchannel_call=%p", chand, calld, calld->waiting_for_pick_batches_count, calld->subchannel_call); } @@ -929,9 +929,9 @@ static void waiting_for_pick_batches_resume(grpc_call_element *elem) { // Applies service config to the call. Must be invoked once we know // that the resolver has returned results to the channel. -static void apply_service_config_to_call_locked(grpc_call_element *elem) { - channel_data *chand = (channel_data *)elem->channel_data; - call_data *calld = (call_data *)elem->call_data; +static void apply_service_config_to_call_locked(grpc_call_element* elem) { + channel_data* chand = (channel_data*)elem->channel_data; + call_data* calld = (call_data*)elem->call_data; if (GRPC_TRACER_ON(grpc_client_channel_trace)) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: applying service config to call", chand, calld); @@ -941,7 +941,7 @@ static void apply_service_config_to_call_locked(grpc_call_element *elem) { grpc_server_retry_throttle_data_ref(chand->retry_throttle_data); } if (chand->method_params_table != NULL) { - calld->method_params = (method_parameters *)grpc_method_config_table_get( + calld->method_params = (method_parameters*)grpc_method_config_table_get( chand->method_params_table, calld->path); if (calld->method_params != NULL) { method_parameters_ref(calld->method_params); @@ -961,10 +961,10 @@ static void apply_service_config_to_call_locked(grpc_call_element *elem) { } } -static void create_subchannel_call_locked(grpc_call_element *elem, - grpc_error *error) { - channel_data *chand = (channel_data *)elem->channel_data; - call_data *calld = (call_data *)elem->call_data; +static void create_subchannel_call_locked(grpc_call_element* elem, + grpc_error* error) { + channel_data* chand = (channel_data*)elem->channel_data; + call_data* calld = (call_data*)elem->call_data; const grpc_connected_subchannel_call_args call_args = { calld->pollent, // pollent calld->path, // path @@ -974,7 +974,7 @@ static void create_subchannel_call_locked(grpc_call_element *elem, calld->subchannel_call_context, // context calld->call_combiner // call_combiner }; - grpc_error *new_error = grpc_connected_subchannel_create_call( + grpc_error* new_error = grpc_connected_subchannel_create_call( calld->connected_subchannel, &call_args, &calld->subchannel_call); if (GRPC_TRACER_ON(grpc_client_channel_trace)) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: create subchannel_call=%p: error=%s", @@ -990,9 +990,9 @@ static void create_subchannel_call_locked(grpc_call_element *elem, } // Invoked when a pick is completed, on both success or failure. -static void pick_done_locked(grpc_call_element *elem, grpc_error *error) { - call_data *calld = (call_data *)elem->call_data; - channel_data *chand = (channel_data *)elem->channel_data; +static void pick_done_locked(grpc_call_element* elem, grpc_error* error) { + call_data* calld = (call_data*)elem->call_data; + channel_data* chand = (channel_data*)elem->channel_data; if (calld->connected_subchannel == NULL) { // Failed to create subchannel. GRPC_ERROR_UNREF(calld->error); @@ -1018,9 +1018,9 @@ static void pick_done_locked(grpc_call_element *elem, grpc_error *error) { // either (a) the pick was deferred pending a resolver result or (b) the // pick was done asynchronously. Removes the call's polling entity from // chand->interested_parties before invoking pick_done_locked(). -static void async_pick_done_locked(grpc_call_element *elem, grpc_error *error) { - channel_data *chand = (channel_data *)elem->channel_data; - call_data *calld = (call_data *)elem->call_data; +static void async_pick_done_locked(grpc_call_element* elem, grpc_error* error) { + channel_data* chand = (channel_data*)elem->channel_data; + call_data* calld = (call_data*)elem->call_data; grpc_polling_entity_del_from_pollset_set(calld->pollent, chand->interested_parties); pick_done_locked(elem, error); @@ -1028,10 +1028,10 @@ static void async_pick_done_locked(grpc_call_element *elem, grpc_error *error) { // Note: This runs under the client_channel combiner, but will NOT be // holding the call combiner. -static void pick_callback_cancel_locked(void *arg, grpc_error *error) { - grpc_call_element *elem = (grpc_call_element *)arg; - channel_data *chand = (channel_data *)elem->channel_data; - call_data *calld = (call_data *)elem->call_data; +static void pick_callback_cancel_locked(void* arg, grpc_error* error) { + grpc_call_element* elem = (grpc_call_element*)arg; + channel_data* chand = (channel_data*)elem->channel_data; + call_data* calld = (call_data*)elem->call_data; if (calld->lb_policy != NULL) { if (GRPC_TRACER_ON(grpc_client_channel_trace)) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: cancelling pick from LB policy %p", @@ -1045,10 +1045,10 @@ static void pick_callback_cancel_locked(void *arg, grpc_error *error) { // Callback invoked by grpc_lb_policy_pick_locked() for async picks. // Unrefs the LB policy and invokes async_pick_done_locked(). -static void pick_callback_done_locked(void *arg, grpc_error *error) { - grpc_call_element *elem = (grpc_call_element *)arg; - channel_data *chand = (channel_data *)elem->channel_data; - call_data *calld = (call_data *)elem->call_data; +static void pick_callback_done_locked(void* arg, grpc_error* error) { + grpc_call_element* elem = (grpc_call_element*)arg; + channel_data* chand = (channel_data*)elem->channel_data; + call_data* calld = (call_data*)elem->call_data; if (GRPC_TRACER_ON(grpc_client_channel_trace)) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed asynchronously", chand, calld); @@ -1062,9 +1062,9 @@ static void pick_callback_done_locked(void *arg, grpc_error *error) { // Takes a ref to chand->lb_policy and calls grpc_lb_policy_pick_locked(). // If the pick was completed synchronously, unrefs the LB policy and // returns true. -static bool pick_callback_start_locked(grpc_call_element *elem) { - channel_data *chand = (channel_data *)elem->channel_data; - call_data *calld = (call_data *)elem->call_data; +static bool pick_callback_start_locked(grpc_call_element* elem) { + channel_data* chand = (channel_data*)elem->channel_data; + call_data* calld = (call_data*)elem->call_data; if (GRPC_TRACER_ON(grpc_client_channel_trace)) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: starting pick on lb_policy=%p", chand, calld, chand->lb_policy); @@ -1121,7 +1121,7 @@ static bool pick_callback_start_locked(grpc_call_element *elem) { } typedef struct { - grpc_call_element *elem; + grpc_call_element* elem; bool finished; grpc_closure closure; grpc_closure cancel_closure; @@ -1129,10 +1129,9 @@ typedef struct { // Note: This runs under the client_channel combiner, but will NOT be // holding the call combiner. -static void pick_after_resolver_result_cancel_locked(void *arg, - grpc_error *error) { - pick_after_resolver_result_args *args = - (pick_after_resolver_result_args *)arg; +static void pick_after_resolver_result_cancel_locked(void* arg, + grpc_error* error) { + pick_after_resolver_result_args* args = (pick_after_resolver_result_args*)arg; if (args->finished) { gpr_free(args); return; @@ -1145,9 +1144,9 @@ static void pick_after_resolver_result_cancel_locked(void *arg, // is called, it will be a no-op. We also immediately invoke // async_pick_done_locked() to propagate the error back to the caller. args->finished = true; - grpc_call_element *elem = args->elem; - channel_data *chand = (channel_data *)elem->channel_data; - call_data *calld = (call_data *)elem->call_data; + grpc_call_element* elem = args->elem; + channel_data* chand = (channel_data*)elem->channel_data; + call_data* calld = (call_data*)elem->call_data; if (GRPC_TRACER_ON(grpc_client_channel_trace)) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: cancelling pick waiting for resolver result", @@ -1162,10 +1161,11 @@ static void pick_after_resolver_result_cancel_locked(void *arg, "Pick cancelled", &error, 1)); } -static void pick_after_resolver_result_done_locked(void *arg, - grpc_error *error) { - pick_after_resolver_result_args *args = - (pick_after_resolver_result_args *)arg; +static void pick_after_resolver_result_start_locked(grpc_call_element* elem); + +static void pick_after_resolver_result_done_locked(void* arg, + grpc_error* error) { + pick_after_resolver_result_args* args = (pick_after_resolver_result_args*)arg; if (args->finished) { /* cancelled, do nothing */ if (GRPC_TRACER_ON(grpc_client_channel_trace)) { @@ -1175,16 +1175,16 @@ static void pick_after_resolver_result_done_locked(void *arg, return; } args->finished = true; - grpc_call_element *elem = args->elem; - channel_data *chand = (channel_data *)elem->channel_data; - call_data *calld = (call_data *)elem->call_data; + grpc_call_element* elem = args->elem; + channel_data* chand = (channel_data*)elem->channel_data; + call_data* calld = (call_data*)elem->call_data; if (error != GRPC_ERROR_NONE) { if (GRPC_TRACER_ON(grpc_client_channel_trace)) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver failed to return data", chand, calld); } async_pick_done_locked(elem, GRPC_ERROR_REF(error)); - } else { + } else if (chand->lb_policy != NULL) { if (GRPC_TRACER_ON(grpc_client_channel_trace)) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver returned, doing pick", chand, calld); @@ -1198,18 +1198,42 @@ static void pick_after_resolver_result_done_locked(void *arg, async_pick_done_locked(elem, GRPC_ERROR_NONE); } } + // TODO(roth): It should be impossible for chand->lb_policy to be NULL + // here, so the rest of this code should never actually be executed. + // However, we have reports of a crash on iOS that triggers this case, + // so we are temporarily adding this to restore branches that were + // removed in https://github.com/grpc/grpc/pull/12297. Need to figure + // out what is actually causing this to occur and then figure out the + // right way to deal with it. + else if (chand->resolver != NULL) { + // No LB policy, so try again. + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: resolver returned but no LB policy, " + "trying again", + chand, calld); + } + pick_after_resolver_result_start_locked(elem); + } else { + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver disconnected", chand, + calld); + } + async_pick_done_locked( + elem, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected")); + } } -static void pick_after_resolver_result_start_locked(grpc_call_element *elem) { - channel_data *chand = (channel_data *)elem->channel_data; - call_data *calld = (call_data *)elem->call_data; +static void pick_after_resolver_result_start_locked(grpc_call_element* elem) { + channel_data* chand = (channel_data*)elem->channel_data; + call_data* calld = (call_data*)elem->call_data; if (GRPC_TRACER_ON(grpc_client_channel_trace)) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: deferring pick pending resolver result", chand, calld); } - pick_after_resolver_result_args *args = - (pick_after_resolver_result_args *)gpr_zalloc(sizeof(*args)); + pick_after_resolver_result_args* args = + (pick_after_resolver_result_args*)gpr_zalloc(sizeof(*args)); args->elem = elem; GRPC_CLOSURE_INIT(&args->closure, pick_after_resolver_result_done_locked, args, grpc_combiner_scheduler(chand->combiner)); @@ -1222,10 +1246,10 @@ static void pick_after_resolver_result_start_locked(grpc_call_element *elem) { grpc_combiner_scheduler(chand->combiner))); } -static void start_pick_locked(void *arg, grpc_error *ignored) { - grpc_call_element *elem = (grpc_call_element *)arg; - call_data *calld = (call_data *)elem->call_data; - channel_data *chand = (channel_data *)elem->channel_data; +static void start_pick_locked(void* arg, grpc_error* ignored) { + grpc_call_element* elem = (grpc_call_element*)arg; + call_data* calld = (call_data*)elem->call_data; + channel_data* chand = (channel_data*)elem->channel_data; GPR_ASSERT(calld->connected_subchannel == NULL); if (chand->lb_policy != NULL) { // We already have an LB policy, so ask it for a pick. @@ -1255,9 +1279,9 @@ static void start_pick_locked(void *arg, grpc_error *ignored) { chand->interested_parties); } -static void on_complete(void *arg, grpc_error *error) { - grpc_call_element *elem = (grpc_call_element *)arg; - call_data *calld = (call_data *)elem->call_data; +static void on_complete(void* arg, grpc_error* error) { + grpc_call_element* elem = (grpc_call_element*)arg; + call_data* calld = (call_data*)elem->call_data; if (calld->retry_throttle_data != NULL) { if (error == GRPC_ERROR_NONE) { grpc_server_retry_throttle_data_record_success( @@ -1275,9 +1299,9 @@ static void on_complete(void *arg, grpc_error *error) { } static void cc_start_transport_stream_op_batch( - grpc_call_element *elem, grpc_transport_stream_op_batch *batch) { - call_data *calld = (call_data *)elem->call_data; - channel_data *chand = (channel_data *)elem->channel_data; + grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { + call_data* calld = (call_data*)elem->call_data; + channel_data* chand = (channel_data*)elem->channel_data; if (chand->deadline_checking_enabled) { grpc_deadline_state_client_start_transport_stream_op_batch(elem, batch); } @@ -1365,10 +1389,10 @@ done: } /* Constructor for call_data */ -static grpc_error *cc_init_call_elem(grpc_call_element *elem, - const grpc_call_element_args *args) { - call_data *calld = (call_data *)elem->call_data; - channel_data *chand = (channel_data *)elem->channel_data; +static grpc_error* cc_init_call_elem(grpc_call_element* elem, + const grpc_call_element_args* args) { + call_data* calld = (call_data*)elem->call_data; + channel_data* chand = (channel_data*)elem->channel_data; // Initialize data members. calld->path = grpc_slice_ref_internal(args->path); calld->call_start_time = args->start_time; @@ -1384,11 +1408,11 @@ static grpc_error *cc_init_call_elem(grpc_call_element *elem, } /* Destructor for call_data */ -static void cc_destroy_call_elem(grpc_call_element *elem, - const grpc_call_final_info *final_info, - grpc_closure *then_schedule_closure) { - call_data *calld = (call_data *)elem->call_data; - channel_data *chand = (channel_data *)elem->channel_data; +static void cc_destroy_call_elem(grpc_call_element* elem, + const grpc_call_final_info* final_info, + grpc_closure* then_schedule_closure) { + call_data* calld = (call_data*)elem->call_data; + channel_data* chand = (channel_data*)elem->channel_data; if (chand->deadline_checking_enabled) { grpc_deadline_state_destroy(elem); } @@ -1418,9 +1442,9 @@ static void cc_destroy_call_elem(grpc_call_element *elem, GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE); } -static void cc_set_pollset_or_pollset_set(grpc_call_element *elem, - grpc_polling_entity *pollent) { - call_data *calld = (call_data *)elem->call_data; +static void cc_set_pollset_or_pollset_set(grpc_call_element* elem, + grpc_polling_entity* pollent) { + call_data* calld = (call_data*)elem->call_data; calld->pollent = pollent; } @@ -1442,8 +1466,8 @@ const grpc_channel_filter grpc_client_channel_filter = { "client-channel", }; -static void try_to_connect_locked(void *arg, grpc_error *error_ignored) { - channel_data *chand = (channel_data *)arg; +static void try_to_connect_locked(void* arg, grpc_error* error_ignored) { + channel_data* chand = (channel_data*)arg; if (chand->lb_policy != NULL) { grpc_lb_policy_exit_idle_locked(chand->lb_policy); } else { @@ -1456,8 +1480,8 @@ static void try_to_connect_locked(void *arg, grpc_error *error_ignored) { } grpc_connectivity_state grpc_client_channel_check_connectivity_state( - grpc_channel_element *elem, int try_to_connect) { - channel_data *chand = (channel_data *)elem->channel_data; + grpc_channel_element* elem, int try_to_connect) { + channel_data* chand = (channel_data*)elem->channel_data; grpc_connectivity_state out = grpc_connectivity_state_check(&chand->state_tracker); if (out == GRPC_CHANNEL_IDLE && try_to_connect) { @@ -1471,19 +1495,19 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state( } typedef struct external_connectivity_watcher { - channel_data *chand; + channel_data* chand; grpc_polling_entity pollent; - grpc_closure *on_complete; - grpc_closure *watcher_timer_init; - grpc_connectivity_state *state; + grpc_closure* on_complete; + grpc_closure* watcher_timer_init; + grpc_connectivity_state* state; grpc_closure my_closure; - struct external_connectivity_watcher *next; + struct external_connectivity_watcher* next; } external_connectivity_watcher; -static external_connectivity_watcher *lookup_external_connectivity_watcher( - channel_data *chand, grpc_closure *on_complete) { +static external_connectivity_watcher* lookup_external_connectivity_watcher( + channel_data* chand, grpc_closure* on_complete) { gpr_mu_lock(&chand->external_connectivity_watcher_list_mu); - external_connectivity_watcher *w = + external_connectivity_watcher* w = chand->external_connectivity_watcher_list_head; while (w != NULL && w->on_complete != on_complete) { w = w->next; @@ -1493,7 +1517,7 @@ static external_connectivity_watcher *lookup_external_connectivity_watcher( } static void external_connectivity_watcher_list_append( - channel_data *chand, external_connectivity_watcher *w) { + channel_data* chand, external_connectivity_watcher* w) { GPR_ASSERT(!lookup_external_connectivity_watcher(chand, w->on_complete)); gpr_mu_lock(&w->chand->external_connectivity_watcher_list_mu); @@ -1504,7 +1528,7 @@ static void external_connectivity_watcher_list_append( } static void external_connectivity_watcher_list_remove( - channel_data *chand, external_connectivity_watcher *too_remove) { + channel_data* chand, external_connectivity_watcher* too_remove) { GPR_ASSERT( lookup_external_connectivity_watcher(chand, too_remove->on_complete)); gpr_mu_lock(&chand->external_connectivity_watcher_list_mu); @@ -1513,7 +1537,7 @@ static void external_connectivity_watcher_list_remove( gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu); return; } - external_connectivity_watcher *w = + external_connectivity_watcher* w = chand->external_connectivity_watcher_list_head; while (w != NULL) { if (w->next == too_remove) { @@ -1527,12 +1551,12 @@ static void external_connectivity_watcher_list_remove( } int grpc_client_channel_num_external_connectivity_watchers( - grpc_channel_element *elem) { - channel_data *chand = (channel_data *)elem->channel_data; + grpc_channel_element* elem) { + channel_data* chand = (channel_data*)elem->channel_data; int count = 0; gpr_mu_lock(&chand->external_connectivity_watcher_list_mu); - external_connectivity_watcher *w = + external_connectivity_watcher* w = chand->external_connectivity_watcher_list_head; while (w != NULL) { count++; @@ -1543,9 +1567,9 @@ int grpc_client_channel_num_external_connectivity_watchers( return count; } -static void on_external_watch_complete(void *arg, grpc_error *error) { - external_connectivity_watcher *w = (external_connectivity_watcher *)arg; - grpc_closure *follow_up = w->on_complete; +static void on_external_watch_complete_locked(void* arg, grpc_error* error) { + external_connectivity_watcher* w = (external_connectivity_watcher*)arg; + grpc_closure* follow_up = w->on_complete; grpc_polling_entity_del_from_pollset_set(&w->pollent, w->chand->interested_parties); GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack, @@ -1555,15 +1579,15 @@ static void on_external_watch_complete(void *arg, grpc_error *error) { GRPC_CLOSURE_RUN(follow_up, GRPC_ERROR_REF(error)); } -static void watch_connectivity_state_locked(void *arg, - grpc_error *error_ignored) { - external_connectivity_watcher *w = (external_connectivity_watcher *)arg; - external_connectivity_watcher *found = NULL; +static void watch_connectivity_state_locked(void* arg, + grpc_error* error_ignored) { + external_connectivity_watcher* w = (external_connectivity_watcher*)arg; + external_connectivity_watcher* found = NULL; if (w->state != NULL) { external_connectivity_watcher_list_append(w->chand, w); GRPC_CLOSURE_RUN(w->watcher_timer_init, GRPC_ERROR_NONE); - GRPC_CLOSURE_INIT(&w->my_closure, on_external_watch_complete, w, - grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&w->my_closure, on_external_watch_complete_locked, w, + grpc_combiner_scheduler(w->chand->combiner)); grpc_connectivity_state_notify_on_state_change(&w->chand->state_tracker, w->state, &w->my_closure); } else { @@ -1583,12 +1607,12 @@ static void watch_connectivity_state_locked(void *arg, } void grpc_client_channel_watch_connectivity_state( - grpc_channel_element *elem, grpc_polling_entity pollent, - grpc_connectivity_state *state, grpc_closure *closure, - grpc_closure *watcher_timer_init) { - channel_data *chand = (channel_data *)elem->channel_data; - external_connectivity_watcher *w = - (external_connectivity_watcher *)gpr_zalloc(sizeof(*w)); + grpc_channel_element* elem, grpc_polling_entity pollent, + grpc_connectivity_state* state, grpc_closure* closure, + grpc_closure* watcher_timer_init) { + channel_data* chand = (channel_data*)elem->channel_data; + external_connectivity_watcher* w = + (external_connectivity_watcher*)gpr_zalloc(sizeof(*w)); w->chand = chand; w->pollent = pollent; w->on_complete = closure; |