diff options
Diffstat (limited to 'src/core/ext/filters/client_channel/client_channel.cc')
-rw-r--r-- | src/core/ext/filters/client_channel/client_channel.cc | 77 |
1 files changed, 35 insertions, 42 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index e99022a91b..a8a7a37be0 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -41,12 +41,12 @@ #include "src/core/ext/filters/deadline/deadline_filter.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/connected_channel.h" +#include "src/core/lib/gpr/string.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" -#include "src/core/lib/support/string.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/metadata.h" @@ -553,6 +553,7 @@ static void on_resolver_result_changed_locked(void* arg, grpc_error* error) { } grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties, chand->interested_parties); + grpc_lb_policy_shutdown_locked(chand->lb_policy, new_lb_policy); GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel"); } chand->lb_policy = new_lb_policy; @@ -658,6 +659,7 @@ static void start_transport_op_locked(void* arg, grpc_error* error_ignored) { if (chand->lb_policy != nullptr) { grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties, chand->interested_parties); + grpc_lb_policy_shutdown_locked(chand->lb_policy, nullptr); GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel"); chand->lb_policy = nullptr; } @@ -792,6 +794,7 @@ static void cc_destroy_channel_elem(grpc_channel_element* elem) { if (chand->lb_policy != nullptr) { grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties, chand->interested_parties); + grpc_lb_policy_shutdown_locked(chand->lb_policy, nullptr); GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel"); } gpr_free(chand->info_lb_policy_name); @@ -852,12 +855,10 @@ typedef struct client_channel_call_data { grpc_subchannel_call* subchannel_call; grpc_error* error; - grpc_lb_policy* lb_policy; // Holds ref while LB pick is pending. + grpc_lb_policy_pick_state pick; grpc_closure lb_pick_closure; grpc_closure lb_pick_cancel_closure; - grpc_connected_subchannel* connected_subchannel; - grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT]; grpc_polling_entity* pollent; grpc_transport_stream_op_batch* waiting_for_pick_batches[MAX_WAITING_BATCHES]; @@ -866,8 +867,6 @@ typedef struct client_channel_call_data { grpc_transport_stream_op_batch* initial_metadata_batch; - grpc_linked_mdelem lb_token_mdelem; - grpc_closure on_complete; grpc_closure* original_on_complete; } call_data; @@ -1004,17 +1003,17 @@ static void create_subchannel_call_locked(grpc_call_element* elem, grpc_error* error) { channel_data* chand = (channel_data*)elem->channel_data; call_data* calld = (call_data*)elem->call_data; - const grpc_connected_subchannel_call_args call_args = { - calld->pollent, // pollent - calld->path, // path - calld->call_start_time, // start_time - calld->deadline, // deadline - calld->arena, // arena - calld->subchannel_call_context, // context - calld->call_combiner // call_combiner + const grpc_core::ConnectedSubchannel::CallArgs call_args = { + calld->pollent, // pollent + calld->path, // path + calld->call_start_time, // start_time + calld->deadline, // deadline + calld->arena, // arena + calld->pick.subchannel_call_context, // context + calld->call_combiner // call_combiner }; - grpc_error* new_error = grpc_connected_subchannel_create_call( - calld->connected_subchannel, &call_args, &calld->subchannel_call); + grpc_error* new_error = calld->pick.connected_subchannel->CreateCall( + call_args, &calld->subchannel_call); if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: create subchannel_call=%p: error=%s", chand, calld, calld->subchannel_call, grpc_error_string(new_error)); @@ -1032,7 +1031,7 @@ static void create_subchannel_call_locked(grpc_call_element* elem, static void pick_done_locked(grpc_call_element* elem, grpc_error* error) { call_data* calld = (call_data*)elem->call_data; channel_data* chand = (channel_data*)elem->channel_data; - if (calld->connected_subchannel == nullptr) { + if (calld->pick.connected_subchannel == nullptr) { // Failed to create subchannel. GRPC_ERROR_UNREF(calld->error); calld->error = error == GRPC_ERROR_NONE @@ -1071,13 +1070,16 @@ static void pick_callback_cancel_locked(void* arg, grpc_error* error) { grpc_call_element* elem = (grpc_call_element*)arg; channel_data* chand = (channel_data*)elem->channel_data; call_data* calld = (call_data*)elem->call_data; - if (calld->lb_policy != nullptr) { + // Note: chand->lb_policy may have changed since we started our pick, + // in which case we will be cancelling the pick on a policy other than + // the one we started it on. However, this will just be a no-op. + if (error != GRPC_ERROR_NONE && chand->lb_policy != nullptr) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: cancelling pick from LB policy %p", - chand, calld, calld->lb_policy); + chand, calld, chand->lb_policy); } - grpc_lb_policy_cancel_pick_locked( - calld->lb_policy, &calld->connected_subchannel, GRPC_ERROR_REF(error)); + grpc_lb_policy_cancel_pick_locked(chand->lb_policy, &calld->pick, + GRPC_ERROR_REF(error)); } GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback_cancel"); } @@ -1092,9 +1094,6 @@ static void pick_callback_done_locked(void* arg, grpc_error* error) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed asynchronously", chand, calld); } - GPR_ASSERT(calld->lb_policy != nullptr); - GRPC_LB_POLICY_UNREF(calld->lb_policy, "pick_subchannel"); - calld->lb_policy = nullptr; async_pick_done_locked(elem, GRPC_ERROR_REF(error)); } @@ -1128,26 +1127,21 @@ static bool pick_callback_start_locked(grpc_call_element* elem) { initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY; } } - const grpc_lb_policy_pick_args inputs = { + calld->pick.initial_metadata = calld->initial_metadata_batch->payload->send_initial_metadata - .send_initial_metadata, - initial_metadata_flags, &calld->lb_token_mdelem}; - // Keep a ref to the LB policy in calld while the pick is pending. - GRPC_LB_POLICY_REF(chand->lb_policy, "pick_subchannel"); - calld->lb_policy = chand->lb_policy; + .send_initial_metadata; + calld->pick.initial_metadata_flags = initial_metadata_flags; GRPC_CLOSURE_INIT(&calld->lb_pick_closure, pick_callback_done_locked, elem, grpc_combiner_scheduler(chand->combiner)); - const bool pick_done = grpc_lb_policy_pick_locked( - chand->lb_policy, &inputs, &calld->connected_subchannel, - calld->subchannel_call_context, nullptr, &calld->lb_pick_closure); + calld->pick.on_complete = &calld->lb_pick_closure; + const bool pick_done = + grpc_lb_policy_pick_locked(chand->lb_policy, &calld->pick); if (pick_done) { /* synchronous grpc_lb_policy_pick call. Unref the LB policy. */ if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed synchronously", chand, calld); } - GRPC_LB_POLICY_UNREF(calld->lb_policy, "pick_subchannel"); - calld->lb_policy = nullptr; } else { GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback_cancel"); grpc_call_combiner_set_notify_on_cancel( @@ -1289,7 +1283,7 @@ static void start_pick_locked(void* arg, grpc_error* ignored) { grpc_call_element* elem = (grpc_call_element*)arg; call_data* calld = (call_data*)elem->call_data; channel_data* chand = (channel_data*)elem->channel_data; - GPR_ASSERT(calld->connected_subchannel == nullptr); + GPR_ASSERT(calld->pick.connected_subchannel == nullptr); if (chand->lb_policy != nullptr) { // We already have an LB policy, so ask it for a pick. if (pick_callback_start_locked(elem)) { @@ -1467,15 +1461,14 @@ static void cc_destroy_call_elem(grpc_call_element* elem, GRPC_SUBCHANNEL_CALL_UNREF(calld->subchannel_call, "client_channel_destroy_call"); } - GPR_ASSERT(calld->lb_policy == nullptr); GPR_ASSERT(calld->waiting_for_pick_batches_count == 0); - if (calld->connected_subchannel != nullptr) { - GRPC_CONNECTED_SUBCHANNEL_UNREF(calld->connected_subchannel, "picked"); + if (calld->pick.connected_subchannel != nullptr) { + calld->pick.connected_subchannel.reset(); } for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) { - if (calld->subchannel_call_context[i].value != nullptr) { - calld->subchannel_call_context[i].destroy( - calld->subchannel_call_context[i].value); + if (calld->pick.subchannel_call_context[i].value != nullptr) { + calld->pick.subchannel_call_context[i].destroy( + calld->pick.subchannel_call_context[i].value); } } GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE); |