From 331b9c02f9372ef832e612d48b90991c181ba8a7 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Mon, 12 Sep 2016 18:37:05 -0700 Subject: Moved LB token changes solely into grpclb.c --- src/core/ext/resolver/dns/native/dns_resolver.c | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) (limited to 'src/core/ext/resolver/dns/native') diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c index 79682e78b5..1841a75845 100644 --- a/src/core/ext/resolver/dns/native/dns_resolver.c +++ b/src/core/ext/resolver/dns/native/dns_resolver.c @@ -175,7 +175,14 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, grpc_lb_policy_args lb_policy_args; result = grpc_resolver_result_create(); memset(&lb_policy_args, 0, sizeof(lb_policy_args)); - lb_policy_args.addresses = addresses; + lb_policy_args.num_addresses = addresses->naddrs; + lb_policy_args.lb_addresses = + gpr_malloc(sizeof(grpc_lb_address) * lb_policy_args.num_addresses); + memset(lb_policy_args.lb_addresses, 0, + sizeof(grpc_lb_address) * lb_policy_args.num_addresses); + for (size_t i = 0; i < addresses->naddrs; ++i) { + lb_policy_args.lb_addresses[i].resolved_address = &r->addresses->addrs[i]; + } lb_policy_args.client_channel_factory = r->client_channel_factory; lb_policy = grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args); -- cgit v1.2.3 From f47d6fbdccd6e3c64b0ff70b6a63236819886540 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Wed, 14 Sep 2016 12:59:17 -0700 Subject: More PR comments --- src/core/ext/client_config/lb_policy_factory.h | 15 +-- src/core/ext/lb_policy/grpclb/grpclb.c | 107 ++++++++++++--------- src/core/ext/lb_policy/pick_first/pick_first.c | 9 +- src/core/ext/lb_policy/round_robin/round_robin.c | 9 +- src/core/ext/resolver/dns/native/dns_resolver.c | 7 +- src/core/ext/resolver/sockaddr/sockaddr_resolver.c | 8 +- test/cpp/grpclb/grpclb_test.cc | 63 ++++++------ 7 files changed, 118 insertions(+), 100 deletions(-) (limited to 'src/core/ext/resolver/dns/native') diff --git a/src/core/ext/client_config/lb_policy_factory.h b/src/core/ext/client_config/lb_policy_factory.h index d2d1a0f16e..7191ca7d89 100644 --- a/src/core/ext/client_config/lb_policy_factory.h +++ b/src/core/ext/client_config/lb_policy_factory.h @@ -48,22 +48,17 @@ struct grpc_lb_policy_factory { }; /** A resolved address alongside any LB related information associated with it. - * \a user_data, if not \a NULL, is opaque and meant to be consumed by the gRPC - * LB policy. Anywhere else, refer to the functions in \a - * grpc_lb_policy_user_data_vtable to operate with it */ + * \a user_data, if not NULL, contains opaque data meant to be consumed by the + * gRPC LB policy. Note that no all LB policies support \a user_data as input. + * Those who don't will simply ignore it and will correspondingly return NULL in + * their namesake pick() output argument. */ typedef struct grpc_lb_address { grpc_resolved_address *resolved_address; void *user_data; } grpc_lb_address; -/** Functions acting upon the opaque \a grpc_lb_address.user_data */ -typedef struct grpc_lb_policy_user_data_vtable { - void *(*copy)(void *); - void (*destroy)(void *); -} grpc_lb_policy_user_data_vtable; - typedef struct grpc_lb_policy_args { - grpc_lb_address *lb_addresses; + grpc_lb_address *addresses; size_t num_addresses; grpc_client_channel_factory *client_channel_factory; } grpc_lb_policy_args; diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index 9a176ba0d1..1150451116 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -125,9 +125,16 @@ static void *user_data_copy(void *user_data) { return GRPC_MDELEM_REF(user_data); } -static void user_data_destroy(void *user_data) { - if (user_data == NULL) return; - GRPC_MDELEM_UNREF(user_data); +static void lb_addrs_destroy(grpc_lb_address *lb_addresses, + size_t num_addresses) { + /* free "resolved" addresses memblock */ + gpr_free(lb_addresses->resolved_address); + for (size_t i = 0; i < num_addresses; ++i) { + if (lb_addresses[i].user_data != NULL) { + GRPC_MDELEM_UNREF(lb_addresses[i].user_data); + } + } + gpr_free(lb_addresses); } /* add lb_token of selected subchannel (address) to the call's initial @@ -385,21 +392,12 @@ static size_t process_serverlist(const grpc_grpclb_serverlist *serverlist, * Given that the validity tests are very cheap, they are performed again * instead of marking the valid ones during the first pass, as this would * incurr in an allocation due to the arbitrary number of server */ - size_t num_processed = 0; - for (size_t i = 0; i < num_valid; ++i) { - const grpc_grpclb_server *server = serverlist->servers[i]; - if (!is_server_valid(serverlist->servers[i], i, false)) continue; - grpc_lb_address *const lb_addr = &lb_addrs[i]; - - /* lb token processing */ - if (server->has_load_balance_token) { - const size_t lb_token_size = - GPR_ARRAY_SIZE(server->load_balance_token) - 1; - grpc_mdstr *lb_token_mdstr = grpc_mdstr_from_buffer( - (uint8_t *)server->load_balance_token, lb_token_size); - lb_addr->user_data = grpc_mdelem_from_metadata_strings( - GRPC_MDSTR_LOAD_REPORTING_INITIAL, lb_token_mdstr); - } + size_t addr_idx = 0; + for (size_t sl_idx = 0; sl_idx < serverlist->num_servers; ++sl_idx) { + GPR_ASSERT(addr_idx < num_valid); + const grpc_grpclb_server *server = serverlist->servers[sl_idx]; + if (!is_server_valid(serverlist->servers[sl_idx], sl_idx, false)) continue; + grpc_lb_address *const lb_addr = &lb_addrs[addr_idx]; /* address processing */ const uint16_t netorder_port = htons((uint16_t)server->port); @@ -407,7 +405,7 @@ static size_t process_serverlist(const grpc_grpclb_serverlist *serverlist, * server->ip_address.bytes. */ const grpc_grpclb_ip_address *ip = &server->ip_address; - lb_addr->resolved_address = &r_addrs_memblock[i]; + lb_addr->resolved_address = &r_addrs_memblock[addr_idx]; struct sockaddr_storage *sa = (struct sockaddr_storage *)lb_addr->resolved_address->addr; size_t *sa_len = &lb_addr->resolved_address->len; @@ -428,9 +426,25 @@ static size_t process_serverlist(const grpc_grpclb_serverlist *serverlist, addr6->sin6_port = netorder_port; } GPR_ASSERT(*sa_len > 0); - ++num_processed; + + /* lb token processing */ + if (server->has_load_balance_token) { + const size_t lb_token_size = + GPR_ARRAY_SIZE(server->load_balance_token) - 1; + grpc_mdstr *lb_token_mdstr = grpc_mdstr_from_buffer( + (uint8_t *)server->load_balance_token, lb_token_size); + lb_addr->user_data = grpc_mdelem_from_metadata_strings( + GRPC_MDSTR_LOAD_REPORTING_INITIAL, lb_token_mdstr); + } else { + gpr_log(GPR_ERROR, + "Missing LB token for backend address '%s'. The empty token will " + "be used instead", + grpc_sockaddr_to_uri((struct sockaddr *)sa)); + lb_addr->user_data = GRPC_MDELEM_LOAD_REPORTING_INITIAL_EMPTY; + } + ++addr_idx; } - GPR_ASSERT(num_processed == num_valid); + GPR_ASSERT(addr_idx == num_valid); *lb_addresses = lb_addrs; return num_valid; } @@ -444,26 +458,19 @@ static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx, memset(&args, 0, sizeof(args)); args.client_channel_factory = glb_policy->cc_factory; const size_t num_ok_addresses = - process_serverlist(serverlist, &args.lb_addresses); + process_serverlist(serverlist, &args.addresses); args.num_addresses = num_ok_addresses; grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args); - glb_policy->num_ok_serverlist_addresses = num_ok_addresses; if (glb_policy->lb_addresses != NULL) { /* dispose of the previous version */ - for (size_t i = 0; i < num_ok_addresses; ++i) { - user_data_destroy(glb_policy->lb_addresses[i].user_data); - } - gpr_free(glb_policy->lb_addresses); + lb_addrs_destroy(glb_policy->lb_addresses, + glb_policy->num_ok_serverlist_addresses); } + glb_policy->num_ok_serverlist_addresses = num_ok_addresses; + glb_policy->lb_addresses = args.addresses; - glb_policy->lb_addresses = args.lb_addresses; - - if (args.num_addresses > 0) { - /* free "resolved" addresses memblock */ - gpr_free(args.lb_addresses->resolved_address); - } return rr; } @@ -560,7 +567,8 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, memset(glb_policy, 0, sizeof(*glb_policy)); /* All input addresses in args->addresses come from a resolver that claims - * they are LB services. It's the resolver's responsibility to make sure this + * they are LB services. It's the resolver's responsibility to make sure + * this * policy is only instantiated and used in that case. * * Create a client channel over them to communicate with a LB service */ @@ -570,18 +578,24 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, return NULL; } + /* this LB policy doesn't support \a user_data */ + GPR_ASSERT(args->addresses[0].user_data == NULL); + /* construct a target from the addresses in args, given in the form * ipvX://ip1:port1,ip2:port2,... * TODO(dgq): support mixed ip version */ char **addr_strs = gpr_malloc(sizeof(char *) * args->num_addresses); addr_strs[0] = grpc_sockaddr_to_uri( - (const struct sockaddr *)&args->lb_addresses[0].resolved_address->addr); + (const struct sockaddr *)&args->addresses[0].resolved_address->addr); for (size_t i = 1; i < args->num_addresses; i++) { + /* this LB policy doesn't support \a user_data */ + GPR_ASSERT(args->addresses[i].user_data == NULL); + GPR_ASSERT( - grpc_sockaddr_to_string(&addr_strs[i], - (const struct sockaddr *)&args->lb_addresses[i] - .resolved_address->addr, - true) == 0); + grpc_sockaddr_to_string( + &addr_strs[i], + (const struct sockaddr *)&args->addresses[i].resolved_address->addr, + true) == 0); } size_t uri_path_len; char *target_uri_str = gpr_strjoin_sep( @@ -630,10 +644,8 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } gpr_mu_destroy(&glb_policy->mu); - for (size_t i = 0; i < glb_policy->num_ok_serverlist_addresses; ++i) { - user_data_destroy(glb_policy->lb_addresses[i].user_data); - } - gpr_free(glb_policy->lb_addresses); + lb_addrs_destroy(glb_policy->lb_addresses, + glb_policy->num_ok_serverlist_addresses); gpr_free(glb_policy); } @@ -882,7 +894,8 @@ typedef struct lb_client_data { grpc_metadata_array initial_metadata_recv; /* initial MD from LB server */ grpc_metadata_array trailing_metadata_recv; /* trailing MD from LB server */ - /* what's being sent to the LB server. Note that its value may vary if the LB + /* what's being sent to the LB server. Note that its value may vary if the + * LB * server indicates a redirect. */ grpc_byte_buffer *request_payload; @@ -1090,7 +1103,8 @@ static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { * it'll just create the first RR policy instance */ rr_handover(exec_ctx, lb_client->glb_policy, error); } else { - /* unref the RR policy, eventually leading to its substitution with a + /* unref the RR policy, eventually leading to its substitution with + * a * new one constructed from the received serverlist (see * glb_rr_connectivity_changed) */ GRPC_LB_POLICY_UNREF(exec_ctx, lb_client->glb_policy->rr_policy, @@ -1155,7 +1169,8 @@ static void srv_status_rcvd_cb(grpc_exec_ctx *exec_ctx, void *arg, lb_client->status, lb_client->status_details, lb_client->status_details_capacity); } - /* TODO(dgq): deal with stream termination properly (fire up another one? fail + /* TODO(dgq): deal with stream termination properly (fire up another one? + * fail * the original call?) */ } diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c index 21d948033a..d907ddd5d1 100644 --- a/src/core/ext/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/lb_policy/pick_first/pick_first.c @@ -438,7 +438,7 @@ static void pick_first_factory_unref(grpc_lb_policy_factory *factory) {} static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx, grpc_lb_policy_factory *factory, grpc_lb_policy_args *args) { - GPR_ASSERT(args->lb_addresses != NULL); + GPR_ASSERT(args->addresses != NULL); GPR_ASSERT(args->client_channel_factory != NULL); if (args->num_addresses == 0) return NULL; @@ -451,10 +451,13 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx, grpc_subchannel_args sc_args; size_t subchannel_idx = 0; for (size_t i = 0; i < args->num_addresses; i++) { + /* this LB policy doesn't support \a user_data */ + GPR_ASSERT(args->addresses[i].user_data == NULL); + memset(&sc_args, 0, sizeof(grpc_subchannel_args)); sc_args.addr = - (struct sockaddr *)(args->lb_addresses[i].resolved_address->addr); - sc_args.addr_len = (size_t)args->lb_addresses[i].resolved_address->len; + (struct sockaddr *)(args->addresses[i].resolved_address->addr); + sc_args.addr_len = (size_t)args->addresses[i].resolved_address->len; grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel( exec_ctx, args->client_channel_factory, &sc_args); diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index 4d89cef9b6..1351ff0277 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -603,7 +603,7 @@ static void round_robin_factory_unref(grpc_lb_policy_factory *factory) {} static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, grpc_lb_policy_factory *factory, grpc_lb_policy_args *args) { - GPR_ASSERT(args->lb_addresses != NULL); + GPR_ASSERT(args->addresses != NULL); GPR_ASSERT(args->client_channel_factory != NULL); if (args->num_addresses == 0) return NULL; @@ -620,11 +620,10 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, size_t subchannel_idx = 0; for (size_t i = 0; i < p->num_addresses; i++) { memset(&sc_args, 0, sizeof(grpc_subchannel_args)); - sc_args.addr = - (struct sockaddr *)args->lb_addresses[i].resolved_address->addr; - sc_args.addr_len = args->lb_addresses[i].resolved_address->len; + sc_args.addr = (struct sockaddr *)args->addresses[i].resolved_address->addr; + sc_args.addr_len = args->addresses[i].resolved_address->len; - p->user_data[i] = args->lb_addresses[i].user_data; + p->user_data[i] = args->addresses[i].user_data; grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel( exec_ctx, args->client_channel_factory, &sc_args); diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c index 1841a75845..32e9de69a6 100644 --- a/src/core/ext/resolver/dns/native/dns_resolver.c +++ b/src/core/ext/resolver/dns/native/dns_resolver.c @@ -176,16 +176,17 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, result = grpc_resolver_result_create(); memset(&lb_policy_args, 0, sizeof(lb_policy_args)); lb_policy_args.num_addresses = addresses->naddrs; - lb_policy_args.lb_addresses = + lb_policy_args.addresses = gpr_malloc(sizeof(grpc_lb_address) * lb_policy_args.num_addresses); - memset(lb_policy_args.lb_addresses, 0, + memset(lb_policy_args.addresses, 0, sizeof(grpc_lb_address) * lb_policy_args.num_addresses); for (size_t i = 0; i < addresses->naddrs; ++i) { - lb_policy_args.lb_addresses[i].resolved_address = &r->addresses->addrs[i]; + lb_policy_args.addresses[i].resolved_address = &r->addresses->addrs[i]; } lb_policy_args.client_channel_factory = r->client_channel_factory; lb_policy = grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args); + gpr_free(lb_policy_args.addresses); if (lb_policy != NULL) { grpc_resolver_result_set_lb_policy(result, lb_policy); GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "construction"); diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c index 54a7e1c84c..425285287c 100644 --- a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c +++ b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c @@ -126,17 +126,17 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy_args lb_policy_args; memset(&lb_policy_args, 0, sizeof(lb_policy_args)); lb_policy_args.num_addresses = r->addresses->naddrs; - lb_policy_args.lb_addresses = + lb_policy_args.addresses = gpr_malloc(sizeof(grpc_lb_address) * lb_policy_args.num_addresses); - memset(lb_policy_args.lb_addresses, 0, + memset(lb_policy_args.addresses, 0, sizeof(grpc_lb_address) * lb_policy_args.num_addresses); for (size_t i = 0; i < lb_policy_args.num_addresses; ++i) { - lb_policy_args.lb_addresses[i].resolved_address = &r->addresses->addrs[i]; + lb_policy_args.addresses[i].resolved_address = &r->addresses->addrs[i]; } lb_policy_args.client_channel_factory = r->client_channel_factory; grpc_lb_policy *lb_policy = grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args); - gpr_free(lb_policy_args.lb_addresses); + gpr_free(lb_policy_args.addresses); grpc_resolver_result_set_lb_policy(result, lb_policy); GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "sockaddr"); r->published = 1; diff --git a/test/cpp/grpclb/grpclb_test.cc b/test/cpp/grpclb/grpclb_test.cc index c044256165..95abe38031 100644 --- a/test/cpp/grpclb/grpclb_test.cc +++ b/test/cpp/grpclb/grpclb_test.cc @@ -37,6 +37,8 @@ #include #include +#include + #include #include #include @@ -76,6 +78,7 @@ extern "C" { // - Test against a non-LB server. That server should return UNIMPLEMENTED and // the call should fail. // - Random LB server closing the stream unexpectedly. +// - Test using DNS-resolvable names (localhost?) namespace grpc { namespace { @@ -612,27 +615,30 @@ static void fork_lb_server(void *arg) { tf->lb_server_update_delay_ms); } -static void setup_test_fixture(test_fixture *tf, - int lb_server_update_delay_ms) { - tf->lb_server_update_delay_ms = lb_server_update_delay_ms; +static test_fixture setup_test_fixture(int lb_server_update_delay_ms) { + test_fixture tf; + memset(&tf, 0, sizeof(tf)); + tf.lb_server_update_delay_ms = lb_server_update_delay_ms; gpr_thd_options options = gpr_thd_options_default(); gpr_thd_options_set_joinable(&options); for (int i = 0; i < NUM_BACKENDS; ++i) { - setup_server("127.0.0.1", &tf->lb_backends[i]); - gpr_thd_new(&tf->lb_backends[i].tid, fork_backend_server, - &tf->lb_backends[i], &options); + setup_server("127.0.0.1", &tf.lb_backends[i]); + gpr_thd_new(&tf.lb_backends[i].tid, fork_backend_server, &tf.lb_backends[i], + &options); } - setup_server("127.0.0.1", &tf->lb_server); - gpr_thd_new(&tf->lb_server.tid, fork_lb_server, &tf->lb_server, &options); + setup_server("127.0.0.1", &tf.lb_server); + gpr_thd_new(&tf.lb_server.tid, fork_lb_server, &tf.lb_server, &options); char *server_uri; gpr_asprintf(&server_uri, "ipv4:%s?lb_policy=grpclb&lb_enabled=1", - tf->lb_server.servers_hostport); - setup_client(server_uri, &tf->client); + tf.lb_server.servers_hostport); + setup_client(server_uri, &tf.client); gpr_free(server_uri); + + return tf; } static void teardown_test_fixture(test_fixture *tf) { @@ -643,19 +649,13 @@ static void teardown_test_fixture(test_fixture *tf) { teardown_server(&tf->lb_server); } -// The LB server will send two updates: batch 1 and batch 2. Each batch -// contains -// two addresses, both of a valid and running backend server. Batch 1 is -// readily -// available and provided as soon as the client establishes the streaming -// call. -// Batch 2 is sent after a delay of \a lb_server_update_delay_ms -// milliseconds. +// The LB server will send two updates: batch 1 and batch 2. Each batch contains +// two addresses, both of a valid and running backend server. Batch 1 is readily +// available and provided as soon as the client establishes the streaming call. +// Batch 2 is sent after a delay of \a lb_server_update_delay_ms milliseconds. static test_fixture test_update(int lb_server_update_delay_ms) { gpr_log(GPR_INFO, "start %s(%d)", __func__, lb_server_update_delay_ms); - test_fixture tf; - memset(&tf, 0, sizeof(tf)); - setup_test_fixture(&tf, lb_server_update_delay_ms); + test_fixture tf = setup_test_fixture(lb_server_update_delay_ms); perform_request( &tf.client); // "consumes" 1st backend server of 1st serverlist perform_request( @@ -671,13 +671,7 @@ static test_fixture test_update(int lb_server_update_delay_ms) { return tf; } -} // namespace -} // namespace grpc - -int main(int argc, char **argv) { - grpc_test_init(argc, argv); - grpc_init(); - +TEST(GrpclbTest, Updates) { grpc::test_fixture tf_result; // Clients take a bit over one second to complete a call (the last part of the // call sleeps for 1 second while verifying the client's completion queue is @@ -712,7 +706,18 @@ int main(int argc, char **argv) { GPR_ASSERT(tf_result.lb_backends[1].num_calls_serviced > 0); GPR_ASSERT(tf_result.lb_backends[2].num_calls_serviced > 0); GPR_ASSERT(tf_result.lb_backends[3].num_calls_serviced == 0); +} + +TEST(GrpclbTest, InvalidAddressInServerlist) {} +} // namespace +} // namespace grpc + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + grpc_test_init(argc, argv); + grpc_init(); + const auto result = RUN_ALL_TESTS(); grpc_shutdown(); - return 0; + return result; } -- cgit v1.2.3