aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters/client_channel/client_channel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/filters/client_channel/client_channel.c')
-rw-r--r--src/core/ext/filters/client_channel/client_channel.c466
1 files changed, 293 insertions, 173 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.c b/src/core/ext/filters/client_channel/client_channel.c
index f2f27b9175..f29c5d55ed 100644
--- a/src/core/ext/filters/client_channel/client_channel.c
+++ b/src/core/ext/filters/client_channel/client_channel.c
@@ -1,33 +1,18 @@
/*
*
- * Copyright 2015, Google Inc.
- * All rights reserved.
+ * Copyright 2015 gRPC authors.
*
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
*/
@@ -167,6 +152,8 @@ static void *method_parameters_create_from_json(const grpc_json *json) {
return value;
}
+struct external_connectivity_watcher;
+
/*************************************************************************
* CHANNEL-WIDE FUNCTIONS
*/
@@ -204,6 +191,11 @@ typedef struct client_channel_channel_data {
/** interested parties (owned) */
grpc_pollset_set *interested_parties;
+ /* external_connectivity_watcher_list head is guarded by its own mutex, since
+ * counts need to be grabbed immediately without polling on a cq */
+ gpr_mu external_connectivity_watcher_list_mu;
+ struct external_connectivity_watcher *external_connectivity_watcher_list_head;
+
/* the following properties are guarded by a mutex since API's require them
to be instantaneously available */
gpr_mu info_mu;
@@ -283,8 +275,8 @@ static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand,
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy");
w->chand = chand;
- grpc_closure_init(&w->on_changed, on_lb_policy_state_changed_locked, w,
- grpc_combiner_scheduler(chand->combiner, false));
+ GRPC_CLOSURE_INIT(&w->on_changed, on_lb_policy_state_changed_locked, w,
+ grpc_combiner_scheduler(chand->combiner));
w->state = current_state;
w->lb_policy = lb_policy;
grpc_lb_policy_notify_on_state_change_locked(exec_ctx, lb_policy, &w->state,
@@ -372,7 +364,7 @@ static void wrapped_on_pick_closure_cb(grpc_exec_ctx *exec_ctx, void *arg,
GPR_ASSERT(wc_arg != NULL);
GPR_ASSERT(wc_arg->wrapped_closure != NULL);
GPR_ASSERT(wc_arg->lb_policy != NULL);
- grpc_closure_run(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_RUN(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error));
GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->lb_policy, "pick_subchannel_wrapping");
gpr_free(wc_arg);
}
@@ -382,7 +374,7 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
channel_data *chand = arg;
char *lb_policy_name = NULL;
grpc_lb_policy *lb_policy = NULL;
- grpc_lb_policy *old_lb_policy;
+ grpc_lb_policy *old_lb_policy = NULL;
grpc_slice_hash_table *method_params_table = NULL;
grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
bool exit_idle = false;
@@ -392,6 +384,7 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
service_config_parsing_state parsing_state;
memset(&parsing_state, 0, sizeof(parsing_state));
+ bool lb_policy_updated = false;
if (chand->resolver_result != NULL) {
// Find LB policy name.
const grpc_arg *channel_arg =
@@ -431,14 +424,27 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
lb_policy_args.args = chand->resolver_result;
lb_policy_args.client_channel_factory = chand->client_channel_factory;
lb_policy_args.combiner = chand->combiner;
- lb_policy =
- grpc_lb_policy_create(exec_ctx, lb_policy_name, &lb_policy_args);
- if (lb_policy != NULL) {
- GRPC_LB_POLICY_REF(lb_policy, "config_change");
- GRPC_ERROR_UNREF(state_error);
- state = grpc_lb_policy_check_connectivity_locked(exec_ctx, lb_policy,
- &state_error);
+
+ const bool lb_policy_type_changed =
+ (chand->info_lb_policy_name == NULL) ||
+ (strcmp(chand->info_lb_policy_name, lb_policy_name) != 0);
+ if (chand->lb_policy != NULL && !lb_policy_type_changed) {
+ // update
+ lb_policy_updated = true;
+ grpc_lb_policy_update_locked(exec_ctx, chand->lb_policy, &lb_policy_args);
+ } else {
+ lb_policy =
+ grpc_lb_policy_create(exec_ctx, lb_policy_name, &lb_policy_args);
+ if (lb_policy != NULL) {
+ GRPC_LB_POLICY_REF(lb_policy, "config_change");
+ GRPC_ERROR_UNREF(state_error);
+ state = grpc_lb_policy_check_connectivity_locked(exec_ctx, lb_policy,
+ &state_error);
+ old_lb_policy = chand->lb_policy;
+ chand->lb_policy = lb_policy;
+ }
}
+
// Find service config.
channel_arg =
grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVICE_CONFIG);
@@ -485,8 +491,6 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
gpr_free(chand->info_lb_policy_name);
chand->info_lb_policy_name = lb_policy_name;
}
- old_lb_policy = chand->lb_policy;
- chand->lb_policy = lb_policy;
if (service_config_json != NULL) {
gpr_free(chand->info_service_config_json);
chand->info_service_config_json = service_config_json;
@@ -502,24 +506,28 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
}
chand->method_params_table = method_params_table;
if (lb_policy != NULL) {
- grpc_closure_list_sched(exec_ctx, &chand->waiting_for_config_closures);
+ GRPC_CLOSURE_LIST_SCHED(exec_ctx, &chand->waiting_for_config_closures);
} else if (chand->resolver == NULL /* disconnected */) {
grpc_closure_list_fail_all(&chand->waiting_for_config_closures,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Channel disconnected", &error, 1));
- grpc_closure_list_sched(exec_ctx, &chand->waiting_for_config_closures);
+ GRPC_CLOSURE_LIST_SCHED(exec_ctx, &chand->waiting_for_config_closures);
}
- if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) {
+ if (!lb_policy_updated && lb_policy != NULL &&
+ chand->exit_idle_when_lb_policy_arrives) {
GRPC_LB_POLICY_REF(lb_policy, "exit_idle");
exit_idle = true;
chand->exit_idle_when_lb_policy_arrives = false;
}
if (error == GRPC_ERROR_NONE && chand->resolver) {
- set_channel_connectivity_state_locked(
- exec_ctx, chand, state, GRPC_ERROR_REF(state_error), "new_lb+resolver");
- if (lb_policy != NULL) {
- watch_lb_policy_locked(exec_ctx, chand, lb_policy, state);
+ if (!lb_policy_updated) {
+ set_channel_connectivity_state_locked(exec_ctx, chand, state,
+ GRPC_ERROR_REF(state_error),
+ "new_lb+resolver");
+ if (lb_policy != NULL) {
+ watch_lb_policy_locked(exec_ctx, chand, lb_policy, state);
+ }
}
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
grpc_resolver_next_locked(exec_ctx, chand->resolver,
@@ -539,7 +547,7 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
"resolver_gone");
}
- if (exit_idle) {
+ if (!lb_policy_updated && lb_policy != NULL && exit_idle) {
grpc_lb_policy_exit_idle_locked(exec_ctx, lb_policy);
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "exit_idle");
}
@@ -548,9 +556,10 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
grpc_pollset_set_del_pollset_set(
exec_ctx, old_lb_policy->interested_parties, chand->interested_parties);
GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel");
+ old_lb_policy = NULL;
}
- if (lb_policy != NULL) {
+ if (!lb_policy_updated && lb_policy != NULL) {
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "config_change");
}
@@ -574,7 +583,7 @@ static void start_transport_op_locked(grpc_exec_ctx *exec_ctx, void *arg,
if (op->send_ping != NULL) {
if (chand->lb_policy == NULL) {
- grpc_closure_sched(
+ GRPC_CLOSURE_SCHED(
exec_ctx, op->send_ping,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing"));
} else {
@@ -595,7 +604,7 @@ static void start_transport_op_locked(grpc_exec_ctx *exec_ctx, void *arg,
if (!chand->started_resolving) {
grpc_closure_list_fail_all(&chand->waiting_for_config_closures,
GRPC_ERROR_REF(op->disconnect_with_error));
- grpc_closure_list_sched(exec_ctx, &chand->waiting_for_config_closures);
+ GRPC_CLOSURE_LIST_SCHED(exec_ctx, &chand->waiting_for_config_closures);
}
if (chand->lb_policy != NULL) {
grpc_pollset_set_del_pollset_set(exec_ctx,
@@ -609,7 +618,7 @@ static void start_transport_op_locked(grpc_exec_ctx *exec_ctx, void *arg,
}
GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "start_transport_op");
- grpc_closure_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, op->on_consumed, GRPC_ERROR_NONE);
}
static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
@@ -625,10 +634,10 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
op->handler_private.extra_arg = elem;
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "start_transport_op");
- grpc_closure_sched(
+ GRPC_CLOSURE_SCHED(
exec_ctx,
- grpc_closure_init(&op->handler_private.closure, start_transport_op_locked,
- op, grpc_combiner_scheduler(chand->combiner, false)),
+ GRPC_CLOSURE_INIT(&op->handler_private.closure, start_transport_op_locked,
+ op, grpc_combiner_scheduler(chand->combiner)),
GRPC_ERROR_NONE);
}
@@ -659,12 +668,18 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(args->is_last);
GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
// Initialize data members.
- chand->combiner = grpc_combiner_create(NULL);
+ chand->combiner = grpc_combiner_create();
gpr_mu_init(&chand->info_mu);
+ gpr_mu_init(&chand->external_connectivity_watcher_list_mu);
+
+ gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
+ chand->external_connectivity_watcher_list_head = NULL;
+ gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
+
chand->owning_stack = args->channel_stack;
- grpc_closure_init(&chand->on_resolver_result_changed,
+ GRPC_CLOSURE_INIT(&chand->on_resolver_result_changed,
on_resolver_result_changed_locked, chand,
- grpc_combiner_scheduler(chand->combiner, false));
+ grpc_combiner_scheduler(chand->combiner));
chand->interested_parties = grpc_pollset_set_create();
grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
"client_channel");
@@ -722,10 +737,9 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem) {
channel_data *chand = elem->channel_data;
if (chand->resolver != NULL) {
- grpc_closure_sched(
- exec_ctx,
- grpc_closure_create(shutdown_resolver_locked, chand->resolver,
- grpc_combiner_scheduler(chand->combiner, false)),
+ GRPC_CLOSURE_SCHED(
+ exec_ctx, GRPC_CLOSURE_CREATE(shutdown_resolver_locked, chand->resolver,
+ grpc_combiner_scheduler(chand->combiner)),
GRPC_ERROR_NONE);
}
if (chand->client_channel_factory != NULL) {
@@ -749,17 +763,13 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_pollset_set_destroy(exec_ctx, chand->interested_parties);
GRPC_COMBINER_UNREF(exec_ctx, chand->combiner, "client_channel");
gpr_mu_destroy(&chand->info_mu);
+ gpr_mu_destroy(&chand->external_connectivity_watcher_list_mu);
}
/*************************************************************************
* PER-CALL FUNCTIONS
*/
-#define GET_CALL(call_data) \
- ((grpc_subchannel_call *)(gpr_atm_acq_load(&(call_data)->subchannel_call)))
-
-#define CANCELLED_CALL ((grpc_subchannel_call *)1)
-
/** Call data. Holds a pointer to grpc_subchannel_call and the
associated machinery to create such a pointer.
Handles queueing of stream ops until a call object is ready, waiting
@@ -780,11 +790,9 @@ typedef struct client_channel_call_data {
grpc_server_retry_throttle_data *retry_throttle_data;
method_parameters *method_params;
- grpc_error *cancel_error;
-
- /** either 0 for no call, 1 for cancelled, or a pointer to a
- grpc_subchannel_call */
- gpr_atm subchannel_call;
+ /** either 0 for no call, a pointer to a grpc_subchannel_call (if the lowest
+ bit is 0), or a pointer to an error (if the lowest bit is 1) */
+ gpr_atm subchannel_call_or_error;
gpr_arena *arena;
bool pick_pending;
@@ -806,10 +814,43 @@ typedef struct client_channel_call_data {
grpc_closure *original_on_complete;
} call_data;
+typedef struct {
+ grpc_subchannel_call *subchannel_call;
+ grpc_error *error;
+} call_or_error;
+
+static call_or_error get_call_or_error(call_data *p) {
+ gpr_atm c = gpr_atm_acq_load(&p->subchannel_call_or_error);
+ if (c == 0)
+ return (call_or_error){NULL, NULL};
+ else if (c & 1)
+ return (call_or_error){NULL, (grpc_error *)((c) & ~(gpr_atm)1)};
+ else
+ return (call_or_error){(grpc_subchannel_call *)c, NULL};
+}
+
+static bool set_call_or_error(call_data *p, call_or_error coe) {
+ // this should always be under a lock
+ call_or_error existing = get_call_or_error(p);
+ if (existing.error != GRPC_ERROR_NONE) {
+ GRPC_ERROR_UNREF(coe.error);
+ return false;
+ }
+ GPR_ASSERT(existing.subchannel_call == NULL);
+ if (coe.error != GRPC_ERROR_NONE) {
+ GPR_ASSERT(coe.subchannel_call == NULL);
+ gpr_atm_rel_store(&p->subchannel_call_or_error, 1 | (gpr_atm)coe.error);
+ } else {
+ GPR_ASSERT(coe.subchannel_call != NULL);
+ gpr_atm_rel_store(&p->subchannel_call_or_error,
+ (gpr_atm)coe.subchannel_call);
+ }
+ return true;
+}
+
grpc_subchannel_call *grpc_client_channel_get_subchannel_call(
grpc_call_element *call_elem) {
- grpc_subchannel_call *scc = GET_CALL((call_data *)call_elem->call_data);
- return scc == CANCELLED_CALL ? NULL : scc;
+ return get_call_or_error(call_elem->call_data).subchannel_call;
}
static void add_waiting_locked(call_data *calld,
@@ -841,18 +882,18 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) {
return;
}
- grpc_subchannel_call *call = GET_CALL(calld);
+ call_or_error call = get_call_or_error(calld);
grpc_transport_stream_op_batch **ops = calld->waiting_ops;
size_t nops = calld->waiting_ops_count;
- if (call == CANCELLED_CALL) {
- fail_locked(exec_ctx, calld, GRPC_ERROR_CANCELLED);
+ if (call.error != GRPC_ERROR_NONE) {
+ fail_locked(exec_ctx, calld, GRPC_ERROR_REF(call.error));
return;
}
calld->waiting_ops = NULL;
calld->waiting_ops_count = 0;
calld->waiting_ops_capacity = 0;
for (size_t i = 0; i < nops; i++) {
- grpc_subchannel_call_process_op(exec_ctx, call, ops[i]);
+ grpc_subchannel_call_process_op(exec_ctx, call.subchannel_call, ops[i]);
}
gpr_free(ops);
}
@@ -913,19 +954,23 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg,
calld->pick_pending = false;
grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent,
chand->interested_parties);
+ call_or_error coe = get_call_or_error(calld);
if (calld->connected_subchannel == NULL) {
- gpr_atm_no_barrier_store(&calld->subchannel_call, (gpr_atm)CANCELLED_CALL);
- fail_locked(exec_ctx, calld,
- error == GRPC_ERROR_NONE
- ? GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "Call dropped by load balancing policy")
- : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
- "Failed to create subchannel", &error, 1));
- } else if (GET_CALL(calld) == CANCELLED_CALL) {
+ grpc_error *failure =
+ error == GRPC_ERROR_NONE
+ ? GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "Call dropped by load balancing policy")
+ : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+ "Failed to create subchannel", &error, 1);
+ set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(failure)});
+ fail_locked(exec_ctx, calld, failure);
+ } else if (coe.error != GRPC_ERROR_NONE) {
/* already cancelled before subchannel became ready */
+ grpc_error *child_errors[] = {error, coe.error};
grpc_error *cancellation_error =
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
- "Cancelled before creating subchannel", &error, 1);
+ "Cancelled before creating subchannel", child_errors,
+ GPR_ARRAY_SIZE(child_errors));
/* if due to deadline, attach the deadline exceeded status to the error */
if (gpr_time_cmp(calld->deadline, gpr_now(GPR_CLOCK_MONOTONIC)) < 0) {
cancellation_error =
@@ -945,8 +990,8 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg,
.context = calld->subchannel_call_context};
grpc_error *new_error = grpc_connected_subchannel_create_call(
exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call);
- gpr_atm_rel_store(&calld->subchannel_call,
- (gpr_atm)(uintptr_t)subchannel_call);
+ GPR_ASSERT(set_call_or_error(
+ calld, (call_or_error){.subchannel_call = subchannel_call}));
if (new_error != GRPC_ERROR_NONE) {
new_error = grpc_error_add_child(new_error, error);
fail_locked(exec_ctx, calld, new_error);
@@ -959,8 +1004,9 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg,
static char *cc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
call_data *calld = elem->call_data;
- grpc_subchannel_call *subchannel_call = GET_CALL(calld);
- if (subchannel_call == NULL || subchannel_call == CANCELLED_CALL) {
+ grpc_subchannel_call *subchannel_call =
+ get_call_or_error(calld).subchannel_call;
+ if (subchannel_call == NULL) {
return NULL;
} else {
return grpc_subchannel_call_get_peer(exec_ctx, subchannel_call);
@@ -992,13 +1038,13 @@ static void continue_picking_locked(grpc_exec_ctx *exec_ctx, void *arg,
if (cpa->connected_subchannel == NULL) {
/* cancelled, do nothing */
} else if (error != GRPC_ERROR_NONE) {
- grpc_closure_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error));
} else {
if (pick_subchannel_locked(exec_ctx, cpa->elem, cpa->initial_metadata,
cpa->initial_metadata_flags,
cpa->connected_subchannel,
cpa->subchannel_call_context, cpa->on_ready)) {
- grpc_closure_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE);
}
}
gpr_free(cpa);
@@ -1018,7 +1064,7 @@ static void cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
continue_picking_args *cpa = closure->cb_arg;
if (cpa->connected_subchannel == &calld->connected_subchannel) {
cpa->connected_subchannel = NULL;
- grpc_closure_sched(exec_ctx, cpa->on_ready,
+ GRPC_CLOSURE_SCHED(exec_ctx, cpa->on_ready,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick cancelled", &error, 1));
}
@@ -1067,7 +1113,7 @@ static bool pick_subchannel_locked(
// the LB policy for the duration of the pick.
wrapped_on_pick_closure_arg *w_on_pick_arg =
gpr_zalloc(sizeof(*w_on_pick_arg));
- grpc_closure_init(&w_on_pick_arg->wrapper_closure,
+ GRPC_CLOSURE_INIT(&w_on_pick_arg->wrapper_closure,
wrapped_on_pick_closure_cb, w_on_pick_arg,
grpc_schedule_on_exec_ctx);
w_on_pick_arg->wrapped_closure = on_ready;
@@ -1101,12 +1147,12 @@ static bool pick_subchannel_locked(
cpa->subchannel_call_context = subchannel_call_context;
cpa->on_ready = on_ready;
cpa->elem = elem;
- grpc_closure_init(&cpa->closure, continue_picking_locked, cpa,
- grpc_combiner_scheduler(chand->combiner, true));
+ GRPC_CLOSURE_INIT(&cpa->closure, continue_picking_locked, cpa,
+ grpc_combiner_scheduler(chand->combiner));
grpc_closure_list_append(&chand->waiting_for_config_closures, &cpa->closure,
GRPC_ERROR_NONE);
} else {
- grpc_closure_sched(exec_ctx, on_ready,
+ GRPC_CLOSURE_SCHED(exec_ctx, on_ready,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
}
@@ -1119,58 +1165,45 @@ static void start_transport_stream_op_batch_locked_inner(
grpc_call_element *elem) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
- grpc_subchannel_call *call;
/* need to recheck that another thread hasn't set the call */
- call = GET_CALL(calld);
- if (call == CANCELLED_CALL) {
+ call_or_error coe = get_call_or_error(calld);
+ if (coe.error != GRPC_ERROR_NONE) {
grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error));
+ exec_ctx, op, GRPC_ERROR_REF(coe.error));
/* early out */
return;
}
- if (call != NULL) {
- grpc_subchannel_call_process_op(exec_ctx, call, op);
+ if (coe.subchannel_call != NULL) {
+ grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, op);
/* early out */
return;
}
/* if this is a cancellation, then we can raise our cancelled flag */
if (op->cancel_stream) {
- if (!gpr_atm_rel_cas(&calld->subchannel_call, 0,
- (gpr_atm)(uintptr_t)CANCELLED_CALL)) {
- /* recurse to retry */
- start_transport_stream_op_batch_locked_inner(exec_ctx, op, elem);
- /* early out */
- return;
+ grpc_error *error = op->payload->cancel_stream.cancel_error;
+ /* Stash a copy of cancel_error in our call data, so that we can use
+ it for subsequent operations. This ensures that if the call is
+ cancelled before any ops are passed down (e.g., if the deadline
+ is in the past when the call starts), we can return the right
+ error to the caller when the first op does get passed down. */
+ set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(error)});
+ if (calld->pick_pending) {
+ cancel_pick_locked(exec_ctx, elem, GRPC_ERROR_REF(error));
} else {
- /* Stash a copy of cancel_error in our call data, so that we can use
- it for subsequent operations. This ensures that if the call is
- cancelled before any ops are passed down (e.g., if the deadline
- is in the past when the call starts), we can return the right
- error to the caller when the first op does get passed down. */
- calld->cancel_error =
- GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error);
- if (calld->pick_pending) {
- cancel_pick_locked(
- exec_ctx, elem,
- GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error));
- } else {
- fail_locked(exec_ctx, calld,
- GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error));
- }
- grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, op,
- GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error));
- /* early out */
- return;
+ fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error));
}
+ grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op,
+ GRPC_ERROR_REF(error));
+ /* early out */
+ return;
}
/* if we don't have a subchannel, try to get one */
if (!calld->pick_pending && calld->connected_subchannel == NULL &&
op->send_initial_metadata) {
calld->pick_pending = true;
- grpc_closure_init(&calld->next_step, subchannel_ready_locked, elem,
- grpc_combiner_scheduler(chand->combiner, true));
+ GRPC_CLOSURE_INIT(&calld->next_step, subchannel_ready_locked, elem,
+ grpc_combiner_scheduler(chand->combiner));
GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel");
/* If a subchannel is not available immediately, the polling entity from
call_data should be provided to channel_data's interested_parties, so
@@ -1184,10 +1217,10 @@ static void start_transport_stream_op_batch_locked_inner(
calld->pick_pending = false;
GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
if (calld->connected_subchannel == NULL) {
- gpr_atm_no_barrier_store(&calld->subchannel_call,
- (gpr_atm)CANCELLED_CALL);
grpc_error *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Call dropped by load balancing policy");
+ set_call_or_error(calld,
+ (call_or_error){.error = GRPC_ERROR_REF(error)});
fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error));
grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
return; // Early out.
@@ -1209,8 +1242,8 @@ static void start_transport_stream_op_batch_locked_inner(
.context = calld->subchannel_call_context};
grpc_error *error = grpc_connected_subchannel_create_call(
exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call);
- gpr_atm_rel_store(&calld->subchannel_call,
- (gpr_atm)(uintptr_t)subchannel_call);
+ GPR_ASSERT(set_call_or_error(
+ calld, (call_or_error){.subchannel_call = subchannel_call}));
if (error != GRPC_ERROR_NONE) {
fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error));
grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
@@ -1242,7 +1275,7 @@ static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
calld->retry_throttle_data);
}
}
- grpc_closure_run(exec_ctx, calld->original_on_complete,
+ GRPC_CLOSURE_RUN(exec_ctx, calld->original_on_complete,
GRPC_ERROR_REF(error));
}
@@ -1258,7 +1291,7 @@ static void start_transport_stream_op_batch_locked(grpc_exec_ctx *exec_ctx,
if (op->recv_trailing_metadata) {
GPR_ASSERT(op->on_complete != NULL);
calld->original_on_complete = op->on_complete;
- grpc_closure_init(&calld->on_complete, on_complete, elem,
+ GRPC_CLOSURE_INIT(&calld->on_complete, on_complete, elem,
grpc_schedule_on_exec_ctx);
op->on_complete = &calld->on_complete;
}
@@ -1289,17 +1322,17 @@ static void cc_start_transport_stream_op_batch(
op);
}
/* try to (atomically) get the call */
- grpc_subchannel_call *call = GET_CALL(calld);
+ call_or_error coe = get_call_or_error(calld);
GPR_TIMER_BEGIN("cc_start_transport_stream_op_batch", 0);
- if (call == CANCELLED_CALL) {
+ if (coe.error != GRPC_ERROR_NONE) {
grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error));
+ exec_ctx, op, GRPC_ERROR_REF(coe.error));
GPR_TIMER_END("cc_start_transport_stream_op_batch", 0);
/* early out */
return;
}
- if (call != NULL) {
- grpc_subchannel_call_process_op(exec_ctx, call, op);
+ if (coe.subchannel_call != NULL) {
+ grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, op);
GPR_TIMER_END("cc_start_transport_stream_op_batch", 0);
/* early out */
return;
@@ -1307,11 +1340,10 @@ static void cc_start_transport_stream_op_batch(
/* we failed; lock and figure out what to do */
GRPC_CALL_STACK_REF(calld->owning_call, "start_transport_stream_op_batch");
op->handler_private.extra_arg = elem;
- grpc_closure_sched(
- exec_ctx,
- grpc_closure_init(&op->handler_private.closure,
- start_transport_stream_op_batch_locked, op,
- grpc_combiner_scheduler(chand->combiner, false)),
+ GRPC_CLOSURE_SCHED(
+ exec_ctx, GRPC_CLOSURE_INIT(&op->handler_private.closure,
+ start_transport_stream_op_batch_locked, op,
+ grpc_combiner_scheduler(chand->combiner)),
GRPC_ERROR_NONE);
GPR_TIMER_END("cc_start_transport_stream_op_batch", 0);
}
@@ -1348,12 +1380,14 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
if (calld->method_params != NULL) {
method_parameters_unref(calld->method_params);
}
- GRPC_ERROR_UNREF(calld->cancel_error);
- grpc_subchannel_call *call = GET_CALL(calld);
- if (call != NULL && call != CANCELLED_CALL) {
- grpc_subchannel_call_set_cleanup_closure(call, then_schedule_closure);
+ call_or_error coe = get_call_or_error(calld);
+ GRPC_ERROR_UNREF(coe.error);
+ if (coe.subchannel_call != NULL) {
+ grpc_subchannel_call_set_cleanup_closure(coe.subchannel_call,
+ then_schedule_closure);
then_schedule_closure = NULL;
- GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "client_channel_destroy_call");
+ GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, coe.subchannel_call,
+ "client_channel_destroy_call");
}
GPR_ASSERT(!calld->pick_pending);
GPR_ASSERT(calld->waiting_ops_count == 0);
@@ -1368,7 +1402,7 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
}
}
gpr_free(calld->waiting_ops);
- grpc_closure_sched(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE);
}
static void cc_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
@@ -1422,59 +1456,145 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state(
grpc_connectivity_state_check(&chand->state_tracker);
if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "try_to_connect");
- grpc_closure_sched(
- exec_ctx,
- grpc_closure_create(try_to_connect_locked, chand,
- grpc_combiner_scheduler(chand->combiner, false)),
+ GRPC_CLOSURE_SCHED(
+ exec_ctx, GRPC_CLOSURE_CREATE(try_to_connect_locked, chand,
+ grpc_combiner_scheduler(chand->combiner)),
GRPC_ERROR_NONE);
}
return out;
}
-typedef struct {
+typedef struct external_connectivity_watcher {
channel_data *chand;
- grpc_pollset *pollset;
+ grpc_polling_entity pollent;
grpc_closure *on_complete;
+ grpc_closure *watcher_timer_init;
grpc_connectivity_state *state;
grpc_closure my_closure;
+ struct external_connectivity_watcher *next;
} external_connectivity_watcher;
+static external_connectivity_watcher *lookup_external_connectivity_watcher(
+ channel_data *chand, grpc_closure *on_complete) {
+ gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
+ external_connectivity_watcher *w =
+ chand->external_connectivity_watcher_list_head;
+ while (w != NULL && w->on_complete != on_complete) {
+ w = w->next;
+ }
+ gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
+ return w;
+}
+
+static void external_connectivity_watcher_list_append(
+ channel_data *chand, external_connectivity_watcher *w) {
+ GPR_ASSERT(!lookup_external_connectivity_watcher(chand, w->on_complete));
+
+ gpr_mu_lock(&w->chand->external_connectivity_watcher_list_mu);
+ GPR_ASSERT(!w->next);
+ w->next = chand->external_connectivity_watcher_list_head;
+ chand->external_connectivity_watcher_list_head = w;
+ gpr_mu_unlock(&w->chand->external_connectivity_watcher_list_mu);
+}
+
+static void external_connectivity_watcher_list_remove(
+ channel_data *chand, external_connectivity_watcher *too_remove) {
+ GPR_ASSERT(
+ lookup_external_connectivity_watcher(chand, too_remove->on_complete));
+ gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
+ if (too_remove == chand->external_connectivity_watcher_list_head) {
+ chand->external_connectivity_watcher_list_head = too_remove->next;
+ gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
+ return;
+ }
+ external_connectivity_watcher *w =
+ chand->external_connectivity_watcher_list_head;
+ while (w != NULL) {
+ if (w->next == too_remove) {
+ w->next = w->next->next;
+ gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
+ return;
+ }
+ w = w->next;
+ }
+ GPR_UNREACHABLE_CODE(return );
+}
+
+int grpc_client_channel_num_external_connectivity_watchers(
+ grpc_channel_element *elem) {
+ channel_data *chand = elem->channel_data;
+ int count = 0;
+
+ gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
+ external_connectivity_watcher *w =
+ chand->external_connectivity_watcher_list_head;
+ while (w != NULL) {
+ count++;
+ w = w->next;
+ }
+ gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
+
+ return count;
+}
+
static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
external_connectivity_watcher *w = arg;
grpc_closure *follow_up = w->on_complete;
- grpc_pollset_set_del_pollset(exec_ctx, w->chand->interested_parties,
- w->pollset);
+ grpc_polling_entity_del_from_pollset_set(exec_ctx, &w->pollent,
+ w->chand->interested_parties);
GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack,
"external_connectivity_watcher");
+ external_connectivity_watcher_list_remove(w->chand, w);
gpr_free(w);
- grpc_closure_run(exec_ctx, follow_up, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_RUN(exec_ctx, follow_up, GRPC_ERROR_REF(error));
}
static void watch_connectivity_state_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error_ignored) {
external_connectivity_watcher *w = arg;
- grpc_closure_init(&w->my_closure, on_external_watch_complete, w,
- grpc_schedule_on_exec_ctx);
- grpc_connectivity_state_notify_on_state_change(
- exec_ctx, &w->chand->state_tracker, w->state, &w->my_closure);
+ external_connectivity_watcher *found = NULL;
+ if (w->state != NULL) {
+ external_connectivity_watcher_list_append(w->chand, w);
+ GRPC_CLOSURE_RUN(exec_ctx, w->watcher_timer_init, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_INIT(&w->my_closure, on_external_watch_complete, w,
+ grpc_schedule_on_exec_ctx);
+ grpc_connectivity_state_notify_on_state_change(
+ exec_ctx, &w->chand->state_tracker, w->state, &w->my_closure);
+ } else {
+ GPR_ASSERT(w->watcher_timer_init == NULL);
+ found = lookup_external_connectivity_watcher(w->chand, w->on_complete);
+ if (found) {
+ GPR_ASSERT(found->on_complete == w->on_complete);
+ grpc_connectivity_state_notify_on_state_change(
+ exec_ctx, &found->chand->state_tracker, NULL, &found->my_closure);
+ }
+ grpc_polling_entity_del_from_pollset_set(exec_ctx, &w->pollent,
+ w->chand->interested_parties);
+ GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack,
+ "external_connectivity_watcher");
+ gpr_free(w);
+ }
}
void grpc_client_channel_watch_connectivity_state(
- grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset,
- grpc_connectivity_state *state, grpc_closure *closure) {
+ grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
+ grpc_polling_entity pollent, grpc_connectivity_state *state,
+ grpc_closure *closure, grpc_closure *watcher_timer_init) {
channel_data *chand = elem->channel_data;
- external_connectivity_watcher *w = gpr_malloc(sizeof(*w));
+ external_connectivity_watcher *w = gpr_zalloc(sizeof(*w));
w->chand = chand;
- w->pollset = pollset;
+ w->pollent = pollent;
w->on_complete = closure;
w->state = state;
- grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, pollset);
+ w->watcher_timer_init = watcher_timer_init;
+ grpc_polling_entity_add_to_pollset_set(exec_ctx, &w->pollent,
+ chand->interested_parties);
GRPC_CHANNEL_STACK_REF(w->chand->owning_stack,
"external_connectivity_watcher");
- grpc_closure_sched(
+ GRPC_CLOSURE_SCHED(
exec_ctx,
- grpc_closure_init(&w->my_closure, watch_connectivity_state_locked, w,
- grpc_combiner_scheduler(chand->combiner, true)),
+ GRPC_CLOSURE_INIT(&w->my_closure, watch_connectivity_state_locked, w,
+ grpc_combiner_scheduler(chand->combiner)),
GRPC_ERROR_NONE);
}