aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters/client_channel/client_channel.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/filters/client_channel/client_channel.cc')
-rw-r--r--src/core/ext/filters/client_channel/client_channel.cc490
1 files changed, 257 insertions, 233 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index 973cc5f703..5bb5fde985 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -31,6 +31,7 @@
#include <grpc/support/sync.h>
#include <grpc/support/useful.h>
+#include "src/core/ext/filters/client_channel/backup_poller.h"
#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
@@ -75,24 +76,28 @@ typedef struct {
wait_for_ready_value wait_for_ready;
} method_parameters;
-static method_parameters *method_parameters_ref(
- method_parameters *method_params) {
+static method_parameters* method_parameters_ref(
+ method_parameters* method_params) {
gpr_ref(&method_params->refs);
return method_params;
}
-static void method_parameters_unref(method_parameters *method_params) {
+static void method_parameters_unref(method_parameters* method_params) {
if (gpr_unref(&method_params->refs)) {
gpr_free(method_params);
}
}
-static void method_parameters_free(void *value) {
- method_parameters_unref((method_parameters *)value);
+// Wrappers to pass to grpc_service_config_create_method_config_table().
+static void* method_parameters_ref_wrapper(void* value) {
+ return method_parameters_ref((method_parameters*)value);
+}
+static void method_parameters_unref_wrapper(void* value) {
+ method_parameters_unref((method_parameters*)value);
}
-static bool parse_wait_for_ready(grpc_json *field,
- wait_for_ready_value *wait_for_ready) {
+static bool parse_wait_for_ready(grpc_json* field,
+ wait_for_ready_value* wait_for_ready) {
if (field->type != GRPC_JSON_TRUE && field->type != GRPC_JSON_FALSE) {
return false;
}
@@ -101,13 +106,13 @@ static bool parse_wait_for_ready(grpc_json *field,
return true;
}
-static bool parse_timeout(grpc_json *field, grpc_millis *timeout) {
+static bool parse_timeout(grpc_json* field, grpc_millis* timeout) {
if (field->type != GRPC_JSON_STRING) return false;
size_t len = strlen(field->value);
if (field->value[len - 1] != 's') return false;
- char *buf = gpr_strdup(field->value);
+ char* buf = gpr_strdup(field->value);
buf[len - 1] = '\0'; // Remove trailing 's'.
- char *decimal_point = strchr(buf, '.');
+ char* decimal_point = strchr(buf, '.');
int nanos = 0;
if (decimal_point != NULL) {
*decimal_point = '\0';
@@ -116,34 +121,26 @@ static bool parse_timeout(grpc_json *field, grpc_millis *timeout) {
gpr_free(buf);
return false;
}
- // There should always be exactly 3, 6, or 9 fractional digits.
- int multiplier = 1;
- switch (strlen(decimal_point + 1)) {
- case 9:
- break;
- case 6:
- multiplier *= 1000;
- break;
- case 3:
- multiplier *= 1000000;
- break;
- default: // Unsupported number of digits.
- gpr_free(buf);
- return false;
+ int num_digits = (int)strlen(decimal_point + 1);
+ if (num_digits > 9) { // We don't accept greater precision than nanos.
+ gpr_free(buf);
+ return false;
+ }
+ for (int i = 0; i < (9 - num_digits); ++i) {
+ nanos *= 10;
}
- nanos *= multiplier;
}
- int seconds = gpr_parse_nonnegative_int(buf);
+ int seconds = decimal_point == buf ? 0 : gpr_parse_nonnegative_int(buf);
gpr_free(buf);
if (seconds == -1) return false;
*timeout = seconds * GPR_MS_PER_SEC + nanos / GPR_NS_PER_MS;
return true;
}
-static void *method_parameters_create_from_json(const grpc_json *json) {
+static void* method_parameters_create_from_json(const grpc_json* json) {
wait_for_ready_value wait_for_ready = WAIT_FOR_READY_UNSET;
grpc_millis timeout = 0;
- for (grpc_json *field = json->child; field != NULL; field = field->next) {
+ for (grpc_json* field = json->child; field != NULL; field = field->next) {
if (field->key == NULL) continue;
if (strcmp(field->key, "waitForReady") == 0) {
if (wait_for_ready != WAIT_FOR_READY_UNSET) return NULL; // Duplicate.
@@ -153,8 +150,8 @@ static void *method_parameters_create_from_json(const grpc_json *json) {
if (!parse_timeout(field, &timeout)) return NULL;
}
}
- method_parameters *value =
- (method_parameters *)gpr_malloc(sizeof(method_parameters));
+ method_parameters* value =
+ (method_parameters*)gpr_malloc(sizeof(method_parameters));
gpr_ref_init(&value->refs, 1);
value->timeout = timeout;
value->wait_for_ready = wait_for_ready;
@@ -169,24 +166,24 @@ struct external_connectivity_watcher;
typedef struct client_channel_channel_data {
/** resolver for this channel */
- grpc_resolver *resolver;
+ grpc_resolver* resolver;
/** have we started resolving this channel */
bool started_resolving;
/** is deadline checking enabled? */
bool deadline_checking_enabled;
/** client channel factory */
- grpc_client_channel_factory *client_channel_factory;
+ grpc_client_channel_factory* client_channel_factory;
/** combiner protecting all variables below in this data structure */
- grpc_combiner *combiner;
+ grpc_combiner* combiner;
/** currently active load balancer */
- grpc_lb_policy *lb_policy;
+ grpc_lb_policy* lb_policy;
/** retry throttle data */
- grpc_server_retry_throttle_data *retry_throttle_data;
+ grpc_server_retry_throttle_data* retry_throttle_data;
/** maps method names to method_parameters structs */
- grpc_slice_hash_table *method_params_table;
+ grpc_slice_hash_table* method_params_table;
/** incoming resolver result - set by resolver.next() */
- grpc_channel_args *resolver_result;
+ grpc_channel_args* resolver_result;
/** a list of closures that are all waiting for resolver result to come in */
grpc_closure_list waiting_for_resolver_result_closures;
/** resolver callback */
@@ -196,41 +193,41 @@ typedef struct client_channel_channel_data {
/** when an lb_policy arrives, should we try to exit idle */
bool exit_idle_when_lb_policy_arrives;
/** owning stack */
- grpc_channel_stack *owning_stack;
+ grpc_channel_stack* owning_stack;
/** interested parties (owned) */
- grpc_pollset_set *interested_parties;
+ grpc_pollset_set* interested_parties;
/* external_connectivity_watcher_list head is guarded by its own mutex, since
* counts need to be grabbed immediately without polling on a cq */
gpr_mu external_connectivity_watcher_list_mu;
- struct external_connectivity_watcher *external_connectivity_watcher_list_head;
+ struct external_connectivity_watcher* external_connectivity_watcher_list_head;
/* the following properties are guarded by a mutex since API's require them
to be instantaneously available */
gpr_mu info_mu;
- char *info_lb_policy_name;
+ char* info_lb_policy_name;
/** service config in JSON form */
- char *info_service_config_json;
+ char* info_service_config_json;
} channel_data;
/** We create one watcher for each new lb_policy that is returned from a
resolver, to watch for state changes from the lb_policy. When a state
change is seen, we update the channel, and create a new watcher. */
typedef struct {
- channel_data *chand;
+ channel_data* chand;
grpc_closure on_changed;
grpc_connectivity_state state;
- grpc_lb_policy *lb_policy;
+ grpc_lb_policy* lb_policy;
} lb_policy_connectivity_watcher;
-static void watch_lb_policy_locked(channel_data *chand,
- grpc_lb_policy *lb_policy,
+static void watch_lb_policy_locked(channel_data* chand,
+ grpc_lb_policy* lb_policy,
grpc_connectivity_state current_state);
-static void set_channel_connectivity_state_locked(channel_data *chand,
+static void set_channel_connectivity_state_locked(channel_data* chand,
grpc_connectivity_state state,
- grpc_error *error,
- const char *reason) {
+ grpc_error* error,
+ const char* reason) {
/* TODO: Improve failure handling:
* - Make it possible for policies to return GRPC_CHANNEL_TRANSIENT_FAILURE.
* - Hand over pending picks from old policies during the switch that happens
@@ -256,8 +253,8 @@ static void set_channel_connectivity_state_locked(channel_data *chand,
grpc_connectivity_state_set(&chand->state_tracker, state, error, reason);
}
-static void on_lb_policy_state_changed_locked(void *arg, grpc_error *error) {
- lb_policy_connectivity_watcher *w = (lb_policy_connectivity_watcher *)arg;
+static void on_lb_policy_state_changed_locked(void* arg, grpc_error* error) {
+ lb_policy_connectivity_watcher* w = (lb_policy_connectivity_watcher*)arg;
grpc_connectivity_state publish_state = w->state;
/* check if the notification is for the latest policy */
if (w->lb_policy == w->chand->lb_policy) {
@@ -281,11 +278,11 @@ static void on_lb_policy_state_changed_locked(void *arg, grpc_error *error) {
gpr_free(w);
}
-static void watch_lb_policy_locked(channel_data *chand,
- grpc_lb_policy *lb_policy,
+static void watch_lb_policy_locked(channel_data* chand,
+ grpc_lb_policy* lb_policy,
grpc_connectivity_state current_state) {
- lb_policy_connectivity_watcher *w =
- (lb_policy_connectivity_watcher *)gpr_malloc(sizeof(*w));
+ lb_policy_connectivity_watcher* w =
+ (lb_policy_connectivity_watcher*)gpr_malloc(sizeof(*w));
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy");
w->chand = chand;
GRPC_CLOSURE_INIT(&w->on_changed, on_lb_policy_state_changed_locked, w,
@@ -296,7 +293,7 @@ static void watch_lb_policy_locked(channel_data *chand,
&w->on_changed);
}
-static void start_resolving_locked(channel_data *chand) {
+static void start_resolving_locked(channel_data* chand) {
if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
gpr_log(GPR_DEBUG, "chand=%p: starting name resolution", chand);
}
@@ -308,19 +305,19 @@ static void start_resolving_locked(channel_data *chand) {
}
typedef struct {
- char *server_name;
- grpc_server_retry_throttle_data *retry_throttle_data;
+ char* server_name;
+ grpc_server_retry_throttle_data* retry_throttle_data;
} service_config_parsing_state;
-static void parse_retry_throttle_params(const grpc_json *field, void *arg) {
- service_config_parsing_state *parsing_state =
- (service_config_parsing_state *)arg;
+static void parse_retry_throttle_params(const grpc_json* field, void* arg) {
+ service_config_parsing_state* parsing_state =
+ (service_config_parsing_state*)arg;
if (strcmp(field->key, "retryThrottling") == 0) {
if (parsing_state->retry_throttle_data != NULL) return; // Duplicate.
if (field->type != GRPC_JSON_OBJECT) return;
int max_milli_tokens = 0;
int milli_token_ratio = 0;
- for (grpc_json *sub_field = field->child; sub_field != NULL;
+ for (grpc_json* sub_field = field->child; sub_field != NULL;
sub_field = sub_field->next) {
if (sub_field->key == NULL) return;
if (strcmp(sub_field->key, "maxTokens") == 0) {
@@ -336,7 +333,7 @@ static void parse_retry_throttle_params(const grpc_json *field, void *arg) {
size_t whole_len = strlen(sub_field->value);
uint32_t multiplier = 1;
uint32_t decimal_value = 0;
- const char *decimal_point = strchr(sub_field->value, '.');
+ const char* decimal_point = strchr(sub_field->value, '.');
if (decimal_point != NULL) {
whole_len = (size_t)(decimal_point - sub_field->value);
multiplier = 1000;
@@ -367,24 +364,24 @@ static void parse_retry_throttle_params(const grpc_json *field, void *arg) {
}
}
-static void on_resolver_result_changed_locked(void *arg, grpc_error *error) {
- channel_data *chand = (channel_data *)arg;
+static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
+ channel_data* chand = (channel_data*)arg;
if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
gpr_log(GPR_DEBUG, "chand=%p: got resolver result: error=%s", chand,
grpc_error_string(error));
}
// Extract the following fields from the resolver result, if non-NULL.
bool lb_policy_updated = false;
- char *lb_policy_name_dup = NULL;
+ char* lb_policy_name_dup = NULL;
bool lb_policy_name_changed = false;
- grpc_lb_policy *new_lb_policy = NULL;
- char *service_config_json = NULL;
- grpc_server_retry_throttle_data *retry_throttle_data = NULL;
- grpc_slice_hash_table *method_params_table = NULL;
+ grpc_lb_policy* new_lb_policy = NULL;
+ char* service_config_json = NULL;
+ grpc_server_retry_throttle_data* retry_throttle_data = NULL;
+ grpc_slice_hash_table* method_params_table = NULL;
if (chand->resolver_result != NULL) {
// Find LB policy name.
- const char *lb_policy_name = NULL;
- const grpc_arg *channel_arg =
+ const char* lb_policy_name = NULL;
+ const grpc_arg* channel_arg =
grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_POLICY_NAME);
if (channel_arg != NULL) {
GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
@@ -395,8 +392,8 @@ static void on_resolver_result_changed_locked(void *arg, grpc_error *error) {
channel_arg =
grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES);
if (channel_arg != NULL && channel_arg->type == GRPC_ARG_POINTER) {
- grpc_lb_addresses *addresses =
- (grpc_lb_addresses *)channel_arg->value.pointer.p;
+ grpc_lb_addresses* addresses =
+ (grpc_lb_addresses*)channel_arg->value.pointer.p;
bool found_balancer_address = false;
for (size_t i = 0; i < addresses->num_addresses; ++i) {
if (addresses->addresses[i].is_balancer) {
@@ -446,14 +443,14 @@ static void on_resolver_result_changed_locked(void *arg, grpc_error *error) {
if (channel_arg != NULL) {
GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
service_config_json = gpr_strdup(channel_arg->value.string);
- grpc_service_config *service_config =
+ grpc_service_config* service_config =
grpc_service_config_create(service_config_json);
if (service_config != NULL) {
channel_arg =
grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVER_URI);
GPR_ASSERT(channel_arg != NULL);
GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
- grpc_uri *uri = grpc_uri_parse(channel_arg->value.string, true);
+ grpc_uri* uri = grpc_uri_parse(channel_arg->value.string, true);
GPR_ASSERT(uri->path[0] != '\0');
service_config_parsing_state parsing_state;
memset(&parsing_state, 0, sizeof(parsing_state));
@@ -465,7 +462,7 @@ static void on_resolver_result_changed_locked(void *arg, grpc_error *error) {
retry_throttle_data = parsing_state.retry_throttle_data;
method_params_table = grpc_service_config_create_method_config_table(
service_config, method_parameters_create_from_json,
- method_parameters_free);
+ method_parameters_ref_wrapper, method_parameters_unref_wrapper);
grpc_service_config_destroy(service_config);
}
}
@@ -553,7 +550,7 @@ static void on_resolver_result_changed_locked(void *arg, grpc_error *error) {
GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
} else { // Not shutting down.
grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
- grpc_error *state_error =
+ grpc_error* state_error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy");
if (new_lb_policy != NULL) {
if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
@@ -581,11 +578,11 @@ static void on_resolver_result_changed_locked(void *arg, grpc_error *error) {
}
}
-static void start_transport_op_locked(void *arg, grpc_error *error_ignored) {
- grpc_transport_op *op = (grpc_transport_op *)arg;
- grpc_channel_element *elem =
- (grpc_channel_element *)op->handler_private.extra_arg;
- channel_data *chand = (channel_data *)elem->channel_data;
+static void start_transport_op_locked(void* arg, grpc_error* error_ignored) {
+ grpc_transport_op* op = (grpc_transport_op*)arg;
+ grpc_channel_element* elem =
+ (grpc_channel_element*)op->handler_private.extra_arg;
+ channel_data* chand = (channel_data*)elem->channel_data;
if (op->on_connectivity_state_change != NULL) {
grpc_connectivity_state_notify_on_state_change(
@@ -633,9 +630,9 @@ static void start_transport_op_locked(void *arg, grpc_error *error_ignored) {
GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE);
}
-static void cc_start_transport_op(grpc_channel_element *elem,
- grpc_transport_op *op) {
- channel_data *chand = (channel_data *)elem->channel_data;
+static void cc_start_transport_op(grpc_channel_element* elem,
+ grpc_transport_op* op) {
+ channel_data* chand = (channel_data*)elem->channel_data;
GPR_ASSERT(op->set_accept_stream == false);
if (op->bind_pollset != NULL) {
@@ -650,9 +647,9 @@ static void cc_start_transport_op(grpc_channel_element *elem,
GRPC_ERROR_NONE);
}
-static void cc_get_channel_info(grpc_channel_element *elem,
- const grpc_channel_info *info) {
- channel_data *chand = (channel_data *)elem->channel_data;
+static void cc_get_channel_info(grpc_channel_element* elem,
+ const grpc_channel_info* info) {
+ channel_data* chand = (channel_data*)elem->channel_data;
gpr_mu_lock(&chand->info_mu);
if (info->lb_policy_name != NULL) {
*info->lb_policy_name = chand->info_lb_policy_name == NULL
@@ -669,9 +666,9 @@ static void cc_get_channel_info(grpc_channel_element *elem,
}
/* Constructor for channel_data */
-static grpc_error *cc_init_channel_elem(grpc_channel_element *elem,
- grpc_channel_element_args *args) {
- channel_data *chand = (channel_data *)elem->channel_data;
+static grpc_error* cc_init_channel_elem(grpc_channel_element* elem,
+ grpc_channel_element_args* args) {
+ channel_data* chand = (channel_data*)elem->channel_data;
GPR_ASSERT(args->is_last);
GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
// Initialize data members.
@@ -690,8 +687,9 @@ static grpc_error *cc_init_channel_elem(grpc_channel_element *elem,
chand->interested_parties = grpc_pollset_set_create();
grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
"client_channel");
+ grpc_client_channel_start_backup_polling(chand->interested_parties);
// Record client channel factory.
- const grpc_arg *arg = grpc_channel_args_find(args->channel_args,
+ const grpc_arg* arg = grpc_channel_args_find(args->channel_args,
GRPC_ARG_CLIENT_CHANNEL_FACTORY);
if (arg == NULL) {
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
@@ -702,9 +700,9 @@ static grpc_error *cc_init_channel_elem(grpc_channel_element *elem,
"client channel factory arg must be a pointer");
}
grpc_client_channel_factory_ref(
- (grpc_client_channel_factory *)arg->value.pointer.p);
+ (grpc_client_channel_factory*)arg->value.pointer.p);
chand->client_channel_factory =
- (grpc_client_channel_factory *)arg->value.pointer.p;
+ (grpc_client_channel_factory*)arg->value.pointer.p;
// Get server name to resolve, using proxy mapper if needed.
arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI);
if (arg == NULL) {
@@ -715,8 +713,8 @@ static grpc_error *cc_init_channel_elem(grpc_channel_element *elem,
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"server uri arg must be a string");
}
- char *proxy_name = NULL;
- grpc_channel_args *new_args = NULL;
+ char* proxy_name = NULL;
+ grpc_channel_args* new_args = NULL;
grpc_proxy_mappers_map_name(arg->value.string, args->channel_args,
&proxy_name, &new_args);
// Instantiate resolver.
@@ -734,15 +732,15 @@ static grpc_error *cc_init_channel_elem(grpc_channel_element *elem,
return GRPC_ERROR_NONE;
}
-static void shutdown_resolver_locked(void *arg, grpc_error *error) {
- grpc_resolver *resolver = (grpc_resolver *)arg;
+static void shutdown_resolver_locked(void* arg, grpc_error* error) {
+ grpc_resolver* resolver = (grpc_resolver*)arg;
grpc_resolver_shutdown_locked(resolver);
GRPC_RESOLVER_UNREF(resolver, "channel");
}
/* Destructor for channel_data */
-static void cc_destroy_channel_elem(grpc_channel_element *elem) {
- channel_data *chand = (channel_data *)elem->channel_data;
+static void cc_destroy_channel_elem(grpc_channel_element* elem) {
+ channel_data* chand = (channel_data*)elem->channel_data;
if (chand->resolver != NULL) {
GRPC_CLOSURE_SCHED(
GRPC_CLOSURE_CREATE(shutdown_resolver_locked, chand->resolver,
@@ -765,6 +763,7 @@ static void cc_destroy_channel_elem(grpc_channel_element *elem) {
if (chand->method_params_table != NULL) {
grpc_slice_hash_table_unref(chand->method_params_table);
}
+ grpc_client_channel_stop_backup_polling(chand->interested_parties);
grpc_connectivity_state_destroy(&chand->state_tracker);
grpc_pollset_set_destroy(chand->interested_parties);
GRPC_COMBINER_UNREF(chand->combiner, "client_channel");
@@ -804,45 +803,45 @@ typedef struct client_channel_call_data {
grpc_slice path; // Request path.
gpr_timespec call_start_time;
grpc_millis deadline;
- gpr_arena *arena;
- grpc_call_stack *owning_call;
- grpc_call_combiner *call_combiner;
+ gpr_arena* arena;
+ grpc_call_stack* owning_call;
+ grpc_call_combiner* call_combiner;
- grpc_server_retry_throttle_data *retry_throttle_data;
- method_parameters *method_params;
+ grpc_server_retry_throttle_data* retry_throttle_data;
+ method_parameters* method_params;
- grpc_subchannel_call *subchannel_call;
- grpc_error *error;
+ grpc_subchannel_call* subchannel_call;
+ grpc_error* error;
- grpc_lb_policy *lb_policy; // Holds ref while LB pick is pending.
+ grpc_lb_policy* lb_policy; // Holds ref while LB pick is pending.
grpc_closure lb_pick_closure;
grpc_closure lb_pick_cancel_closure;
- grpc_connected_subchannel *connected_subchannel;
+ grpc_connected_subchannel* connected_subchannel;
grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT];
- grpc_polling_entity *pollent;
+ grpc_polling_entity* pollent;
- grpc_transport_stream_op_batch *waiting_for_pick_batches[MAX_WAITING_BATCHES];
+ grpc_transport_stream_op_batch* waiting_for_pick_batches[MAX_WAITING_BATCHES];
size_t waiting_for_pick_batches_count;
grpc_closure handle_pending_batch_in_call_combiner[MAX_WAITING_BATCHES];
- grpc_transport_stream_op_batch *initial_metadata_batch;
+ grpc_transport_stream_op_batch* initial_metadata_batch;
grpc_linked_mdelem lb_token_mdelem;
grpc_closure on_complete;
- grpc_closure *original_on_complete;
+ grpc_closure* original_on_complete;
} call_data;
-grpc_subchannel_call *grpc_client_channel_get_subchannel_call(
- grpc_call_element *elem) {
- call_data *calld = (call_data *)elem->call_data;
+grpc_subchannel_call* grpc_client_channel_get_subchannel_call(
+ grpc_call_element* elem) {
+ call_data* calld = (call_data*)elem->call_data;
return calld->subchannel_call;
}
// This is called via the call combiner, so access to calld is synchronized.
static void waiting_for_pick_batches_add(
- call_data *calld, grpc_transport_stream_op_batch *batch) {
+ call_data* calld, grpc_transport_stream_op_batch* batch) {
if (batch->send_initial_metadata) {
GPR_ASSERT(calld->initial_metadata_batch == NULL);
calld->initial_metadata_batch = batch;
@@ -854,8 +853,8 @@ static void waiting_for_pick_batches_add(
}
// This is called via the call combiner, so access to calld is synchronized.
-static void fail_pending_batch_in_call_combiner(void *arg, grpc_error *error) {
- call_data *calld = (call_data *)arg;
+static void fail_pending_batch_in_call_combiner(void* arg, grpc_error* error) {
+ call_data* calld = (call_data*)arg;
if (calld->waiting_for_pick_batches_count > 0) {
--calld->waiting_for_pick_batches_count;
grpc_transport_stream_op_batch_finish_with_failure(
@@ -865,12 +864,12 @@ static void fail_pending_batch_in_call_combiner(void *arg, grpc_error *error) {
}
// This is called via the call combiner, so access to calld is synchronized.
-static void waiting_for_pick_batches_fail(grpc_call_element *elem,
- grpc_error *error) {
- call_data *calld = (call_data *)elem->call_data;
+static void waiting_for_pick_batches_fail(grpc_call_element* elem,
+ grpc_error* error) {
+ call_data* calld = (call_data*)elem->call_data;
if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
gpr_log(GPR_DEBUG,
- "chand=%p calld=%p: failing %" PRIdPTR " pending batches: %s",
+ "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s",
elem->channel_data, calld, calld->waiting_for_pick_batches_count,
grpc_error_string(error));
}
@@ -894,8 +893,8 @@ static void waiting_for_pick_batches_fail(grpc_call_element *elem,
}
// This is called via the call combiner, so access to calld is synchronized.
-static void run_pending_batch_in_call_combiner(void *arg, grpc_error *ignored) {
- call_data *calld = (call_data *)arg;
+static void run_pending_batch_in_call_combiner(void* arg, grpc_error* ignored) {
+ call_data* calld = (call_data*)arg;
if (calld->waiting_for_pick_batches_count > 0) {
--calld->waiting_for_pick_batches_count;
grpc_subchannel_call_process_op(
@@ -905,12 +904,13 @@ static void run_pending_batch_in_call_combiner(void *arg, grpc_error *ignored) {
}
// This is called via the call combiner, so access to calld is synchronized.
-static void waiting_for_pick_batches_resume(grpc_call_element *elem) {
- channel_data *chand = (channel_data *)elem->channel_data;
- call_data *calld = (call_data *)elem->call_data;
+static void waiting_for_pick_batches_resume(grpc_call_element* elem) {
+ channel_data* chand = (channel_data*)elem->channel_data;
+ call_data* calld = (call_data*)elem->call_data;
if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
- gpr_log(GPR_DEBUG, "chand=%p calld=%p: sending %" PRIdPTR
- " pending batches to subchannel_call=%p",
+ gpr_log(GPR_DEBUG,
+ "chand=%p calld=%p: sending %" PRIuPTR
+ " pending batches to subchannel_call=%p",
chand, calld, calld->waiting_for_pick_batches_count,
calld->subchannel_call);
}
@@ -929,9 +929,9 @@ static void waiting_for_pick_batches_resume(grpc_call_element *elem) {
// Applies service config to the call. Must be invoked once we know
// that the resolver has returned results to the channel.
-static void apply_service_config_to_call_locked(grpc_call_element *elem) {
- channel_data *chand = (channel_data *)elem->channel_data;
- call_data *calld = (call_data *)elem->call_data;
+static void apply_service_config_to_call_locked(grpc_call_element* elem) {
+ channel_data* chand = (channel_data*)elem->channel_data;
+ call_data* calld = (call_data*)elem->call_data;
if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
gpr_log(GPR_DEBUG, "chand=%p calld=%p: applying service config to call",
chand, calld);
@@ -941,7 +941,7 @@ static void apply_service_config_to_call_locked(grpc_call_element *elem) {
grpc_server_retry_throttle_data_ref(chand->retry_throttle_data);
}
if (chand->method_params_table != NULL) {
- calld->method_params = (method_parameters *)grpc_method_config_table_get(
+ calld->method_params = (method_parameters*)grpc_method_config_table_get(
chand->method_params_table, calld->path);
if (calld->method_params != NULL) {
method_parameters_ref(calld->method_params);
@@ -961,10 +961,10 @@ static void apply_service_config_to_call_locked(grpc_call_element *elem) {
}
}
-static void create_subchannel_call_locked(grpc_call_element *elem,
- grpc_error *error) {
- channel_data *chand = (channel_data *)elem->channel_data;
- call_data *calld = (call_data *)elem->call_data;
+static void create_subchannel_call_locked(grpc_call_element* elem,
+ grpc_error* error) {
+ channel_data* chand = (channel_data*)elem->channel_data;
+ call_data* calld = (call_data*)elem->call_data;
const grpc_connected_subchannel_call_args call_args = {
calld->pollent, // pollent
calld->path, // path
@@ -974,7 +974,7 @@ static void create_subchannel_call_locked(grpc_call_element *elem,
calld->subchannel_call_context, // context
calld->call_combiner // call_combiner
};
- grpc_error *new_error = grpc_connected_subchannel_create_call(
+ grpc_error* new_error = grpc_connected_subchannel_create_call(
calld->connected_subchannel, &call_args, &calld->subchannel_call);
if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
gpr_log(GPR_DEBUG, "chand=%p calld=%p: create subchannel_call=%p: error=%s",
@@ -990,9 +990,9 @@ static void create_subchannel_call_locked(grpc_call_element *elem,
}
// Invoked when a pick is completed, on both success or failure.
-static void pick_done_locked(grpc_call_element *elem, grpc_error *error) {
- call_data *calld = (call_data *)elem->call_data;
- channel_data *chand = (channel_data *)elem->channel_data;
+static void pick_done_locked(grpc_call_element* elem, grpc_error* error) {
+ call_data* calld = (call_data*)elem->call_data;
+ channel_data* chand = (channel_data*)elem->channel_data;
if (calld->connected_subchannel == NULL) {
// Failed to create subchannel.
GRPC_ERROR_UNREF(calld->error);
@@ -1018,9 +1018,9 @@ static void pick_done_locked(grpc_call_element *elem, grpc_error *error) {
// either (a) the pick was deferred pending a resolver result or (b) the
// pick was done asynchronously. Removes the call's polling entity from
// chand->interested_parties before invoking pick_done_locked().
-static void async_pick_done_locked(grpc_call_element *elem, grpc_error *error) {
- channel_data *chand = (channel_data *)elem->channel_data;
- call_data *calld = (call_data *)elem->call_data;
+static void async_pick_done_locked(grpc_call_element* elem, grpc_error* error) {
+ channel_data* chand = (channel_data*)elem->channel_data;
+ call_data* calld = (call_data*)elem->call_data;
grpc_polling_entity_del_from_pollset_set(calld->pollent,
chand->interested_parties);
pick_done_locked(elem, error);
@@ -1028,10 +1028,10 @@ static void async_pick_done_locked(grpc_call_element *elem, grpc_error *error) {
// Note: This runs under the client_channel combiner, but will NOT be
// holding the call combiner.
-static void pick_callback_cancel_locked(void *arg, grpc_error *error) {
- grpc_call_element *elem = (grpc_call_element *)arg;
- channel_data *chand = (channel_data *)elem->channel_data;
- call_data *calld = (call_data *)elem->call_data;
+static void pick_callback_cancel_locked(void* arg, grpc_error* error) {
+ grpc_call_element* elem = (grpc_call_element*)arg;
+ channel_data* chand = (channel_data*)elem->channel_data;
+ call_data* calld = (call_data*)elem->call_data;
if (calld->lb_policy != NULL) {
if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
gpr_log(GPR_DEBUG, "chand=%p calld=%p: cancelling pick from LB policy %p",
@@ -1045,10 +1045,10 @@ static void pick_callback_cancel_locked(void *arg, grpc_error *error) {
// Callback invoked by grpc_lb_policy_pick_locked() for async picks.
// Unrefs the LB policy and invokes async_pick_done_locked().
-static void pick_callback_done_locked(void *arg, grpc_error *error) {
- grpc_call_element *elem = (grpc_call_element *)arg;
- channel_data *chand = (channel_data *)elem->channel_data;
- call_data *calld = (call_data *)elem->call_data;
+static void pick_callback_done_locked(void* arg, grpc_error* error) {
+ grpc_call_element* elem = (grpc_call_element*)arg;
+ channel_data* chand = (channel_data*)elem->channel_data;
+ call_data* calld = (call_data*)elem->call_data;
if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed asynchronously",
chand, calld);
@@ -1062,9 +1062,9 @@ static void pick_callback_done_locked(void *arg, grpc_error *error) {
// Takes a ref to chand->lb_policy and calls grpc_lb_policy_pick_locked().
// If the pick was completed synchronously, unrefs the LB policy and
// returns true.
-static bool pick_callback_start_locked(grpc_call_element *elem) {
- channel_data *chand = (channel_data *)elem->channel_data;
- call_data *calld = (call_data *)elem->call_data;
+static bool pick_callback_start_locked(grpc_call_element* elem) {
+ channel_data* chand = (channel_data*)elem->channel_data;
+ call_data* calld = (call_data*)elem->call_data;
if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
gpr_log(GPR_DEBUG, "chand=%p calld=%p: starting pick on lb_policy=%p",
chand, calld, chand->lb_policy);
@@ -1121,7 +1121,7 @@ static bool pick_callback_start_locked(grpc_call_element *elem) {
}
typedef struct {
- grpc_call_element *elem;
+ grpc_call_element* elem;
bool finished;
grpc_closure closure;
grpc_closure cancel_closure;
@@ -1129,10 +1129,9 @@ typedef struct {
// Note: This runs under the client_channel combiner, but will NOT be
// holding the call combiner.
-static void pick_after_resolver_result_cancel_locked(void *arg,
- grpc_error *error) {
- pick_after_resolver_result_args *args =
- (pick_after_resolver_result_args *)arg;
+static void pick_after_resolver_result_cancel_locked(void* arg,
+ grpc_error* error) {
+ pick_after_resolver_result_args* args = (pick_after_resolver_result_args*)arg;
if (args->finished) {
gpr_free(args);
return;
@@ -1145,9 +1144,9 @@ static void pick_after_resolver_result_cancel_locked(void *arg,
// is called, it will be a no-op. We also immediately invoke
// async_pick_done_locked() to propagate the error back to the caller.
args->finished = true;
- grpc_call_element *elem = args->elem;
- channel_data *chand = (channel_data *)elem->channel_data;
- call_data *calld = (call_data *)elem->call_data;
+ grpc_call_element* elem = args->elem;
+ channel_data* chand = (channel_data*)elem->channel_data;
+ call_data* calld = (call_data*)elem->call_data;
if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
gpr_log(GPR_DEBUG,
"chand=%p calld=%p: cancelling pick waiting for resolver result",
@@ -1162,10 +1161,11 @@ static void pick_after_resolver_result_cancel_locked(void *arg,
"Pick cancelled", &error, 1));
}
-static void pick_after_resolver_result_done_locked(void *arg,
- grpc_error *error) {
- pick_after_resolver_result_args *args =
- (pick_after_resolver_result_args *)arg;
+static void pick_after_resolver_result_start_locked(grpc_call_element* elem);
+
+static void pick_after_resolver_result_done_locked(void* arg,
+ grpc_error* error) {
+ pick_after_resolver_result_args* args = (pick_after_resolver_result_args*)arg;
if (args->finished) {
/* cancelled, do nothing */
if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
@@ -1175,16 +1175,16 @@ static void pick_after_resolver_result_done_locked(void *arg,
return;
}
args->finished = true;
- grpc_call_element *elem = args->elem;
- channel_data *chand = (channel_data *)elem->channel_data;
- call_data *calld = (call_data *)elem->call_data;
+ grpc_call_element* elem = args->elem;
+ channel_data* chand = (channel_data*)elem->channel_data;
+ call_data* calld = (call_data*)elem->call_data;
if (error != GRPC_ERROR_NONE) {
if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver failed to return data",
chand, calld);
}
async_pick_done_locked(elem, GRPC_ERROR_REF(error));
- } else {
+ } else if (chand->lb_policy != NULL) {
if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver returned, doing pick",
chand, calld);
@@ -1198,18 +1198,42 @@ static void pick_after_resolver_result_done_locked(void *arg,
async_pick_done_locked(elem, GRPC_ERROR_NONE);
}
}
+ // TODO(roth): It should be impossible for chand->lb_policy to be NULL
+ // here, so the rest of this code should never actually be executed.
+ // However, we have reports of a crash on iOS that triggers this case,
+ // so we are temporarily adding this to restore branches that were
+ // removed in https://github.com/grpc/grpc/pull/12297. Need to figure
+ // out what is actually causing this to occur and then figure out the
+ // right way to deal with it.
+ else if (chand->resolver != NULL) {
+ // No LB policy, so try again.
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG,
+ "chand=%p calld=%p: resolver returned but no LB policy, "
+ "trying again",
+ chand, calld);
+ }
+ pick_after_resolver_result_start_locked(elem);
+ } else {
+ if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
+ gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver disconnected", chand,
+ calld);
+ }
+ async_pick_done_locked(
+ elem, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
+ }
}
-static void pick_after_resolver_result_start_locked(grpc_call_element *elem) {
- channel_data *chand = (channel_data *)elem->channel_data;
- call_data *calld = (call_data *)elem->call_data;
+static void pick_after_resolver_result_start_locked(grpc_call_element* elem) {
+ channel_data* chand = (channel_data*)elem->channel_data;
+ call_data* calld = (call_data*)elem->call_data;
if (GRPC_TRACER_ON(grpc_client_channel_trace)) {
gpr_log(GPR_DEBUG,
"chand=%p calld=%p: deferring pick pending resolver result", chand,
calld);
}
- pick_after_resolver_result_args *args =
- (pick_after_resolver_result_args *)gpr_zalloc(sizeof(*args));
+ pick_after_resolver_result_args* args =
+ (pick_after_resolver_result_args*)gpr_zalloc(sizeof(*args));
args->elem = elem;
GRPC_CLOSURE_INIT(&args->closure, pick_after_resolver_result_done_locked,
args, grpc_combiner_scheduler(chand->combiner));
@@ -1222,10 +1246,10 @@ static void pick_after_resolver_result_start_locked(grpc_call_element *elem) {
grpc_combiner_scheduler(chand->combiner)));
}
-static void start_pick_locked(void *arg, grpc_error *ignored) {
- grpc_call_element *elem = (grpc_call_element *)arg;
- call_data *calld = (call_data *)elem->call_data;
- channel_data *chand = (channel_data *)elem->channel_data;
+static void start_pick_locked(void* arg, grpc_error* ignored) {
+ grpc_call_element* elem = (grpc_call_element*)arg;
+ call_data* calld = (call_data*)elem->call_data;
+ channel_data* chand = (channel_data*)elem->channel_data;
GPR_ASSERT(calld->connected_subchannel == NULL);
if (chand->lb_policy != NULL) {
// We already have an LB policy, so ask it for a pick.
@@ -1255,9 +1279,9 @@ static void start_pick_locked(void *arg, grpc_error *ignored) {
chand->interested_parties);
}
-static void on_complete(void *arg, grpc_error *error) {
- grpc_call_element *elem = (grpc_call_element *)arg;
- call_data *calld = (call_data *)elem->call_data;
+static void on_complete(void* arg, grpc_error* error) {
+ grpc_call_element* elem = (grpc_call_element*)arg;
+ call_data* calld = (call_data*)elem->call_data;
if (calld->retry_throttle_data != NULL) {
if (error == GRPC_ERROR_NONE) {
grpc_server_retry_throttle_data_record_success(
@@ -1275,9 +1299,9 @@ static void on_complete(void *arg, grpc_error *error) {
}
static void cc_start_transport_stream_op_batch(
- grpc_call_element *elem, grpc_transport_stream_op_batch *batch) {
- call_data *calld = (call_data *)elem->call_data;
- channel_data *chand = (channel_data *)elem->channel_data;
+ grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
+ call_data* calld = (call_data*)elem->call_data;
+ channel_data* chand = (channel_data*)elem->channel_data;
if (chand->deadline_checking_enabled) {
grpc_deadline_state_client_start_transport_stream_op_batch(elem, batch);
}
@@ -1365,10 +1389,10 @@ done:
}
/* Constructor for call_data */
-static grpc_error *cc_init_call_elem(grpc_call_element *elem,
- const grpc_call_element_args *args) {
- call_data *calld = (call_data *)elem->call_data;
- channel_data *chand = (channel_data *)elem->channel_data;
+static grpc_error* cc_init_call_elem(grpc_call_element* elem,
+ const grpc_call_element_args* args) {
+ call_data* calld = (call_data*)elem->call_data;
+ channel_data* chand = (channel_data*)elem->channel_data;
// Initialize data members.
calld->path = grpc_slice_ref_internal(args->path);
calld->call_start_time = args->start_time;
@@ -1384,11 +1408,11 @@ static grpc_error *cc_init_call_elem(grpc_call_element *elem,
}
/* Destructor for call_data */
-static void cc_destroy_call_elem(grpc_call_element *elem,
- const grpc_call_final_info *final_info,
- grpc_closure *then_schedule_closure) {
- call_data *calld = (call_data *)elem->call_data;
- channel_data *chand = (channel_data *)elem->channel_data;
+static void cc_destroy_call_elem(grpc_call_element* elem,
+ const grpc_call_final_info* final_info,
+ grpc_closure* then_schedule_closure) {
+ call_data* calld = (call_data*)elem->call_data;
+ channel_data* chand = (channel_data*)elem->channel_data;
if (chand->deadline_checking_enabled) {
grpc_deadline_state_destroy(elem);
}
@@ -1418,9 +1442,9 @@ static void cc_destroy_call_elem(grpc_call_element *elem,
GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE);
}
-static void cc_set_pollset_or_pollset_set(grpc_call_element *elem,
- grpc_polling_entity *pollent) {
- call_data *calld = (call_data *)elem->call_data;
+static void cc_set_pollset_or_pollset_set(grpc_call_element* elem,
+ grpc_polling_entity* pollent) {
+ call_data* calld = (call_data*)elem->call_data;
calld->pollent = pollent;
}
@@ -1442,8 +1466,8 @@ const grpc_channel_filter grpc_client_channel_filter = {
"client-channel",
};
-static void try_to_connect_locked(void *arg, grpc_error *error_ignored) {
- channel_data *chand = (channel_data *)arg;
+static void try_to_connect_locked(void* arg, grpc_error* error_ignored) {
+ channel_data* chand = (channel_data*)arg;
if (chand->lb_policy != NULL) {
grpc_lb_policy_exit_idle_locked(chand->lb_policy);
} else {
@@ -1456,8 +1480,8 @@ static void try_to_connect_locked(void *arg, grpc_error *error_ignored) {
}
grpc_connectivity_state grpc_client_channel_check_connectivity_state(
- grpc_channel_element *elem, int try_to_connect) {
- channel_data *chand = (channel_data *)elem->channel_data;
+ grpc_channel_element* elem, int try_to_connect) {
+ channel_data* chand = (channel_data*)elem->channel_data;
grpc_connectivity_state out =
grpc_connectivity_state_check(&chand->state_tracker);
if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
@@ -1471,19 +1495,19 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state(
}
typedef struct external_connectivity_watcher {
- channel_data *chand;
+ channel_data* chand;
grpc_polling_entity pollent;
- grpc_closure *on_complete;
- grpc_closure *watcher_timer_init;
- grpc_connectivity_state *state;
+ grpc_closure* on_complete;
+ grpc_closure* watcher_timer_init;
+ grpc_connectivity_state* state;
grpc_closure my_closure;
- struct external_connectivity_watcher *next;
+ struct external_connectivity_watcher* next;
} external_connectivity_watcher;
-static external_connectivity_watcher *lookup_external_connectivity_watcher(
- channel_data *chand, grpc_closure *on_complete) {
+static external_connectivity_watcher* lookup_external_connectivity_watcher(
+ channel_data* chand, grpc_closure* on_complete) {
gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
- external_connectivity_watcher *w =
+ external_connectivity_watcher* w =
chand->external_connectivity_watcher_list_head;
while (w != NULL && w->on_complete != on_complete) {
w = w->next;
@@ -1493,7 +1517,7 @@ static external_connectivity_watcher *lookup_external_connectivity_watcher(
}
static void external_connectivity_watcher_list_append(
- channel_data *chand, external_connectivity_watcher *w) {
+ channel_data* chand, external_connectivity_watcher* w) {
GPR_ASSERT(!lookup_external_connectivity_watcher(chand, w->on_complete));
gpr_mu_lock(&w->chand->external_connectivity_watcher_list_mu);
@@ -1504,7 +1528,7 @@ static void external_connectivity_watcher_list_append(
}
static void external_connectivity_watcher_list_remove(
- channel_data *chand, external_connectivity_watcher *too_remove) {
+ channel_data* chand, external_connectivity_watcher* too_remove) {
GPR_ASSERT(
lookup_external_connectivity_watcher(chand, too_remove->on_complete));
gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
@@ -1513,7 +1537,7 @@ static void external_connectivity_watcher_list_remove(
gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
return;
}
- external_connectivity_watcher *w =
+ external_connectivity_watcher* w =
chand->external_connectivity_watcher_list_head;
while (w != NULL) {
if (w->next == too_remove) {
@@ -1527,12 +1551,12 @@ static void external_connectivity_watcher_list_remove(
}
int grpc_client_channel_num_external_connectivity_watchers(
- grpc_channel_element *elem) {
- channel_data *chand = (channel_data *)elem->channel_data;
+ grpc_channel_element* elem) {
+ channel_data* chand = (channel_data*)elem->channel_data;
int count = 0;
gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
- external_connectivity_watcher *w =
+ external_connectivity_watcher* w =
chand->external_connectivity_watcher_list_head;
while (w != NULL) {
count++;
@@ -1543,9 +1567,9 @@ int grpc_client_channel_num_external_connectivity_watchers(
return count;
}
-static void on_external_watch_complete(void *arg, grpc_error *error) {
- external_connectivity_watcher *w = (external_connectivity_watcher *)arg;
- grpc_closure *follow_up = w->on_complete;
+static void on_external_watch_complete_locked(void* arg, grpc_error* error) {
+ external_connectivity_watcher* w = (external_connectivity_watcher*)arg;
+ grpc_closure* follow_up = w->on_complete;
grpc_polling_entity_del_from_pollset_set(&w->pollent,
w->chand->interested_parties);
GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack,
@@ -1555,15 +1579,15 @@ static void on_external_watch_complete(void *arg, grpc_error *error) {
GRPC_CLOSURE_RUN(follow_up, GRPC_ERROR_REF(error));
}
-static void watch_connectivity_state_locked(void *arg,
- grpc_error *error_ignored) {
- external_connectivity_watcher *w = (external_connectivity_watcher *)arg;
- external_connectivity_watcher *found = NULL;
+static void watch_connectivity_state_locked(void* arg,
+ grpc_error* error_ignored) {
+ external_connectivity_watcher* w = (external_connectivity_watcher*)arg;
+ external_connectivity_watcher* found = NULL;
if (w->state != NULL) {
external_connectivity_watcher_list_append(w->chand, w);
GRPC_CLOSURE_RUN(w->watcher_timer_init, GRPC_ERROR_NONE);
- GRPC_CLOSURE_INIT(&w->my_closure, on_external_watch_complete, w,
- grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&w->my_closure, on_external_watch_complete_locked, w,
+ grpc_combiner_scheduler(w->chand->combiner));
grpc_connectivity_state_notify_on_state_change(&w->chand->state_tracker,
w->state, &w->my_closure);
} else {
@@ -1583,12 +1607,12 @@ static void watch_connectivity_state_locked(void *arg,
}
void grpc_client_channel_watch_connectivity_state(
- grpc_channel_element *elem, grpc_polling_entity pollent,
- grpc_connectivity_state *state, grpc_closure *closure,
- grpc_closure *watcher_timer_init) {
- channel_data *chand = (channel_data *)elem->channel_data;
- external_connectivity_watcher *w =
- (external_connectivity_watcher *)gpr_zalloc(sizeof(*w));
+ grpc_channel_element* elem, grpc_polling_entity pollent,
+ grpc_connectivity_state* state, grpc_closure* closure,
+ grpc_closure* watcher_timer_init) {
+ channel_data* chand = (channel_data*)elem->channel_data;
+ external_connectivity_watcher* w =
+ (external_connectivity_watcher*)gpr_zalloc(sizeof(*w));
w->chand = chand;
w->pollent = pollent;
w->on_complete = closure;