diff options
author | murgatroid99 <mlumish@google.com> | 2016-10-07 15:09:53 -0700 |
---|---|---|
committer | murgatroid99 <mlumish@google.com> | 2016-10-07 15:09:53 -0700 |
commit | fde76d7996fbfd259ac3d0dfa8324efbdcf14f63 (patch) | |
tree | b91c93607f138a266966463b21dbb906218562f8 /src/core/ext/lb_policy/grpclb | |
parent | 197d87e6efddc532cca5a213467adca231bac270 (diff) | |
parent | 70c0b32c0fc5db15c764a2e22c2491d19d7cff7b (diff) |
Merge branch 'master' into uv_core_transport
Diffstat (limited to 'src/core/ext/lb_policy/grpclb')
-rw-r--r-- | src/core/ext/lb_policy/grpclb/grpclb.c | 63 |
1 files changed, 26 insertions, 37 deletions
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index 2d218def52..f8fd8b93da 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -107,6 +107,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/host_port.h> #include <grpc/support/string_util.h> +#include <grpc/support/time.h> #include "src/core/ext/client_config/client_channel_factory.h" #include "src/core/ext/client_config/lb_policy_factory.h" @@ -200,18 +201,8 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg, typedef struct pending_pick { struct pending_pick *next; - /* polling entity for the pick()'s async notification */ - grpc_polling_entity *pollent; - - /* the initial metadata for the pick. See grpc_lb_policy_pick() */ - grpc_metadata_batch *initial_metadata; - - /* storage for the lb token initial metadata mdelem */ - grpc_linked_mdelem *lb_token_mdelem_storage; - - /* bitmask passed to pick() and used for selective cancelling. See - * grpc_lb_policy_cancel_picks() */ - uint32_t initial_metadata_flags; + /* original pick()'s arguments */ + grpc_lb_policy_pick_args pick_args; /* output argument where to store the pick()ed connected subchannel, or NULL * upon error. */ @@ -233,11 +224,8 @@ static void add_pending_pick(pending_pick **root, memset(pp, 0, sizeof(pending_pick)); memset(&pp->wrapped_on_complete_arg, 0, sizeof(wrapped_rr_closure_arg)); pp->next = *root; - pp->pollent = pick_args->pollent; + pp->pick_args = *pick_args; pp->target = target; - pp->initial_metadata = pick_args->initial_metadata; - pp->initial_metadata_flags = pick_args->initial_metadata_flags; - pp->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage; pp->wrapped_on_complete_arg.wrapped_closure = on_complete; pp->wrapped_on_complete_arg.target = target; pp->wrapped_on_complete_arg.initial_metadata = pick_args->initial_metadata; @@ -284,9 +272,13 @@ typedef struct glb_lb_policy { /** mutex protecting remaining members */ gpr_mu mu; + /** who the client is trying to communicate with */ const char *server_name; grpc_client_channel_factory *cc_factory; + /** deadline for the LB's call */ + gpr_timespec deadline; + /** for communicating with the LB server */ grpc_channel *lb_channel; @@ -487,10 +479,8 @@ static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "", (intptr_t)glb_policy->rr_policy); } - const grpc_lb_policy_pick_args pick_args = { - pp->pollent, pp->initial_metadata, pp->initial_metadata_flags, - pp->lb_token_mdelem_storage}; - grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, &pick_args, pp->target, + grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, &pp->pick_args, + pp->target, (void **)&pp->wrapped_on_complete_arg.lb_token, &pp->wrapped_on_complete); pp->wrapped_on_complete_arg.owning_pending_node = pp; @@ -587,7 +577,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, } else { GPR_ASSERT(grpc_sockaddr_to_string( &addr_strs[addr_index++], - &args->addresses->addresses[i].address, true) == 0); + &args->addresses->addresses[i].address, true) > 0); } } } @@ -658,7 +648,6 @@ static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { *pp->target = NULL; grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete, GRPC_ERROR_NONE, NULL); - gpr_free(pp); pp = next; } @@ -696,12 +685,11 @@ static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pending_pick *next = pp->next; if (pp->target == target) { grpc_polling_entity_del_from_pollset_set( - exec_ctx, pp->pollent, glb_policy->base.interested_parties); + exec_ctx, pp->pick_args.pollent, glb_policy->base.interested_parties); *target = NULL; grpc_exec_ctx_sched( exec_ctx, &pp->wrapped_on_complete, GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL); - gpr_free(pp); } else { pp->next = glb_policy->pending_picks; glb_policy->pending_picks = pp; @@ -727,14 +715,13 @@ static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, glb_policy->pending_picks = NULL; while (pp != NULL) { pending_pick *next = pp->next; - if ((pp->initial_metadata_flags & initial_metadata_flags_mask) == + if ((pp->pick_args.initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { grpc_polling_entity_del_from_pollset_set( - exec_ctx, pp->pollent, glb_policy->base.interested_parties); + exec_ctx, pp->pick_args.pollent, glb_policy->base.interested_parties); grpc_exec_ctx_sched( exec_ctx, &pp->wrapped_on_complete, GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL); - gpr_free(pp); } else { pp->next = glb_policy->pending_picks; glb_policy->pending_picks = pp; @@ -765,8 +752,6 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, const grpc_lb_policy_pick_args *pick_args, grpc_connected_subchannel **target, void **user_data, grpc_closure *on_complete) { - glb_lb_policy *glb_policy = (glb_lb_policy *)pol; - if (pick_args->lb_token_mdelem_storage == NULL) { *target = NULL; grpc_exec_ctx_sched( @@ -774,11 +759,13 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, GRPC_ERROR_CREATE("No mdelem storage for the LB token. Load reporting " "won't work without it. Failing"), NULL); - return 1; + return 0; } + glb_lb_policy *glb_policy = (glb_lb_policy *)pol; gpr_mu_lock(&glb_policy->mu); - int r; + glb_policy->deadline = pick_args->deadline; + bool pick_done; if (glb_policy->rr_policy != NULL) { if (grpc_lb_glb_trace) { @@ -797,10 +784,11 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_closure_init(&glb_policy->wrapped_on_complete, wrapped_rr_closure, &glb_policy->wc_arg); - r = grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pick_args, target, + pick_done = + grpc_lb_policy_pick(exec_ctx, glb_policy->rr_policy, pick_args, target, (void **)&glb_policy->wc_arg.lb_token, &glb_policy->wrapped_on_complete); - if (r != 0) { + if (pick_done) { /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */ if (grpc_lb_glb_trace) { gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")", @@ -814,6 +802,8 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, GRPC_MDELEM_REF(glb_policy->wc_arg.lb_token)); } } else { + /* else, the pending pick will be registered and taken care of by the + * pending pick list inside the RR policy (glb_policy->rr_policy) */ grpc_polling_entity_add_to_pollset_set(exec_ctx, pick_args->pollent, glb_policy->base.interested_parties); add_pending_pick(&glb_policy->pending_picks, pick_args, target, @@ -822,10 +812,10 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, if (!glb_policy->started_picking) { start_picking(exec_ctx, glb_policy); } - r = 0; + pick_done = false; } gpr_mu_unlock(&glb_policy->mu); - return r; + return pick_done; } static grpc_connectivity_state glb_check_connectivity( @@ -935,8 +925,7 @@ static lb_client_data *lb_client_data_create(glb_lb_policy *glb_policy) { grpc_closure_init(&lb_client->close_sent, close_sent_cb, lb_client); grpc_closure_init(&lb_client->srv_status_rcvd, srv_status_rcvd_cb, lb_client); - /* TODO(dgq): get the deadline from the parent channel. */ - lb_client->deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + lb_client->deadline = glb_policy->deadline; /* Note the following LB call progresses every time there's activity in \a * glb_policy->base.interested_parties, which is comprised of the polling |