diff options
author | David Garcia Quintas <dgq@google.com> | 2016-08-22 15:06:49 -0700 |
---|---|---|
committer | David Garcia Quintas <dgq@google.com> | 2016-08-22 15:06:49 -0700 |
commit | 8a81aa12fbdb6ccb0f4c80dfc1f8cdf263097780 (patch) | |
tree | f2cbab78cff03d943131b5a101eafbd95ce400e1 | |
parent | 18bc43b4eaf476b41a1a1af1c3c02d8cf70f0f52 (diff) |
Changes to grpclb and tests for binary ip addresses.
-rw-r--r-- | src/core/ext/lb_policy/grpclb/grpclb.c | 79 | ||||
-rw-r--r-- | src/core/ext/lb_policy/grpclb/load_balancer_api.h | 1 | ||||
-rw-r--r-- | test/cpp/grpclb/grpclb_test.cc | 42 |
3 files changed, 71 insertions, 51 deletions
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index 43674504b5..d0316e648d 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -96,6 +96,9 @@ * - Implement LB service forwarding (point 2c. in the doc's diagram). */ +#include <arpa/inet.h> +#include <errno.h> + #include <string.h> #include <grpc/byte_buffer_reader.h> @@ -285,17 +288,7 @@ struct rr_connectivity_data { static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx, const grpc_grpclb_serverlist *serverlist, glb_lb_policy *glb_policy) { - /* TODO(dgq): support mixed ip version */ GPR_ASSERT(serverlist != NULL && serverlist->num_servers > 0); - char **host_ports = gpr_malloc(sizeof(char *) * serverlist->num_servers); - for (size_t i = 0; i < serverlist->num_servers; ++i) { - gpr_join_host_port(&host_ports[i], serverlist->servers[i]->ip_address, - serverlist->servers[i]->port); - } - - size_t uri_path_len; - char *concat_ipports = gpr_strjoin_sep( - (const char **)host_ports, serverlist->num_servers, ",", &uri_path_len); grpc_lb_policy_args args; memset(&args, 0, sizeof(args)); @@ -305,38 +298,56 @@ static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx, args.addresses = gpr_malloc(sizeof(grpc_resolved_addresses)); args.addresses->addrs = gpr_malloc(sizeof(grpc_resolved_address) * serverlist->num_servers); - size_t out_addrs_idx = 0; + size_t addr_idx = 0; for (size_t i = 0; i < serverlist->num_servers; ++i) { - grpc_uri uri; + const grpc_grpclb_server *const server = serverlist->servers[i]; + /* a minimal of error checking */ + if (server->port >> 16 != 0) { + gpr_log(GPR_ERROR, "Invalid port '%d'. Ignoring server list index %zu", + server->port, i); + continue; + } + const uint16_t netorder_port = htons((uint16_t)server->port); + /* the addresses are given in binary format (a in(6)_addr struct) in + * server->ip_address.bytes. */ + const grpc_grpclb_ip_address *ip = &server->ip_address; struct sockaddr_storage sa; - size_t sa_len; - uri.path = host_ports[i]; - if (parse_ipv4(&uri, &sa, &sa_len)) { /* TODO(dgq): add support for ipv6 */ - memcpy(args.addresses->addrs[out_addrs_idx].addr, &sa, sa_len); - args.addresses->addrs[out_addrs_idx].len = sa_len; - ++out_addrs_idx; - const size_t token_max_size = - GPR_ARRAY_SIZE(serverlist->servers[i]->load_balance_token); - serverlist->servers[i]->load_balance_token[token_max_size - 1] = '\0'; - args.tokens[i].token_size = - strlen(serverlist->servers[i]->load_balance_token); - args.tokens[i].token = gpr_malloc(args.tokens[i].token_size); - memcpy(args.tokens[i].token, serverlist->servers[i]->load_balance_token, - args.tokens[i].token_size); + size_t sa_len = 0; + if (ip->size == 4) { + struct sockaddr_in *addr4 = (struct sockaddr_in *)&sa; + memset(addr4, 0, sizeof(struct sockaddr_in)); + sa_len = sizeof(struct sockaddr_in); + addr4->sin_family = AF_INET; + memcpy(&addr4->sin_addr, ip->bytes, ip->size); + addr4->sin_port = netorder_port; + } else if (ip->size == 6) { + struct sockaddr_in *addr6 = (struct sockaddr_in *)&sa; + memset(addr6, 0, sizeof(struct sockaddr_in)); + sa_len = sizeof(struct sockaddr_in); + addr6->sin_family = AF_INET; + memcpy(&addr6->sin_addr, ip->bytes, ip->size); + addr6->sin_port = netorder_port; } else { - gpr_log(GPR_ERROR, "Invalid LB service address '%s', ignoring.", - host_ports[i]); + gpr_log(GPR_ERROR, + "Expected IP to be 4 or 16 bytes. Got %d. Ignoring server list " + "index %zu", + ip->size, i); + continue; } + GPR_ASSERT(sa_len > 0); + memcpy(args.addresses->addrs[addr_idx].addr, &sa, sa_len); + args.addresses->addrs[addr_idx].len = sa_len; + ++addr_idx; + + args.tokens[i].token_size = GPR_ARRAY_SIZE(server->load_balance_token) - 1; + args.tokens[i].token = gpr_malloc(args.tokens[i].token_size); + memcpy(args.tokens[i].token, server->load_balance_token, + args.tokens[i].token_size); } - args.addresses->naddrs = out_addrs_idx; + args.addresses->naddrs = addr_idx; grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args); - gpr_free(concat_ipports); - for (size_t i = 0; i < serverlist->num_servers; i++) { - gpr_free(host_ports[i]); - } - gpr_free(host_ports); gpr_free(args.addresses->addrs); gpr_free(args.addresses); gpr_free(args.tokens); diff --git a/src/core/ext/lb_policy/grpclb/load_balancer_api.h b/src/core/ext/lb_policy/grpclb/load_balancer_api.h index 9726c87a37..c1e73d08ef 100644 --- a/src/core/ext/lb_policy/grpclb/load_balancer_api.h +++ b/src/core/ext/lb_policy/grpclb/load_balancer_api.h @@ -45,6 +45,7 @@ extern "C" { #define GRPC_GRPCLB_SERVICE_NAME_MAX_LENGTH 128 +typedef grpc_lb_v1_Server_ip_address_t grpc_grpclb_ip_address; typedef grpc_lb_v1_LoadBalanceRequest grpc_grpclb_request; typedef grpc_lb_v1_InitialLoadBalanceResponse grpc_grpclb_initial_response; typedef grpc_lb_v1_Server grpc_grpclb_server; diff --git a/test/cpp/grpclb/grpclb_test.cc b/test/cpp/grpclb/grpclb_test.cc index 487ef0c26d..1e90640313 100644 --- a/test/cpp/grpclb/grpclb_test.cc +++ b/test/cpp/grpclb/grpclb_test.cc @@ -37,7 +37,8 @@ #include <cstring> #include <string> -extern "C" { +#include <arpa/inet.h> + #include <grpc/grpc.h> #include <grpc/impl/codegen/byte_buffer_reader.h> #include <grpc/support/alloc.h> @@ -48,6 +49,9 @@ extern "C" { #include <grpc/support/thd.h> #include <grpc/support/time.h> +#include <grpc++/impl/codegen/config.h> + +extern "C" { #include "src/core/ext/client_config/client_channel.h" #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/support/string.h" @@ -107,8 +111,8 @@ static gpr_slice build_response_payload_slice( int64_t expiration_interval_secs, int32_t expiration_interval_nanos) { // server_list { // servers { - // ip_address: "127.0.0.1" - // port: ... + // ip_address: <in_addr/6 bytes of an IP> + // port: <16 bit uint> // load_balance_token: "token..." // } // ... @@ -127,21 +131,21 @@ static gpr_slice build_response_payload_slice( } for (size_t i = 0; i < nports; i++) { auto *server = serverlist->add_servers(); - server->set_ip_address(host); + // TODO(dgq): test ipv6 + struct in_addr ip4; + GPR_ASSERT(inet_pton(AF_INET, host, &ip4) == 1); + server->set_ip_address( + grpc::string(reinterpret_cast<const char *>(&ip4), sizeof(ip4))); server->set_port(ports[i]); // The following long long int cast is meant to work around the // disfunctional implementation of std::to_string in gcc 4.4, which doesn't // have a version for int but does have one for long long int. - server->set_load_balance_token("token" + - std::to_string((long long int)ports[i])); + string token_data = "token" + std::to_string((long long int)ports[i]); + token_data.resize(64, '-'); + server->set_load_balance_token(token_data); } - - gpr_log(GPR_INFO, "generating response: %s", - response.ShortDebugString().c_str()); - - const gpr_slice response_slice = - gpr_slice_from_copied_string(response.SerializeAsString().c_str()); - return response_slice; + const grpc::string &enc_resp = response.SerializeAsString(); + return gpr_slice_from_copied_buffer(enc_resp.data(), enc_resp.size()); } static void drain_cq(grpc_completion_queue *cq) { @@ -321,11 +325,15 @@ static void start_backend_server(server_fixture *sf) { return; } GPR_ASSERT(ev.type == GRPC_OP_COMPLETE); - char *expected_token; - GPR_ASSERT(gpr_asprintf(&expected_token, "token%d", sf->port) > 0); + + // The following long long int cast is meant to work around the + // disfunctional implementation of std::to_string in gcc 4.4, which doesn't + // have a version for int but does have one for long long int. + string expected_token = "token" + std::to_string((long long int)sf->port); + expected_token.resize(64, '-'); GPR_ASSERT(contains_metadata(&request_metadata_recv, - "load-reporting-initial", expected_token)); - gpr_free(expected_token); + "load-reporting-initial", + expected_token.c_str())); gpr_log(GPR_INFO, "Server[%s] after tag 100", sf->servers_hostport); |