diff options
author | David G. Quintas <dgq@google.com> | 2016-11-02 08:12:52 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-11-02 08:12:52 +0100 |
commit | 105b39450aa03a55af537a4a85220e7bf5e48f70 (patch) | |
tree | 118fbdeca68ed5fb83536a3a3b9b02e173265ce4 /src/core/ext/lb_policy | |
parent | 00bbb4122a8875876eb3075c32585fe296d5ba85 (diff) | |
parent | 6e7baf75dd8ab88d452e7001c41699857afd6e98 (diff) |
Merge pull request #8568 from dgquintas/grpclb_leaks_fix
gRPC LB fixes from end two end testing
Diffstat (limited to 'src/core/ext/lb_policy')
-rw-r--r-- | src/core/ext/lb_policy/grpclb/grpclb.c | 671 | ||||
-rw-r--r-- | src/core/ext/lb_policy/round_robin/round_robin.c | 42 |
2 files changed, 370 insertions, 343 deletions
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index 5bbf857079..734108a9db 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -43,30 +43,23 @@ * policy to select from this list of LB server backends. * * The first time the policy gets a request for a pick, a ping, or to exit the - * idle state, \a query_for_backends() is called. It creates an instance of \a - * lb_client_data, an internal struct meant to contain the data associated with - * the internal communication with the LB server. This instance is created via - * \a lb_client_data_create(). There, the call over lb_channel to pick-first - * from {a1..an} is created, the \a LoadBalancingRequest message is assembled - * and all necessary callbacks for the progress of the internal call configured. + * idle state, \a query_for_backends_locked() is called. This function sets up + * and initiates the internal communication with the LB server. In particular, + * it's responsible for instantiating the internal *streaming* call to the LB + * server (whichever address from {a1..an} pick-first chose). This call is + * serviced by two callbacks, \a lb_on_server_status_received and \a + * lb_on_response_received. The former will be called when the call to the LB + * server completes. This can happen if the LB server closes the connection or + * if this policy itself cancels the call (for example because it's shutting + * down). If the internal call times out, the usual behavior of pick-first + * applies, continuing to pick from the list {a1..an}. * - * Back in \a query_for_backends(), the internal *streaming* call to the LB - * server (whichever address from {a1..an} pick-first chose) is kicked off. - * It'll progress over the callbacks configured in \a lb_client_data_create() - * (see the field docstrings of \a lb_client_data for more details). - * - * If the call fails with UNIMPLEMENTED, the original call will also fail. - * There's a misconfiguration somewhere: at least one of {a1..an} isn't a LB - * server, which contradicts the LB bit being set. If the internal call times - * out, the usual behavior of pick-first applies, continuing to pick from the - * list {a1..an}. - * - * Upon sucesss, a \a LoadBalancingResponse is expected in \a res_recv_cb. An - * invalid one results in the termination of the streaming call. A new streaming - * call should be created if possible, failing the original call otherwise. - * For a valid \a LoadBalancingResponse, the server list of actual backends is - * extracted. A Round Robin policy will be created from this list. There are two - * possible scenarios: + * Upon sucesss, the incoming \a LoadBalancingResponse is processed by \a + * res_recv. An invalid one results in the termination of the streaming call. A + * new streaming call should be created if possible, failing the original call + * otherwise. For a valid \a LoadBalancingResponse, the server list of actual + * backends is extracted. A Round Robin policy will be created from this list. + * There are two possible scenarios: * * 1. This is the first server list received. There was no previous instance of * the Round Robin policy. \a rr_handover_locked() will instantiate the RR @@ -84,10 +77,10 @@ * Once a RR policy instance is in place (and getting updated as described), * calls to for a pick, a ping or a cancellation will be serviced right away by * forwarding them to the RR instance. Any time there's no RR policy available - * (ie, right after the creation of the gRPCLB policy, if an empty serverlist - * is received, etc), pick/ping requests are added to a list of pending - * picks/pings to be flushed and serviced as part of \a rr_handover_locked() the - * moment the RR policy instance becomes available. + * (ie, right after the creation of the gRPCLB policy, if an empty serverlist is + * received, etc), pick/ping requests are added to a list of pending picks/pings + * to be flushed and serviced as part of \a rr_handover_locked() the moment the + * RR policy instance becomes available. * * \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the * high level design and details. */ @@ -120,12 +113,20 @@ #include "src/core/ext/lb_policy/grpclb/grpclb.h" #include "src/core/ext/lb_policy/grpclb/load_balancer_api.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" +#include "src/core/lib/iomgr/timer.h" +#include "src/core/lib/support/backoff.h" #include "src/core/lib/support/string.h" #include "src/core/lib/surface/call.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/transport/static_metadata.h" +#define BACKOFF_MULTIPLIER 1.6 +#define BACKOFF_JITTER 0.2 +#define BACKOFF_MIN_SECONDS 10 +#define BACKOFF_MAX_SECONDS 60 + int grpc_lb_glb_trace = 0; /* add lb_token of selected subchannel (address) to the call's initial @@ -174,13 +175,12 @@ typedef struct wrapped_rr_closure_arg { static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { wrapped_rr_closure_arg *wc_arg = arg; - if (wc_arg->rr_policy != NULL) { - if (grpc_lb_glb_trace) { - gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")", - (intptr_t)wc_arg->rr_policy); - } - GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "wrapped_rr_closure"); + GPR_ASSERT(wc_arg->wrapped_closure != NULL); + grpc_exec_ctx_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error), + NULL); + + if (wc_arg->rr_policy != NULL) { /* if target is NULL, no pick has been made by the RR policy (eg, all * addresses failed to connect). There won't be any user_data/token * available */ @@ -189,10 +189,12 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg, wc_arg->lb_token_mdelem_storage, GRPC_MDELEM_REF(wc_arg->lb_token)); } + if (grpc_lb_glb_trace) { + gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")", + (intptr_t)wc_arg->rr_policy); + } + GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "wrapped_rr_closure"); } - GPR_ASSERT(wc_arg->wrapped_closure != NULL); - grpc_exec_ctx_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error), - NULL); GPR_ASSERT(wc_arg->free_when_done != NULL); gpr_free(wc_arg->free_when_done); } @@ -264,7 +266,6 @@ static void add_pending_ping(pending_ping **root, grpc_closure *notify) { * glb_lb_policy */ typedef struct rr_connectivity_data rr_connectivity_data; -struct lb_client_data; static const grpc_lb_policy_vtable glb_lb_policy_vtable; typedef struct glb_lb_policy { /** base policy: must be first */ @@ -296,20 +297,47 @@ typedef struct glb_lb_policy { * response has arrived. */ grpc_grpclb_serverlist *serverlist; - /** addresses from \a serverlist */ - grpc_lb_addresses *addresses; - /** list of picks that are waiting on RR's policy connectivity */ pending_pick *pending_picks; /** list of pings that are waiting on RR's policy connectivity */ pending_ping *pending_pings; - /** client data associated with the LB server communication */ - struct lb_client_data *lb_client; + bool shutting_down; + + /************************************************************/ + /* client data associated with the LB server communication */ + /************************************************************/ + /* Status from the LB server has been received. This signals the end of the LB + * call. */ + grpc_closure lb_on_server_status_received; + + /* A response from the LB server has been received. Process it */ + grpc_closure lb_on_response_received; + + grpc_call *lb_call; /* streaming call to the LB server, */ + + grpc_metadata_array lb_initial_metadata_recv; /* initial MD from LB server */ + grpc_metadata_array + lb_trailing_metadata_recv; /* trailing MD from LB server */ + + /* what's being sent to the LB server. Note that its value may vary if the LB + * server indicates a redirect. */ + grpc_byte_buffer *lb_request_payload; + + /* response the LB server, if any. Processed in lb_on_response_received() */ + grpc_byte_buffer *lb_response_payload; + + /* call status code and details, set in lb_on_server_status_received() */ + grpc_status_code lb_call_status; + char *lb_call_status_details; + size_t lb_call_status_details_capacity; + + /** LB call retry backoff state */ + gpr_backoff lb_call_backoff_state; - /** for tracking of the RR connectivity */ - rr_connectivity_data *rr_connectivity; + /** LB call retry timer */ + grpc_timer lb_call_retry_timer; } glb_lb_policy; /* Keeps track and reacts to changes in connectivity of the RR instance */ @@ -358,6 +386,28 @@ static int lb_token_cmp(void *token1, void *token2) { static const grpc_lb_user_data_vtable lb_token_vtable = { lb_token_copy, lb_token_destroy, lb_token_cmp}; +static void parse_server(const grpc_grpclb_server *server, + grpc_resolved_address *addr) { + const uint16_t netorder_port = htons((uint16_t)server->port); + /* the addresses are given in binary format (a in(6)_addr struct) in + * server->ip_address.bytes. */ + const grpc_grpclb_ip_address *ip = &server->ip_address; + memset(addr, 0, sizeof(*addr)); + if (ip->size == 4) { + addr->len = sizeof(struct sockaddr_in); + struct sockaddr_in *addr4 = (struct sockaddr_in *)&addr->addr; + addr4->sin_family = AF_INET; + memcpy(&addr4->sin_addr, ip->bytes, ip->size); + addr4->sin_port = netorder_port; + } else if (ip->size == 16) { + addr->len = sizeof(struct sockaddr_in6); + struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)&addr->addr; + addr6->sin6_family = AF_INET; + memcpy(&addr6->sin6_addr, ip->bytes, ip->size); + addr6->sin6_port = netorder_port; + } +} + /* Returns addresses extracted from \a serverlist. */ static grpc_lb_addresses *process_serverlist( const grpc_grpclb_serverlist *serverlist) { @@ -384,25 +434,8 @@ static grpc_lb_addresses *process_serverlist( if (!is_server_valid(serverlist->servers[sl_idx], sl_idx, false)) continue; /* address processing */ - const uint16_t netorder_port = htons((uint16_t)server->port); - /* the addresses are given in binary format (a in(6)_addr struct) in - * server->ip_address.bytes. */ - const grpc_grpclb_ip_address *ip = &server->ip_address; grpc_resolved_address addr; - memset(&addr, 0, sizeof(addr)); - if (ip->size == 4) { - addr.len = sizeof(struct sockaddr_in); - struct sockaddr_in *addr4 = (struct sockaddr_in *)&addr.addr; - addr4->sin_family = AF_INET; - memcpy(&addr4->sin_addr, ip->bytes, ip->size); - addr4->sin_port = netorder_port; - } else if (ip->size == 16) { - addr.len = sizeof(struct sockaddr_in6); - struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)&addr.addr; - addr6->sin6_family = AF_INET; - memcpy(&addr6->sin6_addr, ip->bytes, ip->size); - addr6->sin6_port = netorder_port; - } + parse_server(server, &addr); /* lb token processing */ void *user_data; @@ -429,7 +462,6 @@ static grpc_lb_addresses *process_serverlist( ++addr_idx; } GPR_ASSERT(addr_idx == num_valid); - return lb_addresses; } @@ -450,7 +482,7 @@ static bool pick_from_internal_rr_locked( gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")", (intptr_t)wc_arg->rr_policy); } - GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick"); + GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync"); /* add the load reporting initial metadata */ initial_metadata_add_lb_token(pick_args->initial_metadata, @@ -463,7 +495,6 @@ static bool pick_from_internal_rr_locked( * pending pick list inside the RR policy (glb_policy->rr_policy). * Eventually, wrapped_on_complete will be called, which will -among other * things- add the LB token to the call's initial metadata */ - return pick_done; } @@ -472,54 +503,70 @@ static grpc_lb_policy *create_rr_locked( glb_lb_policy *glb_policy) { GPR_ASSERT(serverlist != NULL && serverlist->num_servers > 0); - if (glb_policy->addresses != NULL) { - /* dispose of the previous version */ - grpc_lb_addresses_destroy(glb_policy->addresses); - } - glb_policy->addresses = process_serverlist(serverlist); - grpc_lb_policy_args args; memset(&args, 0, sizeof(args)); args.client_channel_factory = glb_policy->cc_factory; + grpc_lb_addresses *addresses = process_serverlist(serverlist); // Replace the LB addresses in the channel args that we pass down to // the subchannel. static const char *keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES}; - const grpc_arg arg = - grpc_lb_addresses_create_channel_arg(glb_policy->addresses); + const grpc_arg arg = grpc_lb_addresses_create_channel_arg(addresses); args.args = grpc_channel_args_copy_and_add_and_remove( glb_policy->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &arg, 1); grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args); + GPR_ASSERT(rr != NULL); + grpc_lb_addresses_destroy(addresses); grpc_channel_args_destroy(args.args); - return rr; } +static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error); +/* glb_policy->rr_policy may be NULL (initial handover) */ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, grpc_error *error) { GPR_ASSERT(glb_policy->serverlist != NULL && glb_policy->serverlist->num_servers > 0); + + if (grpc_lb_glb_trace) { + gpr_log(GPR_INFO, "RR handover. Old RR: %p", (void *)glb_policy->rr_policy); + } + if (glb_policy->rr_policy != NULL) { + /* if we are phasing out an existing RR instance, unref it. */ + GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "rr_handover"); + } + glb_policy->rr_policy = create_rr_locked(exec_ctx, glb_policy->serverlist, glb_policy); - if (grpc_lb_glb_trace) { - gpr_log(GPR_INFO, "Created RR policy (0x%" PRIxPTR ")", - (intptr_t)glb_policy->rr_policy); + gpr_log(GPR_INFO, "Created RR policy (%p)", (void *)glb_policy->rr_policy); } + GPR_ASSERT(glb_policy->rr_policy != NULL); grpc_pollset_set_add_pollset_set(exec_ctx, glb_policy->rr_policy->interested_parties, glb_policy->base.interested_parties); - glb_policy->rr_connectivity->state = grpc_lb_policy_check_connectivity( + + rr_connectivity_data *rr_connectivity = + gpr_malloc(sizeof(rr_connectivity_data)); + memset(rr_connectivity, 0, sizeof(rr_connectivity_data)); + grpc_closure_init(&rr_connectivity->on_change, glb_rr_connectivity_changed, + rr_connectivity); + rr_connectivity->glb_policy = glb_policy; + rr_connectivity->state = grpc_lb_policy_check_connectivity( exec_ctx, glb_policy->rr_policy, &error); - grpc_lb_policy_notify_on_state_change( - exec_ctx, glb_policy->rr_policy, &glb_policy->rr_connectivity->state, - &glb_policy->rr_connectivity->on_change); + grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, - glb_policy->rr_connectivity->state, - GRPC_ERROR_REF(error), "rr_handover"); + rr_connectivity->state, GRPC_ERROR_REF(error), + "rr_handover"); + /* subscribe */ + GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "rr_connectivity_cb"); + grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy, + &rr_connectivity->state, + &rr_connectivity->on_change); grpc_lb_policy_exit_idle(exec_ctx, glb_policy->rr_policy); /* flush pending ops */ @@ -553,35 +600,27 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { + /* If shutdown or error free the arg. Rely on the rest of the code to set the + * right grpclb status. */ rr_connectivity_data *rr_conn_data = arg; glb_lb_policy *glb_policy = rr_conn_data->glb_policy; - if (rr_conn_data->state == GRPC_CHANNEL_SHUTDOWN) { - if (glb_policy->serverlist != NULL) { - /* a RR policy is shutting down but there's a serverlist available -> - * perform a handover */ - gpr_mu_lock(&glb_policy->mu); - rr_handover_locked(exec_ctx, glb_policy, error); - gpr_mu_unlock(&glb_policy->mu); - } else { - /* shutting down and no new serverlist available. Bail out. */ - gpr_free(rr_conn_data); - } + if (rr_conn_data->state != GRPC_CHANNEL_SHUTDOWN && + !glb_policy->shutting_down) { + gpr_mu_lock(&glb_policy->mu); + /* RR not shutting down. Mimic the RR's policy state */ + grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, + rr_conn_data->state, GRPC_ERROR_REF(error), + "rr_connectivity_cb"); + /* resubscribe. Reuse the "rr_connectivity_cb" weak ref. */ + grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy, + &rr_conn_data->state, + &rr_conn_data->on_change); + gpr_mu_unlock(&glb_policy->mu); } else { - if (error == GRPC_ERROR_NONE) { - gpr_mu_lock(&glb_policy->mu); - /* RR not shutting down. Mimic the RR's policy state */ - grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, - rr_conn_data->state, GRPC_ERROR_REF(error), - "glb_rr_connectivity_changed"); - /* resubscribe */ - grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy, - &rr_conn_data->state, - &rr_conn_data->on_change); - gpr_mu_unlock(&glb_policy->mu); - } else { /* error */ - gpr_free(rr_conn_data); - } + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, + "rr_connectivity_cb"); + gpr_free(rr_conn_data); } } @@ -684,18 +723,11 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, return NULL; } - rr_connectivity_data *rr_connectivity = - gpr_malloc(sizeof(rr_connectivity_data)); - memset(rr_connectivity, 0, sizeof(rr_connectivity_data)); - grpc_closure_init(&rr_connectivity->on_change, glb_rr_connectivity_changed, - rr_connectivity); - rr_connectivity->glb_policy = glb_policy; - glb_policy->rr_connectivity = rr_connectivity; - grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable); gpr_mu_init(&glb_policy->mu); grpc_connectivity_state_init(&glb_policy->state_tracker, GRPC_CHANNEL_IDLE, "grpclb"); + return &glb_policy->base; } @@ -712,14 +744,13 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { grpc_grpclb_destroy_serverlist(glb_policy->serverlist); } gpr_mu_destroy(&glb_policy->mu); - grpc_lb_addresses_destroy(glb_policy->addresses); gpr_free(glb_policy); } -static void lb_client_data_destroy(struct lb_client_data *lb_client); static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { glb_lb_policy *glb_policy = (glb_lb_policy *)pol; gpr_mu_lock(&glb_policy->mu); + glb_policy->shutting_down = true; pending_pick *pp = glb_policy->pending_picks; glb_policy->pending_picks = NULL; @@ -743,15 +774,16 @@ static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } if (glb_policy->rr_policy) { - /* unsubscribe */ - grpc_lb_policy_notify_on_state_change( - exec_ctx, glb_policy->rr_policy, NULL, - &glb_policy->rr_connectivity->on_change); GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "glb_shutdown"); } - lb_client_data_destroy(glb_policy->lb_client); - glb_policy->lb_client = NULL; + if (glb_policy->started_picking) { + if (glb_policy->lb_call != NULL) { + grpc_call_cancel(glb_policy->lb_call, NULL); + /* lb_on_server_status_received will pick up the cancellation and clean up + */ + } + } grpc_connectivity_state_set( exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN, @@ -782,17 +814,12 @@ static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, GRPC_ERROR_UNREF(error); } -static grpc_call *lb_client_data_get_call(struct lb_client_data *lb_client); static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, uint32_t initial_metadata_flags_mask, uint32_t initial_metadata_flags_eq, grpc_error *error) { glb_lb_policy *glb_policy = (glb_lb_policy *)pol; gpr_mu_lock(&glb_policy->mu); - if (glb_policy->lb_client != NULL) { - /* cancel the call to the load balancer service, if any */ - grpc_call_cancel(lb_client_data_get_call(glb_policy->lb_client), NULL); - } pending_pick *pp = glb_policy->pending_picks; glb_policy->pending_picks = NULL; while (pp != NULL) { @@ -812,18 +839,20 @@ static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, GRPC_ERROR_UNREF(error); } -static void query_for_backends(grpc_exec_ctx *exec_ctx, - glb_lb_policy *glb_policy); -static void start_picking(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy) { +static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, + glb_lb_policy *glb_policy); +static void start_picking_locked(grpc_exec_ctx *exec_ctx, + glb_lb_policy *glb_policy) { glb_policy->started_picking = true; - query_for_backends(exec_ctx, glb_policy); + gpr_backoff_reset(&glb_policy->lb_call_backoff_state); + query_for_backends_locked(exec_ctx, glb_policy); } static void glb_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { glb_lb_policy *glb_policy = (glb_lb_policy *)pol; gpr_mu_lock(&glb_policy->mu); if (!glb_policy->started_picking) { - start_picking(exec_ctx, glb_policy); + start_picking_locked(exec_ctx, glb_policy); } gpr_mu_unlock(&glb_policy->mu); } @@ -849,8 +878,8 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, if (glb_policy->rr_policy != NULL) { if (grpc_lb_glb_trace) { - gpr_log(GPR_INFO, "about to PICK from 0x%" PRIxPTR "", - (intptr_t)glb_policy->rr_policy); + gpr_log(GPR_INFO, "grpclb %p about to PICK from RR %p", + (void *)glb_policy, (void *)glb_policy->rr_policy); } GRPC_LB_POLICY_REF(glb_policy->rr_policy, "glb_pick"); @@ -867,11 +896,17 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pick_done = pick_from_internal_rr_locked(exec_ctx, glb_policy->rr_policy, pick_args, target, wc_arg); } else { + if (grpc_lb_glb_trace) { + gpr_log(GPR_DEBUG, + "No RR policy in grpclb instance %p. Adding to grpclb's pending " + "picks", + (void *)(glb_policy)); + } add_pending_pick(&glb_policy->pending_picks, pick_args, target, on_complete); if (!glb_policy->started_picking) { - start_picking(exec_ctx, glb_policy); + start_picking_locked(exec_ctx, glb_policy); } pick_done = false; } @@ -900,7 +935,7 @@ static void glb_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } else { add_pending_ping(&glb_policy->pending_pings, closure); if (!glb_policy->started_picking) { - start_picking(exec_ctx, glb_policy); + start_picking_locked(exec_ctx, glb_policy); } } gpr_mu_unlock(&glb_policy->mu); @@ -918,250 +953,182 @@ static void glb_notify_on_state_change(grpc_exec_ctx *exec_ctx, gpr_mu_unlock(&glb_policy->mu); } -/* - * lb_client_data - * - * Used internally for the client call to the LB */ -typedef struct lb_client_data { - gpr_mu mu; - - /* called once initial metadata's been sent */ - grpc_closure md_sent; - - /* called once the LoadBalanceRequest has been sent to the LB server. See - * src/proto/grpc/.../load_balancer.proto */ - grpc_closure req_sent; - - /* A response from the LB server has been received (or error). Process it */ - grpc_closure res_rcvd; - - /* After the client has sent a close to the LB server */ - grpc_closure close_sent; - - /* ... and the status from the LB server has been received */ - grpc_closure srv_status_rcvd; - - grpc_call *lb_call; /* streaming call to the LB server, */ - gpr_timespec deadline; /* for the streaming call to the LB server */ - - grpc_metadata_array initial_metadata_recv; /* initial MD from LB server */ - grpc_metadata_array trailing_metadata_recv; /* trailing MD from LB server */ - - /* what's being sent to the LB server. Note that its value may vary if the LB - * server indicates a redirect. */ - grpc_byte_buffer *request_payload; - - /* response from the LB server, if any. Processed in res_recv_cb() */ - grpc_byte_buffer *response_payload; - - /* the call's status and status detailset in srv_status_rcvd_cb() */ - grpc_status_code status; - char *status_details; - size_t status_details_capacity; - - /* pointer back to the enclosing policy */ - glb_lb_policy *glb_policy; -} lb_client_data; - -static void md_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); -static void req_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); -static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); -static void close_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error); -static void srv_status_rcvd_cb(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error); - -static lb_client_data *lb_client_data_create(glb_lb_policy *glb_policy) { +static void lb_on_server_status_received(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error); +static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error); +static void lb_call_init(glb_lb_policy *glb_policy) { GPR_ASSERT(glb_policy->server_name != NULL); GPR_ASSERT(glb_policy->server_name[0] != '\0'); - lb_client_data *lb_client = gpr_malloc(sizeof(lb_client_data)); - memset(lb_client, 0, sizeof(lb_client_data)); - - gpr_mu_init(&lb_client->mu); - grpc_closure_init(&lb_client->md_sent, md_sent_cb, lb_client); - - grpc_closure_init(&lb_client->req_sent, req_sent_cb, lb_client); - grpc_closure_init(&lb_client->res_rcvd, res_recv_cb, lb_client); - grpc_closure_init(&lb_client->close_sent, close_sent_cb, lb_client); - grpc_closure_init(&lb_client->srv_status_rcvd, srv_status_rcvd_cb, lb_client); - - lb_client->deadline = glb_policy->deadline; - /* Note the following LB call progresses every time there's activity in \a * glb_policy->base.interested_parties, which is comprised of the polling * entities from \a client_channel. */ - lb_client->lb_call = grpc_channel_create_pollset_set_call( + glb_policy->lb_call = grpc_channel_create_pollset_set_call( glb_policy->lb_channel, NULL, GRPC_PROPAGATE_DEFAULTS, glb_policy->base.interested_parties, "/grpc.lb.v1.LoadBalancer/BalanceLoad", glb_policy->server_name, - lb_client->deadline, NULL); + glb_policy->deadline, NULL); - grpc_metadata_array_init(&lb_client->initial_metadata_recv); - grpc_metadata_array_init(&lb_client->trailing_metadata_recv); + grpc_metadata_array_init(&glb_policy->lb_initial_metadata_recv); + grpc_metadata_array_init(&glb_policy->lb_trailing_metadata_recv); grpc_grpclb_request *request = grpc_grpclb_request_create(glb_policy->server_name); gpr_slice request_payload_slice = grpc_grpclb_request_encode(request); - lb_client->request_payload = + glb_policy->lb_request_payload = grpc_raw_byte_buffer_create(&request_payload_slice, 1); gpr_slice_unref(request_payload_slice); grpc_grpclb_request_destroy(request); - lb_client->status_details = NULL; - lb_client->status_details_capacity = 0; - lb_client->glb_policy = glb_policy; - return lb_client; + glb_policy->lb_call_status_details = NULL; + glb_policy->lb_call_status_details_capacity = 0; + + grpc_closure_init(&glb_policy->lb_on_server_status_received, + lb_on_server_status_received, glb_policy); + grpc_closure_init(&glb_policy->lb_on_response_received, + lb_on_response_received, glb_policy); + + gpr_backoff_init(&glb_policy->lb_call_backoff_state, BACKOFF_MULTIPLIER, + BACKOFF_JITTER, BACKOFF_MIN_SECONDS * 1000, + BACKOFF_MAX_SECONDS * 1000); } -static void lb_client_data_destroy(lb_client_data *lb_client) { - grpc_call_destroy(lb_client->lb_call); - grpc_metadata_array_destroy(&lb_client->initial_metadata_recv); - grpc_metadata_array_destroy(&lb_client->trailing_metadata_recv); +static void lb_call_destroy(glb_lb_policy *glb_policy) { + GPR_ASSERT(glb_policy->lb_call != NULL); + grpc_call_destroy(glb_policy->lb_call); + glb_policy->lb_call = NULL; - grpc_byte_buffer_destroy(lb_client->request_payload); + grpc_metadata_array_destroy(&glb_policy->lb_initial_metadata_recv); + grpc_metadata_array_destroy(&glb_policy->lb_trailing_metadata_recv); - gpr_free(lb_client->status_details); - gpr_mu_destroy(&lb_client->mu); - gpr_free(lb_client); -} -static grpc_call *lb_client_data_get_call(lb_client_data *lb_client) { - return lb_client->lb_call; + grpc_byte_buffer_destroy(glb_policy->lb_request_payload); + gpr_free(glb_policy->lb_call_status_details); } /* * Auxiliary functions and LB client callbacks. */ -static void query_for_backends(grpc_exec_ctx *exec_ctx, - glb_lb_policy *glb_policy) { +static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, + glb_lb_policy *glb_policy) { GPR_ASSERT(glb_policy->lb_channel != NULL); + lb_call_init(glb_policy); + + if (grpc_lb_glb_trace) { + gpr_log(GPR_INFO, "Query for backends (grpclb: %p, lb_call: %p)", + (void *)glb_policy, (void *)glb_policy->lb_call); + } + GPR_ASSERT(glb_policy->lb_call != NULL); - glb_policy->lb_client = lb_client_data_create(glb_policy); grpc_call_error call_error; - grpc_op ops[1]; + grpc_op ops[4]; memset(ops, 0, sizeof(ops)); + grpc_op *op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; op->flags = 0; op->reserved = NULL; op++; - call_error = grpc_call_start_batch_and_execute( - exec_ctx, glb_policy->lb_client->lb_call, ops, (size_t)(op - ops), - &glb_policy->lb_client->md_sent); - GPR_ASSERT(GRPC_CALL_OK == call_error); - op = ops; - op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; - op->data.recv_status_on_client.trailing_metadata = - &glb_policy->lb_client->trailing_metadata_recv; - op->data.recv_status_on_client.status = &glb_policy->lb_client->status; - op->data.recv_status_on_client.status_details = - &glb_policy->lb_client->status_details; - op->data.recv_status_on_client.status_details_capacity = - &glb_policy->lb_client->status_details_capacity; + op->op = GRPC_OP_RECV_INITIAL_METADATA; + op->data.recv_initial_metadata = &glb_policy->lb_initial_metadata_recv; op->flags = 0; op->reserved = NULL; op++; - call_error = grpc_call_start_batch_and_execute( - exec_ctx, glb_policy->lb_client->lb_call, ops, (size_t)(op - ops), - &glb_policy->lb_client->srv_status_rcvd); - GPR_ASSERT(GRPC_CALL_OK == call_error); -} - -static void md_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - lb_client_data *lb_client = arg; - GPR_ASSERT(lb_client->lb_call); - grpc_op ops[1]; - memset(ops, 0, sizeof(ops)); - grpc_op *op = ops; + GPR_ASSERT(glb_policy->lb_request_payload != NULL); op->op = GRPC_OP_SEND_MESSAGE; - op->data.send_message = lb_client->request_payload; + op->data.send_message = glb_policy->lb_request_payload; op->flags = 0; op->reserved = NULL; op++; - grpc_call_error call_error = grpc_call_start_batch_and_execute( - exec_ctx, lb_client->lb_call, ops, (size_t)(op - ops), - &lb_client->req_sent); - GPR_ASSERT(GRPC_CALL_OK == call_error); -} - -static void req_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - lb_client_data *lb_client = arg; - GPR_ASSERT(lb_client->lb_call); - - grpc_op ops[2]; - memset(ops, 0, sizeof(ops)); - grpc_op *op = ops; - op->op = GRPC_OP_RECV_INITIAL_METADATA; - op->data.recv_initial_metadata = &lb_client->initial_metadata_recv; + op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; + op->data.recv_status_on_client.trailing_metadata = + &glb_policy->lb_trailing_metadata_recv; + op->data.recv_status_on_client.status = &glb_policy->lb_call_status; + op->data.recv_status_on_client.status_details = + &glb_policy->lb_call_status_details; + op->data.recv_status_on_client.status_details_capacity = + &glb_policy->lb_call_status_details_capacity; op->flags = 0; op->reserved = NULL; op++; + /* take a weak ref (won't prevent calling of \a glb_shutdown if the strong ref + * count goes to zero) to be unref'd in lb_on_server_status_received */ + GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "lb_on_server_status_received"); + call_error = grpc_call_start_batch_and_execute( + exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops), + &glb_policy->lb_on_server_status_received); + GPR_ASSERT(GRPC_CALL_OK == call_error); + op = ops; op->op = GRPC_OP_RECV_MESSAGE; - op->data.recv_message = &lb_client->response_payload; + op->data.recv_message = &glb_policy->lb_response_payload; op->flags = 0; op->reserved = NULL; op++; - grpc_call_error call_error = grpc_call_start_batch_and_execute( - exec_ctx, lb_client->lb_call, ops, (size_t)(op - ops), - &lb_client->res_rcvd); + /* take another weak ref to be unref'd in lb_on_response_received */ + GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "lb_on_response_received"); + call_error = grpc_call_start_batch_and_execute( + exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops), + &glb_policy->lb_on_response_received); GPR_ASSERT(GRPC_CALL_OK == call_error); } -static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - lb_client_data *lb_client = arg; +static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + glb_lb_policy *glb_policy = arg; + grpc_op ops[2]; memset(ops, 0, sizeof(ops)); grpc_op *op = ops; - if (lb_client->response_payload != NULL) { + if (glb_policy->lb_response_payload != NULL) { + gpr_backoff_reset(&glb_policy->lb_call_backoff_state); /* Received data from the LB server. Look inside - * lb_client->response_payload, for a serverlist. */ + * glb_policy->lb_response_payload, for a serverlist. */ grpc_byte_buffer_reader bbr; - grpc_byte_buffer_reader_init(&bbr, lb_client->response_payload); + grpc_byte_buffer_reader_init(&bbr, glb_policy->lb_response_payload); gpr_slice response_slice = grpc_byte_buffer_reader_readall(&bbr); - grpc_byte_buffer_destroy(lb_client->response_payload); + grpc_byte_buffer_destroy(glb_policy->lb_response_payload); grpc_grpclb_serverlist *serverlist = grpc_grpclb_response_parse_serverlist(response_slice); if (serverlist != NULL) { + GPR_ASSERT(glb_policy->lb_call != NULL); gpr_slice_unref(response_slice); if (grpc_lb_glb_trace) { gpr_log(GPR_INFO, "Serverlist with %lu servers received", (unsigned long)serverlist->num_servers); + for (size_t i = 0; i < serverlist->num_servers; ++i) { + grpc_resolved_address addr; + parse_server(serverlist->servers[i], &addr); + char *ipport; + grpc_sockaddr_to_string(&ipport, &addr, false); + gpr_log(GPR_INFO, "Serverlist[%lu]: %s", (unsigned long)i, ipport); + gpr_free(ipport); + } } /* update serverlist */ if (serverlist->num_servers > 0) { - gpr_mu_lock(&lb_client->glb_policy->mu); - if (grpc_grpclb_serverlist_equals(lb_client->glb_policy->serverlist, - serverlist)) { + gpr_mu_lock(&glb_policy->mu); + if (grpc_grpclb_serverlist_equals(glb_policy->serverlist, serverlist)) { if (grpc_lb_glb_trace) { gpr_log(GPR_INFO, "Incoming server list identical to current, ignoring."); } } else { /* new serverlist */ - if (lb_client->glb_policy->serverlist != NULL) { + if (glb_policy->serverlist != NULL) { /* dispose of the old serverlist */ - grpc_grpclb_destroy_serverlist(lb_client->glb_policy->serverlist); + grpc_grpclb_destroy_serverlist(glb_policy->serverlist); } /* and update the copy in the glb_lb_policy instance */ - lb_client->glb_policy->serverlist = serverlist; - } - if (lb_client->glb_policy->rr_policy == NULL) { - /* initial "handover", in this case from a null RR policy, meaning - * it'll just create the first RR policy instance */ - rr_handover_locked(exec_ctx, lb_client->glb_policy, error); - } else { - /* unref the RR policy, eventually leading to its substitution with a - * new one constructed from the received serverlist (see - * glb_rr_connectivity_changed) */ - GRPC_LB_POLICY_UNREF(exec_ctx, lb_client->glb_policy->rr_policy, - "serverlist_received"); + glb_policy->serverlist = serverlist; + + rr_handover_locked(exec_ctx, glb_policy, error); } - gpr_mu_unlock(&lb_client->glb_policy->mu); + gpr_mu_unlock(&glb_policy->mu); } else { if (grpc_lb_glb_trace) { gpr_log(GPR_INFO, @@ -1169,60 +1136,94 @@ static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { "response with > 0 servers is received"); } } + } else { /* serverlist == NULL */ + gpr_log(GPR_ERROR, "Invalid LB response received: '%s'. Ignoring.", + gpr_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX)); + gpr_slice_unref(response_slice); + } + if (!glb_policy->shutting_down) { /* keep listening for serverlist updates */ op->op = GRPC_OP_RECV_MESSAGE; - op->data.recv_message = &lb_client->response_payload; + op->data.recv_message = &glb_policy->lb_response_payload; op->flags = 0; op->reserved = NULL; op++; + /* reuse the "lb_on_response_received" weak ref taken in + * query_for_backends_locked() */ const grpc_call_error call_error = grpc_call_start_batch_and_execute( - exec_ctx, lb_client->lb_call, ops, (size_t)(op - ops), - &lb_client->res_rcvd); /* loop */ + exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops), + &glb_policy->lb_on_response_received); /* loop */ GPR_ASSERT(GRPC_CALL_OK == call_error); - return; } - - GPR_ASSERT(serverlist == NULL); - gpr_log(GPR_ERROR, "Invalid LB response received: '%s'", - gpr_dump_slice(response_slice, GPR_DUMP_ASCII)); - gpr_slice_unref(response_slice); - - /* Disconnect from server returning invalid response. */ - op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; - op->flags = 0; - op->reserved = NULL; - op++; - grpc_call_error call_error = grpc_call_start_batch_and_execute( - exec_ctx, lb_client->lb_call, ops, (size_t)(op - ops), - &lb_client->close_sent); - GPR_ASSERT(GRPC_CALL_OK == call_error); + } else { /* empty payload: call cancelled. */ + /* dispose of the "lb_on_response_received" weak ref taken in + * query_for_backends_locked() and reused in every reception loop */ + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, + "lb_on_response_received_empty_payload"); } - /* empty payload: call cancelled by server. Cleanups happening in - * srv_status_rcvd_cb */ } -static void close_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - if (grpc_lb_glb_trace) { - gpr_log(GPR_INFO, - "Close from LB client sent. Waiting from server status now"); +static void lb_call_on_retry_timer(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + glb_lb_policy *glb_policy = arg; + gpr_mu_lock(&glb_policy->mu); + + if (!glb_policy->shutting_down) { + if (grpc_lb_glb_trace) { + gpr_log(GPR_INFO, "Restaring call to LB server (grpclb %p)", + (void *)glb_policy); + } + GPR_ASSERT(glb_policy->lb_call == NULL); + query_for_backends_locked(exec_ctx, glb_policy); } + gpr_mu_unlock(&glb_policy->mu); + + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, + "grpclb_on_retry_timer"); } -static void srv_status_rcvd_cb(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - lb_client_data *lb_client = arg; +static void lb_on_server_status_received(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + glb_lb_policy *glb_policy = arg; + gpr_mu_lock(&glb_policy->mu); + + GPR_ASSERT(glb_policy->lb_call != NULL); + if (grpc_lb_glb_trace) { - gpr_log(GPR_INFO, - "status from lb server received. Status = %d, Details = '%s', " - "Capacity " - "= %lu", - lb_client->status, lb_client->status_details, - (unsigned long)lb_client->status_details_capacity); + gpr_log(GPR_DEBUG, + "Status from LB server received. Status = %d, Details = '%s', " + "(call: %p)", + glb_policy->lb_call_status, glb_policy->lb_call_status_details, + (void *)glb_policy->lb_call); + } + + /* We need to performe cleanups no matter what. */ + lb_call_destroy(glb_policy); + + if (!glb_policy->shutting_down) { + /* if we aren't shutting down, restart the LB client call after some time */ + gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + gpr_timespec next_try = + gpr_backoff_step(&glb_policy->lb_call_backoff_state, now); + if (grpc_lb_glb_trace) { + gpr_log(GPR_DEBUG, "Connection to LB server lost (grpclb: %p)...", + (void *)glb_policy); + gpr_timespec timeout = gpr_time_sub(next_try, now); + if (gpr_time_cmp(timeout, gpr_time_0(timeout.clock_type)) > 0) { + gpr_log(GPR_DEBUG, "... retrying in %" PRId64 ".%09d seconds.", + timeout.tv_sec, timeout.tv_nsec); + } else { + gpr_log(GPR_DEBUG, "... retrying immediately."); + } + } + GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_retry_timer"); + grpc_timer_init(exec_ctx, &glb_policy->lb_call_retry_timer, next_try, + lb_call_on_retry_timer, glb_policy, now); } - /* TODO(dgq): deal with stream termination properly (fire up another one? - * fail the original call?) */ + gpr_mu_unlock(&glb_policy->mu); + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, + "lb_on_server_status_received"); } /* Code wiring the policy with the rest of the core */ diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index 37a9b18b97..b0c461730b 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -120,6 +120,8 @@ typedef struct { grpc_connectivity_state connectivity_state; /** the subchannel's target user data */ void *user_data; + /** vtable to operate over \a user_data */ + const grpc_lb_user_data_vtable *user_data_vtable; } subchannel_data; struct round_robin_lb_policy { @@ -186,9 +188,13 @@ static void advance_last_picked_locked(round_robin_lb_policy *p) { } if (grpc_lb_round_robin_trace) { - gpr_log(GPR_DEBUG, "[READYLIST] ADVANCED LAST PICK. NOW AT NODE %p (SC %p)", - (void *)p->ready_list_last_pick, - (void *)p->ready_list_last_pick->subchannel); + gpr_log(GPR_DEBUG, + "[READYLIST, RR: %p] ADVANCED LAST PICK. NOW AT NODE %p (SC %p, " + "CSC %p)", + (void *)p, (void *)p->ready_list_last_pick, + (void *)p->ready_list_last_pick->subchannel, + (void *)grpc_subchannel_get_connected_subchannel( + p->ready_list_last_pick->subchannel)); } } @@ -255,9 +261,18 @@ static void remove_disconnected_sc_locked(round_robin_lb_policy *p, static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; ready_list *elem; + + if (grpc_lb_round_robin_trace) { + gpr_log(GPR_DEBUG, "Destroying Round Robin policy at %p", (void *)pol); + } + for (size_t i = 0; i < p->num_subchannels; i++) { subchannel_data *sd = p->subchannels[i]; - GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "round_robin"); + GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "round_robin_destroy"); + if (sd->user_data != NULL) { + GPR_ASSERT(sd->user_data_vtable != NULL); + sd->user_data_vtable->destroy(sd->user_data); + } gpr_free(sd); } @@ -285,6 +300,9 @@ static void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { size_t i; gpr_mu_lock(&p->mu); + if (grpc_lb_round_robin_trace) { + gpr_log(GPR_DEBUG, "Shutting down Round Robin policy at %p", (void *)pol); + } p->shutdown = 1; while ((pp = p->pending_picks)) { @@ -296,7 +314,7 @@ static void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } grpc_connectivity_state_set( exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN, - GRPC_ERROR_CREATE("Channel Shutdown"), "shutdown"); + GRPC_ERROR_CREATE("Channel Shutdown"), "rr_shutdown"); for (i = 0; i < p->num_subchannels; i++) { subchannel_data *sd = p->subchannels[i]; grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, NULL, @@ -395,6 +413,11 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pending_pick *pp; ready_list *selected; gpr_mu_lock(&p->mu); + + if (grpc_lb_round_robin_trace) { + gpr_log(GPR_INFO, "Round Robin %p trying to pick", (void *)pol); + } + if ((selected = peek_next_connected_locked(p))) { /* readily available, report right away */ *target = GRPC_CONNECTED_SUBCHANNEL_REF( @@ -435,7 +458,6 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, subchannel_data *sd = arg; round_robin_lb_policy *p = sd->policy; pending_pick *pp; - ready_list *selected; int unref = 0; @@ -456,12 +478,14 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, /* at this point we know there's at least one suitable subchannel. Go * ahead and pick one and notify the pending suitors in * p->pending_picks. This preemtively replicates rr_pick()'s actions. */ - selected = peek_next_connected_locked(p); + ready_list *selected = peek_next_connected_locked(p); + GPR_ASSERT(selected != NULL); if (p->pending_picks != NULL) { /* if the selected subchannel is going to be used for the pending * picks, update the last picked pointer */ advance_last_picked_locked(p); } + while ((pp = p->pending_picks)) { p->pending_picks = pp->next; @@ -653,7 +677,9 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, sd->policy = p; sd->index = subchannel_idx; sd->subchannel = subchannel; - sd->user_data = addresses->addresses[i].user_data; + sd->user_data_vtable = addresses->user_data_vtable; + sd->user_data = + sd->user_data_vtable->copy(addresses->addresses[i].user_data); ++subchannel_idx; grpc_closure_init(&sd->connectivity_changed_closure, rr_connectivity_changed, sd); |