diff options
Diffstat (limited to 'src/core/ext/lb_policy/round_robin/round_robin.c')
-rw-r--r-- | src/core/ext/lb_policy/round_robin/round_robin.c | 45 |
1 files changed, 37 insertions, 8 deletions
diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index 37a9b18b97..e101c0369c 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -120,6 +120,8 @@ typedef struct { grpc_connectivity_state connectivity_state; /** the subchannel's target user data */ void *user_data; + /** vtable to operate over \a user_data */ + const grpc_lb_user_data_vtable *user_data_vtable; } subchannel_data; struct round_robin_lb_policy { @@ -186,9 +188,13 @@ static void advance_last_picked_locked(round_robin_lb_policy *p) { } if (grpc_lb_round_robin_trace) { - gpr_log(GPR_DEBUG, "[READYLIST] ADVANCED LAST PICK. NOW AT NODE %p (SC %p)", - (void *)p->ready_list_last_pick, - (void *)p->ready_list_last_pick->subchannel); + gpr_log(GPR_DEBUG, + "[READYLIST, RR: %p] ADVANCED LAST PICK. NOW AT NODE %p (SC %p, " + "CSC %p)", + (void *)p, (void *)p->ready_list_last_pick, + (void *)p->ready_list_last_pick->subchannel, + (void *)grpc_subchannel_get_connected_subchannel( + p->ready_list_last_pick->subchannel)); } } @@ -255,9 +261,18 @@ static void remove_disconnected_sc_locked(round_robin_lb_policy *p, static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; ready_list *elem; + + if (grpc_lb_round_robin_trace) { + gpr_log(GPR_DEBUG, "Destroying Round Robin policy at %p", (void *)pol); + } + for (size_t i = 0; i < p->num_subchannels; i++) { subchannel_data *sd = p->subchannels[i]; - GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "round_robin"); + GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "round_robin_destroy"); + if (sd->user_data != NULL) { + GPR_ASSERT(sd->user_data_vtable != NULL); + sd->user_data_vtable->destroy(exec_ctx, sd->user_data); + } gpr_free(sd); } @@ -285,6 +300,9 @@ static void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { size_t i; gpr_mu_lock(&p->mu); + if (grpc_lb_round_robin_trace) { + gpr_log(GPR_DEBUG, "Shutting down Round Robin policy at %p", (void *)pol); + } p->shutdown = 1; while ((pp = p->pending_picks)) { @@ -296,7 +314,7 @@ static void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } grpc_connectivity_state_set( exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN, - GRPC_ERROR_CREATE("Channel Shutdown"), "shutdown"); + GRPC_ERROR_CREATE("Channel Shutdown"), "rr_shutdown"); for (i = 0; i < p->num_subchannels; i++) { subchannel_data *sd = p->subchannels[i]; grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, NULL, @@ -395,6 +413,11 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pending_pick *pp; ready_list *selected; gpr_mu_lock(&p->mu); + + if (grpc_lb_round_robin_trace) { + gpr_log(GPR_INFO, "Round Robin %p trying to pick", (void *)pol); + } + if ((selected = peek_next_connected_locked(p))) { /* readily available, report right away */ *target = GRPC_CONNECTED_SUBCHANNEL_REF( @@ -435,7 +458,6 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, subchannel_data *sd = arg; round_robin_lb_policy *p = sd->policy; pending_pick *pp; - ready_list *selected; int unref = 0; @@ -456,12 +478,14 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, /* at this point we know there's at least one suitable subchannel. Go * ahead and pick one and notify the pending suitors in * p->pending_picks. This preemtively replicates rr_pick()'s actions. */ - selected = peek_next_connected_locked(p); + ready_list *selected = peek_next_connected_locked(p); + GPR_ASSERT(selected != NULL); if (p->pending_picks != NULL) { /* if the selected subchannel is going to be used for the pending * picks, update the last picked pointer */ advance_last_picked_locked(p); } + while ((pp = p->pending_picks)) { p->pending_picks = pp->next; @@ -585,6 +609,7 @@ static void rr_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_subchannel_get_connected_subchannel(selected->subchannel), "picked"); grpc_connected_subchannel_ping(exec_ctx, target, closure); + GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, target, "picked"); } else { gpr_mu_unlock(&p->mu); grpc_exec_ctx_sched(exec_ctx, closure, @@ -653,7 +678,11 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, sd->policy = p; sd->index = subchannel_idx; sd->subchannel = subchannel; - sd->user_data = addresses->addresses[i].user_data; + sd->user_data_vtable = addresses->user_data_vtable; + if (sd->user_data_vtable != NULL) { + sd->user_data = + sd->user_data_vtable->copy(addresses->addresses[i].user_data); + } ++subchannel_idx; grpc_closure_init(&sd->connectivity_changed_closure, rr_connectivity_changed, sd); |