diff options
author | 2016-09-21 08:12:11 -0700 | |
---|---|---|
committer | 2016-09-21 08:12:11 -0700 | |
commit | 933c955aafc734b75136510df38e877840c1f649 (patch) | |
tree | 4f3dd658417d51c2da53b0f2e0285b437cc178e0 /src/core/ext/client_config | |
parent | 89675154020e5bdabe654b5b65087fe3935b696a (diff) | |
parent | 5bedd48c0e8b9582841c511037bc20781a6c681c (diff) |
Merge remote-tracking branch 'upstream/master' into http_connect
Diffstat (limited to 'src/core/ext/client_config')
-rw-r--r-- | src/core/ext/client_config/client_channel.c | 34 | ||||
-rw-r--r-- | src/core/ext/client_config/client_channel.h | 13 | ||||
-rw-r--r-- | src/core/ext/client_config/lb_policy_factory.c | 20 | ||||
-rw-r--r-- | src/core/ext/client_config/lb_policy_factory.h | 13 | ||||
-rw-r--r-- | src/core/ext/client_config/resolver_factory.h | 5 | ||||
-rw-r--r-- | src/core/ext/client_config/resolver_registry.c | 4 | ||||
-rw-r--r-- | src/core/ext/client_config/resolver_registry.h | 3 | ||||
-rw-r--r-- | src/core/ext/client_config/resolver_result.c | 65 | ||||
-rw-r--r-- | src/core/ext/client_config/resolver_result.h | 35 | ||||
-rw-r--r-- | src/core/ext/client_config/subchannel.c | 26 | ||||
-rw-r--r-- | src/core/ext/client_config/subchannel.h | 2 | ||||
-rw-r--r-- | src/core/ext/client_config/subchannel_index.c | 2 |
12 files changed, 151 insertions, 71 deletions
diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c index 6b779efe57..b2b4fea83c 100644 --- a/src/core/ext/client_config/client_channel.c +++ b/src/core/ext/client_config/client_channel.c @@ -42,6 +42,7 @@ #include <grpc/support/sync.h> #include <grpc/support/useful.h> +#include "src/core/ext/client_config/lb_policy_registry.h" #include "src/core/ext/client_config/subchannel.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/connected_channel.h" @@ -63,6 +64,8 @@ typedef struct client_channel_channel_data { grpc_resolver *resolver; /** have we started resolving this channel */ bool started_resolving; + /** client channel factory */ + grpc_client_channel_factory *client_channel_factory; /** mutex protecting client configuration, including all variables below in this data structure */ @@ -173,20 +176,28 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *state_error = GRPC_ERROR_CREATE("No load balancing policy"); if (chand->resolver_result != NULL) { - lb_policy = grpc_resolver_result_get_lb_policy(chand->resolver_result); + grpc_lb_policy_args lb_policy_args; + lb_policy_args.server_name = + grpc_resolver_result_get_server_name(chand->resolver_result); + lb_policy_args.addresses = + grpc_resolver_result_get_addresses(chand->resolver_result); + lb_policy_args.additional_args = + grpc_resolver_result_get_lb_policy_args(chand->resolver_result); + lb_policy_args.client_channel_factory = chand->client_channel_factory; + lb_policy = grpc_lb_policy_create( + exec_ctx, + grpc_resolver_result_get_lb_policy_name(chand->resolver_result), + &lb_policy_args); if (lb_policy != NULL) { - GRPC_LB_POLICY_REF(lb_policy, "channel"); GRPC_LB_POLICY_REF(lb_policy, "config_change"); GRPC_ERROR_UNREF(state_error); state = grpc_lb_policy_check_connectivity(exec_ctx, lb_policy, &state_error); } - grpc_resolver_result_unref(exec_ctx, chand->resolver_result); + chand->resolver_result = NULL; } - chand->resolver_result = NULL; - if (lb_policy != NULL) { grpc_pollset_set_add_pollset_set(exec_ctx, lb_policy->interested_parties, chand->interested_parties); @@ -346,6 +357,9 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, grpc_resolver_shutdown(exec_ctx, chand->resolver); GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); } + if (chand->client_channel_factory != NULL) { + grpc_client_channel_factory_unref(exec_ctx, chand->client_channel_factory); + } if (chand->lb_policy != NULL) { grpc_pollset_set_del_pollset_set(exec_ctx, chand->lb_policy->interested_parties, @@ -767,10 +781,12 @@ const grpc_channel_filter grpc_client_channel_filter = { "client-channel", }; -void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx, - grpc_channel_stack *channel_stack, - grpc_resolver *resolver) { +void grpc_client_channel_finish_initialization( + grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack, + grpc_resolver *resolver, + grpc_client_channel_factory *client_channel_factory) { /* post construction initialization: set the transport setup pointer */ + GPR_ASSERT(client_channel_factory != NULL); grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack); channel_data *chand = elem->channel_data; gpr_mu_lock(&chand->mu); @@ -784,6 +800,8 @@ void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx, grpc_resolver_next(exec_ctx, resolver, &chand->resolver_result, &chand->on_resolver_result_changed); } + chand->client_channel_factory = client_channel_factory; + grpc_client_channel_factory_ref(client_channel_factory); gpr_mu_unlock(&chand->mu); } diff --git a/src/core/ext/client_config/client_channel.h b/src/core/ext/client_config/client_channel.h index 1e47ad34ad..abb5ac4d87 100644 --- a/src/core/ext/client_config/client_channel.h +++ b/src/core/ext/client_config/client_channel.h @@ -34,6 +34,7 @@ #ifndef GRPC_CORE_EXT_CLIENT_CONFIG_CLIENT_CHANNEL_H #define GRPC_CORE_EXT_CLIENT_CONFIG_CLIENT_CHANNEL_H +#include "src/core/ext/client_config/client_channel_factory.h" #include "src/core/ext/client_config/resolver.h" #include "src/core/lib/channel/channel_stack.h" @@ -46,12 +47,12 @@ extern const grpc_channel_filter grpc_client_channel_filter; -/* post-construction initializer to let the client channel know which - transport setup it should cancel upon destruction, or initiate when it needs - a connection */ -void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx, - grpc_channel_stack *channel_stack, - grpc_resolver *resolver); +/* Post-construction initializer to give the client channel its resolver + and factory. */ +void grpc_client_channel_finish_initialization( + grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack, + grpc_resolver *resolver, + grpc_client_channel_factory *client_channel_factory); grpc_connectivity_state grpc_client_channel_check_connectivity_state( grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect); diff --git a/src/core/ext/client_config/lb_policy_factory.c b/src/core/ext/client_config/lb_policy_factory.c index f86117aa38..c17af91a09 100644 --- a/src/core/ext/client_config/lb_policy_factory.c +++ b/src/core/ext/client_config/lb_policy_factory.c @@ -34,6 +34,7 @@ #include <string.h> #include <grpc/support/alloc.h> +#include <grpc/support/string_util.h> #include "src/core/ext/client_config/lb_policy_factory.h" @@ -46,6 +47,25 @@ grpc_lb_addresses* grpc_lb_addresses_create(size_t num_addresses) { return addresses; } +grpc_lb_addresses* grpc_lb_addresses_copy(grpc_lb_addresses* addresses, + void* (*user_data_copy)(void*)) { + grpc_lb_addresses* new_addresses = + grpc_lb_addresses_create(addresses->num_addresses); + memcpy(new_addresses->addresses, addresses->addresses, + sizeof(grpc_lb_address) * addresses->num_addresses); + for (size_t i = 0; i < addresses->num_addresses; ++i) { + if (new_addresses->addresses[i].balancer_name != NULL) { + new_addresses->addresses[i].balancer_name = + gpr_strdup(new_addresses->addresses[i].balancer_name); + } + if (user_data_copy != NULL) { + new_addresses->addresses[i].user_data = + user_data_copy(new_addresses->addresses[i].user_data); + } + } + return new_addresses; +} + void grpc_lb_addresses_set_address(grpc_lb_addresses* addresses, size_t index, void* address, size_t address_len, bool is_balancer, char* balancer_name, diff --git a/src/core/ext/client_config/lb_policy_factory.h b/src/core/ext/client_config/lb_policy_factory.h index 6247db2b61..ade55704f2 100644 --- a/src/core/ext/client_config/lb_policy_factory.h +++ b/src/core/ext/client_config/lb_policy_factory.h @@ -36,9 +36,9 @@ #include "src/core/ext/client_config/client_channel_factory.h" #include "src/core/ext/client_config/lb_policy.h" -#include "src/core/ext/client_config/resolver_result.h" #include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/iomgr/resolve_address.h" typedef struct grpc_lb_policy_factory grpc_lb_policy_factory; typedef struct grpc_lb_policy_factory_vtable grpc_lb_policy_factory_vtable; @@ -68,6 +68,11 @@ typedef struct grpc_lb_addresses { * \a num_addresses addresses. */ grpc_lb_addresses *grpc_lb_addresses_create(size_t num_addresses); +/** Creates a copy of \a addresses. If \a user_data_copy is not NULL, + * it will be invoked to copy the \a user_data field of each address. */ +grpc_lb_addresses *grpc_lb_addresses_copy(grpc_lb_addresses *addresses, + void *(*user_data_copy)(void *)); + /** Sets the value of the address at index \a index of \a addresses. * \a address is a socket address of length \a address_len. * Takes ownership of \a balancer_name. */ @@ -82,10 +87,14 @@ void grpc_lb_addresses_destroy(grpc_lb_addresses *addresses, void (*user_data_destroy)(void *)); /** Arguments passed to LB policies. */ +/* TODO(roth, ctiller): Consider replacing this struct with + grpc_channel_args. See comment in resolver_result.h for details. */ typedef struct grpc_lb_policy_args { - char *server_name; + const char *server_name; grpc_lb_addresses *addresses; grpc_client_channel_factory *client_channel_factory; + /* Can be used to pass implementation-specific parameters to the LB policy. */ + grpc_channel_args *additional_args; } grpc_lb_policy_args; struct grpc_lb_policy_factory_vtable { diff --git a/src/core/ext/client_config/resolver_factory.h b/src/core/ext/client_config/resolver_factory.h index f69bf79564..9ec5b9a70e 100644 --- a/src/core/ext/client_config/resolver_factory.h +++ b/src/core/ext/client_config/resolver_factory.h @@ -47,10 +47,7 @@ struct grpc_resolver_factory { const grpc_resolver_factory_vtable *vtable; }; -typedef struct grpc_resolver_args { - grpc_uri *uri; - grpc_client_channel_factory *client_channel_factory; -} grpc_resolver_args; +typedef struct grpc_resolver_args { grpc_uri *uri; } grpc_resolver_args; struct grpc_resolver_factory_vtable { void (*ref)(grpc_resolver_factory *factory); diff --git a/src/core/ext/client_config/resolver_registry.c b/src/core/ext/client_config/resolver_registry.c index e7a4abd568..b5308a140c 100644 --- a/src/core/ext/client_config/resolver_registry.c +++ b/src/core/ext/client_config/resolver_registry.c @@ -128,15 +128,13 @@ static grpc_resolver_factory *resolve_factory(const char *target, return factory; } -grpc_resolver *grpc_resolver_create( - const char *target, grpc_client_channel_factory *client_channel_factory) { +grpc_resolver *grpc_resolver_create(const char *target) { grpc_uri *uri = NULL; grpc_resolver_factory *factory = resolve_factory(target, &uri); grpc_resolver *resolver; grpc_resolver_args args; memset(&args, 0, sizeof(args)); args.uri = uri; - args.client_channel_factory = client_channel_factory; resolver = grpc_resolver_factory_create_resolver(factory, &args); grpc_uri_destroy(uri); return resolver; diff --git a/src/core/ext/client_config/resolver_registry.h b/src/core/ext/client_config/resolver_registry.h index 5ef1383cd3..92e248d548 100644 --- a/src/core/ext/client_config/resolver_registry.h +++ b/src/core/ext/client_config/resolver_registry.h @@ -55,8 +55,7 @@ void grpc_register_resolver_type(grpc_resolver_factory *factory); If a resolver factory was found, use it to instantiate a resolver and return it. If a resolver factory was not found, return NULL. */ -grpc_resolver *grpc_resolver_create( - const char *target, grpc_client_channel_factory *client_channel_factory); +grpc_resolver *grpc_resolver_create(const char *target); /** Find a resolver factory given a name and return an (owned-by-the-caller) * reference to it */ diff --git a/src/core/ext/client_config/resolver_result.c b/src/core/ext/client_config/resolver_result.c index 242989a0da..63480d152b 100644 --- a/src/core/ext/client_config/resolver_result.c +++ b/src/core/ext/client_config/resolver_result.c @@ -34,40 +34,61 @@ #include <string.h> #include <grpc/support/alloc.h> +#include <grpc/support/string_util.h> + +#include "src/core/lib/channel/channel_args.h" struct grpc_resolver_result { gpr_refcount refs; - grpc_lb_policy* lb_policy; + char* server_name; + grpc_lb_addresses* addresses; + char* lb_policy_name; + grpc_channel_args* lb_policy_args; }; -grpc_resolver_result* grpc_resolver_result_create() { - grpc_resolver_result* c = gpr_malloc(sizeof(*c)); - memset(c, 0, sizeof(*c)); - gpr_ref_init(&c->refs, 1); - return c; +grpc_resolver_result* grpc_resolver_result_create( + const char* server_name, grpc_lb_addresses* addresses, + const char* lb_policy_name, grpc_channel_args* lb_policy_args) { + grpc_resolver_result* result = gpr_malloc(sizeof(*result)); + memset(result, 0, sizeof(*result)); + gpr_ref_init(&result->refs, 1); + result->server_name = gpr_strdup(server_name); + result->addresses = addresses; + result->lb_policy_name = gpr_strdup(lb_policy_name); + result->lb_policy_args = lb_policy_args; + return result; } -void grpc_resolver_result_ref(grpc_resolver_result* c) { gpr_ref(&c->refs); } +void grpc_resolver_result_ref(grpc_resolver_result* result) { + gpr_ref(&result->refs); +} void grpc_resolver_result_unref(grpc_exec_ctx* exec_ctx, - grpc_resolver_result* c) { - if (gpr_unref(&c->refs)) { - if (c->lb_policy != NULL) { - GRPC_LB_POLICY_UNREF(exec_ctx, c->lb_policy, "resolver_result"); - } - gpr_free(c); + grpc_resolver_result* result) { + if (gpr_unref(&result->refs)) { + gpr_free(result->server_name); + grpc_lb_addresses_destroy(result->addresses, NULL /* user_data_destroy */); + gpr_free(result->lb_policy_name); + grpc_channel_args_destroy(result->lb_policy_args); + gpr_free(result); } } -void grpc_resolver_result_set_lb_policy(grpc_resolver_result* c, - grpc_lb_policy* lb_policy) { - GPR_ASSERT(c->lb_policy == NULL); - if (lb_policy) { - GRPC_LB_POLICY_REF(lb_policy, "resolver_result"); - } - c->lb_policy = lb_policy; +const char* grpc_resolver_result_get_server_name(grpc_resolver_result* result) { + return result->server_name; +} + +grpc_lb_addresses* grpc_resolver_result_get_addresses( + grpc_resolver_result* result) { + return result->addresses; +} + +const char* grpc_resolver_result_get_lb_policy_name( + grpc_resolver_result* result) { + return result->lb_policy_name; } -grpc_lb_policy* grpc_resolver_result_get_lb_policy(grpc_resolver_result* c) { - return c->lb_policy; +grpc_channel_args* grpc_resolver_result_get_lb_policy_args( + grpc_resolver_result* result) { + return result->lb_policy_args; } diff --git a/src/core/ext/client_config/resolver_result.h b/src/core/ext/client_config/resolver_result.h index 5a69d81990..414c2e2482 100644 --- a/src/core/ext/client_config/resolver_result.h +++ b/src/core/ext/client_config/resolver_result.h @@ -32,22 +32,43 @@ #ifndef GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_RESULT_H #define GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_RESULT_H -#include <stdbool.h> - -#include "src/core/ext/client_config/lb_policy.h" +#include "src/core/ext/client_config/lb_policy_factory.h" #include "src/core/lib/iomgr/resolve_address.h" +// TODO(roth, ctiller): In the long term, we are considering replacing +// the resolver_result data structure with grpc_channel_args. The idea is +// that the resolver will return a set of channel args that contains the +// information that is currently in the resolver_result struct. For +// example, there will be specific args indicating the set of addresses +// and the name of the LB policy to instantiate. Note that if we did +// this, we would probably want to change the data structure of +// grpc_channel_args such to a hash table or AVL or some other data +// structure that does not require linear search to find keys. + /// Results reported from a grpc_resolver. typedef struct grpc_resolver_result grpc_resolver_result; -grpc_resolver_result* grpc_resolver_result_create(); +/// Takes ownership of \a addresses and \a lb_policy_args. +grpc_resolver_result* grpc_resolver_result_create( + const char* server_name, grpc_lb_addresses* addresses, + const char* lb_policy_name, grpc_channel_args* lb_policy_args); void grpc_resolver_result_ref(grpc_resolver_result* result); void grpc_resolver_result_unref(grpc_exec_ctx* exec_ctx, grpc_resolver_result* result); -void grpc_resolver_result_set_lb_policy(grpc_resolver_result* result, - grpc_lb_policy* lb_policy); -grpc_lb_policy* grpc_resolver_result_get_lb_policy( +/// Caller does NOT take ownership of result. +const char* grpc_resolver_result_get_server_name(grpc_resolver_result* result); + +/// Caller does NOT take ownership of result. +grpc_lb_addresses* grpc_resolver_result_get_addresses( + grpc_resolver_result* result); + +/// Caller does NOT take ownership of result. +const char* grpc_resolver_result_get_lb_policy_name( + grpc_resolver_result* result); + +/// Caller does NOT take ownership of result. +grpc_channel_args* grpc_resolver_result_get_lb_policy_args( grpc_resolver_result* result); #endif /* GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_RESULT_H */ diff --git a/src/core/ext/client_config/subchannel.c b/src/core/ext/client_config/subchannel.c index 2c4364b259..1141844634 100644 --- a/src/core/ext/client_config/subchannel.c +++ b/src/core/ext/client_config/subchannel.c @@ -33,6 +33,7 @@ #include "src/core/ext/client_config/subchannel.h" +#include <limits.h> #include <string.h> #include <grpc/support/alloc.h> @@ -347,21 +348,16 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, } if (0 == strcmp(c->args->args[i].key, GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) { - if (c->args->args[i].type == GRPC_ARG_INTEGER) { - if (c->args->args[i].value.integer >= 0) { - gpr_backoff_init( - &c->backoff_state, GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER, - GRPC_SUBCHANNEL_RECONNECT_JITTER, - GPR_MIN(c->args->args[i].value.integer, - GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000), - c->args->args[i].value.integer); - } else { - gpr_log(GPR_ERROR, GRPC_ARG_MAX_RECONNECT_BACKOFF_MS - " : must be non-negative"); - } - } else { - gpr_log(GPR_ERROR, - GRPC_ARG_MAX_RECONNECT_BACKOFF_MS " : must be an integer"); + const grpc_integer_options options = {-1, 0, INT_MAX}; + const int value = + grpc_channel_arg_get_integer(&c->args->args[i], options); + if (value >= 0) { + gpr_backoff_init( + &c->backoff_state, GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER, + GRPC_SUBCHANNEL_RECONNECT_JITTER, + GPR_MIN(value, + GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000), + value); } } } diff --git a/src/core/ext/client_config/subchannel.h b/src/core/ext/client_config/subchannel.h index a24dbe80fb..218bb43e0a 100644 --- a/src/core/ext/client_config/subchannel.h +++ b/src/core/ext/client_config/subchannel.h @@ -163,7 +163,7 @@ struct grpc_subchannel_args { /** Channel arguments to be supplied to the newly created channel */ const grpc_channel_args *args; /** Server name */ - char *server_name; + const char *server_name; /** Address to connect to */ struct sockaddr *addr; size_t addr_len; diff --git a/src/core/ext/client_config/subchannel_index.c b/src/core/ext/client_config/subchannel_index.c index 40ce91492d..673f85b8cb 100644 --- a/src/core/ext/client_config/subchannel_index.c +++ b/src/core/ext/client_config/subchannel_index.c @@ -132,7 +132,7 @@ void grpc_subchannel_key_destroy(grpc_exec_ctx *exec_ctx, grpc_connector_unref(exec_ctx, k->connector); gpr_free((grpc_channel_args *)k->args.filters); grpc_channel_args_destroy((grpc_channel_args *)k->args.args); - gpr_free(k->args.server_name); + gpr_free((void *)k->args.server_name); gpr_free(k->args.addr); gpr_free(k); } |