aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Mark D. Roth <roth@google.com>2018-06-01 12:04:16 -0700
committerGravatar Mark D. Roth <roth@google.com>2018-06-01 12:04:16 -0700
commit7e0e2021f3e9a7d6b862a7e69ff88807dd9783f1 (patch)
tree6838369786c2ccddc14192cedfba67193a8de268 /src
parent5d83bedb77d14a5d0240bc9f11c86c45704f30f6 (diff)
C++-ify some client_channel internals.
Diffstat (limited to 'src')
-rw-r--r--src/core/ext/filters/client_channel/client_channel.cc520
1 files changed, 277 insertions, 243 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index 936d70b959..ea6775a8d8 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -891,6 +891,7 @@ typedef struct client_channel_call_data {
grpc_closure pick_cancel_closure;
grpc_polling_entity* pollent;
+ bool pollent_added_to_interested_parties;
// Batches are added to this list when received from above.
// They are removed when we are done handling the batch (i.e., when
@@ -949,7 +950,6 @@ static void retry_commit(grpc_call_element* elem,
static void start_internal_recv_trailing_metadata(grpc_call_element* elem);
static void on_complete(void* arg, grpc_error* error);
static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored);
-static void pick_after_resolver_result_start_locked(grpc_call_element* elem);
static void start_pick_locked(void* arg, grpc_error* ignored);
//
@@ -2684,59 +2684,133 @@ static void pick_done(void* arg, grpc_error* error) {
}
}
+static void maybe_add_call_to_channel_interested_parties_locked(
+ grpc_call_element* elem) {
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ if (!calld->pollent_added_to_interested_parties) {
+ calld->pollent_added_to_interested_parties = true;
+ grpc_polling_entity_add_to_pollset_set(calld->pollent,
+ chand->interested_parties);
+ }
+}
+
+static void maybe_del_call_from_channel_interested_parties_locked(
+ grpc_call_element* elem) {
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ if (calld->pollent_added_to_interested_parties) {
+ calld->pollent_added_to_interested_parties = false;
+ grpc_polling_entity_del_from_pollset_set(calld->pollent,
+ chand->interested_parties);
+ }
+}
+
// Invoked when a pick is completed to leave the client_channel combiner
// and continue processing in the call combiner.
+// If needed, removes the call's polling entity from chand->interested_parties.
static void pick_done_locked(grpc_call_element* elem, grpc_error* error) {
call_data* calld = static_cast<call_data*>(elem->call_data);
+ maybe_del_call_from_channel_interested_parties_locked(elem);
GRPC_CLOSURE_INIT(&calld->pick_closure, pick_done, elem,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_SCHED(&calld->pick_closure, error);
}
-// A wrapper around pick_done_locked() that is used in cases where
-// either (a) the pick was deferred pending a resolver result or (b) the
-// pick was done asynchronously. Removes the call's polling entity from
-// chand->interested_parties before invoking pick_done_locked().
-static void async_pick_done_locked(grpc_call_element* elem, grpc_error* error) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- grpc_polling_entity_del_from_pollset_set(calld->pollent,
- chand->interested_parties);
- pick_done_locked(elem, error);
-}
+namespace grpc_core {
-// Note: This runs under the client_channel combiner, but will NOT be
-// holding the call combiner.
-static void pick_callback_cancel_locked(void* arg, grpc_error* error) {
- grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- // Note: chand->lb_policy may have changed since we started our pick,
- // in which case we will be cancelling the pick on a policy other than
- // the one we started it on. However, this will just be a no-op.
- if (GPR_LIKELY(error != GRPC_ERROR_NONE && chand->lb_policy != nullptr)) {
+// Performs subchannel pick via LB policy.
+class LbPicker {
+ public:
+ // Starts a pick on chand->lb_policy.
+ static void StartLocked(grpc_call_element* elem) {
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+ call_data* calld = static_cast<call_data*>(elem->call_data);
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p calld=%p: cancelling pick from LB policy %p",
+ gpr_log(GPR_INFO, "chand=%p calld=%p: starting pick on lb_policy=%p",
chand, calld, chand->lb_policy.get());
}
- chand->lb_policy->CancelPickLocked(&calld->pick, GRPC_ERROR_REF(error));
+ // If this is a retry, use the send_initial_metadata payload that
+ // we've cached; otherwise, use the pending batch. The
+ // send_initial_metadata batch will be the first pending batch in the
+ // list, as set by get_batch_index() above.
+ calld->pick.initial_metadata =
+ calld->seen_send_initial_metadata
+ ? &calld->send_initial_metadata
+ : calld->pending_batches[0]
+ .batch->payload->send_initial_metadata.send_initial_metadata;
+ calld->pick.initial_metadata_flags =
+ calld->seen_send_initial_metadata
+ ? calld->send_initial_metadata_flags
+ : calld->pending_batches[0]
+ .batch->payload->send_initial_metadata
+ .send_initial_metadata_flags;
+ GRPC_CLOSURE_INIT(&calld->pick_closure, &LbPicker::DoneLocked, elem,
+ grpc_combiner_scheduler(chand->combiner));
+ calld->pick.on_complete = &calld->pick_closure;
+ GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback");
+ const bool pick_done = chand->lb_policy->PickLocked(&calld->pick);
+ if (GPR_LIKELY(pick_done)) {
+ // Pick completed synchronously.
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed synchronously",
+ chand, calld);
+ }
+ pick_done_locked(elem, GRPC_ERROR_NONE);
+ GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback");
+ } else {
+ // Pick will be returned asynchronously.
+ // Add the polling entity from call_data to the channel_data's
+ // interested_parties, so that the I/O of the LB policy can be done
+ // under it. It will be removed in pick_done_locked().
+ maybe_add_call_to_channel_interested_parties_locked(elem);
+ // Request notification on call cancellation.
+ GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback_cancel");
+ grpc_call_combiner_set_notify_on_cancel(
+ calld->call_combiner,
+ GRPC_CLOSURE_INIT(&calld->pick_cancel_closure,
+ &LbPicker::CancelLocked, elem,
+ grpc_combiner_scheduler(chand->combiner)));
+ }
}
- GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback_cancel");
-}
-// Callback invoked by LoadBalancingPolicy::PickLocked() for async picks.
-// Unrefs the LB policy and invokes async_pick_done_locked().
-static void pick_callback_done_locked(void* arg, grpc_error* error) {
- grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed asynchronously", chand,
- calld);
+ private:
+ // Callback invoked by LoadBalancingPolicy::PickLocked() for async picks.
+ // Unrefs the LB policy and invokes pick_done_locked().
+ static void DoneLocked(void* arg, grpc_error* error) {
+ grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed asynchronously",
+ chand, calld);
+ }
+ pick_done_locked(elem, GRPC_ERROR_REF(error));
+ GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback");
}
- async_pick_done_locked(elem, GRPC_ERROR_REF(error));
- GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback");
-}
+
+ // Note: This runs under the client_channel combiner, but will NOT be
+ // holding the call combiner.
+ static void CancelLocked(void* arg, grpc_error* error) {
+ grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ // Note: chand->lb_policy may have changed since we started our pick,
+ // in which case we will be cancelling the pick on a policy other than
+ // the one we started it on. However, this will just be a no-op.
+ if (GPR_UNLIKELY(error != GRPC_ERROR_NONE && chand->lb_policy != nullptr)) {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "chand=%p calld=%p: cancelling pick from LB policy %p", chand,
+ calld, chand->lb_policy.get());
+ }
+ chand->lb_policy->CancelPickLocked(&calld->pick, GRPC_ERROR_REF(error));
+ }
+ GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback_cancel");
+ }
+};
+
+} // namespace grpc_core
// Applies service config to the call. Must be invoked once we know
// that the resolver has returned results to the channel.
@@ -2766,6 +2840,24 @@ static void apply_service_config_to_call_locked(grpc_call_element* elem) {
grpc_deadline_state_reset(elem, calld->deadline);
}
}
+ // If the service config set wait_for_ready and the application
+ // did not explicitly set it, use the value from the service config.
+ uint32_t* send_initial_metadata_flags =
+ &calld->pending_batches[0]
+ .batch->payload->send_initial_metadata
+ .send_initial_metadata_flags;
+ if (GPR_UNLIKELY(
+ calld->method_params->wait_for_ready() !=
+ ClientChannelMethodParams::WAIT_FOR_READY_UNSET &&
+ !(*send_initial_metadata_flags &
+ GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET))) {
+ if (calld->method_params->wait_for_ready() ==
+ ClientChannelMethodParams::WAIT_FOR_READY_TRUE) {
+ *send_initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY;
+ } else {
+ *send_initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
+ }
+ }
}
}
// If no retry policy, disable retries.
@@ -2776,215 +2868,164 @@ static void apply_service_config_to_call_locked(grpc_call_element* elem) {
}
}
-// Starts a pick on chand->lb_policy.
-// Returns true if pick is completed synchronously.
-static bool pick_callback_start_locked(grpc_call_element* elem) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+// Invoked once resolver results are available.
+static void process_service_config_and_start_lb_pick_locked(
+ grpc_call_element* elem) {
call_data* calld = static_cast<call_data*>(elem->call_data);
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p calld=%p: starting pick on lb_policy=%p", chand,
- calld, chand->lb_policy.get());
- }
// Only get service config data on the first attempt.
if (GPR_LIKELY(calld->num_attempts_completed == 0)) {
apply_service_config_to_call_locked(elem);
}
- // If the application explicitly set wait_for_ready, use that.
- // Otherwise, if the service config specified a value for this
- // method, use that.
- //
- // The send_initial_metadata batch will be the first one in the list,
- // as set by get_batch_index() above.
- calld->pick.initial_metadata =
- calld->seen_send_initial_metadata
- ? &calld->send_initial_metadata
- : calld->pending_batches[0]
- .batch->payload->send_initial_metadata.send_initial_metadata;
- uint32_t send_initial_metadata_flags =
- calld->seen_send_initial_metadata
- ? calld->send_initial_metadata_flags
- : calld->pending_batches[0]
- .batch->payload->send_initial_metadata
- .send_initial_metadata_flags;
- const bool wait_for_ready_set_from_api =
- send_initial_metadata_flags &
- GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
- const bool wait_for_ready_set_from_service_config =
- calld->method_params != nullptr &&
- calld->method_params->wait_for_ready() !=
- ClientChannelMethodParams::WAIT_FOR_READY_UNSET;
- if (GPR_UNLIKELY(!wait_for_ready_set_from_api &&
- wait_for_ready_set_from_service_config)) {
- if (calld->method_params->wait_for_ready() ==
- ClientChannelMethodParams::WAIT_FOR_READY_TRUE) {
- send_initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY;
- } else {
- send_initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
- }
- }
- calld->pick.initial_metadata_flags = send_initial_metadata_flags;
- GRPC_CLOSURE_INIT(&calld->pick_closure, pick_callback_done_locked, elem,
- grpc_combiner_scheduler(chand->combiner));
- calld->pick.on_complete = &calld->pick_closure;
- GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback");
- const bool pick_done = chand->lb_policy->PickLocked(&calld->pick);
- if (GPR_LIKELY(pick_done)) {
- // Pick completed synchronously.
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed synchronously",
- chand, calld);
- }
- GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback");
- } else {
- GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback_cancel");
- grpc_call_combiner_set_notify_on_cancel(
- calld->call_combiner,
- GRPC_CLOSURE_INIT(&calld->pick_cancel_closure,
- pick_callback_cancel_locked, elem,
- grpc_combiner_scheduler(chand->combiner)));
- }
- return pick_done;
+ // Start LB pick.
+ grpc_core::LbPicker::StartLocked(elem);
}
-typedef struct {
- grpc_call_element* elem;
- bool finished;
- grpc_closure closure;
- grpc_closure cancel_closure;
-} pick_after_resolver_result_args;
-
-// Note: This runs under the client_channel combiner, but will NOT be
-// holding the call combiner.
-static void pick_after_resolver_result_cancel_locked(void* arg,
- grpc_error* error) {
- pick_after_resolver_result_args* args =
- static_cast<pick_after_resolver_result_args*>(arg);
- if (GPR_LIKELY(args->finished)) {
- gpr_free(args);
- return;
- }
- // If we don't yet have a resolver result, then a closure for
- // pick_after_resolver_result_done_locked() will have been added to
- // chand->waiting_for_resolver_result_closures, and it may not be invoked
- // until after this call has been destroyed. We mark the operation as
- // finished, so that when pick_after_resolver_result_done_locked()
- // is called, it will be a no-op. We also immediately invoke
- // async_pick_done_locked() to propagate the error back to the caller.
- args->finished = true;
- grpc_call_element* elem = args->elem;
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO,
- "chand=%p calld=%p: cancelling pick waiting for resolver result",
- chand, calld);
- }
- // Note: Although we are not in the call combiner here, we are
- // basically stealing the call combiner from the pending pick, so
- // it's safe to call async_pick_done_locked() here -- we are
- // essentially calling it here instead of calling it in
- // pick_after_resolver_result_done_locked().
- async_pick_done_locked(elem, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
- "Pick cancelled", &error, 1));
-}
-
-static void pick_after_resolver_result_done_locked(void* arg,
- grpc_error* error) {
- pick_after_resolver_result_args* args =
- static_cast<pick_after_resolver_result_args*>(arg);
- if (GPR_UNLIKELY(args->finished)) {
- /* cancelled, do nothing */
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "call cancelled before resolver result");
- }
- gpr_free(args);
- return;
- }
- args->finished = true;
- grpc_call_element* elem = args->elem;
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
+namespace grpc_core {
+
+// Handles waiting for a resolver result.
+// Used only for the first call on an idle channel.
+class ResolverResultWaiter {
+ public:
+ explicit ResolverResultWaiter(grpc_call_element* elem) : elem_(elem) {
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+ call_data* calld = static_cast<call_data*>(elem->call_data);
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p calld=%p: resolver failed to return data",
+ gpr_log(GPR_INFO,
+ "chand=%p calld=%p: deferring pick pending resolver result",
chand, calld);
}
- async_pick_done_locked(elem, GRPC_ERROR_REF(error));
- } else if (GPR_UNLIKELY(chand->resolver == nullptr)) {
- // Shutting down.
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p calld=%p: resolver disconnected", chand,
- calld);
+ // Add closure to be run when a resolver result is available.
+ GRPC_CLOSURE_INIT(&done_closure_, &ResolverResultWaiter::DoneLocked, this,
+ grpc_combiner_scheduler(chand->combiner));
+ AddToWaitingList();
+ // Set cancellation closure, so that we abort if the call is cancelled.
+ GRPC_CLOSURE_INIT(&cancel_closure_, &ResolverResultWaiter::CancelLocked,
+ this, grpc_combiner_scheduler(chand->combiner));
+ grpc_call_combiner_set_notify_on_cancel(calld->call_combiner,
+ &cancel_closure_);
+ }
+
+ private:
+ // Adds closure_ to chand->waiting_for_resolver_result_closures.
+ void AddToWaitingList() {
+ channel_data* chand = static_cast<channel_data*>(elem_->channel_data);
+ grpc_closure_list_append(&chand->waiting_for_resolver_result_closures,
+ &done_closure_, GRPC_ERROR_NONE);
+ }
+
+ // Invoked when a resolver result is available.
+ static void DoneLocked(void* arg, grpc_error* error) {
+ ResolverResultWaiter* self = static_cast<ResolverResultWaiter*>(arg);
+ // If CancelLocked() has already run, delete ourselves without doing
+ // anything. Note that the call stack may have already been destroyed,
+ // so it's not safe to access anything in elem_.
+ if (GPR_UNLIKELY(self->finished_)) {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO, "call cancelled before resolver result");
+ }
+ Delete(self);
+ return;
}
- async_pick_done_locked(
- elem, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
- } else if (GPR_UNLIKELY(chand->lb_policy == nullptr)) {
- // Transient resolver failure.
- // If call has wait_for_ready=true, try again; otherwise, fail.
- uint32_t send_initial_metadata_flags =
- calld->seen_send_initial_metadata
- ? calld->send_initial_metadata_flags
- : calld->pending_batches[0]
- .batch->payload->send_initial_metadata
- .send_initial_metadata_flags;
- if (send_initial_metadata_flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) {
+ // Otherwise, process the resolver result.
+ grpc_call_element* elem = self->elem_;
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO,
- "chand=%p calld=%p: resolver returned but no LB policy; "
- "wait_for_ready=true; trying again",
+ gpr_log(GPR_INFO, "chand=%p calld=%p: resolver failed to return data",
chand, calld);
}
- pick_after_resolver_result_start_locked(elem);
+ pick_done_locked(elem, GRPC_ERROR_REF(error));
+ } else if (GPR_UNLIKELY(chand->resolver == nullptr)) {
+ // Shutting down.
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO, "chand=%p calld=%p: resolver disconnected", chand,
+ calld);
+ }
+ pick_done_locked(elem,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
+ } else if (GPR_UNLIKELY(chand->lb_policy == nullptr)) {
+ // Transient resolver failure.
+ // If call has wait_for_ready=true, try again; otherwise, fail.
+ uint32_t send_initial_metadata_flags =
+ calld->seen_send_initial_metadata
+ ? calld->send_initial_metadata_flags
+ : calld->pending_batches[0]
+ .batch->payload->send_initial_metadata
+ .send_initial_metadata_flags;
+ if (send_initial_metadata_flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "chand=%p calld=%p: resolver returned but no LB policy; "
+ "wait_for_ready=true; trying again",
+ chand, calld);
+ }
+ // Re-add ourselves to the waiting list.
+ self->AddToWaitingList();
+ // Return early so that we don't set finished_ to true below.
+ return;
+ } else {
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "chand=%p calld=%p: resolver returned but no LB policy; "
+ "wait_for_ready=false; failing",
+ chand, calld);
+ }
+ pick_done_locked(
+ elem,
+ grpc_error_set_int(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Name resolution failure"),
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
+ }
} else {
if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO,
- "chand=%p calld=%p: resolver returned but no LB policy; "
- "wait_for_ready=false; failing",
+ gpr_log(GPR_INFO, "chand=%p calld=%p: resolver returned, doing LB pick",
chand, calld);
}
- async_pick_done_locked(
- elem,
- grpc_error_set_int(
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Name resolution failure"),
- GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
+ process_service_config_and_start_lb_pick_locked(elem);
}
- } else {
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO, "chand=%p calld=%p: resolver returned, doing pick",
- chand, calld);
+ self->finished_ = true;
+ }
+
+ // Invoked when the call is cancelled.
+ // Note: This runs under the client_channel combiner, but will NOT be
+ // holding the call combiner.
+ static void CancelLocked(void* arg, grpc_error* error) {
+ ResolverResultWaiter* self = static_cast<ResolverResultWaiter*>(arg);
+ // If DoneLocked() has already run, delete ourselves without doing anything.
+ if (GPR_LIKELY(self->finished_)) {
+ Delete(self);
+ return;
}
- if (GPR_LIKELY(pick_callback_start_locked(elem))) {
- // Even if the LB policy returns a result synchronously, we have
- // already added our polling entity to chand->interested_parties
- // in order to wait for the resolver result, so we need to
- // remove it here. Therefore, we call async_pick_done_locked()
- // instead of pick_done_locked().
- async_pick_done_locked(elem, GRPC_ERROR_NONE);
+ // If we are being cancelled, immediately invoke pick_done_locked()
+ // to propagate the error back to the caller.
+ if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
+ grpc_call_element* elem = self->elem_;
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "chand=%p calld=%p: cancelling call waiting for name "
+ "resolution",
+ chand, calld);
+ }
+ // Note: Although we are not in the call combiner here, we are
+ // basically stealing the call combiner from the pending pick, so
+ // it's safe to call pick_done_locked() here -- we are essentially
+ // calling it here instead of calling it in DoneLocked().
+ pick_done_locked(elem, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+ "Pick cancelled", &error, 1));
}
+ self->finished_ = true;
}
-}
-static void pick_after_resolver_result_start_locked(grpc_call_element* elem) {
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO,
- "chand=%p calld=%p: deferring pick pending resolver result", chand,
- calld);
- }
- pick_after_resolver_result_args* args =
- static_cast<pick_after_resolver_result_args*>(gpr_zalloc(sizeof(*args)));
- args->elem = elem;
- GRPC_CLOSURE_INIT(&args->closure, pick_after_resolver_result_done_locked,
- args, grpc_combiner_scheduler(chand->combiner));
- grpc_closure_list_append(&chand->waiting_for_resolver_result_closures,
- &args->closure, GRPC_ERROR_NONE);
- grpc_call_combiner_set_notify_on_cancel(
- calld->call_combiner,
- GRPC_CLOSURE_INIT(&args->cancel_closure,
- pick_after_resolver_result_cancel_locked, args,
- grpc_combiner_scheduler(chand->combiner)));
-}
+ grpc_call_element* elem_;
+ grpc_closure done_closure_;
+ grpc_closure cancel_closure_;
+ bool finished_ = false;
+};
+
+} // namespace grpc_core
static void start_pick_locked(void* arg, grpc_error* ignored) {
grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
@@ -2993,31 +3034,24 @@ static void start_pick_locked(void* arg, grpc_error* ignored) {
GPR_ASSERT(calld->pick.connected_subchannel == nullptr);
GPR_ASSERT(calld->subchannel_call == nullptr);
if (GPR_LIKELY(chand->lb_policy != nullptr)) {
- // We already have an LB policy, so ask it for a pick.
- if (GPR_LIKELY(pick_callback_start_locked(elem))) {
- // Pick completed synchronously.
- pick_done_locked(elem, GRPC_ERROR_NONE);
- return;
- }
+ // We already have resolver results, so process the service config
+ // and start an LB pick.
+ process_service_config_and_start_lb_pick_locked(elem);
+ } else if (GPR_UNLIKELY(chand->resolver == nullptr)) {
+ pick_done_locked(elem,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
} else {
// We do not yet have an LB policy, so wait for a resolver result.
- if (GPR_UNLIKELY(chand->resolver == nullptr)) {
- pick_done_locked(elem,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
- return;
- }
if (GPR_UNLIKELY(!chand->started_resolving)) {
start_resolving_locked(chand);
}
- pick_after_resolver_result_start_locked(elem);
+ // Create a new waiter, which will delete itself when done.
+ grpc_core::New<grpc_core::ResolverResultWaiter>(elem);
+ // Add the polling entity from call_data to the channel_data's
+ // interested_parties, so that the I/O of the resolver can be done
+ // under it. It will be removed in pick_done_locked().
+ maybe_add_call_to_channel_interested_parties_locked(elem);
}
- // We need to wait for either a resolver result or for an async result
- // from the LB policy. Add the polling entity from call_data to the
- // channel_data's interested_parties, so that the I/O of the LB policy
- // and resolver can be done under it. The polling entity will be
- // removed in async_pick_done_locked().
- grpc_polling_entity_add_to_pollset_set(calld->pollent,
- chand->interested_parties);
}
//