aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/ext/client_channel/client_channel.c194
-rw-r--r--src/core/ext/client_channel/parse_address.c7
-rw-r--r--src/core/ext/transport/chttp2/client/insecure/channel_create.c4
-rw-r--r--src/core/ext/transport/chttp2/client/secure/secure_channel_create.c4
-rw-r--r--src/core/lib/iomgr/pollset_uv.c22
-rw-r--r--src/core/lib/tsi/test_creds/BUILD34
-rw-r--r--src/python/grpcio_tests/tests/unit/_invocation_defects_test.py4
-rw-r--r--src/python/grpcio_tests/tests/unit/_thread_cleanup_test.py4
8 files changed, 180 insertions, 93 deletions
diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c
index 229bf570fc..a19c94f85a 100644
--- a/src/core/ext/client_channel/client_channel.c
+++ b/src/core/ext/client_channel/client_channel.c
@@ -77,24 +77,82 @@ typedef enum {
WAIT_FOR_READY_TRUE
} wait_for_ready_value;
-typedef struct method_parameters {
+typedef struct {
+ gpr_refcount refs;
gpr_timespec timeout;
wait_for_ready_value wait_for_ready;
} method_parameters;
+static method_parameters *method_parameters_ref(
+ method_parameters *method_params) {
+ gpr_ref(&method_params->refs);
+ return method_params;
+}
+
+static void method_parameters_unref(method_parameters *method_params) {
+ if (gpr_unref(&method_params->refs)) {
+ gpr_free(method_params);
+ }
+}
+
static void *method_parameters_copy(void *value) {
- void *new_value = gpr_malloc(sizeof(method_parameters));
- memcpy(new_value, value, sizeof(method_parameters));
- return new_value;
+ return method_parameters_ref(value);
}
-static void method_parameters_free(grpc_exec_ctx *exec_ctx, void *p) {
- gpr_free(p);
+static void method_parameters_free(grpc_exec_ctx *exec_ctx, void *value) {
+ method_parameters_unref(value);
}
static const grpc_slice_hash_table_vtable method_parameters_vtable = {
method_parameters_free, method_parameters_copy};
+static bool parse_wait_for_ready(grpc_json *field,
+ wait_for_ready_value *wait_for_ready) {
+ if (field->type != GRPC_JSON_TRUE && field->type != GRPC_JSON_FALSE) {
+ return false;
+ }
+ *wait_for_ready = field->type == GRPC_JSON_TRUE ? WAIT_FOR_READY_TRUE
+ : WAIT_FOR_READY_FALSE;
+ return true;
+}
+
+static bool parse_timeout(grpc_json *field, gpr_timespec *timeout) {
+ if (field->type != GRPC_JSON_STRING) return false;
+ size_t len = strlen(field->value);
+ if (field->value[len - 1] != 's') return false;
+ char *buf = gpr_strdup(field->value);
+ buf[len - 1] = '\0'; // Remove trailing 's'.
+ char *decimal_point = strchr(buf, '.');
+ if (decimal_point != NULL) {
+ *decimal_point = '\0';
+ timeout->tv_nsec = gpr_parse_nonnegative_int(decimal_point + 1);
+ if (timeout->tv_nsec == -1) {
+ gpr_free(buf);
+ return false;
+ }
+ // There should always be exactly 3, 6, or 9 fractional digits.
+ int multiplier = 1;
+ switch (strlen(decimal_point + 1)) {
+ case 9:
+ break;
+ case 6:
+ multiplier *= 1000;
+ break;
+ case 3:
+ multiplier *= 1000000;
+ break;
+ default: // Unsupported number of digits.
+ gpr_free(buf);
+ return false;
+ }
+ timeout->tv_nsec *= multiplier;
+ }
+ timeout->tv_sec = gpr_parse_nonnegative_int(buf);
+ gpr_free(buf);
+ if (timeout->tv_sec == -1) return false;
+ return true;
+}
+
static void *method_parameters_create_from_json(const grpc_json *json) {
wait_for_ready_value wait_for_ready = WAIT_FOR_READY_UNSET;
gpr_timespec timeout = {0, 0, GPR_TIMESPAN};
@@ -102,49 +160,14 @@ static void *method_parameters_create_from_json(const grpc_json *json) {
if (field->key == NULL) continue;
if (strcmp(field->key, "waitForReady") == 0) {
if (wait_for_ready != WAIT_FOR_READY_UNSET) return NULL; // Duplicate.
- if (field->type != GRPC_JSON_TRUE && field->type != GRPC_JSON_FALSE) {
- return NULL;
- }
- wait_for_ready = field->type == GRPC_JSON_TRUE ? WAIT_FOR_READY_TRUE
- : WAIT_FOR_READY_FALSE;
+ if (!parse_wait_for_ready(field, &wait_for_ready)) return NULL;
} else if (strcmp(field->key, "timeout") == 0) {
if (timeout.tv_sec > 0 || timeout.tv_nsec > 0) return NULL; // Duplicate.
- if (field->type != GRPC_JSON_STRING) return NULL;
- size_t len = strlen(field->value);
- if (field->value[len - 1] != 's') return NULL;
- char *buf = gpr_strdup(field->value);
- buf[len - 1] = '\0'; // Remove trailing 's'.
- char *decimal_point = strchr(buf, '.');
- if (decimal_point != NULL) {
- *decimal_point = '\0';
- timeout.tv_nsec = gpr_parse_nonnegative_int(decimal_point + 1);
- if (timeout.tv_nsec == -1) {
- gpr_free(buf);
- return NULL;
- }
- // There should always be exactly 3, 6, or 9 fractional digits.
- int multiplier = 1;
- switch (strlen(decimal_point + 1)) {
- case 9:
- break;
- case 6:
- multiplier *= 1000;
- break;
- case 3:
- multiplier *= 1000000;
- break;
- default: // Unsupported number of digits.
- gpr_free(buf);
- return NULL;
- }
- timeout.tv_nsec *= multiplier;
- }
- timeout.tv_sec = gpr_parse_nonnegative_int(buf);
- if (timeout.tv_sec == -1) return NULL;
- gpr_free(buf);
+ if (!parse_timeout(field, &timeout)) return NULL;
}
}
method_parameters *value = gpr_malloc(sizeof(method_parameters));
+ gpr_ref_init(&value->refs, 1);
value->timeout = timeout;
value->wait_for_ready = wait_for_ready;
return value;
@@ -631,7 +654,7 @@ typedef struct client_channel_call_data {
grpc_slice path; // Request path.
gpr_timespec call_start_time;
gpr_timespec deadline;
- wait_for_ready_value wait_for_ready_from_service_config;
+ method_parameters *method_params;
grpc_error *cancel_error;
@@ -842,10 +865,11 @@ static bool pick_subchannel_locked(
initial_metadata_flags &
GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
const bool wait_for_ready_set_from_service_config =
- calld->wait_for_ready_from_service_config != WAIT_FOR_READY_UNSET;
+ calld->method_params != NULL &&
+ calld->method_params->wait_for_ready != WAIT_FOR_READY_UNSET;
if (!wait_for_ready_set_from_api &&
wait_for_ready_set_from_service_config) {
- if (calld->wait_for_ready_from_service_config == WAIT_FOR_READY_TRUE) {
+ if (calld->method_params->wait_for_ready == WAIT_FOR_READY_TRUE) {
initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY;
} else {
initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
@@ -984,10 +1008,9 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx,
add_waiting_locked(calld, op);
}
-static void cc_start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx,
- void *arg,
- grpc_error *error_ignored) {
- GPR_TIMER_BEGIN("cc_start_transport_stream_op_locked", 0);
+static void start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error_ignored) {
+ GPR_TIMER_BEGIN("start_transport_stream_op_locked", 0);
grpc_transport_stream_op *op = arg;
grpc_call_element *elem = op->handler_private.args[0];
@@ -997,7 +1020,7 @@ static void cc_start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx,
GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call,
"start_transport_stream_op");
- GPR_TIMER_END("cc_start_transport_stream_op_locked", 0);
+ GPR_TIMER_END("start_transport_stream_op_locked", 0);
}
/* The logic here is fairly complicated, due to (a) the fact that we
@@ -1037,49 +1060,51 @@ static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
grpc_closure_sched(
exec_ctx,
grpc_closure_init(&op->handler_private.closure,
- cc_start_transport_stream_op_locked, op,
+ start_transport_stream_op_locked, op,
grpc_combiner_scheduler(chand->combiner, false)),
GRPC_ERROR_NONE);
GPR_TIMER_END("cc_start_transport_stream_op", 0);
}
+// Sets calld->method_params.
+// If the method params specify a timeout, populates
+// *per_method_deadline and returns true.
+static bool set_call_method_params_from_service_config_locked(
+ grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ gpr_timespec *per_method_deadline) {
+ channel_data *chand = elem->channel_data;
+ call_data *calld = elem->call_data;
+ if (chand->method_params_table != NULL) {
+ calld->method_params = grpc_method_config_table_get(
+ exec_ctx, chand->method_params_table, calld->path);
+ if (calld->method_params != NULL) {
+ method_parameters_ref(calld->method_params);
+ if (gpr_time_cmp(calld->method_params->timeout,
+ gpr_time_0(GPR_TIMESPAN)) != 0) {
+ *per_method_deadline =
+ gpr_time_add(calld->call_start_time, calld->method_params->timeout);
+ return true;
+ }
+ }
+ }
+ return false;
+}
+
static void apply_final_configuration_locked(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) {
/* apply service-config level configuration to the call (now that we're
* certain it exists) */
- channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
- /* Get the method config table from channel data. */
- grpc_slice_hash_table *method_params_table = NULL;
- if (chand->method_params_table != NULL) {
- method_params_table = grpc_slice_hash_table_ref(chand->method_params_table);
- }
- /* If the method config table was present, use it. */
- if (method_params_table != NULL) {
- const method_parameters *method_params = grpc_method_config_table_get(
- exec_ctx, method_params_table, calld->path);
- if (method_params != NULL) {
- const bool have_method_timeout =
- gpr_time_cmp(method_params->timeout, gpr_time_0(GPR_TIMESPAN)) != 0;
- if (have_method_timeout ||
- method_params->wait_for_ready != WAIT_FOR_READY_UNSET) {
- if (have_method_timeout) {
- const gpr_timespec per_method_deadline =
- gpr_time_add(calld->call_start_time, method_params->timeout);
- if (gpr_time_cmp(per_method_deadline, calld->deadline) < 0) {
- calld->deadline = per_method_deadline;
- }
- }
- if (method_params->wait_for_ready != WAIT_FOR_READY_UNSET) {
- calld->wait_for_ready_from_service_config =
- method_params->wait_for_ready;
- }
- }
+ gpr_timespec per_method_deadline;
+ if (set_call_method_params_from_service_config_locked(exec_ctx, elem,
+ &per_method_deadline)) {
+ // If the deadline from the service config is shorter than the one
+ // from the client API, reset the deadline timer.
+ if (gpr_time_cmp(per_method_deadline, calld->deadline) < 0) {
+ calld->deadline = per_method_deadline;
+ grpc_deadline_state_reset(exec_ctx, elem, calld->deadline);
}
- grpc_slice_hash_table_unref(exec_ctx, method_params_table);
}
- /* Start deadline timer. */
- grpc_deadline_state_reset(exec_ctx, elem, calld->deadline);
}
/* Constructor for call_data */
@@ -1105,6 +1130,9 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
call_data *calld = elem->call_data;
grpc_deadline_state_destroy(exec_ctx, elem);
grpc_slice_unref_internal(exec_ctx, calld->path);
+ if (calld->method_params != NULL) {
+ method_parameters_unref(calld->method_params);
+ }
GRPC_ERROR_UNREF(calld->cancel_error);
grpc_subchannel_call *call = GET_CALL(calld);
if (call != NULL && call != CANCELLED_CALL) {
diff --git a/src/core/ext/client_channel/parse_address.c b/src/core/ext/client_channel/parse_address.c
index b1d55ad0f5..8e4da03de0 100644
--- a/src/core/ext/client_channel/parse_address.c
+++ b/src/core/ext/client_channel/parse_address.c
@@ -49,11 +49,12 @@
int parse_unix(grpc_uri *uri, grpc_resolved_address *resolved_addr) {
struct sockaddr_un *un = (struct sockaddr_un *)resolved_addr->addr;
-
+ const size_t maxlen = sizeof(un->sun_path);
+ const size_t path_len = strnlen(uri->path, maxlen);
+ if (path_len == maxlen) return 0;
un->sun_family = AF_UNIX;
strcpy(un->sun_path, uri->path);
- resolved_addr->len = strlen(un->sun_path) + sizeof(un->sun_family) + 1;
-
+ resolved_addr->len = sizeof(*un);
return 1;
}
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
index 490a0c560e..286232f277 100644
--- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c
+++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
@@ -73,7 +73,9 @@ static grpc_channel *client_channel_factory_create_channel(
arg.type = GRPC_ARG_STRING;
arg.key = GRPC_ARG_SERVER_URI;
arg.value.string = grpc_resolver_factory_add_default_prefix_if_needed(target);
- grpc_channel_args *new_args = grpc_channel_args_copy_and_add(args, &arg, 1);
+ const char *to_remove[] = {GRPC_ARG_SERVER_URI};
+ grpc_channel_args *new_args =
+ grpc_channel_args_copy_and_add_and_remove(args, to_remove, 1, &arg, 1);
gpr_free(arg.value.string);
grpc_channel *channel = grpc_channel_create(exec_ctx, target, new_args,
GRPC_CLIENT_CHANNEL, NULL);
diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
index d8c18eb122..825db68c65 100644
--- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
+++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
@@ -182,7 +182,9 @@ static grpc_channel *client_channel_factory_create_channel(
arg.type = GRPC_ARG_STRING;
arg.key = GRPC_ARG_SERVER_URI;
arg.value.string = grpc_resolver_factory_add_default_prefix_if_needed(target);
- grpc_channel_args *new_args = grpc_channel_args_copy_and_add(args, &arg, 1);
+ const char *to_remove[] = {GRPC_ARG_SERVER_URI};
+ grpc_channel_args *new_args =
+ grpc_channel_args_copy_and_add_and_remove(args, to_remove, 1, &arg, 1);
gpr_free(arg.value.string);
grpc_channel *channel = grpc_channel_create(exec_ctx, target, new_args,
GRPC_CLIENT_CHANNEL, NULL);
diff --git a/src/core/lib/iomgr/pollset_uv.c b/src/core/lib/iomgr/pollset_uv.c
index a379ddf25f..af33949c69 100644
--- a/src/core/lib/iomgr/pollset_uv.c
+++ b/src/core/lib/iomgr/pollset_uv.c
@@ -57,14 +57,28 @@ int grpc_pollset_work_run_loop;
gpr_mu grpc_polling_mu;
+/* This is used solely to kick the uv loop, by setting a callback to be run
+ immediately in the next loop iteration.
+ Note: In the future, if there is a bug that involves missing wakeups in the
+ future, try adding a uv_async_t to kick the loop differently */
+uv_timer_t dummy_uv_handle;
+
size_t grpc_pollset_size() { return sizeof(grpc_pollset); }
+void dummy_timer_cb(uv_timer_t *handle) {}
+
void grpc_pollset_global_init(void) {
gpr_mu_init(&grpc_polling_mu);
+ uv_timer_init(uv_default_loop(), &dummy_uv_handle);
grpc_pollset_work_run_loop = 1;
}
-void grpc_pollset_global_shutdown(void) { gpr_mu_destroy(&grpc_polling_mu); }
+static void timer_close_cb(uv_handle_t *handle) { handle->data = (void *)1; }
+
+void grpc_pollset_global_shutdown(void) {
+ gpr_mu_destroy(&grpc_polling_mu);
+ uv_close((uv_handle_t *)&dummy_uv_handle, timer_close_cb);
+}
void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
*mu = &grpc_polling_mu;
@@ -72,8 +86,6 @@ void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
pollset->shutting_down = 0;
}
-static void timer_close_cb(uv_handle_t *handle) { handle->data = (void *)1; }
-
void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_closure *closure) {
GPR_ASSERT(!pollset->shutting_down);
@@ -81,6 +93,9 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
if (grpc_pollset_work_run_loop) {
// Drain any pending UV callbacks without blocking
uv_run(uv_default_loop(), UV_RUN_NOWAIT);
+ } else {
+ // kick the loop once
+ uv_timer_start(&dummy_uv_handle, dummy_timer_cb, 0, 0);
}
grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE);
}
@@ -130,6 +145,7 @@ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_error *grpc_pollset_kick(grpc_pollset *pollset,
grpc_pollset_worker *specific_worker) {
+ uv_timer_start(&dummy_uv_handle, dummy_timer_cb, 0, 0);
return GRPC_ERROR_NONE;
}
diff --git a/src/core/lib/tsi/test_creds/BUILD b/src/core/lib/tsi/test_creds/BUILD
new file mode 100644
index 0000000000..dcd6d930a8
--- /dev/null
+++ b/src/core/lib/tsi/test_creds/BUILD
@@ -0,0 +1,34 @@
+# Copyright 2017, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+exports_files([
+ "ca.pem",
+ "server1.key",
+ "server1.pem",
+])
diff --git a/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py b/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py
index f2e3898ed6..ee235032f0 100644
--- a/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py
+++ b/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py
@@ -249,3 +249,7 @@ class InvocationDefectsTest(unittest.TestCase):
with self.assertRaises(grpc.RpcError):
for _ in range(test_constants.STREAM_LENGTH // 2 + 1):
next(response_iterator)
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio_tests/tests/unit/_thread_cleanup_test.py b/src/python/grpcio_tests/tests/unit/_thread_cleanup_test.py
index be3522f46f..eb5f459848 100644
--- a/src/python/grpcio_tests/tests/unit/_thread_cleanup_test.py
+++ b/src/python/grpcio_tests/tests/unit/_thread_cleanup_test.py
@@ -35,8 +35,8 @@ import unittest
from grpc import _common
_SHORT_TIME = 0.5
-_LONG_TIME = 2.0
-_EPSILON = 0.1
+_LONG_TIME = 5.0
+_EPSILON = 0.5
def cleanup(timeout):