diff options
Diffstat (limited to 'src/core/ext/filters/client_channel/client_channel.cc')
-rw-r--r-- | src/core/ext/filters/client_channel/client_channel.cc | 499 |
1 files changed, 250 insertions, 249 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 00c51ba543..b18fa20c65 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -76,24 +76,24 @@ typedef struct { wait_for_ready_value wait_for_ready; } method_parameters; -static method_parameters *method_parameters_ref( - method_parameters *method_params) { +static method_parameters* method_parameters_ref( + method_parameters* method_params) { gpr_ref(&method_params->refs); return method_params; } -static void method_parameters_unref(method_parameters *method_params) { +static void method_parameters_unref(method_parameters* method_params) { if (gpr_unref(&method_params->refs)) { gpr_free(method_params); } } -static void method_parameters_free(grpc_exec_ctx *exec_ctx, void *value) { - method_parameters_unref((method_parameters *)value); +static void method_parameters_free(grpc_exec_ctx* exec_ctx, void* value) { + method_parameters_unref((method_parameters*)value); } -static bool parse_wait_for_ready(grpc_json *field, - wait_for_ready_value *wait_for_ready) { +static bool parse_wait_for_ready(grpc_json* field, + wait_for_ready_value* wait_for_ready) { if (field->type != GRPC_JSON_TRUE && field->type != GRPC_JSON_FALSE) { return false; } @@ -102,13 +102,13 @@ static bool parse_wait_for_ready(grpc_json *field, return true; } -static bool parse_timeout(grpc_json *field, grpc_millis *timeout) { +static bool parse_timeout(grpc_json* field, grpc_millis* timeout) { if (field->type != GRPC_JSON_STRING) return false; size_t len = strlen(field->value); if (field->value[len - 1] != 's') return false; - char *buf = gpr_strdup(field->value); + char* buf = gpr_strdup(field->value); buf[len - 1] = '\0'; // Remove trailing 's'. - char *decimal_point = strchr(buf, '.'); + char* decimal_point = strchr(buf, '.'); int nanos = 0; if (decimal_point != NULL) { *decimal_point = '\0'; @@ -141,10 +141,10 @@ static bool parse_timeout(grpc_json *field, grpc_millis *timeout) { return true; } -static void *method_parameters_create_from_json(const grpc_json *json) { +static void* method_parameters_create_from_json(const grpc_json* json) { wait_for_ready_value wait_for_ready = WAIT_FOR_READY_UNSET; grpc_millis timeout = 0; - for (grpc_json *field = json->child; field != NULL; field = field->next) { + for (grpc_json* field = json->child; field != NULL; field = field->next) { if (field->key == NULL) continue; if (strcmp(field->key, "waitForReady") == 0) { if (wait_for_ready != WAIT_FOR_READY_UNSET) return NULL; // Duplicate. @@ -154,8 +154,8 @@ static void *method_parameters_create_from_json(const grpc_json *json) { if (!parse_timeout(field, &timeout)) return NULL; } } - method_parameters *value = - (method_parameters *)gpr_malloc(sizeof(method_parameters)); + method_parameters* value = + (method_parameters*)gpr_malloc(sizeof(method_parameters)); gpr_ref_init(&value->refs, 1); value->timeout = timeout; value->wait_for_ready = wait_for_ready; @@ -170,24 +170,24 @@ struct external_connectivity_watcher; typedef struct client_channel_channel_data { /** resolver for this channel */ - grpc_resolver *resolver; + grpc_resolver* resolver; /** have we started resolving this channel */ bool started_resolving; /** is deadline checking enabled? */ bool deadline_checking_enabled; /** client channel factory */ - grpc_client_channel_factory *client_channel_factory; + grpc_client_channel_factory* client_channel_factory; /** combiner protecting all variables below in this data structure */ - grpc_combiner *combiner; + grpc_combiner* combiner; /** currently active load balancer */ - grpc_lb_policy *lb_policy; + grpc_lb_policy* lb_policy; /** retry throttle data */ - grpc_server_retry_throttle_data *retry_throttle_data; + grpc_server_retry_throttle_data* retry_throttle_data; /** maps method names to method_parameters structs */ - grpc_slice_hash_table *method_params_table; + grpc_slice_hash_table* method_params_table; /** incoming resolver result - set by resolver.next() */ - grpc_channel_args *resolver_result; + grpc_channel_args* resolver_result; /** a list of closures that are all waiting for resolver result to come in */ grpc_closure_list waiting_for_resolver_result_closures; /** resolver callback */ @@ -197,42 +197,42 @@ typedef struct client_channel_channel_data { /** when an lb_policy arrives, should we try to exit idle */ bool exit_idle_when_lb_policy_arrives; /** owning stack */ - grpc_channel_stack *owning_stack; + grpc_channel_stack* owning_stack; /** interested parties (owned) */ - grpc_pollset_set *interested_parties; + grpc_pollset_set* interested_parties; /* external_connectivity_watcher_list head is guarded by its own mutex, since * counts need to be grabbed immediately without polling on a cq */ gpr_mu external_connectivity_watcher_list_mu; - struct external_connectivity_watcher *external_connectivity_watcher_list_head; + struct external_connectivity_watcher* external_connectivity_watcher_list_head; /* the following properties are guarded by a mutex since API's require them to be instantaneously available */ gpr_mu info_mu; - char *info_lb_policy_name; + char* info_lb_policy_name; /** service config in JSON form */ - char *info_service_config_json; + char* info_service_config_json; } channel_data; /** We create one watcher for each new lb_policy that is returned from a resolver, to watch for state changes from the lb_policy. When a state change is seen, we update the channel, and create a new watcher. */ typedef struct { - channel_data *chand; + channel_data* chand; grpc_closure on_changed; grpc_connectivity_state state; - grpc_lb_policy *lb_policy; + grpc_lb_policy* lb_policy; } lb_policy_connectivity_watcher; -static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand, - grpc_lb_policy *lb_policy, +static void watch_lb_policy_locked(grpc_exec_ctx* exec_ctx, channel_data* chand, + grpc_lb_policy* lb_policy, grpc_connectivity_state current_state); -static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx, - channel_data *chand, +static void set_channel_connectivity_state_locked(grpc_exec_ctx* exec_ctx, + channel_data* chand, grpc_connectivity_state state, - grpc_error *error, - const char *reason) { + grpc_error* error, + const char* reason) { /* TODO: Improve failure handling: * - Make it possible for policies to return GRPC_CHANNEL_TRANSIENT_FAILURE. * - Hand over pending picks from old policies during the switch that happens @@ -259,9 +259,9 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx, reason); } -static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx, - void *arg, grpc_error *error) { - lb_policy_connectivity_watcher *w = (lb_policy_connectivity_watcher *)arg; +static void on_lb_policy_state_changed_locked(grpc_exec_ctx* exec_ctx, + void* arg, grpc_error* error) { + lb_policy_connectivity_watcher* w = (lb_policy_connectivity_watcher*)arg; grpc_connectivity_state publish_state = w->state; /* check if the notification is for the latest policy */ if (w->lb_policy == w->chand->lb_policy) { @@ -285,11 +285,11 @@ static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx, gpr_free(w); } -static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand, - grpc_lb_policy *lb_policy, +static void watch_lb_policy_locked(grpc_exec_ctx* exec_ctx, channel_data* chand, + grpc_lb_policy* lb_policy, grpc_connectivity_state current_state) { - lb_policy_connectivity_watcher *w = - (lb_policy_connectivity_watcher *)gpr_malloc(sizeof(*w)); + lb_policy_connectivity_watcher* w = + (lb_policy_connectivity_watcher*)gpr_malloc(sizeof(*w)); GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy"); w->chand = chand; GRPC_CLOSURE_INIT(&w->on_changed, on_lb_policy_state_changed_locked, w, @@ -300,8 +300,8 @@ static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand, &w->on_changed); } -static void start_resolving_locked(grpc_exec_ctx *exec_ctx, - channel_data *chand) { +static void start_resolving_locked(grpc_exec_ctx* exec_ctx, + channel_data* chand) { if (GRPC_TRACER_ON(grpc_client_channel_trace)) { gpr_log(GPR_DEBUG, "chand=%p: starting name resolution", chand); } @@ -313,19 +313,19 @@ static void start_resolving_locked(grpc_exec_ctx *exec_ctx, } typedef struct { - char *server_name; - grpc_server_retry_throttle_data *retry_throttle_data; + char* server_name; + grpc_server_retry_throttle_data* retry_throttle_data; } service_config_parsing_state; -static void parse_retry_throttle_params(const grpc_json *field, void *arg) { - service_config_parsing_state *parsing_state = - (service_config_parsing_state *)arg; +static void parse_retry_throttle_params(const grpc_json* field, void* arg) { + service_config_parsing_state* parsing_state = + (service_config_parsing_state*)arg; if (strcmp(field->key, "retryThrottling") == 0) { if (parsing_state->retry_throttle_data != NULL) return; // Duplicate. if (field->type != GRPC_JSON_OBJECT) return; int max_milli_tokens = 0; int milli_token_ratio = 0; - for (grpc_json *sub_field = field->child; sub_field != NULL; + for (grpc_json* sub_field = field->child; sub_field != NULL; sub_field = sub_field->next) { if (sub_field->key == NULL) return; if (strcmp(sub_field->key, "maxTokens") == 0) { @@ -341,7 +341,7 @@ static void parse_retry_throttle_params(const grpc_json *field, void *arg) { size_t whole_len = strlen(sub_field->value); uint32_t multiplier = 1; uint32_t decimal_value = 0; - const char *decimal_point = strchr(sub_field->value, '.'); + const char* decimal_point = strchr(sub_field->value, '.'); if (decimal_point != NULL) { whole_len = (size_t)(decimal_point - sub_field->value); multiplier = 1000; @@ -372,25 +372,25 @@ static void parse_retry_throttle_params(const grpc_json *field, void *arg) { } } -static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, - void *arg, grpc_error *error) { - channel_data *chand = (channel_data *)arg; +static void on_resolver_result_changed_locked(grpc_exec_ctx* exec_ctx, + void* arg, grpc_error* error) { + channel_data* chand = (channel_data*)arg; if (GRPC_TRACER_ON(grpc_client_channel_trace)) { gpr_log(GPR_DEBUG, "chand=%p: got resolver result: error=%s", chand, grpc_error_string(error)); } // Extract the following fields from the resolver result, if non-NULL. bool lb_policy_updated = false; - char *lb_policy_name_dup = NULL; + char* lb_policy_name_dup = NULL; bool lb_policy_name_changed = false; - grpc_lb_policy *new_lb_policy = NULL; - char *service_config_json = NULL; - grpc_server_retry_throttle_data *retry_throttle_data = NULL; - grpc_slice_hash_table *method_params_table = NULL; + grpc_lb_policy* new_lb_policy = NULL; + char* service_config_json = NULL; + grpc_server_retry_throttle_data* retry_throttle_data = NULL; + grpc_slice_hash_table* method_params_table = NULL; if (chand->resolver_result != NULL) { // Find LB policy name. - const char *lb_policy_name = NULL; - const grpc_arg *channel_arg = + const char* lb_policy_name = NULL; + const grpc_arg* channel_arg = grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_POLICY_NAME); if (channel_arg != NULL) { GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING); @@ -401,8 +401,8 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, channel_arg = grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES); if (channel_arg != NULL && channel_arg->type == GRPC_ARG_POINTER) { - grpc_lb_addresses *addresses = - (grpc_lb_addresses *)channel_arg->value.pointer.p; + grpc_lb_addresses* addresses = + (grpc_lb_addresses*)channel_arg->value.pointer.p; bool found_balancer_address = false; for (size_t i = 0; i < addresses->num_addresses; ++i) { if (addresses->addresses[i].is_balancer) { @@ -453,14 +453,14 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, if (channel_arg != NULL) { GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING); service_config_json = gpr_strdup(channel_arg->value.string); - grpc_service_config *service_config = + grpc_service_config* service_config = grpc_service_config_create(service_config_json); if (service_config != NULL) { channel_arg = grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVER_URI); GPR_ASSERT(channel_arg != NULL); GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING); - grpc_uri *uri = + grpc_uri* uri = grpc_uri_parse(exec_ctx, channel_arg->value.string, true); GPR_ASSERT(uri->path[0] != '\0'); service_config_parsing_state parsing_state; @@ -563,7 +563,7 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, &chand->waiting_for_resolver_result_closures); } else { // Not shutting down. grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE; - grpc_error *state_error = + grpc_error* state_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy"); if (new_lb_policy != NULL) { if (GRPC_TRACER_ON(grpc_client_channel_trace)) { @@ -595,12 +595,12 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, } } -static void start_transport_op_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error_ignored) { - grpc_transport_op *op = (grpc_transport_op *)arg; - grpc_channel_element *elem = - (grpc_channel_element *)op->handler_private.extra_arg; - channel_data *chand = (channel_data *)elem->channel_data; +static void start_transport_op_locked(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error_ignored) { + grpc_transport_op* op = (grpc_transport_op*)arg; + grpc_channel_element* elem = + (grpc_channel_element*)op->handler_private.extra_arg; + channel_data* chand = (channel_data*)elem->channel_data; if (op->on_connectivity_state_change != NULL) { grpc_connectivity_state_notify_on_state_change( @@ -651,10 +651,10 @@ static void start_transport_op_locked(grpc_exec_ctx *exec_ctx, void *arg, GRPC_CLOSURE_SCHED(exec_ctx, op->on_consumed, GRPC_ERROR_NONE); } -static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, - grpc_transport_op *op) { - channel_data *chand = (channel_data *)elem->channel_data; +static void cc_start_transport_op(grpc_exec_ctx* exec_ctx, + grpc_channel_element* elem, + grpc_transport_op* op) { + channel_data* chand = (channel_data*)elem->channel_data; GPR_ASSERT(op->set_accept_stream == false); if (op->bind_pollset != NULL) { @@ -671,10 +671,10 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, GRPC_ERROR_NONE); } -static void cc_get_channel_info(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, - const grpc_channel_info *info) { - channel_data *chand = (channel_data *)elem->channel_data; +static void cc_get_channel_info(grpc_exec_ctx* exec_ctx, + grpc_channel_element* elem, + const grpc_channel_info* info) { + channel_data* chand = (channel_data*)elem->channel_data; gpr_mu_lock(&chand->info_mu); if (info->lb_policy_name != NULL) { *info->lb_policy_name = chand->info_lb_policy_name == NULL @@ -691,10 +691,10 @@ static void cc_get_channel_info(grpc_exec_ctx *exec_ctx, } /* Constructor for channel_data */ -static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, - grpc_channel_element_args *args) { - channel_data *chand = (channel_data *)elem->channel_data; +static grpc_error* cc_init_channel_elem(grpc_exec_ctx* exec_ctx, + grpc_channel_element* elem, + grpc_channel_element_args* args) { + channel_data* chand = (channel_data*)elem->channel_data; GPR_ASSERT(args->is_last); GPR_ASSERT(elem->filter == &grpc_client_channel_filter); // Initialize data members. @@ -715,7 +715,7 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx, "client_channel"); grpc_client_channel_start_backup_polling(exec_ctx, chand->interested_parties); // Record client channel factory. - const grpc_arg *arg = grpc_channel_args_find(args->channel_args, + const grpc_arg* arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_CLIENT_CHANNEL_FACTORY); if (arg == NULL) { return GRPC_ERROR_CREATE_FROM_STATIC_STRING( @@ -726,9 +726,9 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx, "client channel factory arg must be a pointer"); } grpc_client_channel_factory_ref( - (grpc_client_channel_factory *)arg->value.pointer.p); + (grpc_client_channel_factory*)arg->value.pointer.p); chand->client_channel_factory = - (grpc_client_channel_factory *)arg->value.pointer.p; + (grpc_client_channel_factory*)arg->value.pointer.p; // Get server name to resolve, using proxy mapper if needed. arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI); if (arg == NULL) { @@ -739,8 +739,8 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx, return GRPC_ERROR_CREATE_FROM_STATIC_STRING( "server uri arg must be a string"); } - char *proxy_name = NULL; - grpc_channel_args *new_args = NULL; + char* proxy_name = NULL; + grpc_channel_args* new_args = NULL; grpc_proxy_mappers_map_name(exec_ctx, arg->value.string, args->channel_args, &proxy_name, &new_args); // Instantiate resolver. @@ -758,21 +758,22 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx, return GRPC_ERROR_NONE; } -static void shutdown_resolver_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - grpc_resolver *resolver = (grpc_resolver *)arg; +static void shutdown_resolver_locked(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + grpc_resolver* resolver = (grpc_resolver*)arg; grpc_resolver_shutdown_locked(exec_ctx, resolver); GRPC_RESOLVER_UNREF(exec_ctx, resolver, "channel"); } /* Destructor for channel_data */ -static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem) { - channel_data *chand = (channel_data *)elem->channel_data; +static void cc_destroy_channel_elem(grpc_exec_ctx* exec_ctx, + grpc_channel_element* elem) { + channel_data* chand = (channel_data*)elem->channel_data; if (chand->resolver != NULL) { GRPC_CLOSURE_SCHED( - exec_ctx, GRPC_CLOSURE_CREATE(shutdown_resolver_locked, chand->resolver, - grpc_combiner_scheduler(chand->combiner)), + exec_ctx, + GRPC_CLOSURE_CREATE(shutdown_resolver_locked, chand->resolver, + grpc_combiner_scheduler(chand->combiner)), GRPC_ERROR_NONE); } if (chand->client_channel_factory != NULL) { @@ -832,45 +833,45 @@ typedef struct client_channel_call_data { grpc_slice path; // Request path. gpr_timespec call_start_time; grpc_millis deadline; - gpr_arena *arena; - grpc_call_stack *owning_call; - grpc_call_combiner *call_combiner; + gpr_arena* arena; + grpc_call_stack* owning_call; + grpc_call_combiner* call_combiner; - grpc_server_retry_throttle_data *retry_throttle_data; - method_parameters *method_params; + grpc_server_retry_throttle_data* retry_throttle_data; + method_parameters* method_params; - grpc_subchannel_call *subchannel_call; - grpc_error *error; + grpc_subchannel_call* subchannel_call; + grpc_error* error; - grpc_lb_policy *lb_policy; // Holds ref while LB pick is pending. + grpc_lb_policy* lb_policy; // Holds ref while LB pick is pending. grpc_closure lb_pick_closure; grpc_closure lb_pick_cancel_closure; - grpc_connected_subchannel *connected_subchannel; + grpc_connected_subchannel* connected_subchannel; grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT]; - grpc_polling_entity *pollent; + grpc_polling_entity* pollent; - grpc_transport_stream_op_batch *waiting_for_pick_batches[MAX_WAITING_BATCHES]; + grpc_transport_stream_op_batch* waiting_for_pick_batches[MAX_WAITING_BATCHES]; size_t waiting_for_pick_batches_count; grpc_closure handle_pending_batch_in_call_combiner[MAX_WAITING_BATCHES]; - grpc_transport_stream_op_batch *initial_metadata_batch; + grpc_transport_stream_op_batch* initial_metadata_batch; grpc_linked_mdelem lb_token_mdelem; grpc_closure on_complete; - grpc_closure *original_on_complete; + grpc_closure* original_on_complete; } call_data; -grpc_subchannel_call *grpc_client_channel_get_subchannel_call( - grpc_call_element *elem) { - call_data *calld = (call_data *)elem->call_data; +grpc_subchannel_call* grpc_client_channel_get_subchannel_call( + grpc_call_element* elem) { + call_data* calld = (call_data*)elem->call_data; return calld->subchannel_call; } // This is called via the call combiner, so access to calld is synchronized. static void waiting_for_pick_batches_add( - call_data *calld, grpc_transport_stream_op_batch *batch) { + call_data* calld, grpc_transport_stream_op_batch* batch) { if (batch->send_initial_metadata) { GPR_ASSERT(calld->initial_metadata_batch == NULL); calld->initial_metadata_batch = batch; @@ -882,9 +883,9 @@ static void waiting_for_pick_batches_add( } // This is called via the call combiner, so access to calld is synchronized. -static void fail_pending_batch_in_call_combiner(grpc_exec_ctx *exec_ctx, - void *arg, grpc_error *error) { - call_data *calld = (call_data *)arg; +static void fail_pending_batch_in_call_combiner(grpc_exec_ctx* exec_ctx, + void* arg, grpc_error* error) { + call_data* calld = (call_data*)arg; if (calld->waiting_for_pick_batches_count > 0) { --calld->waiting_for_pick_batches_count; grpc_transport_stream_op_batch_finish_with_failure( @@ -895,10 +896,10 @@ static void fail_pending_batch_in_call_combiner(grpc_exec_ctx *exec_ctx, } // This is called via the call combiner, so access to calld is synchronized. -static void waiting_for_pick_batches_fail(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_error *error) { - call_data *calld = (call_data *)elem->call_data; +static void waiting_for_pick_batches_fail(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem, + grpc_error* error) { + call_data* calld = (call_data*)elem->call_data; if (GRPC_TRACER_ON(grpc_client_channel_trace)) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s", @@ -926,9 +927,9 @@ static void waiting_for_pick_batches_fail(grpc_exec_ctx *exec_ctx, } // This is called via the call combiner, so access to calld is synchronized. -static void run_pending_batch_in_call_combiner(grpc_exec_ctx *exec_ctx, - void *arg, grpc_error *ignored) { - call_data *calld = (call_data *)arg; +static void run_pending_batch_in_call_combiner(grpc_exec_ctx* exec_ctx, + void* arg, grpc_error* ignored) { + call_data* calld = (call_data*)arg; if (calld->waiting_for_pick_batches_count > 0) { --calld->waiting_for_pick_batches_count; grpc_subchannel_call_process_op( @@ -938,13 +939,14 @@ static void run_pending_batch_in_call_combiner(grpc_exec_ctx *exec_ctx, } // This is called via the call combiner, so access to calld is synchronized. -static void waiting_for_pick_batches_resume(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) { - channel_data *chand = (channel_data *)elem->channel_data; - call_data *calld = (call_data *)elem->call_data; +static void waiting_for_pick_batches_resume(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem) { + channel_data* chand = (channel_data*)elem->channel_data; + call_data* calld = (call_data*)elem->call_data; if (GRPC_TRACER_ON(grpc_client_channel_trace)) { - gpr_log(GPR_DEBUG, "chand=%p calld=%p: sending %" PRIuPTR - " pending batches to subchannel_call=%p", + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: sending %" PRIuPTR + " pending batches to subchannel_call=%p", chand, calld, calld->waiting_for_pick_batches_count, calld->subchannel_call); } @@ -964,10 +966,10 @@ static void waiting_for_pick_batches_resume(grpc_exec_ctx *exec_ctx, // Applies service config to the call. Must be invoked once we know // that the resolver has returned results to the channel. -static void apply_service_config_to_call_locked(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) { - channel_data *chand = (channel_data *)elem->channel_data; - call_data *calld = (call_data *)elem->call_data; +static void apply_service_config_to_call_locked(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem) { + channel_data* chand = (channel_data*)elem->channel_data; + call_data* calld = (call_data*)elem->call_data; if (GRPC_TRACER_ON(grpc_client_channel_trace)) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: applying service config to call", chand, calld); @@ -977,7 +979,7 @@ static void apply_service_config_to_call_locked(grpc_exec_ctx *exec_ctx, grpc_server_retry_throttle_data_ref(chand->retry_throttle_data); } if (chand->method_params_table != NULL) { - calld->method_params = (method_parameters *)grpc_method_config_table_get( + calld->method_params = (method_parameters*)grpc_method_config_table_get( exec_ctx, chand->method_params_table, calld->path); if (calld->method_params != NULL) { method_parameters_ref(calld->method_params); @@ -997,11 +999,11 @@ static void apply_service_config_to_call_locked(grpc_exec_ctx *exec_ctx, } } -static void create_subchannel_call_locked(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_error *error) { - channel_data *chand = (channel_data *)elem->channel_data; - call_data *calld = (call_data *)elem->call_data; +static void create_subchannel_call_locked(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem, + grpc_error* error) { + channel_data* chand = (channel_data*)elem->channel_data; + call_data* calld = (call_data*)elem->call_data; const grpc_connected_subchannel_call_args call_args = { calld->pollent, // pollent calld->path, // path @@ -1011,7 +1013,7 @@ static void create_subchannel_call_locked(grpc_exec_ctx *exec_ctx, calld->subchannel_call_context, // context calld->call_combiner // call_combiner }; - grpc_error *new_error = grpc_connected_subchannel_create_call( + grpc_error* new_error = grpc_connected_subchannel_create_call( exec_ctx, calld->connected_subchannel, &call_args, &calld->subchannel_call); if (GRPC_TRACER_ON(grpc_client_channel_trace)) { @@ -1028,10 +1030,10 @@ static void create_subchannel_call_locked(grpc_exec_ctx *exec_ctx, } // Invoked when a pick is completed, on both success or failure. -static void pick_done_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_error *error) { - call_data *calld = (call_data *)elem->call_data; - channel_data *chand = (channel_data *)elem->channel_data; +static void pick_done_locked(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + grpc_error* error) { + call_data* calld = (call_data*)elem->call_data; + channel_data* chand = (channel_data*)elem->channel_data; if (calld->connected_subchannel == NULL) { // Failed to create subchannel. GRPC_ERROR_UNREF(calld->error); @@ -1057,10 +1059,10 @@ static void pick_done_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, // either (a) the pick was deferred pending a resolver result or (b) the // pick was done asynchronously. Removes the call's polling entity from // chand->interested_parties before invoking pick_done_locked(). -static void async_pick_done_locked(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, grpc_error *error) { - channel_data *chand = (channel_data *)elem->channel_data; - call_data *calld = (call_data *)elem->call_data; +static void async_pick_done_locked(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem, grpc_error* error) { + channel_data* chand = (channel_data*)elem->channel_data; + call_data* calld = (call_data*)elem->call_data; grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent, chand->interested_parties); pick_done_locked(exec_ctx, elem, error); @@ -1068,11 +1070,11 @@ static void async_pick_done_locked(grpc_exec_ctx *exec_ctx, // Note: This runs under the client_channel combiner, but will NOT be // holding the call combiner. -static void pick_callback_cancel_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - grpc_call_element *elem = (grpc_call_element *)arg; - channel_data *chand = (channel_data *)elem->channel_data; - call_data *calld = (call_data *)elem->call_data; +static void pick_callback_cancel_locked(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + grpc_call_element* elem = (grpc_call_element*)arg; + channel_data* chand = (channel_data*)elem->channel_data; + call_data* calld = (call_data*)elem->call_data; if (calld->lb_policy != NULL) { if (GRPC_TRACER_ON(grpc_client_channel_trace)) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: cancelling pick from LB policy %p", @@ -1087,11 +1089,11 @@ static void pick_callback_cancel_locked(grpc_exec_ctx *exec_ctx, void *arg, // Callback invoked by grpc_lb_policy_pick_locked() for async picks. // Unrefs the LB policy and invokes async_pick_done_locked(). -static void pick_callback_done_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - grpc_call_element *elem = (grpc_call_element *)arg; - channel_data *chand = (channel_data *)elem->channel_data; - call_data *calld = (call_data *)elem->call_data; +static void pick_callback_done_locked(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + grpc_call_element* elem = (grpc_call_element*)arg; + channel_data* chand = (channel_data*)elem->channel_data; + call_data* calld = (call_data*)elem->call_data; if (GRPC_TRACER_ON(grpc_client_channel_trace)) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed asynchronously", chand, calld); @@ -1105,10 +1107,10 @@ static void pick_callback_done_locked(grpc_exec_ctx *exec_ctx, void *arg, // Takes a ref to chand->lb_policy and calls grpc_lb_policy_pick_locked(). // If the pick was completed synchronously, unrefs the LB policy and // returns true. -static bool pick_callback_start_locked(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) { - channel_data *chand = (channel_data *)elem->channel_data; - call_data *calld = (call_data *)elem->call_data; +static bool pick_callback_start_locked(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem) { + channel_data* chand = (channel_data*)elem->channel_data; + call_data* calld = (call_data*)elem->call_data; if (GRPC_TRACER_ON(grpc_client_channel_trace)) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: starting pick on lb_policy=%p", chand, calld, chand->lb_policy); @@ -1165,7 +1167,7 @@ static bool pick_callback_start_locked(grpc_exec_ctx *exec_ctx, } typedef struct { - grpc_call_element *elem; + grpc_call_element* elem; bool finished; grpc_closure closure; grpc_closure cancel_closure; @@ -1173,11 +1175,10 @@ typedef struct { // Note: This runs under the client_channel combiner, but will NOT be // holding the call combiner. -static void pick_after_resolver_result_cancel_locked(grpc_exec_ctx *exec_ctx, - void *arg, - grpc_error *error) { - pick_after_resolver_result_args *args = - (pick_after_resolver_result_args *)arg; +static void pick_after_resolver_result_cancel_locked(grpc_exec_ctx* exec_ctx, + void* arg, + grpc_error* error) { + pick_after_resolver_result_args* args = (pick_after_resolver_result_args*)arg; if (args->finished) { gpr_free(args); return; @@ -1190,9 +1191,9 @@ static void pick_after_resolver_result_cancel_locked(grpc_exec_ctx *exec_ctx, // is called, it will be a no-op. We also immediately invoke // async_pick_done_locked() to propagate the error back to the caller. args->finished = true; - grpc_call_element *elem = args->elem; - channel_data *chand = (channel_data *)elem->channel_data; - call_data *calld = (call_data *)elem->call_data; + grpc_call_element* elem = args->elem; + channel_data* chand = (channel_data*)elem->channel_data; + call_data* calld = (call_data*)elem->call_data; if (GRPC_TRACER_ON(grpc_client_channel_trace)) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: cancelling pick waiting for resolver result", @@ -1208,14 +1209,13 @@ static void pick_after_resolver_result_cancel_locked(grpc_exec_ctx *exec_ctx, "Pick cancelled", &error, 1)); } -static void pick_after_resolver_result_start_locked(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem); +static void pick_after_resolver_result_start_locked(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem); -static void pick_after_resolver_result_done_locked(grpc_exec_ctx *exec_ctx, - void *arg, - grpc_error *error) { - pick_after_resolver_result_args *args = - (pick_after_resolver_result_args *)arg; +static void pick_after_resolver_result_done_locked(grpc_exec_ctx* exec_ctx, + void* arg, + grpc_error* error) { + pick_after_resolver_result_args* args = (pick_after_resolver_result_args*)arg; if (args->finished) { /* cancelled, do nothing */ if (GRPC_TRACER_ON(grpc_client_channel_trace)) { @@ -1225,9 +1225,9 @@ static void pick_after_resolver_result_done_locked(grpc_exec_ctx *exec_ctx, return; } args->finished = true; - grpc_call_element *elem = args->elem; - channel_data *chand = (channel_data *)elem->channel_data; - call_data *calld = (call_data *)elem->call_data; + grpc_call_element* elem = args->elem; + channel_data* chand = (channel_data*)elem->channel_data; + call_data* calld = (call_data*)elem->call_data; if (error != GRPC_ERROR_NONE) { if (GRPC_TRACER_ON(grpc_client_channel_trace)) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver failed to return data", @@ -1274,17 +1274,17 @@ static void pick_after_resolver_result_done_locked(grpc_exec_ctx *exec_ctx, } } -static void pick_after_resolver_result_start_locked(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) { - channel_data *chand = (channel_data *)elem->channel_data; - call_data *calld = (call_data *)elem->call_data; +static void pick_after_resolver_result_start_locked(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem) { + channel_data* chand = (channel_data*)elem->channel_data; + call_data* calld = (call_data*)elem->call_data; if (GRPC_TRACER_ON(grpc_client_channel_trace)) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: deferring pick pending resolver result", chand, calld); } - pick_after_resolver_result_args *args = - (pick_after_resolver_result_args *)gpr_zalloc(sizeof(*args)); + pick_after_resolver_result_args* args = + (pick_after_resolver_result_args*)gpr_zalloc(sizeof(*args)); args->elem = elem; GRPC_CLOSURE_INIT(&args->closure, pick_after_resolver_result_done_locked, args, grpc_combiner_scheduler(chand->combiner)); @@ -1297,11 +1297,11 @@ static void pick_after_resolver_result_start_locked(grpc_exec_ctx *exec_ctx, grpc_combiner_scheduler(chand->combiner))); } -static void start_pick_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *ignored) { - grpc_call_element *elem = (grpc_call_element *)arg; - call_data *calld = (call_data *)elem->call_data; - channel_data *chand = (channel_data *)elem->channel_data; +static void start_pick_locked(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* ignored) { + grpc_call_element* elem = (grpc_call_element*)arg; + call_data* calld = (call_data*)elem->call_data; + channel_data* chand = (channel_data*)elem->channel_data; GPR_ASSERT(calld->connected_subchannel == NULL); if (chand->lb_policy != NULL) { // We already have an LB policy, so ask it for a pick. @@ -1331,9 +1331,9 @@ static void start_pick_locked(grpc_exec_ctx *exec_ctx, void *arg, chand->interested_parties); } -static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - grpc_call_element *elem = (grpc_call_element *)arg; - call_data *calld = (call_data *)elem->call_data; +static void on_complete(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { + grpc_call_element* elem = (grpc_call_element*)arg; + call_data* calld = (call_data*)elem->call_data; if (calld->retry_throttle_data != NULL) { if (error == GRPC_ERROR_NONE) { grpc_server_retry_throttle_data_record_success( @@ -1352,10 +1352,10 @@ static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { } static void cc_start_transport_stream_op_batch( - grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_transport_stream_op_batch *batch) { - call_data *calld = (call_data *)elem->call_data; - channel_data *chand = (channel_data *)elem->channel_data; + grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + grpc_transport_stream_op_batch* batch) { + call_data* calld = (call_data*)elem->call_data; + channel_data* chand = (channel_data*)elem->channel_data; if (chand->deadline_checking_enabled) { grpc_deadline_state_client_start_transport_stream_op_batch(exec_ctx, elem, batch); @@ -1446,11 +1446,11 @@ done: } /* Constructor for call_data */ -static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - const grpc_call_element_args *args) { - call_data *calld = (call_data *)elem->call_data; - channel_data *chand = (channel_data *)elem->channel_data; +static grpc_error* cc_init_call_elem(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem, + const grpc_call_element_args* args) { + call_data* calld = (call_data*)elem->call_data; + channel_data* chand = (channel_data*)elem->channel_data; // Initialize data members. calld->path = grpc_slice_ref_internal(args->path); calld->call_start_time = args->start_time; @@ -1466,12 +1466,12 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx, } /* Destructor for call_data */ -static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - const grpc_call_final_info *final_info, - grpc_closure *then_schedule_closure) { - call_data *calld = (call_data *)elem->call_data; - channel_data *chand = (channel_data *)elem->channel_data; +static void cc_destroy_call_elem(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem, + const grpc_call_final_info* final_info, + grpc_closure* then_schedule_closure) { + call_data* calld = (call_data*)elem->call_data; + channel_data* chand = (channel_data*)elem->channel_data; if (chand->deadline_checking_enabled) { grpc_deadline_state_destroy(exec_ctx, elem); } @@ -1502,10 +1502,10 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, GRPC_CLOSURE_SCHED(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE); } -static void cc_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_polling_entity *pollent) { - call_data *calld = (call_data *)elem->call_data; +static void cc_set_pollset_or_pollset_set(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem, + grpc_polling_entity* pollent) { + call_data* calld = (call_data*)elem->call_data; calld->pollent = pollent; } @@ -1527,9 +1527,9 @@ const grpc_channel_filter grpc_client_channel_filter = { "client-channel", }; -static void try_to_connect_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error_ignored) { - channel_data *chand = (channel_data *)arg; +static void try_to_connect_locked(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error_ignored) { + channel_data* chand = (channel_data*)arg; if (chand->lb_policy != NULL) { grpc_lb_policy_exit_idle_locked(exec_ctx, chand->lb_policy); } else { @@ -1542,34 +1542,35 @@ static void try_to_connect_locked(grpc_exec_ctx *exec_ctx, void *arg, } grpc_connectivity_state grpc_client_channel_check_connectivity_state( - grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect) { - channel_data *chand = (channel_data *)elem->channel_data; + grpc_exec_ctx* exec_ctx, grpc_channel_element* elem, int try_to_connect) { + channel_data* chand = (channel_data*)elem->channel_data; grpc_connectivity_state out = grpc_connectivity_state_check(&chand->state_tracker); if (out == GRPC_CHANNEL_IDLE && try_to_connect) { GRPC_CHANNEL_STACK_REF(chand->owning_stack, "try_to_connect"); GRPC_CLOSURE_SCHED( - exec_ctx, GRPC_CLOSURE_CREATE(try_to_connect_locked, chand, - grpc_combiner_scheduler(chand->combiner)), + exec_ctx, + GRPC_CLOSURE_CREATE(try_to_connect_locked, chand, + grpc_combiner_scheduler(chand->combiner)), GRPC_ERROR_NONE); } return out; } typedef struct external_connectivity_watcher { - channel_data *chand; + channel_data* chand; grpc_polling_entity pollent; - grpc_closure *on_complete; - grpc_closure *watcher_timer_init; - grpc_connectivity_state *state; + grpc_closure* on_complete; + grpc_closure* watcher_timer_init; + grpc_connectivity_state* state; grpc_closure my_closure; - struct external_connectivity_watcher *next; + struct external_connectivity_watcher* next; } external_connectivity_watcher; -static external_connectivity_watcher *lookup_external_connectivity_watcher( - channel_data *chand, grpc_closure *on_complete) { +static external_connectivity_watcher* lookup_external_connectivity_watcher( + channel_data* chand, grpc_closure* on_complete) { gpr_mu_lock(&chand->external_connectivity_watcher_list_mu); - external_connectivity_watcher *w = + external_connectivity_watcher* w = chand->external_connectivity_watcher_list_head; while (w != NULL && w->on_complete != on_complete) { w = w->next; @@ -1579,7 +1580,7 @@ static external_connectivity_watcher *lookup_external_connectivity_watcher( } static void external_connectivity_watcher_list_append( - channel_data *chand, external_connectivity_watcher *w) { + channel_data* chand, external_connectivity_watcher* w) { GPR_ASSERT(!lookup_external_connectivity_watcher(chand, w->on_complete)); gpr_mu_lock(&w->chand->external_connectivity_watcher_list_mu); @@ -1590,7 +1591,7 @@ static void external_connectivity_watcher_list_append( } static void external_connectivity_watcher_list_remove( - channel_data *chand, external_connectivity_watcher *too_remove) { + channel_data* chand, external_connectivity_watcher* too_remove) { GPR_ASSERT( lookup_external_connectivity_watcher(chand, too_remove->on_complete)); gpr_mu_lock(&chand->external_connectivity_watcher_list_mu); @@ -1599,7 +1600,7 @@ static void external_connectivity_watcher_list_remove( gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu); return; } - external_connectivity_watcher *w = + external_connectivity_watcher* w = chand->external_connectivity_watcher_list_head; while (w != NULL) { if (w->next == too_remove) { @@ -1613,12 +1614,12 @@ static void external_connectivity_watcher_list_remove( } int grpc_client_channel_num_external_connectivity_watchers( - grpc_channel_element *elem) { - channel_data *chand = (channel_data *)elem->channel_data; + grpc_channel_element* elem) { + channel_data* chand = (channel_data*)elem->channel_data; int count = 0; gpr_mu_lock(&chand->external_connectivity_watcher_list_mu); - external_connectivity_watcher *w = + external_connectivity_watcher* w = chand->external_connectivity_watcher_list_head; while (w != NULL) { count++; @@ -1629,10 +1630,10 @@ int grpc_client_channel_num_external_connectivity_watchers( return count; } -static void on_external_watch_complete_locked(grpc_exec_ctx *exec_ctx, - void *arg, grpc_error *error) { - external_connectivity_watcher *w = (external_connectivity_watcher *)arg; - grpc_closure *follow_up = w->on_complete; +static void on_external_watch_complete_locked(grpc_exec_ctx* exec_ctx, + void* arg, grpc_error* error) { + external_connectivity_watcher* w = (external_connectivity_watcher*)arg; + grpc_closure* follow_up = w->on_complete; grpc_polling_entity_del_from_pollset_set(exec_ctx, &w->pollent, w->chand->interested_parties); GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, @@ -1642,10 +1643,10 @@ static void on_external_watch_complete_locked(grpc_exec_ctx *exec_ctx, GRPC_CLOSURE_RUN(exec_ctx, follow_up, GRPC_ERROR_REF(error)); } -static void watch_connectivity_state_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error_ignored) { - external_connectivity_watcher *w = (external_connectivity_watcher *)arg; - external_connectivity_watcher *found = NULL; +static void watch_connectivity_state_locked(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error_ignored) { + external_connectivity_watcher* w = (external_connectivity_watcher*)arg; + external_connectivity_watcher* found = NULL; if (w->state != NULL) { external_connectivity_watcher_list_append(w->chand, w); GRPC_CLOSURE_RUN(exec_ctx, w->watcher_timer_init, GRPC_ERROR_NONE); @@ -1670,12 +1671,12 @@ static void watch_connectivity_state_locked(grpc_exec_ctx *exec_ctx, void *arg, } void grpc_client_channel_watch_connectivity_state( - grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, - grpc_polling_entity pollent, grpc_connectivity_state *state, - grpc_closure *closure, grpc_closure *watcher_timer_init) { - channel_data *chand = (channel_data *)elem->channel_data; - external_connectivity_watcher *w = - (external_connectivity_watcher *)gpr_zalloc(sizeof(*w)); + grpc_exec_ctx* exec_ctx, grpc_channel_element* elem, + grpc_polling_entity pollent, grpc_connectivity_state* state, + grpc_closure* closure, grpc_closure* watcher_timer_init) { + channel_data* chand = (channel_data*)elem->channel_data; + external_connectivity_watcher* w = + (external_connectivity_watcher*)gpr_zalloc(sizeof(*w)); w->chand = chand; w->pollent = pollent; w->on_complete = closure; |