From d7389b418f9ea077e4ab3de2b53362ab02ab6870 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Wed, 17 May 2017 12:22:17 -0700 Subject: Implement grpclb drop support. --- .../client_channel/lb_policy/grpclb/grpclb.c | 81 +++++++++++++++----- .../lb_policy/grpclb/load_balancer_api.c | 89 +++++++++++----------- .../lb_policy/grpclb/load_balancer_api.h | 2 +- 3 files changed, 108 insertions(+), 64 deletions(-) (limited to 'src/core/ext/filters/client_channel/lb_policy') diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c index b7c0e929b7..d2a2856a18 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c @@ -327,6 +327,11 @@ typedef struct glb_lb_policy { * response has arrived. */ grpc_grpclb_serverlist *serverlist; + /** Index into serverlist for next pick. + * If the server at this index is a drop, we return a drop. + * Otherwise, we delegate to the RR policy. */ + size_t serverlist_index; + /** list of picks that are waiting on RR's policy connectivity */ pending_pick *pending_picks; @@ -402,6 +407,9 @@ struct rr_connectivity_data { static bool is_server_valid(const grpc_grpclb_server *server, size_t idx, bool log) { + if (server->drop_for_rate_limiting || server->drop_for_load_balancing) { + return false; + } const grpc_grpclb_ip_address *ip = &server->ip_address; if (server->port >> 16 != 0) { if (log) { @@ -411,7 +419,6 @@ static bool is_server_valid(const grpc_grpclb_server *server, size_t idx, } return false; } - if (ip->size != 4 && ip->size != 16) { if (log) { gpr_log(GPR_ERROR, @@ -445,11 +452,12 @@ static const grpc_lb_user_data_vtable lb_token_vtable = { static void parse_server(const grpc_grpclb_server *server, grpc_resolved_address *addr) { + memset(addr, 0, sizeof(*addr)); + if (server->drop_for_rate_limiting || server->drop_for_load_balancing) return; const uint16_t netorder_port = htons((uint16_t)server->port); /* the addresses are given in binary format (a in(6)_addr struct) in * server->ip_address.bytes. */ const grpc_grpclb_ip_address *ip = &server->ip_address; - memset(addr, 0, sizeof(*addr)); if (ip->size == 4) { addr->len = sizeof(struct sockaddr_in); struct sockaddr_in *addr4 = (struct sockaddr_in *)&addr->addr; @@ -586,16 +594,51 @@ static bool update_lb_connectivity_status_locked( return true; } -/* perform a pick over \a rr_policy. Given that a pick can return immediately - * (ignoring its completion callback) we need to perform the cleanups this - * callback would be otherwise resposible for */ +/* Perform a pick over \a glb_policy->rr_policy. Given that a pick can return + * immediately (ignoring its completion callback), we need to perform the + * cleanups this callback would otherwise be resposible for. + * If \a force_async is true, then we will manually schedule the + * completion callback even if the pick is available immediately. */ static bool pick_from_internal_rr_locked( - grpc_exec_ctx *exec_ctx, grpc_lb_policy *rr_policy, - const grpc_lb_policy_pick_args *pick_args, + grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, + const grpc_lb_policy_pick_args *pick_args, bool force_async, grpc_connected_subchannel **target, wrapped_rr_closure_arg *wc_arg) { - GPR_ASSERT(rr_policy != NULL); + // Look at the index into the serverlist to see if we should drop this call. + grpc_grpclb_server *server = + glb_policy->serverlist->servers[glb_policy->serverlist_index++]; + if (glb_policy->serverlist_index == glb_policy->serverlist->num_servers) { + glb_policy->serverlist_index = 0; // Wrap-around. + } + if (server->drop_for_rate_limiting || server->drop_for_load_balancing) { + // Not using the RR policy, so unref it. + if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { + gpr_log(GPR_INFO, "Unreffing RR for drop (0x%" PRIxPTR ")", + (intptr_t)wc_arg->rr_policy); + } + GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync"); + // Update client load reporting stats to indicate the number of + // dropped calls. Note that we have to do this here instead of in + // the client_load_reporting filter, because we do not create a + // subchannel call (and therefore no client_load_reporting filter) + // for dropped calls. + grpc_grpclb_client_stats_add_call_started(wc_arg->client_stats); + grpc_grpclb_client_stats_add_call_finished( + server->drop_for_rate_limiting, server->drop_for_load_balancing, + false /* failed_to_send */, false /* known_received */, + wc_arg->client_stats); + grpc_grpclb_client_stats_unref(wc_arg->client_stats); + if (force_async) { + GPR_ASSERT(wc_arg->wrapped_closure != NULL); + grpc_closure_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_NONE); + gpr_free(wc_arg->free_when_done); + return false; + } + gpr_free(wc_arg->free_when_done); + return true; + } + // Pick via the RR policy. const bool pick_done = grpc_lb_policy_pick_locked( - exec_ctx, rr_policy, pick_args, target, wc_arg->context, + exec_ctx, wc_arg->rr_policy, pick_args, target, wc_arg->context, (void **)&wc_arg->lb_token, &wc_arg->wrapper_closure); if (pick_done) { /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */ @@ -604,17 +647,20 @@ static bool pick_from_internal_rr_locked( (intptr_t)wc_arg->rr_policy); } GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync"); - /* add the load reporting initial metadata */ initial_metadata_add_lb_token(exec_ctx, pick_args->initial_metadata, pick_args->lb_token_mdelem_storage, GRPC_MDELEM_REF(wc_arg->lb_token)); - // Pass on client stats via context. Passes ownership of the reference. GPR_ASSERT(wc_arg->client_stats != NULL); wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].value = wc_arg->client_stats; wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].destroy = destroy_client_stats; - + if (force_async) { + GPR_ASSERT(wc_arg->wrapped_closure != NULL); + grpc_closure_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_NONE); + gpr_free(wc_arg->free_when_done); + return false; + } gpr_free(wc_arg->free_when_done); } /* else, the pending pick will be registered and taken care of by the @@ -744,8 +790,8 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "", (intptr_t)glb_policy->rr_policy); } - pick_from_internal_rr_locked(exec_ctx, glb_policy->rr_policy, - &pp->pick_args, pp->target, + pick_from_internal_rr_locked(exec_ctx, glb_policy, &pp->pick_args, + true /* force_async */, pp->target, &pp->wrapped_on_complete_arg); } @@ -1115,8 +1161,9 @@ static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, wc_arg->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage; wc_arg->initial_metadata = pick_args->initial_metadata; wc_arg->free_when_done = wc_arg; - pick_done = pick_from_internal_rr_locked(exec_ctx, glb_policy->rr_policy, - pick_args, target, wc_arg); + pick_done = + pick_from_internal_rr_locked(exec_ctx, glb_policy, pick_args, + false /* force_async */, target, wc_arg); } else { if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { gpr_log(GPR_DEBUG, @@ -1517,7 +1564,7 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, * serverlist instance will be destroyed either upon the next * update or in glb_destroy() */ glb_policy->serverlist = serverlist; - + glb_policy->serverlist_index = 0; rr_handover_locked(exec_ctx, glb_policy); } } else { diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c index 81b6932fae..90e7c2efe5 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c @@ -37,44 +37,39 @@ #include +/* invoked once for every Server in ServerList */ +static bool count_serverlist(pb_istream_t *stream, const pb_field_t *field, + void **arg) { + grpc_grpclb_serverlist *sl = *arg; + grpc_grpclb_server server; + if (!pb_decode(stream, grpc_lb_v1_Server_fields, &server)) { + gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream)); + return false; + } + ++sl->num_servers; + return true; +} + typedef struct decode_serverlist_arg { - /* The first pass counts the number of servers in the server list. The second - * one allocates and decodes. */ - bool first_pass; /* The decoding callback is invoked once per server in serverlist. Remember * which index of the serverlist are we currently decoding */ size_t decoding_idx; - /* Populated after the first pass. Number of server in the input serverlist */ - size_t num_servers; /* The decoded serverlist */ - grpc_grpclb_server **servers; + grpc_grpclb_serverlist *serverlist; } decode_serverlist_arg; /* invoked once for every Server in ServerList */ static bool decode_serverlist(pb_istream_t *stream, const pb_field_t *field, void **arg) { decode_serverlist_arg *dec_arg = *arg; - if (dec_arg->first_pass) { /* count how many server do we have */ - grpc_grpclb_server server; - if (!pb_decode(stream, grpc_lb_v1_Server_fields, &server)) { - gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream)); - return false; - } - dec_arg->num_servers++; - } else { /* second pass. Actually decode. */ - grpc_grpclb_server *server = gpr_zalloc(sizeof(grpc_grpclb_server)); - GPR_ASSERT(dec_arg->num_servers > 0); - if (dec_arg->decoding_idx == 0) { /* first iteration of second pass */ - dec_arg->servers = - gpr_malloc(sizeof(grpc_grpclb_server *) * dec_arg->num_servers); - } - if (!pb_decode(stream, grpc_lb_v1_Server_fields, server)) { - gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream)); - return false; - } - dec_arg->servers[dec_arg->decoding_idx++] = server; + GPR_ASSERT(dec_arg->serverlist->num_servers >= dec_arg->decoding_idx); + grpc_grpclb_server *server = gpr_zalloc(sizeof(grpc_grpclb_server)); + if (!pb_decode(stream, grpc_lb_v1_Server_fields, server)) { + gpr_free(server); + gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream)); + return false; } - + dec_arg->serverlist->servers[dec_arg->decoding_idx++] = server; return true; } @@ -165,36 +160,38 @@ grpc_grpclb_initial_response *grpc_grpclb_initial_response_parse( grpc_grpclb_serverlist *grpc_grpclb_response_parse_serverlist( grpc_slice encoded_grpc_grpclb_response) { - bool status; - decode_serverlist_arg arg; pb_istream_t stream = pb_istream_from_buffer(GRPC_SLICE_START_PTR(encoded_grpc_grpclb_response), GRPC_SLICE_LENGTH(encoded_grpc_grpclb_response)); pb_istream_t stream_at_start = stream; + grpc_grpclb_serverlist *sl = gpr_zalloc(sizeof(grpc_grpclb_serverlist)); grpc_grpclb_response res; memset(&res, 0, sizeof(grpc_grpclb_response)); - memset(&arg, 0, sizeof(decode_serverlist_arg)); - - res.server_list.servers.funcs.decode = decode_serverlist; - res.server_list.servers.arg = &arg; - arg.first_pass = true; - status = pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, &res); + // First pass: count number of servers. + res.server_list.servers.funcs.decode = count_serverlist; + res.server_list.servers.arg = sl; + bool status = pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, &res); if (!status) { + gpr_free(sl); gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream)); return NULL; } - - arg.first_pass = false; - status = - pb_decode(&stream_at_start, grpc_lb_v1_LoadBalanceResponse_fields, &res); - if (!status) { - gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream)); - return NULL; + // Second pass: populate servers. + if (sl->num_servers > 0) { + sl->servers = gpr_zalloc(sizeof(grpc_grpclb_server *) * sl->num_servers); + decode_serverlist_arg decode_arg; + memset(&decode_arg, 0, sizeof(decode_arg)); + decode_arg.serverlist = sl; + res.server_list.servers.funcs.decode = decode_serverlist; + res.server_list.servers.arg = &decode_arg; + status = pb_decode(&stream_at_start, grpc_lb_v1_LoadBalanceResponse_fields, + &res); + if (!status) { + grpc_grpclb_destroy_serverlist(sl); + gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream)); + return NULL; + } } - - grpc_grpclb_serverlist *sl = gpr_zalloc(sizeof(grpc_grpclb_serverlist)); - sl->num_servers = arg.num_servers; - sl->servers = arg.servers; if (res.server_list.has_expiration_interval) { sl->expiration_interval = res.server_list.expiration_interval; } @@ -228,7 +225,7 @@ grpc_grpclb_serverlist *grpc_grpclb_serverlist_copy( bool grpc_grpclb_serverlist_equals(const grpc_grpclb_serverlist *lhs, const grpc_grpclb_serverlist *rhs) { - if ((lhs == NULL) || (rhs == NULL)) { + if (lhs == NULL || rhs == NULL) { return false; } if (lhs->num_servers != rhs->num_servers) { diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h index 06873821bd..7f596ce1f1 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h @@ -51,7 +51,7 @@ typedef grpc_lb_v1_LoadBalanceRequest grpc_grpclb_request; typedef grpc_lb_v1_InitialLoadBalanceResponse grpc_grpclb_initial_response; typedef grpc_lb_v1_Server grpc_grpclb_server; typedef grpc_lb_v1_Duration grpc_grpclb_duration; -typedef struct grpc_grpclb_serverlist { +typedef struct { grpc_grpclb_server **servers; size_t num_servers; grpc_grpclb_duration expiration_interval; -- cgit v1.2.3