author    | kpayson64 <kpayson@google.com> | 2017-09-14 13:54:16 -0700
committer | GitHub <noreply@github.com>    | 2017-09-14 13:54:16 -0700
commit    | 7db81fb6d05b18f3150356e38b57fbe65c1a95a4
tree      | d61fc1e8d3d636f7fa28045359900b57aed70c2b /src/core
parent    | 8f21b12abefd8eafbdcaec6634e3bfbdad2e869b
parent    | 9fa10cce9ffe2689174af7b69940c8c2f2b51d0f
Merge pull request #12565 from kpayson64/revert_lb_stuff
Revert "Add fallback (use backends from resolver if can't reach balan…
Diffstat (limited to 'src/core')
3 files changed, 42 insertions, 159 deletions
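Editor's note: the feature being reverted below is gRPC-LB fallback — if no serverlist arrives from the balancer within a timeout, the client was to build a round-robin policy from the backend (non-balancer) addresses supplied by the resolver. The standalone C sketch below only illustrates that idea; the struct and function names are hypothetical simplifications, not the gRPC implementation (the real removed code appears in the grpclb.c hunks that follow).

#include <stddef.h>
#include <stdio.h>

/* Hypothetical, simplified model of the reverted fallback behavior; the field
 * and function names echo the diff below but this is NOT the gRPC code. */
typedef struct {
  const char **serverlist;        /* addresses received from the balancer */
  size_t num_servers;
  const char **fallback_backends; /* non-balancer addresses from the resolver */
  size_t num_fallback;
  int fallback_timeout_ms;        /* 0 means fallback disabled */
} lb_state;

/* Models lb_on_fallback_timer_locked: runs when the fallback timer fires. */
static void on_fallback_timer(lb_state *s) {
  /* If a serverlist arrived before this callback ran, do nothing. */
  if (s->serverlist != NULL) return;
  if (s->fallback_timeout_ms > 0) {
    printf("falling back to %zu resolver-provided backend(s)\n",
           s->num_fallback);
    /* ...the real code would hand picks over to a round-robin policy
     * built from fallback_backends here... */
  }
}

int main(void) {
  const char *backends[] = {"10.0.0.1:443", "10.0.0.2:443"};
  /* 10000 ms mirrors GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS in the diff. */
  lb_state s = {NULL, 0, backends, 2, 10000};
  on_fallback_timer(&s); /* simulate the timer firing with no serverlist yet */
  return 0;
}

With the revert, none of this remains: picks simply stay pending until the balancer returns a non-empty serverlist (see the updated log message in lb_on_response_received_locked below).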
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
index d3a51c6e6f..a776a07d99 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
@@ -123,7 +123,6 @@
 #define GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER 1.6
 #define GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS 120
 #define GRPC_GRPCLB_RECONNECT_JITTER 0.2
-#define GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS 10000

 grpc_tracer_flag grpc_lb_glb_trace = GRPC_TRACER_INITIALIZER(false, "glb");

@@ -300,10 +299,6 @@ typedef struct glb_lb_policy {
   /** timeout in milliseconds for the LB call. 0 means no deadline. */
   int lb_call_timeout_ms;

-  /** timeout in milliseconds for before using fallback backend addresses.
-   * 0 means not using fallback. */
-  int lb_fallback_timeout_ms;
-
   /** for communicating with the LB server */
   grpc_channel *lb_channel;

@@ -330,9 +325,6 @@ typedef struct glb_lb_policy {
    * Otherwise, we delegate to the RR policy. */
   size_t serverlist_index;

-  /** stores the backend addresses from the resolver */
-  grpc_lb_addresses *fallback_backend_addresses;
-
   /** list of picks that are waiting on RR's policy connectivity */
   pending_pick *pending_picks;

@@ -353,9 +345,6 @@ typedef struct glb_lb_policy {
   /** is \a lb_call_retry_timer active? */
   bool retry_timer_active;

-  /** is \a lb_fallback_timer active? */
-  bool fallback_timer_active;
-
   /** called upon changes to the LB channel's connectivity. */
   grpc_closure lb_channel_on_connectivity_changed;

@@ -378,9 +367,6 @@ typedef struct glb_lb_policy {
   /* LB call retry timer callback. */
   grpc_closure lb_on_call_retry;

-  /* LB fallback timer callback. */
-  grpc_closure lb_on_fallback;
-
   grpc_call *lb_call; /* streaming call to the LB server, */

   grpc_metadata_array lb_initial_metadata_recv; /* initial MD from LB server */

@@ -404,9 +390,6 @@ typedef struct glb_lb_policy {
   /** LB call retry timer */
   grpc_timer lb_call_retry_timer;

-  /** LB fallback timer */
-  grpc_timer lb_fallback_timer;
-
   bool initial_request_sent;
   bool seen_initial_response;

@@ -553,32 +536,6 @@ static grpc_lb_addresses *process_serverlist_locked(
   return lb_addresses;
 }

-/* Returns the backend addresses extracted from the given addresses */
-static grpc_lb_addresses *extract_backend_addresses_locked(
-    grpc_exec_ctx *exec_ctx, const grpc_lb_addresses *addresses) {
-  /* first pass: count the number of backend addresses */
-  size_t num_backends = 0;
-  for (size_t i = 0; i < addresses->num_addresses; ++i) {
-    if (!addresses->addresses[i].is_balancer) {
-      ++num_backends;
-    }
-  }
-  /* second pass: actually populate the addresses and (empty) LB tokens */
-  grpc_lb_addresses *backend_addresses =
-      grpc_lb_addresses_create(num_backends, &lb_token_vtable);
-  size_t num_copied = 0;
-  for (size_t i = 0; i < addresses->num_addresses; ++i) {
-    if (addresses->addresses[i].is_balancer) continue;
-    const grpc_resolved_address *addr = &addresses->addresses[i].address;
-    grpc_lb_addresses_set_address(backend_addresses, num_copied, &addr->addr,
-                                  addr->len, false /* is_balancer */,
-                                  NULL /* balancer_name */,
-                                  (void *)GRPC_MDELEM_LB_TOKEN_EMPTY.payload);
-    ++num_copied;
-  }
-  return backend_addresses;
-}
-
 static void update_lb_connectivity_status_locked(
     grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
     grpc_connectivity_state rr_state, grpc_error *rr_state_error) {
@@ -646,38 +603,35 @@ static bool pick_from_internal_rr_locked(
     grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
     const grpc_lb_policy_pick_args *pick_args, bool force_async,
     grpc_connected_subchannel **target, wrapped_rr_closure_arg *wc_arg) {
-  // Check for drops if we are not using fallback backend addresses.
-  if (glb_policy->serverlist != NULL) {
-    // Look at the index into the serverlist to see if we should drop this call.
-    grpc_grpclb_server *server =
-        glb_policy->serverlist->servers[glb_policy->serverlist_index++];
-    if (glb_policy->serverlist_index == glb_policy->serverlist->num_servers) {
-      glb_policy->serverlist_index = 0;  // Wrap-around.
+  // Look at the index into the serverlist to see if we should drop this call.
+  grpc_grpclb_server *server =
+      glb_policy->serverlist->servers[glb_policy->serverlist_index++];
+  if (glb_policy->serverlist_index == glb_policy->serverlist->num_servers) {
+    glb_policy->serverlist_index = 0;  // Wrap-around.
+  }
+  if (server->drop) {
+    // Not using the RR policy, so unref it.
+    if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
+      gpr_log(GPR_INFO, "Unreffing RR for drop (0x%" PRIxPTR ")",
+              (intptr_t)wc_arg->rr_policy);
     }
-    if (server->drop) {
-      // Not using the RR policy, so unref it.
-      if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
-        gpr_log(GPR_INFO, "Unreffing RR for drop (0x%" PRIxPTR ")",
-                (intptr_t)wc_arg->rr_policy);
-      }
-      GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync");
-      // Update client load reporting stats to indicate the number of
-      // dropped calls. Note that we have to do this here instead of in
-      // the client_load_reporting filter, because we do not create a
-      // subchannel call (and therefore no client_load_reporting filter)
-      // for dropped calls.
-      grpc_grpclb_client_stats_add_call_dropped_locked(
-          server->load_balance_token, wc_arg->client_stats);
-      grpc_grpclb_client_stats_unref(wc_arg->client_stats);
-      if (force_async) {
-        GPR_ASSERT(wc_arg->wrapped_closure != NULL);
-        GRPC_CLOSURE_SCHED(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_NONE);
-        gpr_free(wc_arg->free_when_done);
-        return false;
-      }
+    GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync");
+    // Update client load reporting stats to indicate the number of
+    // dropped calls. Note that we have to do this here instead of in
+    // the client_load_reporting filter, because we do not create a
+    // subchannel call (and therefore no client_load_reporting filter)
+    // for dropped calls.
+    grpc_grpclb_client_stats_add_call_dropped_locked(server->load_balance_token,
+                                                     wc_arg->client_stats);
+    grpc_grpclb_client_stats_unref(wc_arg->client_stats);
+    if (force_async) {
+      GPR_ASSERT(wc_arg->wrapped_closure != NULL);
+      GRPC_CLOSURE_SCHED(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_NONE);
       gpr_free(wc_arg->free_when_done);
-      return true;
+      return false;
     }
+    gpr_free(wc_arg->free_when_done);
+    return true;
   }
   // Pick via the RR policy.
   const bool pick_done = grpc_lb_policy_pick_locked(
@@ -715,18 +669,8 @@ static bool pick_from_internal_rr_locked(

 static grpc_lb_policy_args *lb_policy_args_create(grpc_exec_ctx *exec_ctx,
                                                   glb_lb_policy *glb_policy) {
-  grpc_lb_addresses *addresses;
-  if (glb_policy->serverlist != NULL) {
-    GPR_ASSERT(glb_policy->serverlist->num_servers > 0);
-    addresses = process_serverlist_locked(exec_ctx, glb_policy->serverlist);
-  } else {
-    // If rr_handover_locked() is invoked when we haven't received any
-    // serverlist from the balancer, we use the fallback backends returned by
-    // the resolver. Note that the fallback backend list may be empty, in which
-    // case the new round_robin policy will keep the requested picks pending.
-    GPR_ASSERT(glb_policy->fallback_backend_addresses != NULL);
-    addresses = grpc_lb_addresses_copy(glb_policy->fallback_backend_addresses);
-  }
+  grpc_lb_addresses *addresses =
+      process_serverlist_locked(exec_ctx, glb_policy->serverlist);
   GPR_ASSERT(addresses != NULL);
   grpc_lb_policy_args *args = (grpc_lb_policy_args *)gpr_zalloc(sizeof(*args));
   args->client_channel_factory = glb_policy->cc_factory;
@@ -832,6 +776,8 @@ static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
 /* glb_policy->rr_policy may be NULL (initial handover) */
 static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
                                glb_lb_policy *glb_policy) {
+  GPR_ASSERT(glb_policy->serverlist != NULL &&
+             glb_policy->serverlist->num_servers > 0);
   if (glb_policy->shutting_down) return;
   grpc_lb_policy_args *args = lb_policy_args_create(exec_ctx, glb_policy);
   GPR_ASSERT(args != NULL);
@@ -980,9 +926,6 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
   if (glb_policy->serverlist != NULL) {
     grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
   }
-  if (glb_policy->fallback_backend_addresses != NULL) {
-    grpc_lb_addresses_destroy(exec_ctx, glb_policy->fallback_backend_addresses);
-  }
   grpc_fake_resolver_response_generator_unref(glb_policy->response_generator);
   grpc_subchannel_index_unref();
   if (glb_policy->pending_update_args != NULL) {
@@ -1124,28 +1067,10 @@ static void glb_cancel_picks_locked(grpc_exec_ctx *exec_ctx,
   GRPC_ERROR_UNREF(error);
 }

-static void lb_on_fallback_timer_locked(grpc_exec_ctx *exec_ctx, void *arg,
-                                        grpc_error *error);
 static void query_for_backends_locked(grpc_exec_ctx *exec_ctx,
                                       glb_lb_policy *glb_policy);
 static void start_picking_locked(grpc_exec_ctx *exec_ctx,
                                  glb_lb_policy *glb_policy) {
-  /* start a timer to fall back */
-  if (glb_policy->lb_fallback_timeout_ms > 0 &&
-      glb_policy->serverlist == NULL) {
-    gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
-    gpr_timespec deadline = gpr_time_add(
-        now,
-        gpr_time_from_millis(glb_policy->lb_fallback_timeout_ms, GPR_TIMESPAN));
-    GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_fallback_timer");
-    GRPC_CLOSURE_INIT(&glb_policy->lb_on_fallback, lb_on_fallback_timer_locked,
-                      glb_policy,
-                      grpc_combiner_scheduler(glb_policy->base.combiner));
-    glb_policy->fallback_timer_active = true;
-    grpc_timer_init(exec_ctx, &glb_policy->lb_fallback_timer, deadline,
-                    &glb_policy->lb_on_fallback, now);
-  }
-
   glb_policy->started_picking = true;
   gpr_backoff_reset(&glb_policy->lb_call_backoff_state);
   query_for_backends_locked(exec_ctx, glb_policy);
@@ -1600,15 +1525,6 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg,
         if (glb_policy->serverlist != NULL) {
           /* dispose of the old serverlist */
           grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
-        } else {
-          /* or dispose of the fallback */
-          grpc_lb_addresses_destroy(exec_ctx,
-                                    glb_policy->fallback_backend_addresses);
-          glb_policy->fallback_backend_addresses = NULL;
-          if (glb_policy->fallback_timer_active) {
-            grpc_timer_cancel(exec_ctx, &glb_policy->lb_fallback_timer);
-            glb_policy->fallback_timer_active = false;
-          }
         }
         /* and update the copy in the glb_lb_policy instance. This
          * serverlist instance will be destroyed either upon the next
@@ -1619,7 +1535,9 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg,
       }
     } else {
       if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
-        gpr_log(GPR_INFO, "Received empty server list, ignoring.");
+        gpr_log(GPR_INFO,
+                "Received empty server list. Picks will stay pending until "
+                "a response with > 0 servers is received");
       }
       grpc_grpclb_destroy_serverlist(serverlist);
     }
@@ -1666,26 +1584,6 @@ static void lb_call_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg,
   GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, "grpclb_retry_timer");
 }

-static void lb_on_fallback_timer_locked(grpc_exec_ctx *exec_ctx, void *arg,
-                                        grpc_error *error) {
-  glb_lb_policy *glb_policy = (glb_lb_policy *)arg;
-  /* If we receive a serverlist after the timer fires but before this callback
-   * actually runs, don't do anything. */
-  if (glb_policy->serverlist != NULL) return;
-  glb_policy->fallback_timer_active = false;
-  if (!glb_policy->shutting_down && error == GRPC_ERROR_NONE) {
-    if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
-      gpr_log(GPR_INFO,
-              "Falling back to use backends from resolver (grpclb %p)",
-              (void *)glb_policy);
-    }
-    GPR_ASSERT(glb_policy->fallback_backend_addresses != NULL);
-    rr_handover_locked(exec_ctx, glb_policy);
-  }
-  GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
-                            "grpclb_fallback_timer");
-}
-
 static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx,
                                                 void *arg, grpc_error *error) {
   glb_lb_policy *glb_policy = (glb_lb_policy *)arg;
@@ -1806,17 +1704,6 @@ static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
         &glb_policy->lb_channel_connectivity,
         &glb_policy->lb_channel_on_connectivity_changed, NULL);
   }
-
-  // Propagate update to fallback_backend_addresses if a non-empty serverlist
-  // hasn't been received from the balancer.
-  if (glb_policy->serverlist == NULL) {
-    grpc_lb_addresses_destroy(exec_ctx, glb_policy->fallback_backend_addresses);
-    glb_policy->fallback_backend_addresses =
-        extract_backend_addresses_locked(exec_ctx, addresses);
-    if (glb_policy->rr_policy != NULL) {
-      rr_handover_locked(exec_ctx, glb_policy);
-    }
-  }
 }

 // Invoked as part of the update process. It continues watching the LB channel
@@ -1899,7 +1786,13 @@ static const grpc_lb_policy_vtable glb_lb_policy_vtable = {
 static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
                                   grpc_lb_policy_factory *factory,
                                   grpc_lb_policy_args *args) {
-  /* Count the number of gRPC-LB addresses. There must be at least one. */
+  /* Count the number of gRPC-LB addresses. There must be at least one.
+   * TODO(roth): For now, we ignore non-balancer addresses, but in the
+   * future, we may change the behavior such that we fall back to using
+   * the non-balancer addresses if we cannot reach any balancers. In the
+   * fallback case, we should use the LB policy indicated by
+   * GRPC_ARG_LB_POLICY_NAME (although if that specifies grpclb or is
+   * unset, we should default to pick_first). */
   const grpc_arg *arg =
       grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
   if (arg == NULL || arg->type != GRPC_ARG_POINTER) {
@@ -1935,11 +1828,6 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
   glb_policy->lb_call_timeout_ms =
       grpc_channel_arg_get_integer(arg, (grpc_integer_options){0, 0, INT_MAX});

-  arg = grpc_channel_args_find(args->args, GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS);
-  glb_policy->lb_fallback_timeout_ms = grpc_channel_arg_get_integer(
-      arg, (grpc_integer_options){GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS, 0,
-                                  INT_MAX});
-
   // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
   // since we use this to trigger the client_load_reporting filter.
   grpc_arg new_arg =
@@ -1948,11 +1836,6 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
   glb_policy->args = grpc_channel_args_copy_and_add_and_remove(
       args->args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);

-  /* Extract the backend addresses (may be empty) from the resolver for
-   * fallback. */
-  glb_policy->fallback_backend_addresses =
-      extract_backend_addresses_locked(exec_ctx, addresses);
-
   /* Create a client channel over them to communicate with a LB service */
   glb_policy->response_generator =
       grpc_fake_resolver_response_generator_create();
diff --git a/src/core/ext/filters/client_channel/lb_policy_factory.c b/src/core/ext/filters/client_channel/lb_policy_factory.c
index a9a00f5b2b..acf5929746 100644
--- a/src/core/ext/filters/client_channel/lb_policy_factory.c
+++ b/src/core/ext/filters/client_channel/lb_policy_factory.c
@@ -56,7 +56,7 @@ grpc_lb_addresses* grpc_lb_addresses_copy(const grpc_lb_addresses* addresses) {
 }

 void grpc_lb_addresses_set_address(grpc_lb_addresses* addresses, size_t index,
-                                   const void* address, size_t address_len,
+                                   void* address, size_t address_len,
                                    bool is_balancer, const char* balancer_name,
                                    void* user_data) {
   GPR_ASSERT(index < addresses->num_addresses);
diff --git a/src/core/ext/filters/client_channel/lb_policy_factory.h b/src/core/ext/filters/client_channel/lb_policy_factory.h
index cf0f8cb615..9d9fb143df 100644
--- a/src/core/ext/filters/client_channel/lb_policy_factory.h
+++ b/src/core/ext/filters/client_channel/lb_policy_factory.h
@@ -73,7 +73,7 @@ grpc_lb_addresses *grpc_lb_addresses_copy(const grpc_lb_addresses *addresses);
  * \a address is a socket address of length \a address_len.
  * Takes ownership of \a balancer_name. */
 void grpc_lb_addresses_set_address(grpc_lb_addresses *addresses, size_t index,
-                                   const void *address, size_t address_len,
+                                   void *address, size_t address_len,
                                    bool is_balancer, const char *balancer_name,
                                    void *user_data);
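Editor's note on the lb_policy_factory change above: the revert restores grpc_lb_addresses_set_address to taking `void *address` instead of `const void *address`. The sketch below is a hypothetical, self-contained illustration of why the const-taking variant was convenient for the removed fallback path (which passed the resolver's const-qualified addresses); it does not use the real gRPC API.

#include <stddef.h>
#include <stdio.h>

/* Hypothetical stand-ins for the two signatures touched by this revert:
 * set_address_nonconst mirrors the restored `void *address` parameter,
 * set_address_const mirrors the removed `const void *address` variant.
 * Neither is the real grpc_lb_addresses_set_address. */
static void set_address_nonconst(void *address, size_t address_len) {
  (void)address;
  printf("copied %zu bytes (non-const parameter)\n", address_len);
}

static void set_address_const(const void *address, size_t address_len) {
  (void)address;
  printf("copied %zu bytes (const parameter)\n", address_len);
}

int main(void) {
  const unsigned char resolved_addr[16] = {0}; /* e.g. a sockaddr blob */
  /* A const-taking variant accepts a read-only buffer as-is; the removed
   * fallback code relied on this to pass the resolver's const addresses. */
  set_address_const(resolved_addr, sizeof(resolved_addr));
  /* With the restored non-const signature, such a caller would need a cast. */
  set_address_nonconst((void *)resolved_addr, sizeof(resolved_addr));
  return 0;
}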