diff options
-rw-r--r-- | src/core/ext/client_config/client_channel.c | 63 | ||||
-rw-r--r-- | src/core/ext/client_config/resolver_result.c | 158 | ||||
-rw-r--r-- | src/core/ext/client_config/resolver_result.h | 38 |
3 files changed, 245 insertions, 14 deletions
diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c index e4500cc50d..2cf5490122 100644 --- a/src/core/ext/client_config/client_channel.c +++ b/src/core/ext/client_config/client_channel.c @@ -52,6 +52,9 @@ #include "src/core/lib/support/string.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/transport/connectivity_state.h" +#include "src/core/lib/transport/metadata.h" +#include "src/core/lib/transport/metadata_batch.h" +#include "src/core/lib/transport/static_metadata.h" /* Client channel implementation */ @@ -73,7 +76,9 @@ typedef struct client_channel_channel_data { /** currently active load balancer - guarded by mu */ grpc_lb_policy *lb_policy; /** incoming resolver result - set by resolver.next(), guarded by mu */ - grpc_resolver_result *resolver_result; + grpc_resolver_result *incoming_resolver_result; + /** current resolver result */ + grpc_resolver_result *current_resolver_result; /** a list of closures that are all waiting for config to come in */ grpc_closure_list waiting_for_config_closures; /** resolver callback */ @@ -175,16 +180,17 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, bool exit_idle = false; grpc_error *state_error = GRPC_ERROR_CREATE("No load balancing policy"); - if (chand->resolver_result != NULL) { + if (chand->incoming_resolver_result != NULL) { grpc_lb_policy_args lb_policy_args; lb_policy_args.addresses = - grpc_resolver_result_get_addresses(chand->resolver_result); - lb_policy_args.additional_args = - grpc_resolver_result_get_lb_policy_args(chand->resolver_result); + grpc_resolver_result_get_addresses(chand->incoming_resolver_result); + lb_policy_args.additional_args = grpc_resolver_result_get_lb_policy_args( + chand->incoming_resolver_result); lb_policy_args.client_channel_factory = chand->client_channel_factory; lb_policy = grpc_lb_policy_create( exec_ctx, - grpc_resolver_result_get_lb_policy_name(chand->resolver_result), + grpc_resolver_result_get_lb_policy_name( + chand->incoming_resolver_result), &lb_policy_args); if (lb_policy != NULL) { GRPC_LB_POLICY_REF(lb_policy, "config_change"); @@ -192,8 +198,11 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, state = grpc_lb_policy_check_connectivity(exec_ctx, lb_policy, &state_error); } - grpc_resolver_result_unref(exec_ctx, chand->resolver_result); - chand->resolver_result = NULL; + if (chand->current_resolver_result != NULL) { + grpc_resolver_result_unref(exec_ctx, chand->current_resolver_result); + } + chand->current_resolver_result = chand->incoming_resolver_result; + chand->incoming_resolver_result = NULL; } if (lb_policy != NULL) { @@ -227,7 +236,8 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, watch_lb_policy(exec_ctx, chand, lb_policy, state); } GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); - grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result, + grpc_resolver_next(exec_ctx, chand->resolver, + &chand->incoming_resolver_result, &chand->on_resolver_result_changed); gpr_mu_unlock(&chand->mu); } else { @@ -364,6 +374,9 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, chand->interested_parties); GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); } + if (chand->current_resolver_result != NULL) { + grpc_resolver_result_unref(exec_ctx, chand->current_resolver_result); + } grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker); grpc_pollset_set_destroy(chand->interested_parties); gpr_mu_destroy(&chand->mu); @@ -399,6 +412,9 @@ typedef struct client_channel_call_data { grpc_connected_subchannel *connected_subchannel; grpc_polling_entity *pollent; + grpc_mdstr *path; + grpc_method_config *method_config; + grpc_transport_stream_op **waiting_ops; size_t waiting_ops_count; size_t waiting_ops_capacity; @@ -474,7 +490,9 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) { static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - call_data *calld = arg; + grpc_call_element *elem = arg; + call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; gpr_mu_lock(&calld->mu); GPR_ASSERT(calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL); @@ -489,6 +507,11 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, GRPC_ERROR_CREATE_REFERENCING( "Cancelled before creating subchannel", &error, 1)); } else { + /* Get method config. */ +// FIXME: need to actually use the config data! + calld->method_config = grpc_resolver_result_get_method_config( + chand->current_resolver_result, calld->path); + /* Create call on subchannel. */ grpc_subchannel_call *subchannel_call = NULL; grpc_error *new_error = grpc_connected_subchannel_create_call( exec_ctx, calld->connected_subchannel, calld->pollent, @@ -596,7 +619,8 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, if (chand->resolver != NULL && !chand->started_resolving) { chand->started_resolving = true; GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); - grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result, + grpc_resolver_next(exec_ctx, chand->resolver, + &chand->incoming_resolver_result, &chand->on_resolver_result_changed); } if (chand->resolver != NULL) { @@ -687,8 +711,15 @@ retry: if (calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING && calld->connected_subchannel == NULL && op->send_initial_metadata != NULL) { + for (grpc_linked_mdelem *mdelem = op->send_initial_metadata->list.head; + mdelem != NULL; mdelem = mdelem->next) { + if (mdelem->md->key == GRPC_MDSTR_PATH) { + calld->path = GRPC_MDSTR_REF(mdelem->md->value); + break; + } + } calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL; - grpc_closure_init(&calld->next_step, subchannel_ready, calld); + grpc_closure_init(&calld->next_step, subchannel_ready, elem); GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel"); if (pick_subchannel(exec_ctx, elem, op->send_initial_metadata, op->send_initial_metadata_flags, @@ -728,6 +759,8 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx, gpr_atm_rel_store(&calld->subchannel_call, 0); gpr_mu_init(&calld->mu); calld->connected_subchannel = NULL; + calld->path = NULL; + calld->method_config = NULL; calld->waiting_ops = NULL; calld->waiting_ops_count = 0; calld->waiting_ops_capacity = 0; @@ -743,6 +776,7 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, const grpc_call_final_info *final_info, void *and_free_memory) { call_data *calld = elem->call_data; + if (calld->path != NULL) GRPC_MDSTR_UNREF(calld->path); grpc_subchannel_call *call = GET_CALL(calld); if (call != NULL && call != CANCELLED_CALL) { GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "client_channel_destroy_call"); @@ -794,7 +828,7 @@ void grpc_client_channel_set_resolver_and_client_channel_factory( chand->exit_idle_when_lb_policy_arrives) { chand->started_resolving = true; GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); - grpc_resolver_next(exec_ctx, resolver, &chand->resolver_result, + grpc_resolver_next(exec_ctx, resolver, &chand->incoming_resolver_result, &chand->on_resolver_result_changed); } chand->client_channel_factory = client_channel_factory; @@ -816,7 +850,8 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state( if (!chand->started_resolving && chand->resolver != NULL) { GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); chand->started_resolving = true; - grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result, + grpc_resolver_next(exec_ctx, chand->resolver, + &chand->incoming_resolver_result, &chand->on_resolver_result_changed); } } diff --git a/src/core/ext/client_config/resolver_result.c b/src/core/ext/client_config/resolver_result.c index 59c9e7dc25..c730551fe5 100644 --- a/src/core/ext/client_config/resolver_result.c +++ b/src/core/ext/client_config/resolver_result.c @@ -36,13 +36,154 @@ #include <grpc/support/alloc.h> #include <grpc/support/string_util.h> +#include "src/core/lib/transport/metadata.h" #include "src/core/lib/channel/channel_args.h" +// +// grpc_method_config +// + +struct grpc_method_config { + gpr_refcount refs; + bool* wait_for_ready; + gpr_timespec* timeout; + int32_t* max_request_message_bytes; + int32_t* max_response_message_bytes; +}; + +grpc_method_config* grpc_method_config_create( + bool* wait_for_ready, gpr_timespec* timeout, + int32_t* max_request_message_bytes, int32_t* max_response_message_bytes) { + grpc_method_config* config = gpr_malloc(sizeof(*config)); + memset(config, 0, sizeof(*config)); + gpr_ref_init(&config->refs, 1); + if (wait_for_ready != NULL) { + config->wait_for_ready = gpr_malloc(sizeof(*wait_for_ready)); + *config->wait_for_ready = *wait_for_ready; + } + if (timeout != NULL) { + config->timeout = gpr_malloc(sizeof(*timeout)); + *config->timeout = *timeout; + } + if (max_request_message_bytes != NULL) { + config->max_request_message_bytes = + gpr_malloc(sizeof(*max_request_message_bytes)); + *config->max_request_message_bytes = *max_request_message_bytes; + } + if (max_response_message_bytes != NULL) { + config->max_response_message_bytes = + gpr_malloc(sizeof(*max_response_message_bytes)); + *config->max_response_message_bytes = *max_response_message_bytes; + } + return config; +} + +grpc_method_config* grpc_method_config_ref(grpc_method_config* method_config) { + gpr_ref(&method_config->refs); + return method_config; +} + +void grpc_method_config_unref(grpc_method_config* method_config) { + if (gpr_unref(&method_config->refs)) { + gpr_free(method_config->wait_for_ready); + gpr_free(method_config->timeout); + gpr_free(method_config->max_request_message_bytes); + gpr_free(method_config->max_response_message_bytes); + gpr_free(method_config); + } +} + +bool* grpc_method_config_get_wait_for_ready(grpc_method_config* method_config) { + return method_config->wait_for_ready; +} + +gpr_timespec* grpc_method_config_get_timeout( + grpc_method_config* method_config) { + return method_config->timeout; +} + +int32_t* grpc_method_config_get_max_request_message_bytes( + grpc_method_config* method_config) { + return method_config->max_request_message_bytes; +} + +int32_t* grpc_method_config_get_max_response_message_bytes( + grpc_method_config* method_config) { + return method_config->max_response_message_bytes; +} + +// +// method_config_table +// + +typedef struct method_config_table_entry { + grpc_mdstr* path; + grpc_method_config* method_config; +} method_config_table_entry; + +#define METHOD_CONFIG_TABLE_SIZE 128 +typedef struct method_config_table { + method_config_table_entry entries[METHOD_CONFIG_TABLE_SIZE]; +} method_config_table; + +static void method_config_table_init(method_config_table* table) { + memset(table, 0, sizeof(*table)); +} + +static void method_config_table_destroy(method_config_table* table) { + for (size_t i = 0; i < GPR_ARRAY_SIZE(table->entries); ++i) { + method_config_table_entry* entry = &table->entries[i]; + if (entry->path != NULL) { + GRPC_MDSTR_UNREF(entry->path); + grpc_method_config_unref(entry->method_config); + } + } +} + +// Helper function for insert and get operations that performs quadratic +// probing (https://en.wikipedia.org/wiki/Quadratic_probing). +static size_t method_config_table_find_index(method_config_table* table, + grpc_mdstr* path, + bool find_empty) { + for (size_t i = 0; i < GPR_ARRAY_SIZE(table->entries); ++i) { + const size_t idx = (path->hash + i * i) % GPR_ARRAY_SIZE(table->entries); + if (table->entries[idx].path == NULL) + return find_empty ? idx : GPR_ARRAY_SIZE(table->entries); + if (table->entries[idx].path == path) return idx; + } + return GPR_ARRAY_SIZE(table->entries) + 1; // Not found. +} + +static void method_config_table_insert(method_config_table* table, + grpc_mdstr* path, + grpc_method_config* config) { + const size_t idx = + method_config_table_find_index(table, path, true /* find_empty */); + // This can happen if the table is full. + GPR_ASSERT(idx != GPR_ARRAY_SIZE(table->entries)); + method_config_table_entry* entry = &table->entries[idx]; + entry->path = GRPC_MDSTR_REF(path); + entry->method_config = grpc_method_config_ref(config); +} + +static grpc_method_config* method_config_table_get(method_config_table* table, + grpc_mdstr* path) { + const size_t idx = + method_config_table_find_index(table, path, false /* find_empty */); + if (idx == GPR_ARRAY_SIZE(table->entries)) return NULL; // Not found. + return table->entries[idx].method_config; +} + +// +// grpc_resolver_result +// + struct grpc_resolver_result { gpr_refcount refs; grpc_lb_addresses* addresses; char* lb_policy_name; grpc_channel_args* lb_policy_args; + method_config_table method_configs; }; grpc_resolver_result* grpc_resolver_result_create( @@ -54,6 +195,7 @@ grpc_resolver_result* grpc_resolver_result_create( result->addresses = addresses; result->lb_policy_name = gpr_strdup(lb_policy_name); result->lb_policy_args = lb_policy_args; + method_config_table_init(&result->method_configs); return result; } @@ -67,6 +209,7 @@ void grpc_resolver_result_unref(grpc_exec_ctx* exec_ctx, grpc_lb_addresses_destroy(result->addresses, NULL /* user_data_destroy */); gpr_free(result->lb_policy_name); grpc_channel_args_destroy(result->lb_policy_args); + method_config_table_destroy(&result->method_configs); gpr_free(result); } } @@ -85,3 +228,18 @@ grpc_channel_args* grpc_resolver_result_get_lb_policy_args( grpc_resolver_result* result) { return result->lb_policy_args; } + +void grpc_resolver_result_add_method_config(grpc_resolver_result* result, + grpc_mdstr** paths, + size_t num_paths, + grpc_method_config* method_config) { + for (size_t i = 0; i < num_paths; ++i) { + method_config_table_insert(&result->method_configs, paths[i], + method_config); + } +} + +grpc_method_config* grpc_resolver_result_get_method_config( + grpc_resolver_result* result, grpc_mdstr* path) { + return method_config_table_get(&result->method_configs, path); +} diff --git a/src/core/ext/client_config/resolver_result.h b/src/core/ext/client_config/resolver_result.h index d4118b90e8..d911bfbe52 100644 --- a/src/core/ext/client_config/resolver_result.h +++ b/src/core/ext/client_config/resolver_result.h @@ -45,6 +45,27 @@ // grpc_channel_args such to a hash table or AVL or some other data // structure that does not require linear search to find keys. +/// Per-method configuration. + +typedef struct grpc_method_config grpc_method_config; + +/// Any parameter may be NULL to indicate that the value is unset. +grpc_method_config* grpc_method_config_create( + bool* wait_for_ready, gpr_timespec* timeout, + int32_t* max_request_message_bytes, int32_t* max_response_message_bytes); + +grpc_method_config* grpc_method_config_ref(grpc_method_config* method_config); +void grpc_method_config_unref(grpc_method_config* method_config); + +/// These methods return NULL if the requested field is unset. +/// The caller does NOT take ownership of the result. +bool* grpc_method_config_get_wait_for_ready(grpc_method_config* method_config); +gpr_timespec* grpc_method_config_get_timeout(grpc_method_config* method_config); +int32_t* grpc_method_config_get_max_request_message_bytes( + grpc_method_config* method_config); +int32_t* grpc_method_config_get_max_response_message_bytes( + grpc_method_config* method_config); + /// Results reported from a grpc_resolver. typedef struct grpc_resolver_result grpc_resolver_result; @@ -68,4 +89,21 @@ const char* grpc_resolver_result_get_lb_policy_name( grpc_channel_args* grpc_resolver_result_get_lb_policy_args( grpc_resolver_result* result); +/// Adds a method config. \a paths indicates the set of path names +/// for which this config applies. Each name is of one of the following +/// forms: +/// service/method -- specifies exact service and method name +/// service/* -- matches all methods for the specified service +/// * -- matches all methods for all services +/// Takes new references to all elements of \a paths and to \a method_config. +void grpc_resolver_result_add_method_config(grpc_resolver_result* result, + grpc_mdstr** paths, + size_t num_paths, + grpc_method_config* method_config); + +/// Returns NULL if the method has no config. +/// Caller does NOT take ownership of result. +grpc_method_config* grpc_resolver_result_get_method_config( + grpc_resolver_result* result, grpc_mdstr* path); + #endif /* GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_RESULT_H */ |