diff options
author | 2018-06-11 16:33:15 -0700 | |
---|---|---|
committer | 2018-06-11 16:33:15 -0700 | |
commit | dddf20793269a7ba5f658ae5fdcde2c175cf196b (patch) | |
tree | 595d0fbc0c501132857b63f00f073ec6a0f54f58 /src/core/ext/filters | |
parent | 935ae7d01242c21f0230ead18927087a70c03e5b (diff) | |
parent | 37b292a0f7500e434c9ae41919248e2641ddc7f1 (diff) |
Merge branch 'master' into epollerr
Diffstat (limited to 'src/core/ext/filters')
-rw-r--r-- | src/core/ext/filters/client_channel/client_channel.cc | 520 | ||||
-rw-r--r-- | src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc | 4 |
2 files changed, 279 insertions, 245 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 936d70b959..ea6775a8d8 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -891,6 +891,7 @@ typedef struct client_channel_call_data { grpc_closure pick_cancel_closure; grpc_polling_entity* pollent; + bool pollent_added_to_interested_parties; // Batches are added to this list when received from above. // They are removed when we are done handling the batch (i.e., when @@ -949,7 +950,6 @@ static void retry_commit(grpc_call_element* elem, static void start_internal_recv_trailing_metadata(grpc_call_element* elem); static void on_complete(void* arg, grpc_error* error); static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored); -static void pick_after_resolver_result_start_locked(grpc_call_element* elem); static void start_pick_locked(void* arg, grpc_error* ignored); // @@ -2684,59 +2684,133 @@ static void pick_done(void* arg, grpc_error* error) { } } +static void maybe_add_call_to_channel_interested_parties_locked( + grpc_call_element* elem) { + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + if (!calld->pollent_added_to_interested_parties) { + calld->pollent_added_to_interested_parties = true; + grpc_polling_entity_add_to_pollset_set(calld->pollent, + chand->interested_parties); + } +} + +static void maybe_del_call_from_channel_interested_parties_locked( + grpc_call_element* elem) { + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + if (calld->pollent_added_to_interested_parties) { + calld->pollent_added_to_interested_parties = false; + grpc_polling_entity_del_from_pollset_set(calld->pollent, + chand->interested_parties); + } +} + // Invoked when a pick is completed to leave the client_channel combiner // and continue processing in the call combiner. +// If needed, removes the call's polling entity from chand->interested_parties. static void pick_done_locked(grpc_call_element* elem, grpc_error* error) { call_data* calld = static_cast<call_data*>(elem->call_data); + maybe_del_call_from_channel_interested_parties_locked(elem); GRPC_CLOSURE_INIT(&calld->pick_closure, pick_done, elem, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_SCHED(&calld->pick_closure, error); } -// A wrapper around pick_done_locked() that is used in cases where -// either (a) the pick was deferred pending a resolver result or (b) the -// pick was done asynchronously. Removes the call's polling entity from -// chand->interested_parties before invoking pick_done_locked(). -static void async_pick_done_locked(grpc_call_element* elem, grpc_error* error) { - channel_data* chand = static_cast<channel_data*>(elem->channel_data); - call_data* calld = static_cast<call_data*>(elem->call_data); - grpc_polling_entity_del_from_pollset_set(calld->pollent, - chand->interested_parties); - pick_done_locked(elem, error); -} +namespace grpc_core { -// Note: This runs under the client_channel combiner, but will NOT be -// holding the call combiner. -static void pick_callback_cancel_locked(void* arg, grpc_error* error) { - grpc_call_element* elem = static_cast<grpc_call_element*>(arg); - channel_data* chand = static_cast<channel_data*>(elem->channel_data); - call_data* calld = static_cast<call_data*>(elem->call_data); - // Note: chand->lb_policy may have changed since we started our pick, - // in which case we will be cancelling the pick on a policy other than - // the one we started it on. However, this will just be a no-op. - if (GPR_LIKELY(error != GRPC_ERROR_NONE && chand->lb_policy != nullptr)) { +// Performs subchannel pick via LB policy. +class LbPicker { + public: + // Starts a pick on chand->lb_policy. + static void StartLocked(grpc_call_element* elem) { + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + call_data* calld = static_cast<call_data*>(elem->call_data); if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p calld=%p: cancelling pick from LB policy %p", + gpr_log(GPR_INFO, "chand=%p calld=%p: starting pick on lb_policy=%p", chand, calld, chand->lb_policy.get()); } - chand->lb_policy->CancelPickLocked(&calld->pick, GRPC_ERROR_REF(error)); + // If this is a retry, use the send_initial_metadata payload that + // we've cached; otherwise, use the pending batch. The + // send_initial_metadata batch will be the first pending batch in the + // list, as set by get_batch_index() above. + calld->pick.initial_metadata = + calld->seen_send_initial_metadata + ? &calld->send_initial_metadata + : calld->pending_batches[0] + .batch->payload->send_initial_metadata.send_initial_metadata; + calld->pick.initial_metadata_flags = + calld->seen_send_initial_metadata + ? calld->send_initial_metadata_flags + : calld->pending_batches[0] + .batch->payload->send_initial_metadata + .send_initial_metadata_flags; + GRPC_CLOSURE_INIT(&calld->pick_closure, &LbPicker::DoneLocked, elem, + grpc_combiner_scheduler(chand->combiner)); + calld->pick.on_complete = &calld->pick_closure; + GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback"); + const bool pick_done = chand->lb_policy->PickLocked(&calld->pick); + if (GPR_LIKELY(pick_done)) { + // Pick completed synchronously. + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed synchronously", + chand, calld); + } + pick_done_locked(elem, GRPC_ERROR_NONE); + GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback"); + } else { + // Pick will be returned asynchronously. + // Add the polling entity from call_data to the channel_data's + // interested_parties, so that the I/O of the LB policy can be done + // under it. It will be removed in pick_done_locked(). + maybe_add_call_to_channel_interested_parties_locked(elem); + // Request notification on call cancellation. + GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback_cancel"); + grpc_call_combiner_set_notify_on_cancel( + calld->call_combiner, + GRPC_CLOSURE_INIT(&calld->pick_cancel_closure, + &LbPicker::CancelLocked, elem, + grpc_combiner_scheduler(chand->combiner))); + } } - GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback_cancel"); -} -// Callback invoked by LoadBalancingPolicy::PickLocked() for async picks. -// Unrefs the LB policy and invokes async_pick_done_locked(). -static void pick_callback_done_locked(void* arg, grpc_error* error) { - grpc_call_element* elem = static_cast<grpc_call_element*>(arg); - channel_data* chand = static_cast<channel_data*>(elem->channel_data); - call_data* calld = static_cast<call_data*>(elem->call_data); - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed asynchronously", chand, - calld); + private: + // Callback invoked by LoadBalancingPolicy::PickLocked() for async picks. + // Unrefs the LB policy and invokes pick_done_locked(). + static void DoneLocked(void* arg, grpc_error* error) { + grpc_call_element* elem = static_cast<grpc_call_element*>(arg); + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed asynchronously", + chand, calld); + } + pick_done_locked(elem, GRPC_ERROR_REF(error)); + GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback"); } - async_pick_done_locked(elem, GRPC_ERROR_REF(error)); - GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback"); -} + + // Note: This runs under the client_channel combiner, but will NOT be + // holding the call combiner. + static void CancelLocked(void* arg, grpc_error* error) { + grpc_call_element* elem = static_cast<grpc_call_element*>(arg); + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + // Note: chand->lb_policy may have changed since we started our pick, + // in which case we will be cancelling the pick on a policy other than + // the one we started it on. However, this will just be a no-op. + if (GPR_UNLIKELY(error != GRPC_ERROR_NONE && chand->lb_policy != nullptr)) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: cancelling pick from LB policy %p", chand, + calld, chand->lb_policy.get()); + } + chand->lb_policy->CancelPickLocked(&calld->pick, GRPC_ERROR_REF(error)); + } + GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback_cancel"); + } +}; + +} // namespace grpc_core // Applies service config to the call. Must be invoked once we know // that the resolver has returned results to the channel. @@ -2766,6 +2840,24 @@ static void apply_service_config_to_call_locked(grpc_call_element* elem) { grpc_deadline_state_reset(elem, calld->deadline); } } + // If the service config set wait_for_ready and the application + // did not explicitly set it, use the value from the service config. + uint32_t* send_initial_metadata_flags = + &calld->pending_batches[0] + .batch->payload->send_initial_metadata + .send_initial_metadata_flags; + if (GPR_UNLIKELY( + calld->method_params->wait_for_ready() != + ClientChannelMethodParams::WAIT_FOR_READY_UNSET && + !(*send_initial_metadata_flags & + GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET))) { + if (calld->method_params->wait_for_ready() == + ClientChannelMethodParams::WAIT_FOR_READY_TRUE) { + *send_initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY; + } else { + *send_initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY; + } + } } } // If no retry policy, disable retries. @@ -2776,215 +2868,164 @@ static void apply_service_config_to_call_locked(grpc_call_element* elem) { } } -// Starts a pick on chand->lb_policy. -// Returns true if pick is completed synchronously. -static bool pick_callback_start_locked(grpc_call_element* elem) { - channel_data* chand = static_cast<channel_data*>(elem->channel_data); +// Invoked once resolver results are available. +static void process_service_config_and_start_lb_pick_locked( + grpc_call_element* elem) { call_data* calld = static_cast<call_data*>(elem->call_data); - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p calld=%p: starting pick on lb_policy=%p", chand, - calld, chand->lb_policy.get()); - } // Only get service config data on the first attempt. if (GPR_LIKELY(calld->num_attempts_completed == 0)) { apply_service_config_to_call_locked(elem); } - // If the application explicitly set wait_for_ready, use that. - // Otherwise, if the service config specified a value for this - // method, use that. - // - // The send_initial_metadata batch will be the first one in the list, - // as set by get_batch_index() above. - calld->pick.initial_metadata = - calld->seen_send_initial_metadata - ? &calld->send_initial_metadata - : calld->pending_batches[0] - .batch->payload->send_initial_metadata.send_initial_metadata; - uint32_t send_initial_metadata_flags = - calld->seen_send_initial_metadata - ? calld->send_initial_metadata_flags - : calld->pending_batches[0] - .batch->payload->send_initial_metadata - .send_initial_metadata_flags; - const bool wait_for_ready_set_from_api = - send_initial_metadata_flags & - GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET; - const bool wait_for_ready_set_from_service_config = - calld->method_params != nullptr && - calld->method_params->wait_for_ready() != - ClientChannelMethodParams::WAIT_FOR_READY_UNSET; - if (GPR_UNLIKELY(!wait_for_ready_set_from_api && - wait_for_ready_set_from_service_config)) { - if (calld->method_params->wait_for_ready() == - ClientChannelMethodParams::WAIT_FOR_READY_TRUE) { - send_initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY; - } else { - send_initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY; - } - } - calld->pick.initial_metadata_flags = send_initial_metadata_flags; - GRPC_CLOSURE_INIT(&calld->pick_closure, pick_callback_done_locked, elem, - grpc_combiner_scheduler(chand->combiner)); - calld->pick.on_complete = &calld->pick_closure; - GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback"); - const bool pick_done = chand->lb_policy->PickLocked(&calld->pick); - if (GPR_LIKELY(pick_done)) { - // Pick completed synchronously. - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed synchronously", - chand, calld); - } - GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback"); - } else { - GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback_cancel"); - grpc_call_combiner_set_notify_on_cancel( - calld->call_combiner, - GRPC_CLOSURE_INIT(&calld->pick_cancel_closure, - pick_callback_cancel_locked, elem, - grpc_combiner_scheduler(chand->combiner))); - } - return pick_done; + // Start LB pick. + grpc_core::LbPicker::StartLocked(elem); } -typedef struct { - grpc_call_element* elem; - bool finished; - grpc_closure closure; - grpc_closure cancel_closure; -} pick_after_resolver_result_args; - -// Note: This runs under the client_channel combiner, but will NOT be -// holding the call combiner. -static void pick_after_resolver_result_cancel_locked(void* arg, - grpc_error* error) { - pick_after_resolver_result_args* args = - static_cast<pick_after_resolver_result_args*>(arg); - if (GPR_LIKELY(args->finished)) { - gpr_free(args); - return; - } - // If we don't yet have a resolver result, then a closure for - // pick_after_resolver_result_done_locked() will have been added to - // chand->waiting_for_resolver_result_closures, and it may not be invoked - // until after this call has been destroyed. We mark the operation as - // finished, so that when pick_after_resolver_result_done_locked() - // is called, it will be a no-op. We also immediately invoke - // async_pick_done_locked() to propagate the error back to the caller. - args->finished = true; - grpc_call_element* elem = args->elem; - channel_data* chand = static_cast<channel_data*>(elem->channel_data); - call_data* calld = static_cast<call_data*>(elem->call_data); - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, - "chand=%p calld=%p: cancelling pick waiting for resolver result", - chand, calld); - } - // Note: Although we are not in the call combiner here, we are - // basically stealing the call combiner from the pending pick, so - // it's safe to call async_pick_done_locked() here -- we are - // essentially calling it here instead of calling it in - // pick_after_resolver_result_done_locked(). - async_pick_done_locked(elem, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Pick cancelled", &error, 1)); -} - -static void pick_after_resolver_result_done_locked(void* arg, - grpc_error* error) { - pick_after_resolver_result_args* args = - static_cast<pick_after_resolver_result_args*>(arg); - if (GPR_UNLIKELY(args->finished)) { - /* cancelled, do nothing */ - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "call cancelled before resolver result"); - } - gpr_free(args); - return; - } - args->finished = true; - grpc_call_element* elem = args->elem; - channel_data* chand = static_cast<channel_data*>(elem->channel_data); - call_data* calld = static_cast<call_data*>(elem->call_data); - if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) { +namespace grpc_core { + +// Handles waiting for a resolver result. +// Used only for the first call on an idle channel. +class ResolverResultWaiter { + public: + explicit ResolverResultWaiter(grpc_call_element* elem) : elem_(elem) { + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + call_data* calld = static_cast<call_data*>(elem->call_data); if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p calld=%p: resolver failed to return data", + gpr_log(GPR_INFO, + "chand=%p calld=%p: deferring pick pending resolver result", chand, calld); } - async_pick_done_locked(elem, GRPC_ERROR_REF(error)); - } else if (GPR_UNLIKELY(chand->resolver == nullptr)) { - // Shutting down. - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p calld=%p: resolver disconnected", chand, - calld); + // Add closure to be run when a resolver result is available. + GRPC_CLOSURE_INIT(&done_closure_, &ResolverResultWaiter::DoneLocked, this, + grpc_combiner_scheduler(chand->combiner)); + AddToWaitingList(); + // Set cancellation closure, so that we abort if the call is cancelled. + GRPC_CLOSURE_INIT(&cancel_closure_, &ResolverResultWaiter::CancelLocked, + this, grpc_combiner_scheduler(chand->combiner)); + grpc_call_combiner_set_notify_on_cancel(calld->call_combiner, + &cancel_closure_); + } + + private: + // Adds closure_ to chand->waiting_for_resolver_result_closures. + void AddToWaitingList() { + channel_data* chand = static_cast<channel_data*>(elem_->channel_data); + grpc_closure_list_append(&chand->waiting_for_resolver_result_closures, + &done_closure_, GRPC_ERROR_NONE); + } + + // Invoked when a resolver result is available. + static void DoneLocked(void* arg, grpc_error* error) { + ResolverResultWaiter* self = static_cast<ResolverResultWaiter*>(arg); + // If CancelLocked() has already run, delete ourselves without doing + // anything. Note that the call stack may have already been destroyed, + // so it's not safe to access anything in elem_. + if (GPR_UNLIKELY(self->finished_)) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, "call cancelled before resolver result"); + } + Delete(self); + return; } - async_pick_done_locked( - elem, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected")); - } else if (GPR_UNLIKELY(chand->lb_policy == nullptr)) { - // Transient resolver failure. - // If call has wait_for_ready=true, try again; otherwise, fail. - uint32_t send_initial_metadata_flags = - calld->seen_send_initial_metadata - ? calld->send_initial_metadata_flags - : calld->pending_batches[0] - .batch->payload->send_initial_metadata - .send_initial_metadata_flags; - if (send_initial_metadata_flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) { + // Otherwise, process the resolver result. + grpc_call_element* elem = self->elem_; + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) { if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, - "chand=%p calld=%p: resolver returned but no LB policy; " - "wait_for_ready=true; trying again", + gpr_log(GPR_INFO, "chand=%p calld=%p: resolver failed to return data", chand, calld); } - pick_after_resolver_result_start_locked(elem); + pick_done_locked(elem, GRPC_ERROR_REF(error)); + } else if (GPR_UNLIKELY(chand->resolver == nullptr)) { + // Shutting down. + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, "chand=%p calld=%p: resolver disconnected", chand, + calld); + } + pick_done_locked(elem, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected")); + } else if (GPR_UNLIKELY(chand->lb_policy == nullptr)) { + // Transient resolver failure. + // If call has wait_for_ready=true, try again; otherwise, fail. + uint32_t send_initial_metadata_flags = + calld->seen_send_initial_metadata + ? calld->send_initial_metadata_flags + : calld->pending_batches[0] + .batch->payload->send_initial_metadata + .send_initial_metadata_flags; + if (send_initial_metadata_flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: resolver returned but no LB policy; " + "wait_for_ready=true; trying again", + chand, calld); + } + // Re-add ourselves to the waiting list. + self->AddToWaitingList(); + // Return early so that we don't set finished_ to true below. + return; + } else { + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: resolver returned but no LB policy; " + "wait_for_ready=false; failing", + chand, calld); + } + pick_done_locked( + elem, + grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Name resolution failure"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE)); + } } else { if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, - "chand=%p calld=%p: resolver returned but no LB policy; " - "wait_for_ready=false; failing", + gpr_log(GPR_INFO, "chand=%p calld=%p: resolver returned, doing LB pick", chand, calld); } - async_pick_done_locked( - elem, - grpc_error_set_int( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Name resolution failure"), - GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE)); + process_service_config_and_start_lb_pick_locked(elem); } - } else { - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, "chand=%p calld=%p: resolver returned, doing pick", - chand, calld); + self->finished_ = true; + } + + // Invoked when the call is cancelled. + // Note: This runs under the client_channel combiner, but will NOT be + // holding the call combiner. + static void CancelLocked(void* arg, grpc_error* error) { + ResolverResultWaiter* self = static_cast<ResolverResultWaiter*>(arg); + // If DoneLocked() has already run, delete ourselves without doing anything. + if (GPR_LIKELY(self->finished_)) { + Delete(self); + return; } - if (GPR_LIKELY(pick_callback_start_locked(elem))) { - // Even if the LB policy returns a result synchronously, we have - // already added our polling entity to chand->interested_parties - // in order to wait for the resolver result, so we need to - // remove it here. Therefore, we call async_pick_done_locked() - // instead of pick_done_locked(). - async_pick_done_locked(elem, GRPC_ERROR_NONE); + // If we are being cancelled, immediately invoke pick_done_locked() + // to propagate the error back to the caller. + if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) { + grpc_call_element* elem = self->elem_; + channel_data* chand = static_cast<channel_data*>(elem->channel_data); + call_data* calld = static_cast<call_data*>(elem->call_data); + if (grpc_client_channel_trace.enabled()) { + gpr_log(GPR_INFO, + "chand=%p calld=%p: cancelling call waiting for name " + "resolution", + chand, calld); + } + // Note: Although we are not in the call combiner here, we are + // basically stealing the call combiner from the pending pick, so + // it's safe to call pick_done_locked() here -- we are essentially + // calling it here instead of calling it in DoneLocked(). + pick_done_locked(elem, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Pick cancelled", &error, 1)); } + self->finished_ = true; } -} -static void pick_after_resolver_result_start_locked(grpc_call_element* elem) { - channel_data* chand = static_cast<channel_data*>(elem->channel_data); - call_data* calld = static_cast<call_data*>(elem->call_data); - if (grpc_client_channel_trace.enabled()) { - gpr_log(GPR_INFO, - "chand=%p calld=%p: deferring pick pending resolver result", chand, - calld); - } - pick_after_resolver_result_args* args = - static_cast<pick_after_resolver_result_args*>(gpr_zalloc(sizeof(*args))); - args->elem = elem; - GRPC_CLOSURE_INIT(&args->closure, pick_after_resolver_result_done_locked, - args, grpc_combiner_scheduler(chand->combiner)); - grpc_closure_list_append(&chand->waiting_for_resolver_result_closures, - &args->closure, GRPC_ERROR_NONE); - grpc_call_combiner_set_notify_on_cancel( - calld->call_combiner, - GRPC_CLOSURE_INIT(&args->cancel_closure, - pick_after_resolver_result_cancel_locked, args, - grpc_combiner_scheduler(chand->combiner))); -} + grpc_call_element* elem_; + grpc_closure done_closure_; + grpc_closure cancel_closure_; + bool finished_ = false; +}; + +} // namespace grpc_core static void start_pick_locked(void* arg, grpc_error* ignored) { grpc_call_element* elem = static_cast<grpc_call_element*>(arg); @@ -2993,31 +3034,24 @@ static void start_pick_locked(void* arg, grpc_error* ignored) { GPR_ASSERT(calld->pick.connected_subchannel == nullptr); GPR_ASSERT(calld->subchannel_call == nullptr); if (GPR_LIKELY(chand->lb_policy != nullptr)) { - // We already have an LB policy, so ask it for a pick. - if (GPR_LIKELY(pick_callback_start_locked(elem))) { - // Pick completed synchronously. - pick_done_locked(elem, GRPC_ERROR_NONE); - return; - } + // We already have resolver results, so process the service config + // and start an LB pick. + process_service_config_and_start_lb_pick_locked(elem); + } else if (GPR_UNLIKELY(chand->resolver == nullptr)) { + pick_done_locked(elem, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected")); } else { // We do not yet have an LB policy, so wait for a resolver result. - if (GPR_UNLIKELY(chand->resolver == nullptr)) { - pick_done_locked(elem, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected")); - return; - } if (GPR_UNLIKELY(!chand->started_resolving)) { start_resolving_locked(chand); } - pick_after_resolver_result_start_locked(elem); + // Create a new waiter, which will delete itself when done. + grpc_core::New<grpc_core::ResolverResultWaiter>(elem); + // Add the polling entity from call_data to the channel_data's + // interested_parties, so that the I/O of the resolver can be done + // under it. It will be removed in pick_done_locked(). + maybe_add_call_to_channel_interested_parties_locked(elem); } - // We need to wait for either a resolver result or for an async result - // from the LB policy. Add the polling entity from call_data to the - // channel_data's interested_parties, so that the I/O of the LB policy - // and resolver can be done under it. The polling entity will be - // removed in async_pick_done_locked(). - grpc_polling_entity_add_to_pollset_set(calld->pollent, - chand->interested_parties); } // diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc index 73d2af6e42..00a84302ed 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc @@ -18,7 +18,7 @@ #include <grpc/support/port_platform.h> #include "src/core/lib/iomgr/port.h" -#if GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET) +#if GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER) #include <ares.h> #include <sys/ioctl.h> @@ -348,4 +348,4 @@ void grpc_ares_ev_driver_start(grpc_ares_ev_driver* ev_driver) { gpr_mu_unlock(&ev_driver->mu); } -#endif /* GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET) */ +#endif /* GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER) */ |