diff options
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc')
-rw-r--r-- | src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc | 108 |
1 files changed, 56 insertions, 52 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc index 08ea4f480b..db38ef5305 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc @@ -28,17 +28,18 @@ #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/transport/connectivity_state.h" -void grpc_lb_subchannel_data_unref_subchannel(grpc_exec_ctx *exec_ctx, - grpc_lb_subchannel_data *sd, - const char *reason) { +void grpc_lb_subchannel_data_unref_subchannel(grpc_exec_ctx* exec_ctx, + grpc_lb_subchannel_data* sd, + const char* reason) { if (sd->subchannel != NULL) { if (GRPC_TRACER_ON(*sd->subchannel_list->tracer)) { - gpr_log( - GPR_DEBUG, "[%s %p] subchannel list %p index %" PRIuPTR - " of %" PRIuPTR " (subchannel %p): unreffing subchannel", - sd->subchannel_list->tracer->name, sd->subchannel_list->policy, - sd->subchannel_list, (size_t)(sd - sd->subchannel_list->subchannels), - sd->subchannel_list->num_subchannels, sd->subchannel); + gpr_log(GPR_DEBUG, + "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR + " (subchannel %p): unreffing subchannel", + sd->subchannel_list->tracer->name, sd->subchannel_list->policy, + sd->subchannel_list, + (size_t)(sd - sd->subchannel_list->subchannels), + sd->subchannel_list->num_subchannels, sd->subchannel); } GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, reason); sd->subchannel = NULL; @@ -56,7 +57,7 @@ void grpc_lb_subchannel_data_unref_subchannel(grpc_exec_ctx *exec_ctx, } void grpc_lb_subchannel_data_start_connectivity_watch( - grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd) { + grpc_exec_ctx* exec_ctx, grpc_lb_subchannel_data* sd) { if (GRPC_TRACER_ON(*sd->subchannel_list->tracer)) { gpr_log(GPR_DEBUG, "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR @@ -74,25 +75,26 @@ void grpc_lb_subchannel_data_start_connectivity_watch( } void grpc_lb_subchannel_data_stop_connectivity_watch( - grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd) { + grpc_exec_ctx* exec_ctx, grpc_lb_subchannel_data* sd) { if (GRPC_TRACER_ON(*sd->subchannel_list->tracer)) { - gpr_log( - GPR_DEBUG, "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR - " (subchannel %p): stopping connectivity watch", - sd->subchannel_list->tracer->name, sd->subchannel_list->policy, - sd->subchannel_list, (size_t)(sd - sd->subchannel_list->subchannels), - sd->subchannel_list->num_subchannels, sd->subchannel); + gpr_log(GPR_DEBUG, + "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR + " (subchannel %p): stopping connectivity watch", + sd->subchannel_list->tracer->name, sd->subchannel_list->policy, + sd->subchannel_list, + (size_t)(sd - sd->subchannel_list->subchannels), + sd->subchannel_list->num_subchannels, sd->subchannel); } GPR_ASSERT(sd->connectivity_notification_pending); sd->connectivity_notification_pending = false; } -grpc_lb_subchannel_list *grpc_lb_subchannel_list_create( - grpc_exec_ctx *exec_ctx, grpc_lb_policy *p, grpc_tracer_flag *tracer, - const grpc_lb_addresses *addresses, const grpc_lb_policy_args *args, +grpc_lb_subchannel_list* grpc_lb_subchannel_list_create( + grpc_exec_ctx* exec_ctx, grpc_lb_policy* p, grpc_tracer_flag* tracer, + const grpc_lb_addresses* addresses, const grpc_lb_policy_args* args, grpc_iomgr_cb_func connectivity_changed_cb) { - grpc_lb_subchannel_list *subchannel_list = - (grpc_lb_subchannel_list *)gpr_zalloc(sizeof(*subchannel_list)); + grpc_lb_subchannel_list* subchannel_list = + (grpc_lb_subchannel_list*)gpr_zalloc(sizeof(*subchannel_list)); if (GRPC_TRACER_ON(*tracer)) { gpr_log(GPR_DEBUG, "[%s %p] Creating subchannel list %p for %" PRIuPTR " subchannels", @@ -101,11 +103,11 @@ grpc_lb_subchannel_list *grpc_lb_subchannel_list_create( subchannel_list->policy = p; subchannel_list->tracer = tracer; gpr_ref_init(&subchannel_list->refcount, 1); - subchannel_list->subchannels = (grpc_lb_subchannel_data *)gpr_zalloc( + subchannel_list->subchannels = (grpc_lb_subchannel_data*)gpr_zalloc( sizeof(grpc_lb_subchannel_data) * addresses->num_addresses); // We need to remove the LB addresses in order to be able to compare the // subchannel keys of subchannels from a different batch of addresses. - static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS, + static const char* keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS, GRPC_ARG_LB_ADDRESSES}; // Create a subchannel for each address. grpc_subchannel_args sc_args; @@ -116,18 +118,18 @@ grpc_lb_subchannel_list *grpc_lb_subchannel_list_create( memset(&sc_args, 0, sizeof(grpc_subchannel_args)); grpc_arg addr_arg = grpc_create_subchannel_address_arg(&addresses->addresses[i].address); - grpc_channel_args *new_args = grpc_channel_args_copy_and_add_and_remove( + grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove( args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &addr_arg, 1); gpr_free(addr_arg.value.string); sc_args.args = new_args; - grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel( + grpc_subchannel* subchannel = grpc_client_channel_factory_create_subchannel( exec_ctx, args->client_channel_factory, &sc_args); grpc_channel_args_destroy(exec_ctx, new_args); if (subchannel == NULL) { // Subchannel could not be created. if (GRPC_TRACER_ON(*tracer)) { - char *address_uri = + char* address_uri = grpc_sockaddr_to_uri(&addresses->addresses[i].address); gpr_log(GPR_DEBUG, "[%s %p] could not create subchannel for address uri %s, " @@ -138,15 +140,16 @@ grpc_lb_subchannel_list *grpc_lb_subchannel_list_create( continue; } if (GRPC_TRACER_ON(*tracer)) { - char *address_uri = + char* address_uri = grpc_sockaddr_to_uri(&addresses->addresses[i].address); - gpr_log(GPR_DEBUG, "[%s %p] subchannel list %p index %" PRIuPTR - ": Created subchannel %p for address uri %s", + gpr_log(GPR_DEBUG, + "[%s %p] subchannel list %p index %" PRIuPTR + ": Created subchannel %p for address uri %s", tracer->name, p, subchannel_list, subchannel_index, subchannel, address_uri); gpr_free(address_uri); } - grpc_lb_subchannel_data *sd = + grpc_lb_subchannel_data* sd = &subchannel_list->subchannels[subchannel_index++]; sd->subchannel_list = subchannel_list; sd->subchannel = subchannel; @@ -169,15 +172,15 @@ grpc_lb_subchannel_list *grpc_lb_subchannel_list_create( return subchannel_list; } -static void subchannel_list_destroy(grpc_exec_ctx *exec_ctx, - grpc_lb_subchannel_list *subchannel_list) { +static void subchannel_list_destroy(grpc_exec_ctx* exec_ctx, + grpc_lb_subchannel_list* subchannel_list) { if (GRPC_TRACER_ON(*subchannel_list->tracer)) { gpr_log(GPR_DEBUG, "[%s %p] Destroying subchannel_list %p", subchannel_list->tracer->name, subchannel_list->policy, subchannel_list); } for (size_t i = 0; i < subchannel_list->num_subchannels; i++) { - grpc_lb_subchannel_data *sd = &subchannel_list->subchannels[i]; + grpc_lb_subchannel_data* sd = &subchannel_list->subchannels[i]; grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd, "subchannel_list_destroy"); } @@ -185,8 +188,8 @@ static void subchannel_list_destroy(grpc_exec_ctx *exec_ctx, gpr_free(subchannel_list); } -void grpc_lb_subchannel_list_ref(grpc_lb_subchannel_list *subchannel_list, - const char *reason) { +void grpc_lb_subchannel_list_ref(grpc_lb_subchannel_list* subchannel_list, + const char* reason) { gpr_ref_non_zero(&subchannel_list->refcount); if (GRPC_TRACER_ON(*subchannel_list->tracer)) { const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count); @@ -197,9 +200,9 @@ void grpc_lb_subchannel_list_ref(grpc_lb_subchannel_list *subchannel_list, } } -void grpc_lb_subchannel_list_unref(grpc_exec_ctx *exec_ctx, - grpc_lb_subchannel_list *subchannel_list, - const char *reason) { +void grpc_lb_subchannel_list_unref(grpc_exec_ctx* exec_ctx, + grpc_lb_subchannel_list* subchannel_list, + const char* reason) { const bool done = gpr_unref(&subchannel_list->refcount); if (GRPC_TRACER_ON(*subchannel_list->tracer)) { const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count); @@ -214,35 +217,36 @@ void grpc_lb_subchannel_list_unref(grpc_exec_ctx *exec_ctx, } void grpc_lb_subchannel_list_ref_for_connectivity_watch( - grpc_lb_subchannel_list *subchannel_list, const char *reason) { + grpc_lb_subchannel_list* subchannel_list, const char* reason) { GRPC_LB_POLICY_WEAK_REF(subchannel_list->policy, reason); grpc_lb_subchannel_list_ref(subchannel_list, reason); } void grpc_lb_subchannel_list_unref_for_connectivity_watch( - grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_list *subchannel_list, - const char *reason) { + grpc_exec_ctx* exec_ctx, grpc_lb_subchannel_list* subchannel_list, + const char* reason) { GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, subchannel_list->policy, reason); grpc_lb_subchannel_list_unref(exec_ctx, subchannel_list, reason); } static void subchannel_data_cancel_connectivity_watch( - grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_data *sd, const char *reason) { + grpc_exec_ctx* exec_ctx, grpc_lb_subchannel_data* sd, const char* reason) { if (GRPC_TRACER_ON(*sd->subchannel_list->tracer)) { - gpr_log( - GPR_DEBUG, "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR - " (subchannel %p): canceling connectivity watch (%s)", - sd->subchannel_list->tracer->name, sd->subchannel_list->policy, - sd->subchannel_list, (size_t)(sd - sd->subchannel_list->subchannels), - sd->subchannel_list->num_subchannels, sd->subchannel, reason); + gpr_log(GPR_DEBUG, + "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR + " (subchannel %p): canceling connectivity watch (%s)", + sd->subchannel_list->tracer->name, sd->subchannel_list->policy, + sd->subchannel_list, + (size_t)(sd - sd->subchannel_list->subchannels), + sd->subchannel_list->num_subchannels, sd->subchannel, reason); } grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, NULL, &sd->connectivity_changed_closure); } void grpc_lb_subchannel_list_shutdown_and_unref( - grpc_exec_ctx *exec_ctx, grpc_lb_subchannel_list *subchannel_list, - const char *reason) { + grpc_exec_ctx* exec_ctx, grpc_lb_subchannel_list* subchannel_list, + const char* reason) { if (GRPC_TRACER_ON(*subchannel_list->tracer)) { gpr_log(GPR_DEBUG, "[%s %p] Shutting down subchannel_list %p (%s)", subchannel_list->tracer->name, subchannel_list->policy, @@ -251,7 +255,7 @@ void grpc_lb_subchannel_list_shutdown_and_unref( GPR_ASSERT(!subchannel_list->shutting_down); subchannel_list->shutting_down = true; for (size_t i = 0; i < subchannel_list->num_subchannels; i++) { - grpc_lb_subchannel_data *sd = &subchannel_list->subchannels[i]; + grpc_lb_subchannel_data* sd = &subchannel_list->subchannels[i]; // If there's a pending notification for this subchannel, cancel it; // the callback is responsible for unreffing the subchannel. // Otherwise, unref the subchannel directly. |