aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Mark D. Roth <roth@google.com>2016-11-29 14:24:42 -0800
committerGravatar Mark D. Roth <roth@google.com>2016-11-29 14:24:42 -0800
commit6c3d040ebf01605082d026fd721f87faacf966b5 (patch)
treead7be6a55579061dbdbf679cba2382039e1996e6 /src/core
parent95fcb58fc1c3f71e42dd1a7a618d31bf2ae6e43e (diff)
parent0610434185cd528b72e5be936075cae4c4a07a8e (diff)
Merge branch 'security_handshaker1' into security_handshaker2
Diffstat (limited to 'src/core')
-rw-r--r--src/core/ext/client_channel/client_channel.c120
-rw-r--r--src/core/ext/client_channel/http_connect_handshaker.c91
-rw-r--r--src/core/ext/client_channel/lb_policy_registry.c8
-rw-r--r--src/core/ext/lb_policy/grpclb/grpclb.c209
-rw-r--r--src/core/ext/transport/chttp2/server/insecure/server_chttp2.c6
-rw-r--r--src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c6
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c16
-rw-r--r--src/core/lib/channel/handshaker.c42
-rw-r--r--src/core/lib/channel/handshaker.h21
-rw-r--r--src/core/lib/channel/http_client_filter.c66
-rw-r--r--src/core/lib/channel/http_server_filter.c28
-rw-r--r--src/core/lib/channel/message_size_filter.c77
-rw-r--r--src/core/lib/iomgr/ev_epoll_linux.c31
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.c29
-rw-r--r--src/core/lib/iomgr/ev_posix.h1
-rw-r--r--src/core/lib/iomgr/iomgr.c1
-rw-r--r--src/core/lib/iomgr/resource_quota.c30
-rw-r--r--src/core/lib/iomgr/tcp_client_posix.c7
-rw-r--r--src/core/lib/iomgr/tcp_windows.c2
-rw-r--r--src/core/lib/json/json.c6
-rw-r--r--src/core/lib/json/json.h22
-rw-r--r--src/core/lib/slice/slice.c8
-rw-r--r--src/core/lib/support/string.c9
-rw-r--r--src/core/lib/support/string.h3
-rw-r--r--src/core/lib/support/subprocess_posix.c4
-rw-r--r--src/core/lib/surface/call.c5
-rw-r--r--src/core/lib/surface/server.c11
-rw-r--r--src/core/lib/transport/mdstr_hash_table.c42
-rw-r--r--src/core/lib/transport/mdstr_hash_table.h18
-rw-r--r--src/core/lib/transport/method_config.c340
-rw-r--r--src/core/lib/transport/method_config.h136
-rw-r--r--src/core/lib/transport/service_config.c249
-rw-r--r--src/core/lib/transport/service_config.h70
33 files changed, 924 insertions, 790 deletions
diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c
index b66fed4b88..1fcff4388a 100644
--- a/src/core/ext/client_channel/client_channel.c
+++ b/src/core/ext/client_channel/client_channel.c
@@ -56,7 +56,7 @@
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/metadata.h"
#include "src/core/lib/transport/metadata_batch.h"
-#include "src/core/lib/transport/method_config.h"
+#include "src/core/lib/transport/service_config.h"
#include "src/core/lib/transport/static_metadata.h"
/* Client channel implementation */
@@ -82,30 +82,61 @@ static void *method_parameters_copy(void *value) {
return new_value;
}
-static int method_parameters_cmp(void *value1, void *value2) {
- const method_parameters *v1 = value1;
- const method_parameters *v2 = value2;
- const int retval = gpr_time_cmp(v1->timeout, v2->timeout);
- if (retval != 0) return retval;
- if (v1->wait_for_ready > v2->wait_for_ready) return 1;
- if (v1->wait_for_ready < v2->wait_for_ready) return -1;
- return 0;
-}
-
static const grpc_mdstr_hash_table_vtable method_parameters_vtable = {
- gpr_free, method_parameters_copy, method_parameters_cmp};
-
-static void *method_config_convert_value(
- const grpc_method_config *method_config) {
+ gpr_free, method_parameters_copy};
+
+static void *method_parameters_create_from_json(const grpc_json *json) {
+ wait_for_ready_value wait_for_ready = WAIT_FOR_READY_UNSET;
+ gpr_timespec timeout = {0, 0, GPR_TIMESPAN};
+ for (grpc_json *field = json->child; field != NULL; field = field->next) {
+ if (field->key == NULL) continue;
+ if (strcmp(field->key, "waitForReady") == 0) {
+ if (wait_for_ready != WAIT_FOR_READY_UNSET) return NULL; // Duplicate.
+ if (field->type != GRPC_JSON_TRUE && field->type != GRPC_JSON_FALSE) {
+ return NULL;
+ }
+ wait_for_ready = field->type == GRPC_JSON_TRUE ? WAIT_FOR_READY_TRUE
+ : WAIT_FOR_READY_FALSE;
+ } else if (strcmp(field->key, "timeout") == 0) {
+ if (timeout.tv_sec > 0 || timeout.tv_nsec > 0) return NULL; // Duplicate.
+ if (field->type != GRPC_JSON_STRING) return NULL;
+ size_t len = strlen(field->value);
+ if (field->value[len - 1] != 's') return NULL;
+ char *buf = gpr_strdup(field->value);
+ buf[len - 1] = '\0'; // Remove trailing 's'.
+ char *decimal_point = strchr(buf, '.');
+ if (decimal_point != NULL) {
+ *decimal_point = '\0';
+ timeout.tv_nsec = gpr_parse_nonnegative_int(decimal_point + 1);
+ if (timeout.tv_nsec == -1) {
+ gpr_free(buf);
+ return NULL;
+ }
+ // There should always be exactly 3, 6, or 9 fractional digits.
+ int multiplier = 1;
+ switch (strlen(decimal_point + 1)) {
+ case 9:
+ break;
+ case 6:
+ multiplier *= 1000;
+ break;
+ case 3:
+ multiplier *= 1000000;
+ break;
+ default: // Unsupported number of digits.
+ gpr_free(buf);
+ return NULL;
+ }
+ timeout.tv_nsec *= multiplier;
+ }
+ timeout.tv_sec = gpr_parse_nonnegative_int(buf);
+ if (timeout.tv_sec == -1) return NULL;
+ gpr_free(buf);
+ }
+ }
method_parameters *value = gpr_malloc(sizeof(method_parameters));
- const gpr_timespec *timeout = grpc_method_config_get_timeout(method_config);
- value->timeout = timeout != NULL ? *timeout : gpr_time_0(GPR_TIMESPAN);
- const bool *wait_for_ready =
- grpc_method_config_get_wait_for_ready(method_config);
- value->wait_for_ready =
- wait_for_ready == NULL
- ? WAIT_FOR_READY_UNSET
- : (wait_for_ready ? WAIT_FOR_READY_TRUE : WAIT_FOR_READY_FALSE);
+ value->timeout = timeout;
+ value->wait_for_ready = wait_for_ready;
return value;
}
@@ -126,6 +157,8 @@ typedef struct client_channel_channel_data {
/** currently active load balancer */
char *lb_policy_name;
grpc_lb_policy *lb_policy;
+ /** service config in JSON form */
+ char *service_config_json;
/** maps method names to method_parameters structs */
grpc_mdstr_hash_table *method_params_table;
/** incoming resolver result - set by resolver.next() */
@@ -232,15 +265,12 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg,
grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
bool exit_idle = false;
grpc_error *state_error = GRPC_ERROR_CREATE("No load balancing policy");
+ char *service_config_json = NULL;
if (chand->resolver_result != NULL) {
- grpc_lb_policy_args lb_policy_args;
- lb_policy_args.args = chand->resolver_result;
- lb_policy_args.client_channel_factory = chand->client_channel_factory;
-
// Find LB policy name.
const grpc_arg *channel_arg =
- grpc_channel_args_find(lb_policy_args.args, GRPC_ARG_LB_POLICY_NAME);
+ grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_POLICY_NAME);
if (channel_arg != NULL) {
GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
lb_policy_name = channel_arg->value.string;
@@ -249,7 +279,7 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg,
// assume that we should use the grpclb policy, regardless of what the
// resolver actually specified.
channel_arg =
- grpc_channel_args_find(lb_policy_args.args, GRPC_ARG_LB_ADDRESSES);
+ grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES);
if (channel_arg != NULL) {
GPR_ASSERT(channel_arg->type == GRPC_ARG_POINTER);
grpc_lb_addresses *addresses = channel_arg->value.pointer.p;
@@ -274,7 +304,10 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg,
// Use pick_first if nothing was specified and we didn't select grpclb
// above.
if (lb_policy_name == NULL) lb_policy_name = "pick_first";
-
+ // Instantiate LB policy.
+ grpc_lb_policy_args lb_policy_args;
+ lb_policy_args.args = chand->resolver_result;
+ lb_policy_args.client_channel_factory = chand->client_channel_factory;
lb_policy =
grpc_lb_policy_create(exec_ctx, lb_policy_name, &lb_policy_args);
if (lb_policy != NULL) {
@@ -283,13 +316,20 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg,
state =
grpc_lb_policy_check_connectivity(exec_ctx, lb_policy, &state_error);
}
+ // Find service config.
channel_arg =
- grpc_channel_args_find(lb_policy_args.args, GRPC_ARG_SERVICE_CONFIG);
+ grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVICE_CONFIG);
if (channel_arg != NULL) {
- GPR_ASSERT(channel_arg->type == GRPC_ARG_POINTER);
- method_params_table = grpc_method_config_table_convert(
- (grpc_method_config_table *)channel_arg->value.pointer.p,
- method_config_convert_value, &method_parameters_vtable);
+ GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
+ service_config_json = gpr_strdup(channel_arg->value.string);
+ grpc_service_config *service_config =
+ grpc_service_config_create(service_config_json);
+ if (service_config != NULL) {
+ method_params_table = grpc_service_config_create_method_config_table(
+ service_config, method_parameters_create_from_json,
+ &method_parameters_vtable);
+ grpc_service_config_destroy(service_config);
+ }
}
// Before we clean up, save a copy of lb_policy_name, since it might
// be pointing to data inside chand->resolver_result.
@@ -311,6 +351,10 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg,
}
old_lb_policy = chand->lb_policy;
chand->lb_policy = lb_policy;
+ if (service_config_json != NULL) {
+ gpr_free(chand->service_config_json);
+ chand->service_config_json = service_config_json;
+ }
if (chand->method_params_table != NULL) {
grpc_mdstr_hash_table_unref(chand->method_params_table);
}
@@ -446,6 +490,11 @@ static void cc_get_channel_info(grpc_exec_ctx *exec_ctx,
? NULL
: gpr_strdup(chand->lb_policy_name);
}
+ if (info->service_config_json != NULL) {
+ *info->service_config_json = chand->service_config_json == NULL
+ ? NULL
+ : gpr_strdup(chand->service_config_json);
+ }
gpr_mu_unlock(&chand->mu);
}
@@ -489,6 +538,7 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
}
gpr_free(chand->lb_policy_name);
+ gpr_free(chand->service_config_json);
if (chand->method_params_table != NULL) {
grpc_mdstr_hash_table_unref(chand->method_params_table);
}
diff --git a/src/core/ext/client_channel/http_connect_handshaker.c b/src/core/ext/client_channel/http_connect_handshaker.c
index c9861a5aed..61fec5cba9 100644
--- a/src/core/ext/client_channel/http_connect_handshaker.c
+++ b/src/core/ext/client_channel/http_connect_handshaker.c
@@ -41,6 +41,7 @@
#include <grpc/support/string_util.h>
#include "src/core/ext/client_channel/uri_parser.h"
+#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/http/format_request.h"
#include "src/core/lib/http/parser.h"
#include "src/core/lib/support/env.h"
@@ -55,9 +56,12 @@ typedef struct http_connect_handshaker {
gpr_refcount refcount;
gpr_mu mu;
+ bool shutdown;
+ // Endpoint and read buffer to destroy after a shutdown.
+ grpc_endpoint* endpoint_to_destroy;
+ grpc_slice_buffer* read_buffer_to_destroy;
+
// State saved while performing the handshake.
- // args will be NULL when either there is no handshake in progress or
- // when the handshaker is shutting down.
grpc_handshaker_args* args;
grpc_closure* on_handshake_done;
@@ -70,9 +74,17 @@ typedef struct http_connect_handshaker {
} http_connect_handshaker;
// Unref and clean up handshaker.
-static void http_connect_handshaker_unref(http_connect_handshaker* handshaker) {
+static void http_connect_handshaker_unref(grpc_exec_ctx* exec_ctx,
+ http_connect_handshaker* handshaker) {
if (gpr_unref(&handshaker->refcount)) {
gpr_mu_destroy(&handshaker->mu);
+ if (handshaker->endpoint_to_destroy != NULL) {
+ grpc_endpoint_destroy(exec_ctx, handshaker->endpoint_to_destroy);
+ }
+ if (handshaker->read_buffer_to_destroy != NULL) {
+ grpc_slice_buffer_destroy(handshaker->read_buffer_to_destroy);
+ gpr_free(handshaker->read_buffer_to_destroy);
+ }
gpr_free(handshaker->proxy_server);
gpr_free(handshaker->server_name);
grpc_slice_buffer_destroy(&handshaker->write_buffer);
@@ -82,18 +94,47 @@ static void http_connect_handshaker_unref(http_connect_handshaker* handshaker) {
}
}
+// Set args fields to NULL, saving the endpoint and read buffer for
+// later destruction.
+static void cleanup_args_for_failure_locked(
+ http_connect_handshaker* handshaker) {
+ handshaker->endpoint_to_destroy = handshaker->args->endpoint;
+ handshaker->args->endpoint = NULL;
+ handshaker->read_buffer_to_destroy = handshaker->args->read_buffer;
+ handshaker->args->read_buffer = NULL;
+ grpc_channel_args_destroy(handshaker->args->args);
+ handshaker->args->args = NULL;
+}
+
// Callback invoked when finished writing HTTP CONNECT request.
static void on_write_done(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
http_connect_handshaker* handshaker = arg;
gpr_mu_lock(&handshaker->mu);
- if (error != GRPC_ERROR_NONE || handshaker->args == NULL) {
- // If the write failed, invoke the callback immediately with the error.
- grpc_exec_ctx_sched(exec_ctx, handshaker->on_handshake_done,
- GRPC_ERROR_REF(error), NULL);
- handshaker->args = NULL;
+ if (error != GRPC_ERROR_NONE || handshaker->shutdown) {
+ // If the write failed or we're shutting down, clean up and invoke the
+ // callback with the error.
+ if (error == GRPC_ERROR_NONE) {
+ // If we were shut down after the write succeeded but before this
+ // callback was invoked, we need to generate our own error.
+ error = GRPC_ERROR_CREATE("Handshaker shutdown");
+ } else {
+ GRPC_ERROR_REF(error); // Take ref for the handshake-done callback.
+ }
+ if (!handshaker->shutdown) {
+ // TODO(ctiller): It is currently necessary to shutdown endpoints
+ // before destroying them, even if we know that there are no
+ // pending read/write callbacks. This should be fixed, at which
+ // point this can be removed.
+ grpc_endpoint_shutdown(exec_ctx, handshaker->args->endpoint);
+ // Not shutting down, so the write failed. Clean up before
+ // invoking the callback.
+ cleanup_args_for_failure_locked(handshaker);
+ }
+ // Invoke callback.
+ grpc_exec_ctx_sched(exec_ctx, handshaker->on_handshake_done, error, NULL);
gpr_mu_unlock(&handshaker->mu);
- http_connect_handshaker_unref(handshaker);
+ http_connect_handshaker_unref(exec_ctx, handshaker);
} else {
// Otherwise, read the response.
// The read callback inherits our ref to the handshaker.
@@ -109,8 +150,26 @@ static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
http_connect_handshaker* handshaker = arg;
gpr_mu_lock(&handshaker->mu);
- if (error != GRPC_ERROR_NONE || handshaker->args == NULL) {
- GRPC_ERROR_REF(error); // Take ref to pass to the handshake-done callback.
+ if (error != GRPC_ERROR_NONE || handshaker->shutdown) {
+ // If the write failed or we're shutting down, clean up and invoke the
+ // callback with the error.
+ if (error == GRPC_ERROR_NONE) {
+ // If we were shut down after the write succeeded but before this
+ // callback was invoked, we need to generate our own error.
+ error = GRPC_ERROR_CREATE("Handshaker shutdown");
+ } else {
+ GRPC_ERROR_REF(error); // Take ref for the handshake-done callback.
+ }
+ if (!handshaker->shutdown) {
+ // TODO(ctiller): It is currently necessary to shutdown endpoints
+ // before destroying them, even if we know that there are no
+ // pending read/write callbacks. This should be fixed, at which
+ // point this can be removed.
+ grpc_endpoint_shutdown(exec_ctx, handshaker->args->endpoint);
+ // Not shutting down, so the write failed. Clean up before
+ // invoking the callback.
+ cleanup_args_for_failure_locked(handshaker);
+ }
goto done;
}
// Add buffer to parser.
@@ -172,10 +231,9 @@ static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg,
}
done:
// Invoke handshake-done callback.
- handshaker->args = NULL;
grpc_exec_ctx_sched(exec_ctx, handshaker->on_handshake_done, error, NULL);
gpr_mu_unlock(&handshaker->mu);
- http_connect_handshaker_unref(handshaker);
+ http_connect_handshaker_unref(exec_ctx, handshaker);
}
//
@@ -185,16 +243,17 @@ done:
static void http_connect_handshaker_destroy(grpc_exec_ctx* exec_ctx,
grpc_handshaker* handshaker_in) {
http_connect_handshaker* handshaker = (http_connect_handshaker*)handshaker_in;
- http_connect_handshaker_unref(handshaker);
+ http_connect_handshaker_unref(exec_ctx, handshaker);
}
static void http_connect_handshaker_shutdown(grpc_exec_ctx* exec_ctx,
grpc_handshaker* handshaker_in) {
http_connect_handshaker* handshaker = (http_connect_handshaker*)handshaker_in;
gpr_mu_lock(&handshaker->mu);
- if (handshaker->args != NULL) {
+ if (!handshaker->shutdown) {
+ handshaker->shutdown = true;
grpc_endpoint_shutdown(exec_ctx, handshaker->args->endpoint);
- handshaker->args = NULL;
+ cleanup_args_for_failure_locked(handshaker);
}
gpr_mu_unlock(&handshaker->mu);
}
diff --git a/src/core/ext/client_channel/lb_policy_registry.c b/src/core/ext/client_channel/lb_policy_registry.c
index f46a721f9d..90c149d947 100644
--- a/src/core/ext/client_channel/lb_policy_registry.c
+++ b/src/core/ext/client_channel/lb_policy_registry.c
@@ -35,6 +35,8 @@
#include <string.h>
+#include "src/core/lib/support/string.h"
+
#define MAX_POLICIES 10
static grpc_lb_policy_factory *g_all_of_the_lb_policies[MAX_POLICIES];
@@ -52,8 +54,8 @@ void grpc_lb_policy_registry_shutdown(void) {
void grpc_register_lb_policy(grpc_lb_policy_factory *factory) {
int i;
for (i = 0; i < g_number_of_lb_policies; i++) {
- GPR_ASSERT(0 != strcmp(factory->vtable->name,
- g_all_of_the_lb_policies[i]->vtable->name));
+ GPR_ASSERT(0 != gpr_stricmp(factory->vtable->name,
+ g_all_of_the_lb_policies[i]->vtable->name));
}
GPR_ASSERT(g_number_of_lb_policies != MAX_POLICIES);
grpc_lb_policy_factory_ref(factory);
@@ -66,7 +68,7 @@ static grpc_lb_policy_factory *lookup_factory(const char *name) {
if (name == NULL) return NULL;
for (i = 0; i < g_number_of_lb_policies; i++) {
- if (0 == strcmp(name, g_all_of_the_lb_policies[i]->vtable->name)) {
+ if (0 == gpr_stricmp(name, g_all_of_the_lb_policies[i]->vtable->name)) {
return g_all_of_the_lb_policies[i];
}
}
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c
index d8ef0c8098..4262d2b9a4 100644
--- a/src/core/ext/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/lb_policy/grpclb/grpclb.c
@@ -182,18 +182,24 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg,
NULL);
if (wc_arg->rr_policy != NULL) {
- /* if target is NULL, no pick has been made by the RR policy (eg, all
+ /* if *target is NULL, no pick has been made by the RR policy (eg, all
* addresses failed to connect). There won't be any user_data/token
* available */
- if (wc_arg->target != NULL) {
- GPR_ASSERT(wc_arg->lb_token != NULL);
- initial_metadata_add_lb_token(wc_arg->initial_metadata,
- wc_arg->lb_token_mdelem_storage,
- GRPC_MDELEM_REF(wc_arg->lb_token));
+ if (*wc_arg->target != NULL) {
+ if (wc_arg->lb_token != NULL) {
+ initial_metadata_add_lb_token(wc_arg->initial_metadata,
+ wc_arg->lb_token_mdelem_storage,
+ GRPC_MDELEM_REF(wc_arg->lb_token));
+ } else {
+ gpr_log(GPR_ERROR,
+ "No LB token for connected subchannel pick %p (from RR "
+ "instance %p).",
+ (void *)*wc_arg->target, (void *)wc_arg->rr_policy);
+ abort();
+ }
}
if (grpc_lb_glb_trace) {
- gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")",
- (intptr_t)wc_arg->rr_policy);
+ gpr_log(GPR_INFO, "Unreffing RR %p", (void *)wc_arg->rr_policy);
}
GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "wrapped_rr_closure");
}
@@ -411,7 +417,7 @@ static void parse_server(const grpc_grpclb_server *server,
}
/* Returns addresses extracted from \a serverlist. */
-static grpc_lb_addresses *process_serverlist(
+static grpc_lb_addresses *process_serverlist_locked(
const grpc_grpclb_serverlist *serverlist) {
size_t num_valid = 0;
/* first pass: count how many are valid in order to allocate the necessary
@@ -451,10 +457,12 @@ static grpc_lb_addresses *process_serverlist(
user_data = grpc_mdelem_from_metadata_strings(GRPC_MDSTR_LB_TOKEN,
lb_token_mdstr);
} else {
- gpr_log(GPR_ERROR,
+ char *uri = grpc_sockaddr_to_uri(&addr);
+ gpr_log(GPR_INFO,
"Missing LB token for backend address '%s'. The empty token will "
"be used instead",
- grpc_sockaddr_to_uri(&addr));
+ uri);
+ gpr_free(uri);
user_data = GRPC_MDELEM_LB_TOKEN_EMPTY;
}
@@ -467,6 +475,68 @@ static grpc_lb_addresses *process_serverlist(
return lb_addresses;
}
+/* returns true if the new RR policy should replace the current one, if any */
+static bool update_lb_connectivity_status_locked(
+ grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
+ grpc_connectivity_state new_rr_state, grpc_error *new_rr_state_error) {
+ grpc_error *curr_state_error;
+ const grpc_connectivity_state curr_glb_state = grpc_connectivity_state_check(
+ &glb_policy->state_tracker, &curr_state_error);
+
+ /* The new connectivity status is a function of the previous one and the new
+ * input coming from the status of the RR policy.
+ *
+ * current state (grpclb's)
+ * |
+ * v || I | C | R | TF | SD | <- new state (RR's)
+ * ===++====+=====+=====+======+======+
+ * I || I | C | R | [I] | [I] |
+ * ---++----+-----+-----+------+------+
+ * C || I | C | R | [C] | [C] |
+ * ---++----+-----+-----+------+------+
+ * R || I | C | R | [R] | [R] |
+ * ---++----+-----+-----+------+------+
+ * TF || I | C | R | [TF] | [TF] |
+ * ---++----+-----+-----+------+------+
+ * SD || NA | NA | NA | NA | NA | (*)
+ * ---++----+-----+-----+------+------+
+ *
+ * A [STATE] indicates that the old RR policy is kept. In those cases, STATE
+ * is the current state of grpclb, which is left untouched.
+ *
+ * In summary, if the new state is TRANSIENT_FAILURE or SHUTDOWN, stick to
+ * the previous RR instance.
+ *
+ * Note that the status is never updated to SHUTDOWN as a result of calling
+ * this function. Only glb_shutdown() has the power to set that state.
+ *
+ * (*) This function mustn't be called during shutting down. */
+ GPR_ASSERT(curr_glb_state != GRPC_CHANNEL_SHUTDOWN);
+
+ switch (new_rr_state) {
+ case GRPC_CHANNEL_TRANSIENT_FAILURE:
+ case GRPC_CHANNEL_SHUTDOWN:
+ GPR_ASSERT(new_rr_state_error != GRPC_ERROR_NONE);
+ return false; /* don't replace the RR policy */
+ case GRPC_CHANNEL_INIT:
+ case GRPC_CHANNEL_IDLE:
+ case GRPC_CHANNEL_CONNECTING:
+ case GRPC_CHANNEL_READY:
+ GPR_ASSERT(new_rr_state_error == GRPC_ERROR_NONE);
+ }
+
+ if (grpc_lb_glb_trace) {
+ gpr_log(GPR_INFO,
+ "Setting grpclb's state to %s from new RR policy %p state.",
+ grpc_connectivity_state_name(new_rr_state),
+ (void *)glb_policy->rr_policy);
+ }
+ grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
+ new_rr_state, GRPC_ERROR_REF(new_rr_state_error),
+ "update_lb_connectivity_status_locked");
+ return true;
+}
+
/* perform a pick over \a rr_policy. Given that a pick can return immediately
* (ignoring its completion callback) we need to perform the cleanups this
* callback would be otherwise resposible for */
@@ -508,7 +578,7 @@ static grpc_lb_policy *create_rr_locked(
grpc_lb_policy_args args;
memset(&args, 0, sizeof(args));
args.client_channel_factory = glb_policy->cc_factory;
- grpc_lb_addresses *addresses = process_serverlist(serverlist);
+ grpc_lb_addresses *addresses = process_serverlist_locked(serverlist);
// Replace the LB addresses in the channel args that we pass down to
// the subchannel.
@@ -529,49 +599,84 @@ static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error);
/* glb_policy->rr_policy may be NULL (initial handover) */
static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
- glb_lb_policy *glb_policy, grpc_error *error) {
+ glb_lb_policy *glb_policy) {
GPR_ASSERT(glb_policy->serverlist != NULL &&
glb_policy->serverlist->num_servers > 0);
+ if (glb_policy->shutting_down) return;
+
+ grpc_lb_policy *new_rr_policy =
+ create_rr_locked(exec_ctx, glb_policy->serverlist, glb_policy);
+ if (new_rr_policy == NULL) {
+ gpr_log(GPR_ERROR,
+ "Failure creating a RoundRobin policy for serverlist update with "
+ "%lu entries. The previous RR instance (%p), if any, will continue "
+ "to be used. Future updates from the LB will attempt to create new "
+ "instances.",
+ (unsigned long)glb_policy->serverlist->num_servers,
+ (void *)glb_policy->rr_policy);
+ return;
+ }
+
+ grpc_error *new_rr_state_error = NULL;
+ const grpc_connectivity_state new_rr_state =
+ grpc_lb_policy_check_connectivity(exec_ctx, new_rr_policy,
+ &new_rr_state_error);
+ /* Connectivity state is a function of the new RR policy just created */
+ const bool replace_old_rr = update_lb_connectivity_status_locked(
+ exec_ctx, glb_policy, new_rr_state, new_rr_state_error);
+
+ if (!replace_old_rr) {
+ /* dispose of the new RR policy that won't be used after all */
+ GRPC_LB_POLICY_UNREF(exec_ctx, new_rr_policy, "rr_handover_no_replace");
+ if (grpc_lb_glb_trace) {
+ gpr_log(GPR_INFO,
+ "Keeping old RR policy (%p) despite new serverlist: new RR "
+ "policy was in %s connectivity state.",
+ (void *)glb_policy->rr_policy,
+ grpc_connectivity_state_name(new_rr_state));
+ }
+ return;
+ }
+
if (grpc_lb_glb_trace) {
- gpr_log(GPR_INFO, "RR handover. Old RR: %p", (void *)glb_policy->rr_policy);
+ gpr_log(GPR_INFO, "Created RR policy (%p) to replace old RR (%p)",
+ (void *)new_rr_policy, (void *)glb_policy->rr_policy);
}
+
if (glb_policy->rr_policy != NULL) {
/* if we are phasing out an existing RR instance, unref it. */
GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "rr_handover");
}
- glb_policy->rr_policy =
- create_rr_locked(exec_ctx, glb_policy->serverlist, glb_policy);
- if (grpc_lb_glb_trace) {
- gpr_log(GPR_INFO, "Created RR policy (%p)", (void *)glb_policy->rr_policy);
- }
+ /* Finally update the RR policy to the newly created one */
+ glb_policy->rr_policy = new_rr_policy;
- GPR_ASSERT(glb_policy->rr_policy != NULL);
+ /* Add the gRPC LB's interested_parties pollset_set to that of the newly
+ * created RR policy. This will make the RR policy progress upon activity on
+ * gRPC LB, which in turn is tied to the application's call */
grpc_pollset_set_add_pollset_set(exec_ctx,
glb_policy->rr_policy->interested_parties,
glb_policy->base.interested_parties);
+ /* Allocate the data for the tracking of the new RR policy's connectivity.
+ * It'll be deallocated in glb_rr_connectivity_changed() */
rr_connectivity_data *rr_connectivity =
gpr_malloc(sizeof(rr_connectivity_data));
memset(rr_connectivity, 0, sizeof(rr_connectivity_data));
grpc_closure_init(&rr_connectivity->on_change, glb_rr_connectivity_changed,
rr_connectivity);
rr_connectivity->glb_policy = glb_policy;
- rr_connectivity->state = grpc_lb_policy_check_connectivity(
- exec_ctx, glb_policy->rr_policy, &error);
+ rr_connectivity->state = new_rr_state;
- grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
- rr_connectivity->state, GRPC_ERROR_REF(error),
- "rr_handover");
- /* subscribe */
+ /* Subscribe to changes to the connectivity of the new RR */
GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "rr_connectivity_cb");
grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy,
&rr_connectivity->state,
&rr_connectivity->on_change);
grpc_lb_policy_exit_idle(exec_ctx, glb_policy->rr_policy);
- /* flush pending ops */
+ /* Update picks and pings in wait */
pending_pick *pp;
while ((pp = glb_policy->pending_picks)) {
glb_policy->pending_picks = pp->next;
@@ -602,28 +707,36 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx,
static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
- /* If shutdown or error free the arg. Rely on the rest of the code to set the
- * right grpclb status. */
- rr_connectivity_data *rr_conn_data = arg;
- glb_lb_policy *glb_policy = rr_conn_data->glb_policy;
- gpr_mu_lock(&glb_policy->mu);
+ rr_connectivity_data *rr_connectivity = arg;
+ glb_lb_policy *glb_policy = rr_connectivity->glb_policy;
- if (rr_conn_data->state != GRPC_CHANNEL_SHUTDOWN &&
- !glb_policy->shutting_down) {
- /* RR not shutting down. Mimic the RR's policy state */
- grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
- rr_conn_data->state, GRPC_ERROR_REF(error),
- "rr_connectivity_cb");
- /* resubscribe. Reuse the "rr_connectivity_cb" weak ref. */
+ gpr_mu_lock(&glb_policy->mu);
+ const bool shutting_down = glb_policy->shutting_down;
+ bool unref_needed = false;
+ GRPC_ERROR_REF(error);
+
+ if (rr_connectivity->state == GRPC_CHANNEL_SHUTDOWN || shutting_down) {
+ /* RR policy shutting down. Don't renew subscription and free the arg of
+ * this callback. In addition we need to stash away the current policy to
+ * be UNREF'd after releasing the lock. Otherwise, if the UNREF is the last
+ * one, the policy would be destroyed, alongside the lock, which would
+ * result in a use-after-free */
+ unref_needed = true;
+ gpr_free(rr_connectivity);
+ } else { /* rr state != SHUTDOWN && !shutting down: biz as usual */
+ update_lb_connectivity_status_locked(exec_ctx, glb_policy,
+ rr_connectivity->state, error);
+ /* Resubscribe. Reuse the "rr_connectivity_cb" weak ref. */
grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy,
- &rr_conn_data->state,
- &rr_conn_data->on_change);
- } else {
+ &rr_connectivity->state,
+ &rr_connectivity->on_change);
+ }
+ gpr_mu_unlock(&glb_policy->mu);
+ if (unref_needed) {
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
"rr_connectivity_cb");
- gpr_free(rr_conn_data);
}
- gpr_mu_unlock(&glb_policy->mu);
+ GRPC_ERROR_UNREF(error);
}
static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
@@ -768,7 +881,6 @@ static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
* while holding glb_policy->mu: lb_on_server_status_received, invoked due to
* the cancel, needs to acquire that same lock */
grpc_call *lb_call = glb_policy->lb_call;
- glb_policy->lb_call = NULL;
gpr_mu_unlock(&glb_policy->mu);
/* glb_policy->lb_call and this local lb_call must be consistent at this point
@@ -1126,15 +1238,18 @@ static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg,
gpr_log(GPR_INFO,
"Incoming server list identical to current, ignoring.");
}
+ grpc_grpclb_destroy_serverlist(serverlist);
} else { /* new serverlist */
if (glb_policy->serverlist != NULL) {
/* dispose of the old serverlist */
grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
}
- /* and update the copy in the glb_lb_policy instance */
+ /* and update the copy in the glb_lb_policy instance. This serverlist
+ * instance will be destroyed either upon the next update or in
+ * glb_destroy() */
glb_policy->serverlist = serverlist;
- rr_handover_locked(exec_ctx, glb_policy, error);
+ rr_handover_locked(exec_ctx, glb_policy);
}
} else {
if (grpc_lb_glb_trace) {
diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
index 86d43e5721..9284d19357 100644
--- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
+++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
@@ -114,8 +114,6 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
const char *error_str = grpc_error_string(error);
gpr_log(GPR_ERROR, "Handshaking failed: %s", error_str);
grpc_error_free_string(error_str);
- grpc_endpoint_destroy(exec_ctx, args->endpoint);
- gpr_free(args->read_buffer);
gpr_mu_lock(&connection_state->server_state->mu);
} else {
gpr_mu_lock(&connection_state->server_state->mu);
@@ -132,7 +130,10 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
// Need to destroy this here, because the server may have already
// gone away.
grpc_endpoint_destroy(exec_ctx, args->endpoint);
+ grpc_slice_buffer_destroy(args->read_buffer);
+ gpr_free(args->read_buffer);
}
+ grpc_channel_args_destroy(args->args);
}
pending_handshake_manager_remove_locked(connection_state->server_state,
connection_state->handshake_mgr);
@@ -140,7 +141,6 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
grpc_handshake_manager_destroy(exec_ctx, connection_state->handshake_mgr);
grpc_tcp_server_unref(exec_ctx, connection_state->server_state->tcp_server);
gpr_free(connection_state);
- grpc_channel_args_destroy(args->args);
}
static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp,
diff --git a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
index ab0cebe902..afdf93398f 100644
--- a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
+++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
@@ -121,8 +121,6 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
const char *error_str = grpc_error_string(error);
gpr_log(GPR_ERROR, "Handshaking failed: %s", error_str);
grpc_error_free_string(error_str);
- grpc_endpoint_destroy(exec_ctx, args->endpoint);
- gpr_free(args->read_buffer);
gpr_mu_lock(&connection_state->server_state->mu);
} else {
gpr_mu_lock(&connection_state->server_state->mu);
@@ -143,7 +141,10 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
// Need to destroy this here, because the server may have already
// gone away.
grpc_endpoint_destroy(exec_ctx, args->endpoint);
+ grpc_slice_buffer_destroy(args->read_buffer);
+ gpr_free(args->read_buffer);
}
+ grpc_channel_args_destroy(args->args);
}
pending_handshake_manager_remove_locked(connection_state->server_state,
connection_state->handshake_mgr);
@@ -151,7 +152,6 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
grpc_handshake_manager_destroy(exec_ctx, connection_state->handshake_mgr);
grpc_tcp_server_unref(exec_ctx, connection_state->server_state->tcp_server);
gpr_free(connection_state);
- grpc_channel_args_destroy(args->args);
}
static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp,
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 127e1cdc13..4e3c7ff681 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -954,6 +954,16 @@ static void complete_fetch(grpc_exec_ctx *exec_ctx, void *gs,
static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {}
+static void log_metadata(const grpc_metadata_batch *md_batch, uint32_t id,
+ bool is_client, bool is_initial) {
+ for (grpc_linked_mdelem *md = md_batch->list.head; md != md_batch->list.tail;
+ md = md->next) {
+ gpr_log(GPR_INFO, "HTTP:%d:%s:%s: %s: %s", id, is_initial ? "HDR" : "TRL",
+ is_client ? "CLI" : "SVR", grpc_mdstr_as_c_string(md->md->key),
+ grpc_mdstr_as_c_string(md->md->value));
+ }
+}
+
static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
grpc_error *error_ignored) {
GPR_TIMER_BEGIN("perform_stream_op_locked", 0);
@@ -967,6 +977,12 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
gpr_log(GPR_DEBUG, "perform_stream_op_locked: %s; on_complete = %p", str,
op->on_complete);
gpr_free(str);
+ if (op->send_initial_metadata) {
+ log_metadata(op->send_initial_metadata, s->id, t->is_client, true);
+ }
+ if (op->send_trailing_metadata) {
+ log_metadata(op->send_trailing_metadata, s->id, t->is_client, false);
+ }
}
grpc_closure *on_complete = op->on_complete;
diff --git a/src/core/lib/channel/handshaker.c b/src/core/lib/channel/handshaker.c
index 905db118be..f3bd91284e 100644
--- a/src/core/lib/channel/handshaker.c
+++ b/src/core/lib/channel/handshaker.c
@@ -141,33 +141,25 @@ void grpc_handshake_manager_destroy(grpc_exec_ctx* exec_ctx,
void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx,
grpc_handshake_manager* mgr) {
gpr_mu_lock(&mgr->mu);
- for (size_t i = 0; i < mgr->count; ++i) {
- grpc_handshaker_shutdown(exec_ctx, mgr->handshakers[i]);
+ if (mgr->index > 0) {
+ grpc_handshaker_shutdown(exec_ctx, mgr->handshakers[mgr->index - 1]);
}
gpr_mu_unlock(&mgr->mu);
}
-static void call_next_handshaker(grpc_exec_ctx* exec_ctx, void* arg,
- grpc_error* error);
-
// Helper function to call either the next handshaker or the
// on_handshake_done callback.
static void call_next_handshaker_locked(grpc_exec_ctx* exec_ctx,
grpc_handshake_manager* mgr,
- grpc_handshaker_args* args,
grpc_error* error) {
GPR_ASSERT(mgr->index <= mgr->count);
- // If we got an error, skip all remaining handshakers and invoke the
- // caller-supplied callback immediately.
- // Otherwise, if this is the last handshaker, then call the on_handshake_done
- // callback instead of chaining back to this function again.
+ // If we got an error or we've finished the last handshaker, invoke
+ // the on_handshake_done callback. Otherwise, call the next handshaker.
if (error != GRPC_ERROR_NONE || mgr->index == mgr->count) {
// Cancel deadline timer, since we're invoking the on_handshake_done
// callback now.
grpc_timer_cancel(exec_ctx, &mgr->deadline_timer);
- args->user_data = mgr->user_data;
- grpc_exec_ctx_sched(exec_ctx, &mgr->on_handshake_done,
- GRPC_ERROR_REF(error), NULL);
+ grpc_exec_ctx_sched(exec_ctx, &mgr->on_handshake_done, error, NULL);
// Since we're invoking the final callback, we won't be coming back
// to this function, so we can release our reference to the
// handshake manager.
@@ -176,7 +168,8 @@ static void call_next_handshaker_locked(grpc_exec_ctx* exec_ctx,
}
// Call the next handshaker.
grpc_handshaker_do_handshake(exec_ctx, mgr->handshakers[mgr->index],
- mgr->acceptor, &mgr->call_next_handshaker, args);
+ mgr->acceptor, &mgr->call_next_handshaker,
+ &mgr->args);
++mgr->index;
}
@@ -184,10 +177,9 @@ static void call_next_handshaker_locked(grpc_exec_ctx* exec_ctx,
// handshakers together.
static void call_next_handshaker(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
- grpc_handshaker_args* args = arg;
- grpc_handshake_manager* mgr = args->user_data;
+ grpc_handshake_manager* mgr = arg;
gpr_mu_lock(&mgr->mu);
- call_next_handshaker_locked(exec_ctx, mgr, args, error);
+ call_next_handshaker_locked(exec_ctx, mgr, GRPC_ERROR_REF(error));
gpr_mu_unlock(&mgr->mu);
}
@@ -205,25 +197,19 @@ void grpc_handshake_manager_do_handshake(
grpc_endpoint* endpoint, const grpc_channel_args* channel_args,
gpr_timespec deadline, grpc_tcp_server_acceptor* acceptor,
grpc_iomgr_cb_func on_handshake_done, void* user_data) {
+ gpr_mu_lock(&mgr->mu);
+ GPR_ASSERT(mgr->index == 0);
// Construct handshaker args. These will be passed through all
// handshakers and eventually be freed by the on_handshake_done callback.
mgr->args.endpoint = endpoint;
mgr->args.args = grpc_channel_args_copy(channel_args);
+ mgr->args.user_data = user_data;
mgr->args.read_buffer = gpr_malloc(sizeof(*mgr->args.read_buffer));
grpc_slice_buffer_init(mgr->args.read_buffer);
// Initialize state needed for calling handshakers.
- gpr_mu_lock(&mgr->mu);
- GPR_ASSERT(mgr->index == 0);
mgr->acceptor = acceptor;
- grpc_closure_init(&mgr->call_next_handshaker, call_next_handshaker,
- &mgr->args);
+ grpc_closure_init(&mgr->call_next_handshaker, call_next_handshaker, mgr);
grpc_closure_init(&mgr->on_handshake_done, on_handshake_done, &mgr->args);
- // While chaining between handshakers, we use args->user_data to
- // store a pointer to the handshake manager. This will be
- // changed to point to the caller-supplied user_data before calling
- // the on_handshake_done callback.
- mgr->args.user_data = mgr;
- mgr->user_data = user_data;
// Start deadline timer, which owns a ref.
gpr_ref(&mgr->refs);
grpc_timer_init(exec_ctx, &mgr->deadline_timer,
@@ -231,6 +217,6 @@ void grpc_handshake_manager_do_handshake(
on_timeout, mgr, gpr_now(GPR_CLOCK_MONOTONIC));
// Start first handshaker, which also owns a ref.
gpr_ref(&mgr->refs);
- call_next_handshaker_locked(exec_ctx, mgr, &mgr->args, GRPC_ERROR_NONE);
+ call_next_handshaker_locked(exec_ctx, mgr, GRPC_ERROR_NONE);
gpr_mu_unlock(&mgr->mu);
}
diff --git a/src/core/lib/channel/handshaker.h b/src/core/lib/channel/handshaker.h
index f0614c354b..2e1f543512 100644
--- a/src/core/lib/channel/handshaker.h
+++ b/src/core/lib/channel/handshaker.h
@@ -57,17 +57,24 @@ typedef struct grpc_handshaker grpc_handshaker;
/// Arguments passed through handshakers and to the on_handshake_done callback.
///
/// For handshakers, all members are input/output parameters; for
-/// example, a handshaker may read from \a endpoint and then later
-/// replace it with a wrapped endpoint. Similarly, a handshaker may
-/// modify \a args.
+/// example, a handshaker may read from or write to \a endpoint and
+/// then later replace it with a wrapped endpoint. Similarly, a
+/// handshaker may modify \a args.
+///
+/// A handshaker takes ownership of the members while a handshake is in
+/// progress. Upon failure or shutdown of an in-progress handshaker,
+/// the handshaker is responsible for destroying the members and setting
+/// them to NULL before invoking the on_handshake_done callback.
///
/// For the on_handshake_done callback, all members are input arguments,
/// which the callback takes ownership of.
typedef struct {
grpc_endpoint* endpoint;
grpc_channel_args* args;
- void* user_data;
grpc_slice_buffer* read_buffer;
+ // User data passed through the handshake manager. Not used by
+ // individual handshakers.
+ void* user_data;
} grpc_handshaker_args;
typedef struct {
@@ -132,9 +139,9 @@ void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx,
///
/// When done, invokes \a on_handshake_done with a grpc_handshaker_args
/// object as its argument. If the callback is invoked with error !=
-/// GRPC_ERROR_NONE, then handshaking failed and the resulting endpoint
-/// will have already been shut down (although the caller will still be
-/// responsible for destroying it).
+/// GRPC_ERROR_NONE, then handshaking failed and the handshaker has done
+/// the necessary clean-up. Otherwise, the callback takes ownership of
+/// the arguments.
void grpc_handshake_manager_do_handshake(
grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr,
grpc_endpoint* endpoint, const grpc_channel_args* channel_args,
diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c
index f57d7c2453..fd8b46afcb 100644
--- a/src/core/lib/channel/http_client_filter.c
+++ b/src/core/lib/channel/http_client_filter.c
@@ -36,6 +36,7 @@
#include <grpc/support/string_util.h>
#include <string.h>
#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/slice/percent_encoding.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/transport/static_metadata.h"
#include "src/core/lib/transport/transport_impl.h"
@@ -56,6 +57,7 @@ typedef struct call_data {
grpc_linked_mdelem payload_bin;
grpc_metadata_batch *recv_initial_metadata;
+ grpc_metadata_batch *recv_trailing_metadata;
uint8_t *payload_bytes;
/* Vars to read data off of send_message */
@@ -69,14 +71,16 @@ typedef struct call_data {
bool send_message_blocked;
/** Closure to call when finished with the hc_on_recv hook */
- grpc_closure *on_done_recv;
+ grpc_closure *on_done_recv_initial_metadata;
+ grpc_closure *on_done_recv_trailing_metadata;
grpc_closure *on_complete;
grpc_closure *post_send;
/** Receive closures are chained: we inject this closure as the on_done_recv
up-call on transport_op, and remember to call our on_done_recv member
after handling it. */
- grpc_closure hc_on_recv;
+ grpc_closure hc_on_recv_initial_metadata;
+ grpc_closure hc_on_recv_trailing_metadata;
grpc_closure hc_on_complete;
grpc_closure got_slice;
grpc_closure send_done;
@@ -106,6 +110,16 @@ static grpc_mdelem *client_recv_filter(void *user_data, grpc_mdelem *md) {
grpc_call_element_send_close_with_message(a->exec_ctx, a->elem,
GRPC_STATUS_CANCELLED, &message);
return NULL;
+ } else if (md->key == GRPC_MDSTR_GRPC_MESSAGE) {
+ grpc_slice pct_decoded_msg =
+ grpc_permissive_percent_decode_slice(md->value->slice);
+ if (grpc_slice_is_equivalent(pct_decoded_msg, md->value->slice)) {
+ grpc_slice_unref(pct_decoded_msg);
+ return md;
+ } else {
+ return grpc_mdelem_from_metadata_strings(
+ GRPC_MDSTR_GRPC_MESSAGE, grpc_mdstr_from_slice(pct_decoded_msg));
+ }
} else if (md == GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC) {
return NULL;
} else if (md->key == GRPC_MDSTR_CONTENT_TYPE) {
@@ -129,8 +143,8 @@ static grpc_mdelem *client_recv_filter(void *user_data, grpc_mdelem *md) {
return md;
}
-static void hc_on_recv(grpc_exec_ctx *exec_ctx, void *user_data,
- grpc_error *error) {
+static void hc_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx,
+ void *user_data, grpc_error *error) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
client_recv_filter_args a;
@@ -138,7 +152,21 @@ static void hc_on_recv(grpc_exec_ctx *exec_ctx, void *user_data,
a.exec_ctx = exec_ctx;
grpc_metadata_batch_filter(calld->recv_initial_metadata, client_recv_filter,
&a);
- calld->on_done_recv->cb(exec_ctx, calld->on_done_recv->cb_arg, error);
+ grpc_closure_run(exec_ctx, calld->on_done_recv_initial_metadata,
+ GRPC_ERROR_REF(error));
+}
+
+static void hc_on_recv_trailing_metadata(grpc_exec_ctx *exec_ctx,
+ void *user_data, grpc_error *error) {
+ grpc_call_element *elem = user_data;
+ call_data *calld = elem->call_data;
+ client_recv_filter_args a;
+ a.elem = elem;
+ a.exec_ctx = exec_ctx;
+ grpc_metadata_batch_filter(calld->recv_trailing_metadata, client_recv_filter,
+ &a);
+ grpc_closure_run(exec_ctx, calld->on_done_recv_trailing_metadata,
+ GRPC_ERROR_REF(error));
}
static void hc_on_complete(grpc_exec_ctx *exec_ctx, void *user_data,
@@ -217,12 +245,15 @@ static void hc_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
message, and the payload is below the size threshold, and all the data
for this request is immediately available. */
grpc_mdelem *method = GRPC_MDELEM_METHOD_POST;
- calld->send_message_blocked = false;
if ((op->send_initial_metadata_flags &
GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) &&
op->send_message != NULL &&
op->send_message->length < channeld->max_payload_size_for_get) {
method = GRPC_MDELEM_METHOD_GET;
+ /* The following write to calld->send_message_blocked isn't racy with
+ reads in hc_start_transport_op (which deals with SEND_MESSAGE ops) because
+ being here means ops->send_message is not NULL, which is primarily
+ guarding the read there. */
calld->send_message_blocked = true;
} else if (op->send_initial_metadata_flags &
GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) {
@@ -281,8 +312,15 @@ static void hc_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
if (op->recv_initial_metadata != NULL) {
/* substitute our callback for the higher callback */
calld->recv_initial_metadata = op->recv_initial_metadata;
- calld->on_done_recv = op->recv_initial_metadata_ready;
- op->recv_initial_metadata_ready = &calld->hc_on_recv;
+ calld->on_done_recv_initial_metadata = op->recv_initial_metadata_ready;
+ op->recv_initial_metadata_ready = &calld->hc_on_recv_initial_metadata;
+ }
+
+ if (op->recv_trailing_metadata != NULL) {
+ /* substitute our callback for the higher callback */
+ calld->recv_trailing_metadata = op->recv_trailing_metadata;
+ calld->on_done_recv_trailing_metadata = op->on_complete;
+ op->on_complete = &calld->hc_on_recv_trailing_metadata;
}
}
@@ -296,8 +334,7 @@ static void hc_start_transport_op(grpc_exec_ctx *exec_ctx,
call_data *calld = elem->call_data;
if (op->send_message != NULL && calld->send_message_blocked) {
/* Don't forward the op. send_message contains slices that aren't ready
- yet. The call will be forwarded by the op_complete of slice read call.
- */
+ yet. The call will be forwarded by the op_complete of slice read call. */
} else {
grpc_call_next_op(exec_ctx, elem, op);
}
@@ -308,11 +345,16 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_call_element_args *args) {
call_data *calld = elem->call_data;
- calld->on_done_recv = NULL;
+ calld->on_done_recv_initial_metadata = NULL;
+ calld->on_done_recv_trailing_metadata = NULL;
calld->on_complete = NULL;
calld->payload_bytes = NULL;
+ calld->send_message_blocked = false;
grpc_slice_buffer_init(&calld->slices);
- grpc_closure_init(&calld->hc_on_recv, hc_on_recv, elem);
+ grpc_closure_init(&calld->hc_on_recv_initial_metadata,
+ hc_on_recv_initial_metadata, elem);
+ grpc_closure_init(&calld->hc_on_recv_trailing_metadata,
+ hc_on_recv_trailing_metadata, elem);
grpc_closure_init(&calld->hc_on_complete, hc_on_complete, elem);
grpc_closure_init(&calld->got_slice, got_slice, elem);
grpc_closure_init(&calld->send_done, send_done, elem);
diff --git a/src/core/lib/channel/http_server_filter.c b/src/core/lib/channel/http_server_filter.c
index 6a33689fec..b42ff06039 100644
--- a/src/core/lib/channel/http_server_filter.c
+++ b/src/core/lib/channel/http_server_filter.c
@@ -37,6 +37,7 @@
#include <grpc/support/log.h>
#include <string.h>
#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/slice/percent_encoding.h"
#include "src/core/lib/transport/static_metadata.h"
#define EXPECTED_CONTENT_TYPE "application/grpc"
@@ -86,6 +87,23 @@ typedef struct {
grpc_exec_ctx *exec_ctx;
} server_filter_args;
+static grpc_mdelem *server_filter_outgoing_metadata(void *user_data,
+ grpc_mdelem *md) {
+ if (md->key == GRPC_MDSTR_GRPC_MESSAGE) {
+ grpc_slice pct_encoded_msg = grpc_percent_encode_slice(
+ md->value->slice, grpc_compatible_percent_encoding_unreserved_bytes);
+ if (grpc_slice_is_equivalent(pct_encoded_msg, md->value->slice)) {
+ grpc_slice_unref(pct_encoded_msg);
+ return md;
+ } else {
+ return grpc_mdelem_from_metadata_strings(
+ GRPC_MDSTR_GRPC_MESSAGE, grpc_mdstr_from_slice(pct_encoded_msg));
+ }
+ } else {
+ return md;
+ }
+}
+
static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
server_filter_args *a = user_data;
grpc_call_element *elem = a->elem;
@@ -254,7 +272,7 @@ static void hs_recv_message_ready(grpc_exec_ctx *exec_ctx, void *user_data,
}
}
-static void hs_mutate_op(grpc_call_element *elem,
+static void hs_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_transport_stream_op *op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
@@ -290,6 +308,12 @@ static void hs_mutate_op(grpc_call_element *elem,
op->on_complete = &calld->hs_on_complete;
}
}
+
+ if (op->send_trailing_metadata) {
+ server_filter_args a = {elem, exec_ctx};
+ grpc_metadata_batch_filter(op->send_trailing_metadata,
+ server_filter_outgoing_metadata, &a);
+ }
}
static void hs_start_transport_op(grpc_exec_ctx *exec_ctx,
@@ -297,7 +321,7 @@ static void hs_start_transport_op(grpc_exec_ctx *exec_ctx,
grpc_transport_stream_op *op) {
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
GPR_TIMER_BEGIN("hs_start_transport_op", 0);
- hs_mutate_op(elem, op);
+ hs_mutate_op(exec_ctx, elem, op);
grpc_call_next_op(exec_ctx, elem, op);
GPR_TIMER_END("hs_start_transport_op", 0);
}
diff --git a/src/core/lib/channel/message_size_filter.c b/src/core/lib/channel/message_size_filter.c
index 1331fe1c65..1cf68d790d 100644
--- a/src/core/lib/channel/message_size_filter.c
+++ b/src/core/lib/channel/message_size_filter.c
@@ -34,16 +34,14 @@
#include <limits.h>
#include <string.h>
+#include <grpc/impl/codegen/grpc_types.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include "src/core/lib/channel/channel_args.h"
-#include "src/core/lib/transport/method_config.h"
-
-#define DEFAULT_MAX_SEND_MESSAGE_LENGTH -1 // Unlimited.
-// The protobuf library will (by default) start warning at 100 megs.
-#define DEFAULT_MAX_RECV_MESSAGE_LENGTH (4 * 1024 * 1024)
+#include "src/core/lib/support/string.h"
+#include "src/core/lib/transport/service_config.h"
typedef struct message_size_limits {
int max_send_size;
@@ -56,30 +54,29 @@ static void* message_size_limits_copy(void* value) {
return new_value;
}
-static int message_size_limits_cmp(void* value1, void* value2) {
- const message_size_limits* v1 = value1;
- const message_size_limits* v2 = value2;
- if (v1->max_send_size > v2->max_send_size) return 1;
- if (v1->max_send_size < v2->max_send_size) return -1;
- if (v1->max_recv_size > v2->max_recv_size) return 1;
- if (v1->max_recv_size < v2->max_recv_size) return -1;
- return 0;
-}
-
static const grpc_mdstr_hash_table_vtable message_size_limits_vtable = {
- gpr_free, message_size_limits_copy, message_size_limits_cmp};
-
-static void* method_config_convert_value(
- const grpc_method_config* method_config) {
+ gpr_free, message_size_limits_copy};
+
+static void* message_size_limits_create_from_json(const grpc_json* json) {
+ int max_request_message_bytes = -1;
+ int max_response_message_bytes = -1;
+ for (grpc_json* field = json->child; field != NULL; field = field->next) {
+ if (field->key == NULL) continue;
+ if (strcmp(field->key, "maxRequestMessageBytes") == 0) {
+ if (max_request_message_bytes >= 0) return NULL; // Duplicate.
+ if (field->type != GRPC_JSON_STRING) return NULL;
+ max_request_message_bytes = gpr_parse_nonnegative_int(field->value);
+ if (max_request_message_bytes == -1) return NULL;
+ } else if (strcmp(field->key, "maxResponseMessageBytes") == 0) {
+ if (max_response_message_bytes >= 0) return NULL; // Duplicate.
+ if (field->type != GRPC_JSON_STRING) return NULL;
+ max_response_message_bytes = gpr_parse_nonnegative_int(field->value);
+ if (max_response_message_bytes == -1) return NULL;
+ }
+ }
message_size_limits* value = gpr_malloc(sizeof(message_size_limits));
- const int32_t* max_request_message_bytes =
- grpc_method_config_get_max_request_message_bytes(method_config);
- value->max_send_size =
- max_request_message_bytes != NULL ? *max_request_message_bytes : -1;
- const int32_t* max_response_message_bytes =
- grpc_method_config_get_max_response_message_bytes(method_config);
- value->max_recv_size =
- max_response_message_bytes != NULL ? *max_response_message_bytes : -1;
+ value->max_send_size = max_request_message_bytes;
+ value->max_recv_size = max_response_message_bytes;
return value;
}
@@ -201,20 +198,20 @@ static void init_channel_elem(grpc_exec_ctx* exec_ctx,
GPR_ASSERT(!args->is_last);
channel_data* chand = elem->channel_data;
memset(chand, 0, sizeof(*chand));
- chand->max_send_size = DEFAULT_MAX_SEND_MESSAGE_LENGTH;
- chand->max_recv_size = DEFAULT_MAX_RECV_MESSAGE_LENGTH;
+ chand->max_send_size = GRPC_DEFAULT_MAX_SEND_MESSAGE_LENGTH;
+ chand->max_recv_size = GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH;
for (size_t i = 0; i < args->channel_args->num_args; ++i) {
if (strcmp(args->channel_args->args[i].key,
GRPC_ARG_MAX_SEND_MESSAGE_LENGTH) == 0) {
- const grpc_integer_options options = {DEFAULT_MAX_SEND_MESSAGE_LENGTH, 0,
- INT_MAX};
+ const grpc_integer_options options = {
+ GRPC_DEFAULT_MAX_SEND_MESSAGE_LENGTH, 0, INT_MAX};
chand->max_send_size =
grpc_channel_arg_get_integer(&args->channel_args->args[i], options);
}
if (strcmp(args->channel_args->args[i].key,
GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH) == 0) {
- const grpc_integer_options options = {DEFAULT_MAX_RECV_MESSAGE_LENGTH, 0,
- INT_MAX};
+ const grpc_integer_options options = {
+ GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH, 0, INT_MAX};
chand->max_recv_size =
grpc_channel_arg_get_integer(&args->channel_args->args[i], options);
}
@@ -223,10 +220,16 @@ static void init_channel_elem(grpc_exec_ctx* exec_ctx,
const grpc_arg* channel_arg =
grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVICE_CONFIG);
if (channel_arg != NULL) {
- GPR_ASSERT(channel_arg->type == GRPC_ARG_POINTER);
- chand->method_limit_table = grpc_method_config_table_convert(
- (grpc_method_config_table*)channel_arg->value.pointer.p,
- method_config_convert_value, &message_size_limits_vtable);
+ GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
+ grpc_service_config* service_config =
+ grpc_service_config_create(channel_arg->value.string);
+ if (service_config != NULL) {
+ chand->method_limit_table =
+ grpc_service_config_create_method_config_table(
+ service_config, message_size_limits_create_from_json,
+ &message_size_limits_vtable);
+ grpc_service_config_destroy(service_config);
+ }
}
}
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c
index 91041a7c28..07fbfd849e 100644
--- a/src/core/lib/iomgr/ev_epoll_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_linux.c
@@ -72,6 +72,11 @@ static int grpc_polling_trace = 0; /* Disabled by default */
static int grpc_wakeup_signal = -1;
static bool is_grpc_wakeup_signal_initialized = false;
+/* TODO: sreek: Right now, this wakes up all pollers. In future we should make
+ * sure to wake up one polling thread (which can wake up other threads if
+ * needed) */
+static grpc_wakeup_fd global_wakeup_fd;
+
/* Implements the function defined in grpc_posix.h. This function might be
* called before even calling grpc_init() to set either a different signal to
* use. If signum == -1, then the use of signals is disabled */
@@ -437,9 +442,8 @@ static void polling_island_add_wakeup_fd_locked(polling_island *pi,
gpr_asprintf(&err_msg,
"epoll_ctl (epoll_fd: %d) add wakeup fd: %d failed with "
"error: %d (%s)",
- pi->epoll_fd,
- GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), errno,
- strerror(errno));
+ pi->epoll_fd, GRPC_WAKEUP_FD_GET_READ_FD(&global_wakeup_fd),
+ errno, strerror(errno));
append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
gpr_free(err_msg);
}
@@ -541,7 +545,7 @@ static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
goto done;
}
- polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd, error);
+ polling_island_add_wakeup_fd_locked(pi, &global_wakeup_fd, error);
polling_island_add_wakeup_fd_locked(pi, &pi->workqueue_wakeup_fd, error);
if (initial_fd != NULL) {
@@ -843,11 +847,6 @@ static void polling_island_global_shutdown() {
* alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
* case occurs. */
-/* TODO: sreek: Right now, this wakes up all pollers. In future we should make
- * sure to wake up one polling thread (which can wake up other threads if
- * needed) */
-grpc_wakeup_fd grpc_global_wakeup_fd;
-
static grpc_fd *fd_freelist = NULL;
static gpr_mu fd_freelist_mu;
@@ -1163,11 +1162,11 @@ static grpc_error *pollset_global_init(void) {
gpr_tls_init(&g_current_thread_pollset);
gpr_tls_init(&g_current_thread_worker);
poller_kick_init();
- return grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
+ return grpc_wakeup_fd_init(&global_wakeup_fd);
}
static void pollset_global_shutdown(void) {
- grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
+ grpc_wakeup_fd_destroy(&global_wakeup_fd);
gpr_tls_destroy(&g_current_thread_pollset);
gpr_tls_destroy(&g_current_thread_worker);
}
@@ -1274,7 +1273,7 @@ static grpc_error *pollset_kick(grpc_pollset *p,
}
static grpc_error *kick_poller(void) {
- return grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd);
+ return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
}
static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
@@ -1501,13 +1500,11 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
for (int i = 0; i < ep_rv; ++i) {
void *data_ptr = ep_ev[i].data.ptr;
- if (data_ptr == &grpc_global_wakeup_fd) {
- append_error(error,
- grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd),
+ if (data_ptr == &global_wakeup_fd) {
+ append_error(error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
err_desc);
} else if (data_ptr == &pi->workqueue_wakeup_fd) {
- append_error(error,
- grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd),
+ append_error(error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
err_desc);
maybe_do_workqueue_work(exec_ctx, pi);
} else if (data_ptr == &polling_island_wakeup_fd) {
diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c
index e1d620cfff..21b28e5554 100644
--- a/src/core/lib/iomgr/ev_poll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_posix.c
@@ -120,6 +120,8 @@ struct grpc_fd {
grpc_pollset *read_notifier_pollset;
};
+static grpc_wakeup_fd global_wakeup_fd;
+
/* Begin polling on an fd.
Registers that the given pollset is interested in this fd - so that if read
or writability interest changes, the pollset can be kicked to pick up that
@@ -769,17 +771,17 @@ static grpc_error *pollset_kick(grpc_pollset *p,
static grpc_error *pollset_global_init(void) {
gpr_tls_init(&g_current_thread_poller);
gpr_tls_init(&g_current_thread_worker);
- return grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
+ return grpc_wakeup_fd_init(&global_wakeup_fd);
}
static void pollset_global_shutdown(void) {
- grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
+ grpc_wakeup_fd_destroy(&global_wakeup_fd);
gpr_tls_destroy(&g_current_thread_poller);
gpr_tls_destroy(&g_current_thread_worker);
}
static grpc_error *kick_poller(void) {
- return grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd);
+ return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
}
/* main interface */
@@ -947,7 +949,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
fd_count = 0;
pfd_count = 2;
- pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd);
+ pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&global_wakeup_fd);
pfds[0].events = POLLIN;
pfds[0].revents = 0;
pfds[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker.wakeup_fd->fd);
@@ -1001,8 +1003,8 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
}
} else {
if (pfds[0].revents & POLLIN_CHECK) {
- work_combine_error(
- &error, grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd));
+ work_combine_error(&error,
+ grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd));
}
if (pfds[1].revents & POLLIN_CHECK) {
work_combine_error(
@@ -1343,6 +1345,7 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) {
int res, idx;
gpr_cv *pollcv;
cv_node *cvn, *prev;
+ int skip_poll = 0;
nfds_t nsockfds = 0;
gpr_thd_id t_id;
gpr_thd_options opt;
@@ -1358,17 +1361,17 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) {
cvn->cv = pollcv;
cvn->next = g_cvfds.cvfds[idx].cvs;
g_cvfds.cvfds[idx].cvs = cvn;
- // We should return immediately if there are pending events,
- // but we still need to call poll() to check for socket events
+ // Don't bother polling if a wakeup fd is ready
if (g_cvfds.cvfds[idx].is_set) {
- timeout = 0;
+ skip_poll = 1;
}
} else if (fds[i].fd >= 0) {
nsockfds++;
}
}
- if (nsockfds > 0) {
+ res = 0;
+ if (!skip_poll && nsockfds > 0) {
pargs = gpr_malloc(sizeof(struct poll_args));
// Both the main thread and calling thread get a reference
gpr_ref_init(&pargs->refcount, 2);
@@ -1398,16 +1401,14 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) {
res = pargs->retval;
errno = pargs->err;
} else {
- res = 0;
errno = 0;
gpr_atm_no_barrier_store(&pargs->status, CANCELLED);
}
- } else {
+ } else if (!skip_poll) {
gpr_timespec deadline = gpr_now(GPR_CLOCK_REALTIME);
deadline =
gpr_time_add(deadline, gpr_time_from_millis(timeout, GPR_TIMESPAN));
gpr_cv_wait(pollcv, &g_cvfds.mu, deadline);
- res = 0;
}
idx = 0;
@@ -1431,7 +1432,7 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) {
fds[i].revents = POLLIN;
if (res >= 0) res++;
}
- } else if (fds[i].fd >= 0 &&
+ } else if (!skip_poll && fds[i].fd >= 0 &&
gpr_atm_no_barrier_load(&pargs->status) == COMPLETED) {
fds[i].revents = pargs->fds[idx].revents;
idx++;
diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h
index 2fdef06838..cb5832539d 100644
--- a/src/core/lib/iomgr/ev_posix.h
+++ b/src/core/lib/iomgr/ev_posix.h
@@ -183,6 +183,5 @@ void grpc_pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
/* override to allow tests to hook poll() usage */
typedef int (*grpc_poll_function_type)(struct pollfd *, nfds_t, int);
extern grpc_poll_function_type grpc_poll_function;
-extern grpc_wakeup_fd grpc_global_wakeup_fd;
#endif /* GRPC_CORE_LIB_IOMGR_EV_POSIX_H */
diff --git a/src/core/lib/iomgr/iomgr.c b/src/core/lib/iomgr/iomgr.c
index 4fd83e0b22..3470b5ac81 100644
--- a/src/core/lib/iomgr/iomgr.c
+++ b/src/core/lib/iomgr/iomgr.c
@@ -108,6 +108,7 @@ void grpc_iomgr_shutdown(void) {
NULL)) {
gpr_mu_unlock(&g_mu);
grpc_exec_ctx_flush(&exec_ctx);
+ grpc_iomgr_platform_flush();
gpr_mu_lock(&g_mu);
continue;
}
diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c
index 051a30baa3..379bf9bd23 100644
--- a/src/core/lib/iomgr/resource_quota.c
+++ b/src/core/lib/iomgr/resource_quota.c
@@ -104,6 +104,9 @@ struct grpc_resource_user {
/* Reclaimers: index 0 is the benign reclaimer, 1 is the destructive reclaimer
*/
grpc_closure *reclaimers[2];
+ /* Reclaimers just posted: once we're in the combiner lock, we'll move them
+ to the array above */
+ grpc_closure *new_reclaimers[2];
/* Trampoline closures to finish reclamation and re-enter the quota combiner
lock */
grpc_closure post_reclaimer_closure[2];
@@ -418,9 +421,25 @@ static void ru_add_to_free_pool(grpc_exec_ctx *exec_ctx, void *ru,
rulist_add_tail(resource_user, GRPC_RULIST_NON_EMPTY_FREE_POOL);
}
+static bool ru_post_reclaimer(grpc_exec_ctx *exec_ctx,
+ grpc_resource_user *resource_user,
+ bool destructive) {
+ grpc_closure *closure = resource_user->new_reclaimers[destructive];
+ GPR_ASSERT(closure != NULL);
+ resource_user->new_reclaimers[destructive] = NULL;
+ GPR_ASSERT(resource_user->reclaimers[destructive] == NULL);
+ if (gpr_atm_acq_load(&resource_user->shutdown) > 0) {
+ grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CANCELLED, NULL);
+ return false;
+ }
+ resource_user->reclaimers[destructive] = closure;
+ return true;
+}
+
static void ru_post_benign_reclaimer(grpc_exec_ctx *exec_ctx, void *ru,
grpc_error *error) {
grpc_resource_user *resource_user = ru;
+ if (!ru_post_reclaimer(exec_ctx, resource_user, false)) return;
if (!rulist_empty(resource_user->resource_quota,
GRPC_RULIST_AWAITING_ALLOCATION) &&
rulist_empty(resource_user->resource_quota,
@@ -435,6 +454,7 @@ static void ru_post_benign_reclaimer(grpc_exec_ctx *exec_ctx, void *ru,
static void ru_post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *ru,
grpc_error *error) {
grpc_resource_user *resource_user = ru;
+ if (!ru_post_reclaimer(exec_ctx, resource_user, true)) return;
if (!rulist_empty(resource_user->resource_quota,
GRPC_RULIST_AWAITING_ALLOCATION) &&
rulist_empty(resource_user->resource_quota,
@@ -649,6 +669,8 @@ grpc_resource_user *grpc_resource_user_create(
resource_user->added_to_free_pool = false;
resource_user->reclaimers[0] = NULL;
resource_user->reclaimers[1] = NULL;
+ resource_user->new_reclaimers[0] = NULL;
+ resource_user->new_reclaimers[1] = NULL;
for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
resource_user->links[i].next = resource_user->links[i].prev = NULL;
}
@@ -748,12 +770,8 @@ void grpc_resource_user_post_reclaimer(grpc_exec_ctx *exec_ctx,
grpc_resource_user *resource_user,
bool destructive,
grpc_closure *closure) {
- GPR_ASSERT(resource_user->reclaimers[destructive] == NULL);
- if (gpr_atm_acq_load(&resource_user->shutdown) > 0) {
- grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CANCELLED, NULL);
- return;
- }
- resource_user->reclaimers[destructive] = closure;
+ GPR_ASSERT(resource_user->new_reclaimers[destructive] == NULL);
+ resource_user->new_reclaimers[destructive] = closure;
grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner,
&resource_user->post_reclaimer_closure[destructive],
GRPC_ERROR_NONE, false);
diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c
index 13347735df..a3a70a8ed7 100644
--- a/src/core/lib/iomgr/tcp_client_posix.c
+++ b/src/core/lib/iomgr/tcp_client_posix.c
@@ -251,8 +251,11 @@ finish:
done = (--ac->refs == 0);
gpr_mu_unlock(&ac->mu);
if (error != GRPC_ERROR_NONE) {
- error = grpc_error_set_str(error, GRPC_ERROR_STR_DESCRIPTION,
- "Failed to connect to remote host");
+ char *error_descr;
+ gpr_asprintf(&error_descr, "Failed to connect to remote host: %s",
+ grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION));
+ error = grpc_error_set_str(error, GRPC_ERROR_STR_DESCRIPTION, error_descr);
+ gpr_free(error_descr);
error =
grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS, ac->addr_str);
}
diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c
index 62afbcef51..d4613b674e 100644
--- a/src/core/lib/iomgr/tcp_windows.c
+++ b/src/core/lib/iomgr/tcp_windows.c
@@ -423,7 +423,7 @@ grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket,
tcp->base.vtable = &vtable;
tcp->socket = socket;
gpr_mu_init(&tcp->mu);
- gpr_ref_init(&tcp->refcount, 2);
+ gpr_ref_init(&tcp->refcount, 1);
grpc_closure_init(&tcp->on_read, on_read, tcp);
grpc_closure_init(&tcp->on_write, on_write, tcp);
tcp->peer_string = gpr_strdup(peer_string);
diff --git a/src/core/lib/json/json.c b/src/core/lib/json/json.c
index 5b583a1f2e..48b13686d7 100644
--- a/src/core/lib/json/json.c
+++ b/src/core/lib/json/json.c
@@ -37,15 +37,15 @@
#include "src/core/lib/json/json.h"
-grpc_json *grpc_json_create(grpc_json_type type) {
- grpc_json *json = gpr_malloc(sizeof(*json));
+grpc_json* grpc_json_create(grpc_json_type type) {
+ grpc_json* json = gpr_malloc(sizeof(*json));
memset(json, 0, sizeof(*json));
json->type = type;
return json;
}
-void grpc_json_destroy(grpc_json *json) {
+void grpc_json_destroy(grpc_json* json) {
while (json->child) {
grpc_json_destroy(json->child);
}
diff --git a/src/core/lib/json/json.h b/src/core/lib/json/json.h
index 681df4bb77..7111db0b52 100644
--- a/src/core/lib/json/json.h
+++ b/src/core/lib/json/json.h
@@ -42,14 +42,14 @@
* are not owned by it.
*/
typedef struct grpc_json {
- struct grpc_json *next;
- struct grpc_json *prev;
- struct grpc_json *child;
- struct grpc_json *parent;
+ struct grpc_json* next;
+ struct grpc_json* prev;
+ struct grpc_json* child;
+ struct grpc_json* parent;
grpc_json_type type;
- const char *key;
- const char *value;
+ const char* key;
+ const char* value;
} grpc_json;
/* The next two functions are going to parse the input string, and
@@ -65,8 +65,8 @@ typedef struct grpc_json {
*
* Delete the allocated tree afterward using grpc_json_destroy().
*/
-grpc_json *grpc_json_parse_string_with_len(char *input, size_t size);
-grpc_json *grpc_json_parse_string(char *input);
+grpc_json* grpc_json_parse_string_with_len(char* input, size_t size);
+grpc_json* grpc_json_parse_string(char* input);
/* This function will create a new string using gpr_realloc, and will
* deserialize the grpc_json tree into it. It'll be zero-terminated,
@@ -76,13 +76,13 @@ grpc_json *grpc_json_parse_string(char *input);
* If indent is 0, then newlines will be suppressed as well, and the
* output will be condensed at its maximum.
*/
-char *grpc_json_dump_to_string(grpc_json *json, int indent);
+char* grpc_json_dump_to_string(grpc_json* json, int indent);
/* Use these to create or delete a grpc_json object.
* Deletion is recursive. We will not attempt to free any of the strings
* in any of the objects of that tree.
*/
-grpc_json *grpc_json_create(grpc_json_type type);
-void grpc_json_destroy(grpc_json *json);
+grpc_json* grpc_json_create(grpc_json_type type);
+void grpc_json_destroy(grpc_json* json);
#endif /* GRPC_CORE_LIB_JSON_JSON_H */
diff --git a/src/core/lib/slice/slice.c b/src/core/lib/slice/slice.c
index 3dac18df61..52977e6d9a 100644
--- a/src/core/lib/slice/slice.c
+++ b/src/core/lib/slice/slice.c
@@ -348,3 +348,11 @@ int grpc_slice_str_cmp(grpc_slice a, const char *b) {
if (d != 0) return d;
return memcmp(GRPC_SLICE_START_PTR(a), b, b_length);
}
+
+int grpc_slice_is_equivalent(grpc_slice a, grpc_slice b) {
+ if (a.refcount == NULL || b.refcount == NULL) {
+ return grpc_slice_cmp(a, b) == 0;
+ }
+ return a.data.refcounted.length == b.data.refcounted.length &&
+ a.data.refcounted.bytes == b.data.refcounted.bytes;
+}
diff --git a/src/core/lib/support/string.c b/src/core/lib/support/string.c
index dc243bf0bf..f10a30f0fd 100644
--- a/src/core/lib/support/string.c
+++ b/src/core/lib/support/string.c
@@ -34,7 +34,9 @@
#include "src/core/lib/support/string.h"
#include <ctype.h>
+#include <limits.h>
#include <stddef.h>
+#include <stdlib.h>
#include <string.h>
#include <grpc/support/alloc.h>
@@ -189,6 +191,13 @@ int int64_ttoa(int64_t value, char *string) {
return i;
}
+int gpr_parse_nonnegative_int(const char *value) {
+ char *end;
+ long result = strtol(value, &end, 0);
+ if (*end != '\0' || result < 0 || result > INT_MAX) return -1;
+ return (int)result;
+}
+
char *gpr_leftpad(const char *str, char flag, size_t length) {
const size_t str_length = strlen(str);
const size_t out_length = str_length > length ? str_length : length;
diff --git a/src/core/lib/support/string.h b/src/core/lib/support/string.h
index 13891d0b9e..e933e2eb46 100644
--- a/src/core/lib/support/string.h
+++ b/src/core/lib/support/string.h
@@ -77,6 +77,9 @@ NOTE: This function ensures sufficient bit width even on Win x64,
where long is 32bit is size.*/
int int64_ttoa(int64_t value, char *output);
+// Parses a non-negative number from a value string. Returns -1 on error.
+int gpr_parse_nonnegative_int(const char *value);
+
/* Reverse a run of bytes */
void gpr_reverse_bytes(char *str, int len);
diff --git a/src/core/lib/support/subprocess_posix.c b/src/core/lib/support/subprocess_posix.c
index 4f4de9298e..daf371d03e 100644
--- a/src/core/lib/support/subprocess_posix.c
+++ b/src/core/lib/support/subprocess_posix.c
@@ -40,6 +40,7 @@
#include <assert.h>
#include <errno.h>
#include <signal.h>
+#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
@@ -52,7 +53,7 @@
struct gpr_subprocess {
int pid;
- int joined;
+ bool joined;
};
const char *gpr_subprocess_binary_extension() { return ""; }
@@ -100,6 +101,7 @@ retry:
gpr_log(GPR_ERROR, "waitpid failed: %s", strerror(errno));
return -1;
}
+ p->joined = true;
return status;
}
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 8d2386e6a0..1e0f3eeca5 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -637,9 +637,6 @@ static int prepare_application_metadata(grpc_call *call, int count,
if (call->send_extra_metadata_count == 0) {
prepend_extra_metadata = 0;
} else {
- for (i = 0; i < call->send_extra_metadata_count; i++) {
- GRPC_MDELEM_REF(call->send_extra_metadata[i].md);
- }
for (i = 1; i < call->send_extra_metadata_count; i++) {
call->send_extra_metadata[i].prev = &call->send_extra_metadata[i - 1];
}
@@ -685,6 +682,7 @@ static int prepare_application_metadata(grpc_call *call, int count,
&call->send_extra_metadata[call->send_extra_metadata_count - 1];
batch->list.head->prev = NULL;
batch->list.tail->next = NULL;
+ call->send_extra_metadata_count = 0;
break;
case 3: {
/* prepend AND md */
@@ -700,6 +698,7 @@ static int prepare_application_metadata(grpc_call *call, int count,
batch->list.tail = linked_from_md(last_md);
batch->list.head->prev = NULL;
batch->list.tail->next = NULL;
+ call->send_extra_metadata_count = 0;
break;
}
default:
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 89dd825460..fe73aa375c 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -195,6 +195,7 @@ struct grpc_server {
grpc_completion_queue **cqs;
grpc_pollset **pollsets;
size_t cq_count;
+ size_t pollset_count;
bool started;
/* The two following mutexes control access to server-state
@@ -1085,7 +1086,7 @@ void grpc_server_start(grpc_server *server) {
GRPC_API_TRACE("grpc_server_start(server=%p)", 1, (server));
server->started = true;
- size_t pollset_count = 0;
+ server->pollset_count = 0;
server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count);
server->request_freelist_per_cq =
gpr_malloc(sizeof(*server->request_freelist_per_cq) * server->cq_count);
@@ -1093,7 +1094,8 @@ void grpc_server_start(grpc_server *server) {
gpr_malloc(sizeof(*server->requested_calls_per_cq) * server->cq_count);
for (i = 0; i < server->cq_count; i++) {
if (!grpc_cq_is_non_listening_server_cq(server->cqs[i])) {
- server->pollsets[pollset_count++] = grpc_cq_pollset(server->cqs[i]);
+ server->pollsets[server->pollset_count++] =
+ grpc_cq_pollset(server->cqs[i]);
}
server->request_freelist_per_cq[i] =
gpr_stack_lockfree_create((size_t)server->max_requested_calls_per_cq);
@@ -1112,7 +1114,8 @@ void grpc_server_start(grpc_server *server) {
}
for (l = server->listeners; l; l = l->next) {
- l->start(&exec_ctx, server, l->arg, server->pollsets, pollset_count);
+ l->start(&exec_ctx, server, l->arg, server->pollsets,
+ server->pollset_count);
}
grpc_exec_ctx_finish(&exec_ctx);
@@ -1120,7 +1123,7 @@ void grpc_server_start(grpc_server *server) {
void grpc_server_get_pollsets(grpc_server *server, grpc_pollset ***pollsets,
size_t *pollset_count) {
- *pollset_count = server->cq_count;
+ *pollset_count = server->pollset_count;
*pollsets = server->pollsets;
}
diff --git a/src/core/lib/transport/mdstr_hash_table.c b/src/core/lib/transport/mdstr_hash_table.c
index 8e914c420b..3d01e56df7 100644
--- a/src/core/lib/transport/mdstr_hash_table.c
+++ b/src/core/lib/transport/mdstr_hash_table.c
@@ -41,7 +41,6 @@
struct grpc_mdstr_hash_table {
gpr_refcount refs;
- size_t num_entries;
size_t size;
grpc_mdstr_hash_table_entry* entries;
};
@@ -77,7 +76,6 @@ grpc_mdstr_hash_table* grpc_mdstr_hash_table_create(
grpc_mdstr_hash_table* table = gpr_malloc(sizeof(*table));
memset(table, 0, sizeof(*table));
gpr_ref_init(&table->refs, 1);
- table->num_entries = num_entries;
// Quadratic probing gets best performance when the table is no more
// than half full.
table->size = num_entries * 2;
@@ -96,7 +94,7 @@ grpc_mdstr_hash_table* grpc_mdstr_hash_table_ref(grpc_mdstr_hash_table* table) {
return table;
}
-int grpc_mdstr_hash_table_unref(grpc_mdstr_hash_table* table) {
+void grpc_mdstr_hash_table_unref(grpc_mdstr_hash_table* table) {
if (table != NULL && gpr_unref(&table->refs)) {
for (size_t i = 0; i < table->size; ++i) {
grpc_mdstr_hash_table_entry* entry = &table->entries[i];
@@ -107,13 +105,7 @@ int grpc_mdstr_hash_table_unref(grpc_mdstr_hash_table* table) {
}
gpr_free(table->entries);
gpr_free(table);
- return 1;
}
- return 0;
-}
-
-size_t grpc_mdstr_hash_table_num_entries(const grpc_mdstr_hash_table* table) {
- return table->num_entries;
}
void* grpc_mdstr_hash_table_get(const grpc_mdstr_hash_table* table,
@@ -123,35 +115,3 @@ void* grpc_mdstr_hash_table_get(const grpc_mdstr_hash_table* table,
if (idx == table->size) return NULL; // Not found.
return table->entries[idx].value;
}
-
-int grpc_mdstr_hash_table_cmp(const grpc_mdstr_hash_table* table1,
- const grpc_mdstr_hash_table* table2) {
- // Compare by num_entries.
- if (table1->num_entries < table2->num_entries) return -1;
- if (table1->num_entries > table2->num_entries) return 1;
- for (size_t i = 0; i < table1->num_entries; ++i) {
- grpc_mdstr_hash_table_entry* e1 = &table1->entries[i];
- grpc_mdstr_hash_table_entry* e2 = &table2->entries[i];
- // Compare keys by hash value.
- if (e1->key->hash < e2->key->hash) return -1;
- if (e1->key->hash > e2->key->hash) return 1;
- // Compare by vtable (pointer equality).
- if (e1->vtable < e2->vtable) return -1;
- if (e1->vtable > e2->vtable) return 1;
- // Compare values via vtable.
- const int value_result = e1->vtable->compare_value(e1->value, e2->value);
- if (value_result != 0) return value_result;
- }
- return 0;
-}
-
-void grpc_mdstr_hash_table_iterate(
- const grpc_mdstr_hash_table* table,
- void (*func)(const grpc_mdstr_hash_table_entry* entry, void* user_data),
- void* user_data) {
- for (size_t i = 0; i < table->size; ++i) {
- if (table->entries[i].key != NULL) {
- func(&table->entries[i], user_data);
- }
- }
-}
diff --git a/src/core/lib/transport/mdstr_hash_table.h b/src/core/lib/transport/mdstr_hash_table.h
index bceb4df93d..8982ec3a8d 100644
--- a/src/core/lib/transport/mdstr_hash_table.h
+++ b/src/core/lib/transport/mdstr_hash_table.h
@@ -51,7 +51,6 @@ typedef struct grpc_mdstr_hash_table grpc_mdstr_hash_table;
typedef struct grpc_mdstr_hash_table_vtable {
void (*destroy_value)(void* value);
void* (*copy_value)(void* value);
- int (*compare_value)(void* value1, void* value2);
} grpc_mdstr_hash_table_vtable;
typedef struct grpc_mdstr_hash_table_entry {
@@ -67,26 +66,11 @@ grpc_mdstr_hash_table* grpc_mdstr_hash_table_create(
size_t num_entries, grpc_mdstr_hash_table_entry* entries);
grpc_mdstr_hash_table* grpc_mdstr_hash_table_ref(grpc_mdstr_hash_table* table);
-/** Returns 1 when \a table is destroyed. */
-int grpc_mdstr_hash_table_unref(grpc_mdstr_hash_table* table);
-
-/** Returns the number of entries in \a table. */
-size_t grpc_mdstr_hash_table_num_entries(const grpc_mdstr_hash_table* table);
+void grpc_mdstr_hash_table_unref(grpc_mdstr_hash_table* table);
/** Returns the value from \a table associated with \a key.
Returns NULL if \a key is not found. */
void* grpc_mdstr_hash_table_get(const grpc_mdstr_hash_table* table,
const grpc_mdstr* key);
-/** Compares two hash tables.
- The sort order is stable but undefined. */
-int grpc_mdstr_hash_table_cmp(const grpc_mdstr_hash_table* table1,
- const grpc_mdstr_hash_table* table2);
-
-/** Iterates over the entries in \a table, calling \a func for each entry. */
-void grpc_mdstr_hash_table_iterate(
- const grpc_mdstr_hash_table* table,
- void (*func)(const grpc_mdstr_hash_table_entry* entry, void* user_data),
- void* user_data);
-
#endif /* GRPC_CORE_LIB_TRANSPORT_MDSTR_HASH_TABLE_H */
diff --git a/src/core/lib/transport/method_config.c b/src/core/lib/transport/method_config.c
deleted file mode 100644
index 57d97700bf..0000000000
--- a/src/core/lib/transport/method_config.c
+++ /dev/null
@@ -1,340 +0,0 @@
-//
-// Copyright 2015, Google Inc.
-// All rights reserved.
-//
-// Redistribution and use in source and binary forms, with or without
-// modification, are permitted provided that the following conditions are
-// met:
-//
-// * Redistributions of source code must retain the above copyright
-// notice, this list of conditions and the following disclaimer.
-// * Redistributions in binary form must reproduce the above
-// copyright notice, this list of conditions and the following disclaimer
-// in the documentation and/or other materials provided with the
-// distribution.
-// * Neither the name of Google Inc. nor the names of its
-// contributors may be used to endorse or promote products derived from
-// this software without specific prior written permission.
-//
-// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-//
-
-#include "src/core/lib/transport/method_config.h"
-
-#include <string.h>
-
-#include <grpc/impl/codegen/grpc_types.h>
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/string_util.h>
-#include <grpc/support/time.h>
-
-#include "src/core/lib/transport/mdstr_hash_table.h"
-#include "src/core/lib/transport/metadata.h"
-
-//
-// grpc_method_config
-//
-
-// bool vtable
-
-static void* bool_copy(void* valuep) {
- bool value = *(bool*)valuep;
- bool* new_value = gpr_malloc(sizeof(bool));
- *new_value = value;
- return new_value;
-}
-
-static int bool_cmp(void* v1, void* v2) {
- bool b1 = *(bool*)v1;
- bool b2 = *(bool*)v2;
- if (!b1 && b2) return -1;
- if (b1 && !b2) return 1;
- return 0;
-}
-
-static grpc_mdstr_hash_table_vtable bool_vtable = {gpr_free, bool_copy,
- bool_cmp};
-
-// timespec vtable
-
-static void* timespec_copy(void* valuep) {
- gpr_timespec value = *(gpr_timespec*)valuep;
- gpr_timespec* new_value = gpr_malloc(sizeof(gpr_timespec));
- *new_value = value;
- return new_value;
-}
-
-static int timespec_cmp(void* v1, void* v2) {
- return gpr_time_cmp(*(gpr_timespec*)v1, *(gpr_timespec*)v2);
-}
-
-static grpc_mdstr_hash_table_vtable timespec_vtable = {gpr_free, timespec_copy,
- timespec_cmp};
-
-// int32 vtable
-
-static void* int32_copy(void* valuep) {
- int32_t value = *(int32_t*)valuep;
- int32_t* new_value = gpr_malloc(sizeof(int32_t));
- *new_value = value;
- return new_value;
-}
-
-static int int32_cmp(void* v1, void* v2) {
- int32_t i1 = *(int32_t*)v1;
- int32_t i2 = *(int32_t*)v2;
- if (i1 < i2) return -1;
- if (i1 > i2) return 1;
- return 0;
-}
-
-static grpc_mdstr_hash_table_vtable int32_vtable = {gpr_free, int32_copy,
- int32_cmp};
-
-// Hash table keys.
-#define GRPC_METHOD_CONFIG_WAIT_FOR_READY "grpc.wait_for_ready" // bool
-#define GRPC_METHOD_CONFIG_TIMEOUT "grpc.timeout" // gpr_timespec
-#define GRPC_METHOD_CONFIG_MAX_REQUEST_MESSAGE_BYTES \
- "grpc.max_request_message_bytes" // int32
-#define GRPC_METHOD_CONFIG_MAX_RESPONSE_MESSAGE_BYTES \
- "grpc.max_response_message_bytes" // int32
-
-struct grpc_method_config {
- grpc_mdstr_hash_table* table;
- grpc_mdstr* wait_for_ready_key;
- grpc_mdstr* timeout_key;
- grpc_mdstr* max_request_message_bytes_key;
- grpc_mdstr* max_response_message_bytes_key;
-};
-
-grpc_method_config* grpc_method_config_create(
- bool* wait_for_ready, gpr_timespec* timeout,
- int32_t* max_request_message_bytes, int32_t* max_response_message_bytes) {
- grpc_method_config* method_config = gpr_malloc(sizeof(grpc_method_config));
- memset(method_config, 0, sizeof(grpc_method_config));
- method_config->wait_for_ready_key =
- grpc_mdstr_from_string(GRPC_METHOD_CONFIG_WAIT_FOR_READY);
- method_config->timeout_key =
- grpc_mdstr_from_string(GRPC_METHOD_CONFIG_TIMEOUT);
- method_config->max_request_message_bytes_key =
- grpc_mdstr_from_string(GRPC_METHOD_CONFIG_MAX_REQUEST_MESSAGE_BYTES);
- method_config->max_response_message_bytes_key =
- grpc_mdstr_from_string(GRPC_METHOD_CONFIG_MAX_RESPONSE_MESSAGE_BYTES);
- grpc_mdstr_hash_table_entry entries[4];
- size_t num_entries = 0;
- if (wait_for_ready != NULL) {
- entries[num_entries].key = method_config->wait_for_ready_key;
- entries[num_entries].value = wait_for_ready;
- entries[num_entries].vtable = &bool_vtable;
- ++num_entries;
- }
- if (timeout != NULL) {
- entries[num_entries].key = method_config->timeout_key;
- entries[num_entries].value = timeout;
- entries[num_entries].vtable = &timespec_vtable;
- ++num_entries;
- }
- if (max_request_message_bytes != NULL) {
- entries[num_entries].key = method_config->max_request_message_bytes_key;
- entries[num_entries].value = max_request_message_bytes;
- entries[num_entries].vtable = &int32_vtable;
- ++num_entries;
- }
- if (max_response_message_bytes != NULL) {
- entries[num_entries].key = method_config->max_response_message_bytes_key;
- entries[num_entries].value = max_response_message_bytes;
- entries[num_entries].vtable = &int32_vtable;
- ++num_entries;
- }
- method_config->table = grpc_mdstr_hash_table_create(num_entries, entries);
- return method_config;
-}
-
-grpc_method_config* grpc_method_config_ref(grpc_method_config* method_config) {
- grpc_mdstr_hash_table_ref(method_config->table);
- return method_config;
-}
-
-void grpc_method_config_unref(grpc_method_config* method_config) {
- if (grpc_mdstr_hash_table_unref(method_config->table)) {
- GRPC_MDSTR_UNREF(method_config->wait_for_ready_key);
- GRPC_MDSTR_UNREF(method_config->timeout_key);
- GRPC_MDSTR_UNREF(method_config->max_request_message_bytes_key);
- GRPC_MDSTR_UNREF(method_config->max_response_message_bytes_key);
- gpr_free(method_config);
- }
-}
-
-int grpc_method_config_cmp(const grpc_method_config* method_config1,
- const grpc_method_config* method_config2) {
- return grpc_mdstr_hash_table_cmp(method_config1->table,
- method_config2->table);
-}
-
-const bool* grpc_method_config_get_wait_for_ready(
- const grpc_method_config* method_config) {
- return grpc_mdstr_hash_table_get(method_config->table,
- method_config->wait_for_ready_key);
-}
-
-const gpr_timespec* grpc_method_config_get_timeout(
- const grpc_method_config* method_config) {
- return grpc_mdstr_hash_table_get(method_config->table,
- method_config->timeout_key);
-}
-
-const int32_t* grpc_method_config_get_max_request_message_bytes(
- const grpc_method_config* method_config) {
- return grpc_mdstr_hash_table_get(
- method_config->table, method_config->max_request_message_bytes_key);
-}
-
-const int32_t* grpc_method_config_get_max_response_message_bytes(
- const grpc_method_config* method_config) {
- return grpc_mdstr_hash_table_get(
- method_config->table, method_config->max_response_message_bytes_key);
-}
-
-//
-// grpc_method_config_table
-//
-
-static void method_config_unref(void* valuep) {
- grpc_method_config_unref(valuep);
-}
-
-static void* method_config_ref(void* valuep) {
- return grpc_method_config_ref(valuep);
-}
-
-static int method_config_cmp(void* valuep1, void* valuep2) {
- return grpc_method_config_cmp(valuep1, valuep2);
-}
-
-static const grpc_mdstr_hash_table_vtable method_config_table_vtable = {
- method_config_unref, method_config_ref, method_config_cmp};
-
-grpc_method_config_table* grpc_method_config_table_create(
- size_t num_entries, grpc_method_config_table_entry* entries) {
- grpc_mdstr_hash_table_entry* hash_table_entries =
- gpr_malloc(sizeof(grpc_mdstr_hash_table_entry) * num_entries);
- for (size_t i = 0; i < num_entries; ++i) {
- hash_table_entries[i].key = entries[i].method_name;
- hash_table_entries[i].value = entries[i].method_config;
- hash_table_entries[i].vtable = &method_config_table_vtable;
- }
- grpc_method_config_table* method_config_table =
- grpc_mdstr_hash_table_create(num_entries, hash_table_entries);
- gpr_free(hash_table_entries);
- return method_config_table;
-}
-
-grpc_method_config_table* grpc_method_config_table_ref(
- grpc_method_config_table* table) {
- return grpc_mdstr_hash_table_ref(table);
-}
-
-void grpc_method_config_table_unref(grpc_method_config_table* table) {
- grpc_mdstr_hash_table_unref(table);
-}
-
-int grpc_method_config_table_cmp(const grpc_method_config_table* table1,
- const grpc_method_config_table* table2) {
- return grpc_mdstr_hash_table_cmp(table1, table2);
-}
-
-void* grpc_method_config_table_get(const grpc_mdstr_hash_table* table,
- const grpc_mdstr* path) {
- void* value = grpc_mdstr_hash_table_get(table, path);
- // If we didn't find a match for the path, try looking for a wildcard
- // entry (i.e., change "/service/method" to "/service/*").
- if (value == NULL) {
- const char* path_str = grpc_mdstr_as_c_string(path);
- const char* sep = strrchr(path_str, '/') + 1;
- const size_t len = (size_t)(sep - path_str);
- char* buf = gpr_malloc(len + 2); // '*' and NUL
- memcpy(buf, path_str, len);
- buf[len] = '*';
- buf[len + 1] = '\0';
- grpc_mdstr* wildcard_path = grpc_mdstr_from_string(buf);
- gpr_free(buf);
- value = grpc_mdstr_hash_table_get(table, wildcard_path);
- GRPC_MDSTR_UNREF(wildcard_path);
- }
- return value;
-}
-
-static void* copy_arg(void* p) { return grpc_method_config_table_ref(p); }
-
-static void destroy_arg(void* p) { grpc_method_config_table_unref(p); }
-
-static int cmp_arg(void* p1, void* p2) {
- return grpc_method_config_table_cmp(p1, p2);
-}
-
-static grpc_arg_pointer_vtable arg_vtable = {copy_arg, destroy_arg, cmp_arg};
-
-grpc_arg grpc_method_config_table_create_channel_arg(
- grpc_method_config_table* table) {
- grpc_arg arg;
- arg.type = GRPC_ARG_POINTER;
- arg.key = GRPC_ARG_SERVICE_CONFIG;
- arg.value.pointer.p = table;
- arg.value.pointer.vtable = &arg_vtable;
- return arg;
-}
-
-// State used by convert_entry() below.
-typedef struct conversion_state {
- void* (*convert_value)(const grpc_method_config* method_config);
- const grpc_mdstr_hash_table_vtable* vtable;
- size_t num_entries;
- grpc_mdstr_hash_table_entry* entries;
-} conversion_state;
-
-// A function to be passed to grpc_mdstr_hash_table_iterate() to create
-// a copy of the entries.
-static void convert_entry(const grpc_mdstr_hash_table_entry* entry,
- void* user_data) {
- conversion_state* state = user_data;
- state->entries[state->num_entries].key = GRPC_MDSTR_REF(entry->key);
- state->entries[state->num_entries].value = state->convert_value(entry->value);
- state->entries[state->num_entries].vtable = state->vtable;
- ++state->num_entries;
-}
-
-grpc_mdstr_hash_table* grpc_method_config_table_convert(
- const grpc_method_config_table* table,
- void* (*convert_value)(const grpc_method_config* method_config),
- const grpc_mdstr_hash_table_vtable* vtable) {
- // Create an array of the entries in the table with converted values.
- conversion_state state;
- state.convert_value = convert_value;
- state.vtable = vtable;
- state.num_entries = 0;
- state.entries = gpr_malloc(sizeof(grpc_mdstr_hash_table_entry) *
- grpc_mdstr_hash_table_num_entries(table));
- grpc_mdstr_hash_table_iterate(table, convert_entry, &state);
- // Create a new table based on the array we just constructed.
- grpc_mdstr_hash_table* new_table =
- grpc_mdstr_hash_table_create(state.num_entries, state.entries);
- // Clean up the array.
- for (size_t i = 0; i < state.num_entries; ++i) {
- GRPC_MDSTR_UNREF(state.entries[i].key);
- vtable->destroy_value(state.entries[i].value);
- }
- gpr_free(state.entries);
- // Return the new table.
- return new_table;
-}
diff --git a/src/core/lib/transport/method_config.h b/src/core/lib/transport/method_config.h
deleted file mode 100644
index 58fedd9436..0000000000
--- a/src/core/lib/transport/method_config.h
+++ /dev/null
@@ -1,136 +0,0 @@
-//
-// Copyright 2016, Google Inc.
-// All rights reserved.
-//
-// Redistribution and use in source and binary forms, with or without
-// modification, are permitted provided that the following conditions are
-// met:
-//
-// * Redistributions of source code must retain the above copyright
-// notice, this list of conditions and the following disclaimer.
-// * Redistributions in binary form must reproduce the above
-// copyright notice, this list of conditions and the following disclaimer
-// in the documentation and/or other materials provided with the
-// distribution.
-// * Neither the name of Google Inc. nor the names of its
-// contributors may be used to endorse or promote products derived from
-// this software without specific prior written permission.
-//
-// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-//
-
-#ifndef GRPC_CORE_LIB_TRANSPORT_METHOD_CONFIG_H
-#define GRPC_CORE_LIB_TRANSPORT_METHOD_CONFIG_H
-
-#include <stdbool.h>
-
-#include <grpc/impl/codegen/gpr_types.h>
-#include <grpc/impl/codegen/grpc_types.h>
-
-#include "src/core/lib/transport/mdstr_hash_table.h"
-#include "src/core/lib/transport/metadata.h"
-
-/// Per-method configuration.
-typedef struct grpc_method_config grpc_method_config;
-
-/// Creates a grpc_method_config with the specified parameters.
-/// Any parameter may be NULL to indicate that the value is unset.
-///
-/// \a wait_for_ready indicates whether the client should wait until the
-/// request deadline for the channel to become ready, even if there is a
-/// temporary failure before the deadline while attempting to connect.
-///
-/// \a timeout indicates the timeout for calls.
-///
-/// \a max_request_message_bytes and \a max_response_message_bytes
-/// indicate the maximum sizes of the request (checked when sending) and
-/// response (checked when receiving) messages.
-grpc_method_config* grpc_method_config_create(
- bool* wait_for_ready, gpr_timespec* timeout,
- int32_t* max_request_message_bytes, int32_t* max_response_message_bytes);
-
-grpc_method_config* grpc_method_config_ref(grpc_method_config* method_config);
-void grpc_method_config_unref(grpc_method_config* method_config);
-
-/// Compares two grpc_method_configs.
-/// The sort order is stable but undefined.
-int grpc_method_config_cmp(const grpc_method_config* method_config1,
- const grpc_method_config* method_config2);
-
-/// These methods return NULL if the requested field is unset.
-/// The caller does NOT take ownership of the result.
-const bool* grpc_method_config_get_wait_for_ready(
- const grpc_method_config* method_config);
-const gpr_timespec* grpc_method_config_get_timeout(
- const grpc_method_config* method_config);
-const int32_t* grpc_method_config_get_max_request_message_bytes(
- const grpc_method_config* method_config);
-const int32_t* grpc_method_config_get_max_response_message_bytes(
- const grpc_method_config* method_config);
-
-/// A table of method configs.
-typedef grpc_mdstr_hash_table grpc_method_config_table;
-
-typedef struct grpc_method_config_table_entry {
- /// The name is of one of the following forms:
- /// service/method -- specifies exact service and method name
- /// service/* -- matches all methods for the specified service
- grpc_mdstr* method_name;
- grpc_method_config* method_config;
-} grpc_method_config_table_entry;
-
-/// Takes new references to all keys and values in \a entries.
-grpc_method_config_table* grpc_method_config_table_create(
- size_t num_entries, grpc_method_config_table_entry* entries);
-
-grpc_method_config_table* grpc_method_config_table_ref(
- grpc_method_config_table* table);
-void grpc_method_config_table_unref(grpc_method_config_table* table);
-
-/// Compares two grpc_method_config_tables.
-/// The sort order is stable but undefined.
-int grpc_method_config_table_cmp(const grpc_method_config_table* table1,
- const grpc_method_config_table* table2);
-
-/// Gets the method config for the specified \a path, which should be of
-/// the form "/service/method".
-/// Returns NULL if the method has no config.
-/// Caller does NOT own a reference to the result.
-///
-/// Note: This returns a void* instead of a grpc_method_config* so that
-/// it can also be used for tables constructed via
-/// grpc_method_config_table_convert().
-void* grpc_method_config_table_get(const grpc_mdstr_hash_table* table,
- const grpc_mdstr* path);
-
-/// Returns a channel arg containing \a table.
-grpc_arg grpc_method_config_table_create_channel_arg(
- grpc_method_config_table* table);
-
-/// Generates a new table from \a table whose values are converted to a
-/// new form via the \a convert_value function. The new table will use
-/// \a vtable for its values.
-///
-/// This is generally used to convert the table's value type from
-/// grpc_method_config to a simple struct containing only the parameters
-/// relevant to a particular filter, thus avoiding the need for a hash
-/// table lookup on the fast path. In that scenario, \a convert_value
-/// will return a new instance of the struct containing the values from
-/// the grpc_method_config, and \a vtable provides the methods for
-/// operating on the struct type.
-grpc_mdstr_hash_table* grpc_method_config_table_convert(
- const grpc_method_config_table* table,
- void* (*convert_value)(const grpc_method_config* method_config),
- const grpc_mdstr_hash_table_vtable* vtable);
-
-#endif /* GRPC_CORE_LIB_TRANSPORT_METHOD_CONFIG_H */
diff --git a/src/core/lib/transport/service_config.c b/src/core/lib/transport/service_config.c
new file mode 100644
index 0000000000..2e2b59e3f7
--- /dev/null
+++ b/src/core/lib/transport/service_config.c
@@ -0,0 +1,249 @@
+//
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+
+#include "src/core/lib/transport/service_config.h"
+
+#include <string.h>
+
+#include <grpc/impl/codegen/grpc_types.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+
+#include "src/core/lib/json/json.h"
+#include "src/core/lib/support/string.h"
+#include "src/core/lib/transport/mdstr_hash_table.h"
+
+// The main purpose of the code here is to parse the service config in
+// JSON form, which will look like this:
+//
+// {
+// "loadBalancingPolicy": "string", // optional
+// "methodConfig": [ // array of one or more method_config objects
+// {
+// "name": [ // array of one or more name objects
+// {
+// "service": "string", // required
+// "method": "string", // optional
+// }
+// ],
+// // remaining fields are optional.
+// // see https://developers.google.com/protocol-buffers/docs/proto3#json
+// // for format details.
+// "waitForReady": bool,
+// "timeout": "duration_string",
+// "maxRequestMessageBytes": "int64_string",
+// "maxResponseMessageBytes": "int64_string",
+// }
+// ]
+// }
+
+struct grpc_service_config {
+ char* json_string; // Underlying storage for json_tree.
+ grpc_json* json_tree;
+};
+
+grpc_service_config* grpc_service_config_create(const char* json_string) {
+ grpc_service_config* service_config = gpr_malloc(sizeof(*service_config));
+ service_config->json_string = gpr_strdup(json_string);
+ service_config->json_tree =
+ grpc_json_parse_string(service_config->json_string);
+ if (service_config->json_tree == NULL) {
+ gpr_log(GPR_INFO, "failed to parse JSON for service config");
+ gpr_free(service_config->json_string);
+ gpr_free(service_config);
+ return NULL;
+ }
+ return service_config;
+}
+
+void grpc_service_config_destroy(grpc_service_config* service_config) {
+ grpc_json_destroy(service_config->json_tree);
+ gpr_free(service_config->json_string);
+ gpr_free(service_config);
+}
+
+const char* grpc_service_config_get_lb_policy_name(
+ const grpc_service_config* service_config) {
+ const grpc_json* json = service_config->json_tree;
+ if (json->type != GRPC_JSON_OBJECT || json->key != NULL) return NULL;
+ const char* lb_policy_name = NULL;
+ for (grpc_json* field = json->child; field != NULL; field = field->next) {
+ if (field->key == NULL) return NULL;
+ if (strcmp(field->key, "loadBalancingPolicy") == 0) {
+ if (lb_policy_name != NULL) return NULL; // Duplicate.
+ if (field->type != GRPC_JSON_STRING) return NULL;
+ lb_policy_name = field->value;
+ }
+ }
+ return lb_policy_name;
+}
+
+// Returns the number of names specified in the method config \a json.
+static size_t count_names_in_method_config_json(grpc_json* json) {
+ size_t num_names = 0;
+ for (grpc_json* field = json->child; field != NULL; field = field->next) {
+ if (field->key != NULL && strcmp(field->key, "name") == 0) ++num_names;
+ }
+ return num_names;
+}
+
+// Returns a path string for the JSON name object specified by \a json.
+// Returns NULL on error. Caller takes ownership of result.
+static char* parse_json_method_name(grpc_json* json) {
+ if (json->type != GRPC_JSON_OBJECT) return NULL;
+ const char* service_name = NULL;
+ const char* method_name = NULL;
+ for (grpc_json* child = json->child; child != NULL; child = child->next) {
+ if (child->key == NULL) return NULL;
+ if (child->type != GRPC_JSON_STRING) return NULL;
+ if (strcmp(child->key, "service") == 0) {
+ if (service_name != NULL) return NULL; // Duplicate.
+ if (child->value == NULL) return NULL;
+ service_name = child->value;
+ } else if (strcmp(child->key, "method") == 0) {
+ if (method_name != NULL) return NULL; // Duplicate.
+ if (child->value == NULL) return NULL;
+ method_name = child->value;
+ }
+ }
+ if (service_name == NULL) return NULL; // Required field.
+ char* path;
+ gpr_asprintf(&path, "/%s/%s", service_name,
+ method_name == NULL ? "*" : method_name);
+ return path;
+}
+
+// Parses the method config from \a json. Adds an entry to \a entries for
+// each name found, incrementing \a idx for each entry added.
+// Returns false on error.
+static bool parse_json_method_config(
+ grpc_json* json, void* (*create_value)(const grpc_json* method_config_json),
+ const grpc_mdstr_hash_table_vtable* vtable,
+ grpc_mdstr_hash_table_entry* entries, size_t* idx) {
+ // Construct value.
+ void* method_config = create_value(json);
+ if (method_config == NULL) return false;
+ // Construct list of paths.
+ bool success = false;
+ gpr_strvec paths;
+ gpr_strvec_init(&paths);
+ for (grpc_json* child = json->child; child != NULL; child = child->next) {
+ if (child->key == NULL) continue;
+ if (strcmp(child->key, "name") == 0) {
+ if (child->type != GRPC_JSON_ARRAY) goto done;
+ for (grpc_json* name = child->child; name != NULL; name = name->next) {
+ char* path = parse_json_method_name(name);
+ gpr_strvec_add(&paths, path);
+ }
+ }
+ }
+ if (paths.count == 0) goto done; // No names specified.
+ // Add entry for each path.
+ for (size_t i = 0; i < paths.count; ++i) {
+ entries[*idx].key = grpc_mdstr_from_string(paths.strs[i]);
+ entries[*idx].value = vtable->copy_value(method_config);
+ entries[*idx].vtable = vtable;
+ ++*idx;
+ }
+ success = true;
+done:
+ vtable->destroy_value(method_config);
+ gpr_strvec_destroy(&paths);
+ return success;
+}
+
+grpc_mdstr_hash_table* grpc_service_config_create_method_config_table(
+ const grpc_service_config* service_config,
+ void* (*create_value)(const grpc_json* method_config_json),
+ const grpc_mdstr_hash_table_vtable* vtable) {
+ const grpc_json* json = service_config->json_tree;
+ // Traverse parsed JSON tree.
+ if (json->type != GRPC_JSON_OBJECT || json->key != NULL) return NULL;
+ size_t num_entries = 0;
+ grpc_mdstr_hash_table_entry* entries = NULL;
+ for (grpc_json* field = json->child; field != NULL; field = field->next) {
+ if (field->key == NULL) return NULL;
+ if (strcmp(field->key, "methodConfig") == 0) {
+ if (entries != NULL) return NULL; // Duplicate.
+ if (field->type != GRPC_JSON_ARRAY) return NULL;
+ // Find number of entries.
+ for (grpc_json* method = field->child; method != NULL;
+ method = method->next) {
+ num_entries += count_names_in_method_config_json(method);
+ }
+ // Populate method config table entries.
+ entries = gpr_malloc(num_entries * sizeof(grpc_mdstr_hash_table_entry));
+ size_t idx = 0;
+ for (grpc_json* method = field->child; method != NULL;
+ method = method->next) {
+ if (!parse_json_method_config(method, create_value, vtable, entries,
+ &idx)) {
+ return NULL;
+ }
+ }
+ GPR_ASSERT(idx == num_entries);
+ }
+ }
+ // Instantiate method config table.
+ grpc_mdstr_hash_table* method_config_table = NULL;
+ if (entries != NULL) {
+ method_config_table = grpc_mdstr_hash_table_create(num_entries, entries);
+ // Clean up.
+ for (size_t i = 0; i < num_entries; ++i) {
+ GRPC_MDSTR_UNREF(entries[i].key);
+ vtable->destroy_value(entries[i].value);
+ }
+ gpr_free(entries);
+ }
+ return method_config_table;
+}
+
+void* grpc_method_config_table_get(const grpc_mdstr_hash_table* table,
+ const grpc_mdstr* path) {
+ void* value = grpc_mdstr_hash_table_get(table, path);
+ // If we didn't find a match for the path, try looking for a wildcard
+ // entry (i.e., change "/service/method" to "/service/*").
+ if (value == NULL) {
+ const char* path_str = grpc_mdstr_as_c_string(path);
+ const char* sep = strrchr(path_str, '/') + 1;
+ const size_t len = (size_t)(sep - path_str);
+ char* buf = gpr_malloc(len + 2); // '*' and NUL
+ memcpy(buf, path_str, len);
+ buf[len] = '*';
+ buf[len + 1] = '\0';
+ grpc_mdstr* wildcard_path = grpc_mdstr_from_string(buf);
+ gpr_free(buf);
+ value = grpc_mdstr_hash_table_get(table, wildcard_path);
+ GRPC_MDSTR_UNREF(wildcard_path);
+ }
+ return value;
+}
diff --git a/src/core/lib/transport/service_config.h b/src/core/lib/transport/service_config.h
new file mode 100644
index 0000000000..2ffe475193
--- /dev/null
+++ b/src/core/lib/transport/service_config.h
@@ -0,0 +1,70 @@
+//
+// Copyright 2016, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+
+#ifndef GRPC_CORE_LIB_TRANSPORT_SERVICE_CONFIG_H
+#define GRPC_CORE_LIB_TRANSPORT_SERVICE_CONFIG_H
+
+#include <grpc/impl/codegen/grpc_types.h>
+
+#include "src/core/lib/json/json.h"
+#include "src/core/lib/transport/mdstr_hash_table.h"
+
+typedef struct grpc_service_config grpc_service_config;
+
+grpc_service_config* grpc_service_config_create(const char* json_string);
+void grpc_service_config_destroy(grpc_service_config* service_config);
+
+/// Gets the LB policy name from \a service_config.
+/// Returns NULL if no LB policy name was specified.
+/// Caller does NOT take ownership.
+const char* grpc_service_config_get_lb_policy_name(
+ const grpc_service_config* service_config);
+
+/// Creates a method config table based on the data in \a json.
+/// The table's keys are request paths. The table's value type is
+/// returned by \a create_value(), based on data parsed from the JSON tree.
+/// \a vtable provides methods used to manage the values.
+/// Returns NULL on error.
+grpc_mdstr_hash_table* grpc_service_config_create_method_config_table(
+ const grpc_service_config* service_config,
+ void* (*create_value)(const grpc_json* method_config_json),
+ const grpc_mdstr_hash_table_vtable* vtable);
+
+/// A helper function for looking up values in the table returned by
+/// \a grpc_service_config_create_method_config_table().
+/// Gets the method config for the specified \a path, which should be of
+/// the form "/service/method".
+/// Returns NULL if the method has no config.
+/// Caller does NOT own a reference to the result.
+void* grpc_method_config_table_get(const grpc_mdstr_hash_table* table,
+ const grpc_mdstr* path);
+
+#endif /* GRPC_CORE_LIB_TRANSPORT_SERVICE_CONFIG_H */