From 046cf7646918b19a8956e20f7e28e7422a6f29cd Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Mon, 26 Sep 2016 11:13:51 -0700 Subject: Encode method config table in channel args instead of in resolver result. --- src/core/ext/client_config/client_channel.c | 65 ++++++++++---------- src/core/ext/client_config/method_config.c | 71 ++++++++++++++++++++++ src/core/ext/client_config/method_config.h | 5 ++ src/core/ext/client_config/resolver_result.c | 11 +--- src/core/ext/client_config/resolver_result.h | 6 +- src/core/ext/lb_policy/grpclb/grpclb.c | 5 ++ src/core/ext/lb_policy/pick_first/pick_first.c | 1 + src/core/ext/lb_policy/round_robin/round_robin.c | 1 + src/core/ext/resolver/dns/native/dns_resolver.c | 2 +- src/core/ext/resolver/sockaddr/sockaddr_resolver.c | 2 +- src/core/lib/channel/channel_args.c | 10 +++ src/core/lib/channel/channel_args.h | 6 ++ 12 files changed, 137 insertions(+), 48 deletions(-) (limited to 'src') diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c index 73e62fcbbf..96c8c62c04 100644 --- a/src/core/ext/client_config/client_channel.c +++ b/src/core/ext/client_config/client_channel.c @@ -43,6 +43,7 @@ #include #include "src/core/ext/client_config/lb_policy_registry.h" +#include "src/core/ext/client_config/method_config.h" #include "src/core/ext/client_config/subchannel.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/connected_channel.h" @@ -70,15 +71,14 @@ typedef struct client_channel_channel_data { /** client channel factory */ grpc_client_channel_factory *client_channel_factory; - /** mutex protecting client configuration, including all - variables below in this data structure */ + /** mutex protecting all variables below in this data structure */ gpr_mu mu; - /** currently active load balancer - guarded by mu */ + /** currently active load balancer */ grpc_lb_policy *lb_policy; - /** incoming resolver result - set by resolver.next(), guarded by mu */ - grpc_resolver_result *incoming_resolver_result; - /** current resolver result */ - grpc_resolver_result *current_resolver_result; + /** method config table */ + grpc_method_config_table *method_config_table; + /** incoming resolver result - set by resolver.next() */ + grpc_resolver_result *resolver_result; /** a list of closures that are all waiting for config to come in */ grpc_closure_list waiting_for_config_closures; /** resolver callback */ @@ -176,23 +176,23 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, channel_data *chand = arg; grpc_lb_policy *lb_policy = NULL; grpc_lb_policy *old_lb_policy; + grpc_method_config_table *method_config_table = NULL; grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE; bool exit_idle = false; grpc_error *state_error = GRPC_ERROR_CREATE("No load balancing policy"); - if (chand->incoming_resolver_result != NULL) { + if (chand->resolver_result != NULL) { grpc_lb_policy_args lb_policy_args; lb_policy_args.server_name = - grpc_resolver_result_get_server_name(chand->incoming_resolver_result); + grpc_resolver_result_get_server_name(chand->resolver_result); lb_policy_args.addresses = - grpc_resolver_result_get_addresses(chand->incoming_resolver_result); - lb_policy_args.additional_args = grpc_resolver_result_get_lb_policy_args( - chand->incoming_resolver_result); + grpc_resolver_result_get_addresses(chand->resolver_result); + lb_policy_args.additional_args = + grpc_resolver_result_get_lb_policy_args(chand->resolver_result); lb_policy_args.client_channel_factory = chand->client_channel_factory; lb_policy = grpc_lb_policy_create( exec_ctx, - grpc_resolver_result_get_lb_policy_name( - chand->incoming_resolver_result), + grpc_resolver_result_get_lb_policy_name(chand->resolver_result), &lb_policy_args); if (lb_policy != NULL) { GRPC_LB_POLICY_REF(lb_policy, "config_change"); @@ -200,11 +200,15 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, state = grpc_lb_policy_check_connectivity(exec_ctx, lb_policy, &state_error); } - if (chand->current_resolver_result != NULL) { - grpc_resolver_result_unref(exec_ctx, chand->current_resolver_result); + const grpc_arg *channel_arg = grpc_channel_args_find( + lb_policy_args.additional_args, GRPC_ARG_SERVICE_CONFIG); + if (channel_arg != NULL) { + GPR_ASSERT(channel_arg->type == GRPC_ARG_POINTER); + method_config_table = grpc_method_config_table_ref( + (grpc_method_config_table *)channel_arg->value.pointer.p); } - chand->current_resolver_result = chand->incoming_resolver_result; - chand->incoming_resolver_result = NULL; + grpc_resolver_result_unref(exec_ctx, chand->resolver_result); + chand->resolver_result = NULL; } if (lb_policy != NULL) { @@ -215,6 +219,10 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, gpr_mu_lock(&chand->mu); old_lb_policy = chand->lb_policy; chand->lb_policy = lb_policy; + if (chand->method_config_table != NULL) { + grpc_method_config_table_unref(chand->method_config_table); + } + chand->method_config_table = method_config_table; if (lb_policy != NULL) { grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures, NULL); @@ -238,8 +246,7 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, watch_lb_policy(exec_ctx, chand, lb_policy, state); } GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); - grpc_resolver_next(exec_ctx, chand->resolver, - &chand->incoming_resolver_result, + grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result, &chand->on_resolver_result_changed); gpr_mu_unlock(&chand->mu); } else { @@ -376,8 +383,8 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, chand->interested_parties); GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); } - if (chand->current_resolver_result != NULL) { - grpc_resolver_result_unref(exec_ctx, chand->current_resolver_result); + if (chand->method_config_table != NULL) { + grpc_method_config_table_unref(chand->method_config_table); } grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker); grpc_pollset_set_destroy(chand->interested_parties); @@ -512,11 +519,9 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, /* Get method config. */ // FIXME: need to actually use the config data! // FIXME: think about refcounting vs. atomicity here - grpc_method_config_table* table = grpc_resolver_result_get_method_configs( - chand->current_resolver_result); - if (table != NULL) { + if (chand->method_config_table != NULL) { calld->method_config = grpc_method_config_table_get_method_config( - table, calld->path); + chand->method_config_table, calld->path); } /* Create call on subchannel. */ grpc_subchannel_call *subchannel_call = NULL; @@ -626,8 +631,7 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, if (chand->resolver != NULL && !chand->started_resolving) { chand->started_resolving = true; GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); - grpc_resolver_next(exec_ctx, chand->resolver, - &chand->incoming_resolver_result, + grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result, &chand->on_resolver_result_changed); } if (chand->resolver != NULL) { @@ -836,7 +840,7 @@ void grpc_client_channel_finish_initialization( chand->exit_idle_when_lb_policy_arrives) { chand->started_resolving = true; GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); - grpc_resolver_next(exec_ctx, resolver, &chand->incoming_resolver_result, + grpc_resolver_next(exec_ctx, resolver, &chand->resolver_result, &chand->on_resolver_result_changed); } chand->client_channel_factory = client_channel_factory; @@ -858,8 +862,7 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state( if (!chand->started_resolving && chand->resolver != NULL) { GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); chand->started_resolving = true; - grpc_resolver_next(exec_ctx, chand->resolver, - &chand->incoming_resolver_result, + grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result, &chand->on_resolver_result_changed); } } diff --git a/src/core/ext/client_config/method_config.c b/src/core/ext/client_config/method_config.c index e3589153fb..989f776967 100644 --- a/src/core/ext/client_config/method_config.c +++ b/src/core/ext/client_config/method_config.c @@ -33,9 +33,11 @@ #include +#include #include #include #include +#include #include "src/core/lib/transport/metadata.h" @@ -212,3 +214,72 @@ grpc_method_config* grpc_method_config_table_get_method_config( } return grpc_method_config_ref(method_config); } + +static void* copy_arg(void* p) { + return grpc_method_config_table_ref(p); +} + +static void destroy_arg(void* p) { + grpc_method_config_table_unref(p); +} + +static int cmp_arg(void* p1, void* p2) { + grpc_method_config_table* t1 = p1; + grpc_method_config_table* t2 = p2; + for (size_t i = 0; i < GPR_ARRAY_SIZE(t1->entries); ++i) { + grpc_method_config_table_entry* e1 = &t1->entries[i]; + grpc_method_config_table_entry* e2 = &t2->entries[i]; + // Compare paths by hash value. + if (e1->path->hash < e2->path->hash) return -1; + if (e1->path->hash > e2->path->hash) return 1; + // Compare wait_for_ready. + const bool wait_for_ready1 = + e1->method_config->wait_for_ready == NULL + ? false : *e1->method_config->wait_for_ready; + const bool wait_for_ready2 = + e2->method_config->wait_for_ready == NULL + ? false : *e2->method_config->wait_for_ready; + if (wait_for_ready1 < wait_for_ready2) return -1; + if (wait_for_ready1 > wait_for_ready2) return 1; + // Compare timeout. + const gpr_timespec timeout1 = + e1->method_config->timeout == NULL + ? gpr_inf_past(GPR_CLOCK_MONOTONIC) : *e1->method_config->timeout; + const gpr_timespec timeout2 = + e2->method_config->timeout == NULL + ? gpr_inf_past(GPR_CLOCK_MONOTONIC) : *e2->method_config->timeout; + const int timeout_result = gpr_time_cmp(timeout1, timeout2); + if (timeout_result != 0) return timeout_result; + // Compare max_request_message_bytes. + const int32_t max_request_message_bytes1 = + e1->method_config->max_request_message_bytes == NULL + ? -1 : *e1->method_config->max_request_message_bytes; + const int32_t max_request_message_bytes2 = + e2->method_config->max_request_message_bytes == NULL + ? -1 : *e2->method_config->max_request_message_bytes; + if (max_request_message_bytes1 < max_request_message_bytes2) return -1; + if (max_request_message_bytes1 > max_request_message_bytes2) return 1; + // Compare max_response_message_bytes. + const int32_t max_response_message_bytes1 = + e1->method_config->max_response_message_bytes == NULL + ? -1 : *e1->method_config->max_response_message_bytes; + const int32_t max_response_message_bytes2 = + e2->method_config->max_response_message_bytes == NULL + ? -1 : *e2->method_config->max_response_message_bytes; + if (max_response_message_bytes1 < max_response_message_bytes2) return -1; + if (max_response_message_bytes1 > max_response_message_bytes2) return 1; + } + return 0; +} + +static grpc_arg_pointer_vtable arg_vtable = {copy_arg, destroy_arg, cmp_arg}; + +grpc_arg grpc_method_config_table_create_channel_arg( + grpc_method_config_table* table) { + grpc_arg arg; + arg.type = GRPC_ARG_POINTER; + arg.key = GRPC_ARG_SERVICE_CONFIG; + arg.value.pointer.p = grpc_method_config_table_ref(table); + arg.value.pointer.vtable = &arg_vtable; + return arg; +} diff --git a/src/core/ext/client_config/method_config.h b/src/core/ext/client_config/method_config.h index 9490e296b2..2be0cd1006 100644 --- a/src/core/ext/client_config/method_config.h +++ b/src/core/ext/client_config/method_config.h @@ -35,6 +35,7 @@ #include #include +#include #include "src/core/lib/transport/metadata.h" @@ -82,4 +83,8 @@ void grpc_method_config_table_add_method_config( grpc_method_config* grpc_method_config_table_get_method_config( grpc_method_config_table* table, grpc_mdstr* path); +/// Returns a channel arg containing \a table. +grpc_arg grpc_method_config_table_create_channel_arg( + grpc_method_config_table* table); + #endif /* GRPC_CORE_EXT_CLIENT_CONFIG_METHOD_CONFIG_H */ diff --git a/src/core/ext/client_config/resolver_result.c b/src/core/ext/client_config/resolver_result.c index 5c5ffb2635..16d124d205 100644 --- a/src/core/ext/client_config/resolver_result.c +++ b/src/core/ext/client_config/resolver_result.c @@ -49,13 +49,11 @@ struct grpc_resolver_result { grpc_lb_addresses* addresses; char* lb_policy_name; grpc_channel_args* lb_policy_args; - grpc_method_config_table* method_configs; }; grpc_resolver_result* grpc_resolver_result_create( const char* server_name, grpc_lb_addresses* addresses, - const char* lb_policy_name, grpc_channel_args* lb_policy_args, - grpc_method_config_table* method_configs) { + const char* lb_policy_name, grpc_channel_args* lb_policy_args) { grpc_resolver_result* result = gpr_malloc(sizeof(*result)); memset(result, 0, sizeof(*result)); gpr_ref_init(&result->refs, 1); @@ -63,7 +61,6 @@ grpc_resolver_result* grpc_resolver_result_create( result->addresses = addresses; result->lb_policy_name = gpr_strdup(lb_policy_name); result->lb_policy_args = lb_policy_args; - result->method_configs = grpc_method_config_table_ref(method_configs); return result; } @@ -78,7 +75,6 @@ void grpc_resolver_result_unref(grpc_exec_ctx* exec_ctx, grpc_lb_addresses_destroy(result->addresses, NULL /* user_data_destroy */); gpr_free(result->lb_policy_name); grpc_channel_args_destroy(result->lb_policy_args); - grpc_method_config_table_unref(result->method_configs); gpr_free(result); } } @@ -101,8 +97,3 @@ grpc_channel_args* grpc_resolver_result_get_lb_policy_args( grpc_resolver_result* result) { return result->lb_policy_args; } - -grpc_method_config_table* grpc_resolver_result_get_method_configs( - grpc_resolver_result* result) { - return result->method_configs; -} diff --git a/src/core/ext/client_config/resolver_result.h b/src/core/ext/client_config/resolver_result.h index e6127f1d6d..707c8c2cf5 100644 --- a/src/core/ext/client_config/resolver_result.h +++ b/src/core/ext/client_config/resolver_result.h @@ -33,7 +33,6 @@ #define GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_RESULT_H #include "src/core/ext/client_config/lb_policy_factory.h" -#include "src/core/ext/client_config/method_config.h" #include "src/core/lib/iomgr/resolve_address.h" // TODO(roth, ctiller): In the long term, we are considering replacing @@ -52,8 +51,7 @@ typedef struct grpc_resolver_result grpc_resolver_result; /// Takes ownership of \a addresses, \a lb_policy_args. grpc_resolver_result* grpc_resolver_result_create( const char* server_name, grpc_lb_addresses* addresses, - const char* lb_policy_name, grpc_channel_args* lb_policy_args, - grpc_method_config_table* method_configs); + const char* lb_policy_name, grpc_channel_args* lb_policy_args); void grpc_resolver_result_ref(grpc_resolver_result* result); void grpc_resolver_result_unref(grpc_exec_ctx* exec_ctx, @@ -67,7 +65,5 @@ const char* grpc_resolver_result_get_lb_policy_name( grpc_resolver_result* result); grpc_channel_args* grpc_resolver_result_get_lb_policy_args( grpc_resolver_result* result); -grpc_method_config_table* grpc_resolver_result_get_method_configs( - grpc_resolver_result* result); #endif /* GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_RESULT_H */ diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index 8105c4415d..de3438be81 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -112,6 +112,7 @@ #include "src/core/ext/client_config/parse_address.h" #include "src/core/ext/lb_policy/grpclb/grpclb.h" #include "src/core/ext/lb_policy/grpclb/load_balancer_api.h" +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/support/string.h" @@ -285,6 +286,7 @@ typedef struct glb_lb_policy { const char *server_name; grpc_client_channel_factory *cc_factory; + grpc_channel_args *args; /** for communicating with the LB server */ grpc_channel *lb_channel; @@ -442,6 +444,7 @@ static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx, args.server_name = glb_policy->server_name; args.client_channel_factory = glb_policy->cc_factory; args.addresses = process_serverlist(serverlist); + args.additional_args = glb_policy->args; grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args); @@ -567,6 +570,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, * Create a client channel over them to communicate with a LB service */ glb_policy->server_name = gpr_strdup(args->server_name); glb_policy->cc_factory = args->client_channel_factory; + glb_policy->args = grpc_channel_args_copy(args->additional_args); GPR_ASSERT(glb_policy->cc_factory != NULL); /* construct a target from the addresses in args, given in the form @@ -633,6 +637,7 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { GPR_ASSERT(glb_policy->pending_picks == NULL); GPR_ASSERT(glb_policy->pending_pings == NULL); gpr_free((void *)glb_policy->server_name); + grpc_channel_args_destroy(glb_policy->args); grpc_channel_destroy(glb_policy->lb_channel); glb_policy->lb_channel = NULL; grpc_connectivity_state_destroy(exec_ctx, &glb_policy->state_tracker); diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c index 466a0fdede..10030ef18b 100644 --- a/src/core/ext/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/lb_policy/pick_first/pick_first.c @@ -470,6 +470,7 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx, sc_args.addr = (struct sockaddr *)(&args->addresses->addresses[i].address.addr); sc_args.addr_len = args->addresses->addresses[i].address.len; + sc_args.args = args->additional_args; grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel( exec_ctx, args->client_channel_factory, &sc_args); diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index 037f180a9e..b6cfe8994d 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -633,6 +633,7 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, sc_args.addr = (struct sockaddr *)(&args->addresses->addresses[i].address.addr); sc_args.addr_len = args->addresses->addresses[i].address.len; + sc_args.args = args->additional_args; grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel( exec_ctx, args->client_channel_factory, &sc_args); diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c index 879e26f7d9..e8ac1b12ae 100644 --- a/src/core/ext/resolver/dns/native/dns_resolver.c +++ b/src/core/ext/resolver/dns/native/dns_resolver.c @@ -181,7 +181,7 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, } grpc_resolved_addresses_destroy(r->addresses); result = grpc_resolver_result_create(r->target_name, addresses, - r->lb_policy_name, NULL, NULL); + r->lb_policy_name, NULL); } else { gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); gpr_timespec next_try = gpr_backoff_step(&r->backoff_state, now); diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c index 61be1b3c25..74d2015e5c 100644 --- a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c +++ b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c @@ -122,7 +122,7 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, r->published = true; *r->target_result = grpc_resolver_result_create( "", grpc_lb_addresses_copy(r->addresses, NULL /* user_data_copy */), - r->lb_policy_name, NULL, NULL); + r->lb_policy_name, NULL); grpc_exec_ctx_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL); r->next_completion = NULL; } diff --git a/src/core/lib/channel/channel_args.c b/src/core/lib/channel/channel_args.c index 3a56b1ff20..bab6fcd9fa 100644 --- a/src/core/lib/channel/channel_args.c +++ b/src/core/lib/channel/channel_args.c @@ -272,6 +272,16 @@ int grpc_channel_args_compare(const grpc_channel_args *a, return 0; } +const grpc_arg *grpc_channel_args_find(const grpc_channel_args *args, + const char *name) { + if (args != NULL) { + for (size_t i = 0; i < args->num_args; ++i) { + if (args->args[i].key == name) return &args->args[i]; + } + } + return NULL; +} + int grpc_channel_arg_get_integer(grpc_arg *arg, grpc_integer_options options) { if (arg->type != GRPC_ARG_INTEGER) { gpr_log(GPR_ERROR, "%s ignored: it must be an integer", arg->key); diff --git a/src/core/lib/channel/channel_args.h b/src/core/lib/channel/channel_args.h index 586a296d1f..38fb4c55d4 100644 --- a/src/core/lib/channel/channel_args.h +++ b/src/core/lib/channel/channel_args.h @@ -89,6 +89,12 @@ uint32_t grpc_channel_args_compression_algorithm_get_states( int grpc_channel_args_compare(const grpc_channel_args *a, const grpc_channel_args *b); +/** Returns the value of argument \a name from \a args, or NULL if not found. + Note: \a name is matched using pointer equality, so it must be the + same instance of the string used to create the grpc_arg key. */ +const grpc_arg *grpc_channel_args_find(const grpc_channel_args *args, + const char *name); + typedef struct grpc_integer_options { int default_value; // Return this if value is outside of expected bounds. int min_value; -- cgit v1.2.3