From 0748f3925cf0892b4780de25199bdd82aab30e57 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Fri, 13 Jan 2017 09:22:44 -0800 Subject: Store subchannel address in a channel arg. --- src/core/ext/client_channel/connector.h | 3 -- src/core/ext/client_channel/subchannel.c | 40 +++++++++++++++++----- src/core/ext/client_channel/subchannel.h | 8 +++-- src/core/ext/client_channel/subchannel_index.c | 12 ------- src/core/ext/lb_policy/pick_first/pick_first.c | 15 ++++++-- src/core/ext/lb_policy/round_robin/round_robin.c | 15 ++++++-- .../ext/transport/chttp2/client/chttp2_connector.c | 10 ++++-- 7 files changed, 69 insertions(+), 34 deletions(-) (limited to 'src') diff --git a/src/core/ext/client_channel/connector.h b/src/core/ext/client_channel/connector.h index 3de061620e..395f89b3b2 100644 --- a/src/core/ext/client_channel/connector.h +++ b/src/core/ext/client_channel/connector.h @@ -48,9 +48,6 @@ struct grpc_connector { typedef struct { /** set of pollsets interested in this connection */ grpc_pollset_set *interested_parties; - /** address to connect to */ - const grpc_resolved_address *addr; - size_t addr_len; /** initial connect string to send */ grpc_slice initial_connect_string; /** deadline for connection */ diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c index 1bac82b451..7a2ad98433 100644 --- a/src/core/ext/client_channel/subchannel.c +++ b/src/core/ext/client_channel/subchannel.c @@ -41,9 +41,12 @@ #include "src/core/ext/client_channel/client_channel.h" #include "src/core/ext/client_channel/initial_connect_string.h" +#include "src/core/ext/client_channel/parse_address.h" #include "src/core/ext/client_channel/subchannel_index.h" +#include "src/core/ext/client_channel/uri_parser.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/connected_channel.h" +#include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" @@ -95,8 +98,6 @@ struct grpc_subchannel { size_t num_filters; /** channel arguments */ grpc_channel_args *args; - /** address to connect to */ - grpc_resolved_address *addr; grpc_subchannel_key *key; @@ -211,7 +212,6 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, grpc_subchannel *c = arg; gpr_free((void *)c->filters); grpc_channel_args_destroy(exec_ctx, c->args); - gpr_free(c->addr); grpc_slice_unref_internal(exec_ctx, c->initial_connect_string); grpc_connectivity_state_destroy(exec_ctx, &c->state_tracker); grpc_connector_unref(exec_ctx, c->connector); @@ -327,12 +327,22 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, } else { c->filters = NULL; } - c->addr = gpr_malloc(sizeof(grpc_resolved_address)); - if (args->addr->len) - memcpy(c->addr, args->addr, sizeof(grpc_resolved_address)); c->pollset_set = grpc_pollset_set_create(); - grpc_set_initial_connect_string(&c->addr, &c->initial_connect_string); - c->args = grpc_channel_args_copy(args->args); + const grpc_arg *addr_arg = + grpc_channel_args_find(args->args, GRPC_ARG_SUBCHANNEL_ADDRESS); + GPR_ASSERT(addr_arg != NULL); // Should have been set by LB policy. + grpc_resolved_address *addr = gpr_malloc(sizeof(*addr)); + grpc_uri_to_sockaddr(addr_arg->value.string, addr); + grpc_set_initial_connect_string(&addr, &c->initial_connect_string); + static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS}; + grpc_arg new_arg; + new_arg.key = GRPC_ARG_SUBCHANNEL_ADDRESS; + new_arg.type = GRPC_ARG_STRING; + new_arg.value.string = grpc_sockaddr_to_uri(addr); + gpr_free(addr); + c->args = grpc_channel_args_copy_and_add_and_remove( + args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &new_arg, 1); + gpr_free(new_arg.value.string); c->root_external_state_watcher.next = c->root_external_state_watcher.prev = &c->root_external_state_watcher; grpc_closure_init(&c->connected, subchannel_connected, c, @@ -385,7 +395,6 @@ static void continue_connect_locked(grpc_exec_ctx *exec_ctx, grpc_connect_in_args args; args.interested_parties = c->pollset_set; - args.addr = c->addr; args.deadline = c->next_attempt; args.channel_args = c->args; args.initial_connect_string = c->initial_connect_string; @@ -771,3 +780,16 @@ grpc_call_stack *grpc_subchannel_call_get_call_stack( grpc_subchannel_call *subchannel_call) { return SUBCHANNEL_CALL_TO_CALL_STACK(subchannel_call); } + +void grpc_uri_to_sockaddr(char *uri_str, grpc_resolved_address *addr) { + grpc_uri *uri = grpc_uri_parse(uri_str, 0 /* suppress_errors */); + GPR_ASSERT(uri != NULL); + if (strcmp(uri->scheme, "ipv4") == 0) { + GPR_ASSERT(parse_ipv4(uri, addr)); + } else if (strcmp(uri->scheme, "ipv6") == 0) { + GPR_ASSERT(parse_ipv6(uri, addr)); + } else { + GPR_ASSERT(parse_unix(uri, addr)); + } + grpc_uri_destroy(uri); +} diff --git a/src/core/ext/client_channel/subchannel.h b/src/core/ext/client_channel/subchannel.h index 24aa9f73dc..4cc33fc79b 100644 --- a/src/core/ext/client_channel/subchannel.h +++ b/src/core/ext/client_channel/subchannel.h @@ -40,6 +40,9 @@ #include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/metadata.h" +// Channel arg containing a grpc_resolved_address to connect to. +#define GRPC_ARG_SUBCHANNEL_ADDRESS "grpc.subchannel_address" + /** A (sub-)channel that knows how to connect to exactly one target address. Provides a target for load balancing. */ typedef struct grpc_subchannel grpc_subchannel; @@ -164,8 +167,6 @@ struct grpc_subchannel_args { size_t filter_count; /** Channel arguments to be supplied to the newly created channel */ const grpc_channel_args *args; - /** Address to connect to */ - grpc_resolved_address *addr; }; /** create a subchannel given a connector */ @@ -173,4 +174,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, grpc_connector *connector, const grpc_subchannel_args *args); +/// Sets \a addr from \a uri_str. +void grpc_uri_to_sockaddr(char *uri_str, grpc_resolved_address *addr); + #endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_SUBCHANNEL_H */ diff --git a/src/core/ext/client_channel/subchannel_index.c b/src/core/ext/client_channel/subchannel_index.c index 1ebe03ef11..11889300a2 100644 --- a/src/core/ext/client_channel/subchannel_index.c +++ b/src/core/ext/client_channel/subchannel_index.c @@ -86,11 +86,6 @@ static grpc_subchannel_key *create_key( } else { k->args.filters = NULL; } - k->args.addr = gpr_malloc(sizeof(grpc_resolved_address)); - k->args.addr->len = args->addr->len; - if (k->args.addr->len > 0) { - memcpy(k->args.addr, args->addr, sizeof(grpc_resolved_address)); - } k->args.args = copy_channel_args(args->args); return k; } @@ -108,14 +103,8 @@ static int subchannel_key_compare(grpc_subchannel_key *a, grpc_subchannel_key *b) { int c = GPR_ICMP(a->connector, b->connector); if (c != 0) return c; - c = GPR_ICMP(a->args.addr->len, b->args.addr->len); - if (c != 0) return c; c = GPR_ICMP(a->args.filter_count, b->args.filter_count); if (c != 0) return c; - if (a->args.addr->len) { - c = memcmp(a->args.addr->addr, b->args.addr->addr, a->args.addr->len); - if (c != 0) return c; - } if (a->args.filter_count > 0) { c = memcmp(a->args.filters, b->args.filters, a->args.filter_count * sizeof(*a->args.filters)); @@ -129,7 +118,6 @@ void grpc_subchannel_key_destroy(grpc_exec_ctx *exec_ctx, grpc_connector_unref(exec_ctx, k->connector); gpr_free((grpc_channel_args *)k->args.filters); grpc_channel_args_destroy(exec_ctx, (grpc_channel_args *)k->args.args); - gpr_free(k->args.addr); gpr_free(k); } diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c index 821becff69..4677fb2343 100644 --- a/src/core/ext/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/lb_policy/pick_first/pick_first.c @@ -36,7 +36,9 @@ #include #include "src/core/ext/client_channel/lb_policy_registry.h" +#include "src/core/ext/client_channel/subchannel.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/transport/connectivity_state.h" typedef struct pending_pick { @@ -466,11 +468,18 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx, } memset(&sc_args, 0, sizeof(grpc_subchannel_args)); - sc_args.addr = &addresses->addresses[i].address; - sc_args.args = args->args; - + grpc_arg addr_arg; + addr_arg.key = GRPC_ARG_SUBCHANNEL_ADDRESS; + addr_arg.type = GRPC_ARG_STRING; + addr_arg.value.string = + grpc_sockaddr_to_uri(&addresses->addresses[i].address); + grpc_channel_args *new_args = + grpc_channel_args_copy_and_add(args->args, &addr_arg, 1); + gpr_free(addr_arg.value.string); + sc_args.args = new_args; grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel( exec_ctx, args->client_channel_factory, &sc_args); + grpc_channel_args_destroy(exec_ctx, new_args); if (subchannel != NULL) { p->subchannels[subchannel_idx++] = subchannel; diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index cb679489c3..3fe157a23c 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -64,8 +64,10 @@ #include #include "src/core/ext/client_channel/lb_policy_registry.h" +#include "src/core/ext/client_channel/subchannel.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/debug/trace.h" +#include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/static_metadata.h" @@ -729,11 +731,18 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, if (addresses->addresses[i].is_balancer) continue; memset(&sc_args, 0, sizeof(grpc_subchannel_args)); - sc_args.addr = &addresses->addresses[i].address; - sc_args.args = args->args; - + grpc_arg addr_arg; + addr_arg.key = GRPC_ARG_SUBCHANNEL_ADDRESS; + addr_arg.type = GRPC_ARG_STRING; + addr_arg.value.string = + grpc_sockaddr_to_uri(&addresses->addresses[i].address); + grpc_channel_args *new_args = + grpc_channel_args_copy_and_add(args->args, &addr_arg, 1); + gpr_free(addr_arg.value.string); + sc_args.args = new_args; grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel( exec_ctx, args->client_channel_factory, &sc_args); + grpc_channel_args_destroy(exec_ctx, new_args); if (subchannel != NULL) { subchannel_data *sd = gpr_malloc(sizeof(*sd)); diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.c b/src/core/ext/transport/chttp2/client/chttp2_connector.c index 2c5dfaea60..ebe884b115 100644 --- a/src/core/ext/transport/chttp2/client/chttp2_connector.c +++ b/src/core/ext/transport/chttp2/client/chttp2_connector.c @@ -43,6 +43,7 @@ #include "src/core/ext/client_channel/connector.h" #include "src/core/ext/client_channel/http_connect_handshaker.h" +#include "src/core/ext/client_channel/subchannel.h" #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/handshaker.h" @@ -220,6 +221,11 @@ static void chttp2_connector_connect(grpc_exec_ctx *exec_ctx, grpc_connect_out_args *result, grpc_closure *notify) { chttp2_connector *c = (chttp2_connector *)con; + const grpc_arg *addr_arg = + grpc_channel_args_find(args->channel_args, GRPC_ARG_SUBCHANNEL_ADDRESS); + GPR_ASSERT(addr_arg != NULL); // Should have been set by LB policy. + grpc_resolved_address addr; + grpc_uri_to_sockaddr(addr_arg->value.string, &addr); gpr_mu_lock(&c->mu); GPR_ASSERT(c->notify == NULL); c->notify = notify; @@ -231,8 +237,8 @@ static void chttp2_connector_connect(grpc_exec_ctx *exec_ctx, GPR_ASSERT(!c->connecting); c->connecting = true; grpc_tcp_client_connect(exec_ctx, &c->connected, &c->endpoint, - args->interested_parties, args->channel_args, - args->addr, args->deadline); + args->interested_parties, args->channel_args, &addr, + args->deadline); gpr_mu_unlock(&c->mu); } -- cgit v1.2.3