aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Vijay Pai <vpai@google.com>2016-10-20 09:41:33 -0700
committerGravatar Vijay Pai <vpai@google.com>2016-10-20 09:41:33 -0700
commit5ee07978bd13b49f0d04792b27f7d147119156de (patch)
tree460270d4001bc7f802751c0f33339c1534447601 /src/core
parent9fa9315d62c3163a93eeae2b3d3bb70231567b83 (diff)
parent68469290d7b6ed9ac8a976a2fe9c79c9701b0f13 (diff)
Merge branch 'master' into fc_1dstream
Diffstat (limited to 'src/core')
-rw-r--r--src/core/ext/census/grpc_filter.c4
-rw-r--r--src/core/ext/client_config/client_channel.c193
-rw-r--r--src/core/ext/client_config/method_config.c296
-rw-r--r--src/core/ext/client_config/method_config.h116
-rw-r--r--src/core/ext/client_config/resolver_result.h9
-rw-r--r--src/core/ext/client_config/subchannel.c4
-rw-r--r--src/core/ext/client_config/subchannel.h3
-rw-r--r--src/core/ext/lb_policy/grpclb/grpclb.c5
-rw-r--r--src/core/ext/lb_policy/pick_first/pick_first.c1
-rw-r--r--src/core/ext/lb_policy/round_robin/round_robin.c1
-rw-r--r--src/core/ext/resolver/sockaddr/sockaddr_resolver.c6
-rw-r--r--src/core/lib/channel/channel_args.c12
-rw-r--r--src/core/lib/channel/channel_args.h4
-rw-r--r--src/core/lib/channel/channel_stack.c4
-rw-r--r--src/core/lib/channel/channel_stack.h4
-rw-r--r--src/core/lib/channel/deadline_filter.c76
-rw-r--r--src/core/lib/channel/deadline_filter.h33
-rw-r--r--src/core/lib/channel/handshaker.c2
-rw-r--r--src/core/lib/channel/message_size_filter.c60
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.c9
-rw-r--r--src/core/lib/iomgr/timer.h17
-rw-r--r--src/core/lib/surface/call.c13
-rw-r--r--src/core/lib/transport/mdstr_hash_table.c142
-rw-r--r--src/core/lib/transport/mdstr_hash_table.h83
24 files changed, 1010 insertions, 87 deletions
diff --git a/src/core/ext/census/grpc_filter.c b/src/core/ext/census/grpc_filter.c
index 9dacc17eb4..a4cf6f37bd 100644
--- a/src/core/ext/census/grpc_filter.c
+++ b/src/core/ext/census/grpc_filter.c
@@ -133,7 +133,7 @@ static grpc_error *client_init_call_elem(grpc_exec_ctx *exec_ctx,
call_data *d = elem->call_data;
GPR_ASSERT(d != NULL);
memset(d, 0, sizeof(*d));
- d->start_ts = gpr_now(GPR_CLOCK_REALTIME);
+ d->start_ts = args->start_time;
return GRPC_ERROR_NONE;
}
@@ -152,7 +152,7 @@ static grpc_error *server_init_call_elem(grpc_exec_ctx *exec_ctx,
call_data *d = elem->call_data;
GPR_ASSERT(d != NULL);
memset(d, 0, sizeof(*d));
- d->start_ts = gpr_now(GPR_CLOCK_REALTIME);
+ d->start_ts = args->start_time;
/* TODO(hongyu): call census_tracing_start_op here. */
grpc_closure_init(&d->finish_recv, server_on_done_recv, elem);
return GRPC_ERROR_NONE;
diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c
index cbf79afa17..beaa9637c3 100644
--- a/src/core/ext/client_config/client_channel.c
+++ b/src/core/ext/client_config/client_channel.c
@@ -43,6 +43,7 @@
#include <grpc/support/useful.h>
#include "src/core/ext/client_config/lb_policy_registry.h"
+#include "src/core/ext/client_config/method_config.h"
#include "src/core/ext/client_config/subchannel.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
@@ -53,6 +54,9 @@
#include "src/core/lib/support/string.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/connectivity_state.h"
+#include "src/core/lib/transport/metadata.h"
+#include "src/core/lib/transport/metadata_batch.h"
+#include "src/core/lib/transport/static_metadata.h"
/* Client channel implementation */
@@ -68,12 +72,13 @@ typedef struct client_channel_channel_data {
/** client channel factory */
grpc_client_channel_factory *client_channel_factory;
- /** mutex protecting client configuration, including all
- variables below in this data structure */
+ /** mutex protecting all variables below in this data structure */
gpr_mu mu;
- /** currently active load balancer - guarded by mu */
+ /** currently active load balancer */
grpc_lb_policy *lb_policy;
- /** incoming resolver result - set by resolver.next(), guarded by mu */
+ /** method config table */
+ grpc_method_config_table *method_config_table;
+ /** incoming resolver result - set by resolver.next() */
grpc_resolver_result *resolver_result;
/** a list of closures that are all waiting for config to come in */
grpc_closure_list waiting_for_config_closures;
@@ -172,6 +177,7 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg,
channel_data *chand = arg;
grpc_lb_policy *lb_policy = NULL;
grpc_lb_policy *old_lb_policy;
+ grpc_method_config_table *method_config_table = NULL;
grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
bool exit_idle = false;
grpc_error *state_error = GRPC_ERROR_CREATE("No load balancing policy");
@@ -220,6 +226,13 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg,
state =
grpc_lb_policy_check_connectivity(exec_ctx, lb_policy, &state_error);
}
+ const grpc_arg *channel_arg = grpc_channel_args_find(
+ lb_policy_args.additional_args, GRPC_ARG_SERVICE_CONFIG);
+ if (channel_arg != NULL) {
+ GPR_ASSERT(channel_arg->type == GRPC_ARG_POINTER);
+ method_config_table = grpc_method_config_table_ref(
+ (grpc_method_config_table *)channel_arg->value.pointer.p);
+ }
grpc_resolver_result_unref(exec_ctx, chand->resolver_result);
chand->resolver_result = NULL;
}
@@ -232,6 +245,10 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg,
gpr_mu_lock(&chand->mu);
old_lb_policy = chand->lb_policy;
chand->lb_policy = lb_policy;
+ if (chand->method_config_table != NULL) {
+ grpc_method_config_table_unref(chand->method_config_table);
+ }
+ chand->method_config_table = method_config_table;
if (lb_policy != NULL) {
grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures,
NULL);
@@ -392,6 +409,9 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
chand->interested_parties);
GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
}
+ if (chand->method_config_table != NULL) {
+ grpc_method_config_table_unref(chand->method_config_table);
+ }
grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker);
grpc_pollset_set_destroy(chand->interested_parties);
gpr_mu_destroy(&chand->mu);
@@ -424,7 +444,16 @@ typedef struct client_channel_call_data {
// stack and each has its own mutex. If/when we have time, find a way
// to avoid this without breaking the grpc_deadline_state abstraction.
grpc_deadline_state deadline_state;
+
+ grpc_mdstr *path; // Request path.
+ gpr_timespec call_start_time;
gpr_timespec deadline;
+ enum {
+ WAIT_FOR_READY_UNSET,
+ WAIT_FOR_READY_FALSE,
+ WAIT_FOR_READY_TRUE
+ } wait_for_ready_from_service_config;
+ grpc_closure read_service_config;
grpc_error *cancel_error;
@@ -532,10 +561,11 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_ERROR_CREATE_REFERENCING(
"Cancelled before creating subchannel", &error, 1));
} else {
+ /* Create call on subchannel. */
grpc_subchannel_call *subchannel_call = NULL;
grpc_error *new_error = grpc_connected_subchannel_create_call(
- exec_ctx, calld->connected_subchannel, calld->pollent, calld->deadline,
- &subchannel_call);
+ exec_ctx, calld->connected_subchannel, calld->pollent, calld->path,
+ calld->deadline, &subchannel_call);
if (new_error != GRPC_ERROR_NONE) {
new_error = grpc_error_add_child(new_error, error);
subchannel_call = CANCELLED_CALL;
@@ -584,11 +614,15 @@ static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg,
/* cancelled, do nothing */
} else if (error != GRPC_ERROR_NONE) {
grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error), NULL);
- } else if (pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata,
- cpa->initial_metadata_flags,
- cpa->connected_subchannel, cpa->on_ready,
- GRPC_ERROR_NONE)) {
- grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE, NULL);
+ } else {
+ call_data *calld = cpa->elem->call_data;
+ gpr_mu_lock(&calld->mu);
+ if (pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata,
+ cpa->initial_metadata_flags, cpa->connected_subchannel,
+ cpa->on_ready, GRPC_ERROR_NONE)) {
+ grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE, NULL);
+ }
+ gpr_mu_unlock(&calld->mu);
}
gpr_free(cpa);
}
@@ -631,18 +665,33 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
GPR_ASSERT(error == GRPC_ERROR_NONE);
if (chand->lb_policy != NULL) {
grpc_lb_policy *lb_policy = chand->lb_policy;
- int r;
GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel");
gpr_mu_unlock(&chand->mu);
+ // If the application explicitly set wait_for_ready, use that.
+ // Otherwise, if the service config specified a value for this
+ // method, use that.
+ const bool wait_for_ready_set_from_api =
+ initial_metadata_flags &
+ GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
+ const bool wait_for_ready_set_from_service_config =
+ calld->wait_for_ready_from_service_config != WAIT_FOR_READY_UNSET;
+ if (!wait_for_ready_set_from_api &&
+ wait_for_ready_set_from_service_config) {
+ if (calld->wait_for_ready_from_service_config == WAIT_FOR_READY_TRUE) {
+ initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY;
+ } else {
+ initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
+ }
+ }
// TODO(dgq): make this deadline configurable somehow.
const grpc_lb_policy_pick_args inputs = {
initial_metadata, initial_metadata_flags, &calld->lb_token_mdelem,
gpr_inf_future(GPR_CLOCK_MONOTONIC)};
- r = grpc_lb_policy_pick(exec_ctx, lb_policy, &inputs, connected_subchannel,
- NULL, on_ready);
+ const bool result = grpc_lb_policy_pick(
+ exec_ctx, lb_policy, &inputs, connected_subchannel, NULL, on_ready);
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick_subchannel");
GPR_TIMER_END("pick_subchannel", 0);
- return r;
+ return result;
}
if (chand->resolver != NULL && !chand->started_resolving) {
chand->started_resolving = true;
@@ -768,8 +817,8 @@ retry:
calld->connected_subchannel != NULL) {
grpc_subchannel_call *subchannel_call = NULL;
grpc_error *error = grpc_connected_subchannel_create_call(
- exec_ctx, calld->connected_subchannel, calld->pollent, calld->deadline,
- &subchannel_call);
+ exec_ctx, calld->connected_subchannel, calld->pollent, calld->path,
+ calld->deadline, &subchannel_call);
if (error != GRPC_ERROR_NONE) {
subchannel_call = CANCELLED_CALL;
fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error));
@@ -786,13 +835,69 @@ retry:
GPR_TIMER_END("cc_start_transport_stream_op", 0);
}
+// Gets data from the service config. Invoked when the resolver returns
+// its initial result.
+static void read_service_config(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ grpc_call_element *elem = arg;
+ channel_data *chand = elem->channel_data;
+ call_data *calld = elem->call_data;
+ // If this is an error, there's no point in looking at the service config.
+ if (error == GRPC_ERROR_NONE) {
+ // Get the method config table from channel data.
+ gpr_mu_lock(&chand->mu);
+ grpc_method_config_table *method_config_table = NULL;
+ if (chand->method_config_table != NULL) {
+ method_config_table =
+ grpc_method_config_table_ref(chand->method_config_table);
+ }
+ gpr_mu_unlock(&chand->mu);
+ // If the method config table was present, use it.
+ if (method_config_table != NULL) {
+ const grpc_method_config *method_config =
+ grpc_method_config_table_get_method_config(method_config_table,
+ calld->path);
+ if (method_config != NULL) {
+ const gpr_timespec *per_method_timeout =
+ grpc_method_config_get_timeout(method_config);
+ const bool *wait_for_ready =
+ grpc_method_config_get_wait_for_ready(method_config);
+ if (per_method_timeout != NULL || wait_for_ready != NULL) {
+ gpr_mu_lock(&calld->mu);
+ if (per_method_timeout != NULL) {
+ gpr_timespec per_method_deadline =
+ gpr_time_add(calld->call_start_time, *per_method_timeout);
+ if (gpr_time_cmp(per_method_deadline, calld->deadline) < 0) {
+ calld->deadline = per_method_deadline;
+ // Reset deadline timer.
+ grpc_deadline_state_reset(exec_ctx, elem, calld->deadline);
+ }
+ }
+ if (wait_for_ready != NULL) {
+ calld->wait_for_ready_from_service_config =
+ *wait_for_ready ? WAIT_FOR_READY_TRUE : WAIT_FOR_READY_FALSE;
+ }
+ gpr_mu_unlock(&calld->mu);
+ }
+ }
+ grpc_method_config_table_unref(method_config_table);
+ }
+ }
+ GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "read_service_config");
+}
+
/* Constructor for call_data */
static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_call_element_args *args) {
+ channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
- grpc_deadline_state_init(exec_ctx, elem, args);
- calld->deadline = args->deadline;
+ // Initialize data members.
+ grpc_deadline_state_init(exec_ctx, elem, args->call_stack);
+ calld->path = GRPC_MDSTR_REF(args->path);
+ calld->call_start_time = args->start_time;
+ calld->deadline = gpr_convert_clock_type(args->deadline, GPR_CLOCK_MONOTONIC);
+ calld->wait_for_ready_from_service_config = WAIT_FOR_READY_UNSET;
calld->cancel_error = GRPC_ERROR_NONE;
gpr_atm_rel_store(&calld->subchannel_call, 0);
gpr_mu_init(&calld->mu);
@@ -803,6 +908,55 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx,
calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
calld->owning_call = args->call_stack;
calld->pollent = NULL;
+ // If the resolver has already returned results, then we can access
+ // the service config parameters immediately. Otherwise, we need to
+ // defer that work until the resolver returns an initial result.
+ // TODO(roth): This code is almost but not quite identical to the code
+ // in read_service_config() above. It would be nice to find a way to
+ // combine them, to avoid having to maintain it twice.
+ gpr_mu_lock(&chand->mu);
+ if (chand->lb_policy != NULL) {
+ // We already have a resolver result, so check for service config.
+ if (chand->method_config_table != NULL) {
+ grpc_method_config_table *method_config_table =
+ grpc_method_config_table_ref(chand->method_config_table);
+ gpr_mu_unlock(&chand->mu);
+ grpc_method_config *method_config =
+ grpc_method_config_table_get_method_config(method_config_table,
+ args->path);
+ if (method_config != NULL) {
+ const gpr_timespec *per_method_timeout =
+ grpc_method_config_get_timeout(method_config);
+ if (per_method_timeout != NULL) {
+ gpr_timespec per_method_deadline =
+ gpr_time_add(calld->call_start_time, *per_method_timeout);
+ calld->deadline = gpr_time_min(calld->deadline, per_method_deadline);
+ }
+ const bool *wait_for_ready =
+ grpc_method_config_get_wait_for_ready(method_config);
+ if (wait_for_ready != NULL) {
+ calld->wait_for_ready_from_service_config =
+ *wait_for_ready ? WAIT_FOR_READY_TRUE : WAIT_FOR_READY_FALSE;
+ }
+ }
+ grpc_method_config_table_unref(method_config_table);
+ } else {
+ gpr_mu_unlock(&chand->mu);
+ }
+ } else {
+ // We don't yet have a resolver result, so register a callback to
+ // get the service config data once the resolver returns.
+ // Take a reference to the call stack to be owned by the callback.
+ GRPC_CALL_STACK_REF(calld->owning_call, "read_service_config");
+ grpc_closure_init(&calld->read_service_config, read_service_config, elem);
+ grpc_closure_list_append(&chand->waiting_for_config_closures,
+ &calld->read_service_config, GRPC_ERROR_NONE);
+ gpr_mu_unlock(&chand->mu);
+ }
+ // Start the deadline timer with the current deadline value. If we
+ // do not yet have service config data, then the timer may be reset
+ // later.
+ grpc_deadline_state_start(exec_ctx, elem, calld->deadline);
return GRPC_ERROR_NONE;
}
@@ -813,6 +967,7 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
void *and_free_memory) {
call_data *calld = elem->call_data;
grpc_deadline_state_destroy(exec_ctx, elem);
+ GRPC_MDSTR_UNREF(calld->path);
GRPC_ERROR_UNREF(calld->cancel_error);
grpc_subchannel_call *call = GET_CALL(calld);
if (call != NULL && call != CANCELLED_CALL) {
diff --git a/src/core/ext/client_config/method_config.c b/src/core/ext/client_config/method_config.c
new file mode 100644
index 0000000000..f8a82323e7
--- /dev/null
+++ b/src/core/ext/client_config/method_config.c
@@ -0,0 +1,296 @@
+//
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+
+#include "src/core/ext/client_config/method_config.h"
+
+#include <string.h>
+
+#include <grpc/impl/codegen/grpc_types.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/time.h>
+
+#include "src/core/lib/transport/mdstr_hash_table.h"
+#include "src/core/lib/transport/metadata.h"
+
+//
+// grpc_method_config
+//
+
+// bool vtable
+
+static void* bool_copy(void* valuep) {
+ bool value = *(bool*)valuep;
+ bool* new_value = gpr_malloc(sizeof(bool));
+ *new_value = value;
+ return new_value;
+}
+
+static int bool_cmp(void* v1, void* v2) {
+ bool b1 = *(bool*)v1;
+ bool b2 = *(bool*)v2;
+ if (!b1 && b2) return -1;
+ if (b1 && !b2) return 1;
+ return 0;
+}
+
+static grpc_mdstr_hash_table_vtable bool_vtable = {gpr_free, bool_copy,
+ bool_cmp};
+
+// timespec vtable
+
+static void* timespec_copy(void* valuep) {
+ gpr_timespec value = *(gpr_timespec*)valuep;
+ gpr_timespec* new_value = gpr_malloc(sizeof(gpr_timespec));
+ *new_value = value;
+ return new_value;
+}
+
+static int timespec_cmp(void* v1, void* v2) {
+ return gpr_time_cmp(*(gpr_timespec*)v1, *(gpr_timespec*)v2);
+}
+
+static grpc_mdstr_hash_table_vtable timespec_vtable = {gpr_free, timespec_copy,
+ timespec_cmp};
+
+// int32 vtable
+
+static void* int32_copy(void* valuep) {
+ int32_t value = *(int32_t*)valuep;
+ int32_t* new_value = gpr_malloc(sizeof(int32_t));
+ *new_value = value;
+ return new_value;
+}
+
+static int int32_cmp(void* v1, void* v2) {
+ int32_t i1 = *(int32_t*)v1;
+ int32_t i2 = *(int32_t*)v2;
+ if (i1 < i2) return -1;
+ if (i1 > i2) return 1;
+ return 0;
+}
+
+static grpc_mdstr_hash_table_vtable int32_vtable = {gpr_free, int32_copy,
+ int32_cmp};
+
+// Hash table keys.
+#define GRPC_METHOD_CONFIG_WAIT_FOR_READY "grpc.wait_for_ready" // bool
+#define GRPC_METHOD_CONFIG_TIMEOUT "grpc.timeout" // gpr_timespec
+#define GRPC_METHOD_CONFIG_MAX_REQUEST_MESSAGE_BYTES \
+ "grpc.max_request_message_bytes" // int32
+#define GRPC_METHOD_CONFIG_MAX_RESPONSE_MESSAGE_BYTES \
+ "grpc.max_response_message_bytes" // int32
+
+struct grpc_method_config {
+ grpc_mdstr_hash_table* table;
+ grpc_mdstr* wait_for_ready_key;
+ grpc_mdstr* timeout_key;
+ grpc_mdstr* max_request_message_bytes_key;
+ grpc_mdstr* max_response_message_bytes_key;
+};
+
+grpc_method_config* grpc_method_config_create(
+ bool* wait_for_ready, gpr_timespec* timeout,
+ int32_t* max_request_message_bytes, int32_t* max_response_message_bytes) {
+ grpc_method_config* method_config = gpr_malloc(sizeof(grpc_method_config));
+ memset(method_config, 0, sizeof(grpc_method_config));
+ method_config->wait_for_ready_key =
+ grpc_mdstr_from_string(GRPC_METHOD_CONFIG_WAIT_FOR_READY);
+ method_config->timeout_key =
+ grpc_mdstr_from_string(GRPC_METHOD_CONFIG_TIMEOUT);
+ method_config->max_request_message_bytes_key =
+ grpc_mdstr_from_string(GRPC_METHOD_CONFIG_MAX_REQUEST_MESSAGE_BYTES);
+ method_config->max_response_message_bytes_key =
+ grpc_mdstr_from_string(GRPC_METHOD_CONFIG_MAX_RESPONSE_MESSAGE_BYTES);
+ grpc_mdstr_hash_table_entry entries[4];
+ size_t num_entries = 0;
+ if (wait_for_ready != NULL) {
+ entries[num_entries].key = method_config->wait_for_ready_key;
+ entries[num_entries].value = wait_for_ready;
+ entries[num_entries].vtable = &bool_vtable;
+ ++num_entries;
+ }
+ if (timeout != NULL) {
+ entries[num_entries].key = method_config->timeout_key;
+ entries[num_entries].value = timeout;
+ entries[num_entries].vtable = &timespec_vtable;
+ ++num_entries;
+ }
+ if (max_request_message_bytes != NULL) {
+ entries[num_entries].key = method_config->max_request_message_bytes_key;
+ entries[num_entries].value = max_request_message_bytes;
+ entries[num_entries].vtable = &int32_vtable;
+ ++num_entries;
+ }
+ if (max_response_message_bytes != NULL) {
+ entries[num_entries].key = method_config->max_response_message_bytes_key;
+ entries[num_entries].value = max_response_message_bytes;
+ entries[num_entries].vtable = &int32_vtable;
+ ++num_entries;
+ }
+ method_config->table = grpc_mdstr_hash_table_create(num_entries, entries);
+ return method_config;
+}
+
+grpc_method_config* grpc_method_config_ref(grpc_method_config* method_config) {
+ grpc_mdstr_hash_table_ref(method_config->table);
+ return method_config;
+}
+
+void grpc_method_config_unref(grpc_method_config* method_config) {
+ if (grpc_mdstr_hash_table_unref(method_config->table)) {
+ GRPC_MDSTR_UNREF(method_config->wait_for_ready_key);
+ GRPC_MDSTR_UNREF(method_config->timeout_key);
+ GRPC_MDSTR_UNREF(method_config->max_request_message_bytes_key);
+ GRPC_MDSTR_UNREF(method_config->max_response_message_bytes_key);
+ gpr_free(method_config);
+ }
+}
+
+int grpc_method_config_cmp(const grpc_method_config* method_config1,
+ const grpc_method_config* method_config2) {
+ return grpc_mdstr_hash_table_cmp(method_config1->table,
+ method_config2->table);
+}
+
+const bool* grpc_method_config_get_wait_for_ready(
+ const grpc_method_config* method_config) {
+ return grpc_mdstr_hash_table_get(method_config->table,
+ method_config->wait_for_ready_key);
+}
+
+const gpr_timespec* grpc_method_config_get_timeout(
+ const grpc_method_config* method_config) {
+ return grpc_mdstr_hash_table_get(method_config->table,
+ method_config->timeout_key);
+}
+
+const int32_t* grpc_method_config_get_max_request_message_bytes(
+ const grpc_method_config* method_config) {
+ return grpc_mdstr_hash_table_get(
+ method_config->table, method_config->max_request_message_bytes_key);
+}
+
+const int32_t* grpc_method_config_get_max_response_message_bytes(
+ const grpc_method_config* method_config) {
+ return grpc_mdstr_hash_table_get(
+ method_config->table, method_config->max_response_message_bytes_key);
+}
+
+//
+// grpc_method_config_table
+//
+
+static void method_config_unref(void* valuep) {
+ grpc_method_config_unref(valuep);
+}
+
+static void* method_config_ref(void* valuep) {
+ return grpc_method_config_ref(valuep);
+}
+
+static int method_config_cmp(void* valuep1, void* valuep2) {
+ return grpc_method_config_cmp(valuep1, valuep2);
+}
+
+static const grpc_mdstr_hash_table_vtable method_config_table_vtable = {
+ method_config_unref, method_config_ref, method_config_cmp};
+
+grpc_method_config_table* grpc_method_config_table_create(
+ size_t num_entries, grpc_method_config_table_entry* entries) {
+ grpc_mdstr_hash_table_entry* hash_table_entries =
+ gpr_malloc(sizeof(grpc_mdstr_hash_table_entry) * num_entries);
+ for (size_t i = 0; i < num_entries; ++i) {
+ hash_table_entries[i].key = entries[i].method_name;
+ hash_table_entries[i].value = entries[i].method_config;
+ hash_table_entries[i].vtable = &method_config_table_vtable;
+ }
+ grpc_method_config_table* method_config_table =
+ grpc_mdstr_hash_table_create(num_entries, hash_table_entries);
+ gpr_free(hash_table_entries);
+ return method_config_table;
+}
+
+grpc_method_config_table* grpc_method_config_table_ref(
+ grpc_method_config_table* table) {
+ return grpc_mdstr_hash_table_ref(table);
+}
+
+void grpc_method_config_table_unref(grpc_method_config_table* table) {
+ grpc_mdstr_hash_table_unref(table);
+}
+
+int grpc_method_config_table_cmp(const grpc_method_config_table* table1,
+ const grpc_method_config_table* table2) {
+ return grpc_mdstr_hash_table_cmp(table1, table2);
+}
+
+grpc_method_config* grpc_method_config_table_get_method_config(
+ const grpc_method_config_table* table, const grpc_mdstr* path) {
+ grpc_method_config* method_config = grpc_mdstr_hash_table_get(table, path);
+ // If we didn't find a match for the path, try looking for a wildcard
+ // entry (i.e., change "/service/method" to "/service/*").
+ if (method_config == NULL) {
+ const char* path_str = grpc_mdstr_as_c_string(path);
+ const char* sep = strrchr(path_str, '/') + 1;
+ const size_t len = (size_t)(sep - path_str);
+ char* buf = gpr_malloc(len + 2); // '*' and NUL
+ memcpy(buf, path_str, len);
+ buf[len] = '*';
+ buf[len + 1] = '\0';
+ grpc_mdstr* wildcard_path = grpc_mdstr_from_string(buf);
+ gpr_free(buf);
+ method_config = grpc_mdstr_hash_table_get(table, wildcard_path);
+ GRPC_MDSTR_UNREF(wildcard_path);
+ }
+ return method_config;
+}
+
+static void* copy_arg(void* p) { return grpc_method_config_table_ref(p); }
+
+static void destroy_arg(void* p) { grpc_method_config_table_unref(p); }
+
+static int cmp_arg(void* p1, void* p2) {
+ return grpc_method_config_table_cmp(p1, p2);
+}
+
+static grpc_arg_pointer_vtable arg_vtable = {copy_arg, destroy_arg, cmp_arg};
+
+grpc_arg grpc_method_config_table_create_channel_arg(
+ grpc_method_config_table* table) {
+ grpc_arg arg;
+ arg.type = GRPC_ARG_POINTER;
+ arg.key = GRPC_ARG_SERVICE_CONFIG;
+ arg.value.pointer.p = table;
+ arg.value.pointer.vtable = &arg_vtable;
+ return arg;
+}
diff --git a/src/core/ext/client_config/method_config.h b/src/core/ext/client_config/method_config.h
new file mode 100644
index 0000000000..1302ac425d
--- /dev/null
+++ b/src/core/ext/client_config/method_config.h
@@ -0,0 +1,116 @@
+//
+// Copyright 2016, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+
+#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_METHOD_CONFIG_H
+#define GRPC_CORE_EXT_CLIENT_CONFIG_METHOD_CONFIG_H
+
+#include <stdbool.h>
+
+#include <grpc/impl/codegen/gpr_types.h>
+#include <grpc/impl/codegen/grpc_types.h>
+
+#include "src/core/lib/transport/mdstr_hash_table.h"
+#include "src/core/lib/transport/metadata.h"
+
+/// Per-method configuration.
+typedef struct grpc_method_config grpc_method_config;
+
+/// Creates a grpc_method_config with the specified parameters.
+/// Any parameter may be NULL to indicate that the value is unset.
+///
+/// \a wait_for_ready indicates whether the client should wait until the
+/// request deadline for the channel to become ready, even if there is a
+/// temporary failure before the deadline while attempting to connect.
+///
+/// \a timeout indicates the timeout for calls.
+///
+/// \a max_request_message_bytes and \a max_response_message_bytes
+/// indicate the maximum sizes of the request (checked when sending) and
+/// response (checked when receiving) messages.
+grpc_method_config* grpc_method_config_create(
+ bool* wait_for_ready, gpr_timespec* timeout,
+ int32_t* max_request_message_bytes, int32_t* max_response_message_bytes);
+
+grpc_method_config* grpc_method_config_ref(grpc_method_config* method_config);
+void grpc_method_config_unref(grpc_method_config* method_config);
+
+/// Compares two grpc_method_configs.
+/// The sort order is stable but undefined.
+int grpc_method_config_cmp(const grpc_method_config* method_config1,
+ const grpc_method_config* method_config2);
+
+/// These methods return NULL if the requested field is unset.
+/// The caller does NOT take ownership of the result.
+const bool* grpc_method_config_get_wait_for_ready(
+ const grpc_method_config* method_config);
+const gpr_timespec* grpc_method_config_get_timeout(
+ const grpc_method_config* method_config);
+const int32_t* grpc_method_config_get_max_request_message_bytes(
+ const grpc_method_config* method_config);
+const int32_t* grpc_method_config_get_max_response_message_bytes(
+ const grpc_method_config* method_config);
+
+/// A table of method configs.
+typedef grpc_mdstr_hash_table grpc_method_config_table;
+
+typedef struct grpc_method_config_table_entry {
+ /// The name is of one of the following forms:
+ /// service/method -- specifies exact service and method name
+ /// service/* -- matches all methods for the specified service
+ grpc_mdstr* method_name;
+ grpc_method_config* method_config;
+} grpc_method_config_table_entry;
+
+/// Takes new references to all keys and values in \a entries.
+grpc_method_config_table* grpc_method_config_table_create(
+ size_t num_entries, grpc_method_config_table_entry* entries);
+
+grpc_method_config_table* grpc_method_config_table_ref(
+ grpc_method_config_table* table);
+void grpc_method_config_table_unref(grpc_method_config_table* table);
+
+/// Compares two grpc_method_config_tables.
+/// The sort order is stable but undefined.
+int grpc_method_config_table_cmp(const grpc_method_config_table* table1,
+ const grpc_method_config_table* table2);
+
+/// Gets the method config for the specified \a path, which should be of
+/// the form "/service/method".
+/// Returns NULL if the method has no config.
+/// Caller does NOT own a reference to the result.
+grpc_method_config* grpc_method_config_table_get_method_config(
+ const grpc_method_config_table* table, const grpc_mdstr* path);
+
+/// Returns a channel arg containing \a table.
+grpc_arg grpc_method_config_table_create_channel_arg(
+ grpc_method_config_table* table);
+
+#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_METHOD_CONFIG_H */
diff --git a/src/core/ext/client_config/resolver_result.h b/src/core/ext/client_config/resolver_result.h
index 414c2e2482..a7ea7c0f4b 100644
--- a/src/core/ext/client_config/resolver_result.h
+++ b/src/core/ext/client_config/resolver_result.h
@@ -52,22 +52,17 @@ typedef struct grpc_resolver_result grpc_resolver_result;
grpc_resolver_result* grpc_resolver_result_create(
const char* server_name, grpc_lb_addresses* addresses,
const char* lb_policy_name, grpc_channel_args* lb_policy_args);
+
void grpc_resolver_result_ref(grpc_resolver_result* result);
void grpc_resolver_result_unref(grpc_exec_ctx* exec_ctx,
grpc_resolver_result* result);
-/// Caller does NOT take ownership of result.
+/// Accessors. Caller does NOT take ownership of results.
const char* grpc_resolver_result_get_server_name(grpc_resolver_result* result);
-
-/// Caller does NOT take ownership of result.
grpc_lb_addresses* grpc_resolver_result_get_addresses(
grpc_resolver_result* result);
-
-/// Caller does NOT take ownership of result.
const char* grpc_resolver_result_get_lb_policy_name(
grpc_resolver_result* result);
-
-/// Caller does NOT take ownership of result.
grpc_channel_args* grpc_resolver_result_get_lb_policy_args(
grpc_resolver_result* result);
diff --git a/src/core/ext/client_config/subchannel.c b/src/core/ext/client_config/subchannel.c
index 4cece2d908..c531b3b1a8 100644
--- a/src/core/ext/client_config/subchannel.c
+++ b/src/core/ext/client_config/subchannel.c
@@ -704,7 +704,7 @@ grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel(
grpc_error *grpc_connected_subchannel_create_call(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con,
- grpc_polling_entity *pollent, gpr_timespec deadline,
+ grpc_polling_entity *pollent, grpc_mdstr *path, gpr_timespec deadline,
grpc_subchannel_call **call) {
grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
*call = gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
@@ -712,7 +712,7 @@ grpc_error *grpc_connected_subchannel_create_call(
(*call)->connection = con; // Ref is added below.
grpc_error *error =
grpc_call_stack_init(exec_ctx, chanstk, 1, subchannel_call_destroy, *call,
- NULL, NULL, deadline, callstk);
+ NULL, NULL, path, deadline, callstk);
if (error != GRPC_ERROR_NONE) {
const char *error_string = grpc_error_string(error);
gpr_log(GPR_ERROR, "error: %s", error_string);
diff --git a/src/core/ext/client_config/subchannel.h b/src/core/ext/client_config/subchannel.h
index 3330621071..f8de26dfd5 100644
--- a/src/core/ext/client_config/subchannel.h
+++ b/src/core/ext/client_config/subchannel.h
@@ -38,6 +38,7 @@
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/transport/connectivity_state.h"
+#include "src/core/lib/transport/metadata.h"
/** A (sub-)channel that knows how to connect to exactly one target
address. Provides a target for load balancing. */
@@ -110,7 +111,7 @@ void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx,
/** construct a subchannel call */
grpc_error *grpc_connected_subchannel_create_call(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *connected_subchannel,
- grpc_polling_entity *pollent, gpr_timespec deadline,
+ grpc_polling_entity *pollent, grpc_mdstr *path, gpr_timespec deadline,
grpc_subchannel_call **subchannel_call);
/** process a transport level op */
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c
index 9af92b787d..b24b522449 100644
--- a/src/core/ext/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/lb_policy/grpclb/grpclb.c
@@ -113,6 +113,7 @@
#include "src/core/ext/client_config/parse_address.h"
#include "src/core/ext/lb_policy/grpclb/grpclb.h"
#include "src/core/ext/lb_policy/grpclb/load_balancer_api.h"
+#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/support/string.h"
@@ -270,6 +271,7 @@ typedef struct glb_lb_policy {
/** who the client is trying to communicate with */
const char *server_name;
grpc_client_channel_factory *cc_factory;
+ grpc_channel_args *args;
/** deadline for the LB's call */
gpr_timespec deadline;
@@ -457,6 +459,7 @@ static grpc_lb_policy *create_rr_locked(
args.server_name = glb_policy->server_name;
args.client_channel_factory = glb_policy->cc_factory;
args.addresses = process_serverlist(serverlist);
+ args.additional_args = glb_policy->args;
grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args);
@@ -584,6 +587,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
* Create a client channel over them to communicate with a LB service */
glb_policy->server_name = gpr_strdup(args->server_name);
glb_policy->cc_factory = args->client_channel_factory;
+ glb_policy->args = grpc_channel_args_copy(args->additional_args);
GPR_ASSERT(glb_policy->cc_factory != NULL);
/* construct a target from the addresses in args, given in the form
@@ -650,6 +654,7 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
GPR_ASSERT(glb_policy->pending_picks == NULL);
GPR_ASSERT(glb_policy->pending_pings == NULL);
gpr_free((void *)glb_policy->server_name);
+ grpc_channel_args_destroy(glb_policy->args);
grpc_channel_destroy(glb_policy->lb_channel);
glb_policy->lb_channel = NULL;
grpc_connectivity_state_destroy(exec_ctx, &glb_policy->state_tracker);
diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c
index 6533327343..da20b9281b 100644
--- a/src/core/ext/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/lb_policy/pick_first/pick_first.c
@@ -466,6 +466,7 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
sc_args.addr =
(struct sockaddr *)(&args->addresses->addresses[i].address.addr);
sc_args.addr_len = args->addresses->addresses[i].address.len;
+ sc_args.args = args->additional_args;
grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
exec_ctx, args->client_channel_factory, &sc_args);
diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c
index 9bd3f9da24..b8f4f67aeb 100644
--- a/src/core/ext/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/lb_policy/round_robin/round_robin.c
@@ -629,6 +629,7 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
sc_args.addr =
(struct sockaddr *)(&args->addresses->addresses[i].address.addr);
sc_args.addr_len = args->addresses->addresses[i].address.len;
+ sc_args.args = args->additional_args;
grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(
exec_ctx, args->client_channel_factory, &sc_args);
diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
index 5a7a32d7cb..28dd2569e8 100644
--- a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
+++ b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c
@@ -33,6 +33,7 @@
#include <stdbool.h>
#include <stdio.h>
+#include <stdlib.h>
#include <string.h>
#include <grpc/support/alloc.h>
@@ -42,6 +43,7 @@
#include "src/core/ext/client_config/parse_address.h"
#include "src/core/ext/client_config/resolver_registry.h"
+#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/unix_sockets_posix.h"
#include "src/core/lib/support/string.h"
@@ -119,7 +121,7 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
*r->target_result = grpc_resolver_result_create(
r->target_name,
grpc_lb_addresses_copy(r->addresses, NULL /* user_data_copy */),
- NULL /* lb_policy_name */, NULL);
+ NULL /* lb_policy_name */, NULL /* lb_policy_args */);
grpc_exec_ctx_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL);
r->next_completion = NULL;
}
@@ -128,8 +130,8 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
static void sockaddr_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) {
sockaddr_resolver *r = (sockaddr_resolver *)gr;
gpr_mu_destroy(&r->mu);
- grpc_lb_addresses_destroy(r->addresses, NULL /* user_data_destroy */);
gpr_free(r->target_name);
+ grpc_lb_addresses_destroy(r->addresses, NULL /* user_data_destroy */);
gpr_free(r);
}
diff --git a/src/core/lib/channel/channel_args.c b/src/core/lib/channel/channel_args.c
index 3a56b1ff20..2957d2c818 100644
--- a/src/core/lib/channel/channel_args.c
+++ b/src/core/lib/channel/channel_args.c
@@ -272,6 +272,18 @@ int grpc_channel_args_compare(const grpc_channel_args *a,
return 0;
}
+const grpc_arg *grpc_channel_args_find(const grpc_channel_args *args,
+ const char *name) {
+ if (args != NULL) {
+ for (size_t i = 0; i < args->num_args; ++i) {
+ if (strcmp(args->args[i].key, name) == 0) {
+ return &args->args[i];
+ }
+ }
+ }
+ return NULL;
+}
+
int grpc_channel_arg_get_integer(grpc_arg *arg, grpc_integer_options options) {
if (arg->type != GRPC_ARG_INTEGER) {
gpr_log(GPR_ERROR, "%s ignored: it must be an integer", arg->key);
diff --git a/src/core/lib/channel/channel_args.h b/src/core/lib/channel/channel_args.h
index 586a296d1f..a80340c0fa 100644
--- a/src/core/lib/channel/channel_args.h
+++ b/src/core/lib/channel/channel_args.h
@@ -89,6 +89,10 @@ uint32_t grpc_channel_args_compression_algorithm_get_states(
int grpc_channel_args_compare(const grpc_channel_args *a,
const grpc_channel_args *b);
+/** Returns the value of argument \a name from \a args, or NULL if not found. */
+const grpc_arg *grpc_channel_args_find(const grpc_channel_args *args,
+ const char *name);
+
typedef struct grpc_integer_options {
int default_value; // Return this if value is outside of expected bounds.
int min_value;
diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c
index 57d34d9e9a..2c5367901d 100644
--- a/src/core/lib/channel/channel_stack.c
+++ b/src/core/lib/channel/channel_stack.c
@@ -162,7 +162,7 @@ grpc_error *grpc_call_stack_init(
grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack,
int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg,
grpc_call_context_element *context, const void *transport_server_data,
- gpr_timespec deadline, grpc_call_stack *call_stack) {
+ grpc_mdstr *path, gpr_timespec deadline, grpc_call_stack *call_stack) {
grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack);
grpc_call_element_args args;
size_t count = channel_stack->count;
@@ -179,10 +179,12 @@ grpc_error *grpc_call_stack_init(
/* init per-filter data */
grpc_error *first_error = GRPC_ERROR_NONE;
+ args.start_time = gpr_now(GPR_CLOCK_MONOTONIC);
for (i = 0; i < count; i++) {
args.call_stack = call_stack;
args.server_transport_data = transport_server_data;
args.context = context;
+ args.path = path;
args.deadline = deadline;
call_elems[i].filter = channel_elems[i].filter;
call_elems[i].channel_data = channel_elems[i].channel_data;
diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h
index 1cfe2885d8..27f3be7b29 100644
--- a/src/core/lib/channel/channel_stack.h
+++ b/src/core/lib/channel/channel_stack.h
@@ -74,6 +74,8 @@ typedef struct {
grpc_call_stack *call_stack;
const void *server_transport_data;
grpc_call_context_element *context;
+ grpc_mdstr *path;
+ gpr_timespec start_time;
gpr_timespec deadline;
} grpc_call_element_args;
@@ -225,7 +227,7 @@ grpc_error *grpc_call_stack_init(
grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack,
int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg,
grpc_call_context_element *context, const void *transport_server_data,
- gpr_timespec deadline, grpc_call_stack *call_stack);
+ grpc_mdstr *path, gpr_timespec deadline, grpc_call_stack *call_stack);
/* Set a pollset or a pollset_set for a call stack: must occur before the first
* op is started */
void grpc_call_stack_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/lib/channel/deadline_filter.c b/src/core/lib/channel/deadline_filter.c
index 079b98a2f8..d2ea5250f6 100644
--- a/src/core/lib/channel/deadline_filter.c
+++ b/src/core/lib/channel/deadline_filter.c
@@ -64,30 +64,49 @@ static void timer_callback(grpc_exec_ctx* exec_ctx, void* arg,
}
// Starts the deadline timer.
-static void start_timer_if_needed(grpc_exec_ctx* exec_ctx,
- grpc_call_element* elem,
- gpr_timespec deadline) {
+static void start_timer_if_needed_locked(grpc_exec_ctx* exec_ctx,
+ grpc_call_element* elem,
+ gpr_timespec deadline) {
grpc_deadline_state* deadline_state = elem->call_data;
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
- if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) {
+ // Note: We do not start the timer if there is already a timer
+ // pending. This should be okay, because this is only called from two
+ // functions exported by this module: grpc_deadline_state_start(), which
+ // starts the initial timer, and grpc_deadline_state_reset(), which
+ // cancels any pre-existing timer before starting a new one. In
+ // particular, we want to ensure that if grpc_deadline_state_start()
+ // winds up trying to start the timer after grpc_deadline_state_reset()
+ // has already done so, we ignore the value from the former.
+ if (!deadline_state->timer_pending &&
+ gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) {
// Take a reference to the call stack, to be owned by the timer.
GRPC_CALL_STACK_REF(deadline_state->call_stack, "deadline_timer");
- gpr_mu_lock(&deadline_state->timer_mu);
deadline_state->timer_pending = true;
grpc_timer_init(exec_ctx, &deadline_state->timer, deadline, timer_callback,
elem, gpr_now(GPR_CLOCK_MONOTONIC));
- gpr_mu_unlock(&deadline_state->timer_mu);
}
}
+static void start_timer_if_needed(grpc_exec_ctx* exec_ctx,
+ grpc_call_element* elem,
+ gpr_timespec deadline) {
+ grpc_deadline_state* deadline_state = elem->call_data;
+ gpr_mu_lock(&deadline_state->timer_mu);
+ start_timer_if_needed_locked(exec_ctx, elem, deadline);
+ gpr_mu_unlock(&deadline_state->timer_mu);
+}
// Cancels the deadline timer.
-static void cancel_timer_if_needed(grpc_exec_ctx* exec_ctx,
- grpc_deadline_state* deadline_state) {
- gpr_mu_lock(&deadline_state->timer_mu);
+static void cancel_timer_if_needed_locked(grpc_exec_ctx* exec_ctx,
+ grpc_deadline_state* deadline_state) {
if (deadline_state->timer_pending) {
grpc_timer_cancel(exec_ctx, &deadline_state->timer);
deadline_state->timer_pending = false;
}
+}
+static void cancel_timer_if_needed(grpc_exec_ctx* exec_ctx,
+ grpc_deadline_state* deadline_state) {
+ gpr_mu_lock(&deadline_state->timer_mu);
+ cancel_timer_if_needed_locked(exec_ctx, deadline_state);
gpr_mu_unlock(&deadline_state->timer_mu);
}
@@ -108,6 +127,21 @@ static void inject_on_complete_cb(grpc_deadline_state* deadline_state,
op->on_complete = &deadline_state->on_complete;
}
+void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+ grpc_call_stack* call_stack) {
+ grpc_deadline_state* deadline_state = elem->call_data;
+ memset(deadline_state, 0, sizeof(*deadline_state));
+ deadline_state->call_stack = call_stack;
+ gpr_mu_init(&deadline_state->timer_mu);
+}
+
+void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx,
+ grpc_call_element* elem) {
+ grpc_deadline_state* deadline_state = elem->call_data;
+ cancel_timer_if_needed(exec_ctx, deadline_state);
+ gpr_mu_destroy(&deadline_state->timer_mu);
+}
+
// Callback and associated state for starting the timer after call stack
// initialization has been completed.
struct start_timer_after_init_state {
@@ -122,16 +156,11 @@ static void start_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg,
gpr_free(state);
}
-void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
- grpc_call_element_args* args) {
- grpc_deadline_state* deadline_state = elem->call_data;
- memset(deadline_state, 0, sizeof(*deadline_state));
- deadline_state->call_stack = args->call_stack;
- gpr_mu_init(&deadline_state->timer_mu);
+void grpc_deadline_state_start(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+ gpr_timespec deadline) {
// Deadline will always be infinite on servers, so the timer will only be
// set on clients with a finite deadline.
- const gpr_timespec deadline =
- gpr_convert_clock_type(args->deadline, GPR_CLOCK_MONOTONIC);
+ deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) {
// When the deadline passes, we indicate the failure by sending down
// an op with cancel_error set. However, we can't send down any ops
@@ -148,11 +177,13 @@ void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
}
}
-void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx,
- grpc_call_element* elem) {
+void grpc_deadline_state_reset(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+ gpr_timespec new_deadline) {
grpc_deadline_state* deadline_state = elem->call_data;
- cancel_timer_if_needed(exec_ctx, deadline_state);
- gpr_mu_destroy(&deadline_state->timer_mu);
+ gpr_mu_lock(&deadline_state->timer_mu);
+ cancel_timer_if_needed_locked(exec_ctx, deadline_state);
+ start_timer_if_needed_locked(exec_ctx, elem, new_deadline);
+ gpr_mu_unlock(&deadline_state->timer_mu);
}
void grpc_deadline_state_client_start_transport_stream_op(
@@ -209,7 +240,8 @@ static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
grpc_call_element_args* args) {
// Note: size of call data is different between client and server.
memset(elem->call_data, 0, elem->filter->sizeof_call_data);
- grpc_deadline_state_init(exec_ctx, elem, args);
+ grpc_deadline_state_init(exec_ctx, elem, args->call_stack);
+ grpc_deadline_state_start(exec_ctx, elem, args->deadline);
return GRPC_ERROR_NONE;
}
diff --git a/src/core/lib/channel/deadline_filter.h b/src/core/lib/channel/deadline_filter.h
index 685df87761..716a852565 100644
--- a/src/core/lib/channel/deadline_filter.h
+++ b/src/core/lib/channel/deadline_filter.h
@@ -54,18 +54,37 @@ typedef struct grpc_deadline_state {
grpc_closure* next_on_complete;
} grpc_deadline_state;
-// To be used in a filter's init_call_elem(), destroy_call_elem(), and
-// start_transport_stream_op() methods to enforce call deadlines.
//
-// REQUIRES: The first field in elem->call_data is a grpc_deadline_state.
+// NOTE: All of these functions require that the first field in
+// elem->call_data is a grpc_deadline_state.
//
-// For grpc_deadline_state_client_start_transport_stream_op(), it is the
-// caller's responsibility to chain to the next filter if necessary
-// after the function returns.
+
void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
- grpc_call_element_args* args);
+ grpc_call_stack* call_stack);
void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem);
+
+// Starts the timer with the specified deadline.
+// Should be called from the filter's init_call_elem() method.
+void grpc_deadline_state_start(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+ gpr_timespec deadline);
+
+// Cancels the existing timer and starts a new one with new_deadline.
+//
+// Note: It is generally safe to call this with an earlier deadline
+// value than the current one, but not the reverse. No checks are done
+// to ensure that the timer callback is not invoked while it is in the
+// process of being reset, which means that attempting to increase the
+// deadline may result in the timer being called twice.
+void grpc_deadline_state_reset(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+ gpr_timespec new_deadline);
+
+// To be called from the client-side filter's start_transport_stream_op()
+// method. Ensures that the deadline timer is cancelled when the call
+// is completed.
+//
+// Note: It is the caller's responsibility to chain to the next filter if
+// necessary after this function returns.
void grpc_deadline_state_client_start_transport_stream_op(
grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
grpc_transport_stream_op* op);
diff --git a/src/core/lib/channel/handshaker.c b/src/core/lib/channel/handshaker.c
index 8f9fb17a31..0d759887bc 100644
--- a/src/core/lib/channel/handshaker.c
+++ b/src/core/lib/channel/handshaker.c
@@ -183,7 +183,7 @@ void grpc_handshake_manager_do_handshake(
gpr_timespec deadline, grpc_tcp_server_acceptor* acceptor,
grpc_handshaker_done_cb cb, void* user_data) {
grpc_channel_args* args_copy = grpc_channel_args_copy(args);
- gpr_slice_buffer* read_buffer = malloc(sizeof(*read_buffer));
+ gpr_slice_buffer* read_buffer = gpr_malloc(sizeof(*read_buffer));
gpr_slice_buffer_init(read_buffer);
if (mgr->count == 0) {
// No handshakers registered, so we just immediately call the done
diff --git a/src/core/lib/channel/message_size_filter.c b/src/core/lib/channel/message_size_filter.c
index f067a3a51c..1382f19945 100644
--- a/src/core/lib/channel/message_size_filter.c
+++ b/src/core/lib/channel/message_size_filter.c
@@ -38,6 +38,7 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
+#include "src/core/ext/client_config/method_config.h"
#include "src/core/lib/channel/channel_args.h"
#define DEFAULT_MAX_SEND_MESSAGE_LENGTH -1 // Unlimited.
@@ -45,6 +46,8 @@
#define DEFAULT_MAX_RECV_MESSAGE_LENGTH (4 * 1024 * 1024)
typedef struct call_data {
+ int max_send_size;
+ int max_recv_size;
// Receive closures are chained: we inject this closure as the
// recv_message_ready up-call on transport_stream_op, and remember to
// call our next_recv_message_ready member after handling it.
@@ -58,6 +61,8 @@ typedef struct call_data {
typedef struct channel_data {
int max_send_size;
int max_recv_size;
+ // Method config table.
+ grpc_method_config_table* method_config_table;
} channel_data;
// Callback invoked when we receive a message. Here we check the max
@@ -66,13 +71,12 @@ static void recv_message_ready(grpc_exec_ctx* exec_ctx, void* user_data,
grpc_error* error) {
grpc_call_element* elem = user_data;
call_data* calld = elem->call_data;
- channel_data* chand = elem->channel_data;
- if (*calld->recv_message != NULL && chand->max_recv_size >= 0 &&
- (*calld->recv_message)->length > (size_t)chand->max_recv_size) {
+ if (*calld->recv_message != NULL && calld->max_recv_size >= 0 &&
+ (*calld->recv_message)->length > (size_t)calld->max_recv_size) {
char* message_string;
gpr_asprintf(&message_string,
"Received message larger than max (%u vs. %d)",
- (*calld->recv_message)->length, chand->max_recv_size);
+ (*calld->recv_message)->length, calld->max_recv_size);
grpc_error* new_error = grpc_error_set_int(
GRPC_ERROR_CREATE(message_string), GRPC_ERROR_INT_GRPC_STATUS,
GRPC_STATUS_INVALID_ARGUMENT);
@@ -93,13 +97,12 @@ static void start_transport_stream_op(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem,
grpc_transport_stream_op* op) {
call_data* calld = elem->call_data;
- channel_data* chand = elem->channel_data;
// Check max send message size.
- if (op->send_message != NULL && chand->max_send_size >= 0 &&
- op->send_message->length > (size_t)chand->max_send_size) {
+ if (op->send_message != NULL && calld->max_send_size >= 0 &&
+ op->send_message->length > (size_t)calld->max_send_size) {
char* message_string;
gpr_asprintf(&message_string, "Sent message larger than max (%u vs. %d)",
- op->send_message->length, chand->max_send_size);
+ op->send_message->length, calld->max_send_size);
gpr_slice message = gpr_slice_from_copied_string(message_string);
gpr_free(message_string);
grpc_call_element_send_close_with_message(
@@ -119,9 +122,37 @@ static void start_transport_stream_op(grpc_exec_ctx* exec_ctx,
static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem,
grpc_call_element_args* args) {
+ channel_data* chand = elem->channel_data;
call_data* calld = elem->call_data;
calld->next_recv_message_ready = NULL;
grpc_closure_init(&calld->recv_message_ready, recv_message_ready, elem);
+ // Get max sizes from channel data, then merge in per-method config values.
+ // Note: Per-method config is only available on the client, so we
+ // apply the max request size to the send limit and the max response
+ // size to the receive limit.
+ calld->max_send_size = chand->max_send_size;
+ calld->max_recv_size = chand->max_recv_size;
+ if (chand->method_config_table != NULL) {
+ grpc_method_config* method_config =
+ grpc_method_config_table_get_method_config(chand->method_config_table,
+ args->path);
+ if (method_config != NULL) {
+ const int32_t* max_request_message_bytes =
+ grpc_method_config_get_max_request_message_bytes(method_config);
+ if (max_request_message_bytes != NULL &&
+ (*max_request_message_bytes < calld->max_send_size ||
+ calld->max_send_size < 0)) {
+ calld->max_send_size = *max_request_message_bytes;
+ }
+ const int32_t* max_response_message_bytes =
+ grpc_method_config_get_max_response_message_bytes(method_config);
+ if (max_response_message_bytes != NULL &&
+ (*max_response_message_bytes < calld->max_recv_size ||
+ calld->max_recv_size < 0)) {
+ calld->max_recv_size = *max_response_message_bytes;
+ }
+ }
+ }
return GRPC_ERROR_NONE;
}
@@ -155,11 +186,22 @@ static void init_channel_elem(grpc_exec_ctx* exec_ctx,
grpc_channel_arg_get_integer(&args->channel_args->args[i], options);
}
}
+ // Get method config table from channel args.
+ const grpc_arg* channel_arg =
+ grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVICE_CONFIG);
+ if (channel_arg != NULL) {
+ GPR_ASSERT(channel_arg->type == GRPC_ARG_POINTER);
+ chand->method_config_table = grpc_method_config_table_ref(
+ (grpc_method_config_table*)channel_arg->value.pointer.p);
+ }
}
// Destructor for channel_data.
static void destroy_channel_elem(grpc_exec_ctx* exec_ctx,
- grpc_channel_element* elem) {}
+ grpc_channel_element* elem) {
+ channel_data* chand = elem->channel_data;
+ grpc_method_config_table_unref(chand->method_config_table);
+}
const grpc_channel_filter grpc_message_size_filter = {
start_transport_stream_op,
diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c
index 351b069613..27e966c18c 100644
--- a/src/core/lib/iomgr/ev_poll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_posix.c
@@ -985,8 +985,15 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
if (errno != EINTR) {
work_combine_error(&error, GRPC_OS_ERROR(errno, "poll"));
}
+
for (i = 2; i < pfd_count; i++) {
- fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL);
+ if (watchers[i].fd == NULL) {
+ fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL);
+ } else {
+ // Wake up all the file descriptors, if we have an invalid one
+ // we can identify it on the next pollset_work()
+ fd_end_poll(exec_ctx, &watchers[i], 1, 1, pollset);
+ }
}
} else if (r == 0) {
for (i = 2; i < pfd_count; i++) {
diff --git a/src/core/lib/iomgr/timer.h b/src/core/lib/iomgr/timer.h
index a825d2a28b..5a9a177963 100644
--- a/src/core/lib/iomgr/timer.h
+++ b/src/core/lib/iomgr/timer.h
@@ -49,11 +49,11 @@ typedef struct grpc_timer {
} grpc_timer;
/* Initialize *timer. When expired or canceled, timer_cb will be called with
- *timer_cb_arg and status to indicate if it expired (SUCCESS) or was
- canceled (CANCELLED). timer_cb is guaranteed to be called exactly once,
- and application code should check the status to determine how it was
- invoked. The application callback is also responsible for maintaining
- information about when to free up any user-level state. */
+ *timer_cb_arg and error set to indicate if it expired (GRPC_ERROR_NONE) or
+ was canceled (GRPC_ERROR_CANCELLED). timer_cb is guaranteed to be called
+ exactly once, and application code should check the error to determine
+ how it was invoked. The application callback is also responsible for
+ maintaining information about when to free up any user-level state. */
void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
gpr_timespec deadline, grpc_iomgr_cb_func timer_cb,
void *timer_cb_arg, gpr_timespec now);
@@ -74,8 +74,8 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
In all of these cases, the cancellation is still considered successful.
They are essentially distinguished in that the timer_cb will be run
- exactly once from either the cancellation (with status CANCELLED)
- or from the activation (with status SUCCESS)
+ exactly once from either the cancellation (with error GRPC_ERROR_CANCELLED)
+ or from the activation (with error GRPC_ERROR_NONE).
Note carefully that the callback function MAY occur in the same callstack
as grpc_timer_cancel. It's expected that most timers will be cancelled (their
@@ -83,14 +83,13 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
that cancellation costs as little as possible. Making callbacks run inline
matches this aim.
- Requires: cancel() must happen after add() on a given timer */
+ Requires: cancel() must happen after init() on a given timer */
void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer);
/* iomgr internal api for dealing with timers */
/* Check for timers to be run, and run them.
Return true if timer callbacks were executed.
- Drops drop_mu if it is non-null before executing callbacks.
If next is non-null, TRY to update *next with the next running timer
IF that timer occurs before *next current value.
*next is never guaranteed to be updated on any given execution; however,
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index c4effa9a05..30559304c6 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -241,11 +241,15 @@ grpc_error *grpc_call_create(const grpc_call_create_args *args,
/* Always support no compression */
GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE);
call->is_client = args->server_transport_data == NULL;
+ grpc_mdstr *path = NULL;
if (call->is_client) {
GPR_ASSERT(args->add_initial_metadata_count <
MAX_SEND_EXTRA_METADATA_COUNT);
for (i = 0; i < args->add_initial_metadata_count; i++) {
call->send_extra_metadata[i].md = args->add_initial_metadata[i];
+ if (args->add_initial_metadata[i]->key == GRPC_MDSTR_PATH) {
+ path = GRPC_MDSTR_REF(args->add_initial_metadata[i]->value);
+ }
}
call->send_extra_metadata_count = (int)args->add_initial_metadata_count;
} else {
@@ -306,9 +310,10 @@ grpc_error *grpc_call_create(const grpc_call_create_args *args,
GRPC_CHANNEL_INTERNAL_REF(args->channel, "call");
/* initial refcount dropped by grpc_call_destroy */
- grpc_error *error = grpc_call_stack_init(
- &exec_ctx, channel_stack, 1, destroy_call, call, call->context,
- args->server_transport_data, send_deadline, CALL_STACK_FROM_CALL(call));
+ grpc_error *error =
+ grpc_call_stack_init(&exec_ctx, channel_stack, 1, destroy_call, call,
+ call->context, args->server_transport_data, path,
+ send_deadline, CALL_STACK_FROM_CALL(call));
if (error != GRPC_ERROR_NONE) {
grpc_status_code status;
const char *error_str;
@@ -332,6 +337,8 @@ grpc_error *grpc_call_create(const grpc_call_create_args *args,
&exec_ctx, CALL_STACK_FROM_CALL(call), &call->pollent);
}
+ if (path != NULL) GRPC_MDSTR_UNREF(path);
+
grpc_exec_ctx_finish(&exec_ctx);
GPR_TIMER_END("grpc_call_create", 0);
return error;
diff --git a/src/core/lib/transport/mdstr_hash_table.c b/src/core/lib/transport/mdstr_hash_table.c
new file mode 100644
index 0000000000..4be0536dd7
--- /dev/null
+++ b/src/core/lib/transport/mdstr_hash_table.c
@@ -0,0 +1,142 @@
+//
+// Copyright 2016, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+
+#include "src/core/lib/transport/mdstr_hash_table.h"
+
+#include <stdbool.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+
+#include "src/core/lib/transport/metadata.h"
+
+struct grpc_mdstr_hash_table {
+ gpr_refcount refs;
+ size_t num_entries;
+ grpc_mdstr_hash_table_entry* entries;
+};
+
+// Helper function for insert and get operations that performs quadratic
+// probing (https://en.wikipedia.org/wiki/Quadratic_probing).
+static size_t grpc_mdstr_hash_table_find_index(
+ const grpc_mdstr_hash_table* table, const grpc_mdstr* key,
+ bool find_empty) {
+ for (size_t i = 0; i < table->num_entries; ++i) {
+ const size_t idx = (key->hash + i * i) % table->num_entries;
+ if (table->entries[idx].key == NULL)
+ return find_empty ? idx : table->num_entries;
+ if (table->entries[idx].key == key) return idx;
+ }
+ return table->num_entries; // Not found.
+}
+
+static void grpc_mdstr_hash_table_add(
+ grpc_mdstr_hash_table* table, grpc_mdstr* key, void* value,
+ const grpc_mdstr_hash_table_vtable* vtable) {
+ GPR_ASSERT(value != NULL);
+ const size_t idx =
+ grpc_mdstr_hash_table_find_index(table, key, true /* find_empty */);
+ GPR_ASSERT(idx != table->num_entries); // Table should never be full.
+ grpc_mdstr_hash_table_entry* entry = &table->entries[idx];
+ entry->key = GRPC_MDSTR_REF(key);
+ entry->value = vtable->copy_value(value);
+ entry->vtable = vtable;
+}
+
+grpc_mdstr_hash_table* grpc_mdstr_hash_table_create(
+ size_t num_entries, grpc_mdstr_hash_table_entry* entries) {
+ grpc_mdstr_hash_table* table = gpr_malloc(sizeof(*table));
+ memset(table, 0, sizeof(*table));
+ gpr_ref_init(&table->refs, 1);
+ // Quadratic probing gets best performance when the table is no more
+ // than half full.
+ table->num_entries = num_entries * 2;
+ const size_t entry_size =
+ sizeof(grpc_mdstr_hash_table_entry) * table->num_entries;
+ table->entries = gpr_malloc(entry_size);
+ memset(table->entries, 0, entry_size);
+ for (size_t i = 0; i < num_entries; ++i) {
+ grpc_mdstr_hash_table_entry* entry = &entries[i];
+ grpc_mdstr_hash_table_add(table, entry->key, entry->value, entry->vtable);
+ }
+ return table;
+}
+
+grpc_mdstr_hash_table* grpc_mdstr_hash_table_ref(grpc_mdstr_hash_table* table) {
+ if (table != NULL) gpr_ref(&table->refs);
+ return table;
+}
+
+int grpc_mdstr_hash_table_unref(grpc_mdstr_hash_table* table) {
+ if (table != NULL && gpr_unref(&table->refs)) {
+ for (size_t i = 0; i < table->num_entries; ++i) {
+ grpc_mdstr_hash_table_entry* entry = &table->entries[i];
+ if (entry->key != NULL) {
+ GRPC_MDSTR_UNREF(entry->key);
+ entry->vtable->destroy_value(entry->value);
+ }
+ }
+ gpr_free(table->entries);
+ gpr_free(table);
+ return 1;
+ }
+ return 0;
+}
+
+void* grpc_mdstr_hash_table_get(const grpc_mdstr_hash_table* table,
+ const grpc_mdstr* key) {
+ const size_t idx =
+ grpc_mdstr_hash_table_find_index(table, key, false /* find_empty */);
+ if (idx == table->num_entries) return NULL; // Not found.
+ return table->entries[idx].value;
+}
+
+int grpc_mdstr_hash_table_cmp(const grpc_mdstr_hash_table* table1,
+ const grpc_mdstr_hash_table* table2) {
+ // Compare by num_entries.
+ if (table1->num_entries < table2->num_entries) return -1;
+ if (table1->num_entries > table2->num_entries) return 1;
+ for (size_t i = 0; i < table1->num_entries; ++i) {
+ grpc_mdstr_hash_table_entry* e1 = &table1->entries[i];
+ grpc_mdstr_hash_table_entry* e2 = &table2->entries[i];
+ // Compare keys by hash value.
+ if (e1->key->hash < e2->key->hash) return -1;
+ if (e1->key->hash > e2->key->hash) return 1;
+ // Compare by vtable (pointer equality).
+ if (e1->vtable < e2->vtable) return -1;
+ if (e1->vtable > e2->vtable) return 1;
+ // Compare values via vtable.
+ const int value_result = e1->vtable->compare_value(e1->value, e2->value);
+ if (value_result != 0) return value_result;
+ }
+ return 0;
+}
diff --git a/src/core/lib/transport/mdstr_hash_table.h b/src/core/lib/transport/mdstr_hash_table.h
new file mode 100644
index 0000000000..52e5b023db
--- /dev/null
+++ b/src/core/lib/transport/mdstr_hash_table.h
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef GRPC_CORE_LIB_TRANSPORT_MDSTR_HASH_TABLE_H
+#define GRPC_CORE_LIB_TRANSPORT_MDSTR_HASH_TABLE_H
+
+#include "src/core/lib/transport/metadata.h"
+
+/** Hash table implementation.
+ *
+ * This implementation uses open addressing
+ * (https://en.wikipedia.org/wiki/Open_addressing) with quadratic
+ * probing (https://en.wikipedia.org/wiki/Quadratic_probing).
+ *
+ * The keys are \a grpc_mdstr objects. The values are arbitrary pointers
+ * with a common vtable.
+ *
+ * Hash tables are intentionally immutable, to avoid the need for locking.
+ */
+
+typedef struct grpc_mdstr_hash_table grpc_mdstr_hash_table;
+
+typedef struct grpc_mdstr_hash_table_vtable {
+ void (*destroy_value)(void* value);
+ void* (*copy_value)(void* value);
+ int (*compare_value)(void* value1, void* value2);
+} grpc_mdstr_hash_table_vtable;
+
+typedef struct grpc_mdstr_hash_table_entry {
+ grpc_mdstr* key;
+ void* value; /* Must not be NULL. */
+ const grpc_mdstr_hash_table_vtable* vtable;
+} grpc_mdstr_hash_table_entry;
+
+/** Creates a new hash table of containing \a entries, which is an array
+ of length \a num_entries.
+ Creates its own copy of all keys and values from \a entries. */
+grpc_mdstr_hash_table* grpc_mdstr_hash_table_create(
+ size_t num_entries, grpc_mdstr_hash_table_entry* entries);
+
+grpc_mdstr_hash_table* grpc_mdstr_hash_table_ref(grpc_mdstr_hash_table* table);
+/** Returns 1 when \a table is destroyed. */
+int grpc_mdstr_hash_table_unref(grpc_mdstr_hash_table* table);
+
+/** Returns the value from \a table associated with \a key.
+ Returns NULL if \a key is not found. */
+void* grpc_mdstr_hash_table_get(const grpc_mdstr_hash_table* table,
+ const grpc_mdstr* key);
+
+/** Compares two hash tables.
+ The sort order is stable but undefined. */
+int grpc_mdstr_hash_table_cmp(const grpc_mdstr_hash_table* table1,
+ const grpc_mdstr_hash_table* table2);
+
+#endif /* GRPC_CORE_LIB_TRANSPORT_MDSTR_HASH_TABLE_H */