diff options
Diffstat (limited to 'src')
8 files changed, 180 insertions, 93 deletions
diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c index 229bf570fc..a19c94f85a 100644 --- a/src/core/ext/client_channel/client_channel.c +++ b/src/core/ext/client_channel/client_channel.c @@ -77,24 +77,82 @@ typedef enum { WAIT_FOR_READY_TRUE } wait_for_ready_value; -typedef struct method_parameters { +typedef struct { + gpr_refcount refs; gpr_timespec timeout; wait_for_ready_value wait_for_ready; } method_parameters; +static method_parameters *method_parameters_ref( + method_parameters *method_params) { + gpr_ref(&method_params->refs); + return method_params; +} + +static void method_parameters_unref(method_parameters *method_params) { + if (gpr_unref(&method_params->refs)) { + gpr_free(method_params); + } +} + static void *method_parameters_copy(void *value) { - void *new_value = gpr_malloc(sizeof(method_parameters)); - memcpy(new_value, value, sizeof(method_parameters)); - return new_value; + return method_parameters_ref(value); } -static void method_parameters_free(grpc_exec_ctx *exec_ctx, void *p) { - gpr_free(p); +static void method_parameters_free(grpc_exec_ctx *exec_ctx, void *value) { + method_parameters_unref(value); } static const grpc_slice_hash_table_vtable method_parameters_vtable = { method_parameters_free, method_parameters_copy}; +static bool parse_wait_for_ready(grpc_json *field, + wait_for_ready_value *wait_for_ready) { + if (field->type != GRPC_JSON_TRUE && field->type != GRPC_JSON_FALSE) { + return false; + } + *wait_for_ready = field->type == GRPC_JSON_TRUE ? WAIT_FOR_READY_TRUE + : WAIT_FOR_READY_FALSE; + return true; +} + +static bool parse_timeout(grpc_json *field, gpr_timespec *timeout) { + if (field->type != GRPC_JSON_STRING) return false; + size_t len = strlen(field->value); + if (field->value[len - 1] != 's') return false; + char *buf = gpr_strdup(field->value); + buf[len - 1] = '\0'; // Remove trailing 's'. + char *decimal_point = strchr(buf, '.'); + if (decimal_point != NULL) { + *decimal_point = '\0'; + timeout->tv_nsec = gpr_parse_nonnegative_int(decimal_point + 1); + if (timeout->tv_nsec == -1) { + gpr_free(buf); + return false; + } + // There should always be exactly 3, 6, or 9 fractional digits. + int multiplier = 1; + switch (strlen(decimal_point + 1)) { + case 9: + break; + case 6: + multiplier *= 1000; + break; + case 3: + multiplier *= 1000000; + break; + default: // Unsupported number of digits. + gpr_free(buf); + return false; + } + timeout->tv_nsec *= multiplier; + } + timeout->tv_sec = gpr_parse_nonnegative_int(buf); + gpr_free(buf); + if (timeout->tv_sec == -1) return false; + return true; +} + static void *method_parameters_create_from_json(const grpc_json *json) { wait_for_ready_value wait_for_ready = WAIT_FOR_READY_UNSET; gpr_timespec timeout = {0, 0, GPR_TIMESPAN}; @@ -102,49 +160,14 @@ static void *method_parameters_create_from_json(const grpc_json *json) { if (field->key == NULL) continue; if (strcmp(field->key, "waitForReady") == 0) { if (wait_for_ready != WAIT_FOR_READY_UNSET) return NULL; // Duplicate. - if (field->type != GRPC_JSON_TRUE && field->type != GRPC_JSON_FALSE) { - return NULL; - } - wait_for_ready = field->type == GRPC_JSON_TRUE ? WAIT_FOR_READY_TRUE - : WAIT_FOR_READY_FALSE; + if (!parse_wait_for_ready(field, &wait_for_ready)) return NULL; } else if (strcmp(field->key, "timeout") == 0) { if (timeout.tv_sec > 0 || timeout.tv_nsec > 0) return NULL; // Duplicate. - if (field->type != GRPC_JSON_STRING) return NULL; - size_t len = strlen(field->value); - if (field->value[len - 1] != 's') return NULL; - char *buf = gpr_strdup(field->value); - buf[len - 1] = '\0'; // Remove trailing 's'. - char *decimal_point = strchr(buf, '.'); - if (decimal_point != NULL) { - *decimal_point = '\0'; - timeout.tv_nsec = gpr_parse_nonnegative_int(decimal_point + 1); - if (timeout.tv_nsec == -1) { - gpr_free(buf); - return NULL; - } - // There should always be exactly 3, 6, or 9 fractional digits. - int multiplier = 1; - switch (strlen(decimal_point + 1)) { - case 9: - break; - case 6: - multiplier *= 1000; - break; - case 3: - multiplier *= 1000000; - break; - default: // Unsupported number of digits. - gpr_free(buf); - return NULL; - } - timeout.tv_nsec *= multiplier; - } - timeout.tv_sec = gpr_parse_nonnegative_int(buf); - if (timeout.tv_sec == -1) return NULL; - gpr_free(buf); + if (!parse_timeout(field, &timeout)) return NULL; } } method_parameters *value = gpr_malloc(sizeof(method_parameters)); + gpr_ref_init(&value->refs, 1); value->timeout = timeout; value->wait_for_ready = wait_for_ready; return value; @@ -631,7 +654,7 @@ typedef struct client_channel_call_data { grpc_slice path; // Request path. gpr_timespec call_start_time; gpr_timespec deadline; - wait_for_ready_value wait_for_ready_from_service_config; + method_parameters *method_params; grpc_error *cancel_error; @@ -842,10 +865,11 @@ static bool pick_subchannel_locked( initial_metadata_flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET; const bool wait_for_ready_set_from_service_config = - calld->wait_for_ready_from_service_config != WAIT_FOR_READY_UNSET; + calld->method_params != NULL && + calld->method_params->wait_for_ready != WAIT_FOR_READY_UNSET; if (!wait_for_ready_set_from_api && wait_for_ready_set_from_service_config) { - if (calld->wait_for_ready_from_service_config == WAIT_FOR_READY_TRUE) { + if (calld->method_params->wait_for_ready == WAIT_FOR_READY_TRUE) { initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY; } else { initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY; @@ -984,10 +1008,9 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx, add_waiting_locked(calld, op); } -static void cc_start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, - void *arg, - grpc_error *error_ignored) { - GPR_TIMER_BEGIN("cc_start_transport_stream_op_locked", 0); +static void start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error_ignored) { + GPR_TIMER_BEGIN("start_transport_stream_op_locked", 0); grpc_transport_stream_op *op = arg; grpc_call_element *elem = op->handler_private.args[0]; @@ -997,7 +1020,7 @@ static void cc_start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "start_transport_stream_op"); - GPR_TIMER_END("cc_start_transport_stream_op_locked", 0); + GPR_TIMER_END("start_transport_stream_op_locked", 0); } /* The logic here is fairly complicated, due to (a) the fact that we @@ -1037,49 +1060,51 @@ static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx, grpc_closure_sched( exec_ctx, grpc_closure_init(&op->handler_private.closure, - cc_start_transport_stream_op_locked, op, + start_transport_stream_op_locked, op, grpc_combiner_scheduler(chand->combiner, false)), GRPC_ERROR_NONE); GPR_TIMER_END("cc_start_transport_stream_op", 0); } +// Sets calld->method_params. +// If the method params specify a timeout, populates +// *per_method_deadline and returns true. +static bool set_call_method_params_from_service_config_locked( + grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + gpr_timespec *per_method_deadline) { + channel_data *chand = elem->channel_data; + call_data *calld = elem->call_data; + if (chand->method_params_table != NULL) { + calld->method_params = grpc_method_config_table_get( + exec_ctx, chand->method_params_table, calld->path); + if (calld->method_params != NULL) { + method_parameters_ref(calld->method_params); + if (gpr_time_cmp(calld->method_params->timeout, + gpr_time_0(GPR_TIMESPAN)) != 0) { + *per_method_deadline = + gpr_time_add(calld->call_start_time, calld->method_params->timeout); + return true; + } + } + } + return false; +} + static void apply_final_configuration_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { /* apply service-config level configuration to the call (now that we're * certain it exists) */ - channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; - /* Get the method config table from channel data. */ - grpc_slice_hash_table *method_params_table = NULL; - if (chand->method_params_table != NULL) { - method_params_table = grpc_slice_hash_table_ref(chand->method_params_table); - } - /* If the method config table was present, use it. */ - if (method_params_table != NULL) { - const method_parameters *method_params = grpc_method_config_table_get( - exec_ctx, method_params_table, calld->path); - if (method_params != NULL) { - const bool have_method_timeout = - gpr_time_cmp(method_params->timeout, gpr_time_0(GPR_TIMESPAN)) != 0; - if (have_method_timeout || - method_params->wait_for_ready != WAIT_FOR_READY_UNSET) { - if (have_method_timeout) { - const gpr_timespec per_method_deadline = - gpr_time_add(calld->call_start_time, method_params->timeout); - if (gpr_time_cmp(per_method_deadline, calld->deadline) < 0) { - calld->deadline = per_method_deadline; - } - } - if (method_params->wait_for_ready != WAIT_FOR_READY_UNSET) { - calld->wait_for_ready_from_service_config = - method_params->wait_for_ready; - } - } + gpr_timespec per_method_deadline; + if (set_call_method_params_from_service_config_locked(exec_ctx, elem, + &per_method_deadline)) { + // If the deadline from the service config is shorter than the one + // from the client API, reset the deadline timer. + if (gpr_time_cmp(per_method_deadline, calld->deadline) < 0) { + calld->deadline = per_method_deadline; + grpc_deadline_state_reset(exec_ctx, elem, calld->deadline); } - grpc_slice_hash_table_unref(exec_ctx, method_params_table); } - /* Start deadline timer. */ - grpc_deadline_state_reset(exec_ctx, elem, calld->deadline); } /* Constructor for call_data */ @@ -1105,6 +1130,9 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, call_data *calld = elem->call_data; grpc_deadline_state_destroy(exec_ctx, elem); grpc_slice_unref_internal(exec_ctx, calld->path); + if (calld->method_params != NULL) { + method_parameters_unref(calld->method_params); + } GRPC_ERROR_UNREF(calld->cancel_error); grpc_subchannel_call *call = GET_CALL(calld); if (call != NULL && call != CANCELLED_CALL) { diff --git a/src/core/ext/client_channel/parse_address.c b/src/core/ext/client_channel/parse_address.c index b1d55ad0f5..8e4da03de0 100644 --- a/src/core/ext/client_channel/parse_address.c +++ b/src/core/ext/client_channel/parse_address.c @@ -49,11 +49,12 @@ int parse_unix(grpc_uri *uri, grpc_resolved_address *resolved_addr) { struct sockaddr_un *un = (struct sockaddr_un *)resolved_addr->addr; - + const size_t maxlen = sizeof(un->sun_path); + const size_t path_len = strnlen(uri->path, maxlen); + if (path_len == maxlen) return 0; un->sun_family = AF_UNIX; strcpy(un->sun_path, uri->path); - resolved_addr->len = strlen(un->sun_path) + sizeof(un->sun_family) + 1; - + resolved_addr->len = sizeof(*un); return 1; } diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c index 490a0c560e..286232f277 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c @@ -73,7 +73,9 @@ static grpc_channel *client_channel_factory_create_channel( arg.type = GRPC_ARG_STRING; arg.key = GRPC_ARG_SERVER_URI; arg.value.string = grpc_resolver_factory_add_default_prefix_if_needed(target); - grpc_channel_args *new_args = grpc_channel_args_copy_and_add(args, &arg, 1); + const char *to_remove[] = {GRPC_ARG_SERVER_URI}; + grpc_channel_args *new_args = + grpc_channel_args_copy_and_add_and_remove(args, to_remove, 1, &arg, 1); gpr_free(arg.value.string); grpc_channel *channel = grpc_channel_create(exec_ctx, target, new_args, GRPC_CLIENT_CHANNEL, NULL); diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c index d8c18eb122..825db68c65 100644 --- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c +++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c @@ -182,7 +182,9 @@ static grpc_channel *client_channel_factory_create_channel( arg.type = GRPC_ARG_STRING; arg.key = GRPC_ARG_SERVER_URI; arg.value.string = grpc_resolver_factory_add_default_prefix_if_needed(target); - grpc_channel_args *new_args = grpc_channel_args_copy_and_add(args, &arg, 1); + const char *to_remove[] = {GRPC_ARG_SERVER_URI}; + grpc_channel_args *new_args = + grpc_channel_args_copy_and_add_and_remove(args, to_remove, 1, &arg, 1); gpr_free(arg.value.string); grpc_channel *channel = grpc_channel_create(exec_ctx, target, new_args, GRPC_CLIENT_CHANNEL, NULL); diff --git a/src/core/lib/iomgr/pollset_uv.c b/src/core/lib/iomgr/pollset_uv.c index a379ddf25f..af33949c69 100644 --- a/src/core/lib/iomgr/pollset_uv.c +++ b/src/core/lib/iomgr/pollset_uv.c @@ -57,14 +57,28 @@ int grpc_pollset_work_run_loop; gpr_mu grpc_polling_mu; +/* This is used solely to kick the uv loop, by setting a callback to be run + immediately in the next loop iteration. + Note: In the future, if there is a bug that involves missing wakeups in the + future, try adding a uv_async_t to kick the loop differently */ +uv_timer_t dummy_uv_handle; + size_t grpc_pollset_size() { return sizeof(grpc_pollset); } +void dummy_timer_cb(uv_timer_t *handle) {} + void grpc_pollset_global_init(void) { gpr_mu_init(&grpc_polling_mu); + uv_timer_init(uv_default_loop(), &dummy_uv_handle); grpc_pollset_work_run_loop = 1; } -void grpc_pollset_global_shutdown(void) { gpr_mu_destroy(&grpc_polling_mu); } +static void timer_close_cb(uv_handle_t *handle) { handle->data = (void *)1; } + +void grpc_pollset_global_shutdown(void) { + gpr_mu_destroy(&grpc_polling_mu); + uv_close((uv_handle_t *)&dummy_uv_handle, timer_close_cb); +} void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) { *mu = &grpc_polling_mu; @@ -72,8 +86,6 @@ void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) { pollset->shutting_down = 0; } -static void timer_close_cb(uv_handle_t *handle) { handle->data = (void *)1; } - void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_closure *closure) { GPR_ASSERT(!pollset->shutting_down); @@ -81,6 +93,9 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (grpc_pollset_work_run_loop) { // Drain any pending UV callbacks without blocking uv_run(uv_default_loop(), UV_RUN_NOWAIT); + } else { + // kick the loop once + uv_timer_start(&dummy_uv_handle, dummy_timer_cb, 0, 0); } grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE); } @@ -130,6 +145,7 @@ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_error *grpc_pollset_kick(grpc_pollset *pollset, grpc_pollset_worker *specific_worker) { + uv_timer_start(&dummy_uv_handle, dummy_timer_cb, 0, 0); return GRPC_ERROR_NONE; } diff --git a/src/core/lib/tsi/test_creds/BUILD b/src/core/lib/tsi/test_creds/BUILD new file mode 100644 index 0000000000..dcd6d930a8 --- /dev/null +++ b/src/core/lib/tsi/test_creds/BUILD @@ -0,0 +1,34 @@ +# Copyright 2017, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +exports_files([ + "ca.pem", + "server1.key", + "server1.pem", +]) diff --git a/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py b/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py index f2e3898ed6..ee235032f0 100644 --- a/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py +++ b/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py @@ -249,3 +249,7 @@ class InvocationDefectsTest(unittest.TestCase): with self.assertRaises(grpc.RpcError): for _ in range(test_constants.STREAM_LENGTH // 2 + 1): next(response_iterator) + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/unit/_thread_cleanup_test.py b/src/python/grpcio_tests/tests/unit/_thread_cleanup_test.py index be3522f46f..eb5f459848 100644 --- a/src/python/grpcio_tests/tests/unit/_thread_cleanup_test.py +++ b/src/python/grpcio_tests/tests/unit/_thread_cleanup_test.py @@ -35,8 +35,8 @@ import unittest from grpc import _common _SHORT_TIME = 0.5 -_LONG_TIME = 2.0 -_EPSILON = 0.1 +_LONG_TIME = 5.0 +_EPSILON = 0.5 def cleanup(timeout): |