diff options
Diffstat (limited to 'src/core/ext/filters/client_channel')
8 files changed, 132 insertions, 117 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.c b/src/core/ext/filters/client_channel/client_channel.c index 013096d46b..3ec539884b 100644 --- a/src/core/ext/filters/client_channel/client_channel.c +++ b/src/core/ext/filters/client_channel/client_channel.c @@ -276,7 +276,7 @@ static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand, w->chand = chand; grpc_closure_init(&w->on_changed, on_lb_policy_state_changed_locked, w, - grpc_combiner_scheduler(chand->combiner, false)); + grpc_combiner_scheduler(chand->combiner)); w->state = current_state; w->lb_policy = lb_policy; grpc_lb_policy_notify_on_state_change_locked(exec_ctx, lb_policy, &w->state, @@ -637,7 +637,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_closure_sched( exec_ctx, grpc_closure_init(&op->handler_private.closure, start_transport_op_locked, - op, grpc_combiner_scheduler(chand->combiner, false)), + op, grpc_combiner_scheduler(chand->combiner)), GRPC_ERROR_NONE); } @@ -668,7 +668,7 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx, GPR_ASSERT(args->is_last); GPR_ASSERT(elem->filter == &grpc_client_channel_filter); // Initialize data members. - chand->combiner = grpc_combiner_create(NULL); + chand->combiner = grpc_combiner_create(); gpr_mu_init(&chand->info_mu); gpr_mu_init(&chand->external_connectivity_watcher_list_mu); @@ -679,7 +679,7 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx, chand->owning_stack = args->channel_stack; grpc_closure_init(&chand->on_resolver_result_changed, on_resolver_result_changed_locked, chand, - grpc_combiner_scheduler(chand->combiner, false)); + grpc_combiner_scheduler(chand->combiner)); chand->interested_parties = grpc_pollset_set_create(); grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, "client_channel"); @@ -738,9 +738,8 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, channel_data *chand = elem->channel_data; if (chand->resolver != NULL) { grpc_closure_sched( - exec_ctx, - grpc_closure_create(shutdown_resolver_locked, chand->resolver, - grpc_combiner_scheduler(chand->combiner, false)), + exec_ctx, grpc_closure_create(shutdown_resolver_locked, chand->resolver, + grpc_combiner_scheduler(chand->combiner)), GRPC_ERROR_NONE); } if (chand->client_channel_factory != NULL) { @@ -771,11 +770,6 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, * PER-CALL FUNCTIONS */ -#define GET_CALL(call_data) \ - ((grpc_subchannel_call *)(gpr_atm_acq_load(&(call_data)->subchannel_call))) - -#define CANCELLED_CALL ((grpc_subchannel_call *)1) - /** Call data. Holds a pointer to grpc_subchannel_call and the associated machinery to create such a pointer. Handles queueing of stream ops until a call object is ready, waiting @@ -796,11 +790,9 @@ typedef struct client_channel_call_data { grpc_server_retry_throttle_data *retry_throttle_data; method_parameters *method_params; - grpc_error *cancel_error; - - /** either 0 for no call, 1 for cancelled, or a pointer to a - grpc_subchannel_call */ - gpr_atm subchannel_call; + /** either 0 for no call, a pointer to a grpc_subchannel_call (if the lowest + bit is 0), or a pointer to an error (if the lowest bit is 1) */ + gpr_atm subchannel_call_or_error; gpr_arena *arena; bool pick_pending; @@ -822,10 +814,43 @@ typedef struct client_channel_call_data { grpc_closure *original_on_complete; } call_data; +typedef struct { + grpc_subchannel_call *subchannel_call; + grpc_error *error; +} call_or_error; + +static call_or_error get_call_or_error(call_data *p) { + gpr_atm c = gpr_atm_acq_load(&p->subchannel_call_or_error); + if (c == 0) + return (call_or_error){NULL, NULL}; + else if (c & 1) + return (call_or_error){NULL, (grpc_error *)((c) & ~(gpr_atm)1)}; + else + return (call_or_error){(grpc_subchannel_call *)c, NULL}; +} + +static bool set_call_or_error(call_data *p, call_or_error coe) { + // this should always be under a lock + call_or_error existing = get_call_or_error(p); + if (existing.error != GRPC_ERROR_NONE) { + GRPC_ERROR_UNREF(coe.error); + return false; + } + GPR_ASSERT(existing.subchannel_call == NULL); + if (coe.error != GRPC_ERROR_NONE) { + GPR_ASSERT(coe.subchannel_call == NULL); + gpr_atm_rel_store(&p->subchannel_call_or_error, 1 | (gpr_atm)coe.error); + } else { + GPR_ASSERT(coe.subchannel_call != NULL); + gpr_atm_rel_store(&p->subchannel_call_or_error, + (gpr_atm)coe.subchannel_call); + } + return true; +} + grpc_subchannel_call *grpc_client_channel_get_subchannel_call( grpc_call_element *call_elem) { - grpc_subchannel_call *scc = GET_CALL((call_data *)call_elem->call_data); - return scc == CANCELLED_CALL ? NULL : scc; + return get_call_or_error(call_elem->call_data).subchannel_call; } static void add_waiting_locked(call_data *calld, @@ -857,18 +882,18 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) { return; } - grpc_subchannel_call *call = GET_CALL(calld); + call_or_error call = get_call_or_error(calld); grpc_transport_stream_op_batch **ops = calld->waiting_ops; size_t nops = calld->waiting_ops_count; - if (call == CANCELLED_CALL) { - fail_locked(exec_ctx, calld, GRPC_ERROR_CANCELLED); + if (call.error != GRPC_ERROR_NONE) { + fail_locked(exec_ctx, calld, GRPC_ERROR_REF(call.error)); return; } calld->waiting_ops = NULL; calld->waiting_ops_count = 0; calld->waiting_ops_capacity = 0; for (size_t i = 0; i < nops; i++) { - grpc_subchannel_call_process_op(exec_ctx, call, ops[i]); + grpc_subchannel_call_process_op(exec_ctx, call.subchannel_call, ops[i]); } gpr_free(ops); } @@ -929,19 +954,23 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg, calld->pick_pending = false; grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent, chand->interested_parties); + call_or_error coe = get_call_or_error(calld); if (calld->connected_subchannel == NULL) { - gpr_atm_no_barrier_store(&calld->subchannel_call, (gpr_atm)CANCELLED_CALL); - fail_locked(exec_ctx, calld, - error == GRPC_ERROR_NONE - ? GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "Call dropped by load balancing policy") - : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Failed to create subchannel", &error, 1)); - } else if (GET_CALL(calld) == CANCELLED_CALL) { + grpc_error *failure = + error == GRPC_ERROR_NONE + ? GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Call dropped by load balancing policy") + : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Failed to create subchannel", &error, 1); + set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(failure)}); + fail_locked(exec_ctx, calld, failure); + } else if (coe.error != GRPC_ERROR_NONE) { /* already cancelled before subchannel became ready */ + grpc_error *child_errors[] = {error, coe.error}; grpc_error *cancellation_error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Cancelled before creating subchannel", &error, 1); + "Cancelled before creating subchannel", child_errors, + GPR_ARRAY_SIZE(child_errors)); /* if due to deadline, attach the deadline exceeded status to the error */ if (gpr_time_cmp(calld->deadline, gpr_now(GPR_CLOCK_MONOTONIC)) < 0) { cancellation_error = @@ -961,8 +990,8 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg, .context = calld->subchannel_call_context}; grpc_error *new_error = grpc_connected_subchannel_create_call( exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call); - gpr_atm_rel_store(&calld->subchannel_call, - (gpr_atm)(uintptr_t)subchannel_call); + GPR_ASSERT(set_call_or_error( + calld, (call_or_error){.subchannel_call = subchannel_call})); if (new_error != GRPC_ERROR_NONE) { new_error = grpc_error_add_child(new_error, error); fail_locked(exec_ctx, calld, new_error); @@ -975,8 +1004,9 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg, static char *cc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { call_data *calld = elem->call_data; - grpc_subchannel_call *subchannel_call = GET_CALL(calld); - if (subchannel_call == NULL || subchannel_call == CANCELLED_CALL) { + grpc_subchannel_call *subchannel_call = + get_call_or_error(calld).subchannel_call; + if (subchannel_call == NULL) { return NULL; } else { return grpc_subchannel_call_get_peer(exec_ctx, subchannel_call); @@ -1118,7 +1148,7 @@ static bool pick_subchannel_locked( cpa->on_ready = on_ready; cpa->elem = elem; grpc_closure_init(&cpa->closure, continue_picking_locked, cpa, - grpc_combiner_scheduler(chand->combiner, true)); + grpc_combiner_scheduler(chand->combiner)); grpc_closure_list_append(&chand->waiting_for_config_closures, &cpa->closure, GRPC_ERROR_NONE); } else { @@ -1135,58 +1165,45 @@ static void start_transport_stream_op_batch_locked_inner( grpc_call_element *elem) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; - grpc_subchannel_call *call; /* need to recheck that another thread hasn't set the call */ - call = GET_CALL(calld); - if (call == CANCELLED_CALL) { + call_or_error coe = get_call_or_error(calld); + if (coe.error != GRPC_ERROR_NONE) { grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error)); + exec_ctx, op, GRPC_ERROR_REF(coe.error)); /* early out */ return; } - if (call != NULL) { - grpc_subchannel_call_process_op(exec_ctx, call, op); + if (coe.subchannel_call != NULL) { + grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, op); /* early out */ return; } /* if this is a cancellation, then we can raise our cancelled flag */ if (op->cancel_stream) { - if (!gpr_atm_rel_cas(&calld->subchannel_call, 0, - (gpr_atm)(uintptr_t)CANCELLED_CALL)) { - /* recurse to retry */ - start_transport_stream_op_batch_locked_inner(exec_ctx, op, elem); - /* early out */ - return; + grpc_error *error = op->payload->cancel_stream.cancel_error; + /* Stash a copy of cancel_error in our call data, so that we can use + it for subsequent operations. This ensures that if the call is + cancelled before any ops are passed down (e.g., if the deadline + is in the past when the call starts), we can return the right + error to the caller when the first op does get passed down. */ + set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(error)}); + if (calld->pick_pending) { + cancel_pick_locked(exec_ctx, elem, GRPC_ERROR_REF(error)); } else { - /* Stash a copy of cancel_error in our call data, so that we can use - it for subsequent operations. This ensures that if the call is - cancelled before any ops are passed down (e.g., if the deadline - is in the past when the call starts), we can return the right - error to the caller when the first op does get passed down. */ - calld->cancel_error = - GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error); - if (calld->pick_pending) { - cancel_pick_locked( - exec_ctx, elem, - GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error)); - } else { - fail_locked(exec_ctx, calld, - GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error)); - } - grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, op, - GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error)); - /* early out */ - return; + fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error)); } + grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, + GRPC_ERROR_REF(error)); + /* early out */ + return; } /* if we don't have a subchannel, try to get one */ if (!calld->pick_pending && calld->connected_subchannel == NULL && op->send_initial_metadata) { calld->pick_pending = true; grpc_closure_init(&calld->next_step, subchannel_ready_locked, elem, - grpc_combiner_scheduler(chand->combiner, true)); + grpc_combiner_scheduler(chand->combiner)); GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel"); /* If a subchannel is not available immediately, the polling entity from call_data should be provided to channel_data's interested_parties, so @@ -1200,10 +1217,10 @@ static void start_transport_stream_op_batch_locked_inner( calld->pick_pending = false; GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel"); if (calld->connected_subchannel == NULL) { - gpr_atm_no_barrier_store(&calld->subchannel_call, - (gpr_atm)CANCELLED_CALL); grpc_error *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Call dropped by load balancing policy"); + set_call_or_error(calld, + (call_or_error){.error = GRPC_ERROR_REF(error)}); fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error)); grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error); return; // Early out. @@ -1225,8 +1242,8 @@ static void start_transport_stream_op_batch_locked_inner( .context = calld->subchannel_call_context}; grpc_error *error = grpc_connected_subchannel_create_call( exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call); - gpr_atm_rel_store(&calld->subchannel_call, - (gpr_atm)(uintptr_t)subchannel_call); + GPR_ASSERT(set_call_or_error( + calld, (call_or_error){.subchannel_call = subchannel_call})); if (error != GRPC_ERROR_NONE) { fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error)); grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error); @@ -1305,17 +1322,17 @@ static void cc_start_transport_stream_op_batch( op); } /* try to (atomically) get the call */ - grpc_subchannel_call *call = GET_CALL(calld); + call_or_error coe = get_call_or_error(calld); GPR_TIMER_BEGIN("cc_start_transport_stream_op_batch", 0); - if (call == CANCELLED_CALL) { + if (coe.error != GRPC_ERROR_NONE) { grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error)); + exec_ctx, op, GRPC_ERROR_REF(coe.error)); GPR_TIMER_END("cc_start_transport_stream_op_batch", 0); /* early out */ return; } - if (call != NULL) { - grpc_subchannel_call_process_op(exec_ctx, call, op); + if (coe.subchannel_call != NULL) { + grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, op); GPR_TIMER_END("cc_start_transport_stream_op_batch", 0); /* early out */ return; @@ -1324,10 +1341,9 @@ static void cc_start_transport_stream_op_batch( GRPC_CALL_STACK_REF(calld->owning_call, "start_transport_stream_op_batch"); op->handler_private.extra_arg = elem; grpc_closure_sched( - exec_ctx, - grpc_closure_init(&op->handler_private.closure, - start_transport_stream_op_batch_locked, op, - grpc_combiner_scheduler(chand->combiner, false)), + exec_ctx, grpc_closure_init(&op->handler_private.closure, + start_transport_stream_op_batch_locked, op, + grpc_combiner_scheduler(chand->combiner)), GRPC_ERROR_NONE); GPR_TIMER_END("cc_start_transport_stream_op_batch", 0); } @@ -1364,12 +1380,14 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, if (calld->method_params != NULL) { method_parameters_unref(calld->method_params); } - GRPC_ERROR_UNREF(calld->cancel_error); - grpc_subchannel_call *call = GET_CALL(calld); - if (call != NULL && call != CANCELLED_CALL) { - grpc_subchannel_call_set_cleanup_closure(call, then_schedule_closure); + call_or_error coe = get_call_or_error(calld); + GRPC_ERROR_UNREF(coe.error); + if (coe.subchannel_call != NULL) { + grpc_subchannel_call_set_cleanup_closure(coe.subchannel_call, + then_schedule_closure); then_schedule_closure = NULL; - GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "client_channel_destroy_call"); + GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, coe.subchannel_call, + "client_channel_destroy_call"); } GPR_ASSERT(!calld->pick_pending); GPR_ASSERT(calld->waiting_ops_count == 0); @@ -1439,9 +1457,8 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state( if (out == GRPC_CHANNEL_IDLE && try_to_connect) { GRPC_CHANNEL_STACK_REF(chand->owning_stack, "try_to_connect"); grpc_closure_sched( - exec_ctx, - grpc_closure_create(try_to_connect_locked, chand, - grpc_combiner_scheduler(chand->combiner, false)), + exec_ctx, grpc_closure_create(try_to_connect_locked, chand, + grpc_combiner_scheduler(chand->combiner)), GRPC_ERROR_NONE); } return out; @@ -1578,6 +1595,6 @@ void grpc_client_channel_watch_connectivity_state( grpc_closure_sched( exec_ctx, grpc_closure_init(&w->my_closure, watch_connectivity_state_locked, w, - grpc_combiner_scheduler(chand->combiner, true)), + grpc_combiner_scheduler(chand->combiner)), GRPC_ERROR_NONE); } diff --git a/src/core/ext/filters/client_channel/lb_policy.c b/src/core/ext/filters/client_channel/lb_policy.c index e3efb735df..0c8e179925 100644 --- a/src/core/ext/filters/client_channel/lb_policy.c +++ b/src/core/ext/filters/client_channel/lb_policy.c @@ -74,11 +74,10 @@ void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx, gpr_atm mask = ~(gpr_atm)((1 << WEAK_REF_BITS) - 1); gpr_atm check = 1 << WEAK_REF_BITS; if ((old_val & mask) == check) { - grpc_closure_sched( - exec_ctx, - grpc_closure_create(shutdown_locked, policy, - grpc_combiner_scheduler(policy->combiner, false)), - GRPC_ERROR_NONE); + grpc_closure_sched(exec_ctx, grpc_closure_create( + shutdown_locked, policy, + grpc_combiner_scheduler(policy->combiner)), + GRPC_ERROR_NONE); } else { grpc_lb_policy_weak_unref(exec_ctx, policy REF_FUNC_PASS_ARGS("strong-unref")); diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c index 5ecbd3ba93..d3182f35fe 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c @@ -741,7 +741,7 @@ static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, gpr_zalloc(sizeof(rr_connectivity_data)); grpc_closure_init(&rr_connectivity->on_change, glb_rr_connectivity_changed_locked, rr_connectivity, - grpc_combiner_scheduler(glb_policy->base.combiner, false)); + grpc_combiner_scheduler(glb_policy->base.combiner)); rr_connectivity->glb_policy = glb_policy; rr_connectivity->state = rr_state; @@ -1006,7 +1006,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, grpc_closure_init(&glb_policy->lb_channel_on_connectivity_changed, glb_lb_channel_on_connectivity_changed_cb, glb_policy, - grpc_combiner_scheduler(args->combiner, false)); + grpc_combiner_scheduler(args->combiner)); grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable, args->combiner); grpc_connectivity_state_init(&glb_policy->state_tracker, GRPC_CHANNEL_IDLE, "grpclb"); @@ -1252,7 +1252,7 @@ static void schedule_next_client_load_report(grpc_exec_ctx *exec_ctx, gpr_time_add(now, glb_policy->client_stats_report_interval); grpc_closure_init(&glb_policy->client_load_report_closure, send_client_load_report_locked, glb_policy, - grpc_combiner_scheduler(glb_policy->base.combiner, false)); + grpc_combiner_scheduler(glb_policy->base.combiner)); grpc_timer_init(exec_ctx, &glb_policy->client_load_report_timer, next_client_load_report_time, &glb_policy->client_load_report_closure, now); @@ -1280,7 +1280,7 @@ static void do_send_client_load_report_locked(grpc_exec_ctx *exec_ctx, op.data.send_message.send_message = glb_policy->client_load_report_payload; grpc_closure_init(&glb_policy->client_load_report_closure, client_load_report_done_locked, glb_policy, - grpc_combiner_scheduler(glb_policy->base.combiner, false)); + grpc_combiner_scheduler(glb_policy->base.combiner)); grpc_call_error call_error = grpc_call_start_batch_and_execute( exec_ctx, glb_policy->lb_call, &op, 1, &glb_policy->client_load_report_closure); @@ -1386,13 +1386,13 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx, grpc_closure_init(&glb_policy->lb_on_sent_initial_request, lb_on_sent_initial_request_locked, glb_policy, - grpc_combiner_scheduler(glb_policy->base.combiner, false)); + grpc_combiner_scheduler(glb_policy->base.combiner)); grpc_closure_init(&glb_policy->lb_on_server_status_received, lb_on_server_status_received_locked, glb_policy, - grpc_combiner_scheduler(glb_policy->base.combiner, false)); + grpc_combiner_scheduler(glb_policy->base.combiner)); grpc_closure_init(&glb_policy->lb_on_response_received, lb_on_response_received_locked, glb_policy, - grpc_combiner_scheduler(glb_policy->base.combiner, false)); + grpc_combiner_scheduler(glb_policy->base.combiner)); gpr_backoff_init(&glb_policy->lb_call_backoff_state, GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS, @@ -1693,9 +1693,9 @@ static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx, } } GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_retry_timer"); - grpc_closure_init( - &glb_policy->lb_on_call_retry, lb_call_on_retry_timer_locked, - glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner, false)); + grpc_closure_init(&glb_policy->lb_on_call_retry, + lb_call_on_retry_timer_locked, glb_policy, + grpc_combiner_scheduler(glb_policy->base.combiner)); glb_policy->retry_timer_active = true; grpc_timer_init(exec_ctx, &glb_policy->lb_call_retry_timer, next_try, &glb_policy->lb_on_call_retry, now); diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c index 51cc632649..0e2b4131f1 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c @@ -655,7 +655,7 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx, pf_update_locked(exec_ctx, &p->base, args); grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable, args->combiner); grpc_closure_init(&p->connectivity_changed, pf_connectivity_changed_locked, p, - grpc_combiner_scheduler(args->combiner, false)); + grpc_combiner_scheduler(args->combiner)); return &p->base; } diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c index 33d9522380..9ecb0a39da 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c @@ -749,7 +749,7 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, sd->subchannel = subchannel; grpc_closure_init(&sd->connectivity_changed_closure, rr_connectivity_changed_locked, sd, - grpc_combiner_scheduler(args->combiner, false)); + grpc_combiner_scheduler(args->combiner)); /* use some sentinel value outside of the range of * grpc_connectivity_state to signal an undefined previous state. We * won't be referring to this value again and it'll be overwritten after diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c index 2bf44a4f1b..92f060b422 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.c @@ -268,10 +268,10 @@ static grpc_resolver *dns_ares_create(grpc_exec_ctx *exec_ctx, GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000); grpc_closure_init(&r->dns_ares_on_retry_timer_locked, dns_ares_on_retry_timer_locked, r, - grpc_combiner_scheduler(r->base.combiner, false)); + grpc_combiner_scheduler(r->base.combiner)); grpc_closure_init(&r->dns_ares_on_resolved_locked, dns_ares_on_resolved_locked, r, - grpc_combiner_scheduler(r->base.combiner, false)); + grpc_combiner_scheduler(r->base.combiner)); return &r->base; } diff --git a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c index afa978ff01..6c263c32f9 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c +++ b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c @@ -179,7 +179,7 @@ static void dns_on_resolved_locked(grpc_exec_ctx *exec_ctx, void *arg, gpr_log(GPR_DEBUG, "retrying immediately"); } grpc_closure_init(&r->on_retry, dns_on_retry_timer_locked, r, - grpc_combiner_scheduler(r->base.combiner, false)); + grpc_combiner_scheduler(r->base.combiner)); grpc_timer_init(exec_ctx, &r->retry_timer, next_try, &r->on_retry, now); } if (r->resolved_result != NULL) { @@ -201,7 +201,7 @@ static void dns_start_resolving_locked(grpc_exec_ctx *exec_ctx, grpc_resolve_address( exec_ctx, r->name_to_resolve, r->default_port, r->interested_parties, grpc_closure_create(dns_on_resolved_locked, r, - grpc_combiner_scheduler(r->base.combiner, false)), + grpc_combiner_scheduler(r->base.combiner)), &r->addresses); } diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c index 59560e068d..cbb0e628ed 100644 --- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c +++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c @@ -158,10 +158,9 @@ void grpc_fake_resolver_response_generator_set_response( GPR_ASSERT(generator->resolver != NULL); generator->next_response = grpc_channel_args_copy(next_response); grpc_closure_sched( - exec_ctx, - grpc_closure_create( - set_response_cb, generator, - grpc_combiner_scheduler(generator->resolver->base.combiner, false)), + exec_ctx, grpc_closure_create(set_response_cb, generator, + grpc_combiner_scheduler( + generator->resolver->base.combiner)), GRPC_ERROR_NONE); } |