diff options
Diffstat (limited to 'src/core/ext/filters/client_channel/client_channel.c')
-rw-r--r-- | src/core/ext/filters/client_channel/client_channel.c | 466 |
1 files changed, 293 insertions, 173 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.c b/src/core/ext/filters/client_channel/client_channel.c index f2f27b9175..f29c5d55ed 100644 --- a/src/core/ext/filters/client_channel/client_channel.c +++ b/src/core/ext/filters/client_channel/client_channel.c @@ -1,33 +1,18 @@ /* * - * Copyright 2015, Google Inc. - * All rights reserved. + * Copyright 2015 gRPC authors. * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. + * http://www.apache.org/licenses/LICENSE-2.0 * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. * */ @@ -167,6 +152,8 @@ static void *method_parameters_create_from_json(const grpc_json *json) { return value; } +struct external_connectivity_watcher; + /************************************************************************* * CHANNEL-WIDE FUNCTIONS */ @@ -204,6 +191,11 @@ typedef struct client_channel_channel_data { /** interested parties (owned) */ grpc_pollset_set *interested_parties; + /* external_connectivity_watcher_list head is guarded by its own mutex, since + * counts need to be grabbed immediately without polling on a cq */ + gpr_mu external_connectivity_watcher_list_mu; + struct external_connectivity_watcher *external_connectivity_watcher_list_head; + /* the following properties are guarded by a mutex since API's require them to be instantaneously available */ gpr_mu info_mu; @@ -283,8 +275,8 @@ static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand, GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy"); w->chand = chand; - grpc_closure_init(&w->on_changed, on_lb_policy_state_changed_locked, w, - grpc_combiner_scheduler(chand->combiner, false)); + GRPC_CLOSURE_INIT(&w->on_changed, on_lb_policy_state_changed_locked, w, + grpc_combiner_scheduler(chand->combiner)); w->state = current_state; w->lb_policy = lb_policy; grpc_lb_policy_notify_on_state_change_locked(exec_ctx, lb_policy, &w->state, @@ -372,7 +364,7 @@ static void wrapped_on_pick_closure_cb(grpc_exec_ctx *exec_ctx, void *arg, GPR_ASSERT(wc_arg != NULL); GPR_ASSERT(wc_arg->wrapped_closure != NULL); GPR_ASSERT(wc_arg->lb_policy != NULL); - grpc_closure_run(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_RUN(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error)); GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->lb_policy, "pick_subchannel_wrapping"); gpr_free(wc_arg); } @@ -382,7 +374,7 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, channel_data *chand = arg; char *lb_policy_name = NULL; grpc_lb_policy *lb_policy = NULL; - grpc_lb_policy *old_lb_policy; + grpc_lb_policy *old_lb_policy = NULL; grpc_slice_hash_table *method_params_table = NULL; grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE; bool exit_idle = false; @@ -392,6 +384,7 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, service_config_parsing_state parsing_state; memset(&parsing_state, 0, sizeof(parsing_state)); + bool lb_policy_updated = false; if (chand->resolver_result != NULL) { // Find LB policy name. const grpc_arg *channel_arg = @@ -431,14 +424,27 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, lb_policy_args.args = chand->resolver_result; lb_policy_args.client_channel_factory = chand->client_channel_factory; lb_policy_args.combiner = chand->combiner; - lb_policy = - grpc_lb_policy_create(exec_ctx, lb_policy_name, &lb_policy_args); - if (lb_policy != NULL) { - GRPC_LB_POLICY_REF(lb_policy, "config_change"); - GRPC_ERROR_UNREF(state_error); - state = grpc_lb_policy_check_connectivity_locked(exec_ctx, lb_policy, - &state_error); + + const bool lb_policy_type_changed = + (chand->info_lb_policy_name == NULL) || + (strcmp(chand->info_lb_policy_name, lb_policy_name) != 0); + if (chand->lb_policy != NULL && !lb_policy_type_changed) { + // update + lb_policy_updated = true; + grpc_lb_policy_update_locked(exec_ctx, chand->lb_policy, &lb_policy_args); + } else { + lb_policy = + grpc_lb_policy_create(exec_ctx, lb_policy_name, &lb_policy_args); + if (lb_policy != NULL) { + GRPC_LB_POLICY_REF(lb_policy, "config_change"); + GRPC_ERROR_UNREF(state_error); + state = grpc_lb_policy_check_connectivity_locked(exec_ctx, lb_policy, + &state_error); + old_lb_policy = chand->lb_policy; + chand->lb_policy = lb_policy; + } } + // Find service config. channel_arg = grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVICE_CONFIG); @@ -485,8 +491,6 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, gpr_free(chand->info_lb_policy_name); chand->info_lb_policy_name = lb_policy_name; } - old_lb_policy = chand->lb_policy; - chand->lb_policy = lb_policy; if (service_config_json != NULL) { gpr_free(chand->info_service_config_json); chand->info_service_config_json = service_config_json; @@ -502,24 +506,28 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, } chand->method_params_table = method_params_table; if (lb_policy != NULL) { - grpc_closure_list_sched(exec_ctx, &chand->waiting_for_config_closures); + GRPC_CLOSURE_LIST_SCHED(exec_ctx, &chand->waiting_for_config_closures); } else if (chand->resolver == NULL /* disconnected */) { grpc_closure_list_fail_all(&chand->waiting_for_config_closures, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Channel disconnected", &error, 1)); - grpc_closure_list_sched(exec_ctx, &chand->waiting_for_config_closures); + GRPC_CLOSURE_LIST_SCHED(exec_ctx, &chand->waiting_for_config_closures); } - if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) { + if (!lb_policy_updated && lb_policy != NULL && + chand->exit_idle_when_lb_policy_arrives) { GRPC_LB_POLICY_REF(lb_policy, "exit_idle"); exit_idle = true; chand->exit_idle_when_lb_policy_arrives = false; } if (error == GRPC_ERROR_NONE && chand->resolver) { - set_channel_connectivity_state_locked( - exec_ctx, chand, state, GRPC_ERROR_REF(state_error), "new_lb+resolver"); - if (lb_policy != NULL) { - watch_lb_policy_locked(exec_ctx, chand, lb_policy, state); + if (!lb_policy_updated) { + set_channel_connectivity_state_locked(exec_ctx, chand, state, + GRPC_ERROR_REF(state_error), + "new_lb+resolver"); + if (lb_policy != NULL) { + watch_lb_policy_locked(exec_ctx, chand, lb_policy, state); + } } GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); grpc_resolver_next_locked(exec_ctx, chand->resolver, @@ -539,7 +547,7 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, "resolver_gone"); } - if (exit_idle) { + if (!lb_policy_updated && lb_policy != NULL && exit_idle) { grpc_lb_policy_exit_idle_locked(exec_ctx, lb_policy); GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "exit_idle"); } @@ -548,9 +556,10 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, grpc_pollset_set_del_pollset_set( exec_ctx, old_lb_policy->interested_parties, chand->interested_parties); GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel"); + old_lb_policy = NULL; } - if (lb_policy != NULL) { + if (!lb_policy_updated && lb_policy != NULL) { GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "config_change"); } @@ -574,7 +583,7 @@ static void start_transport_op_locked(grpc_exec_ctx *exec_ctx, void *arg, if (op->send_ping != NULL) { if (chand->lb_policy == NULL) { - grpc_closure_sched( + GRPC_CLOSURE_SCHED( exec_ctx, op->send_ping, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing")); } else { @@ -595,7 +604,7 @@ static void start_transport_op_locked(grpc_exec_ctx *exec_ctx, void *arg, if (!chand->started_resolving) { grpc_closure_list_fail_all(&chand->waiting_for_config_closures, GRPC_ERROR_REF(op->disconnect_with_error)); - grpc_closure_list_sched(exec_ctx, &chand->waiting_for_config_closures); + GRPC_CLOSURE_LIST_SCHED(exec_ctx, &chand->waiting_for_config_closures); } if (chand->lb_policy != NULL) { grpc_pollset_set_del_pollset_set(exec_ctx, @@ -609,7 +618,7 @@ static void start_transport_op_locked(grpc_exec_ctx *exec_ctx, void *arg, } GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "start_transport_op"); - grpc_closure_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, op->on_consumed, GRPC_ERROR_NONE); } static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, @@ -625,10 +634,10 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, op->handler_private.extra_arg = elem; GRPC_CHANNEL_STACK_REF(chand->owning_stack, "start_transport_op"); - grpc_closure_sched( + GRPC_CLOSURE_SCHED( exec_ctx, - grpc_closure_init(&op->handler_private.closure, start_transport_op_locked, - op, grpc_combiner_scheduler(chand->combiner, false)), + GRPC_CLOSURE_INIT(&op->handler_private.closure, start_transport_op_locked, + op, grpc_combiner_scheduler(chand->combiner)), GRPC_ERROR_NONE); } @@ -659,12 +668,18 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx, GPR_ASSERT(args->is_last); GPR_ASSERT(elem->filter == &grpc_client_channel_filter); // Initialize data members. - chand->combiner = grpc_combiner_create(NULL); + chand->combiner = grpc_combiner_create(); gpr_mu_init(&chand->info_mu); + gpr_mu_init(&chand->external_connectivity_watcher_list_mu); + + gpr_mu_lock(&chand->external_connectivity_watcher_list_mu); + chand->external_connectivity_watcher_list_head = NULL; + gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu); + chand->owning_stack = args->channel_stack; - grpc_closure_init(&chand->on_resolver_result_changed, + GRPC_CLOSURE_INIT(&chand->on_resolver_result_changed, on_resolver_result_changed_locked, chand, - grpc_combiner_scheduler(chand->combiner, false)); + grpc_combiner_scheduler(chand->combiner)); chand->interested_parties = grpc_pollset_set_create(); grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, "client_channel"); @@ -722,10 +737,9 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem) { channel_data *chand = elem->channel_data; if (chand->resolver != NULL) { - grpc_closure_sched( - exec_ctx, - grpc_closure_create(shutdown_resolver_locked, chand->resolver, - grpc_combiner_scheduler(chand->combiner, false)), + GRPC_CLOSURE_SCHED( + exec_ctx, GRPC_CLOSURE_CREATE(shutdown_resolver_locked, chand->resolver, + grpc_combiner_scheduler(chand->combiner)), GRPC_ERROR_NONE); } if (chand->client_channel_factory != NULL) { @@ -749,17 +763,13 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, grpc_pollset_set_destroy(exec_ctx, chand->interested_parties); GRPC_COMBINER_UNREF(exec_ctx, chand->combiner, "client_channel"); gpr_mu_destroy(&chand->info_mu); + gpr_mu_destroy(&chand->external_connectivity_watcher_list_mu); } /************************************************************************* * PER-CALL FUNCTIONS */ -#define GET_CALL(call_data) \ - ((grpc_subchannel_call *)(gpr_atm_acq_load(&(call_data)->subchannel_call))) - -#define CANCELLED_CALL ((grpc_subchannel_call *)1) - /** Call data. Holds a pointer to grpc_subchannel_call and the associated machinery to create such a pointer. Handles queueing of stream ops until a call object is ready, waiting @@ -780,11 +790,9 @@ typedef struct client_channel_call_data { grpc_server_retry_throttle_data *retry_throttle_data; method_parameters *method_params; - grpc_error *cancel_error; - - /** either 0 for no call, 1 for cancelled, or a pointer to a - grpc_subchannel_call */ - gpr_atm subchannel_call; + /** either 0 for no call, a pointer to a grpc_subchannel_call (if the lowest + bit is 0), or a pointer to an error (if the lowest bit is 1) */ + gpr_atm subchannel_call_or_error; gpr_arena *arena; bool pick_pending; @@ -806,10 +814,43 @@ typedef struct client_channel_call_data { grpc_closure *original_on_complete; } call_data; +typedef struct { + grpc_subchannel_call *subchannel_call; + grpc_error *error; +} call_or_error; + +static call_or_error get_call_or_error(call_data *p) { + gpr_atm c = gpr_atm_acq_load(&p->subchannel_call_or_error); + if (c == 0) + return (call_or_error){NULL, NULL}; + else if (c & 1) + return (call_or_error){NULL, (grpc_error *)((c) & ~(gpr_atm)1)}; + else + return (call_or_error){(grpc_subchannel_call *)c, NULL}; +} + +static bool set_call_or_error(call_data *p, call_or_error coe) { + // this should always be under a lock + call_or_error existing = get_call_or_error(p); + if (existing.error != GRPC_ERROR_NONE) { + GRPC_ERROR_UNREF(coe.error); + return false; + } + GPR_ASSERT(existing.subchannel_call == NULL); + if (coe.error != GRPC_ERROR_NONE) { + GPR_ASSERT(coe.subchannel_call == NULL); + gpr_atm_rel_store(&p->subchannel_call_or_error, 1 | (gpr_atm)coe.error); + } else { + GPR_ASSERT(coe.subchannel_call != NULL); + gpr_atm_rel_store(&p->subchannel_call_or_error, + (gpr_atm)coe.subchannel_call); + } + return true; +} + grpc_subchannel_call *grpc_client_channel_get_subchannel_call( grpc_call_element *call_elem) { - grpc_subchannel_call *scc = GET_CALL((call_data *)call_elem->call_data); - return scc == CANCELLED_CALL ? NULL : scc; + return get_call_or_error(call_elem->call_data).subchannel_call; } static void add_waiting_locked(call_data *calld, @@ -841,18 +882,18 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) { return; } - grpc_subchannel_call *call = GET_CALL(calld); + call_or_error call = get_call_or_error(calld); grpc_transport_stream_op_batch **ops = calld->waiting_ops; size_t nops = calld->waiting_ops_count; - if (call == CANCELLED_CALL) { - fail_locked(exec_ctx, calld, GRPC_ERROR_CANCELLED); + if (call.error != GRPC_ERROR_NONE) { + fail_locked(exec_ctx, calld, GRPC_ERROR_REF(call.error)); return; } calld->waiting_ops = NULL; calld->waiting_ops_count = 0; calld->waiting_ops_capacity = 0; for (size_t i = 0; i < nops; i++) { - grpc_subchannel_call_process_op(exec_ctx, call, ops[i]); + grpc_subchannel_call_process_op(exec_ctx, call.subchannel_call, ops[i]); } gpr_free(ops); } @@ -913,19 +954,23 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg, calld->pick_pending = false; grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent, chand->interested_parties); + call_or_error coe = get_call_or_error(calld); if (calld->connected_subchannel == NULL) { - gpr_atm_no_barrier_store(&calld->subchannel_call, (gpr_atm)CANCELLED_CALL); - fail_locked(exec_ctx, calld, - error == GRPC_ERROR_NONE - ? GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "Call dropped by load balancing policy") - : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Failed to create subchannel", &error, 1)); - } else if (GET_CALL(calld) == CANCELLED_CALL) { + grpc_error *failure = + error == GRPC_ERROR_NONE + ? GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Call dropped by load balancing policy") + : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Failed to create subchannel", &error, 1); + set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(failure)}); + fail_locked(exec_ctx, calld, failure); + } else if (coe.error != GRPC_ERROR_NONE) { /* already cancelled before subchannel became ready */ + grpc_error *child_errors[] = {error, coe.error}; grpc_error *cancellation_error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Cancelled before creating subchannel", &error, 1); + "Cancelled before creating subchannel", child_errors, + GPR_ARRAY_SIZE(child_errors)); /* if due to deadline, attach the deadline exceeded status to the error */ if (gpr_time_cmp(calld->deadline, gpr_now(GPR_CLOCK_MONOTONIC)) < 0) { cancellation_error = @@ -945,8 +990,8 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg, .context = calld->subchannel_call_context}; grpc_error *new_error = grpc_connected_subchannel_create_call( exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call); - gpr_atm_rel_store(&calld->subchannel_call, - (gpr_atm)(uintptr_t)subchannel_call); + GPR_ASSERT(set_call_or_error( + calld, (call_or_error){.subchannel_call = subchannel_call})); if (new_error != GRPC_ERROR_NONE) { new_error = grpc_error_add_child(new_error, error); fail_locked(exec_ctx, calld, new_error); @@ -959,8 +1004,9 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg, static char *cc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { call_data *calld = elem->call_data; - grpc_subchannel_call *subchannel_call = GET_CALL(calld); - if (subchannel_call == NULL || subchannel_call == CANCELLED_CALL) { + grpc_subchannel_call *subchannel_call = + get_call_or_error(calld).subchannel_call; + if (subchannel_call == NULL) { return NULL; } else { return grpc_subchannel_call_get_peer(exec_ctx, subchannel_call); @@ -992,13 +1038,13 @@ static void continue_picking_locked(grpc_exec_ctx *exec_ctx, void *arg, if (cpa->connected_subchannel == NULL) { /* cancelled, do nothing */ } else if (error != GRPC_ERROR_NONE) { - grpc_closure_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error)); } else { if (pick_subchannel_locked(exec_ctx, cpa->elem, cpa->initial_metadata, cpa->initial_metadata_flags, cpa->connected_subchannel, cpa->subchannel_call_context, cpa->on_ready)) { - grpc_closure_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE); } } gpr_free(cpa); @@ -1018,7 +1064,7 @@ static void cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, continue_picking_args *cpa = closure->cb_arg; if (cpa->connected_subchannel == &calld->connected_subchannel) { cpa->connected_subchannel = NULL; - grpc_closure_sched(exec_ctx, cpa->on_ready, + GRPC_CLOSURE_SCHED(exec_ctx, cpa->on_ready, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick cancelled", &error, 1)); } @@ -1067,7 +1113,7 @@ static bool pick_subchannel_locked( // the LB policy for the duration of the pick. wrapped_on_pick_closure_arg *w_on_pick_arg = gpr_zalloc(sizeof(*w_on_pick_arg)); - grpc_closure_init(&w_on_pick_arg->wrapper_closure, + GRPC_CLOSURE_INIT(&w_on_pick_arg->wrapper_closure, wrapped_on_pick_closure_cb, w_on_pick_arg, grpc_schedule_on_exec_ctx); w_on_pick_arg->wrapped_closure = on_ready; @@ -1101,12 +1147,12 @@ static bool pick_subchannel_locked( cpa->subchannel_call_context = subchannel_call_context; cpa->on_ready = on_ready; cpa->elem = elem; - grpc_closure_init(&cpa->closure, continue_picking_locked, cpa, - grpc_combiner_scheduler(chand->combiner, true)); + GRPC_CLOSURE_INIT(&cpa->closure, continue_picking_locked, cpa, + grpc_combiner_scheduler(chand->combiner)); grpc_closure_list_append(&chand->waiting_for_config_closures, &cpa->closure, GRPC_ERROR_NONE); } else { - grpc_closure_sched(exec_ctx, on_ready, + GRPC_CLOSURE_SCHED(exec_ctx, on_ready, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected")); } @@ -1119,58 +1165,45 @@ static void start_transport_stream_op_batch_locked_inner( grpc_call_element *elem) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; - grpc_subchannel_call *call; /* need to recheck that another thread hasn't set the call */ - call = GET_CALL(calld); - if (call == CANCELLED_CALL) { + call_or_error coe = get_call_or_error(calld); + if (coe.error != GRPC_ERROR_NONE) { grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error)); + exec_ctx, op, GRPC_ERROR_REF(coe.error)); /* early out */ return; } - if (call != NULL) { - grpc_subchannel_call_process_op(exec_ctx, call, op); + if (coe.subchannel_call != NULL) { + grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, op); /* early out */ return; } /* if this is a cancellation, then we can raise our cancelled flag */ if (op->cancel_stream) { - if (!gpr_atm_rel_cas(&calld->subchannel_call, 0, - (gpr_atm)(uintptr_t)CANCELLED_CALL)) { - /* recurse to retry */ - start_transport_stream_op_batch_locked_inner(exec_ctx, op, elem); - /* early out */ - return; + grpc_error *error = op->payload->cancel_stream.cancel_error; + /* Stash a copy of cancel_error in our call data, so that we can use + it for subsequent operations. This ensures that if the call is + cancelled before any ops are passed down (e.g., if the deadline + is in the past when the call starts), we can return the right + error to the caller when the first op does get passed down. */ + set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(error)}); + if (calld->pick_pending) { + cancel_pick_locked(exec_ctx, elem, GRPC_ERROR_REF(error)); } else { - /* Stash a copy of cancel_error in our call data, so that we can use - it for subsequent operations. This ensures that if the call is - cancelled before any ops are passed down (e.g., if the deadline - is in the past when the call starts), we can return the right - error to the caller when the first op does get passed down. */ - calld->cancel_error = - GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error); - if (calld->pick_pending) { - cancel_pick_locked( - exec_ctx, elem, - GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error)); - } else { - fail_locked(exec_ctx, calld, - GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error)); - } - grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, op, - GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error)); - /* early out */ - return; + fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error)); } + grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, + GRPC_ERROR_REF(error)); + /* early out */ + return; } /* if we don't have a subchannel, try to get one */ if (!calld->pick_pending && calld->connected_subchannel == NULL && op->send_initial_metadata) { calld->pick_pending = true; - grpc_closure_init(&calld->next_step, subchannel_ready_locked, elem, - grpc_combiner_scheduler(chand->combiner, true)); + GRPC_CLOSURE_INIT(&calld->next_step, subchannel_ready_locked, elem, + grpc_combiner_scheduler(chand->combiner)); GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel"); /* If a subchannel is not available immediately, the polling entity from call_data should be provided to channel_data's interested_parties, so @@ -1184,10 +1217,10 @@ static void start_transport_stream_op_batch_locked_inner( calld->pick_pending = false; GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel"); if (calld->connected_subchannel == NULL) { - gpr_atm_no_barrier_store(&calld->subchannel_call, - (gpr_atm)CANCELLED_CALL); grpc_error *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Call dropped by load balancing policy"); + set_call_or_error(calld, + (call_or_error){.error = GRPC_ERROR_REF(error)}); fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error)); grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error); return; // Early out. @@ -1209,8 +1242,8 @@ static void start_transport_stream_op_batch_locked_inner( .context = calld->subchannel_call_context}; grpc_error *error = grpc_connected_subchannel_create_call( exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call); - gpr_atm_rel_store(&calld->subchannel_call, - (gpr_atm)(uintptr_t)subchannel_call); + GPR_ASSERT(set_call_or_error( + calld, (call_or_error){.subchannel_call = subchannel_call})); if (error != GRPC_ERROR_NONE) { fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error)); grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error); @@ -1242,7 +1275,7 @@ static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { calld->retry_throttle_data); } } - grpc_closure_run(exec_ctx, calld->original_on_complete, + GRPC_CLOSURE_RUN(exec_ctx, calld->original_on_complete, GRPC_ERROR_REF(error)); } @@ -1258,7 +1291,7 @@ static void start_transport_stream_op_batch_locked(grpc_exec_ctx *exec_ctx, if (op->recv_trailing_metadata) { GPR_ASSERT(op->on_complete != NULL); calld->original_on_complete = op->on_complete; - grpc_closure_init(&calld->on_complete, on_complete, elem, + GRPC_CLOSURE_INIT(&calld->on_complete, on_complete, elem, grpc_schedule_on_exec_ctx); op->on_complete = &calld->on_complete; } @@ -1289,17 +1322,17 @@ static void cc_start_transport_stream_op_batch( op); } /* try to (atomically) get the call */ - grpc_subchannel_call *call = GET_CALL(calld); + call_or_error coe = get_call_or_error(calld); GPR_TIMER_BEGIN("cc_start_transport_stream_op_batch", 0); - if (call == CANCELLED_CALL) { + if (coe.error != GRPC_ERROR_NONE) { grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error)); + exec_ctx, op, GRPC_ERROR_REF(coe.error)); GPR_TIMER_END("cc_start_transport_stream_op_batch", 0); /* early out */ return; } - if (call != NULL) { - grpc_subchannel_call_process_op(exec_ctx, call, op); + if (coe.subchannel_call != NULL) { + grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, op); GPR_TIMER_END("cc_start_transport_stream_op_batch", 0); /* early out */ return; @@ -1307,11 +1340,10 @@ static void cc_start_transport_stream_op_batch( /* we failed; lock and figure out what to do */ GRPC_CALL_STACK_REF(calld->owning_call, "start_transport_stream_op_batch"); op->handler_private.extra_arg = elem; - grpc_closure_sched( - exec_ctx, - grpc_closure_init(&op->handler_private.closure, - start_transport_stream_op_batch_locked, op, - grpc_combiner_scheduler(chand->combiner, false)), + GRPC_CLOSURE_SCHED( + exec_ctx, GRPC_CLOSURE_INIT(&op->handler_private.closure, + start_transport_stream_op_batch_locked, op, + grpc_combiner_scheduler(chand->combiner)), GRPC_ERROR_NONE); GPR_TIMER_END("cc_start_transport_stream_op_batch", 0); } @@ -1348,12 +1380,14 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, if (calld->method_params != NULL) { method_parameters_unref(calld->method_params); } - GRPC_ERROR_UNREF(calld->cancel_error); - grpc_subchannel_call *call = GET_CALL(calld); - if (call != NULL && call != CANCELLED_CALL) { - grpc_subchannel_call_set_cleanup_closure(call, then_schedule_closure); + call_or_error coe = get_call_or_error(calld); + GRPC_ERROR_UNREF(coe.error); + if (coe.subchannel_call != NULL) { + grpc_subchannel_call_set_cleanup_closure(coe.subchannel_call, + then_schedule_closure); then_schedule_closure = NULL; - GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "client_channel_destroy_call"); + GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, coe.subchannel_call, + "client_channel_destroy_call"); } GPR_ASSERT(!calld->pick_pending); GPR_ASSERT(calld->waiting_ops_count == 0); @@ -1368,7 +1402,7 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, } } gpr_free(calld->waiting_ops); - grpc_closure_sched(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE); } static void cc_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx, @@ -1422,59 +1456,145 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state( grpc_connectivity_state_check(&chand->state_tracker); if (out == GRPC_CHANNEL_IDLE && try_to_connect) { GRPC_CHANNEL_STACK_REF(chand->owning_stack, "try_to_connect"); - grpc_closure_sched( - exec_ctx, - grpc_closure_create(try_to_connect_locked, chand, - grpc_combiner_scheduler(chand->combiner, false)), + GRPC_CLOSURE_SCHED( + exec_ctx, GRPC_CLOSURE_CREATE(try_to_connect_locked, chand, + grpc_combiner_scheduler(chand->combiner)), GRPC_ERROR_NONE); } return out; } -typedef struct { +typedef struct external_connectivity_watcher { channel_data *chand; - grpc_pollset *pollset; + grpc_polling_entity pollent; grpc_closure *on_complete; + grpc_closure *watcher_timer_init; grpc_connectivity_state *state; grpc_closure my_closure; + struct external_connectivity_watcher *next; } external_connectivity_watcher; +static external_connectivity_watcher *lookup_external_connectivity_watcher( + channel_data *chand, grpc_closure *on_complete) { + gpr_mu_lock(&chand->external_connectivity_watcher_list_mu); + external_connectivity_watcher *w = + chand->external_connectivity_watcher_list_head; + while (w != NULL && w->on_complete != on_complete) { + w = w->next; + } + gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu); + return w; +} + +static void external_connectivity_watcher_list_append( + channel_data *chand, external_connectivity_watcher *w) { + GPR_ASSERT(!lookup_external_connectivity_watcher(chand, w->on_complete)); + + gpr_mu_lock(&w->chand->external_connectivity_watcher_list_mu); + GPR_ASSERT(!w->next); + w->next = chand->external_connectivity_watcher_list_head; + chand->external_connectivity_watcher_list_head = w; + gpr_mu_unlock(&w->chand->external_connectivity_watcher_list_mu); +} + +static void external_connectivity_watcher_list_remove( + channel_data *chand, external_connectivity_watcher *too_remove) { + GPR_ASSERT( + lookup_external_connectivity_watcher(chand, too_remove->on_complete)); + gpr_mu_lock(&chand->external_connectivity_watcher_list_mu); + if (too_remove == chand->external_connectivity_watcher_list_head) { + chand->external_connectivity_watcher_list_head = too_remove->next; + gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu); + return; + } + external_connectivity_watcher *w = + chand->external_connectivity_watcher_list_head; + while (w != NULL) { + if (w->next == too_remove) { + w->next = w->next->next; + gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu); + return; + } + w = w->next; + } + GPR_UNREACHABLE_CODE(return ); +} + +int grpc_client_channel_num_external_connectivity_watchers( + grpc_channel_element *elem) { + channel_data *chand = elem->channel_data; + int count = 0; + + gpr_mu_lock(&chand->external_connectivity_watcher_list_mu); + external_connectivity_watcher *w = + chand->external_connectivity_watcher_list_head; + while (w != NULL) { + count++; + w = w->next; + } + gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu); + + return count; +} + static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { external_connectivity_watcher *w = arg; grpc_closure *follow_up = w->on_complete; - grpc_pollset_set_del_pollset(exec_ctx, w->chand->interested_parties, - w->pollset); + grpc_polling_entity_del_from_pollset_set(exec_ctx, &w->pollent, + w->chand->interested_parties); GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "external_connectivity_watcher"); + external_connectivity_watcher_list_remove(w->chand, w); gpr_free(w); - grpc_closure_run(exec_ctx, follow_up, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_RUN(exec_ctx, follow_up, GRPC_ERROR_REF(error)); } static void watch_connectivity_state_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error_ignored) { external_connectivity_watcher *w = arg; - grpc_closure_init(&w->my_closure, on_external_watch_complete, w, - grpc_schedule_on_exec_ctx); - grpc_connectivity_state_notify_on_state_change( - exec_ctx, &w->chand->state_tracker, w->state, &w->my_closure); + external_connectivity_watcher *found = NULL; + if (w->state != NULL) { + external_connectivity_watcher_list_append(w->chand, w); + GRPC_CLOSURE_RUN(exec_ctx, w->watcher_timer_init, GRPC_ERROR_NONE); + GRPC_CLOSURE_INIT(&w->my_closure, on_external_watch_complete, w, + grpc_schedule_on_exec_ctx); + grpc_connectivity_state_notify_on_state_change( + exec_ctx, &w->chand->state_tracker, w->state, &w->my_closure); + } else { + GPR_ASSERT(w->watcher_timer_init == NULL); + found = lookup_external_connectivity_watcher(w->chand, w->on_complete); + if (found) { + GPR_ASSERT(found->on_complete == w->on_complete); + grpc_connectivity_state_notify_on_state_change( + exec_ctx, &found->chand->state_tracker, NULL, &found->my_closure); + } + grpc_polling_entity_del_from_pollset_set(exec_ctx, &w->pollent, + w->chand->interested_parties); + GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, + "external_connectivity_watcher"); + gpr_free(w); + } } void grpc_client_channel_watch_connectivity_state( - grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset, - grpc_connectivity_state *state, grpc_closure *closure) { + grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, + grpc_polling_entity pollent, grpc_connectivity_state *state, + grpc_closure *closure, grpc_closure *watcher_timer_init) { channel_data *chand = elem->channel_data; - external_connectivity_watcher *w = gpr_malloc(sizeof(*w)); + external_connectivity_watcher *w = gpr_zalloc(sizeof(*w)); w->chand = chand; - w->pollset = pollset; + w->pollent = pollent; w->on_complete = closure; w->state = state; - grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, pollset); + w->watcher_timer_init = watcher_timer_init; + grpc_polling_entity_add_to_pollset_set(exec_ctx, &w->pollent, + chand->interested_parties); GRPC_CHANNEL_STACK_REF(w->chand->owning_stack, "external_connectivity_watcher"); - grpc_closure_sched( + GRPC_CLOSURE_SCHED( exec_ctx, - grpc_closure_init(&w->my_closure, watch_connectivity_state_locked, w, - grpc_combiner_scheduler(chand->combiner, true)), + GRPC_CLOSURE_INIT(&w->my_closure, watch_connectivity_state_locked, w, + grpc_combiner_scheduler(chand->combiner)), GRPC_ERROR_NONE); } |