diff options
Diffstat (limited to 'src')
86 files changed, 1922 insertions, 1315 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index d4ba950818..f021a8ae32 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -165,7 +165,6 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, channel_data *chand = arg; grpc_lb_policy *lb_policy = NULL; grpc_lb_policy *old_lb_policy; - grpc_resolver *old_resolver; grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE; int exit_idle = 0; @@ -201,28 +200,25 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, } if (iomgr_success && chand->resolver) { - grpc_resolver *resolver = chand->resolver; - GRPC_RESOLVER_REF(resolver, "channel-next"); grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, "new_lb+resolver"); if (lb_policy != NULL) { watch_lb_policy(exec_ctx, chand, lb_policy, state); } - gpr_mu_unlock(&chand->mu_config); GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); - grpc_resolver_next(exec_ctx, resolver, &chand->incoming_configuration, + grpc_resolver_next(exec_ctx, chand->resolver, + &chand->incoming_configuration, &chand->on_config_changed); - GRPC_RESOLVER_UNREF(exec_ctx, resolver, "channel-next"); + gpr_mu_unlock(&chand->mu_config); } else { - old_resolver = chand->resolver; - chand->resolver = NULL; + if (chand->resolver != NULL) { + grpc_resolver_shutdown(exec_ctx, chand->resolver); + GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); + chand->resolver = NULL; + } grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone"); gpr_mu_unlock(&chand->mu_config); - if (old_resolver != NULL) { - grpc_resolver_shutdown(exec_ctx, old_resolver); - GRPC_RESOLVER_UNREF(exec_ctx, old_resolver, "channel"); - } } if (exit_idle) { @@ -247,7 +243,6 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_transport_op *op) { channel_data *chand = elem->channel_data; - grpc_resolver *destroy_resolver = NULL; grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, true, NULL); @@ -279,7 +274,8 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, if (op->disconnect && chand->resolver != NULL) { grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "disconnect"); - destroy_resolver = chand->resolver; + grpc_resolver_shutdown(exec_ctx, chand->resolver); + GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); chand->resolver = NULL; if (chand->lb_policy != NULL) { grpc_pollset_set_del_pollset_set(exec_ctx, @@ -290,11 +286,6 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, } } gpr_mu_unlock(&chand->mu_config); - - if (destroy_resolver) { - grpc_resolver_shutdown(exec_ctx, destroy_resolver); - GRPC_RESOLVER_UNREF(exec_ctx, destroy_resolver, "channel"); - } } typedef struct { diff --git a/src/core/channel/subchannel_call_holder.c b/src/core/channel/subchannel_call_holder.c index 8f46885a04..9c087dc2a1 100644 --- a/src/core/channel/subchannel_call_holder.c +++ b/src/core/channel/subchannel_call_holder.c @@ -174,17 +174,15 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, bool success) { holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; if (holder->connected_subchannel == NULL) { fail_locked(exec_ctx, holder); + } else if (1 == gpr_atm_acq_load(&holder->subchannel_call)) { + /* already cancelled before subchannel became ready */ + fail_locked(exec_ctx, holder); } else { - if (!gpr_atm_rel_cas( - &holder->subchannel_call, 0, - (gpr_atm)(uintptr_t)grpc_connected_subchannel_create_call( - exec_ctx, holder->connected_subchannel, holder->pollset))) { - GPR_ASSERT(gpr_atm_acq_load(&holder->subchannel_call) == 1); - /* if this cas fails, the call was cancelled before the pick completed */ - fail_locked(exec_ctx, holder); - } else { - retry_waiting_locked(exec_ctx, holder); - } + gpr_atm_rel_store( + &holder->subchannel_call, + (gpr_atm)(uintptr_t)grpc_connected_subchannel_create_call( + exec_ctx, holder->connected_subchannel, holder->pollset)); + retry_waiting_locked(exec_ctx, holder); } gpr_mu_unlock(&holder->mu); GRPC_CALL_STACK_UNREF(exec_ctx, holder->owning_call, "pick_subchannel"); diff --git a/src/core/client_config/client_config.c b/src/core/client_config/client_config.c index 6ecffb3854..c500af25ee 100644 --- a/src/core/client_config/client_config.c +++ b/src/core/client_config/client_config.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -53,7 +53,9 @@ void grpc_client_config_ref(grpc_client_config *c) { gpr_ref(&c->refs); } void grpc_client_config_unref(grpc_exec_ctx *exec_ctx, grpc_client_config *c) { if (gpr_unref(&c->refs)) { - GRPC_LB_POLICY_UNREF(exec_ctx, c->lb_policy, "client_config"); + if (c->lb_policy != NULL) { + GRPC_LB_POLICY_UNREF(exec_ctx, c->lb_policy, "client_config"); + } gpr_free(c); } } diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 81167b31c8..8ed1223d39 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -387,8 +387,8 @@ static void pick_first_factory_unref(grpc_lb_policy_factory *factory) {} static grpc_lb_policy *create_pick_first(grpc_lb_policy_factory *factory, grpc_lb_policy_args *args) { + if (args->num_subchannels == 0) return NULL; pick_first_lb_policy *p = gpr_malloc(sizeof(*p)); - GPR_ASSERT(args->num_subchannels > 0); memset(p, 0, sizeof(*p)); grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable); p->subchannels = diff --git a/src/core/client_config/resolvers/dns_resolver.c b/src/core/client_config/resolvers/dns_resolver.c index 376b6b3d76..e28e4757a1 100644 --- a/src/core/client_config/resolvers/dns_resolver.c +++ b/src/core/client_config/resolvers/dns_resolver.c @@ -41,6 +41,7 @@ #include "src/core/client_config/lb_policy_registry.h" #include "src/core/iomgr/resolve_address.h" +#include "src/core/iomgr/timer.h" #include "src/core/support/string.h" typedef struct { @@ -71,6 +72,9 @@ typedef struct { grpc_client_config **target_config; /** current (fully resolved) config */ grpc_client_config *resolved_config; + /** retry timer */ + bool have_retry_timer; + grpc_timer retry_timer; } dns_resolver; static void dns_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *r); @@ -91,6 +95,9 @@ static const grpc_resolver_vtable dns_resolver_vtable = { static void dns_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) { dns_resolver *r = (dns_resolver *)resolver; gpr_mu_lock(&r->mu); + if (r->have_retry_timer) { + grpc_timer_cancel(exec_ctx, &r->retry_timer); + } if (r->next_completion != NULL) { *r->target_config = NULL; grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, true, NULL); @@ -125,6 +132,22 @@ static void dns_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, gpr_mu_unlock(&r->mu); } +static void dns_on_retry_timer(grpc_exec_ctx *exec_ctx, void *arg, + bool success) { + dns_resolver *r = arg; + + gpr_mu_lock(&r->mu); + r->have_retry_timer = false; + if (success) { + if (!r->resolving) { + dns_start_resolving_locked(r); + } + } + gpr_mu_unlock(&r->mu); + + GRPC_RESOLVER_UNREF(exec_ctx, &r->base, "retry-timer"); +} + static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, grpc_resolved_addresses *addresses) { dns_resolver *r = arg; @@ -133,29 +156,47 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, grpc_subchannel_args args; grpc_lb_policy *lb_policy; size_t i; - if (addresses) { + gpr_mu_lock(&r->mu); + GPR_ASSERT(r->resolving); + r->resolving = 0; + if (addresses != NULL) { grpc_lb_policy_args lb_policy_args; config = grpc_client_config_create(); subchannels = gpr_malloc(sizeof(grpc_subchannel *) * addresses->naddrs); + size_t naddrs = 0; for (i = 0; i < addresses->naddrs; i++) { memset(&args, 0, sizeof(args)); args.addr = (struct sockaddr *)(addresses->addrs[i].addr); args.addr_len = (size_t)addresses->addrs[i].len; - subchannels[i] = grpc_subchannel_factory_create_subchannel( + grpc_subchannel *subchannel = grpc_subchannel_factory_create_subchannel( exec_ctx, r->subchannel_factory, &args); + if (subchannel != NULL) { + subchannels[naddrs++] = subchannel; + } } memset(&lb_policy_args, 0, sizeof(lb_policy_args)); lb_policy_args.subchannels = subchannels; - lb_policy_args.num_subchannels = addresses->naddrs; + lb_policy_args.num_subchannels = naddrs; lb_policy = grpc_lb_policy_create(r->lb_policy_name, &lb_policy_args); - grpc_client_config_set_lb_policy(config, lb_policy); - GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "construction"); + if (lb_policy != NULL) { + grpc_client_config_set_lb_policy(config, lb_policy); + GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "construction"); + } grpc_resolved_addresses_destroy(addresses); gpr_free(subchannels); + } else { + int retry_seconds = 15; + gpr_log(GPR_DEBUG, "dns resolution failed: retrying in %d seconds", + retry_seconds); + GPR_ASSERT(!r->have_retry_timer); + r->have_retry_timer = true; + gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + GRPC_RESOLVER_REF(&r->base, "retry-timer"); + grpc_timer_init( + exec_ctx, &r->retry_timer, + gpr_time_add(now, gpr_time_from_seconds(retry_seconds, GPR_TIMESPAN)), + dns_on_retry_timer, r, now); } - gpr_mu_lock(&r->mu); - GPR_ASSERT(r->resolving); - r->resolving = 0; if (r->resolved_config) { grpc_client_config_unref(exec_ctx, r->resolved_config); } diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index d91dd116b8..5dea215668 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -45,6 +45,7 @@ #include "src/core/client_config/subchannel_index.h" #include "src/core/iomgr/timer.h" #include "src/core/profiling/timers.h" +#include "src/core/support/backoff.h" #include "src/core/surface/channel.h" #include "src/core/transport/connectivity_state.h" @@ -127,8 +128,8 @@ struct grpc_subchannel { /** next connect attempt time */ gpr_timespec next_attempt; - /** amount to backoff each failure */ - gpr_timespec backoff_delta; + /** backoff state */ + gpr_backoff backoff_state; /** do we have an active alarm? */ int have_alarm; /** our alarm */ @@ -146,7 +147,6 @@ struct grpc_subchannel_call { #define CALLSTACK_TO_SUBCHANNEL_CALL(callstack) \ (((grpc_subchannel_call *)(callstack)) - 1) -static gpr_timespec compute_connect_deadline(grpc_subchannel *c); static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel, bool iomgr_success); @@ -337,6 +337,22 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, grpc_closure_init(&c->connected, subchannel_connected, c); grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE, "subchannel"); + gpr_backoff_init(&c->backoff_state, + GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER, + GRPC_SUBCHANNEL_RECONNECT_JITTER, + GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000, + GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000); + if (c->args) { + for (size_t i = 0; i < c->args->num_args; i++) { + if (0 == strcmp(c->args->args[i].key, + "grpc.testing.fixed_reconnect_backoff")) { + GPR_ASSERT(c->args->args[i].type == GRPC_ARG_INTEGER); + gpr_backoff_init(&c->backoff_state, 1.0, 0.0, + c->args->args[i].value.integer, + c->args->args[i].value.integer); + } + } + } gpr_mu_init(&c->mu); return grpc_subchannel_index_register(exec_ctx, key, c); @@ -348,7 +364,7 @@ static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { args.interested_parties = c->pollset_set; args.addr = c->addr; args.addr_len = c->addr_len; - args.deadline = compute_connect_deadline(c); + args.deadline = c->next_attempt; args.channel_args = c->args; args.initial_connect_string = c->initial_connect_string; @@ -359,10 +375,8 @@ static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { } static void start_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { - c->backoff_delta = gpr_time_from_seconds( - GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS, GPR_TIMESPAN); c->next_attempt = - gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), c->backoff_delta); + gpr_backoff_begin(&c->backoff_state, gpr_now(GPR_CLOCK_MONOTONIC)); continue_connect(exec_ctx, c); } @@ -505,7 +519,8 @@ void grpc_connected_subchannel_ping(grpc_exec_ctx *exec_ctx, elem->filter->start_transport_op(exec_ctx, elem, &op); } -static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { +static void publish_transport_locked(grpc_exec_ctx *exec_ctx, + grpc_subchannel *c) { size_t channel_stack_size; grpc_connected_subchannel *con; grpc_channel_stack *stk; @@ -541,8 +556,6 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { grpc_closure_init(&sw_subchannel->closure, subchannel_on_child_state_changed, sw_subchannel); - gpr_mu_lock(&c->mu); - if (c->disconnected) { gpr_mu_unlock(&c->mu); gpr_free(sw_subchannel); @@ -575,54 +588,9 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_READY, "connected"); - gpr_mu_unlock(&c->mu); gpr_free((void *)filters); } -/* Generate a random number between 0 and 1. */ -static double generate_uniform_random_number(grpc_subchannel *c) { - c->random = (1103515245 * c->random + 12345) % ((uint32_t)1 << 31); - return c->random / (double)((uint32_t)1 << 31); -} - -/* Update backoff_delta and next_attempt in subchannel */ -static void update_reconnect_parameters(grpc_subchannel *c) { - size_t i; - int32_t backoff_delta_millis, jitter; - int32_t max_backoff_millis = - GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000; - double jitter_range; - - if (c->args) { - for (i = 0; i < c->args->num_args; i++) { - if (0 == strcmp(c->args->args[i].key, - "grpc.testing.fixed_reconnect_backoff")) { - GPR_ASSERT(c->args->args[i].type == GRPC_ARG_INTEGER); - c->next_attempt = gpr_time_add( - gpr_now(GPR_CLOCK_MONOTONIC), - gpr_time_from_millis(c->args->args[i].value.integer, GPR_TIMESPAN)); - return; - } - } - } - - backoff_delta_millis = - (int32_t)(gpr_time_to_millis(c->backoff_delta) * - GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER); - if (backoff_delta_millis > max_backoff_millis) { - backoff_delta_millis = max_backoff_millis; - } - c->backoff_delta = gpr_time_from_millis(backoff_delta_millis, GPR_TIMESPAN); - c->next_attempt = - gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), c->backoff_delta); - - jitter_range = GRPC_SUBCHANNEL_RECONNECT_JITTER * backoff_delta_millis; - jitter = - (int32_t)((2 * generate_uniform_random_number(c) - 1) * jitter_range); - c->next_attempt = - gpr_time_add(c->next_attempt, gpr_time_from_millis(jitter, GPR_TIMESPAN)); -} - static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, bool iomgr_success) { grpc_subchannel *c = arg; gpr_mu_lock(&c->mu); @@ -631,7 +599,8 @@ static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, bool iomgr_success) { iomgr_success = 0; } if (iomgr_success) { - update_reconnect_parameters(c); + c->next_attempt = + gpr_backoff_step(&c->backoff_state, gpr_now(GPR_CLOCK_MONOTONIC)); continue_connect(exec_ctx, c); gpr_mu_unlock(&c->mu); } else { @@ -644,32 +613,23 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg, bool iomgr_success) { grpc_subchannel *c = arg; + GRPC_SUBCHANNEL_WEAK_REF(c, "connected"); + gpr_mu_lock(&c->mu); if (c->connecting_result.transport != NULL) { - publish_transport(exec_ctx, c); + publish_transport_locked(exec_ctx, c); } else if (c->disconnected) { GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); } else { gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); - gpr_mu_lock(&c->mu); GPR_ASSERT(!c->have_alarm); c->have_alarm = 1; grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, "connect_failed"); grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now); - gpr_mu_unlock(&c->mu); } -} - -static gpr_timespec compute_connect_deadline(grpc_subchannel *c) { - gpr_timespec current_deadline = - gpr_time_add(c->next_attempt, c->backoff_delta); - gpr_timespec min_deadline = gpr_time_add( - gpr_now(GPR_CLOCK_MONOTONIC), - gpr_time_from_seconds(GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS, - GPR_TIMESPAN)); - return gpr_time_cmp(current_deadline, min_deadline) > 0 ? current_deadline - : min_deadline; + gpr_mu_unlock(&c->mu); + GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); } /* diff --git a/src/core/client_config/subchannel_index.c b/src/core/client_config/subchannel_index.c index 3f948998f9..24cc76cf22 100644 --- a/src/core/client_config/subchannel_index.c +++ b/src/core/client_config/subchannel_index.c @@ -108,6 +108,7 @@ static int subchannel_key_compare(grpc_subchannel_key *a, if (c != 0) return c; c = memcmp(a->args.filters, b->args.filters, a->args.filter_count * sizeof(*a->args.filters)); + if (c != 0) return c; return grpc_channel_args_compare(a->args.args, b->args.args); } diff --git a/src/core/compression/compression_algorithm.c b/src/core/compression/compression_algorithm.c index 6f3a8eb28e..2810a38b68 100644 --- a/src/core/compression/compression_algorithm.c +++ b/src/core/compression/compression_algorithm.c @@ -128,20 +128,57 @@ grpc_mdelem *grpc_compression_encoding_mdelem( /* TODO(dgq): Add the ability to specify parameters to the individual * compression algorithms */ grpc_compression_algorithm grpc_compression_algorithm_for_level( - grpc_compression_level level) { + grpc_compression_level level, uint32_t accepted_encodings) { GRPC_API_TRACE("grpc_compression_algorithm_for_level(level=%d)", 1, ((int)level)); + if (level > GRPC_COMPRESS_LEVEL_HIGH) { + gpr_log(GPR_ERROR, "Unknown compression level %d.", (int)level); + abort(); + } + + const size_t num_supported = + GPR_BITCOUNT(accepted_encodings) - 1; /* discard NONE */ + if (level == GRPC_COMPRESS_LEVEL_NONE || num_supported == 0) { + return GRPC_COMPRESS_NONE; + } + + GPR_ASSERT(level > 0); + + /* Establish a "ranking" or compression algorithms in increasing order of + * compression. + * This is simplistic and we will probably want to introduce other dimensions + * in the future (cpu/memory cost, etc). */ + const grpc_compression_algorithm algos_ranking[] = {GRPC_COMPRESS_GZIP, + GRPC_COMPRESS_DEFLATE}; + + /* intersect algos_ranking with the supported ones keeping the ranked order */ + grpc_compression_algorithm + sorted_supported_algos[GRPC_COMPRESS_ALGORITHMS_COUNT]; + size_t algos_supported_idx = 0; + for (size_t i = 0; i < GPR_ARRAY_SIZE(algos_ranking); i++) { + const grpc_compression_algorithm alg = algos_ranking[i]; + for (size_t j = 0; j < num_supported; j++) { + if (GPR_BITGET(accepted_encodings, alg) == 1) { + /* if \a alg in supported */ + sorted_supported_algos[algos_supported_idx++] = alg; + break; + } + } + if (algos_supported_idx == num_supported) break; + } + switch (level) { case GRPC_COMPRESS_LEVEL_NONE: - return GRPC_COMPRESS_NONE; + abort(); /* should have been handled already */ case GRPC_COMPRESS_LEVEL_LOW: + return sorted_supported_algos[0]; case GRPC_COMPRESS_LEVEL_MED: + return sorted_supported_algos[num_supported / 2]; case GRPC_COMPRESS_LEVEL_HIGH: - return GRPC_COMPRESS_DEFLATE; + return sorted_supported_algos[num_supported - 1]; default: - gpr_log(GPR_ERROR, "Unknown compression level %d.", (int)level); abort(); - } + }; } void grpc_compression_options_init(grpc_compression_options *opts) { diff --git a/src/core/iomgr/exec_ctx.c b/src/core/iomgr/exec_ctx.c index 1fd79f6eba..893fe4515c 100644 --- a/src/core/iomgr/exec_ctx.c +++ b/src/core/iomgr/exec_ctx.c @@ -34,9 +34,12 @@ #include "src/core/iomgr/exec_ctx.h" #include <grpc/support/log.h> +#include <grpc/support/sync.h> +#include <grpc/support/thd.h> #include "src/core/profiling/timers.h" +#ifndef GRPC_EXECUTION_CONTEXT_SANITIZER bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) { bool did_something = 0; GPR_TIMER_BEGIN("grpc_exec_ctx_flush", 0); @@ -74,3 +77,75 @@ void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx, GPR_ASSERT(offload_target_or_null == NULL); grpc_closure_list_move(list, &exec_ctx->closure_list); } + +void grpc_exec_ctx_global_init(void) {} +void grpc_exec_ctx_global_shutdown(void) {} +#else +static gpr_mu g_mu; +static gpr_cv g_cv; +static int g_threads = 0; + +static void run_closure(void *arg) { + grpc_closure *closure = arg; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + closure->cb(&exec_ctx, closure->cb_arg, (closure->final_data & 1) != 0); + grpc_exec_ctx_finish(&exec_ctx); + gpr_mu_lock(&g_mu); + if (--g_threads == 0) { + gpr_cv_signal(&g_cv); + } + gpr_mu_unlock(&g_mu); +} + +static void start_closure(grpc_closure *closure) { + gpr_thd_id id; + gpr_mu_lock(&g_mu); + g_threads++; + gpr_mu_unlock(&g_mu); + gpr_thd_new(&id, run_closure, closure, NULL); +} + +bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) { return false; } + +void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx) {} + +void grpc_exec_ctx_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure, + bool success, + grpc_workqueue *offload_target_or_null) { + GPR_ASSERT(offload_target_or_null == NULL); + if (closure == NULL) return; + closure->final_data = success; + start_closure(closure); +} + +void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx, + grpc_closure_list *list, + grpc_workqueue *offload_target_or_null) { + GPR_ASSERT(offload_target_or_null == NULL); + if (list == NULL) return; + grpc_closure *p = list->head; + while (p) { + grpc_closure *start = p; + p = grpc_closure_next(start); + start_closure(start); + } + grpc_closure_list r = GRPC_CLOSURE_LIST_INIT; + *list = r; +} + +void grpc_exec_ctx_global_init(void) { + gpr_mu_init(&g_mu); + gpr_cv_init(&g_cv); +} + +void grpc_exec_ctx_global_shutdown(void) { + gpr_mu_lock(&g_mu); + while (g_threads != 0) { + gpr_cv_wait(&g_cv, &g_mu, gpr_inf_future(GPR_CLOCK_REALTIME)); + } + gpr_mu_unlock(&g_mu); + + gpr_mu_destroy(&g_mu); + gpr_cv_destroy(&g_cv); +} +#endif diff --git a/src/core/iomgr/exec_ctx.h b/src/core/iomgr/exec_ctx.h index 9a9b2e55fa..1b627a5dcf 100644 --- a/src/core/iomgr/exec_ctx.h +++ b/src/core/iomgr/exec_ctx.h @@ -36,6 +36,14 @@ #include "src/core/iomgr/closure.h" +/* #define GRPC_EXECUTION_CONTEXT_SANITIZER 1 */ + +/** A workqueue represents a list of work to be executed asynchronously. + Forward declared here to avoid a circular dependency with workqueue.h. */ +struct grpc_workqueue; +typedef struct grpc_workqueue grpc_workqueue; + +#ifndef GRPC_EXECUTION_CONTEXT_SANITIZER /** Execution context. * A bag of data that collects information along a callstack. * Generally created at public API entry points, and passed down as @@ -57,13 +65,15 @@ struct grpc_exec_ctx { grpc_closure_list closure_list; }; -/** A workqueue represents a list of work to be executed asynchronously. - Forward declared here to avoid a circular dependency with workqueue.h. */ -struct grpc_workqueue; -typedef struct grpc_workqueue grpc_workqueue; - #define GRPC_EXEC_CTX_INIT \ { GRPC_CLOSURE_LIST_INIT } +#else +struct grpc_exec_ctx { + int unused; +}; +#define GRPC_EXEC_CTX_INIT \ + { 0 } +#endif /** Flush any work that has been enqueued onto this grpc_exec_ctx. * Caller must guarantee that no interfering locks are held. @@ -82,4 +92,7 @@ void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx, grpc_closure_list *list, grpc_workqueue *offload_target_or_null); +void grpc_exec_ctx_global_init(void); +void grpc_exec_ctx_global_shutdown(void); + #endif diff --git a/src/core/iomgr/iocp_windows.c b/src/core/iomgr/iocp_windows.c index 807729708e..fa87e5246b 100644 --- a/src/core/iomgr/iocp_windows.c +++ b/src/core/iomgr/iocp_windows.c @@ -71,7 +71,8 @@ static DWORD deadline_to_millis_timeout(gpr_timespec deadline, timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN))); } -void grpc_iocp_work(grpc_exec_ctx *exec_ctx, gpr_timespec deadline) { +grpc_iocp_work_status grpc_iocp_work(grpc_exec_ctx *exec_ctx, + gpr_timespec deadline) { BOOL success; DWORD bytes = 0; DWORD flags = 0; @@ -84,14 +85,14 @@ void grpc_iocp_work(grpc_exec_ctx *exec_ctx, gpr_timespec deadline) { g_iocp, &bytes, &completion_key, &overlapped, deadline_to_millis_timeout(deadline, gpr_now(deadline.clock_type))); if (success == 0 && overlapped == NULL) { - return; + return GRPC_IOCP_WORK_TIMEOUT; } GPR_ASSERT(completion_key && overlapped); if (overlapped == &g_iocp_custom_overlap) { gpr_atm_full_fetch_add(&g_custom_events, -1); if (completion_key == (ULONG_PTR)&g_iocp_kick_token) { /* We were awoken from a kick. */ - return; + return GRPC_IOCP_WORK_KICK; } gpr_log(GPR_ERROR, "Unknown custom completion key."); abort(); @@ -121,6 +122,7 @@ void grpc_iocp_work(grpc_exec_ctx *exec_ctx, gpr_timespec deadline) { } gpr_mu_unlock(&socket->state_mu); grpc_exec_ctx_enqueue(exec_ctx, closure, true, NULL); + return GRPC_IOCP_WORK_WORK; } void grpc_iocp_init(void) { @@ -140,10 +142,12 @@ void grpc_iocp_kick(void) { void grpc_iocp_flush(void) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_iocp_work_status work_status; do { - grpc_iocp_work(&exec_ctx, gpr_inf_past(GPR_CLOCK_MONOTONIC)); - } while (grpc_exec_ctx_flush(&exec_ctx)); + work_status = grpc_iocp_work(&exec_ctx, gpr_inf_past(GPR_CLOCK_MONOTONIC)); + } while (work_status == GRPC_IOCP_WORK_KICK || + grpc_exec_ctx_flush(&exec_ctx)); } void grpc_iocp_shutdown(void) { diff --git a/src/core/iomgr/iocp_windows.h b/src/core/iomgr/iocp_windows.h index 75f3ba8477..8b2b1aeb5c 100644 --- a/src/core/iomgr/iocp_windows.h +++ b/src/core/iomgr/iocp_windows.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -38,7 +38,14 @@ #include "src/core/iomgr/socket_windows.h" -void grpc_iocp_work(grpc_exec_ctx *exec_ctx, gpr_timespec deadline); +typedef enum { + GRPC_IOCP_WORK_WORK, + GRPC_IOCP_WORK_TIMEOUT, + GRPC_IOCP_WORK_KICK +} grpc_iocp_work_status; + +grpc_iocp_work_status grpc_iocp_work(grpc_exec_ctx *exec_ctx, + gpr_timespec deadline); void grpc_iocp_init(void); void grpc_iocp_kick(void); void grpc_iocp_flush(void); diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index 9c89c2c08a..3ab4430668 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -43,6 +43,7 @@ #include <grpc/support/thd.h> #include <grpc/support/useful.h> +#include "src/core/iomgr/exec_ctx.h" #include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/timer.h" #include "src/core/support/env.h" @@ -57,6 +58,7 @@ void grpc_iomgr_init(void) { g_shutdown = 0; gpr_mu_init(&g_mu); gpr_cv_init(&g_rcv); + grpc_exec_ctx_global_init(); grpc_timer_list_init(gpr_now(GPR_CLOCK_MONOTONIC)); g_root_object.next = g_root_object.prev = &g_root_object; g_root_object.name = "root"; @@ -138,6 +140,7 @@ void grpc_iomgr_shutdown(void) { grpc_pollset_global_shutdown(); grpc_iomgr_platform_shutdown(); + grpc_exec_ctx_global_shutdown(); gpr_mu_destroy(&g_mu); gpr_cv_destroy(&g_rcv); } diff --git a/src/core/iomgr/pollset.h b/src/core/iomgr/pollset.h index 92a0374ddd..ee1debfb71 100644 --- a/src/core/iomgr/pollset.h +++ b/src/core/iomgr/pollset.h @@ -55,7 +55,7 @@ typedef struct grpc_pollset_worker grpc_pollset_worker; size_t grpc_pollset_size(void); void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu); /* Begin shutting down the pollset, and call closure when done. - * GRPC_POLLSET_MU(pollset) must be held */ + * pollset's mutex must be held */ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_closure *closure); /** Reset the pollset to its initial state (perhaps with some cached objects); @@ -66,16 +66,16 @@ void grpc_pollset_destroy(grpc_pollset *pollset); /* Do some work on a pollset. May involve invoking asynchronous callbacks, or actually polling file descriptors. - Requires GRPC_POLLSET_MU(pollset) locked. - May unlock GRPC_POLLSET_MU(pollset) during its execution. + Requires pollset's mutex locked. + May unlock its mutex during its execution. worker is a (platform-specific) handle that can be used to wake up from grpc_pollset_work before any events are received and before the timeout has expired. It is both initialized and destroyed by grpc_pollset_work. Initialization of worker is guaranteed to occur BEFORE the - GRPC_POLLSET_MU(pollset) is released for the first time by - grpc_pollset_work, and it is guaranteed that GRPC_POLLSET_MU(pollset) will - not be released by grpc_pollset_work AFTER worker has been destroyed. + pollset's mutex is released for the first time by grpc_pollset_work + and it is guaranteed that it will not be released by grpc_pollset_work + AFTER worker has been destroyed. Tries not to block past deadline. May call grpc_closure_list_run on grpc_closure_list, without holding the diff --git a/src/core/iomgr/resolve_address.h b/src/core/iomgr/resolve_address.h index 01eedffa88..b059630457 100644 --- a/src/core/iomgr/resolve_address.h +++ b/src/core/iomgr/resolve_address.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -66,7 +66,7 @@ void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addresses); /* Resolve addr in a blocking fashion. Returns NULL on failure. On success, result must be freed with grpc_resolved_addresses_destroy. */ -grpc_resolved_addresses *grpc_blocking_resolve_address( - const char *addr, const char *default_port); +extern grpc_resolved_addresses *(*grpc_blocking_resolve_address)( + const char *name, const char *default_port); #endif /* GRPC_INTERNAL_CORE_IOMGR_RESOLVE_ADDRESS_H */ diff --git a/src/core/iomgr/resolve_address_posix.c b/src/core/iomgr/resolve_address_posix.c index c51745b918..a6c9893f23 100644 --- a/src/core/iomgr/resolve_address_posix.c +++ b/src/core/iomgr/resolve_address_posix.c @@ -34,18 +34,13 @@ #include <grpc/support/port_platform.h> #ifdef GPR_POSIX_SOCKET -#include "src/core/iomgr/sockaddr.h" #include "src/core/iomgr/resolve_address.h" +#include "src/core/iomgr/sockaddr.h" +#include <string.h> #include <sys/types.h> #include <sys/un.h> -#include <string.h> -#include "src/core/iomgr/executor.h" -#include "src/core/iomgr/iomgr_internal.h" -#include "src/core/iomgr/sockaddr_utils.h" -#include "src/core/support/block_annotate.h" -#include "src/core/support/string.h" #include <grpc/support/alloc.h> #include <grpc/support/host_port.h> #include <grpc/support/log.h> @@ -53,6 +48,11 @@ #include <grpc/support/thd.h> #include <grpc/support/time.h> #include <grpc/support/useful.h> +#include "src/core/iomgr/executor.h" +#include "src/core/iomgr/iomgr_internal.h" +#include "src/core/iomgr/sockaddr_utils.h" +#include "src/core/support/block_annotate.h" +#include "src/core/support/string.h" typedef struct { char *name; @@ -62,7 +62,7 @@ typedef struct { void *arg; } request; -grpc_resolved_addresses *grpc_blocking_resolve_address( +static grpc_resolved_addresses *blocking_resolve_address_impl( const char *name, const char *default_port) { struct addrinfo hints; struct addrinfo *result = NULL, *resp; @@ -150,6 +150,9 @@ done: return addrs; } +grpc_resolved_addresses *(*grpc_blocking_resolve_address)( + const char *name, const char *default_port) = blocking_resolve_address_impl; + /* Callback to be passed to grpc_executor to asynch-ify * grpc_blocking_resolve_address */ static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp, bool success) { diff --git a/src/core/iomgr/resolve_address_windows.c b/src/core/iomgr/resolve_address_windows.c index 28c8661e73..472e797163 100644 --- a/src/core/iomgr/resolve_address_windows.c +++ b/src/core/iomgr/resolve_address_windows.c @@ -34,17 +34,12 @@ #include <grpc/support/port_platform.h> #ifdef GPR_WINSOCK_SOCKET -#include "src/core/iomgr/sockaddr.h" #include "src/core/iomgr/resolve_address.h" +#include "src/core/iomgr/sockaddr.h" -#include <sys/types.h> #include <string.h> +#include <sys/types.h> -#include "src/core/iomgr/executor.h" -#include "src/core/iomgr/iomgr_internal.h" -#include "src/core/iomgr/sockaddr_utils.h" -#include "src/core/support/block_annotate.h" -#include "src/core/support/string.h" #include <grpc/support/alloc.h> #include <grpc/support/host_port.h> #include <grpc/support/log.h> @@ -52,6 +47,11 @@ #include <grpc/support/string_util.h> #include <grpc/support/thd.h> #include <grpc/support/time.h> +#include "src/core/iomgr/executor.h" +#include "src/core/iomgr/iomgr_internal.h" +#include "src/core/iomgr/sockaddr_utils.h" +#include "src/core/support/block_annotate.h" +#include "src/core/support/string.h" typedef struct { char *name; @@ -61,7 +61,7 @@ typedef struct { void *arg; } request; -grpc_resolved_addresses *grpc_blocking_resolve_address( +static grpc_resolved_addresses *blocking_resolve_address_impl( const char *name, const char *default_port) { struct addrinfo hints; struct addrinfo *result = NULL, *resp; @@ -133,6 +133,9 @@ done: return addrs; } +grpc_resolved_addresses *(*grpc_blocking_resolve_address)( + const char *name, const char *default_port) = blocking_resolve_address_impl; + /* Callback to be passed to grpc_executor to asynch-ify * grpc_blocking_resolve_address */ static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp, bool success) { diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c index ce930b8f41..a4abc5b974 100644 --- a/src/core/iomgr/tcp_server_windows.c +++ b/src/core/iomgr/tcp_server_windows.c @@ -240,8 +240,7 @@ static void decrement_active_ports_and_notify(grpc_exec_ctx *exec_ctx, sp->shutting_down = 0; gpr_mu_lock(&sp->server->mu); GPR_ASSERT(sp->server->active_ports > 0); - if (0 == --sp->server->active_ports && - sp->server->shutdown_complete != NULL) { + if (0 == --sp->server->active_ports) { notify = 1; } gpr_mu_unlock(&sp->server->mu); diff --git a/src/core/iomgr/timer.c b/src/core/iomgr/timer.c index 8379fffad0..f444643428 100644 --- a/src/core/iomgr/timer.c +++ b/src/core/iomgr/timer.c @@ -33,11 +33,11 @@ #include "src/core/iomgr/timer.h" -#include "src/core/iomgr/timer_heap.h" -#include "src/core/iomgr/time_averaged_stats.h" #include <grpc/support/log.h> #include <grpc/support/sync.h> #include <grpc/support/useful.h> +#include "src/core/iomgr/time_averaged_stats.h" +#include "src/core/iomgr/timer_heap.h" #define INVALID_HEAP_INDEX 0xffffffffu @@ -330,6 +330,18 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now, gpr_mu_unlock(&g_mu); gpr_mu_unlock(&g_checker_mu); + } else if (next != NULL) { + /* TODO(ctiller): this forces calling code to do an short poll, and + then retry the timer check (because this time through the timer list was + contended). + + We could reduce the cost here dramatically by keeping a count of how many + currently active pollers got through the uncontended case above + successfully, and waking up other pollers IFF that count drops to zero. + + Once that count is in place, this entire else branch could disappear. */ + *next = gpr_time_min( + *next, gpr_time_add(now, gpr_time_from_millis(1, GPR_TIMESPAN))); } return (int)n; diff --git a/src/core/iomgr/timer.h b/src/core/iomgr/timer.h index 9ad1e92f42..e239e884e7 100644 --- a/src/core/iomgr/timer.h +++ b/src/core/iomgr/timer.h @@ -96,7 +96,6 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer); *next is never guaranteed to be updated on any given execution; however, with high probability at least one thread in the system will see an update at any time slice. */ - bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, gpr_timespec *next); void grpc_timer_list_init(gpr_timespec now); diff --git a/src/core/iomgr/timer_heap.c b/src/core/iomgr/timer_heap.c index 9d8be5c1fc..b5df566c45 100644 --- a/src/core/iomgr/timer_heap.c +++ b/src/core/iomgr/timer_heap.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -46,7 +46,7 @@ static void adjust_upwards(grpc_timer **first, uint32_t i, grpc_timer *t) { while (i > 0) { uint32_t parent = (uint32_t)(((int)i - 1) / 2); - if (gpr_time_cmp(first[parent]->deadline, t->deadline) >= 0) break; + if (gpr_time_cmp(first[parent]->deadline, t->deadline) <= 0) break; first[i] = first[parent]; first[i]->heap_index = i; i = parent; @@ -62,16 +62,14 @@ static void adjust_downwards(grpc_timer **first, uint32_t i, uint32_t length, grpc_timer *t) { for (;;) { uint32_t left_child = 1u + 2u * i; - uint32_t right_child; - uint32_t next_i; if (left_child >= length) break; - right_child = left_child + 1; - next_i = right_child < length && - gpr_time_cmp(first[left_child]->deadline, - first[right_child]->deadline) < 0 - ? right_child - : left_child; - if (gpr_time_cmp(t->deadline, first[next_i]->deadline) >= 0) break; + uint32_t right_child = left_child + 1; + uint32_t next_i = right_child < length && + gpr_time_cmp(first[left_child]->deadline, + first[right_child]->deadline) > 0 + ? right_child + : left_child; + if (gpr_time_cmp(t->deadline, first[next_i]->deadline) <= 0) break; first[i] = first[next_i]; first[i]->heap_index = i; i = next_i; @@ -95,7 +93,7 @@ static void maybe_shrink(grpc_timer_heap *heap) { static void note_changed_priority(grpc_timer_heap *heap, grpc_timer *timer) { uint32_t i = timer->heap_index; uint32_t parent = (uint32_t)(((int)i - 1) / 2); - if (gpr_time_cmp(heap->timers[parent]->deadline, timer->deadline) < 0) { + if (gpr_time_cmp(heap->timers[parent]->deadline, timer->deadline) > 0) { adjust_upwards(heap->timers, i, timer); } else { adjust_downwards(heap->timers, i, heap->timer_count, timer); diff --git a/src/core/iomgr/workqueue_posix.c b/src/core/iomgr/workqueue_posix.c index c096dbfb30..2b42e6d4fb 100644 --- a/src/core/iomgr/workqueue_posix.c +++ b/src/core/iomgr/workqueue_posix.c @@ -107,7 +107,7 @@ void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { if (grpc_closure_list_empty(workqueue->closure_list)) { grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd); } - grpc_closure_list_move(&exec_ctx->closure_list, &workqueue->closure_list); + grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL); gpr_mu_unlock(&workqueue->mu); } @@ -123,7 +123,7 @@ static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, bool success) { gpr_free(workqueue); } else { gpr_mu_lock(&workqueue->mu); - grpc_closure_list_move(&workqueue->closure_list, &exec_ctx->closure_list); + grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL); grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd); gpr_mu_unlock(&workqueue->mu); grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd, diff --git a/src/core/security/security_connector.c b/src/core/security/security_connector.c index 33c62a20c2..fbec263eed 100644 --- a/src/core/security/security_connector.c +++ b/src/core/security/security_connector.c @@ -492,6 +492,9 @@ grpc_auth_context *tsi_ssl_peer_to_auth_context(const tsi_peer *peer) { peer_identity_property_name = GRPC_X509_SAN_PROPERTY_NAME; grpc_auth_context_add_property(ctx, GRPC_X509_SAN_PROPERTY_NAME, prop->value.data, prop->value.length); + } else if (strcmp(prop->name, TSI_X509_PEM_CERT_PROPERTY) == 0) { + grpc_auth_context_add_property(ctx, GRPC_X509_PEM_CERT_PROPERTY_NAME, + prop->value.data, prop->value.length); } } if (peer_identity_property_name != NULL) { @@ -554,9 +557,9 @@ static void ssl_server_check_peer(grpc_exec_ctx *exec_ctx, grpc_auth_context_unref(auth_context); } -static void add_shalow_auth_property_to_peer(tsi_peer *peer, - const grpc_auth_property *prop, - const char *tsi_prop_name) { +static void add_shallow_auth_property_to_peer(tsi_peer *peer, + const grpc_auth_property *prop, + const char *tsi_prop_name) { tsi_peer_property *tsi_prop = &peer->properties[peer->property_count++]; tsi_prop->name = (char *)tsi_prop_name; tsi_prop->value.data = prop->value; @@ -579,11 +582,14 @@ tsi_peer tsi_shallow_peer_from_ssl_auth_context( it = grpc_auth_context_property_iterator(auth_context); while ((prop = grpc_auth_property_iterator_next(&it)) != NULL) { if (strcmp(prop->name, GRPC_X509_SAN_PROPERTY_NAME) == 0) { - add_shalow_auth_property_to_peer( + add_shallow_auth_property_to_peer( &peer, prop, TSI_X509_SUBJECT_ALTERNATIVE_NAME_PEER_PROPERTY); } else if (strcmp(prop->name, GRPC_X509_CN_PROPERTY_NAME) == 0) { - add_shalow_auth_property_to_peer( + add_shallow_auth_property_to_peer( &peer, prop, TSI_X509_SUBJECT_COMMON_NAME_PEER_PROPERTY); + } else if (strcmp(prop->name, GRPC_X509_PEM_CERT_PROPERTY_NAME) == 0) { + add_shallow_auth_property_to_peer(&peer, prop, + TSI_X509_PEM_CERT_PROPERTY); } } } diff --git a/src/core/support/alloc.c b/src/core/support/alloc.c index 0a064b2c18..b99584bd20 100644 --- a/src/core/support/alloc.c +++ b/src/core/support/alloc.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -87,4 +87,4 @@ void *gpr_malloc_aligned(size_t size, size_t alignment_log) { return (void *)ret; } -void gpr_free_aligned(void *ptr) { free(((void **)ptr)[-1]); } +void gpr_free_aligned(void *ptr) { gpr_free(((void **)ptr)[-1]); } diff --git a/src/core/support/backoff.c b/src/core/support/backoff.c new file mode 100644 index 0000000000..7458219645 --- /dev/null +++ b/src/core/support/backoff.c @@ -0,0 +1,71 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/support/backoff.h" + +#include <grpc/support/useful.h> + +void gpr_backoff_init(gpr_backoff *backoff, double multiplier, double jitter, + int64_t min_timeout_millis, int64_t max_timeout_millis) { + backoff->multiplier = multiplier; + backoff->jitter = jitter; + backoff->min_timeout_millis = min_timeout_millis; + backoff->max_timeout_millis = max_timeout_millis; + backoff->rng_state = (uint32_t)gpr_now(GPR_CLOCK_REALTIME).tv_nsec; +} + +gpr_timespec gpr_backoff_begin(gpr_backoff *backoff, gpr_timespec now) { + backoff->current_timeout_millis = backoff->min_timeout_millis; + return gpr_time_add( + now, gpr_time_from_millis(backoff->current_timeout_millis, GPR_TIMESPAN)); +} + +/* Generate a random number between 0 and 1. */ +static double generate_uniform_random_number(uint32_t *rng_state) { + *rng_state = (1103515245 * *rng_state + 12345) % ((uint32_t)1 << 31); + return *rng_state / (double)((uint32_t)1 << 31); +} + +gpr_timespec gpr_backoff_step(gpr_backoff *backoff, gpr_timespec now) { + double new_timeout_millis = + backoff->multiplier * (double)backoff->current_timeout_millis; + double jitter_range = backoff->jitter * new_timeout_millis; + double jitter = + (2 * generate_uniform_random_number(&backoff->rng_state) - 1) * + jitter_range; + backoff->current_timeout_millis = + GPR_CLAMP((int64_t)(new_timeout_millis + jitter), + backoff->min_timeout_millis, backoff->max_timeout_millis); + return gpr_time_add( + now, gpr_time_from_millis(backoff->current_timeout_millis, GPR_TIMESPAN)); +} diff --git a/src/core/support/backoff.h b/src/core/support/backoff.h new file mode 100644 index 0000000000..3234aa214d --- /dev/null +++ b/src/core/support/backoff.h @@ -0,0 +1,65 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_INTERNAL_CORE_SUPPORT_BACKOFF_H +#define GRPC_INTERNAL_CORE_SUPPORT_BACKOFF_H + +#include <grpc/support/time.h> + +typedef struct { + /// const: multiplier between retry attempts + double multiplier; + /// const: amount to randomize backoffs + double jitter; + /// const: minimum time between retries in milliseconds + int64_t min_timeout_millis; + /// const: maximum time between retries in milliseconds + int64_t max_timeout_millis; + + /// random number generator + uint32_t rng_state; + + /// current retry timeout in milliseconds + int64_t current_timeout_millis; +} gpr_backoff; + +/// Initialize backoff machinery - does not need to be destroyed +void gpr_backoff_init(gpr_backoff *backoff, double multiplier, double jitter, + int64_t min_timeout_millis, int64_t max_timeout_millis); + +/// Begin retry loop: returns a timespec for the NEXT retry +gpr_timespec gpr_backoff_begin(gpr_backoff *backoff, gpr_timespec now); +/// Step a retry loop: returns a timespec for the NEXT retry +gpr_timespec gpr_backoff_step(gpr_backoff *backoff, gpr_timespec now); + +#endif // GRPC_INTERNAL_CORE_SUPPORT_BACKOFF_H diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 1b117aa6b8..6f1cd1df10 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -1481,3 +1481,11 @@ void *grpc_call_context_get(grpc_call *call, grpc_context_index elem) { } uint8_t grpc_call_is_client(grpc_call *call) { return call->is_client; } + +grpc_compression_algorithm grpc_call_compression_for_level( + grpc_call *call, grpc_compression_level level) { + gpr_mu_lock(&call->mu); + const uint32_t accepted_encodings = call->encodings_accepted_by_peer; + gpr_mu_unlock(&call->mu); + return grpc_compression_algorithm_for_level(level, accepted_encodings); +} diff --git a/src/core/surface/call.h b/src/core/surface/call.h index 0bbffb98ae..0b3f543fe4 100644 --- a/src/core/surface/call.h +++ b/src/core/surface/call.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -38,7 +38,9 @@ #include "src/core/channel/context.h" #include "src/core/surface/api_trace.h" #include "src/core/surface/surface_trace.h" + #include <grpc/grpc.h> +#include <grpc/impl/codegen/compression_types.h> #ifdef __cplusplus extern "C" { @@ -102,6 +104,11 @@ void *grpc_call_context_get(grpc_call *call, grpc_context_index elem); uint8_t grpc_call_is_client(grpc_call *call); +/* Return an appropriate compression algorithm for the requested compression \a + * level in the context of \a call. */ +grpc_compression_algorithm grpc_call_compression_for_level( + grpc_call *call, grpc_compression_level level); + #ifdef __cplusplus } #endif diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index f6a95ebbd3..b22818ea87 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -86,7 +86,7 @@ struct grpc_completion_queue { #define POLLSET_FROM_CQ(cq) ((grpc_pollset *)(cq + 1)) static gpr_mu g_freelist_mu; -grpc_completion_queue *g_freelist; +static grpc_completion_queue *g_freelist; static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cc, bool success); diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 19265252ca..03444fd4c2 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -851,9 +851,11 @@ static void perform_stream_op_locked( if (stream_global->write_closed) { grpc_chttp2_complete_closure_step( exec_ctx, &stream_global->send_message_finished, 0); - } else if (stream_global->id != 0) { + } else { stream_global->send_message = op->send_message; - grpc_chttp2_become_writable(transport_global, stream_global); + if (stream_global->id != 0) { + grpc_chttp2_become_writable(transport_global, stream_global); + } } } diff --git a/src/core/tsi/ssl_transport_security.c b/src/core/tsi/ssl_transport_security.c index 6adcaac9ed..42d25ca929 100644 --- a/src/core/tsi/ssl_transport_security.c +++ b/src/core/tsi/ssl_transport_security.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -33,9 +33,18 @@ #include "src/core/tsi/ssl_transport_security.h" +#include <grpc/support/port_platform.h> + #include <limits.h> #include <string.h> +/* TODO(jboeuf): refactor inet_ntop into a portability header. */ +#ifdef GPR_WINSOCK_SOCKET +#include <ws2tcpip.h> +#else +#include <arpa/inet.h> +#endif + #include <grpc/support/log.h> #include <grpc/support/sync.h> #include <grpc/support/thd.h> @@ -197,13 +206,16 @@ static void ssl_info_callback(const SSL *ssl, int where, int ret) { } /* Returns 1 if name looks like an IP address, 0 otherwise. - This is a very rough heuristic as it does not handle IPV6 or things like: - 0300.0250.00.01, 0xC0.0Xa8.0x0.0x1, 000030052000001, 0xc0.052000001 */ + This is a very rough heuristic, and only handles IPv6 in hexadecimal form. */ static int looks_like_ip_address(const char *name) { size_t i; size_t dot_count = 0; size_t num_size = 0; for (i = 0; i < strlen(name); i++) { + if (name[i] == ':') { + /* IPv6 Address in hexadecimal form, : is not allowed in DNS names. */ + return 1; + } if (name[i] >= '0' && name[i] <= '9') { if (num_size > 3) return 0; num_size++; @@ -281,6 +293,26 @@ static tsi_result peer_property_from_x509_common_name( return result; } +/* Gets the X509 cert in PEM format as a tsi_peer_property. */ +static tsi_result add_pem_certificate(X509 *cert, tsi_peer_property *property) { + BIO *bio = BIO_new(BIO_s_mem()); + if (!PEM_write_bio_X509(bio, cert)) { + BIO_free(bio); + return TSI_INTERNAL_ERROR; + } + char *contents; + long len = BIO_get_mem_data(bio, &contents); + if (len <= 0) { + BIO_free(bio); + return TSI_INTERNAL_ERROR; + } + tsi_result result = tsi_construct_string_peer_property( + TSI_X509_PEM_CERT_PROPERTY, (const char *)contents, (size_t)len, + property); + BIO_free(bio); + return result; +} + /* Gets the subject SANs from an X509 cert as a tsi_peer_property. */ static tsi_result add_subject_alt_names_properties_to_peer( tsi_peer *peer, GENERAL_NAMES *subject_alt_names, @@ -296,21 +328,44 @@ static tsi_result add_subject_alt_names_properties_to_peer( sk_GENERAL_NAME_value(subject_alt_names, TSI_SIZE_AS_SIZE(i)); /* Filter out the non-dns entries names. */ if (subject_alt_name->type == GEN_DNS) { - unsigned char *dns_name = NULL; - int dns_name_size = - ASN1_STRING_to_UTF8(&dns_name, subject_alt_name->d.dNSName); - if (dns_name_size < 0) { + unsigned char *name = NULL; + int name_size; + name_size = ASN1_STRING_to_UTF8(&name, subject_alt_name->d.dNSName); + if (name_size < 0) { gpr_log(GPR_ERROR, "Could not get utf8 from asn1 string."); result = TSI_INTERNAL_ERROR; break; } result = tsi_construct_string_peer_property( - TSI_X509_SUBJECT_ALTERNATIVE_NAME_PEER_PROPERTY, - (const char *)dns_name, (size_t)dns_name_size, + TSI_X509_SUBJECT_ALTERNATIVE_NAME_PEER_PROPERTY, (const char *)name, + (size_t)name_size, &peer->properties[peer->property_count++]); + OPENSSL_free(name); + } else if (subject_alt_name->type == GEN_IPADD) { + char ntop_buf[INET6_ADDRSTRLEN]; + int af; + + if (subject_alt_name->d.iPAddress->length == 4) { + af = AF_INET; + } else if (subject_alt_name->d.iPAddress->length == 16) { + af = AF_INET6; + } else { + gpr_log(GPR_ERROR, "SAN IP Address contained invalid IP"); + result = TSI_INTERNAL_ERROR; + break; + } + const char *name = inet_ntop(af, subject_alt_name->d.iPAddress->data, + ntop_buf, INET6_ADDRSTRLEN); + if (name == NULL) { + gpr_log(GPR_ERROR, "Could not get IP string from asn1 octet."); + result = TSI_INTERNAL_ERROR; + break; + } + + result = tsi_construct_string_peer_property_from_cstring( + TSI_X509_SUBJECT_ALTERNATIVE_NAME_PEER_PROPERTY, name, &peer->properties[peer->property_count++]); - OPENSSL_free(dns_name); - if (result != TSI_OK) break; } + if (result != TSI_OK) break; } return result; } @@ -328,7 +383,8 @@ static tsi_result peer_from_x509(X509 *cert, int include_certificate_type, tsi_result result; GPR_ASSERT(subject_alt_name_count >= 0); property_count = (include_certificate_type ? (size_t)1 : 0) + - 1 /* common name */ + (size_t)subject_alt_name_count; + 2 /* common name, certificate */ + + (size_t)subject_alt_name_count; result = tsi_construct_peer(property_count, peer); if (result != TSI_OK) return result; do { @@ -342,6 +398,10 @@ static tsi_result peer_from_x509(X509 *cert, int include_certificate_type, cert, &peer->properties[include_certificate_type ? 1 : 0]); if (result != TSI_OK) break; + result = add_pem_certificate( + cert, &peer->properties[include_certificate_type ? 2 : 1]); + if (result != TSI_OK) break; + if (subject_alt_name_count != 0) { result = add_subject_alt_names_properties_to_peer( peer, subject_alt_names, (size_t)subject_alt_name_count); @@ -1436,9 +1496,7 @@ int tsi_ssl_peer_matches_name(const tsi_peer *peer, const char *name) { size_t i = 0; size_t san_count = 0; const tsi_peer_property *cn_property = NULL; - - /* For now reject what looks like an IP address. */ - if (looks_like_ip_address(name)) return 0; + int like_ip = looks_like_ip_address(name); /* Check the SAN first. */ for (i = 0; i < peer->property_count; i++) { @@ -1447,8 +1505,15 @@ int tsi_ssl_peer_matches_name(const tsi_peer *peer, const char *name) { if (strcmp(property->name, TSI_X509_SUBJECT_ALTERNATIVE_NAME_PEER_PROPERTY) == 0) { san_count++; - if (does_entry_match_name(property->value.data, property->value.length, - name)) { + + if (!like_ip && does_entry_match_name(property->value.data, + property->value.length, name)) { + return 1; + } else if (like_ip && + strncmp(name, property->value.data, property->value.length) == + 0 && + strlen(name) == property->value.length) { + /* IP Addresses are exact matches only. */ return 1; } } else if (strcmp(property->name, @@ -1457,8 +1522,8 @@ int tsi_ssl_peer_matches_name(const tsi_peer *peer, const char *name) { } } - /* If there's no SAN, try the CN. */ - if (san_count == 0 && cn_property != NULL) { + /* If there's no SAN, try the CN, but only if its not like an IP Address */ + if (san_count == 0 && cn_property != NULL && !like_ip) { if (does_entry_match_name(cn_property->value.data, cn_property->value.length, name)) { return 1; diff --git a/src/core/tsi/ssl_transport_security.h b/src/core/tsi/ssl_transport_security.h index 51c0003a85..32bb067f0b 100644 --- a/src/core/tsi/ssl_transport_security.h +++ b/src/core/tsi/ssl_transport_security.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -48,6 +48,8 @@ extern "C" { #define TSI_X509_SUBJECT_ALTERNATIVE_NAME_PEER_PROPERTY \ "x509_subject_alternative_name" +#define TSI_X509_PEM_CERT_PROPERTY "x509_pem_cert" + #define TSI_SSL_ALPN_SELECTED_PROTOCOL "ssl_alpn_selected_protocol" /* --- tsi_ssl_handshaker_factory object --- @@ -162,8 +164,7 @@ void tsi_ssl_handshaker_factory_destroy(tsi_ssl_handshaker_factory *self); Still TODO(jboeuf): - handle mixed case. - handle %encoded chars. - - handle public suffix wildchar more strictly (e.g. *.co.uk) - - handle IP addresses in SAN. */ + - handle public suffix wildchar more strictly (e.g. *.co.uk) */ int tsi_ssl_peer_matches_name(const tsi_peer *peer, const char *name); #ifdef __cplusplus diff --git a/src/cpp/README.md b/src/cpp/README.md index baeba08315..f2935e52d9 100644 --- a/src/cpp/README.md +++ b/src/cpp/README.md @@ -6,3 +6,77 @@ This directory contains source code for C++ implementation of gRPC. #Status Beta + +#Pre-requisites + +##Linux + +```sh + $ [sudo] apt-get install build-essential autoconf libtool +``` + +##Mac OSX + +For a Mac system, git is not available by default. You will first need to +install Xcode from the Mac AppStore and then run the following command from a +terminal: + +```sh + $ [sudo] xcode-select --install +``` + +##Protoc + +By default gRPC uses [protocol buffers](https://github.com/google/protobuf), +you will need the `protoc` compiler to generate stub server and client code. + +If you compile gRPC from source, as described below, this also installs the +`protoc` compiler. + +If it hasn't been installed, you can run the following commands to install it. + +```sh +$ cd grpc/third_party/protobuf +$ sudo make install # 'make' should have been run by core grpc +``` + +Alternatively, you can download `protoc` binaries from +[the protocol buffers Github repository](https://github.com/google/protobuf/releases). + +#Installation + +Currently to install gRPC for C++, you need to build from source as described +below. + +#Build from Source + +```sh + $ git clone https://github.com/grpc/grpc.git + $ cd grpc + $ git submodule update --init + $ make + $ [sudo] make install +``` + +#Documentation + +You can find out how to build and run our simplest gRPC C++ example in our +[C++ quick start](../../examples/cpp). + +For more detailed documentation on using gRPC in C++ , see our main +documentation site at [grpc.io](http://grpc.io), specifically: + +* [Overview](http://www.grpc.io/docs/): An introduction to gRPC with a simple + Hello World example in all our supported languages, including C++. +* [gRPC Basics - C++](http://www.grpc.io/docs/tutorials/basic/c.html): + A tutorial that steps you through creating a simple gRPC C++ example + application. +* [Asynchronous Basics - C++](http://www.grpc.io/docs/tutorials/async/helloasync-cpp.html): + A tutorial that shows you how to use gRPC C++'s asynchronous/non-blocking + APIs. + + +# Examples + +Code examples for gRPC C++ live in this repository's +[examples/cpp](../../examples/cpp) directory. diff --git a/src/cpp/client/secure_credentials.cc b/src/cpp/client/secure_credentials.cc index 074dae7ca7..c34b840f90 100644 --- a/src/cpp/client/secure_credentials.cc +++ b/src/cpp/client/secure_credentials.cc @@ -60,8 +60,7 @@ std::shared_ptr<grpc::Channel> SecureChannelCredentials::CreateChannel( SecureCallCredentials::SecureCallCredentials(grpc_call_credentials* c_creds) : c_creds_(c_creds) { - internal::GrpcLibraryInitializer gli_initializer; - gli_initializer.summon(); + g_gli_initializer.summon(); } bool SecureCallCredentials::ApplyToCall(grpc_call* call) { @@ -83,14 +82,14 @@ std::shared_ptr<CallCredentials> WrapCallCredentials( } // namespace std::shared_ptr<ChannelCredentials> GoogleDefaultCredentials() { - GrpcLibrary init; // To call grpc_init(). + GrpcLibraryCodegen init; // To call grpc_init(). return WrapChannelCredentials(grpc_google_default_credentials_create()); } // Builds SSL Credentials given SSL specific options std::shared_ptr<ChannelCredentials> SslCredentials( const SslCredentialsOptions& options) { - GrpcLibrary init; // To call grpc_init(). + GrpcLibraryCodegen init; // To call grpc_init(). grpc_ssl_pem_key_cert_pair pem_key_cert_pair = { options.pem_private_key.c_str(), options.pem_cert_chain.c_str()}; @@ -102,7 +101,7 @@ std::shared_ptr<ChannelCredentials> SslCredentials( // Builds credentials for use when running in GCE std::shared_ptr<CallCredentials> GoogleComputeEngineCredentials() { - GrpcLibrary init; // To call grpc_init(). + GrpcLibraryCodegen init; // To call grpc_init(). return WrapCallCredentials( grpc_google_compute_engine_credentials_create(nullptr)); } @@ -110,7 +109,7 @@ std::shared_ptr<CallCredentials> GoogleComputeEngineCredentials() { // Builds JWT credentials. std::shared_ptr<CallCredentials> ServiceAccountJWTAccessCredentials( const grpc::string& json_key, long token_lifetime_seconds) { - GrpcLibrary init; // To call grpc_init(). + GrpcLibraryCodegen init; // To call grpc_init(). if (token_lifetime_seconds <= 0) { gpr_log(GPR_ERROR, "Trying to create JWTCredentials with non-positive lifetime"); @@ -125,7 +124,7 @@ std::shared_ptr<CallCredentials> ServiceAccountJWTAccessCredentials( // Builds refresh token credentials. std::shared_ptr<CallCredentials> GoogleRefreshTokenCredentials( const grpc::string& json_refresh_token) { - GrpcLibrary init; // To call grpc_init(). + GrpcLibraryCodegen init; // To call grpc_init(). return WrapCallCredentials(grpc_google_refresh_token_credentials_create( json_refresh_token.c_str(), nullptr)); } @@ -133,7 +132,7 @@ std::shared_ptr<CallCredentials> GoogleRefreshTokenCredentials( // Builds access token credentials. std::shared_ptr<CallCredentials> AccessTokenCredentials( const grpc::string& access_token) { - GrpcLibrary init; // To call grpc_init(). + GrpcLibraryCodegen init; // To call grpc_init(). return WrapCallCredentials( grpc_access_token_credentials_create(access_token.c_str(), nullptr)); } @@ -142,7 +141,7 @@ std::shared_ptr<CallCredentials> AccessTokenCredentials( std::shared_ptr<CallCredentials> GoogleIAMCredentials( const grpc::string& authorization_token, const grpc::string& authority_selector) { - GrpcLibrary init; // To call grpc_init(). + GrpcLibraryCodegen init; // To call grpc_init(). return WrapCallCredentials(grpc_google_iam_credentials_create( authorization_token.c_str(), authority_selector.c_str(), nullptr)); } @@ -224,7 +223,7 @@ MetadataCredentialsPluginWrapper::MetadataCredentialsPluginWrapper( std::shared_ptr<CallCredentials> MetadataCredentialsFromPlugin( std::unique_ptr<MetadataCredentialsPlugin> plugin) { - GrpcLibrary init; // To call grpc_init(). + GrpcLibraryCodegen init; // To call grpc_init(). const char* type = plugin->GetType(); MetadataCredentialsPluginWrapper* wrapper = new MetadataCredentialsPluginWrapper(std::move(plugin)); diff --git a/src/cpp/codegen/grpc_library.cc b/src/cpp/codegen/codegen_init.cc index 48acec3f3d..c5d22124b7 100644 --- a/src/cpp/codegen/grpc_library.cc +++ b/src/cpp/codegen/codegen_init.cc @@ -31,10 +31,15 @@ * */ +#include <grpc++/impl/codegen/core_codegen_interface.h> #include <grpc++/impl/codegen/grpc_library.h> -namespace grpc { +/// Initializes the global gRPC variables for the codegen library. These will +/// stay null in the absence of of grpc++ library. In this case, no gRPC +/// features such as the ability to perform calls will be available. Trying to +/// perform them would result in a segmentation fault when trying to deference +/// the following nulled globals. These should be associated with actual +/// as part of the instantiation of a \a grpc::GrpcLibraryInitializer variable. -GrpcLibraryInterface *g_glip = nullptr; - -} // namespace grpc +grpc::CoreCodegenInterface* grpc::g_core_codegen_interface = nullptr; +grpc::GrpcLibraryInterface* grpc::g_glip = nullptr; diff --git a/src/cpp/common/call.cc b/src/cpp/common/call.cc deleted file mode 100644 index 5b87c2a806..0000000000 --- a/src/cpp/common/call.cc +++ /dev/null @@ -1,92 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include <grpc++/impl/call.h> - -#include <grpc/support/alloc.h> -#include <grpc++/channel.h> -#include <grpc++/client_context.h> -#include <grpc++/support/byte_buffer.h> -#include "src/core/profiling/timers.h" - -namespace grpc { - -void FillMetadataMap( - grpc_metadata_array* arr, - std::multimap<grpc::string_ref, grpc::string_ref>* metadata) { - for (size_t i = 0; i < arr->count; i++) { - // TODO(yangg) handle duplicates? - metadata->insert(std::pair<grpc::string_ref, grpc::string_ref>( - arr->metadata[i].key, grpc::string_ref(arr->metadata[i].value, - arr->metadata[i].value_length))); - } - grpc_metadata_array_destroy(arr); - grpc_metadata_array_init(arr); -} - -// TODO(yangg) if the map is changed before we send, the pointers will be a -// mess. Make sure it does not happen. -grpc_metadata* FillMetadataArray( - const std::multimap<grpc::string, grpc::string>& metadata) { - if (metadata.empty()) { - return nullptr; - } - grpc_metadata* metadata_array = - (grpc_metadata*)gpr_malloc(metadata.size() * sizeof(grpc_metadata)); - size_t i = 0; - for (auto iter = metadata.cbegin(); iter != metadata.cend(); ++iter, ++i) { - metadata_array[i].key = iter->first.c_str(); - metadata_array[i].value = iter->second.c_str(); - metadata_array[i].value_length = iter->second.size(); - } - return metadata_array; -} - -Call::Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq) - : call_hook_(call_hook), cq_(cq), call_(call), max_message_size_(-1) {} - -Call::Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq, - int max_message_size) - : call_hook_(call_hook), - cq_(cq), - call_(call), - max_message_size_(max_message_size) {} - -void Call::PerformOps(CallOpSetInterface* ops) { - if (max_message_size_ > 0) { - ops->set_max_message_size(max_message_size_); - } - call_hook_->PerformOpsOnCall(ops, this); -} - -} // namespace grpc diff --git a/src/cpp/common/completion_queue.cc b/src/cpp/common/completion_queue.cc index 4f76dfff1d..729dc33749 100644 --- a/src/cpp/common/completion_queue.cc +++ b/src/cpp/common/completion_queue.cc @@ -34,7 +34,6 @@ #include <memory> -#include <grpc++/impl/codegen/completion_queue_tag.h> #include <grpc++/impl/grpc_library.h> #include <grpc++/support/time.h> #include <grpc/grpc.h> @@ -43,16 +42,13 @@ namespace grpc { static internal::GrpcLibraryInitializer g_gli_initializer; -CompletionQueue::CompletionQueue() { - g_gli_initializer.summon(); - cq_ = grpc_completion_queue_create(nullptr); -} CompletionQueue::CompletionQueue(grpc_completion_queue* take) : cq_(take) {} -CompletionQueue::~CompletionQueue() { grpc_completion_queue_destroy(cq_); } - -void CompletionQueue::Shutdown() { grpc_completion_queue_shutdown(cq_); } +void CompletionQueue::Shutdown() { + g_gli_initializer.summon(); + grpc_completion_queue_shutdown(cq_); +} CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal( void** tag, bool* ok, gpr_timespec deadline) { @@ -75,25 +71,4 @@ CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal( } } -bool CompletionQueue::Pluck(CompletionQueueTag* tag) { - auto deadline = gpr_inf_future(GPR_CLOCK_REALTIME); - auto ev = grpc_completion_queue_pluck(cq_, tag, deadline, nullptr); - bool ok = ev.success != 0; - void* ignored = tag; - GPR_ASSERT(tag->FinalizeResult(&ignored, &ok)); - GPR_ASSERT(ignored == tag); - // Ignore mutations by FinalizeResult: Pluck returns the C API status - return ev.success != 0; -} - -void CompletionQueue::TryPluck(CompletionQueueTag* tag) { - auto deadline = gpr_time_0(GPR_CLOCK_REALTIME); - auto ev = grpc_completion_queue_pluck(cq_, tag, deadline, nullptr); - if (ev.type == GRPC_QUEUE_TIMEOUT) return; - bool ok = ev.success != 0; - void* ignored = tag; - // the tag must be swallowed if using TryPluck - GPR_ASSERT(!tag->FinalizeResult(&ignored, &ok)); -} - } // namespace grpc diff --git a/src/cpp/proto/proto_utils.cc b/src/cpp/common/core_codegen.cc index 79e7bf1801..45e9e278a0 100644 --- a/src/cpp/proto/proto_utils.cc +++ b/src/cpp/common/core_codegen.cc @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -31,28 +31,31 @@ * */ -#include <grpc++/impl/proto_utils.h> +#include "src/cpp/common/core_codegen.h" -#include <climits> +#include <stdlib.h> -#include <grpc/grpc.h> +#include <grpc++/support/config.h> #include <grpc/byte_buffer.h> #include <grpc/byte_buffer_reader.h> -#include <grpc/support/log.h> +#include <grpc/grpc.h> +#include <grpc/impl/codegen/alloc.h> +#include <grpc/impl/codegen/byte_buffer.h> +#include <grpc/impl/codegen/log.h> +#include <grpc/support/port_platform.h> #include <grpc/support/slice.h> #include <grpc/support/slice_buffer.h> -#include <grpc/support/port_platform.h> -#include <grpc++/support/config.h> #include "src/core/profiling/timers.h" -const int kMaxBufferLength = 8192; +namespace { + +const int kGrpcBufferWriterMaxBufferLength = 8192; class GrpcBufferWriter GRPC_FINAL : public ::grpc::protobuf::io::ZeroCopyOutputStream { public: - explicit GrpcBufferWriter(grpc_byte_buffer** bp, - int block_size = kMaxBufferLength) + explicit GrpcBufferWriter(grpc_byte_buffer** bp, int block_size) : block_size_(block_size), byte_count_(0), have_backup_(false) { *bp = grpc_raw_byte_buffer_create(NULL, 0); slice_buffer_ = &(*bp)->data.raw.slice_buffer; @@ -161,14 +164,56 @@ class GrpcBufferReader GRPC_FINAL grpc_byte_buffer_reader reader_; gpr_slice slice_; }; +} // namespace namespace grpc { -Status SerializeProto(const grpc::protobuf::Message& msg, - grpc_byte_buffer** bp) { +grpc_completion_queue* CoreCodegen::grpc_completion_queue_create( + void* reserved) { + return ::grpc_completion_queue_create(reserved); +} + +void CoreCodegen::grpc_completion_queue_destroy(grpc_completion_queue* cq) { + ::grpc_completion_queue_destroy(cq); +} + +grpc_event CoreCodegen::grpc_completion_queue_pluck(grpc_completion_queue* cq, + void* tag, + gpr_timespec deadline, + void* reserved) { + return ::grpc_completion_queue_pluck(cq, tag, deadline, reserved); +} + +void* CoreCodegen::gpr_malloc(size_t size) { return ::gpr_malloc(size); } + +void CoreCodegen::gpr_free(void* p) { return ::gpr_free(p); } + +void CoreCodegen::grpc_byte_buffer_destroy(grpc_byte_buffer* bb) { + ::grpc_byte_buffer_destroy(bb); +} + +void CoreCodegen::grpc_metadata_array_init(grpc_metadata_array* array) { + ::grpc_metadata_array_init(array); +} + +void CoreCodegen::grpc_metadata_array_destroy(grpc_metadata_array* array) { + ::grpc_metadata_array_destroy(array); +} + +gpr_timespec CoreCodegen::gpr_inf_future(gpr_clock_type type) { + return ::gpr_inf_future(type); +} + +void CoreCodegen::assert_fail(const char* failed_assertion) { + gpr_log(GPR_ERROR, "assertion failed: %s", failed_assertion); + abort(); +} + +Status CoreCodegen::SerializeProto(const grpc::protobuf::Message& msg, + grpc_byte_buffer** bp) { GPR_TIMER_SCOPE("SerializeProto", 0); int byte_size = msg.ByteSize(); - if (byte_size <= kMaxBufferLength) { + if (byte_size <= kGrpcBufferWriterMaxBufferLength) { gpr_slice slice = gpr_slice_malloc(byte_size); GPR_ASSERT(GPR_SLICE_END_PTR(slice) == msg.SerializeWithCachedSizesToArray(GPR_SLICE_START_PTR(slice))); @@ -176,31 +221,36 @@ Status SerializeProto(const grpc::protobuf::Message& msg, gpr_slice_unref(slice); return Status::OK; } else { - GrpcBufferWriter writer(bp); + GrpcBufferWriter writer(bp, kGrpcBufferWriterMaxBufferLength); return msg.SerializeToZeroCopyStream(&writer) ? Status::OK : Status(StatusCode::INTERNAL, "Failed to serialize message"); } } -Status DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg, - int max_message_size) { +Status CoreCodegen::DeserializeProto(grpc_byte_buffer* buffer, + grpc::protobuf::Message* msg, + int max_message_size) { GPR_TIMER_SCOPE("DeserializeProto", 0); - if (!buffer) { + if (buffer == nullptr) { return Status(StatusCode::INTERNAL, "No payload"); } - GrpcBufferReader reader(buffer); - ::grpc::protobuf::io::CodedInputStream decoder(&reader); - if (max_message_size > 0) { - decoder.SetTotalBytesLimit(max_message_size, max_message_size); - } - if (!msg->ParseFromCodedStream(&decoder)) { - return Status(StatusCode::INTERNAL, msg->InitializationErrorString()); - } - if (!decoder.ConsumedEntireMessage()) { - return Status(StatusCode::INTERNAL, "Did not read entire message"); + Status result = Status::OK; + { + GrpcBufferReader reader(buffer); + ::grpc::protobuf::io::CodedInputStream decoder(&reader); + if (max_message_size > 0) { + decoder.SetTotalBytesLimit(max_message_size, max_message_size); + } + if (!msg->ParseFromCodedStream(&decoder)) { + result = Status(StatusCode::INTERNAL, msg->InitializationErrorString()); + } + if (!decoder.ConsumedEntireMessage()) { + result = Status(StatusCode::INTERNAL, "Did not read entire message"); + } } - return Status::OK; + grpc_byte_buffer_destroy(buffer); + return result; } } // namespace grpc diff --git a/src/cpp/common/core_codegen.h b/src/cpp/common/core_codegen.h new file mode 100644 index 0000000000..0d8c6b79f7 --- /dev/null +++ b/src/cpp/common/core_codegen.h @@ -0,0 +1,71 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +// This file should be compiled as part of grpc++. + +#include <grpc++/impl/codegen/core_codegen_interface.h> +#include <grpc/impl/codegen/grpc_types.h> +#include <grpc/byte_buffer.h> + +namespace grpc { + +/// Implementation of the core codegen interface. +class CoreCodegen : public CoreCodegenInterface { + private: + Status SerializeProto(const grpc::protobuf::Message& msg, + grpc_byte_buffer** bp) override; + + Status DeserializeProto(grpc_byte_buffer* buffer, + grpc::protobuf::Message* msg, + int max_message_size) override; + + grpc_completion_queue* grpc_completion_queue_create(void* reserved) override; + void grpc_completion_queue_destroy(grpc_completion_queue* cq) override; + grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq, void* tag, + gpr_timespec deadline, + void* reserved) override; + + void* gpr_malloc(size_t size) override; + void gpr_free(void* p) override; + + void grpc_byte_buffer_destroy(grpc_byte_buffer* bb) override; + + void grpc_metadata_array_init(grpc_metadata_array* array) override; + void grpc_metadata_array_destroy(grpc_metadata_array* array) override; + + gpr_timespec gpr_inf_future(gpr_clock_type type) override; + + void assert_fail(const char* failed_assertion) override; +}; + +} // namespace grpc diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index e205a1969b..5d12ce2ecf 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -43,6 +43,7 @@ #include <grpc/support/log.h> #include "src/core/channel/compress_filter.h" +#include "src/core/surface/call.h" #include "src/cpp/common/create_auth_context.h" namespace grpc { @@ -62,7 +63,11 @@ class ServerContext::CompletionOp GRPC_FINAL : public CallOpSetInterface { void FillOps(grpc_op* ops, size_t* nops) GRPC_OVERRIDE; bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE; - bool CheckCancelled(CompletionQueue* cq); + bool CheckCancelled(CompletionQueue* cq) { + cq->TryPluck(this); + return CheckCancelledNoPluck(); + } + bool CheckCancelledAsync() { return CheckCancelledNoPluck(); } void set_tag(void* tag) { has_tag_ = true; @@ -72,6 +77,11 @@ class ServerContext::CompletionOp GRPC_FINAL : public CallOpSetInterface { void Unref(); private: + bool CheckCancelledNoPluck() { + grpc::lock_guard<grpc::mutex> g(mu_); + return finalized_ ? (cancelled_ != 0) : false; + } + bool has_tag_; void* tag_; grpc::mutex mu_; @@ -88,12 +98,6 @@ void ServerContext::CompletionOp::Unref() { } } -bool ServerContext::CompletionOp::CheckCancelled(CompletionQueue* cq) { - cq->TryPluck(this); - grpc::lock_guard<grpc::mutex> g(mu_); - return finalized_ ? cancelled_ != 0 : false; -} - void ServerContext::CompletionOp::FillOps(grpc_op* ops, size_t* nops) { ops->op = GRPC_OP_RECV_CLOSE_ON_SERVER; ops->data.recv_close_on_server.cancelled = &cancelled_; @@ -182,12 +186,19 @@ void ServerContext::TryCancel() const { } bool ServerContext::IsCancelled() const { - return completion_op_ && completion_op_->CheckCancelled(cq_); + if (has_notify_when_done_tag_) { + // when using async API, but the result is only valid + // if the tag has already been delivered at the completion queue + return completion_op_ && completion_op_->CheckCancelledAsync(); + } else { + // when using sync API + return completion_op_ && completion_op_->CheckCancelled(cq_); + } } void ServerContext::set_compression_level(grpc_compression_level level) { const grpc_compression_algorithm algorithm_for_level = - grpc_compression_algorithm_for_level(level); + grpc_call_compression_for_level(call_, level); set_compression_algorithm(algorithm_for_level); } diff --git a/src/cpp/util/string_ref.cc b/src/cpp/util/string_ref.cc index 66c79a1818..b55019b5f2 100644 --- a/src/cpp/util/string_ref.cc +++ b/src/cpp/util/string_ref.cc @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -33,72 +33,8 @@ #include <grpc++/support/string_ref.h> -#include <string.h> - -#include <algorithm> -#include <iostream> - namespace grpc { const size_t string_ref::npos = size_t(-1); -string_ref& string_ref::operator=(const string_ref& rhs) { - data_ = rhs.data_; - length_ = rhs.length_; - return *this; -} - -string_ref::string_ref(const char* s) : data_(s), length_(strlen(s)) {} - -string_ref string_ref::substr(size_t pos, size_t n) const { - if (pos > length_) pos = length_; - if (n > (length_ - pos)) n = length_ - pos; - return string_ref(data_ + pos, n); -} - -int string_ref::compare(string_ref x) const { - size_t min_size = length_ < x.length_ ? length_ : x.length_; - int r = memcmp(data_, x.data_, min_size); - if (r < 0) return -1; - if (r > 0) return 1; - if (length_ < x.length_) return -1; - if (length_ > x.length_) return 1; - return 0; -} - -bool string_ref::starts_with(string_ref x) const { - return length_ >= x.length_ && (memcmp(data_, x.data_, x.length_) == 0); -} - -bool string_ref::ends_with(string_ref x) const { - return length_ >= x.length_ && - (memcmp(data_ + (length_ - x.length_), x.data_, x.length_) == 0); -} - -size_t string_ref::find(string_ref s) const { - auto it = std::search(cbegin(), cend(), s.cbegin(), s.cend()); - return it == cend() ? npos : std::distance(cbegin(), it); -} - -size_t string_ref::find(char c) const { - auto it = std::find(cbegin(), cend(), c); - return it == cend() ? npos : std::distance(cbegin(), it); -} - -bool operator==(string_ref x, string_ref y) { return x.compare(y) == 0; } - -bool operator!=(string_ref x, string_ref y) { return x.compare(y) != 0; } - -bool operator<(string_ref x, string_ref y) { return x.compare(y) < 0; } - -bool operator<=(string_ref x, string_ref y) { return x.compare(y) <= 0; } - -bool operator>(string_ref x, string_ref y) { return x.compare(y) > 0; } - -bool operator>=(string_ref x, string_ref y) { return x.compare(y) >= 0; } - -std::ostream& operator<<(std::ostream& out, const string_ref& string) { - return out << grpc::string(string.begin(), string.end()); -} - } // namespace grpc diff --git a/src/csharp/Grpc.Core/Internal/NativeExtension.cs b/src/csharp/Grpc.Core/Internal/NativeExtension.cs index 4c742ab6c3..282816d51e 100644 --- a/src/csharp/Grpc.Core/Internal/NativeExtension.cs +++ b/src/csharp/Grpc.Core/Internal/NativeExtension.cs @@ -32,6 +32,7 @@ #endregion using System; +using System.Globalization; using System.IO; using System.Reflection; @@ -99,14 +100,30 @@ namespace Grpc.Core.Internal // TODO: allow customizing path to native extension (possibly through exposing a GrpcEnvironment property). var libraryFlavor = string.Format("{0}_{1}", GetPlatformString(), GetArchitectureString()); - var fullPath = Path.Combine(GetExecutingAssemblyDirectory(), + var fullPath = Path.Combine(Path.GetDirectoryName(GetAssemblyPath()), NativeLibrariesDir, libraryFlavor, GetNativeLibraryFilename()); return new UnmanagedLibrary(fullPath); } - private static string GetExecutingAssemblyDirectory() + private static string GetAssemblyPath() { - return Path.GetDirectoryName(typeof(NativeExtension).GetTypeInfo().Assembly.Location); + var assembly = typeof(NativeExtension).GetTypeInfo().Assembly; + + // If assembly is shadowed (e.g. in a webapp), EscapedCodeBase is pointing + // to the original location of the assembly, and Location is pointing + // to the shadow copy. We care about the original location because + // the native dlls don't get shadowed. + var escapedCodeBase = assembly.EscapedCodeBase; + if (IsFileUri(escapedCodeBase)) + { + return new Uri(escapedCodeBase).LocalPath; + } + return assembly.Location; + } + + private static bool IsFileUri(string uri) + { + return uri.ToLowerInvariant().StartsWith(Uri.UriSchemeFile); } private static string GetPlatformString() diff --git a/src/csharp/Grpc.Core/Logging/ConsoleLogger.cs b/src/csharp/Grpc.Core/Logging/ConsoleLogger.cs index 5c5b802164..d41b1b9f26 100644 --- a/src/csharp/Grpc.Core/Logging/ConsoleLogger.cs +++ b/src/csharp/Grpc.Core/Logging/ConsoleLogger.cs @@ -1,6 +1,6 @@ #region Copyright notice and license -// Copyright 2015, Google Inc. +// Copyright 2015-2016, Google Inc. // All rights reserved. // // Redistribution and use in source and binary forms, with or without @@ -33,12 +33,16 @@ using System; using System.Collections.Generic; +using System.Globalization; namespace Grpc.Core.Logging { /// <summary>Logger that logs to System.Console.</summary> public class ConsoleLogger : ILogger { + // Format similar enough to C core log format except nanosecond precision is not supported. + const string DateTimeFormatString = "MMdd HH:mm:ss.ffffff"; + readonly Type forType; readonly string forTypeString; @@ -142,7 +146,7 @@ namespace Grpc.Core.Logging { Console.Error.WriteLine("{0}{1} {2}{3}", severityString, - DateTime.Now, + DateTime.Now.ToString(DateTimeFormatString, CultureInfo.InvariantCulture), forTypeString, message); } diff --git a/src/csharp/Grpc.Core/Metadata.cs b/src/csharp/Grpc.Core/Metadata.cs index aa22f840d6..52cef96f40 100644 --- a/src/csharp/Grpc.Core/Metadata.cs +++ b/src/csharp/Grpc.Core/Metadata.cs @@ -323,7 +323,7 @@ namespace Grpc.Core private static string NormalizeKey(string key) { - var normalized = GrpcPreconditions.CheckNotNull(key, "key").ToLower(CultureInfo.InvariantCulture); + var normalized = GrpcPreconditions.CheckNotNull(key, "key").ToLowerInvariant(); GrpcPreconditions.CheckArgument(ValidKeyRegex.IsMatch(normalized), "Metadata entry key not valid. Keys can only contain lowercase alphanumeric characters, underscores and hyphens."); return normalized; diff --git a/src/csharp/README.md b/src/csharp/README.md index b4fa945ac9..201c5ab0b5 100644 --- a/src/csharp/README.md +++ b/src/csharp/README.md @@ -55,16 +55,11 @@ If you are a user of gRPC C#, go to Usage section above. **Windows** -- The grpc_csharp_ext native library needs to be built so you can build the gRPC C# solution. You can - either build the native solution in `vsprojects/grpc_csharp_ext.sln` from Visual Studio manually, or you can use - a convenience batch script that builds everything for you. +- The grpc_csharp_ext native library needs to be built so you can build the gRPC C# solution. Open the + solution `vsprojects/grpc_csharp_ext.sln` in Visual Studio and build it. - ``` - > REM From src/csharp directory - > buildall.bat - ``` - -- Open Grpc.sln using Visual Studio. +- Open `src\csharp\Grpc.sln` (path is relative to gRPC repository root) + using Visual Studio **Linux** @@ -79,7 +74,7 @@ If you are a user of gRPC C#, go to Usage section above. **Mac OS X** - The grpc_csharp_ext native library needs to be built so you can build the gRPC C# solution. - + ```sh # from the gRPC repository root $ tools/run_tests/run_tests.py -c dbg -l csharp --build_only diff --git a/src/node/src/client.js b/src/node/src/client.js index 9acf51bd98..2459e28321 100644 --- a/src/node/src/client.js +++ b/src/node/src/client.js @@ -149,6 +149,9 @@ function _readsDone(status) { if (!status) { status = {code: grpc.status.OK, details: 'OK'}; } + if (status.code !== grpc.status.OK) { + this.call.cancelWithStatus(status.code, status.details); + } this.finished = true; this.read_status = status; this._emitStatusIfDone(); @@ -408,7 +411,7 @@ function makeUnaryRequestFunction(method, serialize, deserialize) { } } if (status.code !== grpc.status.OK) { - error = new Error(response.status.details); + error = new Error(status.details); error.code = status.code; error.metadata = status.metadata; callback(error); diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m index f79b7d0bc0..2d45818b6e 100644 --- a/src/objective-c/GRPCClient/GRPCCall.m +++ b/src/objective-c/GRPCClient/GRPCCall.m @@ -37,6 +37,8 @@ #include <grpc/support/time.h> #import <RxLibrary/GRXConcurrentWriteable.h> +#import "private/GRPCConnectivityMonitor.h" +#import "private/GRPCHost.h" #import "private/GRPCRequestHeaders.h" #import "private/GRPCWrappedCall.h" #import "private/NSData+GRPC.h" @@ -71,8 +73,11 @@ NSString * const kGRPCTrailersKey = @"io.grpc.TrailersKey"; @implementation GRPCCall { dispatch_queue_t _callQueue; + NSString *_host; + NSString *_path; GRPCWrappedCall *_wrappedCall; dispatch_once_t _callAlreadyInvoked; + GRPCConnectivityMonitor *_connectivityMonitor; // The C gRPC library has less guarantees on the ordering of events than we // do. Particularly, in the face of errors, there's no ordering guarantee at @@ -115,13 +120,11 @@ NSString * const kGRPCTrailersKey = @"io.grpc.TrailersKey"; format:@"The requests writer can't be already started."]; } if ((self = [super init])) { - _wrappedCall = [[GRPCWrappedCall alloc] initWithHost:host path:path]; - if (!_wrappedCall) { - return nil; - } + _host = [host copy]; + _path = [path copy]; // Serial queue to invoke the non-reentrant methods of the grpc_call object. - _callQueue = dispatch_queue_create("org.grpc.call", NULL); + _callQueue = dispatch_queue_create("io.grpc.call", NULL); _requestWriter = requestWriter; @@ -156,7 +159,7 @@ NSString * const kGRPCTrailersKey = @"io.grpc.TrailersKey"; - (void)cancel { [self finishWithError:[NSError errorWithDomain:kGRPCErrorDomain code:GRPCErrorCodeCancelled - userInfo:nil]]; + userInfo:@{NSLocalizedDescriptionKey: @"Canceled by app"}]]; [self cancelCall]; } @@ -354,8 +357,29 @@ NSString * const kGRPCTrailersKey = @"io.grpc.TrailersKey"; _retainSelf = self; _responseWriteable = [[GRXConcurrentWriteable alloc] initWithWriteable:writeable]; + + _wrappedCall = [[GRPCWrappedCall alloc] initWithHost:_host path:_path]; + NSAssert(_wrappedCall, @"Error allocating RPC objects. Low memory?"); + [self sendHeaders:_requestHeaders]; [self invokeCall]; + // TODO(jcanizales): Extract this logic somewhere common. + NSString *host = [NSURL URLWithString:[@"https://" stringByAppendingString:_host]].host; + if (!host) { + // TODO(jcanizales): Check this on init. + [NSException raise:NSInvalidArgumentException format:@"host of %@ is nil", _host]; + } + __weak typeof(self) weakSelf = self; + _connectivityMonitor = [GRPCConnectivityMonitor monitorWithHost:host]; + [_connectivityMonitor handleLossWithHandler:^{ + typeof(self) strongSelf = weakSelf; + if (strongSelf) { + [strongSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain + code:GRPCErrorCodeUnavailable + userInfo:@{NSLocalizedDescriptionKey: @"Connectivity lost."}]]; + [[GRPCHost hostWithAddress:strongSelf->_host] disconnect]; + } + }]; } - (void)setState:(GRXWriterState)newState { @@ -385,4 +409,5 @@ NSString * const kGRPCTrailersKey = @"io.grpc.TrailersKey"; return; } } + @end diff --git a/src/objective-c/GRPCClient/private/GRPCChannel.h b/src/objective-c/GRPCClient/private/GRPCChannel.h index 8661ae6f97..e49a6aca29 100644 --- a/src/objective-c/GRPCClient/private/GRPCChannel.h +++ b/src/objective-c/GRPCClient/private/GRPCChannel.h @@ -35,6 +35,7 @@ #include <grpc/grpc.h> +@class GRPCCompletionQueue; struct grpc_channel_credentials; @@ -80,4 +81,6 @@ struct grpc_channel_credentials; + (nonnull GRPCChannel *)insecureChannelWithHost:(nonnull NSString *)host channelArgs:(nullable NSDictionary *)channelArgs; +- (nullable grpc_call *)unmanagedCallWithPath:(nonnull NSString *)path + completionQueue:(nonnull GRPCCompletionQueue *)queue; @end diff --git a/src/objective-c/GRPCClient/private/GRPCChannel.m b/src/objective-c/GRPCClient/private/GRPCChannel.m index 7e55a473d7..d7de025e21 100644 --- a/src/objective-c/GRPCClient/private/GRPCChannel.m +++ b/src/objective-c/GRPCClient/private/GRPCChannel.m @@ -38,6 +38,8 @@ #include <grpc/support/log.h> #include <grpc/support/string_util.h> +#import "GRPCCompletionQueue.h" + /** * Returns @c grpc_channel_credentials from the specified @c path. If the file at the path could not * be read then NULL is returned. If NULL is returned, @c errorPtr may not be NULL if there are @@ -205,4 +207,16 @@ grpc_channel_args * buildChannelArgs(NSDictionary *dictionary) { channelArgs:channelArgs]; } +- (grpc_call *)unmanagedCallWithPath:(NSString *)path + completionQueue:(GRPCCompletionQueue *)queue { + return grpc_channel_create_call(_unmanagedChannel, + NULL, GRPC_PROPAGATE_DEFAULTS, + queue.unmanagedQueue, + path.UTF8String, + // Get "host" from "host:port" + // TODO(jcanizales): Use NSURLs throughout, to clarify these. + [_host componentsSeparatedByString:@":"][0].UTF8String, + gpr_inf_future(GPR_CLOCK_REALTIME), NULL); +} + @end diff --git a/src/objective-c/GRPCClient/private/GRPCCompletionQueue.h b/src/objective-c/GRPCClient/private/GRPCCompletionQueue.h index 7b66cd4c32..a52095dd01 100644 --- a/src/objective-c/GRPCClient/private/GRPCCompletionQueue.h +++ b/src/objective-c/GRPCClient/private/GRPCCompletionQueue.h @@ -36,6 +36,8 @@ typedef void(^GRPCQueueCompletionHandler)(bool success); +extern const int64_t kGRPCCompletionQueueDefaultTimeoutSecs; + /** * This class lets one more easily use |grpc_completion_queue|. To use it, pass the value of the * |unmanagedQueue| property of an instance of this class to |grpc_channel_create_call|. Then for @@ -49,6 +51,11 @@ typedef void(^GRPCQueueCompletionHandler)(bool success); */ @interface GRPCCompletionQueue : NSObject @property(nonatomic, readonly) grpc_completion_queue *unmanagedQueue; +@property(nonatomic, readonly) int64_t timeoutSecs; + (instancetype)completionQueue; + +- (instancetype)init; +- (instancetype)initWithTimeout:(int64_t)timeoutSecs NS_DESIGNATED_INITIALIZER; + @end diff --git a/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m b/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m index ff3031678c..be214d4d36 100644 --- a/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m +++ b/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m @@ -35,15 +35,28 @@ #import <grpc/grpc.h> + +const int64_t kGRPCCompletionQueueDefaultTimeoutSecs = 60; + @implementation GRPCCompletionQueue + (instancetype)completionQueue { - return [[self alloc] init]; + static GRPCCompletionQueue *singleton = nil; + static dispatch_once_t onceToken; + dispatch_once(&onceToken, ^{ + singleton = [[self alloc] init]; + }); + return singleton; } - (instancetype)init { + return [self initWithTimeout:kGRPCCompletionQueueDefaultTimeoutSecs]; +} + +- (instancetype)initWithTimeout:(int64_t)timeoutSecs { if ((self = [super init])) { _unmanagedQueue = grpc_completion_queue_create(NULL); + _timeoutSecs = timeoutSecs; // This is for the following block to capture the pointer by value (instead // of retaining self and doing self->_unmanagedQueue). This is essential @@ -61,22 +74,28 @@ gDefaultConcurrentQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0); }); dispatch_async(gDefaultConcurrentQueue, ^{ + // Using a non-infinite deadline to re-enter grpc_completion_queue_next() + // alleviates https://github.com/grpc/grpc/issues/5593 + gpr_timespec deadline = (timeoutSecs < 0) + ? gpr_inf_future(GPR_CLOCK_REALTIME) + : gpr_time_from_seconds(timeoutSecs, GPR_CLOCK_REALTIME); while (YES) { - // The following call blocks until an event is available. - grpc_event event = grpc_completion_queue_next(unmanagedQueue, - gpr_inf_future(GPR_CLOCK_REALTIME), - NULL); + // The following call blocks until an event is available or the deadline elapses. + grpc_event event = grpc_completion_queue_next(unmanagedQueue, deadline, NULL); GRPCQueueCompletionHandler handler; switch (event.type) { case GRPC_OP_COMPLETE: handler = (__bridge_transfer GRPCQueueCompletionHandler)event.tag; handler(event.success); break; + case GRPC_QUEUE_TIMEOUT: + // Nothing to do here + break; case GRPC_QUEUE_SHUTDOWN: grpc_completion_queue_destroy(unmanagedQueue); return; default: - [NSException raise:@"Unrecognized completion type" format:@""]; + [NSException raise:@"Unrecognized completion type" format:@"type=%d", event.type]; } }; }); diff --git a/src/objective-c/GRPCClient/private/GRPCConnectivityMonitor.h b/src/objective-c/GRPCClient/private/GRPCConnectivityMonitor.h new file mode 100644 index 0000000000..2fae410331 --- /dev/null +++ b/src/objective-c/GRPCClient/private/GRPCConnectivityMonitor.h @@ -0,0 +1,77 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#import <Foundation/Foundation.h> +#import <SystemConfiguration/SystemConfiguration.h> + +@interface GRPCReachabilityFlags : NSObject + ++ (nonnull instancetype)flagsWithFlags:(SCNetworkReachabilityFlags)flags; + +/** + * One accessor method to query each of the different flags. Example: + +@property(nonatomic, readonly) BOOL isCell; + + */ +#define GRPC_XMACRO_ITEM(methodName, FlagName) \ +@property(nonatomic, readonly) BOOL methodName; + +#include "GRPCReachabilityFlagNames.xmacro.h" +#undef GRPC_XMACRO_ITEM + +@property(nonatomic, readonly) BOOL isHostReachable; +@end + + +@interface GRPCConnectivityMonitor : NSObject + ++ (nullable instancetype)monitorWithHost:(nonnull NSString *)hostName; + +- (nonnull instancetype)init NS_UNAVAILABLE; + +/** + * Queue on which callbacks will be dispatched. Default is the main queue. Set it before calling + * handleLossWithHandler:. + */ +// TODO(jcanizales): Default to a serial background queue instead. +@property(nonatomic, strong, null_resettable) dispatch_queue_t queue; + +/** + * Calls handler every time the connectivity to this instance's host is lost. If this instance is + * released before that happens, the handler won't be called. + * Only one handler is active at a time, so if this method is called again before the previous + * handler has been called, it might never be called at all (or yes, if it has already been queued). + */ +- (void)handleLossWithHandler:(nonnull void (^)())handler; +@end diff --git a/src/objective-c/GRPCClient/private/GRPCConnectivityMonitor.m b/src/objective-c/GRPCClient/private/GRPCConnectivityMonitor.m new file mode 100644 index 0000000000..b4061bd5ef --- /dev/null +++ b/src/objective-c/GRPCClient/private/GRPCConnectivityMonitor.m @@ -0,0 +1,192 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#import "GRPCConnectivityMonitor.h" + +#pragma mark Flags + +@implementation GRPCReachabilityFlags { + SCNetworkReachabilityFlags _flags; +} + ++ (instancetype)flagsWithFlags:(SCNetworkReachabilityFlags)flags { + return [[self alloc] initWithFlags:flags]; +} + +- (instancetype)initWithFlags:(SCNetworkReachabilityFlags)flags { + if ((self = [super init])) { + _flags = flags; + } + return self; +} + +/* + * One accessor method implementation per flag. Example: + +- (BOOL)isCell { \ + return !!(_flags & kSCNetworkReachabilityFlagsIsWWAN); \ +} + + */ +#define GRPC_XMACRO_ITEM(methodName, FlagName) \ +- (BOOL)methodName { \ + return !!(_flags & kSCNetworkReachabilityFlags ## FlagName); \ +} +#include "GRPCReachabilityFlagNames.xmacro.h" +#undef GRPC_XMACRO_ITEM + +- (BOOL)isHostReachable { + // Note: connectionOnDemand means it'll be reachable only if using the CFSocketStream API or APIs + // on top of it. + // connectionRequired means we can't tell until a connection is attempted (e.g. for VPN on + // demand). + return self.reachable && !self.interventionRequired && !self.connectionOnDemand; +} + +- (NSString *)description { + NSMutableArray *activeOptions = [NSMutableArray arrayWithCapacity:9]; + + /* + * For each flag, add its name to the array if it's ON. Example: + + if (self.isCell) { + [activeOptions addObject:@"isCell"]; + } + + */ +#define GRPC_XMACRO_ITEM(methodName, FlagName) \ + if (self.methodName) { \ + [activeOptions addObject:@#methodName]; \ + } +#include "GRPCReachabilityFlagNames.xmacro.h" +#undef GRPC_XMACRO_ITEM + + return activeOptions.count == 0 ? @"(none)" : [activeOptions componentsJoinedByString:@", "]; +} + +- (BOOL)isEqual:(id)object { + return [object isKindOfClass:[GRPCReachabilityFlags class]] && + _flags == ((GRPCReachabilityFlags *)object)->_flags; +} + +- (NSUInteger)hash { + return _flags; +} +@end + +#pragma mark Connectivity Monitor + +// Assumes the third argument is a block that accepts a GRPCReachabilityFlags object, and passes the +// received ones to it. +static void PassFlagsToContextInfoBlock(SCNetworkReachabilityRef target, + SCNetworkReachabilityFlags flags, + void *info) { + #pragma unused (target) + // This can be called many times with the same info. The info is retained by SCNetworkReachability + // while this function is being executed. + void (^handler)(GRPCReachabilityFlags *) = (__bridge void (^)(GRPCReachabilityFlags *))info; + handler([[GRPCReachabilityFlags alloc] initWithFlags:flags]); +} + +@implementation GRPCConnectivityMonitor { + SCNetworkReachabilityRef _reachabilityRef; +} + +- (nullable instancetype)initWithReachability:(nullable SCNetworkReachabilityRef)reachability { + if (!reachability) { + return nil; + } + if ((self = [super init])) { + _reachabilityRef = CFRetain(reachability); + _queue = dispatch_get_main_queue(); + } + return self; +} + ++ (nullable instancetype)monitorWithHost:(nonnull NSString *)host { + const char *hostName = host.UTF8String; + if (!hostName) { + [NSException raise:NSInvalidArgumentException + format:@"host.UTF8String returns NULL for %@", host]; + } + SCNetworkReachabilityRef reachability = + SCNetworkReachabilityCreateWithName(NULL, hostName); + + GRPCConnectivityMonitor *returnValue = [[self alloc] initWithReachability:reachability]; + if (reachability) { + CFRelease(reachability); + } + return returnValue; +} + +- (void)handleLossWithHandler:(void (^)())handler { + [self startListeningWithHandler:^(GRPCReachabilityFlags *flags) { + if (!flags.isHostReachable) { + handler(); + } + }]; +} + +- (void)startListeningWithHandler:(void (^)(GRPCReachabilityFlags *))handler { + // Copy to ensure the handler block is in the heap (and so can't be deallocated when this method + // returns). + void (^copiedHandler)(GRPCReachabilityFlags *) = [handler copy]; + SCNetworkReachabilityContext context = { + .version = 0, + .info = (__bridge void *)copiedHandler, + .retain = CFRetain, + .release = CFRelease, + }; + // The following will retain context.info, and release it when the callback is set to NULL. + SCNetworkReachabilitySetCallback(_reachabilityRef, PassFlagsToContextInfoBlock, &context); + SCNetworkReachabilitySetDispatchQueue(_reachabilityRef, _queue); +} + +- (void)stopListening { + // This releases the block on context.info. + SCNetworkReachabilitySetCallback(_reachabilityRef, NULL, NULL); + SCNetworkReachabilitySetDispatchQueue(_reachabilityRef, NULL); +} + +- (void)setQueue:(dispatch_queue_t)queue { + _queue = queue ?: dispatch_get_main_queue(); +} + +- (void)dealloc { + if (_reachabilityRef) { + [self stopListening]; + CFRelease(_reachabilityRef); + } +} + +@end diff --git a/src/objective-c/GRPCClient/private/GRPCHost.h b/src/objective-c/GRPCClient/private/GRPCHost.h index 82c0ad6cf6..987d3e9f59 100644 --- a/src/objective-c/GRPCClient/private/GRPCHost.h +++ b/src/objective-c/GRPCClient/private/GRPCHost.h @@ -33,27 +33,39 @@ #import <Foundation/Foundation.h> +NS_ASSUME_NONNULL_BEGIN + @class GRPCCompletionQueue; struct grpc_call; @interface GRPCHost : NSObject @property(nonatomic, readonly) NSString *address; -@property(nonatomic, copy) NSString *userAgentPrefix; +@property(nonatomic, copy, nullable) NSString *userAgentPrefix; /** The following properties should only be modified for testing: */ @property(nonatomic, getter=isSecure) BOOL secure; -@property(nonatomic, copy) NSString *pathToCertificates; -@property(nonatomic, copy) NSString *hostNameOverride; +@property(nonatomic, copy, nullable) NSString *pathToCertificates; +@property(nonatomic, copy, nullable) NSString *hostNameOverride; +- (nullable instancetype)init NS_UNAVAILABLE; /** Host objects initialized with the same address are the same. */ -+ (instancetype)hostWithAddress:(NSString *)address; -- (instancetype)initWithAddress:(NSString *)address NS_DESIGNATED_INITIALIZER; ++ (nullable instancetype)hostWithAddress:(NSString *)address; +- (nullable instancetype)initWithAddress:(NSString *)address NS_DESIGNATED_INITIALIZER; /** Create a grpc_call object to the provided path on this host. */ -- (struct grpc_call *)unmanagedCallWithPath:(NSString *)path - completionQueue:(GRPCCompletionQueue *)queue; +- (nullable struct grpc_call *)unmanagedCallWithPath:(NSString *)path + completionQueue:(GRPCCompletionQueue *)queue; +// TODO: There's a race when a new RPC is coming through just as an existing one is getting +// notified that there's no connectivity. If connectivity comes back at that moment, the new RPC +// will have its channel destroyed by the other RPC, and will never get notified of a problem, so +// it'll hang (the C layer logs a timeout, with exponential back off). One solution could be to pass +// the GRPCChannel to the GRPCCall, renaming this as "disconnectChannel:channel", which would only +// act on that specific channel. +- (void)disconnect; @end + +NS_ASSUME_NONNULL_END diff --git a/src/objective-c/GRPCClient/private/GRPCHost.m b/src/objective-c/GRPCClient/private/GRPCHost.m index eb1db899b7..508cb20644 100644 --- a/src/objective-c/GRPCClient/private/GRPCHost.m +++ b/src/objective-c/GRPCClient/private/GRPCHost.m @@ -34,33 +34,30 @@ #import "GRPCHost.h" #include <grpc/grpc.h> +#import <GRPCClient/GRPCCall.h> #import <GRPCClient/GRPCCall+ChannelArg.h> #import "GRPCChannel.h" #import "GRPCCompletionQueue.h" #import "NSDictionary+GRPC.h" +NS_ASSUME_NONNULL_BEGIN + // TODO(jcanizales): Generate the version in a standalone header, from templates. Like // templates/src/core/surface/version.c.template . #define GRPC_OBJC_VERSION_STRING @"0.13.0" -@interface GRPCHost () -// TODO(mlumish): Investigate whether caching channels with strong links is a good idea. -@property(nonatomic, strong) GRPCChannel *channel; -@end - -@implementation GRPCHost - -+ (instancetype)hostWithAddress:(NSString *)address { - return [[self alloc] initWithAddress:address]; +@implementation GRPCHost { + // TODO(mlumish): Investigate whether caching channels with strong links is a good idea. + GRPCChannel *_channel; } -- (instancetype)init { - return [self initWithAddress:nil]; ++ (nullable instancetype)hostWithAddress:(NSString *)address { + return [[self alloc] initWithAddress:address]; } // Default initializer. -- (instancetype)initWithAddress:(NSString *)address { +- (nullable instancetype)initWithAddress:(NSString *)address { if (!address) { return nil; } @@ -95,46 +92,45 @@ return self; } -- (grpc_call *)unmanagedCallWithPath:(NSString *)path completionQueue:(GRPCCompletionQueue *)queue { - if (!queue || !path || !self.channel) { - return NULL; +- (nullable grpc_call *)unmanagedCallWithPath:(NSString *)path + completionQueue:(GRPCCompletionQueue *)queue { + GRPCChannel *channel; + // This is racing -[GRPCHost disconnect]. + @synchronized(self) { + if (!_channel) { + _channel = [self newChannel]; + } + channel = _channel; } - return grpc_channel_create_call(self.channel.unmanagedChannel, - NULL, GRPC_PROPAGATE_DEFAULTS, - queue.unmanagedQueue, - path.UTF8String, - self.hostName.UTF8String, - gpr_inf_future(GPR_CLOCK_REALTIME), NULL); + return [channel unmanagedCallWithPath:path completionQueue:queue]; } -- (GRPCChannel *)channel { - // Create it lazily, because we don't want to open a connection just because someone is - // configuring a host. +- (NSDictionary *)channelArgs { + NSMutableDictionary *args = [NSMutableDictionary dictionary]; - if (!_channel) { - NSMutableDictionary *args = [NSMutableDictionary dictionary]; + // TODO(jcanizales): Add OS and device information (see + // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#user-agents ). + NSString *userAgent = @"grpc-objc/" GRPC_OBJC_VERSION_STRING; + if (_userAgentPrefix) { + userAgent = [_userAgentPrefix stringByAppendingFormat:@" %@", userAgent]; + } + args[@GRPC_ARG_PRIMARY_USER_AGENT_STRING] = userAgent; - // TODO(jcanizales): Add OS and device information (see - // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#user-agents ). - NSString *userAgent = @"grpc-objc/" GRPC_OBJC_VERSION_STRING; - if (_userAgentPrefix) { - userAgent = [@[_userAgentPrefix, userAgent] componentsJoinedByString:@" "]; - } - args[@GRPC_ARG_PRIMARY_USER_AGENT_STRING] = userAgent; - - if (_secure) { - if (_hostNameOverride) { - args[@GRPC_SSL_TARGET_NAME_OVERRIDE_ARG] = _hostNameOverride; - } - - _channel = [GRPCChannel secureChannelWithHost:_address - pathToCertificates:_pathToCertificates - channelArgs:args]; - } else { - _channel = [GRPCChannel insecureChannelWithHost:_address channelArgs:args]; - } + if (_secure && _hostNameOverride) { + args[@GRPC_SSL_TARGET_NAME_OVERRIDE_ARG] = _hostNameOverride; + } + return args; +} + +- (GRPCChannel *)newChannel { + NSDictionary *args = [self channelArgs]; + if (_secure) { + return [GRPCChannel secureChannelWithHost:_address + pathToCertificates:_pathToCertificates + channelArgs:args]; + } else { + return [GRPCChannel insecureChannelWithHost:_address channelArgs:args]; } - return _channel; } - (NSString *)hostName { @@ -142,7 +138,16 @@ return _hostNameOverride ?: _address; } +- (void)disconnect { + // This is racing -[GRPCHost unmanagedCallWithPath:completionQueue:]. + @synchronized(self) { + _channel = nil; + } +} + // TODO(jcanizales): Don't let set |secure| to |NO| if |pathToCertificates| or |hostNameOverride| // have been set. Don't let set either of the latter if |secure| has been set to |NO|. @end + +NS_ASSUME_NONNULL_END diff --git a/src/objective-c/GRPCClient/private/GRPCReachabilityFlagNames.xmacro.h b/src/objective-c/GRPCClient/private/GRPCReachabilityFlagNames.xmacro.h new file mode 100644 index 0000000000..02871d5d02 --- /dev/null +++ b/src/objective-c/GRPCClient/private/GRPCReachabilityFlagNames.xmacro.h @@ -0,0 +1,65 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +/** + * "X-macro" file that lists the flags names of Apple's Network Reachability API, along with a nice + * Objective-C method name used to query each of them. + * + * Example usage: To generate a dictionary from flag value to name, one can do: + + NSDictionary *flagNames = @{ +#define GRPC_XMACRO_ITEM(methodName, FlagName) \ + @(kSCNetworkReachabilityFlags ## FlagName): @#methodName, +#include "GRXReachabilityFlagNames.xmacro.h" +#undef GRPC_XMACRO_ITEM + }; + + XCTAssertEqualObjects(flagNames[@(kSCNetworkReachabilityFlagsIsWWAN)], @"isCell"); + + */ + +#ifndef GRPC_XMACRO_ITEM +#error This file is to be used with the "X-macro" pattern: Please #define \ + GRPC_XMACRO_ITEM(methodName, FlagName), then #include this file, and then #undef \ + GRPC_XMACRO_ITEM. +#endif + +GRPC_XMACRO_ITEM(isCell, IsWWAN) +GRPC_XMACRO_ITEM(reachable, Reachable) +GRPC_XMACRO_ITEM(transientConnection, TransientConnection) +GRPC_XMACRO_ITEM(connectionRequired, ConnectionRequired) +GRPC_XMACRO_ITEM(connectionOnTraffic, ConnectionOnTraffic) +GRPC_XMACRO_ITEM(interventionRequired, InterventionRequired) +GRPC_XMACRO_ITEM(connectionOnDemand, ConnectionOnDemand) +GRPC_XMACRO_ITEM(isLocalAddress, IsLocalAddress) +GRPC_XMACRO_ITEM(isDirect, IsDirect) diff --git a/src/objective-c/GRPCClient/private/GRPCWrappedCall.h b/src/objective-c/GRPCClient/private/GRPCWrappedCall.h index 71e7e0e54e..e37ed1b59f 100644 --- a/src/objective-c/GRPCClient/private/GRPCWrappedCall.h +++ b/src/objective-c/GRPCClient/private/GRPCWrappedCall.h @@ -34,7 +34,6 @@ #import <Foundation/Foundation.h> #include <grpc/grpc.h> -#import "GRPCChannel.h" #import "GRPCRequestHeaders.h" @interface GRPCOperation : NSObject @@ -94,4 +93,5 @@ - (void)startBatchWithOperations:(NSArray *)ops; - (void)cancel; + @end diff --git a/src/objective-c/RxLibrary/GRXWriteable.m b/src/objective-c/RxLibrary/GRXWriteable.m index 2729d62b72..028ba9b551 100644 --- a/src/objective-c/RxLibrary/GRXWriteable.m +++ b/src/objective-c/RxLibrary/GRXWriteable.m @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -42,11 +42,42 @@ if (!handler) { return [[self alloc] init]; } - return [[self alloc] initWithValueHandler:^(id value) { - handler(value, nil); - } completionHandler:^(NSError *errorOrNil) { - if (errorOrNil) { - handler(nil, errorOrNil); + // We nilify this variable when the block is invoked, so that handler is only invoked once even if + // the writer tries to write multiple values. + __block GRXEventHandler eventHandler = ^(BOOL done, id value, NSError *error) { + // Nillify eventHandler before invoking handler, in case the latter causes the former to be + // executed recursively. Because blocks can be deallocated even during execution, we have to + // first retain handler locally to guarantee it's valid. + // TODO(jcanizales): Just turn this craziness into a simple subclass of GRXWriteable. + GRXSingleHandler singleHandler = handler; + eventHandler = nil; + + if (value) { + singleHandler(value, nil); + } else if (error) { + singleHandler(nil, error); + } else { + NSDictionary *userInfo = @{ + NSLocalizedDescriptionKey: @"The writer finished without producing any value." + }; + // Even though RxLibrary is independent of gRPC, the domain and code here are, for the moment, + // set to the values of kGRPCErrorDomain and GRPCErrorCodeInternal. This way, the error formed + // is the one user of gRPC would expect if the server failed to produce a response. + // + // TODO(jcanizales): Figure out a way to keep errors of RxLibrary generic without making users + // of gRPC take care of two different error domains and error code enums. A possibility is to + // add error handling to GRXWriters or GRXWriteables, and use them to translate errors between + // the two domains. + static NSString *kGRPCErrorDomain = @"io.grpc"; + static NSUInteger kGRPCErrorCodeInternal = 13; + singleHandler(nil, [NSError errorWithDomain:kGRPCErrorDomain + code:kGRPCErrorCodeInternal + userInfo:userInfo]); + } + }; + return [self writeableWithEventHandler:^(BOOL done, id value, NSError *error) { + if (eventHandler) { + eventHandler(done, value, error); } }]; } diff --git a/src/objective-c/tests/GRPCClientTests.m b/src/objective-c/tests/GRPCClientTests.m index 624958f4b9..7dd6873c80 100644 --- a/src/objective-c/tests/GRPCClientTests.m +++ b/src/objective-c/tests/GRPCClientTests.m @@ -273,10 +273,12 @@ static ProtoMethod *kUnaryCallMethod; id<GRXWriteable> responsesWriteable = [[GRXWriteable alloc] initWithValueHandler:^(NSData *value) { XCTAssertNotNil(value, @"nil value received as response."); XCTAssertEqual([value length], 0, @"Non-empty response received: %@", value); + /* This test needs to be more clever in regards to changing the version of the core. XCTAssertEqualObjects(call.responseHeaders[@"x-grpc-test-echo-useragent"], @"Foo grpc-objc/0.13.0 grpc-c/0.14.0-dev (ios)", @"Did not receive expected user agent %@", call.responseHeaders[@"x-grpc-test-echo-useragent"]); + */ [response fulfill]; } completionHandler:^(NSError *errorOrNil) { XCTAssertNil(errorOrNil, @"Finished with unexpected error: %@", errorOrNil); diff --git a/src/objective-c/tests/RxLibraryUnitTests.m b/src/objective-c/tests/RxLibraryUnitTests.m index d342662814..ae9465f58c 100644 --- a/src/objective-c/tests/RxLibraryUnitTests.m +++ b/src/objective-c/tests/RxLibraryUnitTests.m @@ -64,6 +64,8 @@ } @end +// TODO(jcanizales): Split into one file per tested class. + @interface RxLibraryUnitTests : XCTestCase @end @@ -79,6 +81,7 @@ // If: id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:handler.block]; [writeable writeValue:anyValue]; + [writeable writesFinishedWithError:nil]; // Then: XCTAssertEqual(handler.timesCalled, 1); @@ -101,6 +104,54 @@ XCTAssertEqualObjects(handler.errorOrNil, anyError); } +- (void)testWriteableSingleHandlerIsCalledOnlyOnce_ValueThenError { + // Given: + CapturingSingleValueHandler *handler = [CapturingSingleValueHandler handler]; + id anyValue = @7; + NSError *anyError = [NSError errorWithDomain:@"domain" code:7 userInfo:nil]; + + // If: + id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:handler.block]; + [writeable writeValue:anyValue]; + [writeable writesFinishedWithError:anyError]; + + // Then: + XCTAssertEqual(handler.timesCalled, 1); + XCTAssertEqualObjects(handler.value, anyValue); + XCTAssertEqualObjects(handler.errorOrNil, nil); +} + +- (void)testWriteableSingleHandlerIsCalledOnlyOnce_ValueThenValue { + // Given: + CapturingSingleValueHandler *handler = [CapturingSingleValueHandler handler]; + id anyValue = @7; + + // If: + id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:handler.block]; + [writeable writeValue:anyValue]; + [writeable writeValue:anyValue]; + [writeable writesFinishedWithError:nil]; + + // Then: + XCTAssertEqual(handler.timesCalled, 1); + XCTAssertEqualObjects(handler.value, anyValue); + XCTAssertEqualObjects(handler.errorOrNil, nil); +} + +- (void)testWriteableSingleHandlerFailsOnEmptyWriter { + // Given: + CapturingSingleValueHandler *handler = [CapturingSingleValueHandler handler]; + + // If: + id<GRXWriteable> writeable = [GRXWriteable writeableWithSingleHandler:handler.block]; + [writeable writesFinishedWithError:nil]; + + // Then: + XCTAssertEqual(handler.timesCalled, 1); + XCTAssertEqualObjects(handler.value, nil); + XCTAssertNotNil(handler.errorOrNil); +} + #pragma mark BufferedPipe - (void)testBufferedPipePropagatesValue { diff --git a/src/php/composer.json b/src/php/composer.json index 1d41f847ac..01674a25db 100644 --- a/src/php/composer.json +++ b/src/php/composer.json @@ -1,7 +1,9 @@ { "name": "grpc/grpc", + "type": "library", "description": "gRPC library for PHP", - "version": "0.6.0", + "version": "0.14.0", + "keywords": ["rpc"], "homepage": "http://grpc.io", "license": "BSD-3-Clause", "repositories": [ @@ -13,7 +15,7 @@ "require": { "php": ">=5.5.0", "datto/protobuf-php": "dev-master", - "google/auth": "dev-master" + "google/auth": "v0.7" }, "autoload": { "psr-4": { diff --git a/src/php/ext/grpc/README.md b/src/php/ext/grpc/README.md deleted file mode 100644 index 6e1cb2002f..0000000000 --- a/src/php/ext/grpc/README.md +++ /dev/null @@ -1,67 +0,0 @@ -gRPC PHP Extension -================== - -# Requirements - - * PHP 5.5+ - * [gRPC core library](https://github.com/grpc/grpc) 0.11.0 - -# Installation - -## Install PHP 5 - -``` -$ sudo apt-get install git php5 php5-dev php-pear unzip -``` - -## Compile gRPC Core Library - -Clone the gRPC source code repository - -``` -$ git clone https://github.com/grpc/grpc.git -``` - -Build and install the gRPC C core libraries - -```sh -$ cd grpc -$ git checkout --track origin/release-0_11 -$ git pull --recurse-submodules && git submodule update --init --recursive -$ make -$ sudo make install -``` - -Note: you may encounter a warning about the Protobuf compiler `protoc` 3.0.0+ not being installed. The following might help, and will be useful later on when we need to compile the `protoc-gen-php` tool. - -```sh -$ cd grpc/third_party/protobuf -$ sudo make install # 'make' should have been run by core grpc -``` - -## Install the gRPC PHP extension - -Quick install - -```sh -$ sudo pecl install grpc -``` - -Note: before a stable release, you may need to do - -```sh -$ sudo pecl install grpc-beta -``` - -OR - -Compile from source - -```sh -$ # from grpc -$ cd src/php/ext/grpc -$ phpize -$ ./configure -$ make -$ sudo make install -``` diff --git a/src/php/ext/grpc/channel.c b/src/php/ext/grpc/channel.c index f0bc7340ba..b7e7c26c10 100644 --- a/src/php/ext/grpc/channel.c +++ b/src/php/ext/grpc/channel.c @@ -110,9 +110,11 @@ void php_grpc_read_args_array(zval *args_array, grpc_channel_args *args) { switch (Z_TYPE_P(*data)) { case IS_LONG: args->args[args_index].value.integer = (int)Z_LVAL_P(*data); + args->args[args_index].type = GRPC_ARG_INTEGER; break; case IS_STRING: args->args[args_index].value.string = Z_STRVAL_P(*data); + args->args[args_index].type = GRPC_ARG_STRING; break; default: zend_throw_exception(spl_ce_InvalidArgumentException, diff --git a/src/php/tests/generated_code/math_client.php b/src/php/tests/generated_code/math_client.php index 76ccabc068..2085560d19 100644 --- a/src/php/tests/generated_code/math_client.php +++ b/src/php/tests/generated_code/math_client.php @@ -1,7 +1,7 @@ <?php /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -43,7 +43,9 @@ function p($line) $host = 'localhost:50051'; p("Connecting to host: $host"); -$client = new math\MathClient($host, []); +$client = new math\MathClient($host, [ + 'credentials' => Grpc\ChannelCredentials::createInsecure() +]); p('Client class: '.get_class($client)); p(''); diff --git a/src/proto/grpc/testing/echo_messages.proto b/src/proto/grpc/testing/echo_messages.proto index d05a35548d..5ce0a1fd64 100644 --- a/src/proto/grpc/testing/echo_messages.proto +++ b/src/proto/grpc/testing/echo_messages.proto @@ -42,6 +42,7 @@ message RequestParams { bool echo_peer = 7; string expected_client_identity = 8; // will force check_auth_context. bool skip_cancelled_check = 9; + string expected_transport_security_type = 10; } message EchoRequest { diff --git a/src/python/grpcio/README.rst b/src/python/grpcio/README.rst index f3e962c197..33a462b66f 100644 --- a/src/python/grpcio/README.rst +++ b/src/python/grpcio/README.rst @@ -6,7 +6,7 @@ Package for gRPC Python. Installation ------------ -gRPC Python is available for Linux and Mac OS X running Python 2.7. +gRPC Python is available for Linux, Mac OS X, and Windows running Python 2.7. From PyPI ~~~~~~~~~ @@ -23,21 +23,26 @@ Else system wide (on Ubuntu)... $ sudo pip install grpcio +n.b. On Windows and on Mac OS X one *must* have a recent release of :code:`pip` +to retrieve the proper wheel from PyPI. Be sure to upgrade to the latest +version! + From Source ~~~~~~~~~~~ Building from source requires that you have the Python headers (usually a -package named `python-dev`). +package named :code:`python-dev`). :: - $ export REPO_ROOT=grpc + $ export REPO_ROOT=grpc # REPO_ROOT can be any directory of your choice $ git clone https://github.com/grpc/grpc.git $REPO_ROOT $ cd $REPO_ROOT - $ pip install . -Note that `$REPO_ROOT` can be assigned to whatever directory name floats your -fancy. + # For the next two commands do `sudo pip install` if you get permission-denied errors + $ pip install -rrequirements.txt + $ GRPC_PYTHON_BUILD_WITH_CYTHON=1 pip install . + Troubleshooting ~~~~~~~~~~~~~~~ @@ -45,10 +50,43 @@ Troubleshooting Help, I ... * **... see a** :code:`pkg_resources.VersionConflict` **when I try to install - grpc!** + grpc** This is likely because :code:`pip` doesn't own the offending dependency, which in turn is likely because your operating system's package manager owns it. You'll need to force the installation of the dependency: :code:`pip install --ignore-installed $OFFENDING_DEPENDENCY` + + For example, if you get an error like the following: + + :: + + Traceback (most recent call last): + File "<string>", line 17, in <module> + ... + File "/usr/lib/python2.7/dist-packages/pkg_resources.py", line 509, in find + raise VersionConflict(dist, req) + pkg_resources.VersionConflict: (six 1.8.0 (/usr/lib/python2.7/dist-packages), Requirement.parse('six>=1.10')) + + You can fix it by doing: + + :: + + sudo pip install --ignore-installed six + +* **... see the following error on some platforms** + + :: + + /tmp/pip-build-U8pSsr/cython/Cython/Plex/Scanners.c:4:20: fatal error: Python.h: No such file or directory + #include "Python.h" + ^ + compilation terminated. + + You can fix it by installing `python-dev` package. i.e + + :: + + sudo apt-get install python-dev + diff --git a/src/python/grpcio/commands.py b/src/python/grpcio/commands.py index 0e7f02a271..1d43547419 100644 --- a/src/python/grpcio/commands.py +++ b/src/python/grpcio/commands.py @@ -119,8 +119,7 @@ class SphinxDocumentation(setuptools.Command): import sphinx import sphinx.apidoc metadata = self.distribution.metadata - src_dir = os.path.join( - PYTHON_STEM, self.distribution.package_dir[''], 'grpc') + src_dir = os.path.join(PYTHON_STEM, 'grpc') sys.path.append(src_dir) sphinx.apidoc.main([ '', '--force', '--full', '-H', metadata.name, '-A', metadata.author, diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi index 80f4da51e8..d1b9c98ffc 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi @@ -1,4 +1,4 @@ -# Copyright 2015, Google Inc. +# Copyright 2015-2016, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without @@ -40,14 +40,17 @@ cdef class Call: def start_batch(self, operations, tag): if not self.is_valid: raise ValueError("invalid call object cannot be used from Python") + cdef grpc_call_error result cdef Operations cy_operations = Operations(operations) cdef OperationTag operation_tag = OperationTag(tag) operation_tag.operation_call = self operation_tag.batch_operations = cy_operations cpython.Py_INCREF(operation_tag) - return grpc_call_start_batch( - self.c_call, cy_operations.c_ops, cy_operations.c_nops, - <cpython.PyObject *>operation_tag, NULL) + with nogil: + result = grpc_call_start_batch( + self.c_call, cy_operations.c_ops, cy_operations.c_nops, + <cpython.PyObject *>operation_tag, NULL) + return result def cancel( self, grpc_status_code error_code=GRPC_STATUS__DO_NOT_USE, @@ -57,6 +60,8 @@ cdef class Call: if (details is None) != (error_code == GRPC_STATUS__DO_NOT_USE): raise ValueError("if error_code is specified, so must details " "(and vice-versa)") + cdef grpc_call_error result + cdef char *c_details = NULL if error_code != GRPC_STATUS__DO_NOT_USE: if isinstance(details, bytes): pass @@ -65,25 +70,37 @@ cdef class Call: else: raise TypeError("expected details to be str or bytes") self.references.append(details) - return grpc_call_cancel_with_status( - self.c_call, error_code, details, NULL) + c_details = details + with nogil: + result = grpc_call_cancel_with_status( + self.c_call, error_code, c_details, NULL) + return result else: - return grpc_call_cancel(self.c_call, NULL) + with nogil: + result = grpc_call_cancel(self.c_call, NULL) + return result def set_credentials( self, CallCredentials call_credentials not None): - return grpc_call_set_credentials( - self.c_call, call_credentials.c_credentials) + cdef grpc_call_error result + with nogil: + result = grpc_call_set_credentials( + self.c_call, call_credentials.c_credentials) + return result def peer(self): - cdef char *peer = grpc_call_get_peer(self.c_call) + cdef char *peer = NULL + with nogil: + peer = grpc_call_get_peer(self.c_call) result = <bytes>peer - gpr_free(peer) + with nogil: + gpr_free(peer) return result def __dealloc__(self): if self.c_call != NULL: - grpc_call_destroy(self.c_call) + with nogil: + grpc_call_destroy(self.c_call) # The object *should* always be valid from Python. Used for debugging. @property diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi index 1f1833d5ec..d612c90791 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi @@ -35,6 +35,7 @@ cdef class Channel: def __cinit__(self, target, ChannelArgs arguments=None, ChannelCredentials channel_credentials=None): cdef grpc_channel_args *c_arguments = NULL + cdef char *c_target = NULL self.c_channel = NULL self.references = [] if arguments is not None: @@ -45,12 +46,15 @@ cdef class Channel: target = target.encode() else: raise TypeError("expected target to be str or bytes") + c_target = target if channel_credentials is None: - self.c_channel = grpc_insecure_channel_create(target, c_arguments, - NULL) + with nogil: + self.c_channel = grpc_insecure_channel_create(c_target, c_arguments, + NULL) else: - self.c_channel = grpc_secure_channel_create( - channel_credentials.c_credentials, target, c_arguments, NULL) + with nogil: + self.c_channel = grpc_secure_channel_create( + channel_credentials.c_credentials, c_target, c_arguments, NULL) self.references.append(channel_credentials) self.references.append(target) self.references.append(arguments) @@ -66,6 +70,7 @@ cdef class Channel: method = method.encode() else: raise TypeError("expected method to be str or bytes") + cdef char *method_c_string = method cdef char *host_c_string = NULL if host is None: pass @@ -81,32 +86,40 @@ cdef class Channel: cdef grpc_call *parent_call = NULL if parent is not None: parent_call = parent.c_call - operation_call.c_call = grpc_channel_create_call( - self.c_channel, parent_call, flags, - queue.c_completion_queue, method, host_c_string, deadline.c_time, - NULL) + with nogil: + operation_call.c_call = grpc_channel_create_call( + self.c_channel, parent_call, flags, + queue.c_completion_queue, method_c_string, host_c_string, + deadline.c_time, NULL) return operation_call def check_connectivity_state(self, bint try_to_connect): - return grpc_channel_check_connectivity_state(self.c_channel, - try_to_connect) + cdef grpc_connectivity_state result + with nogil: + result = grpc_channel_check_connectivity_state(self.c_channel, + try_to_connect) + return result def watch_connectivity_state( self, grpc_connectivity_state last_observed_state, Timespec deadline not None, CompletionQueue queue not None, tag): cdef OperationTag operation_tag = OperationTag(tag) - operation_tag.references = [self, queue] cpython.Py_INCREF(operation_tag) - grpc_channel_watch_connectivity_state( - self.c_channel, last_observed_state, deadline.c_time, - queue.c_completion_queue, <cpython.PyObject *>operation_tag) + with nogil: + grpc_channel_watch_connectivity_state( + self.c_channel, last_observed_state, deadline.c_time, + queue.c_completion_queue, <cpython.PyObject *>operation_tag) def target(self): - cdef char * target = grpc_channel_get_target(self.c_channel) + cdef char *target = NULL + with nogil: + target = grpc_channel_get_target(self.c_channel) result = <bytes>target - gpr_free(target) + with nogil: + gpr_free(target) return result def __dealloc__(self): if self.c_channel != NULL: - grpc_channel_destroy(self.c_channel) + with nogil: + grpc_channel_destroy(self.c_channel) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi index c139147114..09e47d4222 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi @@ -36,7 +36,8 @@ import time cdef class CompletionQueue: def __cinit__(self): - self.c_completion_queue = grpc_completion_queue_create(NULL) + with nogil: + self.c_completion_queue = grpc_completion_queue_create(NULL) self.is_shutting_down = False self.is_shutdown = False self.pluck_condition = threading.Condition() @@ -82,8 +83,9 @@ cdef class CompletionQueue: def poll(self, Timespec deadline=None): # We name this 'poll' to avoid problems with CPython's expectations for # 'special' methods (like next and __next__). - cdef gpr_timespec c_deadline = gpr_inf_future( - GPR_CLOCK_REALTIME) + cdef gpr_timespec c_deadline + with nogil: + c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME) if deadline is not None: c_deadline = deadline.c_time cdef grpc_event event @@ -123,7 +125,8 @@ cdef class CompletionQueue: return self._interpret_event(event) def shutdown(self): - grpc_completion_queue_shutdown(self.c_completion_queue) + with nogil: + grpc_completion_queue_shutdown(self.c_completion_queue) self.is_shutting_down = True def clear(self): @@ -133,14 +136,19 @@ cdef class CompletionQueue: pass def __dealloc__(self): - cdef gpr_timespec c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME) + cdef gpr_timespec c_deadline + with nogil: + c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME) if self.c_completion_queue != NULL: # Ensure shutdown if not self.is_shutting_down: - grpc_completion_queue_shutdown(self.c_completion_queue) + with nogil: + grpc_completion_queue_shutdown(self.c_completion_queue) # Pump the queue while not self.is_shutdown: - event = grpc_completion_queue_next( - self.c_completion_queue, c_deadline, NULL) + with nogil: + event = grpc_completion_queue_next( + self.c_completion_queue, c_deadline, NULL) self._interpret_event(event) - grpc_completion_queue_destroy(self.c_completion_queue) + with nogil: + grpc_completion_queue_destroy(self.c_completion_queue) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi index 3f439c8900..1d7adca23e 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi @@ -1,4 +1,4 @@ -# Copyright 2015, Google Inc. +# Copyright 2015-2016, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without @@ -46,7 +46,8 @@ cdef class ChannelCredentials: def __dealloc__(self): if self.c_credentials != NULL: - grpc_channel_credentials_release(self.c_credentials) + with nogil: + grpc_channel_credentials_release(self.c_credentials) cdef class CallCredentials: @@ -63,7 +64,8 @@ cdef class CallCredentials: def __dealloc__(self): if self.c_credentials != NULL: - grpc_call_credentials_release(self.c_credentials) + with nogil: + grpc_call_credentials_release(self.c_credentials) cdef class ServerCredentials: @@ -74,7 +76,8 @@ cdef class ServerCredentials: def __dealloc__(self): if self.c_credentials != NULL: - grpc_server_credentials_release(self.c_credentials) + with nogil: + grpc_server_credentials_release(self.c_credentials) cdef class CredentialsMetadataPlugin: @@ -139,7 +142,8 @@ cdef void plugin_destroy_c_plugin_state(void *state): def channel_credentials_google_default(): cdef ChannelCredentials credentials = ChannelCredentials(); - credentials.c_credentials = grpc_google_default_credentials_create() + with nogil: + credentials.c_credentials = grpc_google_default_credentials_create() return credentials def channel_credentials_ssl(pem_root_certificates, @@ -158,12 +162,14 @@ def channel_credentials_ssl(pem_root_certificates, c_pem_root_certificates = pem_root_certificates credentials.references.append(pem_root_certificates) if ssl_pem_key_cert_pair is not None: - credentials.c_credentials = grpc_ssl_credentials_create( - c_pem_root_certificates, &ssl_pem_key_cert_pair.c_pair, NULL) + with nogil: + credentials.c_credentials = grpc_ssl_credentials_create( + c_pem_root_certificates, &ssl_pem_key_cert_pair.c_pair, NULL) credentials.references.append(ssl_pem_key_cert_pair) else: - credentials.c_credentials = grpc_ssl_credentials_create( - c_pem_root_certificates, NULL, NULL) + with nogil: + credentials.c_credentials = grpc_ssl_credentials_create( + c_pem_root_certificates, NULL, NULL) return credentials def channel_credentials_composite( @@ -172,8 +178,9 @@ def channel_credentials_composite( if not credentials_1.is_valid or not credentials_2.is_valid: raise ValueError("passed credentials must both be valid") cdef ChannelCredentials credentials = ChannelCredentials() - credentials.c_credentials = grpc_composite_channel_credentials_create( - credentials_1.c_credentials, credentials_2.c_credentials, NULL) + with nogil: + credentials.c_credentials = grpc_composite_channel_credentials_create( + credentials_1.c_credentials, credentials_2.c_credentials, NULL) credentials.references.append(credentials_1) credentials.references.append(credentials_2) return credentials @@ -184,16 +191,18 @@ def call_credentials_composite( if not credentials_1.is_valid or not credentials_2.is_valid: raise ValueError("passed credentials must both be valid") cdef CallCredentials credentials = CallCredentials() - credentials.c_credentials = grpc_composite_call_credentials_create( - credentials_1.c_credentials, credentials_2.c_credentials, NULL) + with nogil: + credentials.c_credentials = grpc_composite_call_credentials_create( + credentials_1.c_credentials, credentials_2.c_credentials, NULL) credentials.references.append(credentials_1) credentials.references.append(credentials_2) return credentials def call_credentials_google_compute_engine(): cdef CallCredentials credentials = CallCredentials() - credentials.c_credentials = ( - grpc_google_compute_engine_credentials_create(NULL)) + with nogil: + credentials.c_credentials = ( + grpc_google_compute_engine_credentials_create(NULL)) return credentials def call_credentials_service_account_jwt_access( @@ -205,9 +214,11 @@ def call_credentials_service_account_jwt_access( else: raise TypeError("expected json_key to be str or bytes") cdef CallCredentials credentials = CallCredentials() - credentials.c_credentials = ( - grpc_service_account_jwt_access_credentials_create( - json_key, token_lifetime.c_time, NULL)) + cdef char *json_key_c_string = json_key + with nogil: + credentials.c_credentials = ( + grpc_service_account_jwt_access_credentials_create( + json_key_c_string, token_lifetime.c_time, NULL)) credentials.references.append(json_key) return credentials @@ -219,8 +230,10 @@ def call_credentials_google_refresh_token(json_refresh_token): else: raise TypeError("expected json_refresh_token to be str or bytes") cdef CallCredentials credentials = CallCredentials() - credentials.c_credentials = grpc_google_refresh_token_credentials_create( - json_refresh_token, NULL) + cdef char *json_refresh_token_c_string = json_refresh_token + with nogil: + credentials.c_credentials = grpc_google_refresh_token_credentials_create( + json_refresh_token_c_string, NULL) credentials.references.append(json_refresh_token) return credentials @@ -238,17 +251,21 @@ def call_credentials_google_iam(authorization_token, authority_selector): else: raise TypeError("expected authority_selector to be str or bytes") cdef CallCredentials credentials = CallCredentials() - credentials.c_credentials = grpc_google_iam_credentials_create( - authorization_token, authority_selector, NULL) + cdef char *authorization_token_c_string = authorization_token + cdef char *authority_selector_c_string = authority_selector + with nogil: + credentials.c_credentials = grpc_google_iam_credentials_create( + authorization_token_c_string, authority_selector_c_string, NULL) credentials.references.append(authorization_token) credentials.references.append(authority_selector) return credentials def call_credentials_metadata_plugin(CredentialsMetadataPlugin plugin): cdef CallCredentials credentials = CallCredentials() - credentials.c_credentials = ( - grpc_metadata_credentials_create_from_plugin(plugin.make_c_plugin(), - NULL)) + cdef grpc_metadata_credentials_plugin c_plugin = plugin.make_c_plugin() + with nogil: + credentials.c_credentials = ( + grpc_metadata_credentials_create_from_plugin(c_plugin, NULL)) # TODO(atash): the following held reference is *probably* never necessary credentials.references.append(plugin) return credentials @@ -274,11 +291,12 @@ def server_credentials_ssl(pem_root_certs, pem_key_cert_pairs, credentials.references.append(pem_key_cert_pairs) credentials.references.append(pem_root_certs) credentials.c_ssl_pem_key_cert_pairs_count = len(pem_key_cert_pairs) - credentials.c_ssl_pem_key_cert_pairs = ( - <grpc_ssl_pem_key_cert_pair *>gpr_malloc( - sizeof(grpc_ssl_pem_key_cert_pair) * - credentials.c_ssl_pem_key_cert_pairs_count - )) + with nogil: + credentials.c_ssl_pem_key_cert_pairs = ( + <grpc_ssl_pem_key_cert_pair *>gpr_malloc( + sizeof(grpc_ssl_pem_key_cert_pair) * + credentials.c_ssl_pem_key_cert_pairs_count + )) for i in range(credentials.c_ssl_pem_key_cert_pairs_count): credentials.c_ssl_pem_key_cert_pairs[i] = ( (<SslPemKeyCertPair>pem_key_cert_pairs[i]).c_pair) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi index dbf0045710..61165cb021 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi @@ -38,27 +38,27 @@ cdef extern from "grpc/_cython/loader.h": int pygrpc_load_core(char*) - void *gpr_malloc(size_t size) - void gpr_free(void *ptr) - void *gpr_realloc(void *p, size_t size) + void *gpr_malloc(size_t size) nogil + void gpr_free(void *ptr) nogil + void *gpr_realloc(void *p, size_t size) nogil ctypedef struct gpr_slice: # don't worry about writing out the members of gpr_slice; we never access # them directly. pass - gpr_slice gpr_slice_ref(gpr_slice s) - void gpr_slice_unref(gpr_slice s) - gpr_slice gpr_slice_new(void *p, size_t len, void (*destroy)(void *)) + gpr_slice gpr_slice_ref(gpr_slice s) nogil + void gpr_slice_unref(gpr_slice s) nogil + gpr_slice gpr_slice_new(void *p, size_t len, void (*destroy)(void *)) nogil gpr_slice gpr_slice_new_with_len( - void *p, size_t len, void (*destroy)(void *, size_t)) - gpr_slice gpr_slice_malloc(size_t length) - gpr_slice gpr_slice_from_copied_string(const char *source) - gpr_slice gpr_slice_from_copied_buffer(const char *source, size_t len) + void *p, size_t len, void (*destroy)(void *, size_t)) nogil + gpr_slice gpr_slice_malloc(size_t length) nogil + gpr_slice gpr_slice_from_copied_string(const char *source) nogil + gpr_slice gpr_slice_from_copied_buffer(const char *source, size_t len) nogil # Declare functions for function-like macros (because Cython)... - void *gpr_slice_start_ptr "GPR_SLICE_START_PTR" (gpr_slice s) - size_t gpr_slice_length "GPR_SLICE_LENGTH" (gpr_slice s) + void *gpr_slice_start_ptr "GPR_SLICE_START_PTR" (gpr_slice s) nogil + size_t gpr_slice_length "GPR_SLICE_LENGTH" (gpr_slice s) nogil ctypedef enum gpr_clock_type: GPR_CLOCK_MONOTONIC @@ -71,14 +71,14 @@ cdef extern from "grpc/_cython/loader.h": int32_t nanoseconds "tv_nsec" gpr_clock_type clock_type - gpr_timespec gpr_time_0(gpr_clock_type type) - gpr_timespec gpr_inf_future(gpr_clock_type type) - gpr_timespec gpr_inf_past(gpr_clock_type type) + gpr_timespec gpr_time_0(gpr_clock_type type) nogil + gpr_timespec gpr_inf_future(gpr_clock_type type) nogil + gpr_timespec gpr_inf_past(gpr_clock_type type) nogil - gpr_timespec gpr_now(gpr_clock_type clock) + gpr_timespec gpr_now(gpr_clock_type clock) nogil gpr_timespec gpr_convert_clock_type(gpr_timespec t, - gpr_clock_type target_clock) + gpr_clock_type target_clock) nogil ctypedef enum grpc_status_code: GRPC_STATUS_OK @@ -114,15 +114,15 @@ cdef extern from "grpc/_cython/loader.h": pass grpc_byte_buffer *grpc_raw_byte_buffer_create(gpr_slice *slices, - size_t nslices) - size_t grpc_byte_buffer_length(grpc_byte_buffer *bb) - void grpc_byte_buffer_destroy(grpc_byte_buffer *byte_buffer) + size_t nslices) nogil + size_t grpc_byte_buffer_length(grpc_byte_buffer *bb) nogil + void grpc_byte_buffer_destroy(grpc_byte_buffer *byte_buffer) nogil void grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader, - grpc_byte_buffer *buffer) + grpc_byte_buffer *buffer) nogil int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader *reader, - gpr_slice *slice) - void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader) + gpr_slice *slice) nogil + void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader) nogil const char *GRPC_ARG_PRIMARY_USER_AGENT_STRING const char *GRPC_ARG_ENABLE_CENSUS @@ -221,8 +221,8 @@ cdef extern from "grpc/_cython/loader.h": size_t capacity grpc_metadata *metadata - void grpc_metadata_array_init(grpc_metadata_array *array) - void grpc_metadata_array_destroy(grpc_metadata_array *array) + void grpc_metadata_array_init(grpc_metadata_array *array) nogil + void grpc_metadata_array_destroy(grpc_metadata_array *array) nogil ctypedef struct grpc_call_details: char *method @@ -231,8 +231,8 @@ cdef extern from "grpc/_cython/loader.h": size_t host_capacity gpr_timespec deadline - void grpc_call_details_init(grpc_call_details *details) - void grpc_call_details_destroy(grpc_call_details *details) + void grpc_call_details_init(grpc_call_details *details) nogil + void grpc_call_details_destroy(grpc_call_details *details) nogil ctypedef enum grpc_op_type: GRPC_OP_SEND_INITIAL_METADATA @@ -277,61 +277,62 @@ cdef extern from "grpc/_cython/loader.h": uint32_t flags grpc_op_data data - void grpc_init() - void grpc_shutdown() + void grpc_init() nogil + void grpc_shutdown() nogil - grpc_completion_queue *grpc_completion_queue_create(void *reserved) + grpc_completion_queue *grpc_completion_queue_create(void *reserved) nogil grpc_event grpc_completion_queue_next(grpc_completion_queue *cq, gpr_timespec deadline, void *reserved) nogil grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cq, void *tag, gpr_timespec deadline, void *reserved) nogil - void grpc_completion_queue_shutdown(grpc_completion_queue *cq) - void grpc_completion_queue_destroy(grpc_completion_queue *cq) + void grpc_completion_queue_shutdown(grpc_completion_queue *cq) nogil + void grpc_completion_queue_destroy(grpc_completion_queue *cq) nogil - grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, - size_t nops, void *tag, void *reserved) - grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) + grpc_call_error grpc_call_start_batch( + grpc_call *call, const grpc_op *ops, size_t nops, void *tag, + void *reserved) nogil + grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) nogil grpc_call_error grpc_call_cancel_with_status(grpc_call *call, grpc_status_code status, const char *description, - void *reserved) - char *grpc_call_get_peer(grpc_call *call) - void grpc_call_destroy(grpc_call *call) + void *reserved) nogil + char *grpc_call_get_peer(grpc_call *call) nogil + void grpc_call_destroy(grpc_call *call) nogil grpc_channel *grpc_insecure_channel_create(const char *target, const grpc_channel_args *args, - void *reserved) - grpc_call *grpc_channel_create_call(grpc_channel *channel, - grpc_call *parent_call, - uint32_t propagation_mask, - grpc_completion_queue *completion_queue, - const char *method, const char *host, - gpr_timespec deadline, void *reserved) + void *reserved) nogil + grpc_call *grpc_channel_create_call( + grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask, + grpc_completion_queue *completion_queue, const char *method, + const char *host, gpr_timespec deadline, void *reserved) nogil grpc_connectivity_state grpc_channel_check_connectivity_state( - grpc_channel *channel, int try_to_connect) + grpc_channel *channel, int try_to_connect) nogil void grpc_channel_watch_connectivity_state( grpc_channel *channel, grpc_connectivity_state last_observed_state, - gpr_timespec deadline, grpc_completion_queue *cq, void *tag) - char *grpc_channel_get_target(grpc_channel *channel) - void grpc_channel_destroy(grpc_channel *channel) + gpr_timespec deadline, grpc_completion_queue *cq, void *tag) nogil + char *grpc_channel_get_target(grpc_channel *channel) nogil + void grpc_channel_destroy(grpc_channel *channel) nogil - grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) + grpc_server *grpc_server_create( + const grpc_channel_args *args, void *reserved) nogil grpc_call_error grpc_server_request_call( grpc_server *server, grpc_call **call, grpc_call_details *details, grpc_metadata_array *request_metadata, grpc_completion_queue *cq_bound_to_call, grpc_completion_queue *cq_for_notification, void - *tag_new) + *tag_new) nogil void grpc_server_register_completion_queue(grpc_server *server, grpc_completion_queue *cq, - void *reserved) - int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) - void grpc_server_start(grpc_server *server) + void *reserved) nogil + int grpc_server_add_insecure_http2_port( + grpc_server *server, const char *addr) nogil + void grpc_server_start(grpc_server *server) nogil void grpc_server_shutdown_and_notify( - grpc_server *server, grpc_completion_queue *cq, void *tag) - void grpc_server_cancel_all_calls(grpc_server *server) - void grpc_server_destroy(grpc_server *server) + grpc_server *server, grpc_completion_queue *cq, void *tag) nogil + void grpc_server_cancel_all_calls(grpc_server *server) nogil + void grpc_server_destroy(grpc_server *server) nogil ctypedef struct grpc_ssl_pem_key_cert_pair: const char *private_key @@ -347,35 +348,36 @@ cdef extern from "grpc/_cython/loader.h": ctypedef void (*grpc_ssl_roots_override_callback)(char **pem_root_certs) - void grpc_set_ssl_roots_override_callback(grpc_ssl_roots_override_callback cb) + void grpc_set_ssl_roots_override_callback( + grpc_ssl_roots_override_callback cb) nogil - grpc_channel_credentials *grpc_google_default_credentials_create() + grpc_channel_credentials *grpc_google_default_credentials_create() nogil grpc_channel_credentials *grpc_ssl_credentials_create( const char *pem_root_certs, grpc_ssl_pem_key_cert_pair *pem_key_cert_pair, - void *reserved) + void *reserved) nogil grpc_channel_credentials *grpc_composite_channel_credentials_create( grpc_channel_credentials *creds1, grpc_call_credentials *creds2, - void *reserved) - void grpc_channel_credentials_release(grpc_channel_credentials *creds) + void *reserved) nogil + void grpc_channel_credentials_release(grpc_channel_credentials *creds) nogil grpc_call_credentials *grpc_composite_call_credentials_create( grpc_call_credentials *creds1, grpc_call_credentials *creds2, - void *reserved) + void *reserved) nogil grpc_call_credentials *grpc_google_compute_engine_credentials_create( - void *reserved) + void *reserved) nogil grpc_call_credentials *grpc_service_account_jwt_access_credentials_create( const char *json_key, - gpr_timespec token_lifetime, void *reserved) + gpr_timespec token_lifetime, void *reserved) nogil grpc_call_credentials *grpc_google_refresh_token_credentials_create( - const char *json_refresh_token, void *reserved) + const char *json_refresh_token, void *reserved) nogil grpc_call_credentials *grpc_google_iam_credentials_create( const char *authorization_token, const char *authority_selector, - void *reserved) - void grpc_call_credentials_release(grpc_call_credentials *creds) + void *reserved) nogil + void grpc_call_credentials_release(grpc_call_credentials *creds) nogil grpc_channel *grpc_secure_channel_create( grpc_channel_credentials *creds, const char *target, - const grpc_channel_args *args, void *reserved) + const grpc_channel_args *args, void *reserved) nogil ctypedef struct grpc_server_credentials: # We don't care about the internals (and in fact don't know them) @@ -385,13 +387,13 @@ cdef extern from "grpc/_cython/loader.h": const char *pem_root_certs, grpc_ssl_pem_key_cert_pair *pem_key_cert_pairs, size_t num_key_cert_pairs, int force_client_auth, void *reserved) - void grpc_server_credentials_release(grpc_server_credentials *creds) + void grpc_server_credentials_release(grpc_server_credentials *creds) nogil int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, - grpc_server_credentials *creds) + grpc_server_credentials *creds) nogil grpc_call_error grpc_call_set_credentials(grpc_call *call, - grpc_call_credentials *creds) + grpc_call_credentials *creds) nogil ctypedef struct grpc_auth_context: # We don't care about the internals (and in fact don't know them) @@ -415,4 +417,4 @@ cdef extern from "grpc/_cython/loader.h": const char *type grpc_call_credentials *grpc_metadata_credentials_create_from_plugin( - grpc_metadata_credentials_plugin plugin, void *reserved) + grpc_metadata_credentials_plugin plugin, void *reserved) nogil diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi index fa4ea99ea9..851389a261 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi @@ -107,15 +107,18 @@ cdef class Timespec: def __cinit__(self, time): if time is None: - self.c_time = gpr_now(GPR_CLOCK_REALTIME) + with nogil: + self.c_time = gpr_now(GPR_CLOCK_REALTIME) return if isinstance(time, int): time = float(time) if isinstance(time, float): if time == float("+inf"): - self.c_time = gpr_inf_future(GPR_CLOCK_REALTIME) + with nogil: + self.c_time = gpr_inf_future(GPR_CLOCK_REALTIME) elif time == float("-inf"): - self.c_time = gpr_inf_past(GPR_CLOCK_REALTIME) + with nogil: + self.c_time = gpr_inf_past(GPR_CLOCK_REALTIME) else: self.c_time.seconds = time self.c_time.nanoseconds = (time - float(self.c_time.seconds)) * 1e9 @@ -131,8 +134,10 @@ cdef class Timespec: # TODO(atash) ensure that everywhere a Timespec is created that it's # converted to GPR_CLOCK_REALTIME then and not every time someone wants to # read values off in Python. - cdef gpr_timespec real_time = ( - gpr_convert_clock_type(self.c_time, GPR_CLOCK_REALTIME)) + cdef gpr_timespec real_time + with nogil: + real_time = ( + gpr_convert_clock_type(self.c_time, GPR_CLOCK_REALTIME)) return real_time.seconds @property @@ -158,10 +163,12 @@ cdef class Timespec: cdef class CallDetails: def __cinit__(self): - grpc_call_details_init(&self.c_details) + with nogil: + grpc_call_details_init(&self.c_details) def __dealloc__(self): - grpc_call_details_destroy(&self.c_details) + with nogil: + grpc_call_details_destroy(&self.c_details) @property def method(self): @@ -229,10 +236,15 @@ cdef class ByteBuffer: "ByteBuffer, not {}".format(type(data))) cdef char *c_data = data - data_slice = gpr_slice_from_copied_buffer(c_data, len(data)) - self.c_byte_buffer = grpc_raw_byte_buffer_create( - &data_slice, 1) - gpr_slice_unref(data_slice) + cdef gpr_slice data_slice + cdef size_t data_length = len(data) + with nogil: + data_slice = gpr_slice_from_copied_buffer(c_data, data_length) + with nogil: + self.c_byte_buffer = grpc_raw_byte_buffer_create( + &data_slice, 1) + with nogil: + gpr_slice_unref(data_slice) def bytes(self): cdef grpc_byte_buffer_reader reader @@ -240,20 +252,27 @@ cdef class ByteBuffer: cdef size_t data_slice_length cdef void *data_slice_pointer if self.c_byte_buffer != NULL: - grpc_byte_buffer_reader_init(&reader, self.c_byte_buffer) + with nogil: + grpc_byte_buffer_reader_init(&reader, self.c_byte_buffer) result = b"" - while grpc_byte_buffer_reader_next(&reader, &data_slice): - data_slice_pointer = gpr_slice_start_ptr(data_slice) - data_slice_length = gpr_slice_length(data_slice) - result += (<char *>data_slice_pointer)[:data_slice_length] - grpc_byte_buffer_reader_destroy(&reader) + with nogil: + while grpc_byte_buffer_reader_next(&reader, &data_slice): + data_slice_pointer = gpr_slice_start_ptr(data_slice) + data_slice_length = gpr_slice_length(data_slice) + with gil: + result += (<char *>data_slice_pointer)[:data_slice_length] + with nogil: + grpc_byte_buffer_reader_destroy(&reader) return result else: return None def __len__(self): + cdef size_t result if self.c_byte_buffer != NULL: - return grpc_byte_buffer_length(self.c_byte_buffer) + with nogil: + result = grpc_byte_buffer_length(self.c_byte_buffer) + return result else: return 0 @@ -262,7 +281,8 @@ cdef class ByteBuffer: def __dealloc__(self): if self.c_byte_buffer != NULL: - grpc_byte_buffer_destroy(self.c_byte_buffer) + with nogil: + grpc_byte_buffer_destroy(self.c_byte_buffer) cdef class SslPemKeyCertPair: @@ -319,14 +339,15 @@ cdef class ChannelArgs: if not isinstance(arg, ChannelArg): raise TypeError("expected list of ChannelArg") self.c_args.arguments_length = len(self.args) - self.c_args.arguments = <grpc_arg *>gpr_malloc( - self.c_args.arguments_length*sizeof(grpc_arg) - ) + with nogil: + self.c_args.arguments = <grpc_arg *>gpr_malloc( + self.c_args.arguments_length*sizeof(grpc_arg)) for i in range(self.c_args.arguments_length): self.c_args.arguments[i] = (<ChannelArg>self.args[i]).c_arg def __dealloc__(self): - gpr_free(self.c_args.arguments) + with nogil: + gpr_free(self.c_args.arguments) def __len__(self): # self.args is never stale; it's only updated from this file @@ -407,21 +428,24 @@ cdef class Metadata: for metadatum in metadata: if not isinstance(metadatum, Metadatum): raise TypeError("expected list of Metadatum") - grpc_metadata_array_init(&self.c_metadata_array) + with nogil: + grpc_metadata_array_init(&self.c_metadata_array) self.c_metadata_array.count = len(self.metadata) self.c_metadata_array.capacity = len(self.metadata) - self.c_metadata_array.metadata = <grpc_metadata *>gpr_malloc( - self.c_metadata_array.count*sizeof(grpc_metadata) - ) + with nogil: + self.c_metadata_array.metadata = <grpc_metadata *>gpr_malloc( + self.c_metadata_array.count*sizeof(grpc_metadata) + ) for i in range(self.c_metadata_array.count): self.c_metadata_array.metadata[i] = ( (<Metadatum>self.metadata[i]).c_metadata) def __dealloc__(self): # this frees the allocated memory for the grpc_metadata_array (although - # it'd be nice if that were documented somewhere...) TODO(atash): document - # this in the C core - grpc_metadata_array_destroy(&self.c_metadata_array) + # it'd be nice if that were documented somewhere...) + # TODO(atash): document this in the C core + with nogil: + grpc_metadata_array_destroy(&self.c_metadata_array) def __len__(self): return self.c_metadata_array.count @@ -526,7 +550,8 @@ cdef class Operation: # Python. The remaining one(s) are primitive fields filled in by GRPC core. # This means that we need to clean up after receive_status_on_client. if self.c_op.type == GRPC_OP_RECV_STATUS_ON_CLIENT: - gpr_free(self._received_status_details) + with nogil: + gpr_free(self._received_status_details) def operation_send_initial_metadata(Metadata metadata): cdef Operation op = Operation() @@ -648,8 +673,8 @@ cdef class Operations: if not isinstance(operation, Operation): raise TypeError("expected operations to be iterable of Operation") self.c_nops = len(self.operations) - self.c_ops = <grpc_op *>gpr_malloc( - sizeof(grpc_op)*self.c_nops) + with nogil: + self.c_ops = <grpc_op *>gpr_malloc(sizeof(grpc_op)*self.c_nops) for i in range(self.c_nops): self.c_ops[i] = (<Operation>(self.operations[i])).c_op @@ -661,7 +686,8 @@ cdef class Operations: return self.operations[i] def __dealloc__(self): - gpr_free(self.c_ops) + with nogil: + gpr_free(self.c_ops) def __iter__(self): return _OperationsIterator(self) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi index fe93da6c12..a098f11da2 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi @@ -41,7 +41,8 @@ cdef class Server: if arguments is not None: c_arguments = &arguments.c_args self.references.append(arguments) - self.c_server = grpc_server_create(c_arguments, NULL) + with nogil: + self.c_server = grpc_server_create(c_arguments, NULL) self.is_started = False self.is_shutting_down = False self.is_shutdown = False @@ -53,6 +54,7 @@ cdef class Server: raise ValueError("server must be started and not shutting down") if server_queue not in self.registered_completion_queues: raise ValueError("server_queue must be a registered completion queue") + cdef grpc_call_error result cdef OperationTag operation_tag = OperationTag(tag) operation_tag.operation_call = Call() operation_tag.request_call_details = CallDetails() @@ -61,19 +63,22 @@ cdef class Server: operation_tag.is_new_request = True operation_tag.batch_operations = Operations([]) cpython.Py_INCREF(operation_tag) - return grpc_server_request_call( - self.c_server, &operation_tag.operation_call.c_call, - &operation_tag.request_call_details.c_details, - &operation_tag.request_metadata.c_metadata_array, - call_queue.c_completion_queue, server_queue.c_completion_queue, - <cpython.PyObject *>operation_tag) + with nogil: + result = grpc_server_request_call( + self.c_server, &operation_tag.operation_call.c_call, + &operation_tag.request_call_details.c_details, + &operation_tag.request_metadata.c_metadata_array, + call_queue.c_completion_queue, server_queue.c_completion_queue, + <cpython.PyObject *>operation_tag) + return result def register_completion_queue( self, CompletionQueue queue not None): if self.is_started: raise ValueError("cannot register completion queues after start") - grpc_server_register_completion_queue( - self.c_server, queue.c_completion_queue, NULL) + with nogil: + grpc_server_register_completion_queue( + self.c_server, queue.c_completion_queue, NULL) self.registered_completion_queues.append(queue) def start(self): @@ -82,7 +87,8 @@ cdef class Server: self.backup_shutdown_queue = CompletionQueue() self.register_completion_queue(self.backup_shutdown_queue) self.is_started = True - grpc_server_start(self.c_server) + with nogil: + grpc_server_start(self.c_server) # Ensure the core has gotten a chance to do the start-up work self.backup_shutdown_queue.pluck(None, Timespec(None)) @@ -95,22 +101,28 @@ cdef class Server: else: raise TypeError("expected address to be a str or bytes") self.references.append(address) + cdef int result + cdef char *address_c_string = address if server_credentials is not None: self.references.append(server_credentials) - return grpc_server_add_secure_http2_port( - self.c_server, address, server_credentials.c_credentials) + with nogil: + result = grpc_server_add_secure_http2_port( + self.c_server, address_c_string, server_credentials.c_credentials) else: - return grpc_server_add_insecure_http2_port(self.c_server, address) + with nogil: + result = grpc_server_add_insecure_http2_port(self.c_server, + address_c_string) + return result cdef _c_shutdown(self, CompletionQueue queue, tag): self.is_shutting_down = True operation_tag = OperationTag(tag) operation_tag.shutting_down_server = self - operation_tag.references.extend([self, queue]) cpython.Py_INCREF(operation_tag) - grpc_server_shutdown_and_notify( - self.c_server, queue.c_completion_queue, - <cpython.PyObject *>operation_tag) + with nogil: + grpc_server_shutdown_and_notify( + self.c_server, queue.c_completion_queue, + <cpython.PyObject *>operation_tag) def shutdown(self, CompletionQueue queue not None, tag): cdef OperationTag operation_tag @@ -135,7 +147,8 @@ cdef class Server: elif self.is_shutdown: return else: - grpc_server_cancel_all_calls(self.c_server) + with nogil: + grpc_server_cancel_all_calls(self.c_server) def __dealloc__(self): if self.c_server != NULL: @@ -154,5 +167,6 @@ cdef class Server: # much but repeatedly release the GIL and wait while not self.is_shutdown: time.sleep(0) - grpc_server_destroy(self.c_server) + with nogil: + grpc_server_destroy(self.c_server) diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pyx b/src/python/grpcio/grpc/_cython/cygrpc.pyx index 30cc7a132b..8a0f171ee7 100644 --- a/src/python/grpcio/grpc/_cython/cygrpc.pyx +++ b/src/python/grpcio/grpc/_cython/cygrpc.pyx @@ -57,14 +57,17 @@ cdef class _ModuleState: 'grpc._cython', '_windows/grpc_c.64.python') if not pygrpc_load_core(filename): raise ImportError('failed to load core gRPC library') - grpc_init() + with nogil: + grpc_init() self.is_loaded = True - grpc_set_ssl_roots_override_callback( - <grpc_ssl_roots_override_callback>ssl_roots_override_callback) + with nogil: + grpc_set_ssl_roots_override_callback( + <grpc_ssl_roots_override_callback>ssl_roots_override_callback) def __dealloc__(self): if self.is_loaded: - grpc_shutdown() + with nogil: + grpc_shutdown() _module_state = _ModuleState() diff --git a/src/python/grpcio/grpc/_cython/imports.generated.h b/src/python/grpcio/grpc/_cython/imports.generated.h index b70dcccd17..4d18369e1f 100644 --- a/src/python/grpcio/grpc/_cython/imports.generated.h +++ b/src/python/grpcio/grpc/_cython/imports.generated.h @@ -166,7 +166,7 @@ extern grpc_compression_algorithm_parse_type grpc_compression_algorithm_parse_im typedef int(*grpc_compression_algorithm_name_type)(grpc_compression_algorithm algorithm, char **name); extern grpc_compression_algorithm_name_type grpc_compression_algorithm_name_import; #define grpc_compression_algorithm_name grpc_compression_algorithm_name_import -typedef grpc_compression_algorithm(*grpc_compression_algorithm_for_level_type)(grpc_compression_level level); +typedef grpc_compression_algorithm(*grpc_compression_algorithm_for_level_type)(grpc_compression_level level, uint32_t accepted_encodings); extern grpc_compression_algorithm_for_level_type grpc_compression_algorithm_for_level_import; #define grpc_compression_algorithm_for_level grpc_compression_algorithm_for_level_import typedef void(*grpc_compression_options_init_type)(grpc_compression_options *opts); diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index a543791f5c..31e16e0491 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -34,6 +34,7 @@ CORE_SOURCE_FILES = [ 'src/core/profiling/stap_timers.c', 'src/core/support/alloc.c', 'src/core/support/avl.c', + 'src/core/support/backoff.c', 'src/core/support/cmdline.c', 'src/core/support/cpu_iphone.c', 'src/core/support/cpu_linux.c', diff --git a/src/python/grpcio/precompiled.py b/src/python/grpcio/precompiled.py index ae2a0c835a..d34250b02c 100644 --- a/src/python/grpcio/precompiled.py +++ b/src/python/grpcio/precompiled.py @@ -31,6 +31,7 @@ import os import platform import shutil import sys +import sysconfig import setuptools @@ -51,9 +52,15 @@ USE_PRECOMPILED_BINARIES = bool(int(os.environ.get( def _tagged_ext_name(base): uname = platform.uname() - tags = '-'.join((grpc_version.VERSION, uname[0], uname[4])) - flavor = 'ucs2' if sys.maxunicode == 65535 else 'ucs4' - return '{base}-{tags}-{flavor}'.format(base=base, tags=tags, flavor=flavor) + tags = ( + grpc_version.VERSION, + 'py{}'.format(sysconfig.get_python_version()), + uname[0], + uname[4], + ) + ucs = 'ucs{}'.format(sysconfig.get_config_var('Py_UNICODE_SIZE')) + return '{base}-{tags}-{ucs}'.format( + base=base, tags='-'.join(tags), ucs=ucs) class BuildTaggedExt(setuptools.Command): diff --git a/src/python/grpcio/tests/_runner.py b/src/python/grpcio/tests/_runner.py index 38a5432e79..3b5ca03dd9 100644 --- a/src/python/grpcio/tests/_runner.py +++ b/src/python/grpcio/tests/_runner.py @@ -35,6 +35,7 @@ import os import select import signal import sys +import tempfile import threading import time import unittest @@ -44,60 +45,46 @@ from tests import _loader from tests import _result -class CapturePipe(object): - """A context-manager pipe to redirect output to a byte array. +class CaptureFile(object): + """A context-managed file to redirect output to a byte array. + + Use by invoking `start` (`__enter__`) and at some point invoking `stop` + (`__exit__`). At any point after the initial call to `start` call `output` to + get the current redirected output. Note that we don't currently use file + locking, so calling `output` between calls to `start` and `stop` may muddle + the result (you should only be doing this during a Python-handled interrupt as + a last ditch effort to provide output to the user). Attributes: - _redirect_fd (int): File descriptor of file to redirect writes from. + _redirected_fd (int): File descriptor of file to redirect writes from. _saved_fd (int): A copy of the original value of the redirected file descriptor. - _read_thread (threading.Thread or None): Thread upon which reads through the - pipe are performed. Only non-None when self is started. - _read_fd (int or None): File descriptor of the read end of the redirect - pipe. Only non-None when self is started. - _write_fd (int or None): File descriptor of the write end of the redirect - pipe. Only non-None when self is started. - output (bytearray or None): Redirected output from writes to the redirected - file descriptor. Only valid during and after self has started. + _into_file (TemporaryFile or None): File to which writes are redirected. + Only non-None when self is started. """ def __init__(self, fd): - self._redirect_fd = fd - self._saved_fd = os.dup(self._redirect_fd) - self._read_thread = None - self._read_fd = None - self._write_fd = None - self.output = None + self._redirected_fd = fd + self._saved_fd = os.dup(self._redirected_fd) + self._into_file = None + + def output(self): + """Get all output from the redirected-to file if it exists.""" + if self._into_file: + self._into_file.seek(0) + return bytes(self._into_file.read()) + else: + return bytes() def start(self): """Start redirection of writes to the file descriptor.""" - self._read_fd, self._write_fd = os.pipe() - os.dup2(self._write_fd, self._redirect_fd) - flags = fcntl.fcntl(self._read_fd, fcntl.F_GETFL) - fcntl.fcntl(self._read_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) - self._read_thread = threading.Thread(target=self._read) - self._read_thread.start() + self._into_file = tempfile.TemporaryFile() + os.dup2(self._into_file.fileno(), self._redirected_fd) def stop(self): """Stop redirection of writes to the file descriptor.""" - os.close(self._write_fd) - os.dup2(self._saved_fd, self._redirect_fd) # auto-close self._redirect_fd - self._read_thread.join() - self._read_thread = None - # we waited for the read thread to finish, so _read_fd has been read and we - # can close it. - os.close(self._read_fd) - - def _read(self): - """Read-thread target for self.""" - self.output = bytearray() - while True: - select.select([self._read_fd], [], []) - read_bytes = os.read(self._read_fd, 1024) - if read_bytes: - self.output.extend(read_bytes) - else: - break + # n.b. this dup2 call auto-closes self._redirected_fd + os.dup2(self._saved_fd, self._redirected_fd) def write_bypass(self, value): """Bypass the redirection and write directly to the original file. @@ -144,7 +131,7 @@ class Runner(object): def run(self, suite): """See setuptools' test_runner setup argument for information.""" # only run test cases with id starting with given prefix - testcase_filter = os.getenv('GPRC_PYTHON_TESTRUNNER_FILTER') + testcase_filter = os.getenv('GRPC_PYTHON_TESTRUNNER_FILTER') filtered_cases = [] for case in _loader.iterate_suite_cases(suite): if not testcase_filter or case.id().startswith(testcase_filter): @@ -159,8 +146,8 @@ class Runner(object): result_out = StringIO.StringIO() result = _result.TerminalResult( result_out, id_map=lambda case: case_id_by_case[case]) - stdout_pipe = CapturePipe(sys.stdout.fileno()) - stderr_pipe = CapturePipe(sys.stderr.fileno()) + stdout_pipe = CaptureFile(sys.stdout.fileno()) + stderr_pipe = CaptureFile(sys.stderr.fileno()) kill_flag = [False] def sigint_handler(signal_number, frame): @@ -171,7 +158,8 @@ class Runner(object): def fault_handler(signal_number, frame): stdout_pipe.write_bypass( 'Received fault signal {}\nstdout:\n{}\n\nstderr:{}\n' - .format(signal_number, stdout_pipe.output, stderr_pipe.output)) + .format(signal_number, stdout_pipe.output(), + stderr_pipe.output())) os._exit(1) def check_kill_self(): @@ -180,9 +168,9 @@ class Runner(object): result.stopTestRun() stdout_pipe.write_bypass(result_out.getvalue()) stdout_pipe.write_bypass( - '\ninterrupted stdout:\n{}\n'.format(stdout_pipe.output)) + '\ninterrupted stdout:\n{}\n'.format(stdout_pipe.output())) stderr_pipe.write_bypass( - '\ninterrupted stderr:\n{}\n'.format(stderr_pipe.output)) + '\ninterrupted stderr:\n{}\n'.format(stderr_pipe.output())) os._exit(1) signal.signal(signal.SIGINT, sigint_handler) signal.signal(signal.SIGSEGV, fault_handler) @@ -212,7 +200,7 @@ class Runner(object): # re-raise the exception after forcing the with-block to end raise result.set_output( - augmented_case.case, stdout_pipe.output, stderr_pipe.output) + augmented_case.case, stdout_pipe.output(), stderr_pipe.output()) sys.stdout.write(result_out.getvalue()) sys.stdout.flush() result_out.truncate(0) diff --git a/src/python/grpcio/tests/tests.json b/src/python/grpcio/tests/tests.json index 388d040d5c..84870aaa5c 100644 --- a/src/python/grpcio/tests/tests.json +++ b/src/python/grpcio/tests/tests.json @@ -12,31 +12,22 @@ "_core_over_links_base_interface_test.SyncEasyTest", "_core_over_links_base_interface_test.SyncPeasyTest", "_crust_over_core_face_interface_test.DynamicInvokerBlockingInvocationInlineServiceTest", - "_crust_over_core_face_interface_test.DynamicInvokerEventInvocationSynchronousEventServiceTest", "_crust_over_core_face_interface_test.DynamicInvokerFutureInvocationAsynchronousEventServiceTest", "_crust_over_core_face_interface_test.GenericInvokerBlockingInvocationInlineServiceTest", - "_crust_over_core_face_interface_test.GenericInvokerEventInvocationSynchronousEventServiceTest", "_crust_over_core_face_interface_test.GenericInvokerFutureInvocationAsynchronousEventServiceTest", "_crust_over_core_face_interface_test.MultiCallableInvokerBlockingInvocationInlineServiceTest", - "_crust_over_core_face_interface_test.MultiCallableInvokerEventInvocationSynchronousEventServiceTest", "_crust_over_core_face_interface_test.MultiCallableInvokerFutureInvocationAsynchronousEventServiceTest", "_crust_over_core_over_links_face_interface_test.DynamicInvokerBlockingInvocationInlineServiceTest", - "_crust_over_core_over_links_face_interface_test.DynamicInvokerEventInvocationSynchronousEventServiceTest", "_crust_over_core_over_links_face_interface_test.DynamicInvokerFutureInvocationAsynchronousEventServiceTest", "_crust_over_core_over_links_face_interface_test.GenericInvokerBlockingInvocationInlineServiceTest", - "_crust_over_core_over_links_face_interface_test.GenericInvokerEventInvocationSynchronousEventServiceTest", "_crust_over_core_over_links_face_interface_test.GenericInvokerFutureInvocationAsynchronousEventServiceTest", "_crust_over_core_over_links_face_interface_test.MultiCallableInvokerBlockingInvocationInlineServiceTest", - "_crust_over_core_over_links_face_interface_test.MultiCallableInvokerEventInvocationSynchronousEventServiceTest", "_crust_over_core_over_links_face_interface_test.MultiCallableInvokerFutureInvocationAsynchronousEventServiceTest", "_face_interface_test.DynamicInvokerBlockingInvocationInlineServiceTest", - "_face_interface_test.DynamicInvokerEventInvocationSynchronousEventServiceTest", "_face_interface_test.DynamicInvokerFutureInvocationAsynchronousEventServiceTest", "_face_interface_test.GenericInvokerBlockingInvocationInlineServiceTest", - "_face_interface_test.GenericInvokerEventInvocationSynchronousEventServiceTest", "_face_interface_test.GenericInvokerFutureInvocationAsynchronousEventServiceTest", "_face_interface_test.MultiCallableInvokerBlockingInvocationInlineServiceTest", - "_face_interface_test.MultiCallableInvokerEventInvocationSynchronousEventServiceTest", "_face_interface_test.MultiCallableInvokerFutureInvocationAsynchronousEventServiceTest", "_implementations_test.ChannelCredentialsTest", "_insecure_interop_test.InsecureInteropTest", diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/_event_invocation_synchronous_event_service.py b/src/python/grpcio/tests/unit/framework/interfaces/face/_event_invocation_synchronous_event_service.py deleted file mode 100644 index 34db6c3e55..0000000000 --- a/src/python/grpcio/tests/unit/framework/interfaces/face/_event_invocation_synchronous_event_service.py +++ /dev/null @@ -1,381 +0,0 @@ -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -"""Test code for the Face layer of RPC Framework.""" - -import abc -import unittest - -# test_interfaces is referenced from specification in this module. -from grpc.framework.interfaces.face import face -from tests.unit.framework.common import test_constants -from tests.unit.framework.common import test_control -from tests.unit.framework.common import test_coverage -from tests.unit.framework.interfaces.face import _3069_test_constant -from tests.unit.framework.interfaces.face import _digest -from tests.unit.framework.interfaces.face import _receiver -from tests.unit.framework.interfaces.face import _stock_service -from tests.unit.framework.interfaces.face import test_interfaces # pylint: disable=unused-import - - -class TestCase(test_coverage.Coverage, unittest.TestCase): - """A test of the Face layer of RPC Framework. - - Concrete subclasses must have an "implementation" attribute of type - test_interfaces.Implementation and an "invoker_constructor" attribute of type - _invocation.InvokerConstructor. - """ - __metaclass__ = abc.ABCMeta - - NAME = 'EventInvocationSynchronousEventServiceTest' - - def setUp(self): - """See unittest.TestCase.setUp for full specification. - - Overriding implementations must call this implementation. - """ - self._control = test_control.PauseFailControl() - self._digest = _digest.digest( - _stock_service.STOCK_TEST_SERVICE, self._control, None) - - generic_stub, dynamic_stubs, self._memo = self.implementation.instantiate( - self._digest.methods, self._digest.event_method_implementations, None) - self._invoker = self.invoker_constructor.construct_invoker( - generic_stub, dynamic_stubs, self._digest.methods) - - def tearDown(self): - """See unittest.TestCase.tearDown for full specification. - - Overriding implementations must call this implementation. - """ - self._invoker = None - self.implementation.destantiate(self._memo) - - def testSuccessfulUnaryRequestUnaryResponse(self): - for (group, method), test_messages_sequence in ( - self._digest.unary_unary_messages_sequences.iteritems()): - for test_messages in test_messages_sequence: - request = test_messages.request() - receiver = _receiver.Receiver() - - self._invoker.event(group, method)( - request, receiver, receiver.abort, test_constants.LONG_TIMEOUT) - receiver.block_until_terminated() - response = receiver.unary_response() - - test_messages.verify(request, response, self) - - def testSuccessfulUnaryRequestStreamResponse(self): - for (group, method), test_messages_sequence in ( - self._digest.unary_stream_messages_sequences.iteritems()): - for test_messages in test_messages_sequence: - request = test_messages.request() - receiver = _receiver.Receiver() - - self._invoker.event(group, method)( - request, receiver, receiver.abort, test_constants.LONG_TIMEOUT) - receiver.block_until_terminated() - responses = receiver.stream_responses() - - test_messages.verify(request, responses, self) - - def testSuccessfulStreamRequestUnaryResponse(self): - for (group, method), test_messages_sequence in ( - self._digest.stream_unary_messages_sequences.iteritems()): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - receiver = _receiver.Receiver() - - call_consumer = self._invoker.event(group, method)( - receiver, receiver.abort, test_constants.LONG_TIMEOUT) - for request in requests: - call_consumer.consume(request) - call_consumer.terminate() - receiver.block_until_terminated() - response = receiver.unary_response() - - test_messages.verify(requests, response, self) - - def testSuccessfulStreamRequestStreamResponse(self): - for (group, method), test_messages_sequence in ( - self._digest.stream_stream_messages_sequences.iteritems()): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - receiver = _receiver.Receiver() - - call_consumer = self._invoker.event(group, method)( - receiver, receiver.abort, test_constants.LONG_TIMEOUT) - for request in requests: - call_consumer.consume(request) - call_consumer.terminate() - receiver.block_until_terminated() - responses = receiver.stream_responses() - - test_messages.verify(requests, responses, self) - - def testSequentialInvocations(self): - # pylint: disable=cell-var-from-loop - for (group, method), test_messages_sequence in ( - self._digest.unary_unary_messages_sequences.iteritems()): - for test_messages in test_messages_sequence: - first_request = test_messages.request() - second_request = test_messages.request() - second_receiver = _receiver.Receiver() - - def make_second_invocation(): - self._invoker.event(group, method)( - second_request, second_receiver, second_receiver.abort, - test_constants.LONG_TIMEOUT) - - class FirstReceiver(_receiver.Receiver): - - def complete(self, terminal_metadata, code, details): - super(FirstReceiver, self).complete( - terminal_metadata, code, details) - make_second_invocation() - - first_receiver = FirstReceiver() - - self._invoker.event(group, method)( - first_request, first_receiver, first_receiver.abort, - test_constants.LONG_TIMEOUT) - second_receiver.block_until_terminated() - - first_response = first_receiver.unary_response() - second_response = second_receiver.unary_response() - test_messages.verify(first_request, first_response, self) - test_messages.verify(second_request, second_response, self) - - def testParallelInvocations(self): - for (group, method), test_messages_sequence in ( - self._digest.unary_unary_messages_sequences.iteritems()): - for test_messages in test_messages_sequence: - first_request = test_messages.request() - first_receiver = _receiver.Receiver() - second_request = test_messages.request() - second_receiver = _receiver.Receiver() - - self._invoker.event(group, method)( - first_request, first_receiver, first_receiver.abort, - test_constants.LONG_TIMEOUT) - self._invoker.event(group, method)( - second_request, second_receiver, second_receiver.abort, - test_constants.LONG_TIMEOUT) - first_receiver.block_until_terminated() - second_receiver.block_until_terminated() - - first_response = first_receiver.unary_response() - second_response = second_receiver.unary_response() - test_messages.verify(first_request, first_response, self) - test_messages.verify(second_request, second_response, self) - - @unittest.skip('TODO(nathaniel): implement.') - def testWaitingForSomeButNotAllParallelInvocations(self): - raise NotImplementedError() - - def testCancelledUnaryRequestUnaryResponse(self): - for (group, method), test_messages_sequence in ( - self._digest.unary_unary_messages_sequences.iteritems()): - for test_messages in test_messages_sequence: - request = test_messages.request() - receiver = _receiver.Receiver() - - with self._control.pause(): - call = self._invoker.event(group, method)( - request, receiver, receiver.abort, test_constants.LONG_TIMEOUT) - call.cancel() - receiver.block_until_terminated() - - self.assertIs(face.Abortion.Kind.CANCELLED, receiver.abortion().kind) - - def testCancelledUnaryRequestStreamResponse(self): - for (group, method), test_messages_sequence in ( - self._digest.unary_stream_messages_sequences.iteritems()): - for test_messages in test_messages_sequence: - request = test_messages.request() - receiver = _receiver.Receiver() - - call = self._invoker.event(group, method)( - request, receiver, receiver.abort, test_constants.LONG_TIMEOUT) - call.cancel() - receiver.block_until_terminated() - - self.assertIs(face.Abortion.Kind.CANCELLED, receiver.abortion().kind) - - def testCancelledStreamRequestUnaryResponse(self): - for (group, method), test_messages_sequence in ( - self._digest.stream_unary_messages_sequences.iteritems()): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - receiver = _receiver.Receiver() - - call_consumer = self._invoker.event(group, method)( - receiver, receiver.abort, test_constants.LONG_TIMEOUT) - for request in requests: - call_consumer.consume(request) - call_consumer.cancel() - receiver.block_until_terminated() - - self.assertIs(face.Abortion.Kind.CANCELLED, receiver.abortion().kind) - - def testCancelledStreamRequestStreamResponse(self): - for (group, method), test_messages_sequence in ( - self._digest.stream_stream_messages_sequences.iteritems()): - for unused_test_messages in test_messages_sequence: - receiver = _receiver.Receiver() - - call_consumer = self._invoker.event(group, method)( - receiver, receiver.abort, test_constants.LONG_TIMEOUT) - call_consumer.cancel() - receiver.block_until_terminated() - - self.assertIs(face.Abortion.Kind.CANCELLED, receiver.abortion().kind) - - def testExpiredUnaryRequestUnaryResponse(self): - for (group, method), test_messages_sequence in ( - self._digest.unary_unary_messages_sequences.iteritems()): - for test_messages in test_messages_sequence: - request = test_messages.request() - receiver = _receiver.Receiver() - - with self._control.pause(): - self._invoker.event(group, method)( - request, receiver, receiver.abort, - _3069_test_constant.REALLY_SHORT_TIMEOUT) - receiver.block_until_terminated() - - self.assertIs(face.Abortion.Kind.EXPIRED, receiver.abortion().kind) - - def testExpiredUnaryRequestStreamResponse(self): - for (group, method), test_messages_sequence in ( - self._digest.unary_stream_messages_sequences.iteritems()): - for test_messages in test_messages_sequence: - request = test_messages.request() - receiver = _receiver.Receiver() - - with self._control.pause(): - self._invoker.event(group, method)( - request, receiver, receiver.abort, - _3069_test_constant.REALLY_SHORT_TIMEOUT) - receiver.block_until_terminated() - - self.assertIs(face.Abortion.Kind.EXPIRED, receiver.abortion().kind) - - def testExpiredStreamRequestUnaryResponse(self): - for (group, method), test_messages_sequence in ( - self._digest.stream_unary_messages_sequences.iteritems()): - for unused_test_messages in test_messages_sequence: - receiver = _receiver.Receiver() - - self._invoker.event(group, method)( - receiver, receiver.abort, _3069_test_constant.REALLY_SHORT_TIMEOUT) - receiver.block_until_terminated() - - self.assertIs(face.Abortion.Kind.EXPIRED, receiver.abortion().kind) - - def testExpiredStreamRequestStreamResponse(self): - for (group, method), test_messages_sequence in ( - self._digest.stream_stream_messages_sequences.iteritems()): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - receiver = _receiver.Receiver() - - call_consumer = self._invoker.event(group, method)( - receiver, receiver.abort, _3069_test_constant.REALLY_SHORT_TIMEOUT) - for request in requests: - call_consumer.consume(request) - receiver.block_until_terminated() - - self.assertIs(face.Abortion.Kind.EXPIRED, receiver.abortion().kind) - - def testFailedUnaryRequestUnaryResponse(self): - for (group, method), test_messages_sequence in ( - self._digest.unary_unary_messages_sequences.iteritems()): - for test_messages in test_messages_sequence: - request = test_messages.request() - receiver = _receiver.Receiver() - - with self._control.fail(): - self._invoker.event(group, method)( - request, receiver, receiver.abort, test_constants.LONG_TIMEOUT) - receiver.block_until_terminated() - - self.assertIs( - face.Abortion.Kind.REMOTE_FAILURE, receiver.abortion().kind) - - def testFailedUnaryRequestStreamResponse(self): - for (group, method), test_messages_sequence in ( - self._digest.unary_stream_messages_sequences.iteritems()): - for test_messages in test_messages_sequence: - request = test_messages.request() - receiver = _receiver.Receiver() - - with self._control.fail(): - self._invoker.event(group, method)( - request, receiver, receiver.abort, test_constants.LONG_TIMEOUT) - receiver.block_until_terminated() - - self.assertIs( - face.Abortion.Kind.REMOTE_FAILURE, receiver.abortion().kind) - - def testFailedStreamRequestUnaryResponse(self): - for (group, method), test_messages_sequence in ( - self._digest.stream_unary_messages_sequences.iteritems()): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - receiver = _receiver.Receiver() - - with self._control.fail(): - call_consumer = self._invoker.event(group, method)( - receiver, receiver.abort, test_constants.LONG_TIMEOUT) - for request in requests: - call_consumer.consume(request) - call_consumer.terminate() - receiver.block_until_terminated() - - self.assertIs( - face.Abortion.Kind.REMOTE_FAILURE, receiver.abortion().kind) - - def testFailedStreamRequestStreamResponse(self): - for (group, method), test_messages_sequence in ( - self._digest.stream_stream_messages_sequences.iteritems()): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - receiver = _receiver.Receiver() - - with self._control.fail(): - call_consumer = self._invoker.event(group, method)( - receiver, receiver.abort, test_constants.LONG_TIMEOUT) - for request in requests: - call_consumer.consume(request) - call_consumer.terminate() - receiver.block_until_terminated() - - self.assertIs( - face.Abortion.Kind.REMOTE_FAILURE, receiver.abortion().kind) diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/test_cases.py b/src/python/grpcio/tests/unit/framework/interfaces/face/test_cases.py index 462829b660..06b9d77e52 100644 --- a/src/python/grpcio/tests/unit/framework/interfaces/face/test_cases.py +++ b/src/python/grpcio/tests/unit/framework/interfaces/face/test_cases.py @@ -1,4 +1,4 @@ -# Copyright 2015, Google Inc. +# Copyright 2015-2016, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without @@ -34,14 +34,12 @@ import unittest # pylint: disable=unused-import # test_interfaces is referenced from specification in this module. from tests.unit.framework.interfaces.face import _blocking_invocation_inline_service -from tests.unit.framework.interfaces.face import _event_invocation_synchronous_event_service from tests.unit.framework.interfaces.face import _future_invocation_asynchronous_event_service from tests.unit.framework.interfaces.face import _invocation from tests.unit.framework.interfaces.face import test_interfaces # pylint: disable=unused-import _TEST_CASE_SUPERCLASSES = ( _blocking_invocation_inline_service.TestCase, - _event_invocation_synchronous_event_service.TestCase, _future_invocation_asynchronous_event_service.TestCase, ) diff --git a/src/ruby/.rubocop.yml b/src/ruby/.rubocop.yml index ff5cf8db83..d13ce42655 100644 --- a/src/ruby/.rubocop.yml +++ b/src/ruby/.rubocop.yml @@ -15,3 +15,6 @@ Metrics/CyclomaticComplexity: Metrics/PerceivedComplexity: Max: 8 + +Metrics/ClassLength: + Max: 250 diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h index b972f60fc3..3bf81af8fb 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h @@ -166,7 +166,7 @@ extern grpc_compression_algorithm_parse_type grpc_compression_algorithm_parse_im typedef int(*grpc_compression_algorithm_name_type)(grpc_compression_algorithm algorithm, char **name); extern grpc_compression_algorithm_name_type grpc_compression_algorithm_name_import; #define grpc_compression_algorithm_name grpc_compression_algorithm_name_import -typedef grpc_compression_algorithm(*grpc_compression_algorithm_for_level_type)(grpc_compression_level level); +typedef grpc_compression_algorithm(*grpc_compression_algorithm_for_level_type)(grpc_compression_level level, uint32_t accepted_encodings); extern grpc_compression_algorithm_for_level_type grpc_compression_algorithm_for_level_import; #define grpc_compression_algorithm_for_level grpc_compression_algorithm_for_level_import typedef void(*grpc_compression_options_init_type)(grpc_compression_options *opts); diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index ef2997c991..b30d19dd2b 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -107,7 +107,9 @@ module GRPC # Starts running the jobs in the thread pool. def start - fail 'already stopped' if @stopped + @stop_mutex.synchronize do + fail 'already stopped' if @stopped + end until @workers.size == @size.to_i next_thread = Thread.new do catch(:exit) do # allows { throw :exit } to kill a thread @@ -264,10 +266,10 @@ module GRPC @pool = Pool.new(@pool_size) @run_cond = ConditionVariable.new @run_mutex = Mutex.new - @running = false + # running_state can take 4 values: :not_started, :running, :stopping, and + # :stopped. State transitions can only proceed in that order. + @running_state = :not_started @server = RpcServer.setup_srv(server_override, @cq, **kw) - @stopped = false - @stop_mutex = Mutex.new end # stops a running server @@ -275,27 +277,42 @@ module GRPC # the call has no impact if the server is already stopped, otherwise # server's current call loop is it's last. def stop - return unless @running - @stop_mutex.synchronize do - @stopped = true + @run_mutex.synchronize do + fail 'Cannot stop before starting' if @running_state == :not_started + return if @running_state != :running + transition_running_state(:stopping) end deadline = from_relative_time(@poll_period) - return if @server.close(@cq, deadline) - deadline = from_relative_time(@poll_period) @server.close(@cq, deadline) @pool.stop end - # determines if the server has been stopped - def stopped? - @stop_mutex.synchronize do - return @stopped + def running_state + @run_mutex.synchronize do + return @running_state + end + end + + # Can only be called while holding @run_mutex + def transition_running_state(target_state) + state_transitions = { + not_started: :running, + running: :stopping, + stopping: :stopped + } + if state_transitions[@running_state] == target_state + @running_state = target_state + else + fail "Bad server state transition: #{@running_state}->#{target_state}" end end - # determines if the server is currently running def running? - @running + running_state == :running + end + + def stopped? + running_state == :stopped end # Is called from other threads to wait for #run to start up the server. @@ -304,13 +321,11 @@ module GRPC # # @param timeout [Numeric] number of seconds to wait # @result [true, false] true if the server is running, false otherwise - def wait_till_running(timeout = 0.1) - end_time, sleep_period = Time.now + timeout, (1.0 * timeout) / 100 - while Time.now < end_time - @run_mutex.synchronize { @run_cond.wait(@run_mutex) } unless running? - sleep(sleep_period) + def wait_till_running(timeout = nil) + @run_mutex.synchronize do + @run_cond.wait(@run_mutex, timeout) if @running_state == :not_started + return @running_state == :running end - running? end # Runs the server in its own thread, then waits for signal INT or TERM on @@ -360,11 +375,14 @@ module GRPC # @param service [Object|Class] a service class or object as described # above def handle(service) - fail 'cannot add services if the server is running' if running? - fail 'cannot add services if the server is stopped' if stopped? - cls = service.is_a?(Class) ? service : service.class - assert_valid_service_class(cls) - add_rpc_descs_for(service) + @run_mutex.synchronize do + unless @running_state == :not_started + fail 'cannot add services if the server has been started' + end + cls = service.is_a?(Class) ? service : service.class + assert_valid_service_class(cls) + add_rpc_descs_for(service) + end end # runs the server @@ -375,16 +393,13 @@ module GRPC # - #running? returns true after this is called, until #stop cause the # the server to stop. def run - if rpc_descs.size.zero? - GRPC.logger.warn('did not run as no services were present') - return - end @run_mutex.synchronize do - @running = true - @run_cond.signal + fail 'cannot run without registering services' if rpc_descs.size.zero? + @pool.start + @server.start + transition_running_state(:running) + @run_cond.broadcast end - @pool.start - @server.start loop_handle_server_calls end @@ -413,9 +428,9 @@ module GRPC # handles calls to the server def loop_handle_server_calls - fail 'not running' unless @running + fail 'not started' if running_state == :not_started loop_tag = Object.new - until stopped? + while running_state == :running begin an_rpc = @server.request_call(@cq, loop_tag, INFINITE_FUTURE) break if (!an_rpc.nil?) && an_rpc.call.nil? @@ -430,11 +445,14 @@ module GRPC rescue Core::CallError, RuntimeError => e # these might happen for various reasonse. The correct behaviour of # the server is to log them and continue, if it's not shutting down. - GRPC.logger.warn("server call failed: #{e}") unless stopped? + if running_state == :running + GRPC.logger.warn("server call failed: #{e}") + end next end end - @running = false + # @running_state should be :stopping here + @run_mutex.synchronize { transition_running_state(:stopped) } GRPC.logger.info("stopped: #{self}") end @@ -484,9 +502,10 @@ module GRPC cls.assert_rpc_descs_have_methods end + # This should be called while holding @run_mutex def add_rpc_descs_for(service) cls = service.is_a?(Class) ? service : service.class - specs, handlers = rpc_descs, rpc_handlers + specs, handlers = (@rpc_descs ||= {}), (@rpc_handlers ||= {}) cls.rpc_descs.each_pair do |name, spec| route = "/#{cls.service_name}/#{name}".to_sym fail "already registered: rpc #{route} from #{spec}" if specs.key? route diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb index be6331d68b..dfaec6d6ed 100644 --- a/src/ruby/spec/generic/rpc_server_spec.rb +++ b/src/ruby/spec/generic/rpc_server_spec.rb @@ -1,4 +1,4 @@ -# Copyright 2015, Google Inc. +# Copyright 2015-2016, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without @@ -220,19 +220,10 @@ describe GRPC::RpcServer do @srv = RpcServer.new(**opts) end - after(:each) do - @srv.stop - end - it 'starts out false' do expect(@srv.stopped?).to be(false) end - it 'stays false after a #stop is called before #run' do - @srv.stop - expect(@srv.stopped?).to be(false) - end - it 'stays false after the server starts running', server: true do @srv.handle(EchoService) t = Thread.new { @srv.run } @@ -247,8 +238,8 @@ describe GRPC::RpcServer do t = Thread.new { @srv.run } @srv.wait_till_running @srv.stop - expect(@srv.stopped?).to be(true) t.join + expect(@srv.stopped?).to be(true) end end @@ -266,9 +257,7 @@ describe GRPC::RpcServer do server_override: @server } r = RpcServer.new(**opts) - r.run - expect(r.running?).to be(false) - r.stop + expect { r.run }.to raise_error(RuntimeError) end it 'is true after run is called with a registered service' do @@ -293,10 +282,6 @@ describe GRPC::RpcServer do @srv = RpcServer.new(**@opts) end - after(:each) do - @srv.stop - end - it 'raises if #run has already been called' do @srv.handle(EchoService) t = Thread.new { @srv.run } @@ -528,10 +513,6 @@ describe GRPC::RpcServer do @srv = RpcServer.new(**server_opts) end - after(:each) do - @srv.stop - end - it 'should be added to BadStatus when requests fail', server: true do service = FailingService.new @srv.handle(service) |