aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/core/ext/client_config/client_channel.c63
-rw-r--r--src/core/ext/client_config/resolver_result.c158
-rw-r--r--src/core/ext/client_config/resolver_result.h38
3 files changed, 245 insertions, 14 deletions
diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c
index e4500cc50d..2cf5490122 100644
--- a/src/core/ext/client_config/client_channel.c
+++ b/src/core/ext/client_config/client_channel.c
@@ -52,6 +52,9 @@
#include "src/core/lib/support/string.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/connectivity_state.h"
+#include "src/core/lib/transport/metadata.h"
+#include "src/core/lib/transport/metadata_batch.h"
+#include "src/core/lib/transport/static_metadata.h"
/* Client channel implementation */
@@ -73,7 +76,9 @@ typedef struct client_channel_channel_data {
/** currently active load balancer - guarded by mu */
grpc_lb_policy *lb_policy;
/** incoming resolver result - set by resolver.next(), guarded by mu */
- grpc_resolver_result *resolver_result;
+ grpc_resolver_result *incoming_resolver_result;
+ /** current resolver result */
+ grpc_resolver_result *current_resolver_result;
/** a list of closures that are all waiting for config to come in */
grpc_closure_list waiting_for_config_closures;
/** resolver callback */
@@ -175,16 +180,17 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg,
bool exit_idle = false;
grpc_error *state_error = GRPC_ERROR_CREATE("No load balancing policy");
- if (chand->resolver_result != NULL) {
+ if (chand->incoming_resolver_result != NULL) {
grpc_lb_policy_args lb_policy_args;
lb_policy_args.addresses =
- grpc_resolver_result_get_addresses(chand->resolver_result);
- lb_policy_args.additional_args =
- grpc_resolver_result_get_lb_policy_args(chand->resolver_result);
+ grpc_resolver_result_get_addresses(chand->incoming_resolver_result);
+ lb_policy_args.additional_args = grpc_resolver_result_get_lb_policy_args(
+ chand->incoming_resolver_result);
lb_policy_args.client_channel_factory = chand->client_channel_factory;
lb_policy = grpc_lb_policy_create(
exec_ctx,
- grpc_resolver_result_get_lb_policy_name(chand->resolver_result),
+ grpc_resolver_result_get_lb_policy_name(
+ chand->incoming_resolver_result),
&lb_policy_args);
if (lb_policy != NULL) {
GRPC_LB_POLICY_REF(lb_policy, "config_change");
@@ -192,8 +198,11 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg,
state =
grpc_lb_policy_check_connectivity(exec_ctx, lb_policy, &state_error);
}
- grpc_resolver_result_unref(exec_ctx, chand->resolver_result);
- chand->resolver_result = NULL;
+ if (chand->current_resolver_result != NULL) {
+ grpc_resolver_result_unref(exec_ctx, chand->current_resolver_result);
+ }
+ chand->current_resolver_result = chand->incoming_resolver_result;
+ chand->incoming_resolver_result = NULL;
}
if (lb_policy != NULL) {
@@ -227,7 +236,8 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg,
watch_lb_policy(exec_ctx, chand, lb_policy, state);
}
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
- grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result,
+ grpc_resolver_next(exec_ctx, chand->resolver,
+ &chand->incoming_resolver_result,
&chand->on_resolver_result_changed);
gpr_mu_unlock(&chand->mu);
} else {
@@ -364,6 +374,9 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
chand->interested_parties);
GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
}
+ if (chand->current_resolver_result != NULL) {
+ grpc_resolver_result_unref(exec_ctx, chand->current_resolver_result);
+ }
grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker);
grpc_pollset_set_destroy(chand->interested_parties);
gpr_mu_destroy(&chand->mu);
@@ -399,6 +412,9 @@ typedef struct client_channel_call_data {
grpc_connected_subchannel *connected_subchannel;
grpc_polling_entity *pollent;
+ grpc_mdstr *path;
+ grpc_method_config *method_config;
+
grpc_transport_stream_op **waiting_ops;
size_t waiting_ops_count;
size_t waiting_ops_capacity;
@@ -474,7 +490,9 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) {
static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
- call_data *calld = arg;
+ grpc_call_element *elem = arg;
+ call_data *calld = elem->call_data;
+ channel_data *chand = elem->channel_data;
gpr_mu_lock(&calld->mu);
GPR_ASSERT(calld->creation_phase ==
GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL);
@@ -489,6 +507,11 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_ERROR_CREATE_REFERENCING(
"Cancelled before creating subchannel", &error, 1));
} else {
+ /* Get method config. */
+// FIXME: need to actually use the config data!
+ calld->method_config = grpc_resolver_result_get_method_config(
+ chand->current_resolver_result, calld->path);
+ /* Create call on subchannel. */
grpc_subchannel_call *subchannel_call = NULL;
grpc_error *new_error = grpc_connected_subchannel_create_call(
exec_ctx, calld->connected_subchannel, calld->pollent,
@@ -596,7 +619,8 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
if (chand->resolver != NULL && !chand->started_resolving) {
chand->started_resolving = true;
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
- grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result,
+ grpc_resolver_next(exec_ctx, chand->resolver,
+ &chand->incoming_resolver_result,
&chand->on_resolver_result_changed);
}
if (chand->resolver != NULL) {
@@ -687,8 +711,15 @@ retry:
if (calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING &&
calld->connected_subchannel == NULL &&
op->send_initial_metadata != NULL) {
+ for (grpc_linked_mdelem *mdelem = op->send_initial_metadata->list.head;
+ mdelem != NULL; mdelem = mdelem->next) {
+ if (mdelem->md->key == GRPC_MDSTR_PATH) {
+ calld->path = GRPC_MDSTR_REF(mdelem->md->value);
+ break;
+ }
+ }
calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL;
- grpc_closure_init(&calld->next_step, subchannel_ready, calld);
+ grpc_closure_init(&calld->next_step, subchannel_ready, elem);
GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel");
if (pick_subchannel(exec_ctx, elem, op->send_initial_metadata,
op->send_initial_metadata_flags,
@@ -728,6 +759,8 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx,
gpr_atm_rel_store(&calld->subchannel_call, 0);
gpr_mu_init(&calld->mu);
calld->connected_subchannel = NULL;
+ calld->path = NULL;
+ calld->method_config = NULL;
calld->waiting_ops = NULL;
calld->waiting_ops_count = 0;
calld->waiting_ops_capacity = 0;
@@ -743,6 +776,7 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
const grpc_call_final_info *final_info,
void *and_free_memory) {
call_data *calld = elem->call_data;
+ if (calld->path != NULL) GRPC_MDSTR_UNREF(calld->path);
grpc_subchannel_call *call = GET_CALL(calld);
if (call != NULL && call != CANCELLED_CALL) {
GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "client_channel_destroy_call");
@@ -794,7 +828,7 @@ void grpc_client_channel_set_resolver_and_client_channel_factory(
chand->exit_idle_when_lb_policy_arrives) {
chand->started_resolving = true;
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
- grpc_resolver_next(exec_ctx, resolver, &chand->resolver_result,
+ grpc_resolver_next(exec_ctx, resolver, &chand->incoming_resolver_result,
&chand->on_resolver_result_changed);
}
chand->client_channel_factory = client_channel_factory;
@@ -816,7 +850,8 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state(
if (!chand->started_resolving && chand->resolver != NULL) {
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
chand->started_resolving = true;
- grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result,
+ grpc_resolver_next(exec_ctx, chand->resolver,
+ &chand->incoming_resolver_result,
&chand->on_resolver_result_changed);
}
}
diff --git a/src/core/ext/client_config/resolver_result.c b/src/core/ext/client_config/resolver_result.c
index 59c9e7dc25..c730551fe5 100644
--- a/src/core/ext/client_config/resolver_result.c
+++ b/src/core/ext/client_config/resolver_result.c
@@ -36,13 +36,154 @@
#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>
+#include "src/core/lib/transport/metadata.h"
#include "src/core/lib/channel/channel_args.h"
+//
+// grpc_method_config
+//
+
+struct grpc_method_config {
+ gpr_refcount refs;
+ bool* wait_for_ready;
+ gpr_timespec* timeout;
+ int32_t* max_request_message_bytes;
+ int32_t* max_response_message_bytes;
+};
+
+grpc_method_config* grpc_method_config_create(
+ bool* wait_for_ready, gpr_timespec* timeout,
+ int32_t* max_request_message_bytes, int32_t* max_response_message_bytes) {
+ grpc_method_config* config = gpr_malloc(sizeof(*config));
+ memset(config, 0, sizeof(*config));
+ gpr_ref_init(&config->refs, 1);
+ if (wait_for_ready != NULL) {
+ config->wait_for_ready = gpr_malloc(sizeof(*wait_for_ready));
+ *config->wait_for_ready = *wait_for_ready;
+ }
+ if (timeout != NULL) {
+ config->timeout = gpr_malloc(sizeof(*timeout));
+ *config->timeout = *timeout;
+ }
+ if (max_request_message_bytes != NULL) {
+ config->max_request_message_bytes =
+ gpr_malloc(sizeof(*max_request_message_bytes));
+ *config->max_request_message_bytes = *max_request_message_bytes;
+ }
+ if (max_response_message_bytes != NULL) {
+ config->max_response_message_bytes =
+ gpr_malloc(sizeof(*max_response_message_bytes));
+ *config->max_response_message_bytes = *max_response_message_bytes;
+ }
+ return config;
+}
+
+grpc_method_config* grpc_method_config_ref(grpc_method_config* method_config) {
+ gpr_ref(&method_config->refs);
+ return method_config;
+}
+
+void grpc_method_config_unref(grpc_method_config* method_config) {
+ if (gpr_unref(&method_config->refs)) {
+ gpr_free(method_config->wait_for_ready);
+ gpr_free(method_config->timeout);
+ gpr_free(method_config->max_request_message_bytes);
+ gpr_free(method_config->max_response_message_bytes);
+ gpr_free(method_config);
+ }
+}
+
+bool* grpc_method_config_get_wait_for_ready(grpc_method_config* method_config) {
+ return method_config->wait_for_ready;
+}
+
+gpr_timespec* grpc_method_config_get_timeout(
+ grpc_method_config* method_config) {
+ return method_config->timeout;
+}
+
+int32_t* grpc_method_config_get_max_request_message_bytes(
+ grpc_method_config* method_config) {
+ return method_config->max_request_message_bytes;
+}
+
+int32_t* grpc_method_config_get_max_response_message_bytes(
+ grpc_method_config* method_config) {
+ return method_config->max_response_message_bytes;
+}
+
+//
+// method_config_table
+//
+
+typedef struct method_config_table_entry {
+ grpc_mdstr* path;
+ grpc_method_config* method_config;
+} method_config_table_entry;
+
+#define METHOD_CONFIG_TABLE_SIZE 128
+typedef struct method_config_table {
+ method_config_table_entry entries[METHOD_CONFIG_TABLE_SIZE];
+} method_config_table;
+
+static void method_config_table_init(method_config_table* table) {
+ memset(table, 0, sizeof(*table));
+}
+
+static void method_config_table_destroy(method_config_table* table) {
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(table->entries); ++i) {
+ method_config_table_entry* entry = &table->entries[i];
+ if (entry->path != NULL) {
+ GRPC_MDSTR_UNREF(entry->path);
+ grpc_method_config_unref(entry->method_config);
+ }
+ }
+}
+
+// Helper function for insert and get operations that performs quadratic
+// probing (https://en.wikipedia.org/wiki/Quadratic_probing).
+static size_t method_config_table_find_index(method_config_table* table,
+ grpc_mdstr* path,
+ bool find_empty) {
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(table->entries); ++i) {
+ const size_t idx = (path->hash + i * i) % GPR_ARRAY_SIZE(table->entries);
+ if (table->entries[idx].path == NULL)
+ return find_empty ? idx : GPR_ARRAY_SIZE(table->entries);
+ if (table->entries[idx].path == path) return idx;
+ }
+ return GPR_ARRAY_SIZE(table->entries) + 1; // Not found.
+}
+
+static void method_config_table_insert(method_config_table* table,
+ grpc_mdstr* path,
+ grpc_method_config* config) {
+ const size_t idx =
+ method_config_table_find_index(table, path, true /* find_empty */);
+ // This can happen if the table is full.
+ GPR_ASSERT(idx != GPR_ARRAY_SIZE(table->entries));
+ method_config_table_entry* entry = &table->entries[idx];
+ entry->path = GRPC_MDSTR_REF(path);
+ entry->method_config = grpc_method_config_ref(config);
+}
+
+static grpc_method_config* method_config_table_get(method_config_table* table,
+ grpc_mdstr* path) {
+ const size_t idx =
+ method_config_table_find_index(table, path, false /* find_empty */);
+ if (idx == GPR_ARRAY_SIZE(table->entries)) return NULL; // Not found.
+ return table->entries[idx].method_config;
+}
+
+//
+// grpc_resolver_result
+//
+
struct grpc_resolver_result {
gpr_refcount refs;
grpc_lb_addresses* addresses;
char* lb_policy_name;
grpc_channel_args* lb_policy_args;
+ method_config_table method_configs;
};
grpc_resolver_result* grpc_resolver_result_create(
@@ -54,6 +195,7 @@ grpc_resolver_result* grpc_resolver_result_create(
result->addresses = addresses;
result->lb_policy_name = gpr_strdup(lb_policy_name);
result->lb_policy_args = lb_policy_args;
+ method_config_table_init(&result->method_configs);
return result;
}
@@ -67,6 +209,7 @@ void grpc_resolver_result_unref(grpc_exec_ctx* exec_ctx,
grpc_lb_addresses_destroy(result->addresses, NULL /* user_data_destroy */);
gpr_free(result->lb_policy_name);
grpc_channel_args_destroy(result->lb_policy_args);
+ method_config_table_destroy(&result->method_configs);
gpr_free(result);
}
}
@@ -85,3 +228,18 @@ grpc_channel_args* grpc_resolver_result_get_lb_policy_args(
grpc_resolver_result* result) {
return result->lb_policy_args;
}
+
+void grpc_resolver_result_add_method_config(grpc_resolver_result* result,
+ grpc_mdstr** paths,
+ size_t num_paths,
+ grpc_method_config* method_config) {
+ for (size_t i = 0; i < num_paths; ++i) {
+ method_config_table_insert(&result->method_configs, paths[i],
+ method_config);
+ }
+}
+
+grpc_method_config* grpc_resolver_result_get_method_config(
+ grpc_resolver_result* result, grpc_mdstr* path) {
+ return method_config_table_get(&result->method_configs, path);
+}
diff --git a/src/core/ext/client_config/resolver_result.h b/src/core/ext/client_config/resolver_result.h
index d4118b90e8..d911bfbe52 100644
--- a/src/core/ext/client_config/resolver_result.h
+++ b/src/core/ext/client_config/resolver_result.h
@@ -45,6 +45,27 @@
// grpc_channel_args such to a hash table or AVL or some other data
// structure that does not require linear search to find keys.
+/// Per-method configuration.
+
+typedef struct grpc_method_config grpc_method_config;
+
+/// Any parameter may be NULL to indicate that the value is unset.
+grpc_method_config* grpc_method_config_create(
+ bool* wait_for_ready, gpr_timespec* timeout,
+ int32_t* max_request_message_bytes, int32_t* max_response_message_bytes);
+
+grpc_method_config* grpc_method_config_ref(grpc_method_config* method_config);
+void grpc_method_config_unref(grpc_method_config* method_config);
+
+/// These methods return NULL if the requested field is unset.
+/// The caller does NOT take ownership of the result.
+bool* grpc_method_config_get_wait_for_ready(grpc_method_config* method_config);
+gpr_timespec* grpc_method_config_get_timeout(grpc_method_config* method_config);
+int32_t* grpc_method_config_get_max_request_message_bytes(
+ grpc_method_config* method_config);
+int32_t* grpc_method_config_get_max_response_message_bytes(
+ grpc_method_config* method_config);
+
/// Results reported from a grpc_resolver.
typedef struct grpc_resolver_result grpc_resolver_result;
@@ -68,4 +89,21 @@ const char* grpc_resolver_result_get_lb_policy_name(
grpc_channel_args* grpc_resolver_result_get_lb_policy_args(
grpc_resolver_result* result);
+/// Adds a method config. \a paths indicates the set of path names
+/// for which this config applies. Each name is of one of the following
+/// forms:
+/// service/method -- specifies exact service and method name
+/// service/* -- matches all methods for the specified service
+/// * -- matches all methods for all services
+/// Takes new references to all elements of \a paths and to \a method_config.
+void grpc_resolver_result_add_method_config(grpc_resolver_result* result,
+ grpc_mdstr** paths,
+ size_t num_paths,
+ grpc_method_config* method_config);
+
+/// Returns NULL if the method has no config.
+/// Caller does NOT take ownership of result.
+grpc_method_config* grpc_resolver_result_get_method_config(
+ grpc_resolver_result* result, grpc_mdstr* path);
+
#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_RESOLVER_RESULT_H */