diff options
Diffstat (limited to 'src')
27 files changed, 753 insertions, 511 deletions
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index ec9e47a6dd..5dea215668 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -45,6 +45,7 @@ #include "src/core/client_config/subchannel_index.h" #include "src/core/iomgr/timer.h" #include "src/core/profiling/timers.h" +#include "src/core/support/backoff.h" #include "src/core/surface/channel.h" #include "src/core/transport/connectivity_state.h" @@ -127,8 +128,8 @@ struct grpc_subchannel { /** next connect attempt time */ gpr_timespec next_attempt; - /** amount to backoff each failure */ - gpr_timespec backoff_delta; + /** backoff state */ + gpr_backoff backoff_state; /** do we have an active alarm? */ int have_alarm; /** our alarm */ @@ -146,7 +147,6 @@ struct grpc_subchannel_call { #define CALLSTACK_TO_SUBCHANNEL_CALL(callstack) \ (((grpc_subchannel_call *)(callstack)) - 1) -static gpr_timespec compute_connect_deadline(grpc_subchannel *c); static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel, bool iomgr_success); @@ -337,6 +337,22 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, grpc_closure_init(&c->connected, subchannel_connected, c); grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE, "subchannel"); + gpr_backoff_init(&c->backoff_state, + GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER, + GRPC_SUBCHANNEL_RECONNECT_JITTER, + GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000, + GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000); + if (c->args) { + for (size_t i = 0; i < c->args->num_args; i++) { + if (0 == strcmp(c->args->args[i].key, + "grpc.testing.fixed_reconnect_backoff")) { + GPR_ASSERT(c->args->args[i].type == GRPC_ARG_INTEGER); + gpr_backoff_init(&c->backoff_state, 1.0, 0.0, + c->args->args[i].value.integer, + c->args->args[i].value.integer); + } + } + } gpr_mu_init(&c->mu); return grpc_subchannel_index_register(exec_ctx, key, c); @@ -348,7 +364,7 @@ static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { args.interested_parties = c->pollset_set; args.addr = c->addr; args.addr_len = c->addr_len; - args.deadline = compute_connect_deadline(c); + args.deadline = c->next_attempt; args.channel_args = c->args; args.initial_connect_string = c->initial_connect_string; @@ -359,10 +375,8 @@ static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { } static void start_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { - c->backoff_delta = gpr_time_from_seconds( - GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS, GPR_TIMESPAN); c->next_attempt = - gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), c->backoff_delta); + gpr_backoff_begin(&c->backoff_state, gpr_now(GPR_CLOCK_MONOTONIC)); continue_connect(exec_ctx, c); } @@ -577,50 +591,6 @@ static void publish_transport_locked(grpc_exec_ctx *exec_ctx, gpr_free((void *)filters); } -/* Generate a random number between 0 and 1. */ -static double generate_uniform_random_number(grpc_subchannel *c) { - c->random = (1103515245 * c->random + 12345) % ((uint32_t)1 << 31); - return c->random / (double)((uint32_t)1 << 31); -} - -/* Update backoff_delta and next_attempt in subchannel */ -static void update_reconnect_parameters(grpc_subchannel *c) { - size_t i; - int32_t backoff_delta_millis, jitter; - int32_t max_backoff_millis = - GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000; - double jitter_range; - - if (c->args) { - for (i = 0; i < c->args->num_args; i++) { - if (0 == strcmp(c->args->args[i].key, - "grpc.testing.fixed_reconnect_backoff")) { - GPR_ASSERT(c->args->args[i].type == GRPC_ARG_INTEGER); - c->next_attempt = gpr_time_add( - gpr_now(GPR_CLOCK_MONOTONIC), - gpr_time_from_millis(c->args->args[i].value.integer, GPR_TIMESPAN)); - return; - } - } - } - - backoff_delta_millis = - (int32_t)(gpr_time_to_millis(c->backoff_delta) * - GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER); - if (backoff_delta_millis > max_backoff_millis) { - backoff_delta_millis = max_backoff_millis; - } - c->backoff_delta = gpr_time_from_millis(backoff_delta_millis, GPR_TIMESPAN); - c->next_attempt = - gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), c->backoff_delta); - - jitter_range = GRPC_SUBCHANNEL_RECONNECT_JITTER * backoff_delta_millis; - jitter = - (int32_t)((2 * generate_uniform_random_number(c) - 1) * jitter_range); - c->next_attempt = - gpr_time_add(c->next_attempt, gpr_time_from_millis(jitter, GPR_TIMESPAN)); -} - static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, bool iomgr_success) { grpc_subchannel *c = arg; gpr_mu_lock(&c->mu); @@ -629,7 +599,8 @@ static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, bool iomgr_success) { iomgr_success = 0; } if (iomgr_success) { - update_reconnect_parameters(c); + c->next_attempt = + gpr_backoff_step(&c->backoff_state, gpr_now(GPR_CLOCK_MONOTONIC)); continue_connect(exec_ctx, c); gpr_mu_unlock(&c->mu); } else { @@ -661,17 +632,6 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg, GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); } -static gpr_timespec compute_connect_deadline(grpc_subchannel *c) { - gpr_timespec current_deadline = - gpr_time_add(c->next_attempt, c->backoff_delta); - gpr_timespec min_deadline = gpr_time_add( - gpr_now(GPR_CLOCK_MONOTONIC), - gpr_time_from_seconds(GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS, - GPR_TIMESPAN)); - return gpr_time_cmp(current_deadline, min_deadline) > 0 ? current_deadline - : min_deadline; -} - /* * grpc_subchannel_call implementation */ diff --git a/src/core/iomgr/exec_ctx.c b/src/core/iomgr/exec_ctx.c index 1fd79f6eba..893fe4515c 100644 --- a/src/core/iomgr/exec_ctx.c +++ b/src/core/iomgr/exec_ctx.c @@ -34,9 +34,12 @@ #include "src/core/iomgr/exec_ctx.h" #include <grpc/support/log.h> +#include <grpc/support/sync.h> +#include <grpc/support/thd.h> #include "src/core/profiling/timers.h" +#ifndef GRPC_EXECUTION_CONTEXT_SANITIZER bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) { bool did_something = 0; GPR_TIMER_BEGIN("grpc_exec_ctx_flush", 0); @@ -74,3 +77,75 @@ void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx, GPR_ASSERT(offload_target_or_null == NULL); grpc_closure_list_move(list, &exec_ctx->closure_list); } + +void grpc_exec_ctx_global_init(void) {} +void grpc_exec_ctx_global_shutdown(void) {} +#else +static gpr_mu g_mu; +static gpr_cv g_cv; +static int g_threads = 0; + +static void run_closure(void *arg) { + grpc_closure *closure = arg; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + closure->cb(&exec_ctx, closure->cb_arg, (closure->final_data & 1) != 0); + grpc_exec_ctx_finish(&exec_ctx); + gpr_mu_lock(&g_mu); + if (--g_threads == 0) { + gpr_cv_signal(&g_cv); + } + gpr_mu_unlock(&g_mu); +} + +static void start_closure(grpc_closure *closure) { + gpr_thd_id id; + gpr_mu_lock(&g_mu); + g_threads++; + gpr_mu_unlock(&g_mu); + gpr_thd_new(&id, run_closure, closure, NULL); +} + +bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) { return false; } + +void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx) {} + +void grpc_exec_ctx_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure, + bool success, + grpc_workqueue *offload_target_or_null) { + GPR_ASSERT(offload_target_or_null == NULL); + if (closure == NULL) return; + closure->final_data = success; + start_closure(closure); +} + +void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx, + grpc_closure_list *list, + grpc_workqueue *offload_target_or_null) { + GPR_ASSERT(offload_target_or_null == NULL); + if (list == NULL) return; + grpc_closure *p = list->head; + while (p) { + grpc_closure *start = p; + p = grpc_closure_next(start); + start_closure(start); + } + grpc_closure_list r = GRPC_CLOSURE_LIST_INIT; + *list = r; +} + +void grpc_exec_ctx_global_init(void) { + gpr_mu_init(&g_mu); + gpr_cv_init(&g_cv); +} + +void grpc_exec_ctx_global_shutdown(void) { + gpr_mu_lock(&g_mu); + while (g_threads != 0) { + gpr_cv_wait(&g_cv, &g_mu, gpr_inf_future(GPR_CLOCK_REALTIME)); + } + gpr_mu_unlock(&g_mu); + + gpr_mu_destroy(&g_mu); + gpr_cv_destroy(&g_cv); +} +#endif diff --git a/src/core/iomgr/exec_ctx.h b/src/core/iomgr/exec_ctx.h index ac2088eace..07b54a0ab8 100644 --- a/src/core/iomgr/exec_ctx.h +++ b/src/core/iomgr/exec_ctx.h @@ -36,6 +36,14 @@ #include "src/core/iomgr/closure.h" +/* #define GRPC_EXECUTION_CONTEXT_SANITIZER 1 */ + +/** A workqueue represents a list of work to be executed asynchronously. + Forward declared here to avoid a circular dependency with workqueue.h. */ +struct grpc_workqueue; +typedef struct grpc_workqueue grpc_workqueue; + +#ifndef GRPC_EXECUTION_CONTEXT_SANITIZER /** Execution context. * A bag of data that collects information along a callstack. * Generally created at public API entry points, and passed down as @@ -57,13 +65,15 @@ struct grpc_exec_ctx { grpc_closure_list closure_list; }; -/** A workqueue represents a list of work to be executed asynchronously. - Forward declared here to avoid a circular dependency with workqueue.h. */ -struct grpc_workqueue; -typedef struct grpc_workqueue grpc_workqueue; - #define GRPC_EXEC_CTX_INIT \ { GRPC_CLOSURE_LIST_INIT } +#else +struct grpc_exec_ctx { + int unused; +}; +#define GRPC_EXEC_CTX_INIT \ + { 0 } +#endif /** Flush any work that has been enqueued onto this grpc_exec_ctx. * Caller must guarantee that no interfering locks are held. @@ -82,4 +92,7 @@ void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx, grpc_closure_list *list, grpc_workqueue *offload_target_or_null); +void grpc_exec_ctx_global_init(void); +void grpc_exec_ctx_global_shutdown(void); + #endif /* GRPC_CORE_IOMGR_EXEC_CTX_H */ diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index 9c89c2c08a..3ab4430668 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -43,6 +43,7 @@ #include <grpc/support/thd.h> #include <grpc/support/useful.h> +#include "src/core/iomgr/exec_ctx.h" #include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/timer.h" #include "src/core/support/env.h" @@ -57,6 +58,7 @@ void grpc_iomgr_init(void) { g_shutdown = 0; gpr_mu_init(&g_mu); gpr_cv_init(&g_rcv); + grpc_exec_ctx_global_init(); grpc_timer_list_init(gpr_now(GPR_CLOCK_MONOTONIC)); g_root_object.next = g_root_object.prev = &g_root_object; g_root_object.name = "root"; @@ -138,6 +140,7 @@ void grpc_iomgr_shutdown(void) { grpc_pollset_global_shutdown(); grpc_iomgr_platform_shutdown(); + grpc_exec_ctx_global_shutdown(); gpr_mu_destroy(&g_mu); gpr_cv_destroy(&g_rcv); } diff --git a/src/core/iomgr/pollset.h b/src/core/iomgr/pollset.h index 43ce574e34..9500b1a73a 100644 --- a/src/core/iomgr/pollset.h +++ b/src/core/iomgr/pollset.h @@ -55,7 +55,7 @@ typedef struct grpc_pollset_worker grpc_pollset_worker; size_t grpc_pollset_size(void); void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu); /* Begin shutting down the pollset, and call closure when done. - * GRPC_POLLSET_MU(pollset) must be held */ + * pollset's mutex must be held */ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_closure *closure); /** Reset the pollset to its initial state (perhaps with some cached objects); @@ -66,16 +66,16 @@ void grpc_pollset_destroy(grpc_pollset *pollset); /* Do some work on a pollset. May involve invoking asynchronous callbacks, or actually polling file descriptors. - Requires GRPC_POLLSET_MU(pollset) locked. - May unlock GRPC_POLLSET_MU(pollset) during its execution. + Requires pollset's mutex locked. + May unlock its mutex during its execution. worker is a (platform-specific) handle that can be used to wake up from grpc_pollset_work before any events are received and before the timeout has expired. It is both initialized and destroyed by grpc_pollset_work. Initialization of worker is guaranteed to occur BEFORE the - GRPC_POLLSET_MU(pollset) is released for the first time by - grpc_pollset_work, and it is guaranteed that GRPC_POLLSET_MU(pollset) will - not be released by grpc_pollset_work AFTER worker has been destroyed. + pollset's mutex is released for the first time by grpc_pollset_work + and it is guaranteed that it will not be released by grpc_pollset_work + AFTER worker has been destroyed. Tries not to block past deadline. May call grpc_closure_list_run on grpc_closure_list, without holding the diff --git a/src/core/iomgr/workqueue_posix.c b/src/core/iomgr/workqueue_posix.c index c096dbfb30..2b42e6d4fb 100644 --- a/src/core/iomgr/workqueue_posix.c +++ b/src/core/iomgr/workqueue_posix.c @@ -107,7 +107,7 @@ void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { if (grpc_closure_list_empty(workqueue->closure_list)) { grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd); } - grpc_closure_list_move(&exec_ctx->closure_list, &workqueue->closure_list); + grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL); gpr_mu_unlock(&workqueue->mu); } @@ -123,7 +123,7 @@ static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, bool success) { gpr_free(workqueue); } else { gpr_mu_lock(&workqueue->mu); - grpc_closure_list_move(&workqueue->closure_list, &exec_ctx->closure_list); + grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL); grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd); gpr_mu_unlock(&workqueue->mu); grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd, diff --git a/src/core/support/backoff.c b/src/core/support/backoff.c new file mode 100644 index 0000000000..7458219645 --- /dev/null +++ b/src/core/support/backoff.c @@ -0,0 +1,71 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/support/backoff.h" + +#include <grpc/support/useful.h> + +void gpr_backoff_init(gpr_backoff *backoff, double multiplier, double jitter, + int64_t min_timeout_millis, int64_t max_timeout_millis) { + backoff->multiplier = multiplier; + backoff->jitter = jitter; + backoff->min_timeout_millis = min_timeout_millis; + backoff->max_timeout_millis = max_timeout_millis; + backoff->rng_state = (uint32_t)gpr_now(GPR_CLOCK_REALTIME).tv_nsec; +} + +gpr_timespec gpr_backoff_begin(gpr_backoff *backoff, gpr_timespec now) { + backoff->current_timeout_millis = backoff->min_timeout_millis; + return gpr_time_add( + now, gpr_time_from_millis(backoff->current_timeout_millis, GPR_TIMESPAN)); +} + +/* Generate a random number between 0 and 1. */ +static double generate_uniform_random_number(uint32_t *rng_state) { + *rng_state = (1103515245 * *rng_state + 12345) % ((uint32_t)1 << 31); + return *rng_state / (double)((uint32_t)1 << 31); +} + +gpr_timespec gpr_backoff_step(gpr_backoff *backoff, gpr_timespec now) { + double new_timeout_millis = + backoff->multiplier * (double)backoff->current_timeout_millis; + double jitter_range = backoff->jitter * new_timeout_millis; + double jitter = + (2 * generate_uniform_random_number(&backoff->rng_state) - 1) * + jitter_range; + backoff->current_timeout_millis = + GPR_CLAMP((int64_t)(new_timeout_millis + jitter), + backoff->min_timeout_millis, backoff->max_timeout_millis); + return gpr_time_add( + now, gpr_time_from_millis(backoff->current_timeout_millis, GPR_TIMESPAN)); +} diff --git a/src/core/support/backoff.h b/src/core/support/backoff.h new file mode 100644 index 0000000000..3234aa214d --- /dev/null +++ b/src/core/support/backoff.h @@ -0,0 +1,65 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_INTERNAL_CORE_SUPPORT_BACKOFF_H +#define GRPC_INTERNAL_CORE_SUPPORT_BACKOFF_H + +#include <grpc/support/time.h> + +typedef struct { + /// const: multiplier between retry attempts + double multiplier; + /// const: amount to randomize backoffs + double jitter; + /// const: minimum time between retries in milliseconds + int64_t min_timeout_millis; + /// const: maximum time between retries in milliseconds + int64_t max_timeout_millis; + + /// random number generator + uint32_t rng_state; + + /// current retry timeout in milliseconds + int64_t current_timeout_millis; +} gpr_backoff; + +/// Initialize backoff machinery - does not need to be destroyed +void gpr_backoff_init(gpr_backoff *backoff, double multiplier, double jitter, + int64_t min_timeout_millis, int64_t max_timeout_millis); + +/// Begin retry loop: returns a timespec for the NEXT retry +gpr_timespec gpr_backoff_begin(gpr_backoff *backoff, gpr_timespec now); +/// Step a retry loop: returns a timespec for the NEXT retry +gpr_timespec gpr_backoff_step(gpr_backoff *backoff, gpr_timespec now); + +#endif // GRPC_INTERNAL_CORE_SUPPORT_BACKOFF_H diff --git a/src/cpp/client/secure_credentials.cc b/src/cpp/client/secure_credentials.cc index 074dae7ca7..308455527c 100644 --- a/src/cpp/client/secure_credentials.cc +++ b/src/cpp/client/secure_credentials.cc @@ -83,14 +83,14 @@ std::shared_ptr<CallCredentials> WrapCallCredentials( } // namespace std::shared_ptr<ChannelCredentials> GoogleDefaultCredentials() { - GrpcLibrary init; // To call grpc_init(). + GrpcLibraryCodegen init; // To call grpc_init(). return WrapChannelCredentials(grpc_google_default_credentials_create()); } // Builds SSL Credentials given SSL specific options std::shared_ptr<ChannelCredentials> SslCredentials( const SslCredentialsOptions& options) { - GrpcLibrary init; // To call grpc_init(). + GrpcLibraryCodegen init; // To call grpc_init(). grpc_ssl_pem_key_cert_pair pem_key_cert_pair = { options.pem_private_key.c_str(), options.pem_cert_chain.c_str()}; @@ -102,7 +102,7 @@ std::shared_ptr<ChannelCredentials> SslCredentials( // Builds credentials for use when running in GCE std::shared_ptr<CallCredentials> GoogleComputeEngineCredentials() { - GrpcLibrary init; // To call grpc_init(). + GrpcLibraryCodegen init; // To call grpc_init(). return WrapCallCredentials( grpc_google_compute_engine_credentials_create(nullptr)); } @@ -110,7 +110,7 @@ std::shared_ptr<CallCredentials> GoogleComputeEngineCredentials() { // Builds JWT credentials. std::shared_ptr<CallCredentials> ServiceAccountJWTAccessCredentials( const grpc::string& json_key, long token_lifetime_seconds) { - GrpcLibrary init; // To call grpc_init(). + GrpcLibraryCodegen init; // To call grpc_init(). if (token_lifetime_seconds <= 0) { gpr_log(GPR_ERROR, "Trying to create JWTCredentials with non-positive lifetime"); @@ -125,7 +125,7 @@ std::shared_ptr<CallCredentials> ServiceAccountJWTAccessCredentials( // Builds refresh token credentials. std::shared_ptr<CallCredentials> GoogleRefreshTokenCredentials( const grpc::string& json_refresh_token) { - GrpcLibrary init; // To call grpc_init(). + GrpcLibraryCodegen init; // To call grpc_init(). return WrapCallCredentials(grpc_google_refresh_token_credentials_create( json_refresh_token.c_str(), nullptr)); } @@ -133,7 +133,7 @@ std::shared_ptr<CallCredentials> GoogleRefreshTokenCredentials( // Builds access token credentials. std::shared_ptr<CallCredentials> AccessTokenCredentials( const grpc::string& access_token) { - GrpcLibrary init; // To call grpc_init(). + GrpcLibraryCodegen init; // To call grpc_init(). return WrapCallCredentials( grpc_access_token_credentials_create(access_token.c_str(), nullptr)); } @@ -142,7 +142,7 @@ std::shared_ptr<CallCredentials> AccessTokenCredentials( std::shared_ptr<CallCredentials> GoogleIAMCredentials( const grpc::string& authorization_token, const grpc::string& authority_selector) { - GrpcLibrary init; // To call grpc_init(). + GrpcLibraryCodegen init; // To call grpc_init(). return WrapCallCredentials(grpc_google_iam_credentials_create( authorization_token.c_str(), authority_selector.c_str(), nullptr)); } @@ -224,7 +224,7 @@ MetadataCredentialsPluginWrapper::MetadataCredentialsPluginWrapper( std::shared_ptr<CallCredentials> MetadataCredentialsFromPlugin( std::unique_ptr<MetadataCredentialsPlugin> plugin) { - GrpcLibrary init; // To call grpc_init(). + GrpcLibraryCodegen init; // To call grpc_init(). const char* type = plugin->GetType(); MetadataCredentialsPluginWrapper* wrapper = new MetadataCredentialsPluginWrapper(std::move(plugin)); diff --git a/src/cpp/codegen/grpc_library.cc b/src/cpp/codegen/codegen_init.cc index 48acec3f3d..c5d22124b7 100644 --- a/src/cpp/codegen/grpc_library.cc +++ b/src/cpp/codegen/codegen_init.cc @@ -31,10 +31,15 @@ * */ +#include <grpc++/impl/codegen/core_codegen_interface.h> #include <grpc++/impl/codegen/grpc_library.h> -namespace grpc { +/// Initializes the global gRPC variables for the codegen library. These will +/// stay null in the absence of of grpc++ library. In this case, no gRPC +/// features such as the ability to perform calls will be available. Trying to +/// perform them would result in a segmentation fault when trying to deference +/// the following nulled globals. These should be associated with actual +/// as part of the instantiation of a \a grpc::GrpcLibraryInitializer variable. -GrpcLibraryInterface *g_glip = nullptr; - -} // namespace grpc +grpc::CoreCodegenInterface* grpc::g_core_codegen_interface = nullptr; +grpc::GrpcLibraryInterface* grpc::g_glip = nullptr; diff --git a/src/cpp/common/call.cc b/src/cpp/common/call.cc deleted file mode 100644 index 5b87c2a806..0000000000 --- a/src/cpp/common/call.cc +++ /dev/null @@ -1,92 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include <grpc++/impl/call.h> - -#include <grpc/support/alloc.h> -#include <grpc++/channel.h> -#include <grpc++/client_context.h> -#include <grpc++/support/byte_buffer.h> -#include "src/core/profiling/timers.h" - -namespace grpc { - -void FillMetadataMap( - grpc_metadata_array* arr, - std::multimap<grpc::string_ref, grpc::string_ref>* metadata) { - for (size_t i = 0; i < arr->count; i++) { - // TODO(yangg) handle duplicates? - metadata->insert(std::pair<grpc::string_ref, grpc::string_ref>( - arr->metadata[i].key, grpc::string_ref(arr->metadata[i].value, - arr->metadata[i].value_length))); - } - grpc_metadata_array_destroy(arr); - grpc_metadata_array_init(arr); -} - -// TODO(yangg) if the map is changed before we send, the pointers will be a -// mess. Make sure it does not happen. -grpc_metadata* FillMetadataArray( - const std::multimap<grpc::string, grpc::string>& metadata) { - if (metadata.empty()) { - return nullptr; - } - grpc_metadata* metadata_array = - (grpc_metadata*)gpr_malloc(metadata.size() * sizeof(grpc_metadata)); - size_t i = 0; - for (auto iter = metadata.cbegin(); iter != metadata.cend(); ++iter, ++i) { - metadata_array[i].key = iter->first.c_str(); - metadata_array[i].value = iter->second.c_str(); - metadata_array[i].value_length = iter->second.size(); - } - return metadata_array; -} - -Call::Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq) - : call_hook_(call_hook), cq_(cq), call_(call), max_message_size_(-1) {} - -Call::Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq, - int max_message_size) - : call_hook_(call_hook), - cq_(cq), - call_(call), - max_message_size_(max_message_size) {} - -void Call::PerformOps(CallOpSetInterface* ops) { - if (max_message_size_ > 0) { - ops->set_max_message_size(max_message_size_); - } - call_hook_->PerformOpsOnCall(ops, this); -} - -} // namespace grpc diff --git a/src/cpp/common/completion_queue.cc b/src/cpp/common/completion_queue.cc index 4f76dfff1d..729dc33749 100644 --- a/src/cpp/common/completion_queue.cc +++ b/src/cpp/common/completion_queue.cc @@ -34,7 +34,6 @@ #include <memory> -#include <grpc++/impl/codegen/completion_queue_tag.h> #include <grpc++/impl/grpc_library.h> #include <grpc++/support/time.h> #include <grpc/grpc.h> @@ -43,16 +42,13 @@ namespace grpc { static internal::GrpcLibraryInitializer g_gli_initializer; -CompletionQueue::CompletionQueue() { - g_gli_initializer.summon(); - cq_ = grpc_completion_queue_create(nullptr); -} CompletionQueue::CompletionQueue(grpc_completion_queue* take) : cq_(take) {} -CompletionQueue::~CompletionQueue() { grpc_completion_queue_destroy(cq_); } - -void CompletionQueue::Shutdown() { grpc_completion_queue_shutdown(cq_); } +void CompletionQueue::Shutdown() { + g_gli_initializer.summon(); + grpc_completion_queue_shutdown(cq_); +} CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal( void** tag, bool* ok, gpr_timespec deadline) { @@ -75,25 +71,4 @@ CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal( } } -bool CompletionQueue::Pluck(CompletionQueueTag* tag) { - auto deadline = gpr_inf_future(GPR_CLOCK_REALTIME); - auto ev = grpc_completion_queue_pluck(cq_, tag, deadline, nullptr); - bool ok = ev.success != 0; - void* ignored = tag; - GPR_ASSERT(tag->FinalizeResult(&ignored, &ok)); - GPR_ASSERT(ignored == tag); - // Ignore mutations by FinalizeResult: Pluck returns the C API status - return ev.success != 0; -} - -void CompletionQueue::TryPluck(CompletionQueueTag* tag) { - auto deadline = gpr_time_0(GPR_CLOCK_REALTIME); - auto ev = grpc_completion_queue_pluck(cq_, tag, deadline, nullptr); - if (ev.type == GRPC_QUEUE_TIMEOUT) return; - bool ok = ev.success != 0; - void* ignored = tag; - // the tag must be swallowed if using TryPluck - GPR_ASSERT(!tag->FinalizeResult(&ignored, &ok)); -} - } // namespace grpc diff --git a/src/cpp/proto/proto_utils.cc b/src/cpp/common/core_codegen.cc index 79e7bf1801..45e9e278a0 100644 --- a/src/cpp/proto/proto_utils.cc +++ b/src/cpp/common/core_codegen.cc @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -31,28 +31,31 @@ * */ -#include <grpc++/impl/proto_utils.h> +#include "src/cpp/common/core_codegen.h" -#include <climits> +#include <stdlib.h> -#include <grpc/grpc.h> +#include <grpc++/support/config.h> #include <grpc/byte_buffer.h> #include <grpc/byte_buffer_reader.h> -#include <grpc/support/log.h> +#include <grpc/grpc.h> +#include <grpc/impl/codegen/alloc.h> +#include <grpc/impl/codegen/byte_buffer.h> +#include <grpc/impl/codegen/log.h> +#include <grpc/support/port_platform.h> #include <grpc/support/slice.h> #include <grpc/support/slice_buffer.h> -#include <grpc/support/port_platform.h> -#include <grpc++/support/config.h> #include "src/core/profiling/timers.h" -const int kMaxBufferLength = 8192; +namespace { + +const int kGrpcBufferWriterMaxBufferLength = 8192; class GrpcBufferWriter GRPC_FINAL : public ::grpc::protobuf::io::ZeroCopyOutputStream { public: - explicit GrpcBufferWriter(grpc_byte_buffer** bp, - int block_size = kMaxBufferLength) + explicit GrpcBufferWriter(grpc_byte_buffer** bp, int block_size) : block_size_(block_size), byte_count_(0), have_backup_(false) { *bp = grpc_raw_byte_buffer_create(NULL, 0); slice_buffer_ = &(*bp)->data.raw.slice_buffer; @@ -161,14 +164,56 @@ class GrpcBufferReader GRPC_FINAL grpc_byte_buffer_reader reader_; gpr_slice slice_; }; +} // namespace namespace grpc { -Status SerializeProto(const grpc::protobuf::Message& msg, - grpc_byte_buffer** bp) { +grpc_completion_queue* CoreCodegen::grpc_completion_queue_create( + void* reserved) { + return ::grpc_completion_queue_create(reserved); +} + +void CoreCodegen::grpc_completion_queue_destroy(grpc_completion_queue* cq) { + ::grpc_completion_queue_destroy(cq); +} + +grpc_event CoreCodegen::grpc_completion_queue_pluck(grpc_completion_queue* cq, + void* tag, + gpr_timespec deadline, + void* reserved) { + return ::grpc_completion_queue_pluck(cq, tag, deadline, reserved); +} + +void* CoreCodegen::gpr_malloc(size_t size) { return ::gpr_malloc(size); } + +void CoreCodegen::gpr_free(void* p) { return ::gpr_free(p); } + +void CoreCodegen::grpc_byte_buffer_destroy(grpc_byte_buffer* bb) { + ::grpc_byte_buffer_destroy(bb); +} + +void CoreCodegen::grpc_metadata_array_init(grpc_metadata_array* array) { + ::grpc_metadata_array_init(array); +} + +void CoreCodegen::grpc_metadata_array_destroy(grpc_metadata_array* array) { + ::grpc_metadata_array_destroy(array); +} + +gpr_timespec CoreCodegen::gpr_inf_future(gpr_clock_type type) { + return ::gpr_inf_future(type); +} + +void CoreCodegen::assert_fail(const char* failed_assertion) { + gpr_log(GPR_ERROR, "assertion failed: %s", failed_assertion); + abort(); +} + +Status CoreCodegen::SerializeProto(const grpc::protobuf::Message& msg, + grpc_byte_buffer** bp) { GPR_TIMER_SCOPE("SerializeProto", 0); int byte_size = msg.ByteSize(); - if (byte_size <= kMaxBufferLength) { + if (byte_size <= kGrpcBufferWriterMaxBufferLength) { gpr_slice slice = gpr_slice_malloc(byte_size); GPR_ASSERT(GPR_SLICE_END_PTR(slice) == msg.SerializeWithCachedSizesToArray(GPR_SLICE_START_PTR(slice))); @@ -176,31 +221,36 @@ Status SerializeProto(const grpc::protobuf::Message& msg, gpr_slice_unref(slice); return Status::OK; } else { - GrpcBufferWriter writer(bp); + GrpcBufferWriter writer(bp, kGrpcBufferWriterMaxBufferLength); return msg.SerializeToZeroCopyStream(&writer) ? Status::OK : Status(StatusCode::INTERNAL, "Failed to serialize message"); } } -Status DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg, - int max_message_size) { +Status CoreCodegen::DeserializeProto(grpc_byte_buffer* buffer, + grpc::protobuf::Message* msg, + int max_message_size) { GPR_TIMER_SCOPE("DeserializeProto", 0); - if (!buffer) { + if (buffer == nullptr) { return Status(StatusCode::INTERNAL, "No payload"); } - GrpcBufferReader reader(buffer); - ::grpc::protobuf::io::CodedInputStream decoder(&reader); - if (max_message_size > 0) { - decoder.SetTotalBytesLimit(max_message_size, max_message_size); - } - if (!msg->ParseFromCodedStream(&decoder)) { - return Status(StatusCode::INTERNAL, msg->InitializationErrorString()); - } - if (!decoder.ConsumedEntireMessage()) { - return Status(StatusCode::INTERNAL, "Did not read entire message"); + Status result = Status::OK; + { + GrpcBufferReader reader(buffer); + ::grpc::protobuf::io::CodedInputStream decoder(&reader); + if (max_message_size > 0) { + decoder.SetTotalBytesLimit(max_message_size, max_message_size); + } + if (!msg->ParseFromCodedStream(&decoder)) { + result = Status(StatusCode::INTERNAL, msg->InitializationErrorString()); + } + if (!decoder.ConsumedEntireMessage()) { + result = Status(StatusCode::INTERNAL, "Did not read entire message"); + } } - return Status::OK; + grpc_byte_buffer_destroy(buffer); + return result; } } // namespace grpc diff --git a/src/cpp/common/core_codegen.h b/src/cpp/common/core_codegen.h new file mode 100644 index 0000000000..0d8c6b79f7 --- /dev/null +++ b/src/cpp/common/core_codegen.h @@ -0,0 +1,71 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +// This file should be compiled as part of grpc++. + +#include <grpc++/impl/codegen/core_codegen_interface.h> +#include <grpc/impl/codegen/grpc_types.h> +#include <grpc/byte_buffer.h> + +namespace grpc { + +/// Implementation of the core codegen interface. +class CoreCodegen : public CoreCodegenInterface { + private: + Status SerializeProto(const grpc::protobuf::Message& msg, + grpc_byte_buffer** bp) override; + + Status DeserializeProto(grpc_byte_buffer* buffer, + grpc::protobuf::Message* msg, + int max_message_size) override; + + grpc_completion_queue* grpc_completion_queue_create(void* reserved) override; + void grpc_completion_queue_destroy(grpc_completion_queue* cq) override; + grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq, void* tag, + gpr_timespec deadline, + void* reserved) override; + + void* gpr_malloc(size_t size) override; + void gpr_free(void* p) override; + + void grpc_byte_buffer_destroy(grpc_byte_buffer* bb) override; + + void grpc_metadata_array_init(grpc_metadata_array* array) override; + void grpc_metadata_array_destroy(grpc_metadata_array* array) override; + + gpr_timespec gpr_inf_future(gpr_clock_type type) override; + + void assert_fail(const char* failed_assertion) override; +}; + +} // namespace grpc diff --git a/src/cpp/util/string_ref.cc b/src/cpp/util/string_ref.cc index 66c79a1818..b55019b5f2 100644 --- a/src/cpp/util/string_ref.cc +++ b/src/cpp/util/string_ref.cc @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -33,72 +33,8 @@ #include <grpc++/support/string_ref.h> -#include <string.h> - -#include <algorithm> -#include <iostream> - namespace grpc { const size_t string_ref::npos = size_t(-1); -string_ref& string_ref::operator=(const string_ref& rhs) { - data_ = rhs.data_; - length_ = rhs.length_; - return *this; -} - -string_ref::string_ref(const char* s) : data_(s), length_(strlen(s)) {} - -string_ref string_ref::substr(size_t pos, size_t n) const { - if (pos > length_) pos = length_; - if (n > (length_ - pos)) n = length_ - pos; - return string_ref(data_ + pos, n); -} - -int string_ref::compare(string_ref x) const { - size_t min_size = length_ < x.length_ ? length_ : x.length_; - int r = memcmp(data_, x.data_, min_size); - if (r < 0) return -1; - if (r > 0) return 1; - if (length_ < x.length_) return -1; - if (length_ > x.length_) return 1; - return 0; -} - -bool string_ref::starts_with(string_ref x) const { - return length_ >= x.length_ && (memcmp(data_, x.data_, x.length_) == 0); -} - -bool string_ref::ends_with(string_ref x) const { - return length_ >= x.length_ && - (memcmp(data_ + (length_ - x.length_), x.data_, x.length_) == 0); -} - -size_t string_ref::find(string_ref s) const { - auto it = std::search(cbegin(), cend(), s.cbegin(), s.cend()); - return it == cend() ? npos : std::distance(cbegin(), it); -} - -size_t string_ref::find(char c) const { - auto it = std::find(cbegin(), cend(), c); - return it == cend() ? npos : std::distance(cbegin(), it); -} - -bool operator==(string_ref x, string_ref y) { return x.compare(y) == 0; } - -bool operator!=(string_ref x, string_ref y) { return x.compare(y) != 0; } - -bool operator<(string_ref x, string_ref y) { return x.compare(y) < 0; } - -bool operator<=(string_ref x, string_ref y) { return x.compare(y) <= 0; } - -bool operator>(string_ref x, string_ref y) { return x.compare(y) > 0; } - -bool operator>=(string_ref x, string_ref y) { return x.compare(y) >= 0; } - -std::ostream& operator<<(std::ostream& out, const string_ref& string) { - return out << grpc::string(string.begin(), string.end()); -} - } // namespace grpc diff --git a/src/csharp/README.md b/src/csharp/README.md index b4fa945ac9..201c5ab0b5 100644 --- a/src/csharp/README.md +++ b/src/csharp/README.md @@ -55,16 +55,11 @@ If you are a user of gRPC C#, go to Usage section above. **Windows** -- The grpc_csharp_ext native library needs to be built so you can build the gRPC C# solution. You can - either build the native solution in `vsprojects/grpc_csharp_ext.sln` from Visual Studio manually, or you can use - a convenience batch script that builds everything for you. +- The grpc_csharp_ext native library needs to be built so you can build the gRPC C# solution. Open the + solution `vsprojects/grpc_csharp_ext.sln` in Visual Studio and build it. - ``` - > REM From src/csharp directory - > buildall.bat - ``` - -- Open Grpc.sln using Visual Studio. +- Open `src\csharp\Grpc.sln` (path is relative to gRPC repository root) + using Visual Studio **Linux** @@ -79,7 +74,7 @@ If you are a user of gRPC C#, go to Usage section above. **Mac OS X** - The grpc_csharp_ext native library needs to be built so you can build the gRPC C# solution. - + ```sh # from the gRPC repository root $ tools/run_tests/run_tests.py -c dbg -l csharp --build_only diff --git a/src/python/grpcio/README.rst b/src/python/grpcio/README.rst index 3dfae50b4b..3f4c6fad02 100644 --- a/src/python/grpcio/README.rst +++ b/src/python/grpcio/README.rst @@ -35,13 +35,14 @@ package named :code:`python-dev`). :: - $ export REPO_ROOT=grpc + $ export REPO_ROOT=grpc # REPO_ROOT can be any directory of your choice $ git clone https://github.com/grpc/grpc.git $REPO_ROOT $ cd $REPO_ROOT - $ pip install . -Note that :code:`$REPO_ROOT` can be assigned to whatever directory name floats -your fancy. + # For the next two commands do `sudo pip install` if you get permission-denied errors + $ pip install -rrequirements.txt + $ GRPC_PYTHON_BUILD_WITH_CYTHON=1 pip install . + Troubleshooting ~~~~~~~~~~~~~~~ diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi index 80f4da51e8..d1b9c98ffc 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi @@ -1,4 +1,4 @@ -# Copyright 2015, Google Inc. +# Copyright 2015-2016, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without @@ -40,14 +40,17 @@ cdef class Call: def start_batch(self, operations, tag): if not self.is_valid: raise ValueError("invalid call object cannot be used from Python") + cdef grpc_call_error result cdef Operations cy_operations = Operations(operations) cdef OperationTag operation_tag = OperationTag(tag) operation_tag.operation_call = self operation_tag.batch_operations = cy_operations cpython.Py_INCREF(operation_tag) - return grpc_call_start_batch( - self.c_call, cy_operations.c_ops, cy_operations.c_nops, - <cpython.PyObject *>operation_tag, NULL) + with nogil: + result = grpc_call_start_batch( + self.c_call, cy_operations.c_ops, cy_operations.c_nops, + <cpython.PyObject *>operation_tag, NULL) + return result def cancel( self, grpc_status_code error_code=GRPC_STATUS__DO_NOT_USE, @@ -57,6 +60,8 @@ cdef class Call: if (details is None) != (error_code == GRPC_STATUS__DO_NOT_USE): raise ValueError("if error_code is specified, so must details " "(and vice-versa)") + cdef grpc_call_error result + cdef char *c_details = NULL if error_code != GRPC_STATUS__DO_NOT_USE: if isinstance(details, bytes): pass @@ -65,25 +70,37 @@ cdef class Call: else: raise TypeError("expected details to be str or bytes") self.references.append(details) - return grpc_call_cancel_with_status( - self.c_call, error_code, details, NULL) + c_details = details + with nogil: + result = grpc_call_cancel_with_status( + self.c_call, error_code, c_details, NULL) + return result else: - return grpc_call_cancel(self.c_call, NULL) + with nogil: + result = grpc_call_cancel(self.c_call, NULL) + return result def set_credentials( self, CallCredentials call_credentials not None): - return grpc_call_set_credentials( - self.c_call, call_credentials.c_credentials) + cdef grpc_call_error result + with nogil: + result = grpc_call_set_credentials( + self.c_call, call_credentials.c_credentials) + return result def peer(self): - cdef char *peer = grpc_call_get_peer(self.c_call) + cdef char *peer = NULL + with nogil: + peer = grpc_call_get_peer(self.c_call) result = <bytes>peer - gpr_free(peer) + with nogil: + gpr_free(peer) return result def __dealloc__(self): if self.c_call != NULL: - grpc_call_destroy(self.c_call) + with nogil: + grpc_call_destroy(self.c_call) # The object *should* always be valid from Python. Used for debugging. @property diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi index bc03c4dcf1..d612c90791 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi @@ -35,6 +35,7 @@ cdef class Channel: def __cinit__(self, target, ChannelArgs arguments=None, ChannelCredentials channel_credentials=None): cdef grpc_channel_args *c_arguments = NULL + cdef char *c_target = NULL self.c_channel = NULL self.references = [] if arguments is not None: @@ -45,12 +46,15 @@ cdef class Channel: target = target.encode() else: raise TypeError("expected target to be str or bytes") + c_target = target if channel_credentials is None: - self.c_channel = grpc_insecure_channel_create(target, c_arguments, - NULL) + with nogil: + self.c_channel = grpc_insecure_channel_create(c_target, c_arguments, + NULL) else: - self.c_channel = grpc_secure_channel_create( - channel_credentials.c_credentials, target, c_arguments, NULL) + with nogil: + self.c_channel = grpc_secure_channel_create( + channel_credentials.c_credentials, c_target, c_arguments, NULL) self.references.append(channel_credentials) self.references.append(target) self.references.append(arguments) @@ -66,6 +70,7 @@ cdef class Channel: method = method.encode() else: raise TypeError("expected method to be str or bytes") + cdef char *method_c_string = method cdef char *host_c_string = NULL if host is None: pass @@ -81,31 +86,40 @@ cdef class Channel: cdef grpc_call *parent_call = NULL if parent is not None: parent_call = parent.c_call - operation_call.c_call = grpc_channel_create_call( - self.c_channel, parent_call, flags, - queue.c_completion_queue, method, host_c_string, deadline.c_time, - NULL) + with nogil: + operation_call.c_call = grpc_channel_create_call( + self.c_channel, parent_call, flags, + queue.c_completion_queue, method_c_string, host_c_string, + deadline.c_time, NULL) return operation_call def check_connectivity_state(self, bint try_to_connect): - return grpc_channel_check_connectivity_state(self.c_channel, - try_to_connect) + cdef grpc_connectivity_state result + with nogil: + result = grpc_channel_check_connectivity_state(self.c_channel, + try_to_connect) + return result def watch_connectivity_state( self, grpc_connectivity_state last_observed_state, Timespec deadline not None, CompletionQueue queue not None, tag): cdef OperationTag operation_tag = OperationTag(tag) cpython.Py_INCREF(operation_tag) - grpc_channel_watch_connectivity_state( - self.c_channel, last_observed_state, deadline.c_time, - queue.c_completion_queue, <cpython.PyObject *>operation_tag) + with nogil: + grpc_channel_watch_connectivity_state( + self.c_channel, last_observed_state, deadline.c_time, + queue.c_completion_queue, <cpython.PyObject *>operation_tag) def target(self): - cdef char * target = grpc_channel_get_target(self.c_channel) + cdef char *target = NULL + with nogil: + target = grpc_channel_get_target(self.c_channel) result = <bytes>target - gpr_free(target) + with nogil: + gpr_free(target) return result def __dealloc__(self): if self.c_channel != NULL: - grpc_channel_destroy(self.c_channel) + with nogil: + grpc_channel_destroy(self.c_channel) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi index e11138b1cd..09e47d4222 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi @@ -36,7 +36,8 @@ import time cdef class CompletionQueue: def __cinit__(self): - self.c_completion_queue = grpc_completion_queue_create(NULL) + with nogil: + self.c_completion_queue = grpc_completion_queue_create(NULL) self.is_shutting_down = False self.is_shutdown = False self.pluck_condition = threading.Condition() @@ -82,8 +83,9 @@ cdef class CompletionQueue: def poll(self, Timespec deadline=None): # We name this 'poll' to avoid problems with CPython's expectations for # 'special' methods (like next and __next__). - cdef gpr_timespec c_deadline = gpr_inf_future( - GPR_CLOCK_REALTIME) + cdef gpr_timespec c_deadline + with nogil: + c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME) if deadline is not None: c_deadline = deadline.c_time cdef grpc_event event @@ -123,7 +125,8 @@ cdef class CompletionQueue: return self._interpret_event(event) def shutdown(self): - grpc_completion_queue_shutdown(self.c_completion_queue) + with nogil: + grpc_completion_queue_shutdown(self.c_completion_queue) self.is_shutting_down = True def clear(self): @@ -133,15 +136,19 @@ cdef class CompletionQueue: pass def __dealloc__(self): - cdef gpr_timespec c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME) + cdef gpr_timespec c_deadline + with nogil: + c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME) if self.c_completion_queue != NULL: # Ensure shutdown if not self.is_shutting_down: - grpc_completion_queue_shutdown(self.c_completion_queue) + with nogil: + grpc_completion_queue_shutdown(self.c_completion_queue) # Pump the queue while not self.is_shutdown: with nogil: event = grpc_completion_queue_next( self.c_completion_queue, c_deadline, NULL) self._interpret_event(event) - grpc_completion_queue_destroy(self.c_completion_queue) + with nogil: + grpc_completion_queue_destroy(self.c_completion_queue) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi index 3f439c8900..1d7adca23e 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi @@ -1,4 +1,4 @@ -# Copyright 2015, Google Inc. +# Copyright 2015-2016, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without @@ -46,7 +46,8 @@ cdef class ChannelCredentials: def __dealloc__(self): if self.c_credentials != NULL: - grpc_channel_credentials_release(self.c_credentials) + with nogil: + grpc_channel_credentials_release(self.c_credentials) cdef class CallCredentials: @@ -63,7 +64,8 @@ cdef class CallCredentials: def __dealloc__(self): if self.c_credentials != NULL: - grpc_call_credentials_release(self.c_credentials) + with nogil: + grpc_call_credentials_release(self.c_credentials) cdef class ServerCredentials: @@ -74,7 +76,8 @@ cdef class ServerCredentials: def __dealloc__(self): if self.c_credentials != NULL: - grpc_server_credentials_release(self.c_credentials) + with nogil: + grpc_server_credentials_release(self.c_credentials) cdef class CredentialsMetadataPlugin: @@ -139,7 +142,8 @@ cdef void plugin_destroy_c_plugin_state(void *state): def channel_credentials_google_default(): cdef ChannelCredentials credentials = ChannelCredentials(); - credentials.c_credentials = grpc_google_default_credentials_create() + with nogil: + credentials.c_credentials = grpc_google_default_credentials_create() return credentials def channel_credentials_ssl(pem_root_certificates, @@ -158,12 +162,14 @@ def channel_credentials_ssl(pem_root_certificates, c_pem_root_certificates = pem_root_certificates credentials.references.append(pem_root_certificates) if ssl_pem_key_cert_pair is not None: - credentials.c_credentials = grpc_ssl_credentials_create( - c_pem_root_certificates, &ssl_pem_key_cert_pair.c_pair, NULL) + with nogil: + credentials.c_credentials = grpc_ssl_credentials_create( + c_pem_root_certificates, &ssl_pem_key_cert_pair.c_pair, NULL) credentials.references.append(ssl_pem_key_cert_pair) else: - credentials.c_credentials = grpc_ssl_credentials_create( - c_pem_root_certificates, NULL, NULL) + with nogil: + credentials.c_credentials = grpc_ssl_credentials_create( + c_pem_root_certificates, NULL, NULL) return credentials def channel_credentials_composite( @@ -172,8 +178,9 @@ def channel_credentials_composite( if not credentials_1.is_valid or not credentials_2.is_valid: raise ValueError("passed credentials must both be valid") cdef ChannelCredentials credentials = ChannelCredentials() - credentials.c_credentials = grpc_composite_channel_credentials_create( - credentials_1.c_credentials, credentials_2.c_credentials, NULL) + with nogil: + credentials.c_credentials = grpc_composite_channel_credentials_create( + credentials_1.c_credentials, credentials_2.c_credentials, NULL) credentials.references.append(credentials_1) credentials.references.append(credentials_2) return credentials @@ -184,16 +191,18 @@ def call_credentials_composite( if not credentials_1.is_valid or not credentials_2.is_valid: raise ValueError("passed credentials must both be valid") cdef CallCredentials credentials = CallCredentials() - credentials.c_credentials = grpc_composite_call_credentials_create( - credentials_1.c_credentials, credentials_2.c_credentials, NULL) + with nogil: + credentials.c_credentials = grpc_composite_call_credentials_create( + credentials_1.c_credentials, credentials_2.c_credentials, NULL) credentials.references.append(credentials_1) credentials.references.append(credentials_2) return credentials def call_credentials_google_compute_engine(): cdef CallCredentials credentials = CallCredentials() - credentials.c_credentials = ( - grpc_google_compute_engine_credentials_create(NULL)) + with nogil: + credentials.c_credentials = ( + grpc_google_compute_engine_credentials_create(NULL)) return credentials def call_credentials_service_account_jwt_access( @@ -205,9 +214,11 @@ def call_credentials_service_account_jwt_access( else: raise TypeError("expected json_key to be str or bytes") cdef CallCredentials credentials = CallCredentials() - credentials.c_credentials = ( - grpc_service_account_jwt_access_credentials_create( - json_key, token_lifetime.c_time, NULL)) + cdef char *json_key_c_string = json_key + with nogil: + credentials.c_credentials = ( + grpc_service_account_jwt_access_credentials_create( + json_key_c_string, token_lifetime.c_time, NULL)) credentials.references.append(json_key) return credentials @@ -219,8 +230,10 @@ def call_credentials_google_refresh_token(json_refresh_token): else: raise TypeError("expected json_refresh_token to be str or bytes") cdef CallCredentials credentials = CallCredentials() - credentials.c_credentials = grpc_google_refresh_token_credentials_create( - json_refresh_token, NULL) + cdef char *json_refresh_token_c_string = json_refresh_token + with nogil: + credentials.c_credentials = grpc_google_refresh_token_credentials_create( + json_refresh_token_c_string, NULL) credentials.references.append(json_refresh_token) return credentials @@ -238,17 +251,21 @@ def call_credentials_google_iam(authorization_token, authority_selector): else: raise TypeError("expected authority_selector to be str or bytes") cdef CallCredentials credentials = CallCredentials() - credentials.c_credentials = grpc_google_iam_credentials_create( - authorization_token, authority_selector, NULL) + cdef char *authorization_token_c_string = authorization_token + cdef char *authority_selector_c_string = authority_selector + with nogil: + credentials.c_credentials = grpc_google_iam_credentials_create( + authorization_token_c_string, authority_selector_c_string, NULL) credentials.references.append(authorization_token) credentials.references.append(authority_selector) return credentials def call_credentials_metadata_plugin(CredentialsMetadataPlugin plugin): cdef CallCredentials credentials = CallCredentials() - credentials.c_credentials = ( - grpc_metadata_credentials_create_from_plugin(plugin.make_c_plugin(), - NULL)) + cdef grpc_metadata_credentials_plugin c_plugin = plugin.make_c_plugin() + with nogil: + credentials.c_credentials = ( + grpc_metadata_credentials_create_from_plugin(c_plugin, NULL)) # TODO(atash): the following held reference is *probably* never necessary credentials.references.append(plugin) return credentials @@ -274,11 +291,12 @@ def server_credentials_ssl(pem_root_certs, pem_key_cert_pairs, credentials.references.append(pem_key_cert_pairs) credentials.references.append(pem_root_certs) credentials.c_ssl_pem_key_cert_pairs_count = len(pem_key_cert_pairs) - credentials.c_ssl_pem_key_cert_pairs = ( - <grpc_ssl_pem_key_cert_pair *>gpr_malloc( - sizeof(grpc_ssl_pem_key_cert_pair) * - credentials.c_ssl_pem_key_cert_pairs_count - )) + with nogil: + credentials.c_ssl_pem_key_cert_pairs = ( + <grpc_ssl_pem_key_cert_pair *>gpr_malloc( + sizeof(grpc_ssl_pem_key_cert_pair) * + credentials.c_ssl_pem_key_cert_pairs_count + )) for i in range(credentials.c_ssl_pem_key_cert_pairs_count): credentials.c_ssl_pem_key_cert_pairs[i] = ( (<SslPemKeyCertPair>pem_key_cert_pairs[i]).c_pair) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi index dbf0045710..61165cb021 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi @@ -38,27 +38,27 @@ cdef extern from "grpc/_cython/loader.h": int pygrpc_load_core(char*) - void *gpr_malloc(size_t size) - void gpr_free(void *ptr) - void *gpr_realloc(void *p, size_t size) + void *gpr_malloc(size_t size) nogil + void gpr_free(void *ptr) nogil + void *gpr_realloc(void *p, size_t size) nogil ctypedef struct gpr_slice: # don't worry about writing out the members of gpr_slice; we never access # them directly. pass - gpr_slice gpr_slice_ref(gpr_slice s) - void gpr_slice_unref(gpr_slice s) - gpr_slice gpr_slice_new(void *p, size_t len, void (*destroy)(void *)) + gpr_slice gpr_slice_ref(gpr_slice s) nogil + void gpr_slice_unref(gpr_slice s) nogil + gpr_slice gpr_slice_new(void *p, size_t len, void (*destroy)(void *)) nogil gpr_slice gpr_slice_new_with_len( - void *p, size_t len, void (*destroy)(void *, size_t)) - gpr_slice gpr_slice_malloc(size_t length) - gpr_slice gpr_slice_from_copied_string(const char *source) - gpr_slice gpr_slice_from_copied_buffer(const char *source, size_t len) + void *p, size_t len, void (*destroy)(void *, size_t)) nogil + gpr_slice gpr_slice_malloc(size_t length) nogil + gpr_slice gpr_slice_from_copied_string(const char *source) nogil + gpr_slice gpr_slice_from_copied_buffer(const char *source, size_t len) nogil # Declare functions for function-like macros (because Cython)... - void *gpr_slice_start_ptr "GPR_SLICE_START_PTR" (gpr_slice s) - size_t gpr_slice_length "GPR_SLICE_LENGTH" (gpr_slice s) + void *gpr_slice_start_ptr "GPR_SLICE_START_PTR" (gpr_slice s) nogil + size_t gpr_slice_length "GPR_SLICE_LENGTH" (gpr_slice s) nogil ctypedef enum gpr_clock_type: GPR_CLOCK_MONOTONIC @@ -71,14 +71,14 @@ cdef extern from "grpc/_cython/loader.h": int32_t nanoseconds "tv_nsec" gpr_clock_type clock_type - gpr_timespec gpr_time_0(gpr_clock_type type) - gpr_timespec gpr_inf_future(gpr_clock_type type) - gpr_timespec gpr_inf_past(gpr_clock_type type) + gpr_timespec gpr_time_0(gpr_clock_type type) nogil + gpr_timespec gpr_inf_future(gpr_clock_type type) nogil + gpr_timespec gpr_inf_past(gpr_clock_type type) nogil - gpr_timespec gpr_now(gpr_clock_type clock) + gpr_timespec gpr_now(gpr_clock_type clock) nogil gpr_timespec gpr_convert_clock_type(gpr_timespec t, - gpr_clock_type target_clock) + gpr_clock_type target_clock) nogil ctypedef enum grpc_status_code: GRPC_STATUS_OK @@ -114,15 +114,15 @@ cdef extern from "grpc/_cython/loader.h": pass grpc_byte_buffer *grpc_raw_byte_buffer_create(gpr_slice *slices, - size_t nslices) - size_t grpc_byte_buffer_length(grpc_byte_buffer *bb) - void grpc_byte_buffer_destroy(grpc_byte_buffer *byte_buffer) + size_t nslices) nogil + size_t grpc_byte_buffer_length(grpc_byte_buffer *bb) nogil + void grpc_byte_buffer_destroy(grpc_byte_buffer *byte_buffer) nogil void grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader, - grpc_byte_buffer *buffer) + grpc_byte_buffer *buffer) nogil int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader *reader, - gpr_slice *slice) - void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader) + gpr_slice *slice) nogil + void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader) nogil const char *GRPC_ARG_PRIMARY_USER_AGENT_STRING const char *GRPC_ARG_ENABLE_CENSUS @@ -221,8 +221,8 @@ cdef extern from "grpc/_cython/loader.h": size_t capacity grpc_metadata *metadata - void grpc_metadata_array_init(grpc_metadata_array *array) - void grpc_metadata_array_destroy(grpc_metadata_array *array) + void grpc_metadata_array_init(grpc_metadata_array *array) nogil + void grpc_metadata_array_destroy(grpc_metadata_array *array) nogil ctypedef struct grpc_call_details: char *method @@ -231,8 +231,8 @@ cdef extern from "grpc/_cython/loader.h": size_t host_capacity gpr_timespec deadline - void grpc_call_details_init(grpc_call_details *details) - void grpc_call_details_destroy(grpc_call_details *details) + void grpc_call_details_init(grpc_call_details *details) nogil + void grpc_call_details_destroy(grpc_call_details *details) nogil ctypedef enum grpc_op_type: GRPC_OP_SEND_INITIAL_METADATA @@ -277,61 +277,62 @@ cdef extern from "grpc/_cython/loader.h": uint32_t flags grpc_op_data data - void grpc_init() - void grpc_shutdown() + void grpc_init() nogil + void grpc_shutdown() nogil - grpc_completion_queue *grpc_completion_queue_create(void *reserved) + grpc_completion_queue *grpc_completion_queue_create(void *reserved) nogil grpc_event grpc_completion_queue_next(grpc_completion_queue *cq, gpr_timespec deadline, void *reserved) nogil grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cq, void *tag, gpr_timespec deadline, void *reserved) nogil - void grpc_completion_queue_shutdown(grpc_completion_queue *cq) - void grpc_completion_queue_destroy(grpc_completion_queue *cq) + void grpc_completion_queue_shutdown(grpc_completion_queue *cq) nogil + void grpc_completion_queue_destroy(grpc_completion_queue *cq) nogil - grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, - size_t nops, void *tag, void *reserved) - grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) + grpc_call_error grpc_call_start_batch( + grpc_call *call, const grpc_op *ops, size_t nops, void *tag, + void *reserved) nogil + grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) nogil grpc_call_error grpc_call_cancel_with_status(grpc_call *call, grpc_status_code status, const char *description, - void *reserved) - char *grpc_call_get_peer(grpc_call *call) - void grpc_call_destroy(grpc_call *call) + void *reserved) nogil + char *grpc_call_get_peer(grpc_call *call) nogil + void grpc_call_destroy(grpc_call *call) nogil grpc_channel *grpc_insecure_channel_create(const char *target, const grpc_channel_args *args, - void *reserved) - grpc_call *grpc_channel_create_call(grpc_channel *channel, - grpc_call *parent_call, - uint32_t propagation_mask, - grpc_completion_queue *completion_queue, - const char *method, const char *host, - gpr_timespec deadline, void *reserved) + void *reserved) nogil + grpc_call *grpc_channel_create_call( + grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask, + grpc_completion_queue *completion_queue, const char *method, + const char *host, gpr_timespec deadline, void *reserved) nogil grpc_connectivity_state grpc_channel_check_connectivity_state( - grpc_channel *channel, int try_to_connect) + grpc_channel *channel, int try_to_connect) nogil void grpc_channel_watch_connectivity_state( grpc_channel *channel, grpc_connectivity_state last_observed_state, - gpr_timespec deadline, grpc_completion_queue *cq, void *tag) - char *grpc_channel_get_target(grpc_channel *channel) - void grpc_channel_destroy(grpc_channel *channel) + gpr_timespec deadline, grpc_completion_queue *cq, void *tag) nogil + char *grpc_channel_get_target(grpc_channel *channel) nogil + void grpc_channel_destroy(grpc_channel *channel) nogil - grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) + grpc_server *grpc_server_create( + const grpc_channel_args *args, void *reserved) nogil grpc_call_error grpc_server_request_call( grpc_server *server, grpc_call **call, grpc_call_details *details, grpc_metadata_array *request_metadata, grpc_completion_queue *cq_bound_to_call, grpc_completion_queue *cq_for_notification, void - *tag_new) + *tag_new) nogil void grpc_server_register_completion_queue(grpc_server *server, grpc_completion_queue *cq, - void *reserved) - int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) - void grpc_server_start(grpc_server *server) + void *reserved) nogil + int grpc_server_add_insecure_http2_port( + grpc_server *server, const char *addr) nogil + void grpc_server_start(grpc_server *server) nogil void grpc_server_shutdown_and_notify( - grpc_server *server, grpc_completion_queue *cq, void *tag) - void grpc_server_cancel_all_calls(grpc_server *server) - void grpc_server_destroy(grpc_server *server) + grpc_server *server, grpc_completion_queue *cq, void *tag) nogil + void grpc_server_cancel_all_calls(grpc_server *server) nogil + void grpc_server_destroy(grpc_server *server) nogil ctypedef struct grpc_ssl_pem_key_cert_pair: const char *private_key @@ -347,35 +348,36 @@ cdef extern from "grpc/_cython/loader.h": ctypedef void (*grpc_ssl_roots_override_callback)(char **pem_root_certs) - void grpc_set_ssl_roots_override_callback(grpc_ssl_roots_override_callback cb) + void grpc_set_ssl_roots_override_callback( + grpc_ssl_roots_override_callback cb) nogil - grpc_channel_credentials *grpc_google_default_credentials_create() + grpc_channel_credentials *grpc_google_default_credentials_create() nogil grpc_channel_credentials *grpc_ssl_credentials_create( const char *pem_root_certs, grpc_ssl_pem_key_cert_pair *pem_key_cert_pair, - void *reserved) + void *reserved) nogil grpc_channel_credentials *grpc_composite_channel_credentials_create( grpc_channel_credentials *creds1, grpc_call_credentials *creds2, - void *reserved) - void grpc_channel_credentials_release(grpc_channel_credentials *creds) + void *reserved) nogil + void grpc_channel_credentials_release(grpc_channel_credentials *creds) nogil grpc_call_credentials *grpc_composite_call_credentials_create( grpc_call_credentials *creds1, grpc_call_credentials *creds2, - void *reserved) + void *reserved) nogil grpc_call_credentials *grpc_google_compute_engine_credentials_create( - void *reserved) + void *reserved) nogil grpc_call_credentials *grpc_service_account_jwt_access_credentials_create( const char *json_key, - gpr_timespec token_lifetime, void *reserved) + gpr_timespec token_lifetime, void *reserved) nogil grpc_call_credentials *grpc_google_refresh_token_credentials_create( - const char *json_refresh_token, void *reserved) + const char *json_refresh_token, void *reserved) nogil grpc_call_credentials *grpc_google_iam_credentials_create( const char *authorization_token, const char *authority_selector, - void *reserved) - void grpc_call_credentials_release(grpc_call_credentials *creds) + void *reserved) nogil + void grpc_call_credentials_release(grpc_call_credentials *creds) nogil grpc_channel *grpc_secure_channel_create( grpc_channel_credentials *creds, const char *target, - const grpc_channel_args *args, void *reserved) + const grpc_channel_args *args, void *reserved) nogil ctypedef struct grpc_server_credentials: # We don't care about the internals (and in fact don't know them) @@ -385,13 +387,13 @@ cdef extern from "grpc/_cython/loader.h": const char *pem_root_certs, grpc_ssl_pem_key_cert_pair *pem_key_cert_pairs, size_t num_key_cert_pairs, int force_client_auth, void *reserved) - void grpc_server_credentials_release(grpc_server_credentials *creds) + void grpc_server_credentials_release(grpc_server_credentials *creds) nogil int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, - grpc_server_credentials *creds) + grpc_server_credentials *creds) nogil grpc_call_error grpc_call_set_credentials(grpc_call *call, - grpc_call_credentials *creds) + grpc_call_credentials *creds) nogil ctypedef struct grpc_auth_context: # We don't care about the internals (and in fact don't know them) @@ -415,4 +417,4 @@ cdef extern from "grpc/_cython/loader.h": const char *type grpc_call_credentials *grpc_metadata_credentials_create_from_plugin( - grpc_metadata_credentials_plugin plugin, void *reserved) + grpc_metadata_credentials_plugin plugin, void *reserved) nogil diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi index fa4ea99ea9..851389a261 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi @@ -107,15 +107,18 @@ cdef class Timespec: def __cinit__(self, time): if time is None: - self.c_time = gpr_now(GPR_CLOCK_REALTIME) + with nogil: + self.c_time = gpr_now(GPR_CLOCK_REALTIME) return if isinstance(time, int): time = float(time) if isinstance(time, float): if time == float("+inf"): - self.c_time = gpr_inf_future(GPR_CLOCK_REALTIME) + with nogil: + self.c_time = gpr_inf_future(GPR_CLOCK_REALTIME) elif time == float("-inf"): - self.c_time = gpr_inf_past(GPR_CLOCK_REALTIME) + with nogil: + self.c_time = gpr_inf_past(GPR_CLOCK_REALTIME) else: self.c_time.seconds = time self.c_time.nanoseconds = (time - float(self.c_time.seconds)) * 1e9 @@ -131,8 +134,10 @@ cdef class Timespec: # TODO(atash) ensure that everywhere a Timespec is created that it's # converted to GPR_CLOCK_REALTIME then and not every time someone wants to # read values off in Python. - cdef gpr_timespec real_time = ( - gpr_convert_clock_type(self.c_time, GPR_CLOCK_REALTIME)) + cdef gpr_timespec real_time + with nogil: + real_time = ( + gpr_convert_clock_type(self.c_time, GPR_CLOCK_REALTIME)) return real_time.seconds @property @@ -158,10 +163,12 @@ cdef class Timespec: cdef class CallDetails: def __cinit__(self): - grpc_call_details_init(&self.c_details) + with nogil: + grpc_call_details_init(&self.c_details) def __dealloc__(self): - grpc_call_details_destroy(&self.c_details) + with nogil: + grpc_call_details_destroy(&self.c_details) @property def method(self): @@ -229,10 +236,15 @@ cdef class ByteBuffer: "ByteBuffer, not {}".format(type(data))) cdef char *c_data = data - data_slice = gpr_slice_from_copied_buffer(c_data, len(data)) - self.c_byte_buffer = grpc_raw_byte_buffer_create( - &data_slice, 1) - gpr_slice_unref(data_slice) + cdef gpr_slice data_slice + cdef size_t data_length = len(data) + with nogil: + data_slice = gpr_slice_from_copied_buffer(c_data, data_length) + with nogil: + self.c_byte_buffer = grpc_raw_byte_buffer_create( + &data_slice, 1) + with nogil: + gpr_slice_unref(data_slice) def bytes(self): cdef grpc_byte_buffer_reader reader @@ -240,20 +252,27 @@ cdef class ByteBuffer: cdef size_t data_slice_length cdef void *data_slice_pointer if self.c_byte_buffer != NULL: - grpc_byte_buffer_reader_init(&reader, self.c_byte_buffer) + with nogil: + grpc_byte_buffer_reader_init(&reader, self.c_byte_buffer) result = b"" - while grpc_byte_buffer_reader_next(&reader, &data_slice): - data_slice_pointer = gpr_slice_start_ptr(data_slice) - data_slice_length = gpr_slice_length(data_slice) - result += (<char *>data_slice_pointer)[:data_slice_length] - grpc_byte_buffer_reader_destroy(&reader) + with nogil: + while grpc_byte_buffer_reader_next(&reader, &data_slice): + data_slice_pointer = gpr_slice_start_ptr(data_slice) + data_slice_length = gpr_slice_length(data_slice) + with gil: + result += (<char *>data_slice_pointer)[:data_slice_length] + with nogil: + grpc_byte_buffer_reader_destroy(&reader) return result else: return None def __len__(self): + cdef size_t result if self.c_byte_buffer != NULL: - return grpc_byte_buffer_length(self.c_byte_buffer) + with nogil: + result = grpc_byte_buffer_length(self.c_byte_buffer) + return result else: return 0 @@ -262,7 +281,8 @@ cdef class ByteBuffer: def __dealloc__(self): if self.c_byte_buffer != NULL: - grpc_byte_buffer_destroy(self.c_byte_buffer) + with nogil: + grpc_byte_buffer_destroy(self.c_byte_buffer) cdef class SslPemKeyCertPair: @@ -319,14 +339,15 @@ cdef class ChannelArgs: if not isinstance(arg, ChannelArg): raise TypeError("expected list of ChannelArg") self.c_args.arguments_length = len(self.args) - self.c_args.arguments = <grpc_arg *>gpr_malloc( - self.c_args.arguments_length*sizeof(grpc_arg) - ) + with nogil: + self.c_args.arguments = <grpc_arg *>gpr_malloc( + self.c_args.arguments_length*sizeof(grpc_arg)) for i in range(self.c_args.arguments_length): self.c_args.arguments[i] = (<ChannelArg>self.args[i]).c_arg def __dealloc__(self): - gpr_free(self.c_args.arguments) + with nogil: + gpr_free(self.c_args.arguments) def __len__(self): # self.args is never stale; it's only updated from this file @@ -407,21 +428,24 @@ cdef class Metadata: for metadatum in metadata: if not isinstance(metadatum, Metadatum): raise TypeError("expected list of Metadatum") - grpc_metadata_array_init(&self.c_metadata_array) + with nogil: + grpc_metadata_array_init(&self.c_metadata_array) self.c_metadata_array.count = len(self.metadata) self.c_metadata_array.capacity = len(self.metadata) - self.c_metadata_array.metadata = <grpc_metadata *>gpr_malloc( - self.c_metadata_array.count*sizeof(grpc_metadata) - ) + with nogil: + self.c_metadata_array.metadata = <grpc_metadata *>gpr_malloc( + self.c_metadata_array.count*sizeof(grpc_metadata) + ) for i in range(self.c_metadata_array.count): self.c_metadata_array.metadata[i] = ( (<Metadatum>self.metadata[i]).c_metadata) def __dealloc__(self): # this frees the allocated memory for the grpc_metadata_array (although - # it'd be nice if that were documented somewhere...) TODO(atash): document - # this in the C core - grpc_metadata_array_destroy(&self.c_metadata_array) + # it'd be nice if that were documented somewhere...) + # TODO(atash): document this in the C core + with nogil: + grpc_metadata_array_destroy(&self.c_metadata_array) def __len__(self): return self.c_metadata_array.count @@ -526,7 +550,8 @@ cdef class Operation: # Python. The remaining one(s) are primitive fields filled in by GRPC core. # This means that we need to clean up after receive_status_on_client. if self.c_op.type == GRPC_OP_RECV_STATUS_ON_CLIENT: - gpr_free(self._received_status_details) + with nogil: + gpr_free(self._received_status_details) def operation_send_initial_metadata(Metadata metadata): cdef Operation op = Operation() @@ -648,8 +673,8 @@ cdef class Operations: if not isinstance(operation, Operation): raise TypeError("expected operations to be iterable of Operation") self.c_nops = len(self.operations) - self.c_ops = <grpc_op *>gpr_malloc( - sizeof(grpc_op)*self.c_nops) + with nogil: + self.c_ops = <grpc_op *>gpr_malloc(sizeof(grpc_op)*self.c_nops) for i in range(self.c_nops): self.c_ops[i] = (<Operation>(self.operations[i])).c_op @@ -661,7 +686,8 @@ cdef class Operations: return self.operations[i] def __dealloc__(self): - gpr_free(self.c_ops) + with nogil: + gpr_free(self.c_ops) def __iter__(self): return _OperationsIterator(self) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi index 5f85923524..a098f11da2 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi @@ -41,7 +41,8 @@ cdef class Server: if arguments is not None: c_arguments = &arguments.c_args self.references.append(arguments) - self.c_server = grpc_server_create(c_arguments, NULL) + with nogil: + self.c_server = grpc_server_create(c_arguments, NULL) self.is_started = False self.is_shutting_down = False self.is_shutdown = False @@ -53,6 +54,7 @@ cdef class Server: raise ValueError("server must be started and not shutting down") if server_queue not in self.registered_completion_queues: raise ValueError("server_queue must be a registered completion queue") + cdef grpc_call_error result cdef OperationTag operation_tag = OperationTag(tag) operation_tag.operation_call = Call() operation_tag.request_call_details = CallDetails() @@ -61,19 +63,22 @@ cdef class Server: operation_tag.is_new_request = True operation_tag.batch_operations = Operations([]) cpython.Py_INCREF(operation_tag) - return grpc_server_request_call( - self.c_server, &operation_tag.operation_call.c_call, - &operation_tag.request_call_details.c_details, - &operation_tag.request_metadata.c_metadata_array, - call_queue.c_completion_queue, server_queue.c_completion_queue, - <cpython.PyObject *>operation_tag) + with nogil: + result = grpc_server_request_call( + self.c_server, &operation_tag.operation_call.c_call, + &operation_tag.request_call_details.c_details, + &operation_tag.request_metadata.c_metadata_array, + call_queue.c_completion_queue, server_queue.c_completion_queue, + <cpython.PyObject *>operation_tag) + return result def register_completion_queue( self, CompletionQueue queue not None): if self.is_started: raise ValueError("cannot register completion queues after start") - grpc_server_register_completion_queue( - self.c_server, queue.c_completion_queue, NULL) + with nogil: + grpc_server_register_completion_queue( + self.c_server, queue.c_completion_queue, NULL) self.registered_completion_queues.append(queue) def start(self): @@ -82,7 +87,8 @@ cdef class Server: self.backup_shutdown_queue = CompletionQueue() self.register_completion_queue(self.backup_shutdown_queue) self.is_started = True - grpc_server_start(self.c_server) + with nogil: + grpc_server_start(self.c_server) # Ensure the core has gotten a chance to do the start-up work self.backup_shutdown_queue.pluck(None, Timespec(None)) @@ -95,21 +101,28 @@ cdef class Server: else: raise TypeError("expected address to be a str or bytes") self.references.append(address) + cdef int result + cdef char *address_c_string = address if server_credentials is not None: self.references.append(server_credentials) - return grpc_server_add_secure_http2_port( - self.c_server, address, server_credentials.c_credentials) + with nogil: + result = grpc_server_add_secure_http2_port( + self.c_server, address_c_string, server_credentials.c_credentials) else: - return grpc_server_add_insecure_http2_port(self.c_server, address) + with nogil: + result = grpc_server_add_insecure_http2_port(self.c_server, + address_c_string) + return result cdef _c_shutdown(self, CompletionQueue queue, tag): self.is_shutting_down = True operation_tag = OperationTag(tag) operation_tag.shutting_down_server = self cpython.Py_INCREF(operation_tag) - grpc_server_shutdown_and_notify( - self.c_server, queue.c_completion_queue, - <cpython.PyObject *>operation_tag) + with nogil: + grpc_server_shutdown_and_notify( + self.c_server, queue.c_completion_queue, + <cpython.PyObject *>operation_tag) def shutdown(self, CompletionQueue queue not None, tag): cdef OperationTag operation_tag @@ -134,7 +147,8 @@ cdef class Server: elif self.is_shutdown: return else: - grpc_server_cancel_all_calls(self.c_server) + with nogil: + grpc_server_cancel_all_calls(self.c_server) def __dealloc__(self): if self.c_server != NULL: @@ -153,5 +167,6 @@ cdef class Server: # much but repeatedly release the GIL and wait while not self.is_shutdown: time.sleep(0) - grpc_server_destroy(self.c_server) + with nogil: + grpc_server_destroy(self.c_server) diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pyx b/src/python/grpcio/grpc/_cython/cygrpc.pyx index 30cc7a132b..8a0f171ee7 100644 --- a/src/python/grpcio/grpc/_cython/cygrpc.pyx +++ b/src/python/grpcio/grpc/_cython/cygrpc.pyx @@ -57,14 +57,17 @@ cdef class _ModuleState: 'grpc._cython', '_windows/grpc_c.64.python') if not pygrpc_load_core(filename): raise ImportError('failed to load core gRPC library') - grpc_init() + with nogil: + grpc_init() self.is_loaded = True - grpc_set_ssl_roots_override_callback( - <grpc_ssl_roots_override_callback>ssl_roots_override_callback) + with nogil: + grpc_set_ssl_roots_override_callback( + <grpc_ssl_roots_override_callback>ssl_roots_override_callback) def __dealloc__(self): if self.is_loaded: - grpc_shutdown() + with nogil: + grpc_shutdown() _module_state = _ModuleState() diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index a543791f5c..31e16e0491 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -34,6 +34,7 @@ CORE_SOURCE_FILES = [ 'src/core/profiling/stap_timers.c', 'src/core/support/alloc.c', 'src/core/support/avl.c', + 'src/core/support/backoff.c', 'src/core/support/cmdline.c', 'src/core/support/cpu_iphone.c', 'src/core/support/cpu_linux.c', diff --git a/src/python/grpcio/tests/_runner.py b/src/python/grpcio/tests/_runner.py index b0dbd92a49..32a31ce00e 100644 --- a/src/python/grpcio/tests/_runner.py +++ b/src/python/grpcio/tests/_runner.py @@ -43,6 +43,13 @@ import uuid from tests import _loader from tests import _result +# This number needs to be large enough to outpace output on stdout and stderr +# from the gRPC core, otherwise we could end up in a potential deadlock. This +# stems from the OS waiting on someone to clear a filled pipe buffer while the +# GIL is held from a write to stderr from gRPC core, but said someone is in +# Python code thus necessitating GIL acquisition. +_READ_BYTES = 2**20 + class CapturePipe(object): """A context-manager pipe to redirect output to a byte array. @@ -76,6 +83,10 @@ class CapturePipe(object): flags = fcntl.fcntl(self._read_fd, fcntl.F_GETFL) fcntl.fcntl(self._read_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) self._read_thread = threading.Thread(target=self._read) + # If the user wants to exit from the Python program and hits ctrl-C and the + # read thread is somehow deadlocked with something else, the Python code may + # refuse to exit. This prevents that by making the read thread second-class. + self._read_thread.daemon = True self._read_thread.start() def stop(self): @@ -93,7 +104,7 @@ class CapturePipe(object): self.output = bytearray() while True: select.select([self._read_fd], [], []) - read_bytes = os.read(self._read_fd, 1024) + read_bytes = os.read(self._read_fd, _READ_BYTES) if read_bytes: self.output.extend(read_bytes) else: |