aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-03-09 15:09:34 -0800
committerGravatar Craig Tiller <ctiller@google.com>2017-03-09 15:09:34 -0800
commitaf1158126075c007a0e2fcdf99609545685f62e6 (patch)
tree1e50761cd5a2414789cb51dc048c6fbd44c91682 /src/core
parent4d92a49fc0a96afc7f4539c0e600f06b6e4089f6 (diff)
parenteb064ec7b81b60c5e1eb47d6124d0c05056b3097 (diff)
Merge github.com:grpc/grpc into cpp_bazelness
Diffstat (limited to 'src/core')
-rw-r--r--src/core/ext/client_channel/client_channel.c222
-rw-r--r--src/core/ext/client_channel/client_channel_plugin.c2
-rw-r--r--src/core/ext/client_channel/http_proxy.c10
-rw-r--r--src/core/ext/client_channel/parse_address.c35
-rw-r--r--src/core/ext/client_channel/resolver_registry.c22
-rw-r--r--src/core/ext/client_channel/resolver_registry.h5
-rw-r--r--src/core/ext/client_channel/subchannel.c11
-rw-r--r--src/core/ext/client_channel/subchannel.h3
-rw-r--r--src/core/ext/client_channel/uri_parser.c37
-rw-r--r--src/core/ext/client_channel/uri_parser.h4
-rw-r--r--src/core/ext/lb_policy/grpclb/grpclb.c2
-rw-r--r--src/core/ext/transport/chttp2/client/chttp2_connector.c2
-rw-r--r--src/core/ext/transport/chttp2/client/insecure/channel_create.c7
-rw-r--r--src/core/ext/transport/chttp2/client/secure/secure_channel_create.c11
-rw-r--r--src/core/lib/http/parser.c4
-rw-r--r--src/core/lib/iomgr/pollset_uv.c22
-rw-r--r--src/core/lib/iomgr/sockaddr_utils.c12
-rw-r--r--src/core/lib/iomgr/tcp_server_posix.c19
-rw-r--r--src/core/lib/iomgr/timer_generic.c9
-rw-r--r--src/core/lib/iomgr/udp_server.c6
-rw-r--r--src/core/lib/support/spinlock.h52
-rw-r--r--src/core/lib/support/sync.c8
-rw-r--r--src/core/lib/surface/call.c4
-rw-r--r--src/core/lib/transport/transport.c35
-rw-r--r--src/core/lib/transport/transport.h6
-rw-r--r--src/core/lib/tsi/test_creds/BUILD2
26 files changed, 375 insertions, 177 deletions
diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c
index 34015c534e..bf64f84772 100644
--- a/src/core/ext/client_channel/client_channel.c
+++ b/src/core/ext/client_channel/client_channel.c
@@ -76,24 +76,82 @@ typedef enum {
WAIT_FOR_READY_TRUE
} wait_for_ready_value;
-typedef struct method_parameters {
+typedef struct {
+ gpr_refcount refs;
gpr_timespec timeout;
wait_for_ready_value wait_for_ready;
} method_parameters;
+static method_parameters *method_parameters_ref(
+ method_parameters *method_params) {
+ gpr_ref(&method_params->refs);
+ return method_params;
+}
+
+static void method_parameters_unref(method_parameters *method_params) {
+ if (gpr_unref(&method_params->refs)) {
+ gpr_free(method_params);
+ }
+}
+
static void *method_parameters_copy(void *value) {
- void *new_value = gpr_malloc(sizeof(method_parameters));
- memcpy(new_value, value, sizeof(method_parameters));
- return new_value;
+ return method_parameters_ref(value);
}
-static void method_parameters_free(grpc_exec_ctx *exec_ctx, void *p) {
- gpr_free(p);
+static void method_parameters_free(grpc_exec_ctx *exec_ctx, void *value) {
+ method_parameters_unref(value);
}
static const grpc_slice_hash_table_vtable method_parameters_vtable = {
method_parameters_free, method_parameters_copy};
+static bool parse_wait_for_ready(grpc_json *field,
+ wait_for_ready_value *wait_for_ready) {
+ if (field->type != GRPC_JSON_TRUE && field->type != GRPC_JSON_FALSE) {
+ return false;
+ }
+ *wait_for_ready = field->type == GRPC_JSON_TRUE ? WAIT_FOR_READY_TRUE
+ : WAIT_FOR_READY_FALSE;
+ return true;
+}
+
+static bool parse_timeout(grpc_json *field, gpr_timespec *timeout) {
+ if (field->type != GRPC_JSON_STRING) return false;
+ size_t len = strlen(field->value);
+ if (field->value[len - 1] != 's') return false;
+ char *buf = gpr_strdup(field->value);
+ buf[len - 1] = '\0'; // Remove trailing 's'.
+ char *decimal_point = strchr(buf, '.');
+ if (decimal_point != NULL) {
+ *decimal_point = '\0';
+ timeout->tv_nsec = gpr_parse_nonnegative_int(decimal_point + 1);
+ if (timeout->tv_nsec == -1) {
+ gpr_free(buf);
+ return false;
+ }
+ // There should always be exactly 3, 6, or 9 fractional digits.
+ int multiplier = 1;
+ switch (strlen(decimal_point + 1)) {
+ case 9:
+ break;
+ case 6:
+ multiplier *= 1000;
+ break;
+ case 3:
+ multiplier *= 1000000;
+ break;
+ default: // Unsupported number of digits.
+ gpr_free(buf);
+ return false;
+ }
+ timeout->tv_nsec *= multiplier;
+ }
+ timeout->tv_sec = gpr_parse_nonnegative_int(buf);
+ gpr_free(buf);
+ if (timeout->tv_sec == -1) return false;
+ return true;
+}
+
static void *method_parameters_create_from_json(const grpc_json *json) {
wait_for_ready_value wait_for_ready = WAIT_FOR_READY_UNSET;
gpr_timespec timeout = {0, 0, GPR_TIMESPAN};
@@ -101,49 +159,14 @@ static void *method_parameters_create_from_json(const grpc_json *json) {
if (field->key == NULL) continue;
if (strcmp(field->key, "waitForReady") == 0) {
if (wait_for_ready != WAIT_FOR_READY_UNSET) return NULL; // Duplicate.
- if (field->type != GRPC_JSON_TRUE && field->type != GRPC_JSON_FALSE) {
- return NULL;
- }
- wait_for_ready = field->type == GRPC_JSON_TRUE ? WAIT_FOR_READY_TRUE
- : WAIT_FOR_READY_FALSE;
+ if (!parse_wait_for_ready(field, &wait_for_ready)) return NULL;
} else if (strcmp(field->key, "timeout") == 0) {
if (timeout.tv_sec > 0 || timeout.tv_nsec > 0) return NULL; // Duplicate.
- if (field->type != GRPC_JSON_STRING) return NULL;
- size_t len = strlen(field->value);
- if (field->value[len - 1] != 's') return NULL;
- char *buf = gpr_strdup(field->value);
- buf[len - 1] = '\0'; // Remove trailing 's'.
- char *decimal_point = strchr(buf, '.');
- if (decimal_point != NULL) {
- *decimal_point = '\0';
- timeout.tv_nsec = gpr_parse_nonnegative_int(decimal_point + 1);
- if (timeout.tv_nsec == -1) {
- gpr_free(buf);
- return NULL;
- }
- // There should always be exactly 3, 6, or 9 fractional digits.
- int multiplier = 1;
- switch (strlen(decimal_point + 1)) {
- case 9:
- break;
- case 6:
- multiplier *= 1000;
- break;
- case 3:
- multiplier *= 1000000;
- break;
- default: // Unsupported number of digits.
- gpr_free(buf);
- return NULL;
- }
- timeout.tv_nsec *= multiplier;
- }
- timeout.tv_sec = gpr_parse_nonnegative_int(buf);
- if (timeout.tv_sec == -1) return NULL;
- gpr_free(buf);
+ if (!parse_timeout(field, &timeout)) return NULL;
}
}
method_parameters *value = gpr_malloc(sizeof(method_parameters));
+ gpr_ref_init(&value->refs, 1);
value->timeout = timeout;
value->wait_for_ready = wait_for_ready;
return value;
@@ -629,7 +652,7 @@ typedef struct client_channel_call_data {
grpc_slice path; // Request path.
gpr_timespec call_start_time;
gpr_timespec deadline;
- wait_for_ready_value wait_for_ready_from_service_config;
+ method_parameters *method_params;
grpc_closure read_service_config;
grpc_error *cancel_error;
@@ -837,10 +860,11 @@ static bool pick_subchannel_locked(
initial_metadata_flags &
GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
const bool wait_for_ready_set_from_service_config =
- calld->wait_for_ready_from_service_config != WAIT_FOR_READY_UNSET;
+ calld->method_params != NULL &&
+ calld->method_params->wait_for_ready != WAIT_FOR_READY_UNSET;
if (!wait_for_ready_set_from_api &&
wait_for_ready_set_from_service_config) {
- if (calld->wait_for_ready_from_service_config == WAIT_FOR_READY_TRUE) {
+ if (calld->method_params->wait_for_ready == WAIT_FOR_READY_TRUE) {
initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY;
} else {
initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
@@ -978,10 +1002,9 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx,
add_waiting_locked(calld, op);
}
-static void cc_start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx,
- void *arg,
- grpc_error *error_ignored) {
- GPR_TIMER_BEGIN("cc_start_transport_stream_op_locked", 0);
+static void start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error_ignored) {
+ GPR_TIMER_BEGIN("start_transport_stream_op_locked", 0);
grpc_transport_stream_op *op = arg;
grpc_call_element *elem = op->handler_private.args[0];
@@ -991,7 +1014,7 @@ static void cc_start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx,
GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call,
"start_transport_stream_op");
- GPR_TIMER_END("cc_start_transport_stream_op_locked", 0);
+ GPR_TIMER_END("start_transport_stream_op_locked", 0);
}
/* The logic here is fairly complicated, due to (a) the fact that we
@@ -1031,52 +1054,53 @@ static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
grpc_closure_sched(
exec_ctx,
grpc_closure_init(&op->handler_private.closure,
- cc_start_transport_stream_op_locked, op,
+ start_transport_stream_op_locked, op,
grpc_combiner_scheduler(chand->combiner, false)),
GRPC_ERROR_NONE);
GPR_TIMER_END("cc_start_transport_stream_op", 0);
}
+// Sets calld->method_params.
+// If the method params specify a timeout, populates
+// *per_method_deadline and returns true.
+static bool set_call_method_params_from_service_config_locked(
+ grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ gpr_timespec *per_method_deadline) {
+ channel_data *chand = elem->channel_data;
+ call_data *calld = elem->call_data;
+ if (chand->method_params_table != NULL) {
+ calld->method_params = grpc_method_config_table_get(
+ exec_ctx, chand->method_params_table, calld->path);
+ if (calld->method_params != NULL) {
+ method_parameters_ref(calld->method_params);
+ if (gpr_time_cmp(calld->method_params->timeout,
+ gpr_time_0(GPR_TIMESPAN)) != 0) {
+ *per_method_deadline =
+ gpr_time_add(calld->call_start_time, calld->method_params->timeout);
+ return true;
+ }
+ }
+ }
+ return false;
+}
+
// Gets data from the service config. Invoked when the resolver returns
// its initial result.
static void read_service_config_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
grpc_call_element *elem = arg;
- channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
// If this is an error, there's no point in looking at the service config.
if (error == GRPC_ERROR_NONE) {
- // Get the method config table from channel data.
- grpc_slice_hash_table *method_params_table = NULL;
- if (chand->method_params_table != NULL) {
- method_params_table =
- grpc_slice_hash_table_ref(chand->method_params_table);
- }
- // If the method config table was present, use it.
- if (method_params_table != NULL) {
- const method_parameters *method_params = grpc_method_config_table_get(
- exec_ctx, method_params_table, calld->path);
- if (method_params != NULL) {
- const bool have_method_timeout =
- gpr_time_cmp(method_params->timeout, gpr_time_0(GPR_TIMESPAN)) != 0;
- if (have_method_timeout ||
- method_params->wait_for_ready != WAIT_FOR_READY_UNSET) {
- if (have_method_timeout) {
- const gpr_timespec per_method_deadline =
- gpr_time_add(calld->call_start_time, method_params->timeout);
- if (gpr_time_cmp(per_method_deadline, calld->deadline) < 0) {
- calld->deadline = per_method_deadline;
- // Reset deadline timer.
- grpc_deadline_state_reset(exec_ctx, elem, calld->deadline);
- }
- }
- if (method_params->wait_for_ready != WAIT_FOR_READY_UNSET) {
- calld->wait_for_ready_from_service_config =
- method_params->wait_for_ready;
- }
- }
+ gpr_timespec per_method_deadline;
+ if (set_call_method_params_from_service_config_locked(
+ exec_ctx, elem, &per_method_deadline)) {
+ // If the deadline from the service config is shorter than the one
+ // from the client API, reset the deadline timer.
+ if (gpr_time_cmp(per_method_deadline, calld->deadline) < 0) {
+ calld->deadline = per_method_deadline;
+ grpc_deadline_state_reset(exec_ctx, elem, calld->deadline);
}
- grpc_slice_hash_table_unref(exec_ctx, method_params_table);
}
}
GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "read_service_config");
@@ -1091,29 +1115,12 @@ static void initial_read_service_config_locked(grpc_exec_ctx *exec_ctx,
// If the resolver has already returned results, then we can access
// the service config parameters immediately. Otherwise, we need to
// defer that work until the resolver returns an initial result.
- // TODO(roth): This code is almost but not quite identical to the code
- // in read_service_config() above. It would be nice to find a way to
- // combine them, to avoid having to maintain it twice.
if (chand->lb_policy != NULL) {
// We already have a resolver result, so check for service config.
- if (chand->method_params_table != NULL) {
- grpc_slice_hash_table *method_params_table =
- grpc_slice_hash_table_ref(chand->method_params_table);
- method_parameters *method_params = grpc_method_config_table_get(
- exec_ctx, method_params_table, calld->path);
- if (method_params != NULL) {
- if (gpr_time_cmp(method_params->timeout,
- gpr_time_0(GPR_CLOCK_MONOTONIC)) != 0) {
- gpr_timespec per_method_deadline =
- gpr_time_add(calld->call_start_time, method_params->timeout);
- calld->deadline = gpr_time_min(calld->deadline, per_method_deadline);
- }
- if (method_params->wait_for_ready != WAIT_FOR_READY_UNSET) {
- calld->wait_for_ready_from_service_config =
- method_params->wait_for_ready;
- }
- }
- grpc_slice_hash_table_unref(exec_ctx, method_params_table);
+ gpr_timespec per_method_deadline;
+ if (set_call_method_params_from_service_config_locked(
+ exec_ctx, elem, &per_method_deadline)) {
+ calld->deadline = gpr_time_min(calld->deadline, per_method_deadline);
}
} else {
// We don't yet have a resolver result, so register a callback to
@@ -1144,7 +1151,7 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx,
calld->path = grpc_slice_ref_internal(args->path);
calld->call_start_time = args->start_time;
calld->deadline = gpr_convert_clock_type(args->deadline, GPR_CLOCK_MONOTONIC);
- calld->wait_for_ready_from_service_config = WAIT_FOR_READY_UNSET;
+ calld->method_params = NULL;
calld->cancel_error = GRPC_ERROR_NONE;
gpr_atm_rel_store(&calld->subchannel_call, 0);
calld->connected_subchannel = NULL;
@@ -1172,6 +1179,9 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
call_data *calld = elem->call_data;
grpc_deadline_state_destroy(exec_ctx, elem);
grpc_slice_unref_internal(exec_ctx, calld->path);
+ if (calld->method_params != NULL) {
+ method_parameters_unref(calld->method_params);
+ }
GRPC_ERROR_UNREF(calld->cancel_error);
grpc_subchannel_call *call = GET_CALL(calld);
if (call != NULL && call != CANCELLED_CALL) {
diff --git a/src/core/ext/client_channel/client_channel_plugin.c b/src/core/ext/client_channel/client_channel_plugin.c
index 6f9df3e386..28d3b63f99 100644
--- a/src/core/ext/client_channel/client_channel_plugin.c
+++ b/src/core/ext/client_channel/client_channel_plugin.c
@@ -64,7 +64,7 @@ static bool set_default_host_if_unset(grpc_exec_ctx *exec_ctx,
}
}
char *default_authority = grpc_get_default_authority(
- grpc_channel_stack_builder_get_target(builder));
+ exec_ctx, grpc_channel_stack_builder_get_target(builder));
if (default_authority != NULL) {
grpc_arg arg;
arg.type = GRPC_ARG_STRING;
diff --git a/src/core/ext/client_channel/http_proxy.c b/src/core/ext/client_channel/http_proxy.c
index bbe4ff550c..e280cef101 100644
--- a/src/core/ext/client_channel/http_proxy.c
+++ b/src/core/ext/client_channel/http_proxy.c
@@ -46,10 +46,11 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/support/env.h"
-static char* grpc_get_http_proxy_server() {
+static char* grpc_get_http_proxy_server(grpc_exec_ctx* exec_ctx) {
char* uri_str = gpr_getenv("http_proxy");
if (uri_str == NULL) return NULL;
- grpc_uri* uri = grpc_uri_parse(uri_str, false /* suppress_errors */);
+ grpc_uri* uri =
+ grpc_uri_parse(exec_ctx, uri_str, false /* suppress_errors */);
char* proxy_name = NULL;
if (uri == NULL || uri->authority == NULL) {
gpr_log(GPR_ERROR, "cannot parse value of 'http_proxy' env var");
@@ -76,9 +77,10 @@ static bool proxy_mapper_map_name(grpc_exec_ctx* exec_ctx,
const grpc_channel_args* args,
char** name_to_resolve,
grpc_channel_args** new_args) {
- *name_to_resolve = grpc_get_http_proxy_server();
+ *name_to_resolve = grpc_get_http_proxy_server(exec_ctx);
if (*name_to_resolve == NULL) return false;
- grpc_uri* uri = grpc_uri_parse(server_uri, false /* suppress_errors */);
+ grpc_uri* uri =
+ grpc_uri_parse(exec_ctx, server_uri, false /* suppress_errors */);
if (uri == NULL || uri->path[0] == '\0') {
gpr_log(GPR_ERROR,
"'http_proxy' environment variable set, but cannot "
diff --git a/src/core/ext/client_channel/parse_address.c b/src/core/ext/client_channel/parse_address.c
index b1d55ad0f5..8ae15fc72b 100644
--- a/src/core/ext/client_channel/parse_address.c
+++ b/src/core/ext/client_channel/parse_address.c
@@ -44,16 +44,18 @@
#include <grpc/support/host_port.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
+#include "src/core/lib/support/string.h"
#ifdef GRPC_HAVE_UNIX_SOCKET
int parse_unix(grpc_uri *uri, grpc_resolved_address *resolved_addr) {
struct sockaddr_un *un = (struct sockaddr_un *)resolved_addr->addr;
-
+ const size_t maxlen = sizeof(un->sun_path);
+ const size_t path_len = strnlen(uri->path, maxlen);
+ if (path_len == maxlen) return 0;
un->sun_family = AF_UNIX;
strcpy(un->sun_path, uri->path);
- resolved_addr->len = strlen(un->sun_path) + sizeof(un->sun_family) + 1;
-
+ resolved_addr->len = sizeof(*un);
return 1;
}
@@ -119,9 +121,30 @@ int parse_ipv6(grpc_uri *uri, grpc_resolved_address *resolved_addr) {
memset(in6, 0, sizeof(*in6));
resolved_addr->len = sizeof(*in6);
in6->sin6_family = AF_INET6;
- if (inet_pton(AF_INET6, host, &in6->sin6_addr) == 0) {
- gpr_log(GPR_ERROR, "invalid ipv6 address: '%s'", host);
- goto done;
+
+ /* Handle the RFC6874 syntax for IPv6 zone identifiers. */
+ char *host_end = (char *)gpr_memrchr(host, '%', strlen(host));
+ if (host_end != NULL) {
+ GPR_ASSERT(host_end >= host);
+ char host_without_scope[INET6_ADDRSTRLEN];
+ size_t host_without_scope_len = (size_t)(host_end - host);
+ strncpy(host_without_scope, host, host_without_scope_len);
+ host_without_scope[host_without_scope_len] = '\0';
+ if (inet_pton(AF_INET6, host_without_scope, &in6->sin6_addr) == 0) {
+ gpr_log(GPR_ERROR, "invalid ipv6 address: '%s'", host_without_scope);
+ goto done;
+ }
+ if (gpr_parse_bytes_to_uint32(host_end + 1,
+ strlen(host) - host_without_scope_len - 1,
+ &in6->sin6_scope_id) == 0) {
+ gpr_log(GPR_ERROR, "invalid ipv6 scope id: '%s'", host_end + 1);
+ goto done;
+ }
+ } else {
+ if (inet_pton(AF_INET6, host, &in6->sin6_addr) == 0) {
+ gpr_log(GPR_ERROR, "invalid ipv6 address: '%s'", host);
+ goto done;
+ }
}
if (port != NULL) {
diff --git a/src/core/ext/client_channel/resolver_registry.c b/src/core/ext/client_channel/resolver_registry.c
index f8e8bc9c39..3c5a6fb3ff 100644
--- a/src/core/ext/client_channel/resolver_registry.c
+++ b/src/core/ext/client_channel/resolver_registry.c
@@ -108,22 +108,23 @@ static grpc_resolver_factory *lookup_factory_by_uri(grpc_uri *uri) {
return lookup_factory(uri->scheme);
}
-static grpc_resolver_factory *resolve_factory(const char *target,
+static grpc_resolver_factory *resolve_factory(grpc_exec_ctx *exec_ctx,
+ const char *target,
grpc_uri **uri,
char **canonical_target) {
grpc_resolver_factory *factory = NULL;
GPR_ASSERT(uri != NULL);
- *uri = grpc_uri_parse(target, 1);
+ *uri = grpc_uri_parse(exec_ctx, target, 1);
factory = lookup_factory_by_uri(*uri);
if (factory == NULL) {
grpc_uri_destroy(*uri);
gpr_asprintf(canonical_target, "%s%s", g_default_resolver_prefix, target);
- *uri = grpc_uri_parse(*canonical_target, 1);
+ *uri = grpc_uri_parse(exec_ctx, *canonical_target, 1);
factory = lookup_factory_by_uri(*uri);
if (factory == NULL) {
- grpc_uri_destroy(grpc_uri_parse(target, 0));
- grpc_uri_destroy(grpc_uri_parse(*canonical_target, 0));
+ grpc_uri_destroy(grpc_uri_parse(exec_ctx, target, 0));
+ grpc_uri_destroy(grpc_uri_parse(exec_ctx, *canonical_target, 0));
gpr_log(GPR_ERROR, "don't know how to resolve '%s' or '%s'", target,
*canonical_target);
}
@@ -138,7 +139,7 @@ grpc_resolver *grpc_resolver_create(grpc_exec_ctx *exec_ctx, const char *target,
grpc_uri *uri = NULL;
char *canonical_target = NULL;
grpc_resolver_factory *factory =
- resolve_factory(target, &uri, &canonical_target);
+ resolve_factory(exec_ctx, target, &uri, &canonical_target);
grpc_resolver *resolver;
grpc_resolver_args resolver_args;
memset(&resolver_args, 0, sizeof(resolver_args));
@@ -153,21 +154,22 @@ grpc_resolver *grpc_resolver_create(grpc_exec_ctx *exec_ctx, const char *target,
return resolver;
}
-char *grpc_get_default_authority(const char *target) {
+char *grpc_get_default_authority(grpc_exec_ctx *exec_ctx, const char *target) {
grpc_uri *uri = NULL;
char *canonical_target = NULL;
grpc_resolver_factory *factory =
- resolve_factory(target, &uri, &canonical_target);
+ resolve_factory(exec_ctx, target, &uri, &canonical_target);
char *authority = grpc_resolver_factory_get_default_authority(factory, uri);
grpc_uri_destroy(uri);
gpr_free(canonical_target);
return authority;
}
-char *grpc_resolver_factory_add_default_prefix_if_needed(const char *target) {
+char *grpc_resolver_factory_add_default_prefix_if_needed(
+ grpc_exec_ctx *exec_ctx, const char *target) {
grpc_uri *uri = NULL;
char *canonical_target = NULL;
- resolve_factory(target, &uri, &canonical_target);
+ resolve_factory(exec_ctx, target, &uri, &canonical_target);
grpc_uri_destroy(uri);
return canonical_target == NULL ? gpr_strdup(target) : canonical_target;
}
diff --git a/src/core/ext/client_channel/resolver_registry.h b/src/core/ext/client_channel/resolver_registry.h
index e2c189cf0c..1a3ebee25a 100644
--- a/src/core/ext/client_channel/resolver_registry.h
+++ b/src/core/ext/client_channel/resolver_registry.h
@@ -74,10 +74,11 @@ grpc_resolver_factory *grpc_resolver_factory_lookup(const char *name);
/** Given a target, return a (freshly allocated with gpr_malloc) string
representing the default authority to pass from a client. */
-char *grpc_get_default_authority(const char *target);
+char *grpc_get_default_authority(grpc_exec_ctx *exec_ctx, const char *target);
/** Returns a newly allocated string containing \a target, adding the
default prefix if needed. */
-char *grpc_resolver_factory_add_default_prefix_if_needed(const char *target);
+char *grpc_resolver_factory_add_default_prefix_if_needed(
+ grpc_exec_ctx *exec_ctx, const char *target);
#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_RESOLVER_REGISTRY_H */
diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c
index cb2d2c821d..5df0a9060d 100644
--- a/src/core/ext/client_channel/subchannel.c
+++ b/src/core/ext/client_channel/subchannel.c
@@ -331,7 +331,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
}
c->pollset_set = grpc_pollset_set_create();
grpc_resolved_address *addr = gpr_malloc(sizeof(*addr));
- grpc_get_subchannel_address_arg(args->args, addr);
+ grpc_get_subchannel_address_arg(exec_ctx, args->args, addr);
grpc_set_initial_connect_string(&addr, &c->initial_connect_string);
grpc_resolved_address *new_address = NULL;
grpc_channel_args *new_args = NULL;
@@ -787,9 +787,9 @@ grpc_call_stack *grpc_subchannel_call_get_call_stack(
return SUBCHANNEL_CALL_TO_CALL_STACK(subchannel_call);
}
-static void grpc_uri_to_sockaddr(const char *uri_str,
+static void grpc_uri_to_sockaddr(grpc_exec_ctx *exec_ctx, const char *uri_str,
grpc_resolved_address *addr) {
- grpc_uri *uri = grpc_uri_parse(uri_str, 0 /* suppress_errors */);
+ grpc_uri *uri = grpc_uri_parse(exec_ctx, uri_str, 0 /* suppress_errors */);
GPR_ASSERT(uri != NULL);
if (strcmp(uri->scheme, "ipv4") == 0) {
GPR_ASSERT(parse_ipv4(uri, addr));
@@ -801,12 +801,13 @@ static void grpc_uri_to_sockaddr(const char *uri_str,
grpc_uri_destroy(uri);
}
-void grpc_get_subchannel_address_arg(const grpc_channel_args *args,
+void grpc_get_subchannel_address_arg(grpc_exec_ctx *exec_ctx,
+ const grpc_channel_args *args,
grpc_resolved_address *addr) {
const char *addr_uri_str = grpc_get_subchannel_address_uri_arg(args);
memset(addr, 0, sizeof(*addr));
if (*addr_uri_str != '\0') {
- grpc_uri_to_sockaddr(addr_uri_str, addr);
+ grpc_uri_to_sockaddr(exec_ctx, addr_uri_str, addr);
}
}
diff --git a/src/core/ext/client_channel/subchannel.h b/src/core/ext/client_channel/subchannel.h
index 26ce954487..6a70a76467 100644
--- a/src/core/ext/client_channel/subchannel.h
+++ b/src/core/ext/client_channel/subchannel.h
@@ -175,7 +175,8 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
const grpc_subchannel_args *args);
/// Sets \a addr from \a args.
-void grpc_get_subchannel_address_arg(const grpc_channel_args *args,
+void grpc_get_subchannel_address_arg(grpc_exec_ctx *exec_ctx,
+ const grpc_channel_args *args,
grpc_resolved_address *addr);
/// Returns the URI string for the address to connect to.
diff --git a/src/core/ext/client_channel/uri_parser.c b/src/core/ext/client_channel/uri_parser.c
index 7dd7b753cc..d385db0801 100644
--- a/src/core/ext/client_channel/uri_parser.c
+++ b/src/core/ext/client_channel/uri_parser.c
@@ -35,13 +35,15 @@
#include <string.h>
-#include <grpc/slice.h>
#include <grpc/slice_buffer.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include <grpc/support/string_util.h>
+#include "src/core/lib/slice/percent_encoding.h"
+#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/support/string.h"
/** a size_t default value... maps to all 1's */
@@ -68,11 +70,16 @@ static grpc_uri *bad_uri(const char *uri_text, size_t pos, const char *section,
return NULL;
}
-/** Returns a copy of \a src[begin, end) */
-static char *copy_component(const char *src, size_t begin, size_t end) {
- char *out = gpr_malloc(end - begin + 1);
- memcpy(out, src + begin, end - begin);
- out[end - begin] = 0;
+/** Returns a copy of percent decoded \a src[begin, end) */
+static char *decode_and_copy_component(grpc_exec_ctx *exec_ctx, const char *src,
+ size_t begin, size_t end) {
+ grpc_slice component =
+ grpc_slice_from_copied_buffer(src + begin, end - begin);
+ grpc_slice decoded_component =
+ grpc_permissive_percent_decode_slice(component);
+ char *out = grpc_dump_slice(decoded_component, GPR_DUMP_ASCII);
+ grpc_slice_unref_internal(exec_ctx, component);
+ grpc_slice_unref_internal(exec_ctx, decoded_component);
return out;
}
@@ -175,7 +182,8 @@ static void parse_query_parts(grpc_uri *uri) {
}
}
-grpc_uri *grpc_uri_parse(const char *uri_text, int suppress_errors) {
+grpc_uri *grpc_uri_parse(grpc_exec_ctx *exec_ctx, const char *uri_text,
+ int suppress_errors) {
grpc_uri *uri;
size_t scheme_begin = 0;
size_t scheme_end = NOT_SET;
@@ -263,11 +271,16 @@ grpc_uri *grpc_uri_parse(const char *uri_text, int suppress_errors) {
}
uri = gpr_zalloc(sizeof(*uri));
- uri->scheme = copy_component(uri_text, scheme_begin, scheme_end);
- uri->authority = copy_component(uri_text, authority_begin, authority_end);
- uri->path = copy_component(uri_text, path_begin, path_end);
- uri->query = copy_component(uri_text, query_begin, query_end);
- uri->fragment = copy_component(uri_text, fragment_begin, fragment_end);
+ uri->scheme =
+ decode_and_copy_component(exec_ctx, uri_text, scheme_begin, scheme_end);
+ uri->authority = decode_and_copy_component(exec_ctx, uri_text,
+ authority_begin, authority_end);
+ uri->path =
+ decode_and_copy_component(exec_ctx, uri_text, path_begin, path_end);
+ uri->query =
+ decode_and_copy_component(exec_ctx, uri_text, query_begin, query_end);
+ uri->fragment = decode_and_copy_component(exec_ctx, uri_text, fragment_begin,
+ fragment_end);
parse_query_parts(uri);
return uri;
diff --git a/src/core/ext/client_channel/uri_parser.h b/src/core/ext/client_channel/uri_parser.h
index 5fe0e8f35e..efd4302c1c 100644
--- a/src/core/ext/client_channel/uri_parser.h
+++ b/src/core/ext/client_channel/uri_parser.h
@@ -35,6 +35,7 @@
#define GRPC_CORE_EXT_CLIENT_CHANNEL_URI_PARSER_H
#include <stddef.h>
+#include "src/core/lib/iomgr/exec_ctx.h"
typedef struct {
char *scheme;
@@ -51,7 +52,8 @@ typedef struct {
} grpc_uri;
/** parse a uri, return NULL on failure */
-grpc_uri *grpc_uri_parse(const char *uri_text, int suppress_errors);
+grpc_uri *grpc_uri_parse(grpc_exec_ctx *exec_ctx, const char *uri_text,
+ int suppress_errors);
/** return the part of a query string after the '=' in "?key=xxx&...", or NULL
* if key is not present */
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c
index aea0fcc33d..d612591f2e 100644
--- a/src/core/ext/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/lb_policy/grpclb/grpclb.c
@@ -861,7 +861,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
arg = grpc_channel_args_find(args->args, GRPC_ARG_SERVER_URI);
GPR_ASSERT(arg != NULL);
GPR_ASSERT(arg->type == GRPC_ARG_STRING);
- grpc_uri *uri = grpc_uri_parse(arg->value.string, true);
+ grpc_uri *uri = grpc_uri_parse(exec_ctx, arg->value.string, true);
GPR_ASSERT(uri->path[0] != '\0');
glb_policy->server_name =
gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path);
diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.c b/src/core/ext/transport/chttp2/client/chttp2_connector.c
index fc5e17d8fc..eae0145ecc 100644
--- a/src/core/ext/transport/chttp2/client/chttp2_connector.c
+++ b/src/core/ext/transport/chttp2/client/chttp2_connector.c
@@ -226,7 +226,7 @@ static void chttp2_connector_connect(grpc_exec_ctx *exec_ctx,
grpc_closure *notify) {
chttp2_connector *c = (chttp2_connector *)con;
grpc_resolved_address addr;
- grpc_get_subchannel_address_arg(args->channel_args, &addr);
+ grpc_get_subchannel_address_arg(exec_ctx, args->channel_args, &addr);
gpr_mu_lock(&c->mu);
GPR_ASSERT(c->notify == NULL);
c->notify = notify;
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
index 490a0c560e..067ac35a5a 100644
--- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c
+++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
@@ -72,8 +72,11 @@ static grpc_channel *client_channel_factory_create_channel(
grpc_arg arg;
arg.type = GRPC_ARG_STRING;
arg.key = GRPC_ARG_SERVER_URI;
- arg.value.string = grpc_resolver_factory_add_default_prefix_if_needed(target);
- grpc_channel_args *new_args = grpc_channel_args_copy_and_add(args, &arg, 1);
+ arg.value.string =
+ grpc_resolver_factory_add_default_prefix_if_needed(exec_ctx, target);
+ const char *to_remove[] = {GRPC_ARG_SERVER_URI};
+ grpc_channel_args *new_args =
+ grpc_channel_args_copy_and_add_and_remove(args, to_remove, 1, &arg, 1);
gpr_free(arg.value.string);
grpc_channel *channel = grpc_channel_create(exec_ctx, target, new_args,
GRPC_CLIENT_CHANNEL, NULL);
diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
index d8c18eb122..f0c241d68e 100644
--- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
+++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
@@ -83,7 +83,7 @@ static grpc_subchannel_args *get_secure_naming_subchannel_args(
const char *server_uri_str = server_uri_arg->value.string;
GPR_ASSERT(server_uri_str != NULL);
grpc_uri *server_uri =
- grpc_uri_parse(server_uri_str, true /* supress errors */);
+ grpc_uri_parse(exec_ctx, server_uri_str, true /* supress errors */);
GPR_ASSERT(server_uri != NULL);
const char *server_uri_path;
server_uri_path =
@@ -96,7 +96,7 @@ static grpc_subchannel_args *get_secure_naming_subchannel_args(
const char *target_uri_str =
grpc_get_subchannel_address_uri_arg(args->args);
grpc_uri *target_uri =
- grpc_uri_parse(target_uri_str, false /* suppress errors */);
+ grpc_uri_parse(exec_ctx, target_uri_str, false /* suppress errors */);
GPR_ASSERT(target_uri != NULL);
if (target_uri->path[0] != '\0') { // "path" may be empty
const grpc_slice key = grpc_slice_from_static_string(
@@ -181,8 +181,11 @@ static grpc_channel *client_channel_factory_create_channel(
grpc_arg arg;
arg.type = GRPC_ARG_STRING;
arg.key = GRPC_ARG_SERVER_URI;
- arg.value.string = grpc_resolver_factory_add_default_prefix_if_needed(target);
- grpc_channel_args *new_args = grpc_channel_args_copy_and_add(args, &arg, 1);
+ arg.value.string =
+ grpc_resolver_factory_add_default_prefix_if_needed(exec_ctx, target);
+ const char *to_remove[] = {GRPC_ARG_SERVER_URI};
+ grpc_channel_args *new_args =
+ grpc_channel_args_copy_and_add_and_remove(args, to_remove, 1, &arg, 1);
gpr_free(arg.value.string);
grpc_channel *channel = grpc_channel_create(exec_ctx, target, new_args,
GRPC_CLIENT_CHANNEL, NULL);
diff --git a/src/core/lib/http/parser.c b/src/core/lib/http/parser.c
index 2f84adc187..b9c56c103c 100644
--- a/src/core/lib/http/parser.c
+++ b/src/core/lib/http/parser.c
@@ -284,9 +284,9 @@ static grpc_error *addbyte(grpc_http_parser *parser, uint8_t byte,
case GRPC_HTTP_HEADERS:
if (parser->cur_line_length >= GRPC_HTTP_PARSER_MAX_HEADER_LENGTH) {
if (grpc_http1_trace)
- gpr_log(GPR_ERROR, "HTTP client max line length (%d) exceeded",
+ gpr_log(GPR_ERROR, "HTTP header max line length (%d) exceeded",
GRPC_HTTP_PARSER_MAX_HEADER_LENGTH);
- return GRPC_ERROR_NONE;
+ return GRPC_ERROR_CREATE("HTTP header max line length exceeded");
}
parser->cur_line[parser->cur_line_length] = byte;
parser->cur_line_length++;
diff --git a/src/core/lib/iomgr/pollset_uv.c b/src/core/lib/iomgr/pollset_uv.c
index a379ddf25f..af33949c69 100644
--- a/src/core/lib/iomgr/pollset_uv.c
+++ b/src/core/lib/iomgr/pollset_uv.c
@@ -57,14 +57,28 @@ int grpc_pollset_work_run_loop;
gpr_mu grpc_polling_mu;
+/* This is used solely to kick the uv loop, by setting a callback to be run
+ immediately in the next loop iteration.
+ Note: In the future, if there is a bug that involves missing wakeups in the
+ future, try adding a uv_async_t to kick the loop differently */
+uv_timer_t dummy_uv_handle;
+
size_t grpc_pollset_size() { return sizeof(grpc_pollset); }
+void dummy_timer_cb(uv_timer_t *handle) {}
+
void grpc_pollset_global_init(void) {
gpr_mu_init(&grpc_polling_mu);
+ uv_timer_init(uv_default_loop(), &dummy_uv_handle);
grpc_pollset_work_run_loop = 1;
}
-void grpc_pollset_global_shutdown(void) { gpr_mu_destroy(&grpc_polling_mu); }
+static void timer_close_cb(uv_handle_t *handle) { handle->data = (void *)1; }
+
+void grpc_pollset_global_shutdown(void) {
+ gpr_mu_destroy(&grpc_polling_mu);
+ uv_close((uv_handle_t *)&dummy_uv_handle, timer_close_cb);
+}
void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
*mu = &grpc_polling_mu;
@@ -72,8 +86,6 @@ void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
pollset->shutting_down = 0;
}
-static void timer_close_cb(uv_handle_t *handle) { handle->data = (void *)1; }
-
void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_closure *closure) {
GPR_ASSERT(!pollset->shutting_down);
@@ -81,6 +93,9 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
if (grpc_pollset_work_run_loop) {
// Drain any pending UV callbacks without blocking
uv_run(uv_default_loop(), UV_RUN_NOWAIT);
+ } else {
+ // kick the loop once
+ uv_timer_start(&dummy_uv_handle, dummy_timer_cb, 0, 0);
}
grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE);
}
@@ -130,6 +145,7 @@ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_error *grpc_pollset_kick(grpc_pollset *pollset,
grpc_pollset_worker *specific_worker) {
+ uv_timer_start(&dummy_uv_handle, dummy_timer_cb, 0, 0);
return GRPC_ERROR_NONE;
}
diff --git a/src/core/lib/iomgr/sockaddr_utils.c b/src/core/lib/iomgr/sockaddr_utils.c
index ffa62cb53c..9d2732666b 100644
--- a/src/core/lib/iomgr/sockaddr_utils.c
+++ b/src/core/lib/iomgr/sockaddr_utils.c
@@ -162,6 +162,7 @@ int grpc_sockaddr_to_string(char **out,
char ntop_buf[INET6_ADDRSTRLEN];
const void *ip = NULL;
int port;
+ uint32_t sin6_scope_id = 0;
int ret;
*out = NULL;
@@ -177,10 +178,19 @@ int grpc_sockaddr_to_string(char **out,
const struct sockaddr_in6 *addr6 = (const struct sockaddr_in6 *)addr;
ip = &addr6->sin6_addr;
port = ntohs(addr6->sin6_port);
+ sin6_scope_id = addr6->sin6_scope_id;
}
if (ip != NULL &&
grpc_inet_ntop(addr->sa_family, ip, ntop_buf, sizeof(ntop_buf)) != NULL) {
- ret = gpr_join_host_port(out, ntop_buf, port);
+ if (sin6_scope_id != 0) {
+ char *host_with_scope;
+ /* Enclose sin6_scope_id with the format defined in RFC 6784 section 2. */
+ gpr_asprintf(&host_with_scope, "%s%%25%" PRIu32, ntop_buf, sin6_scope_id);
+ ret = gpr_join_host_port(out, host_with_scope, port);
+ gpr_free(host_with_scope);
+ } else {
+ ret = gpr_join_host_port(out, ntop_buf, port);
+ }
} else {
ret = gpr_asprintf(out, "(sockaddr family=%d)", addr->sa_family);
}
diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c
index 36f878fdd4..5f286a6723 100644
--- a/src/core/lib/iomgr/tcp_server_posix.c
+++ b/src/core/lib/iomgr/tcp_server_posix.c
@@ -114,6 +114,8 @@ struct grpc_tcp_server {
/* is this server shutting down? */
bool shutdown;
+ /* have listeners been shutdown? */
+ bool shutdown_listeners;
/* use SO_REUSEPORT */
bool so_reuseport;
/* expand wildcard addresses to a list of all local addresses */
@@ -161,7 +163,7 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx,
grpc_tcp_server **server) {
gpr_once_init(&check_init, init);
- grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server));
+ grpc_tcp_server *s = gpr_zalloc(sizeof(grpc_tcp_server));
s->so_reuseport = has_so_reuseport;
s->resource_quota = grpc_resource_quota_create(NULL);
s->expand_wildcard_addrs = false;
@@ -422,7 +424,14 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) {
grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
return;
default:
- gpr_log(GPR_ERROR, "Failed accept4: %s", strerror(errno));
+ gpr_mu_lock(&sp->server->mu);
+ if (!sp->server->shutdown_listeners) {
+ gpr_log(GPR_ERROR, "Failed accept4: %s", strerror(errno));
+ } else {
+ /* if we have shutdown listeners, accept4 could fail, and we
+ needn't notify users */
+ }
+ gpr_mu_unlock(&sp->server->mu);
goto error;
}
}
@@ -438,11 +447,6 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) {
grpc_fd *fdobj = grpc_fd_create(fd, name);
- if (read_notifier_pollset == NULL) {
- gpr_log(GPR_ERROR, "Read notifier pollset is not set on the fd");
- goto error;
- }
-
grpc_pollset_add_fd(exec_ctx, read_notifier_pollset, fdobj);
// Create acceptor.
@@ -941,6 +945,7 @@ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
void grpc_tcp_server_shutdown_listeners(grpc_exec_ctx *exec_ctx,
grpc_tcp_server *s) {
gpr_mu_lock(&s->mu);
+ s->shutdown_listeners = true;
/* shutdown all fd's */
if (s->active_ports) {
grpc_tcp_listener *sp;
diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c
index 6d638bcbaa..d4df96c214 100644
--- a/src/core/lib/iomgr/timer_generic.c
+++ b/src/core/lib/iomgr/timer_generic.c
@@ -42,6 +42,7 @@
#include <grpc/support/useful.h>
#include "src/core/lib/iomgr/time_averaged_stats.h"
#include "src/core/lib/iomgr/timer_heap.h"
+#include "src/core/lib/support/spinlock.h"
#define INVALID_HEAP_INDEX 0xffffffffu
@@ -69,7 +70,7 @@ typedef struct {
/* Protects g_shard_queue */
static gpr_mu g_mu;
/* Allow only one run_some_expired_timers at once */
-static gpr_mu g_checker_mu;
+static gpr_spinlock g_checker_mu = GPR_SPINLOCK_STATIC_INITIALIZER;
static gpr_clock_type g_clock_type;
static shard_type g_shards[NUM_SHARDS];
/* Protected by g_mu */
@@ -90,7 +91,6 @@ void grpc_timer_list_init(gpr_timespec now) {
g_initialized = true;
gpr_mu_init(&g_mu);
- gpr_mu_init(&g_checker_mu);
g_clock_type = now.clock_type;
for (i = 0; i < NUM_SHARDS; i++) {
@@ -117,7 +117,6 @@ void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) {
grpc_timer_heap_destroy(&shard->heap);
}
gpr_mu_destroy(&g_mu);
- gpr_mu_destroy(&g_checker_mu);
g_initialized = false;
}
@@ -324,7 +323,7 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now,
/* TODO(ctiller): verify that there are any timers (atomically) here */
- if (gpr_mu_trylock(&g_checker_mu)) {
+ if (gpr_spinlock_trylock(&g_checker_mu)) {
gpr_mu_lock(&g_mu);
while (gpr_time_cmp(g_shard_queue[0]->min_deadline, now) < 0) {
@@ -350,7 +349,7 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now,
}
gpr_mu_unlock(&g_mu);
- gpr_mu_unlock(&g_checker_mu);
+ gpr_spinlock_unlock(&g_checker_mu);
} else if (next != NULL) {
/* TODO(ctiller): this forces calling code to do an short poll, and
then retry the timer check (because this time through the timer list was
diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c
index 2a1c8d39fa..d1bcd89af1 100644
--- a/src/core/lib/iomgr/udp_server.c
+++ b/src/core/lib/iomgr/udp_server.c
@@ -485,7 +485,11 @@ void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
grpc_schedule_on_exec_ctx);
grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure);
- s->active_ports++;
+ /* Registered for both read and write callbacks: increment active_ports
+ * twice to account for this, and delay free-ing of memory until both
+ * on_read and on_write have fired. */
+ s->active_ports += 2;
+
sp = sp->next;
}
diff --git a/src/core/lib/support/spinlock.h b/src/core/lib/support/spinlock.h
new file mode 100644
index 0000000000..d8c7c5ffde
--- /dev/null
+++ b/src/core/lib/support/spinlock.h
@@ -0,0 +1,52 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_SUPPORT_SPINLOCK_H
+#define GRPC_CORE_LIB_SUPPORT_SPINLOCK_H
+
+#include <grpc/support/atm.h>
+
+/* Simple spinlock. No backoff strategy, gpr_spinlock_lock is almost always
+ a concurrency code smell. */
+typedef struct { gpr_atm atm; } gpr_spinlock;
+
+#define GPR_SPINLOCK_INITIALIZER ((gpr_spinlock){0})
+#define GPR_SPINLOCK_STATIC_INITIALIZER \
+ { 0 }
+#define gpr_spinlock_trylock(lock) (gpr_atm_acq_cas(&(lock)->atm, 0, 1))
+#define gpr_spinlock_unlock(lock) (gpr_atm_rel_store(&(lock)->atm, 0))
+#define gpr_spinlock_lock(lock) \
+ do { \
+ } while (!gpr_spinlock_trylock((lock)))
+
+#endif /* GRPC_CORE_LIB_SUPPORT_SPINLOCK_H */
diff --git a/src/core/lib/support/sync.c b/src/core/lib/support/sync.c
index 44b83f8175..e4a7fce646 100644
--- a/src/core/lib/support/sync.c
+++ b/src/core/lib/support/sync.c
@@ -37,6 +37,8 @@
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
+#include <assert.h>
+
/* Number of mutexes to allocate for events, to avoid lock contention.
Should be a prime. */
enum { event_sync_partitions = 31 };
@@ -99,8 +101,12 @@ void gpr_ref_init(gpr_refcount *r, int n) { gpr_atm_rel_store(&r->count, n); }
void gpr_ref(gpr_refcount *r) { gpr_atm_no_barrier_fetch_add(&r->count, 1); }
void gpr_ref_non_zero(gpr_refcount *r) {
+#ifndef NDEBUG
gpr_atm prior = gpr_atm_no_barrier_fetch_add(&r->count, 1);
- GPR_ASSERT(prior > 0);
+ assert(prior > 0);
+#else
+ gpr_ref(r);
+#endif
}
void gpr_refn(gpr_refcount *r, int n) {
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index cc57654ea4..c2547c5147 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -161,6 +161,7 @@ struct grpc_call {
bool receiving_message;
bool requested_final_op;
bool received_final_op;
+ bool sent_any_op;
/* have we received initial metadata */
bool has_initial_md_been_received;
@@ -488,7 +489,7 @@ void grpc_call_destroy(grpc_call *c) {
gpr_mu_lock(&c->mu);
GPR_ASSERT(!c->destroy_called);
c->destroy_called = 1;
- cancel = !c->received_final_op;
+ cancel = c->sent_any_op && !c->received_final_op;
gpr_mu_unlock(&c->mu);
if (cancel) {
cancel_with_error(&exec_ctx, c, STATUS_FROM_API_OVERRIDE,
@@ -1678,6 +1679,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
grpc_closure_init(&bctl->finish_batch, finish_batch, bctl,
grpc_schedule_on_exec_ctx);
stream_op->on_complete = &bctl->finish_batch;
+ call->sent_any_op = true;
gpr_mu_unlock(&call->mu);
execute_op(exec_ctx, call, stream_op);
diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c
index 004e748f25..165950e288 100644
--- a/src/core/lib/transport/transport.c
+++ b/src/core/lib/transport/transport.c
@@ -84,6 +84,39 @@ void grpc_stream_unref(grpc_exec_ctx *exec_ctx,
}
}
+#define STREAM_REF_FROM_SLICE_REF(p) \
+ ((grpc_stream_refcount *)(((uint8_t *)p) - \
+ offsetof(grpc_stream_refcount, slice_refcount)))
+
+static void slice_stream_ref(void *p) {
+#ifdef GRPC_STREAM_REFCOUNT_DEBUG
+ grpc_stream_ref(STREAM_REF_FROM_SLICE_REF(p), "slice");
+#else
+ grpc_stream_ref(STREAM_REF_FROM_SLICE_REF(p));
+#endif
+}
+
+static void slice_stream_unref(grpc_exec_ctx *exec_ctx, void *p) {
+#ifdef GRPC_STREAM_REFCOUNT_DEBUG
+ grpc_stream_unref(exec_ctx, STREAM_REF_FROM_SLICE_REF(p), "slice");
+#else
+ grpc_stream_unref(exec_ctx, STREAM_REF_FROM_SLICE_REF(p));
+#endif
+}
+
+grpc_slice grpc_slice_from_stream_owned_buffer(grpc_stream_refcount *refcount,
+ void *buffer, size_t length) {
+ slice_stream_ref(&refcount->slice_refcount);
+ return (grpc_slice){.refcount = &refcount->slice_refcount,
+ .data.refcounted = {.bytes = buffer, .length = length}};
+}
+
+static const grpc_slice_refcount_vtable stream_ref_slice_vtable = {
+ .ref = slice_stream_ref,
+ .unref = slice_stream_unref,
+ .eq = grpc_slice_default_eq_impl,
+ .hash = grpc_slice_default_hash_impl};
+
#ifdef GRPC_STREAM_REFCOUNT_DEBUG
void grpc_stream_ref_init(grpc_stream_refcount *refcount, int initial_refs,
grpc_iomgr_cb_func cb, void *cb_arg,
@@ -95,6 +128,8 @@ void grpc_stream_ref_init(grpc_stream_refcount *refcount, int initial_refs,
#endif
gpr_ref_init(&refcount->refs, initial_refs);
grpc_closure_init(&refcount->destroy, cb, cb_arg, grpc_schedule_on_exec_ctx);
+ refcount->slice_refcount.vtable = &stream_ref_slice_vtable;
+ refcount->slice_refcount.sub_refcount = &refcount->slice_refcount;
}
static void move64(uint64_t *from, uint64_t *to) {
diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h
index bb23c0225a..cc1c277b35 100644
--- a/src/core/lib/transport/transport.h
+++ b/src/core/lib/transport/transport.h
@@ -64,6 +64,7 @@ typedef struct grpc_stream_refcount {
#ifdef GRPC_STREAM_REFCOUNT_DEBUG
const char *object_type;
#endif
+ grpc_slice_refcount slice_refcount;
} grpc_stream_refcount;
#ifdef GRPC_STREAM_REFCOUNT_DEBUG
@@ -84,6 +85,11 @@ void grpc_stream_unref(grpc_exec_ctx *exec_ctx, grpc_stream_refcount *refcount);
grpc_stream_ref_init(rc, ir, cb, cb_arg)
#endif
+/* Wrap a buffer that is owned by some stream object into a slice that shares
+ the same refcount */
+grpc_slice grpc_slice_from_stream_owned_buffer(grpc_stream_refcount *refcount,
+ void *buffer, size_t length);
+
typedef struct {
uint64_t framing_bytes;
uint64_t data_bytes;
diff --git a/src/core/lib/tsi/test_creds/BUILD b/src/core/lib/tsi/test_creds/BUILD
index dcd6d930a8..5cf04caf17 100644
--- a/src/core/lib/tsi/test_creds/BUILD
+++ b/src/core/lib/tsi/test_creds/BUILD
@@ -27,6 +27,8 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+licenses(["notice"]) # 3-clause BSD
+
exports_files([
"ca.pem",
"server1.key",