diff options
author | ncteisen <ncteisen@gmail.com> | 2018-01-02 11:44:03 -0800 |
---|---|---|
committer | ncteisen <ncteisen@gmail.com> | 2018-01-02 11:44:03 -0800 |
commit | 24902641d434005bee6641e62fb2b57fc0b6bd87 (patch) | |
tree | d9ab96b6531e30ca28044d571f2941f05cad52cb /src | |
parent | d230ad086a894cc963235b27814e19bf686eb7aa (diff) | |
parent | 63392f682e21543099926251b642cdcd0be2a17f (diff) |
Merge branch 'master' of https://github.com/grpc/grpc into flow-control-part4
Diffstat (limited to 'src')
74 files changed, 1405 insertions, 1044 deletions
diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index 965d91b68b..f35bfd9ab9 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -1568,11 +1568,24 @@ grpc::string GetMockIncludes(grpc_generator::File* file, static const char* headers_strs[] = { "grpc++/impl/codegen/async_stream.h", "grpc++/impl/codegen/sync_stream.h", - "gmock/gmock.h", }; std::vector<grpc::string> headers(headers_strs, array_end(headers_strs)); PrintIncludes(printer.get(), headers, params); + std::vector<grpc::string> gmock_header; + if (params.gmock_search_path.empty()) { + gmock_header.push_back("gmock/gmock.h"); + PrintIncludes(printer.get(), gmock_header, params); + } else { + gmock_header.push_back("gmock.h"); + // Copy a params to generate gmock header. + Parameters gmock_params(params); + // We use local includes when a gmock_search_path is given + gmock_params.use_system_headers = false; + gmock_params.grpc_search_path = params.gmock_search_path; + PrintIncludes(printer.get(), gmock_header, gmock_params); + } + if (!file->package().empty()) { std::vector<grpc::string> parts = file->package_parts(); diff --git a/src/compiler/cpp_generator.h b/src/compiler/cpp_generator.h index a93376acef..300a27c589 100644 --- a/src/compiler/cpp_generator.h +++ b/src/compiler/cpp_generator.h @@ -50,8 +50,10 @@ struct Parameters { bool use_system_headers; // Prefix to any grpc include grpc::string grpc_search_path; - // Generate GMOCK code to facilitate unit testing. + // Generate Google Mock code to facilitate unit testing. bool generate_mock_code; + // Google Mock search path, when non-empty, local includes will be used. + grpc::string gmock_search_path; }; // Return the prologue of the generated header file. diff --git a/src/compiler/cpp_plugin.cc b/src/compiler/cpp_plugin.cc index adac0e23d3..661282f880 100644 --- a/src/compiler/cpp_plugin.cc +++ b/src/compiler/cpp_plugin.cc @@ -78,6 +78,8 @@ class CppGrpcGenerator : public grpc::protobuf::compiler::CodeGenerator { *error = grpc::string("Invalid parameter: ") + *parameter_string; return false; } + } else if (param[0] == "gmock_search_path") { + generator_parameters.gmock_search_path = param[1]; } else { *error = grpc::string("Unknown parameter: ") + *parameter_string; return false; diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 3c64213fb9..dd6fc602ab 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -113,13 +113,13 @@ #include "src/core/lib/slice/slice_hash_table.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" +#include "src/core/lib/support/manual_constructor.h" #include "src/core/lib/support/string.h" #include "src/core/lib/surface/call.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/surface/channel_init.h" #include "src/core/lib/transport/static_metadata.h" -#define GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS 20 #define GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS 1 #define GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER 1.6 #define GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS 120 @@ -408,7 +408,7 @@ typedef struct glb_lb_policy { grpc_slice lb_call_status_details; /** LB call retry backoff state */ - grpc_backoff lb_call_backoff_state; + grpc_core::ManualConstructor<grpc_core::BackOff> lb_call_backoff; /** LB call retry timer */ grpc_timer lb_call_retry_timer; @@ -1167,7 +1167,7 @@ static void start_picking_locked(glb_lb_policy* glb_policy) { } glb_policy->started_picking = true; - grpc_backoff_reset(&glb_policy->lb_call_backoff_state); + glb_policy->lb_call_backoff->Reset(); query_for_backends_locked(glb_policy); } @@ -1302,8 +1302,7 @@ static void maybe_restart_lb_call(glb_lb_policy* glb_policy) { glb_policy->updating_lb_call = false; } else if (!glb_policy->shutting_down) { /* if we aren't shutting down, restart the LB client call after some time */ - grpc_millis next_try = grpc_backoff_step(&glb_policy->lb_call_backoff_state) - .next_attempt_start_time; + grpc_millis next_try = glb_policy->lb_call_backoff->Step(); if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_DEBUG, "[grpclb %p] Connection to LB server lost...", glb_policy); @@ -1463,12 +1462,14 @@ static void lb_call_init_locked(glb_lb_policy* glb_policy) { lb_on_response_received_locked, glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner)); - grpc_backoff_init(&glb_policy->lb_call_backoff_state, - GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS * 1000, - GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER, - GRPC_GRPCLB_RECONNECT_JITTER, - GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS * 1000, - GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000); + grpc_core::BackOff::Options backoff_options; + backoff_options + .set_initial_backoff(GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS * 1000) + .set_multiplier(GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER) + .set_jitter(GRPC_GRPCLB_RECONNECT_JITTER) + .set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000); + + glb_policy->lb_call_backoff.Init(backoff_options); glb_policy->seen_initial_response = false; glb_policy->last_client_load_report_counters_were_zero = false; @@ -1572,7 +1573,7 @@ static void lb_on_response_received_locked(void* arg, grpc_error* error) { memset(ops, 0, sizeof(ops)); grpc_op* op = ops; if (glb_policy->lb_response_payload != nullptr) { - grpc_backoff_reset(&glb_policy->lb_call_backoff_state); + glb_policy->lb_call_backoff->Reset(); /* Received data from the LB server. Look inside * glb_policy->lb_response_payload, for a serverlist. */ grpc_byte_buffer_reader bbr; diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc index 4ec4477c82..4659a5f3ed 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc @@ -40,10 +40,10 @@ #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/json/json.h" #include "src/core/lib/support/env.h" +#include "src/core/lib/support/manual_constructor.h" #include "src/core/lib/support/string.h" #include "src/core/lib/transport/service_config.h" -#define GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS 1 #define GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS 1 #define GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER 1.6 #define GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS 120 @@ -89,7 +89,7 @@ typedef struct { bool have_retry_timer; grpc_timer retry_timer; /** retry backoff state */ - grpc_backoff backoff_state; + grpc_core::ManualConstructor<grpc_core::BackOff> backoff; /** currently resolving addresses */ grpc_lb_addresses* lb_addresses; @@ -131,7 +131,7 @@ static void dns_ares_shutdown_locked(grpc_resolver* resolver) { static void dns_ares_channel_saw_error_locked(grpc_resolver* resolver) { ares_dns_resolver* r = (ares_dns_resolver*)resolver; if (!r->resolving) { - grpc_backoff_reset(&r->backoff_state); + r->backoff->Reset(); dns_ares_start_resolving_locked(r); } } @@ -264,8 +264,7 @@ static void dns_ares_on_resolved_locked(void* arg, grpc_error* error) { } else { const char* msg = grpc_error_string(error); gpr_log(GPR_DEBUG, "dns resolution failed: %s", msg); - grpc_millis next_try = - grpc_backoff_step(&r->backoff_state).next_attempt_start_time; + grpc_millis next_try = r->backoff->Step(); grpc_millis timeout = next_try - grpc_core::ExecCtx::Get()->Now(); gpr_log(GPR_INFO, "dns resolution failed (will retry): %s", grpc_error_string(error)); @@ -298,7 +297,7 @@ static void dns_ares_next_locked(grpc_resolver* resolver, r->next_completion = on_complete; r->target_result = target_result; if (r->resolved_version == 0 && !r->resolving) { - grpc_backoff_reset(&r->backoff_state); + r->backoff->Reset(); dns_ares_start_resolving_locked(r); } else { dns_ares_maybe_finish_next_locked(r); @@ -368,11 +367,13 @@ static grpc_resolver* dns_ares_create(grpc_resolver_args* args, if (args->pollset_set != nullptr) { grpc_pollset_set_add_pollset_set(r->interested_parties, args->pollset_set); } - grpc_backoff_init( - &r->backoff_state, GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * 1000, - GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER, GRPC_DNS_RECONNECT_JITTER, - GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS * 1000, - GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000); + grpc_core::BackOff::Options backoff_options; + backoff_options + .set_initial_backoff(GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * 1000) + .set_multiplier(GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER) + .set_jitter(GRPC_DNS_RECONNECT_JITTER) + .set_max_backoff(GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000); + r->backoff.Init(grpc_core::BackOff(backoff_options)); GRPC_CLOSURE_INIT(&r->dns_ares_on_retry_timer_locked, dns_ares_on_retry_timer_locked, r, grpc_combiner_scheduler(r->base.combiner)); diff --git a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc index 77698e97aa..1c2cfc08e7 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc @@ -33,9 +33,9 @@ #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/support/env.h" +#include "src/core/lib/support/manual_constructor.h" #include "src/core/lib/support/string.h" -#define GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS 1 #define GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS 1 #define GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER 1.6 #define GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS 120 @@ -70,7 +70,7 @@ typedef struct { grpc_timer retry_timer; grpc_closure on_retry; /** retry backoff state */ - grpc_backoff backoff_state; + grpc_core::ManualConstructor<grpc_core::BackOff> backoff; /** currently resolving addresses */ grpc_resolved_addresses* addresses; @@ -106,7 +106,7 @@ static void dns_shutdown_locked(grpc_resolver* resolver) { static void dns_channel_saw_error_locked(grpc_resolver* resolver) { dns_resolver* r = (dns_resolver*)resolver; if (!r->resolving) { - grpc_backoff_reset(&r->backoff_state); + r->backoff->Reset(); dns_start_resolving_locked(r); } } @@ -119,7 +119,7 @@ static void dns_next_locked(grpc_resolver* resolver, r->next_completion = on_complete; r->target_result = target_result; if (r->resolved_version == 0 && !r->resolving) { - grpc_backoff_reset(&r->backoff_state); + r->backoff->Reset(); dns_start_resolving_locked(r); } else { dns_maybe_finish_next_locked(r); @@ -161,8 +161,7 @@ static void dns_on_resolved_locked(void* arg, grpc_error* error) { grpc_resolved_addresses_destroy(r->addresses); grpc_lb_addresses_destroy(addresses); } else { - grpc_millis next_try = - grpc_backoff_step(&r->backoff_state).next_attempt_start_time; + grpc_millis next_try = r->backoff->Step(); grpc_millis timeout = next_try - grpc_core::ExecCtx::Get()->Now(); gpr_log(GPR_INFO, "dns resolution failed (will retry): %s", grpc_error_string(error)); @@ -244,11 +243,13 @@ static grpc_resolver* dns_create(grpc_resolver_args* args, if (args->pollset_set != nullptr) { grpc_pollset_set_add_pollset_set(r->interested_parties, args->pollset_set); } - grpc_backoff_init( - &r->backoff_state, GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * 1000, - GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER, GRPC_DNS_RECONNECT_JITTER, - GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS * 1000, - GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000); + grpc_core::BackOff::Options backoff_options; + backoff_options + .set_initial_backoff(GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * 1000) + .set_multiplier(GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER) + .set_jitter(GRPC_DNS_RECONNECT_JITTER) + .set_max_backoff(GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000); + r->backoff.Init(grpc_core::BackOff(backoff_options)); return &r->base; } diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 84a5ace31d..f07394d29b 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -20,7 +20,9 @@ #include <inttypes.h> #include <limits.h> -#include <string.h> + +#include <algorithm> +#include <cstring> #include <grpc/support/alloc.h> #include <grpc/support/avl.h> @@ -39,6 +41,7 @@ #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" +#include "src/core/lib/support/manual_constructor.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/surface/channel_init.h" #include "src/core/lib/transport/connectivity_state.h" @@ -48,7 +51,7 @@ #define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1 #define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER 1.6 -#define GRPC_SUBCHANNEL_RECONNECT_MIN_BACKOFF_SECONDS 20 +#define GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS 20 #define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120 #define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2 @@ -118,8 +121,9 @@ struct grpc_subchannel { external_state_watcher root_external_state_watcher; /** backoff state */ - grpc_backoff backoff_state; - grpc_backoff_result backoff_result; + grpc_core::ManualConstructor<grpc_core::BackOff> backoff; + grpc_millis next_attempt_deadline; + grpc_millis min_connect_timeout_ms; /** do we have an active alarm? */ bool have_alarm; @@ -274,6 +278,54 @@ void grpc_subchannel_weak_unref( } } +static void parse_args_for_backoff_values( + const grpc_channel_args* args, grpc_core::BackOff::Options* backoff_options, + grpc_millis* min_connect_timeout_ms) { + grpc_millis initial_backoff_ms = + GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000; + *min_connect_timeout_ms = + GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS * 1000; + grpc_millis max_backoff_ms = + GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000; + bool fixed_reconnect_backoff = false; + if (args != nullptr) { + for (size_t i = 0; i < args->num_args; i++) { + if (0 == strcmp(args->args[i].key, + "grpc.testing.fixed_reconnect_backoff_ms")) { + fixed_reconnect_backoff = true; + initial_backoff_ms = *min_connect_timeout_ms = max_backoff_ms = + grpc_channel_arg_get_integer( + &args->args[i], + {static_cast<int>(initial_backoff_ms), 100, INT_MAX}); + } else if (0 == + strcmp(args->args[i].key, GRPC_ARG_MIN_RECONNECT_BACKOFF_MS)) { + fixed_reconnect_backoff = false; + *min_connect_timeout_ms = grpc_channel_arg_get_integer( + &args->args[i], + {static_cast<int>(*min_connect_timeout_ms), 100, INT_MAX}); + } else if (0 == + strcmp(args->args[i].key, GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) { + fixed_reconnect_backoff = false; + max_backoff_ms = grpc_channel_arg_get_integer( + &args->args[i], {static_cast<int>(max_backoff_ms), 100, INT_MAX}); + } else if (0 == strcmp(args->args[i].key, + GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS)) { + fixed_reconnect_backoff = false; + initial_backoff_ms = grpc_channel_arg_get_integer( + &args->args[i], + {static_cast<int>(initial_backoff_ms), 100, INT_MAX}); + } + } + } + backoff_options->set_initial_backoff(initial_backoff_ms) + .set_multiplier(fixed_reconnect_backoff + ? 1.0 + : GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER) + .set_jitter(fixed_reconnect_backoff ? 0.0 + : GRPC_SUBCHANNEL_RECONNECT_JITTER) + .set_max_backoff(max_backoff_ms); +} + grpc_subchannel* grpc_subchannel_create(grpc_connector* connector, const grpc_subchannel_args* args) { grpc_subchannel_key* key = grpc_subchannel_key_create(args); @@ -324,43 +376,10 @@ grpc_subchannel* grpc_subchannel_create(grpc_connector* connector, grpc_schedule_on_exec_ctx); grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE, "subchannel"); - int initial_backoff_ms = - GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000; - int min_backoff_ms = GRPC_SUBCHANNEL_RECONNECT_MIN_BACKOFF_SECONDS * 1000; - int max_backoff_ms = GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000; - bool fixed_reconnect_backoff = false; - if (c->args) { - for (size_t i = 0; i < c->args->num_args; i++) { - if (0 == strcmp(c->args->args[i].key, - "grpc.testing.fixed_reconnect_backoff_ms")) { - fixed_reconnect_backoff = true; - initial_backoff_ms = min_backoff_ms = max_backoff_ms = - grpc_channel_arg_get_integer(&c->args->args[i], - {initial_backoff_ms, 100, INT_MAX}); - } else if (0 == strcmp(c->args->args[i].key, - GRPC_ARG_MIN_RECONNECT_BACKOFF_MS)) { - fixed_reconnect_backoff = false; - min_backoff_ms = grpc_channel_arg_get_integer( - &c->args->args[i], {min_backoff_ms, 100, INT_MAX}); - } else if (0 == strcmp(c->args->args[i].key, - GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) { - fixed_reconnect_backoff = false; - max_backoff_ms = grpc_channel_arg_get_integer( - &c->args->args[i], {max_backoff_ms, 100, INT_MAX}); - } else if (0 == strcmp(c->args->args[i].key, - GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS)) { - fixed_reconnect_backoff = false; - initial_backoff_ms = grpc_channel_arg_get_integer( - &c->args->args[i], {initial_backoff_ms, 100, INT_MAX}); - } - } - } - grpc_backoff_init( - &c->backoff_state, initial_backoff_ms, - fixed_reconnect_backoff ? 1.0 - : GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER, - fixed_reconnect_backoff ? 0.0 : GRPC_SUBCHANNEL_RECONNECT_JITTER, - min_backoff_ms, max_backoff_ms); + grpc_core::BackOff::Options backoff_options; + parse_args_for_backoff_values(args->args, &backoff_options, + &c->min_connect_timeout_ms); + c->backoff.Init(backoff_options); gpr_mu_init(&c->mu); return grpc_subchannel_index_register(key, c); @@ -368,11 +387,11 @@ grpc_subchannel* grpc_subchannel_create(grpc_connector* connector, static void continue_connect_locked(grpc_subchannel* c) { grpc_connect_in_args args; - args.interested_parties = c->pollset_set; - args.deadline = c->backoff_result.current_deadline; + const grpc_millis min_deadline = + c->min_connect_timeout_ms + grpc_core::ExecCtx::Get()->Now(); + args.deadline = std::max(c->next_attempt_deadline, min_deadline); args.channel_args = c->args; - grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE, "state_change"); grpc_connector_connect(c->connector, &args, &c->connecting_result, @@ -416,7 +435,7 @@ static void on_alarm(void* arg, grpc_error* error) { } if (error == GRPC_ERROR_NONE) { gpr_log(GPR_INFO, "Failed to connect to channel, retrying"); - c->backoff_result = grpc_backoff_step(&c->backoff_state); + c->next_attempt_deadline = c->backoff->Step(); continue_connect_locked(c); gpr_mu_unlock(&c->mu); } else { @@ -452,22 +471,20 @@ static void maybe_start_connecting_locked(grpc_subchannel* c) { if (!c->backoff_begun) { c->backoff_begun = true; - c->backoff_result = grpc_backoff_begin(&c->backoff_state); + c->next_attempt_deadline = c->backoff->Begin(); continue_connect_locked(c); } else { GPR_ASSERT(!c->have_alarm); c->have_alarm = true; const grpc_millis time_til_next = - c->backoff_result.next_attempt_start_time - - grpc_core::ExecCtx::Get()->Now(); + c->next_attempt_deadline - grpc_core::ExecCtx::Get()->Now(); if (time_til_next <= 0) { gpr_log(GPR_INFO, "Retry immediately"); } else { gpr_log(GPR_INFO, "Retry in %" PRIdPTR " milliseconds", time_til_next); } GRPC_CLOSURE_INIT(&c->on_alarm, on_alarm, c, grpc_schedule_on_exec_ctx); - grpc_timer_init(&c->alarm, c->backoff_result.next_attempt_start_time, - &c->on_alarm); + grpc_timer_init(&c->alarm, c->next_attempt_deadline, &c->on_alarm); } } diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index c13c4056c1..5ee1f08e61 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -79,7 +79,9 @@ static int g_default_server_keepalive_time_ms = DEFAULT_SERVER_KEEPALIVE_TIME_MS; static int g_default_server_keepalive_timeout_ms = DEFAULT_SERVER_KEEPALIVE_TIMEOUT_MS; -static bool g_default_keepalive_permit_without_calls = +static bool g_default_client_keepalive_permit_without_calls = + DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS; +static bool g_default_server_keepalive_permit_without_calls = DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS; static int g_default_min_sent_ping_interval_without_data_ms = @@ -346,6 +348,8 @@ static void init_transport(grpc_chttp2_transport* t, t->keepalive_timeout = g_default_client_keepalive_timeout_ms == INT_MAX ? GRPC_MILLIS_INF_FUTURE : g_default_client_keepalive_timeout_ms; + t->keepalive_permit_without_calls = + g_default_client_keepalive_permit_without_calls; } else { t->keepalive_time = g_default_server_keepalive_time_ms == INT_MAX ? GRPC_MILLIS_INF_FUTURE @@ -353,8 +357,9 @@ static void init_transport(grpc_chttp2_transport* t, t->keepalive_timeout = g_default_server_keepalive_timeout_ms == INT_MAX ? GRPC_MILLIS_INF_FUTURE : g_default_server_keepalive_timeout_ms; + t->keepalive_permit_without_calls = + g_default_server_keepalive_permit_without_calls; } - t->keepalive_permit_without_calls = g_default_keepalive_permit_without_calls; t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY; @@ -2550,7 +2555,9 @@ void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args, for (i = 0; i < args->num_args; i++) { if (0 == strcmp(args->args[i].key, GRPC_ARG_KEEPALIVE_TIME_MS)) { const int value = grpc_channel_arg_get_integer( - &args->args[i], {g_default_client_keepalive_time_ms, 1, INT_MAX}); + &args->args[i], {is_client ? g_default_client_keepalive_time_ms + : g_default_server_keepalive_time_ms, + 1, INT_MAX}); if (is_client) { g_default_client_keepalive_time_ms = value; } else { @@ -2559,8 +2566,9 @@ void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args, } else if (0 == strcmp(args->args[i].key, GRPC_ARG_KEEPALIVE_TIMEOUT_MS)) { const int value = grpc_channel_arg_get_integer( - &args->args[i], - {g_default_client_keepalive_timeout_ms, 0, INT_MAX}); + &args->args[i], {is_client ? g_default_client_keepalive_timeout_ms + : g_default_server_keepalive_timeout_ms, + 0, INT_MAX}); if (is_client) { g_default_client_keepalive_timeout_ms = value; } else { @@ -2568,10 +2576,16 @@ void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args, } } else if (0 == strcmp(args->args[i].key, GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)) { - g_default_keepalive_permit_without_calls = - (uint32_t)grpc_channel_arg_get_integer( - &args->args[i], - {g_default_keepalive_permit_without_calls, 0, 1}); + const bool value = (uint32_t)grpc_channel_arg_get_integer( + &args->args[i], + {is_client ? g_default_client_keepalive_permit_without_calls + : g_default_server_keepalive_timeout_ms, + 0, 1}); + if (is_client) { + g_default_client_keepalive_permit_without_calls = value; + } else { + g_default_server_keepalive_permit_without_calls = value; + } } else if (0 == strcmp(args->args[i].key, GRPC_ARG_HTTP2_MAX_PING_STRIKES)) { g_default_max_ping_strikes = grpc_channel_arg_get_integer( diff --git a/src/core/lib/backoff/backoff.cc b/src/core/lib/backoff/backoff.cc index da3b9b1b2d..41f625a636 100644 --- a/src/core/lib/backoff/backoff.cc +++ b/src/core/lib/backoff/backoff.cc @@ -18,61 +18,53 @@ #include "src/core/lib/backoff/backoff.h" +#include <algorithm> + #include <grpc/support/useful.h> -void grpc_backoff_init(grpc_backoff* backoff, grpc_millis initial_backoff, - double multiplier, double jitter, - grpc_millis min_connect_timeout, - grpc_millis max_backoff) { - backoff->initial_backoff = initial_backoff; - backoff->multiplier = multiplier; - backoff->jitter = jitter; - backoff->min_connect_timeout = min_connect_timeout; - backoff->max_backoff = max_backoff; - backoff->rng_state = (uint32_t)gpr_now(GPR_CLOCK_REALTIME).tv_nsec; -} +namespace grpc_core { -grpc_backoff_result grpc_backoff_begin(grpc_backoff* backoff) { - backoff->current_backoff = backoff->initial_backoff; - const grpc_millis initial_timeout = - GPR_MAX(backoff->initial_backoff, backoff->min_connect_timeout); - const grpc_millis now = grpc_core::ExecCtx::Get()->Now(); - const grpc_backoff_result result = {now + initial_timeout, - now + backoff->current_backoff}; - return result; -} +namespace { -/* Generate a random number between 0 and 1. */ -static double generate_uniform_random_number(uint32_t* rng_state) { - *rng_state = (1103515245 * *rng_state + 12345) % ((uint32_t)1 << 31); - return *rng_state / (double)((uint32_t)1 << 31); +/* Generate a random number between 0 and 1. We roll our own RNG because seeding + * rand() modifies a global variable we have no control over. */ +double generate_uniform_random_number(uint32_t* rng_state) { + constexpr uint32_t two_raise_31 = uint32_t(1) << 31; + *rng_state = (1103515245 * *rng_state + 12345) % two_raise_31; + return *rng_state / static_cast<double>(two_raise_31); } -static double generate_uniform_random_number_between(uint32_t* rng_state, - double a, double b) { +double generate_uniform_random_number_between(uint32_t* rng_state, double a, + double b) { if (a == b) return a; if (a > b) GPR_SWAP(double, a, b); // make sure a < b const double range = b - a; return a + generate_uniform_random_number(rng_state) * range; } +} // namespace -grpc_backoff_result grpc_backoff_step(grpc_backoff* backoff) { - backoff->current_backoff = (grpc_millis)(GPR_MIN( - backoff->current_backoff * backoff->multiplier, backoff->max_backoff)); - const double jitter = generate_uniform_random_number_between( - &backoff->rng_state, -backoff->jitter * backoff->current_backoff, - backoff->jitter * backoff->current_backoff); - const grpc_millis current_timeout = - GPR_MAX((grpc_millis)(backoff->current_backoff + jitter), - backoff->min_connect_timeout); - const grpc_millis next_timeout = GPR_MIN( - (grpc_millis)(backoff->current_backoff + jitter), backoff->max_backoff); - const grpc_millis now = grpc_core::ExecCtx::Get()->Now(); - const grpc_backoff_result result = {now + current_timeout, - now + next_timeout}; - return result; +BackOff::BackOff(const Options& options) : options_(options) { + rng_state_ = static_cast<uint32_t>(gpr_now(GPR_CLOCK_REALTIME).tv_nsec); +} + +grpc_millis BackOff::Begin() { + current_backoff_ = options_.initial_backoff(); + return current_backoff_ + grpc_core::ExecCtx::Get()->Now(); } -void grpc_backoff_reset(grpc_backoff* backoff) { - backoff->current_backoff = backoff->initial_backoff; +grpc_millis BackOff::Step() { + current_backoff_ = + (grpc_millis)(std::min(current_backoff_ * options_.multiplier(), + (double)options_.max_backoff())); + const double jitter = generate_uniform_random_number_between( + &rng_state_, -options_.jitter() * current_backoff_, + options_.jitter() * current_backoff_); + const grpc_millis next_timeout = (grpc_millis)(current_backoff_ + jitter); + return next_timeout + grpc_core::ExecCtx::Get()->Now(); } + +void BackOff::Reset() { current_backoff_ = options_.initial_backoff(); } + +void BackOff::SetRandomSeed(uint32_t seed) { rng_state_ = seed; } + +} // namespace grpc_core diff --git a/src/core/lib/backoff/backoff.h b/src/core/lib/backoff/backoff.h index f61d14ec95..84ef9b82e4 100644 --- a/src/core/lib/backoff/backoff.h +++ b/src/core/lib/backoff/backoff.h @@ -21,53 +21,69 @@ #include "src/core/lib/iomgr/exec_ctx.h" -typedef struct { - /// const: how long to wait after the first failure before retrying - grpc_millis initial_backoff; - - /// const: factor with which to multiply backoff after a failed retry - double multiplier; - - /// const: amount to randomize backoffs - double jitter; - - /// const: minimum time between retries - grpc_millis min_connect_timeout; - - /// const: maximum time between retries - grpc_millis max_backoff; - +namespace grpc_core { + +/// Implementation of the backoff mechanism described in +/// doc/connection-backoff.md +class BackOff { + public: + class Options; + + /// Initialize backoff machinery - does not need to be destroyed + explicit BackOff(const Options& options); + + /// Begin retry loop: returns the deadline to be used for the next attempt, + /// following the backoff strategy. + grpc_millis Begin(); + /// Step a retry loop: returns the deadline to be used for the next attempt, + /// following the backoff strategy. + grpc_millis Step(); + /// Reset the backoff, so the next grpc_backoff_step will be a + /// grpc_backoff_begin. + void Reset(); + + void SetRandomSeed(unsigned int seed); + + class Options { + public: + Options& set_initial_backoff(grpc_millis initial_backoff) { + initial_backoff_ = initial_backoff; + return *this; + } + Options& set_multiplier(double multiplier) { + multiplier_ = multiplier; + return *this; + } + Options& set_jitter(double jitter) { + jitter_ = jitter; + return *this; + } + Options& set_max_backoff(grpc_millis max_backoff) { + max_backoff_ = max_backoff; + return *this; + } + /// how long to wait after the first failure before retrying + grpc_millis initial_backoff() const { return initial_backoff_; } + /// factor with which to multiply backoff after a failed retry + double multiplier() const { return multiplier_; } + /// amount to randomize backoffs + double jitter() const { return jitter_; } + /// maximum time between retries + grpc_millis max_backoff() const { return max_backoff_; } + + private: + grpc_millis initial_backoff_; + double multiplier_; + double jitter_; + grpc_millis max_backoff_; + }; // class Options + + private: + const Options options_; /// current delay before retries - grpc_millis current_backoff; - - /// random number generator - uint32_t rng_state; -} grpc_backoff; - -typedef struct { - /// Deadline to be used for the current attempt. - grpc_millis current_deadline; - - /// Deadline to be used for the next attempt, following the backoff strategy. - grpc_millis next_attempt_start_time; -} grpc_backoff_result; - -/// Initialize backoff machinery - does not need to be destroyed -void grpc_backoff_init(grpc_backoff* backoff, grpc_millis initial_backoff, - double multiplier, double jitter, - grpc_millis min_connect_timeout, - grpc_millis max_backoff); - -/// Begin retry loop: returns the deadlines to be used for the current attempt -/// and the subsequent retry, if any. -grpc_backoff_result grpc_backoff_begin(grpc_backoff* backoff); - -/// Step a retry loop: returns the deadlines to be used for the current attempt -/// and the subsequent retry, if any. -grpc_backoff_result grpc_backoff_step(grpc_backoff* backoff); - -/// Reset the backoff, so the next grpc_backoff_step will be a -/// grpc_backoff_begin. -void grpc_backoff_reset(grpc_backoff* backoff); + grpc_millis current_backoff_; + uint32_t rng_state_; +}; +} // namespace grpc_core #endif /* GRPC_CORE_LIB_BACKOFF_BACKOFF_H */ diff --git a/src/core/lib/iomgr/error.cc b/src/core/lib/iomgr/error.cc index 42cd7c455d..67c3caf5ee 100644 --- a/src/core/lib/iomgr/error.cc +++ b/src/core/lib/iomgr/error.cc @@ -749,7 +749,7 @@ const char* grpc_error_string(grpc_error* err) { if (!gpr_atm_rel_cas(&err->atomics.error_string, 0, (gpr_atm)out)) { gpr_free(out); - out = (char*)gpr_atm_no_barrier_load(&err->atomics.error_string); + out = (char*)gpr_atm_acq_load(&err->atomics.error_string); } GPR_TIMER_END("grpc_error_string", 0); diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h index 4759ee0791..8c72a439f6 100644 --- a/src/core/lib/iomgr/error.h +++ b/src/core/lib/iomgr/error.h @@ -165,6 +165,8 @@ void grpc_error_unref(grpc_error* err); grpc_error* grpc_error_set_int(grpc_error* src, grpc_error_ints which, intptr_t value) GRPC_MUST_USE_RESULT; bool grpc_error_get_int(grpc_error* error, grpc_error_ints which, intptr_t* p); +/// This call takes ownership of the slice; the error is responsible for +/// eventually unref-ing it. grpc_error* grpc_error_set_str(grpc_error* src, grpc_error_strs which, grpc_slice str) GRPC_MUST_USE_RESULT; /// Returns false if the specified string is not set. @@ -174,7 +176,8 @@ bool grpc_error_get_str(grpc_error* error, grpc_error_strs which, /// Add a child error: an error that is believed to have contributed to this /// error occurring. Allows root causing high level errors from lower level -/// errors that contributed to them. +/// errors that contributed to them. The src error takes ownership of the +/// child error. grpc_error* grpc_error_add_child(grpc_error* src, grpc_error* child) GRPC_MUST_USE_RESULT; grpc_error* grpc_os_error(const char* file, int line, int err, diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc index ae9d47ece5..1ab7e516de 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.cc +++ b/src/core/lib/iomgr/ev_epoll1_linux.cc @@ -1232,8 +1232,6 @@ const grpc_event_engine_vtable* grpc_init_epoll1_linux(bool explicit_request) { /* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return * NULL */ const grpc_event_engine_vtable* grpc_init_epoll1_linux(bool explicit_request) { - gpr_log(GPR_ERROR, - "Skipping epoll1 becuase GRPC_LINUX_EPOLL is not defined."); return nullptr; } #endif /* defined(GRPC_POSIX_SOCKET) */ diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc index b2817156a8..5f5f45a7a5 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -1449,8 +1449,6 @@ const grpc_event_engine_vtable* grpc_init_epollex_linux( * NULL */ const grpc_event_engine_vtable* grpc_init_epollex_linux( bool explicitly_requested) { - gpr_log(GPR_ERROR, - "Skipping epollex becuase GRPC_LINUX_EPOLL is not defined."); return nullptr; } #endif /* defined(GRPC_POSIX_SOCKET) */ diff --git a/src/core/lib/iomgr/ev_epollsig_linux.cc b/src/core/lib/iomgr/ev_epollsig_linux.cc index 7a8962f4a8..8072a6cbed 100644 --- a/src/core/lib/iomgr/ev_epollsig_linux.cc +++ b/src/core/lib/iomgr/ev_epollsig_linux.cc @@ -1732,8 +1732,6 @@ const grpc_event_engine_vtable* grpc_init_epollsig_linux( * NULL */ const grpc_event_engine_vtable* grpc_init_epollsig_linux( bool explicit_request) { - gpr_log(GPR_ERROR, - "Skipping epollsig becuase GRPC_LINUX_EPOLL is not defined."); return nullptr; } #endif /* defined(GRPC_POSIX_SOCKET) */ diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc index 53de94fb6e..7ea1dfaa80 100644 --- a/src/core/lib/iomgr/ev_poll_posix.cc +++ b/src/core/lib/iomgr/ev_poll_posix.cc @@ -71,6 +71,7 @@ struct grpc_fd { int shutdown; int closed; int released; + gpr_atm pollhup; grpc_error* shutdown_error; /* The watcher list. @@ -242,7 +243,7 @@ struct grpc_pollset_set { typedef struct poll_result { gpr_refcount refcount; - cv_node* watchers; + grpc_cv_node* watchers; int watchcount; struct pollfd* fds; nfds_t nfds; @@ -273,7 +274,7 @@ typedef struct poll_hash_table { } poll_hash_table; poll_hash_table poll_cache; -cv_fd_table g_cvfds; +grpc_cv_fd_table g_cvfds; /******************************************************************************* * fd_posix.c @@ -335,6 +336,7 @@ static grpc_fd* fd_create(int fd, const char* name) { r->on_done_closure = nullptr; r->closed = 0; r->released = 0; + gpr_atm_no_barrier_store(&r->pollhup, 0); r->read_notifier_pollset = nullptr; char* name2; @@ -950,7 +952,8 @@ static grpc_error* pollset_work(grpc_pollset* pollset, pfds[0].events = POLLIN; pfds[0].revents = 0; for (i = 0; i < pollset->fd_count; i++) { - if (fd_is_orphaned(pollset->fds[i])) { + if (fd_is_orphaned(pollset->fds[i]) || + gpr_atm_no_barrier_load(&pollset->fds[i]->pollhup) == 1) { GRPC_FD_UNREF(pollset->fds[i], "multipoller"); } else { pollset->fds[fd_count++] = pollset->fds[i]; @@ -1017,6 +1020,12 @@ static grpc_error* pollset_work(grpc_pollset* pollset, pfds[i].fd, (pfds[i].revents & POLLIN_CHECK) != 0, (pfds[i].revents & POLLOUT_CHECK) != 0, pfds[i].revents); } + /* This is a mitigation to prevent poll() from spinning on a + ** POLLHUP https://github.com/grpc/grpc/pull/13665 + */ + if (pfds[i].revents & POLLHUP) { + gpr_atm_no_barrier_store(&watchers[i].fd->pollhup, 1); + } fd_end_poll(&watchers[i], pfds[i].revents & POLLIN_CHECK, pfds[i].revents & POLLOUT_CHECK, pollset); } @@ -1435,7 +1444,7 @@ static void decref_poll_result(poll_result* res) { } } -void remove_cvn(cv_node** head, cv_node* target) { +void remove_cvn(grpc_cv_node** head, grpc_cv_node* target) { if (target->next) { target->next->prev = target->prev; } @@ -1460,7 +1469,7 @@ static void run_poll(void* args) { result->completed = 1; result->retval = retval; result->err = errno; - cv_node* watcher = result->watchers; + grpc_cv_node* watcher = result->watchers; while (watcher) { gpr_cv_signal(watcher->cv); watcher = watcher->next; @@ -1494,17 +1503,17 @@ static void run_poll(void* args) { static int cvfd_poll(struct pollfd* fds, nfds_t nfds, int timeout) { unsigned int i; int res, idx; - cv_node* pollcv; + grpc_cv_node* pollcv; int skip_poll = 0; nfds_t nsockfds = 0; poll_result* result = nullptr; gpr_mu_lock(&g_cvfds.mu); - pollcv = (cv_node*)gpr_malloc(sizeof(cv_node)); + pollcv = (grpc_cv_node*)gpr_malloc(sizeof(grpc_cv_node)); pollcv->next = nullptr; gpr_cv pollcv_cv; gpr_cv_init(&pollcv_cv); pollcv->cv = &pollcv_cv; - cv_node* fd_cvs = (cv_node*)gpr_malloc(nfds * sizeof(cv_node)); + grpc_cv_node* fd_cvs = (grpc_cv_node*)gpr_malloc(nfds * sizeof(grpc_cv_node)); for (i = 0; i < nfds; i++) { fds[i].revents = 0; @@ -1600,7 +1609,8 @@ static void global_cv_fd_table_init() { gpr_cv_init(&g_cvfds.shutdown_cv); gpr_ref_init(&g_cvfds.pollcount, 1); g_cvfds.size = CV_DEFAULT_TABLE_SIZE; - g_cvfds.cvfds = (fd_node*)gpr_malloc(sizeof(fd_node) * CV_DEFAULT_TABLE_SIZE); + g_cvfds.cvfds = + (grpc_fd_node*)gpr_malloc(sizeof(grpc_fd_node) * CV_DEFAULT_TABLE_SIZE); g_cvfds.free_fds = nullptr; thread_grace = gpr_time_from_millis(POLLCV_THREAD_GRACE_MS, GPR_TIMESPAN); for (int i = 0; i < CV_DEFAULT_TABLE_SIZE; i++) { diff --git a/src/core/lib/iomgr/tcp_client_posix.cc b/src/core/lib/iomgr/tcp_client_posix.cc index 24ccab14b2..8cd5f8d618 100644 --- a/src/core/lib/iomgr/tcp_client_posix.cc +++ b/src/core/lib/iomgr/tcp_client_posix.cc @@ -212,6 +212,9 @@ finish: fd = nullptr; } done = (--ac->refs == 0); + // Create a copy of the data from "ac" to be accessed after the unlock, as + // "ac" and its contents may be deallocated by the time they are read. + const grpc_slice addr_str_slice = grpc_slice_from_copied_string(ac->addr_str); gpr_mu_unlock(&ac->mu); if (error != GRPC_ERROR_NONE) { char* error_descr; @@ -225,9 +228,13 @@ finish: gpr_free(error_descr); gpr_free(desc); error = grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS, - grpc_slice_from_copied_string(ac->addr_str)); + addr_str_slice /* takes ownership */); + } else { + grpc_slice_unref(addr_str_slice); } if (done) { + // This is safe even outside the lock, because "done", the sentinel, is + // populated *inside* the lock. gpr_mu_destroy(&ac->mu); gpr_free(ac->addr_str); grpc_channel_args_destroy(ac->channel_args); diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc index 55e0b165ec..4a97f3353d 100644 --- a/src/core/lib/iomgr/udp_server.cc +++ b/src/core/lib/iomgr/udp_server.cc @@ -72,6 +72,7 @@ struct grpc_udp_listener { grpc_udp_server_read_cb read_cb; grpc_udp_server_write_cb write_cb; grpc_udp_server_orphan_cb orphan_cb; + grpc_udp_server_start_cb start_cb; // To be scheduled on another thread to actually read/write. grpc_closure do_read_closure; grpc_closure do_write_closure; @@ -353,7 +354,7 @@ static void do_read(void* arg, grpc_error* error) { * read lock if available. */ gpr_mu_lock(&sp->server->mu); /* Tell the registered callback that data is available to read. */ - if (!sp->already_shutdown && sp->read_cb(sp->emfd, sp->server->user_data)) { + if (!sp->already_shutdown && sp->read_cb(sp->emfd)) { /* There maybe more packets to read. Schedule read_more_cb_ closure to run * after finishing this event loop. */ GRPC_CLOSURE_SCHED(&sp->do_read_closure, GRPC_ERROR_NONE); @@ -383,7 +384,7 @@ static void on_read(void* arg, grpc_error* error) { /* Read once. If there is more data to read, off load the work to another * thread to finish. */ GPR_ASSERT(sp->read_cb); - if (sp->read_cb(sp->emfd, sp->server->user_data)) { + if (sp->read_cb(sp->emfd)) { /* There maybe more packets to read. Schedule read_more_cb_ closure to run * after finishing this event loop. */ GRPC_CLOSURE_INIT(&sp->do_read_closure, do_read, arg, @@ -411,7 +412,7 @@ void fd_notify_on_write_wrapper(void* arg, grpc_error* error) { static void do_write(void* arg, grpc_error* error) { grpc_udp_listener* sp = reinterpret_cast<grpc_udp_listener*>(arg); - gpr_mu_lock(&(sp->server->mu)); + gpr_mu_lock(&sp->server->mu); if (sp->already_shutdown) { // If fd has been shutdown, don't write any more and re-arm notification. grpc_fd_notify_on_write(sp->emfd, &sp->write_closure); @@ -429,7 +430,7 @@ static void do_write(void* arg, grpc_error* error) { static void on_write(void* arg, grpc_error* error) { grpc_udp_listener* sp = (grpc_udp_listener*)arg; - gpr_mu_lock(&(sp->server->mu)); + gpr_mu_lock(&sp->server->mu); if (error != GRPC_ERROR_NONE) { if (0 == --sp->server->active_ports && sp->server->shutdown) { gpr_mu_unlock(&sp->server->mu); @@ -450,6 +451,7 @@ static void on_write(void* arg, grpc_error* error) { static int add_socket_to_server(grpc_udp_server* s, int fd, const grpc_resolved_address* addr, + grpc_udp_server_start_cb start_cb, grpc_udp_server_read_cb read_cb, grpc_udp_server_write_cb write_cb, grpc_udp_server_orphan_cb orphan_cb) { @@ -480,6 +482,7 @@ static int add_socket_to_server(grpc_udp_server* s, int fd, sp->read_cb = read_cb; sp->write_cb = write_cb; sp->orphan_cb = orphan_cb; + sp->start_cb = start_cb; sp->orphan_notified = false; sp->already_shutdown = false; GPR_ASSERT(sp->emfd); @@ -492,6 +495,7 @@ static int add_socket_to_server(grpc_udp_server* s, int fd, int grpc_udp_server_add_port(grpc_udp_server* s, const grpc_resolved_address* addr, + grpc_udp_server_start_cb start_cb, grpc_udp_server_read_cb read_cb, grpc_udp_server_write_cb write_cb, grpc_udp_server_orphan_cb orphan_cb) { @@ -541,8 +545,8 @@ int grpc_udp_server_add_port(grpc_udp_server* s, // TODO(rjshade): Test and propagate the returned grpc_error*: GRPC_ERROR_UNREF(grpc_create_dualstack_socket_using_factory( s->socket_factory, addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd)); - allocated_port1 = - add_socket_to_server(s, fd, addr, read_cb, write_cb, orphan_cb); + allocated_port1 = add_socket_to_server(s, fd, addr, start_cb, read_cb, + write_cb, orphan_cb); if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) { goto done; } @@ -565,7 +569,7 @@ int grpc_udp_server_add_port(grpc_udp_server* s, addr = &addr4_copy; } allocated_port2 = - add_socket_to_server(s, fd, addr, read_cb, write_cb, orphan_cb); + add_socket_to_server(s, fd, addr, start_cb, read_cb, write_cb, orphan_cb); done: gpr_free(allocated_addr); @@ -587,6 +591,7 @@ int grpc_udp_server_get_fd(grpc_udp_server* s, unsigned port_index) { void grpc_udp_server_start(grpc_udp_server* s, grpc_pollset** pollsets, size_t pollset_count, void* user_data) { + gpr_log(GPR_DEBUG, "grpc_udp_server_start"); size_t i; gpr_mu_lock(&s->mu); grpc_udp_listener* sp; @@ -596,6 +601,7 @@ void grpc_udp_server_start(grpc_udp_server* s, grpc_pollset** pollsets, sp = s->head; while (sp != nullptr) { + sp->start_cb(sp->emfd, sp->server->user_data); for (i = 0; i < pollset_count; i++) { grpc_pollset_add_fd(pollsets[i], sp->emfd); } diff --git a/src/core/lib/iomgr/udp_server.h b/src/core/lib/iomgr/udp_server.h index 02e3acb7f5..a469ab9be5 100644 --- a/src/core/lib/iomgr/udp_server.h +++ b/src/core/lib/iomgr/udp_server.h @@ -30,9 +30,12 @@ struct grpc_server; /* Forward decl of grpc_udp_server */ typedef struct grpc_udp_server grpc_udp_server; +/* Called when grpc server starts to listening on the grpc_fd. */ +typedef void (*grpc_udp_server_start_cb)(grpc_fd* emfd, void* user_data); + /* Called when data is available to read from the socket. * Return true if there is more data to read from fd. */ -typedef bool (*grpc_udp_server_read_cb)(grpc_fd* emfd, void* user_data); +typedef bool (*grpc_udp_server_read_cb)(grpc_fd* emfd); /* Called when the socket is writeable. The given closure should be scheduled * when the socket becomes blocked next time. */ @@ -65,6 +68,7 @@ int grpc_udp_server_get_fd(grpc_udp_server* s, unsigned port_index); all of the multiple socket port matching logic in one place */ int grpc_udp_server_add_port(grpc_udp_server* s, const grpc_resolved_address* addr, + grpc_udp_server_start_cb start_cb, grpc_udp_server_read_cb read_cb, grpc_udp_server_write_cb write_cb, grpc_udp_server_orphan_cb orphan_cb); diff --git a/src/core/lib/iomgr/wakeup_fd_cv.cc b/src/core/lib/iomgr/wakeup_fd_cv.cc index 5c1f16d3fc..c785114212 100644 --- a/src/core/lib/iomgr/wakeup_fd_cv.cc +++ b/src/core/lib/iomgr/wakeup_fd_cv.cc @@ -34,7 +34,7 @@ #define MAX_TABLE_RESIZE 256 -extern cv_fd_table g_cvfds; +extern grpc_cv_fd_table g_cvfds; static grpc_error* cv_fd_init(grpc_wakeup_fd* fd_info) { unsigned int i, newsize; @@ -42,8 +42,8 @@ static grpc_error* cv_fd_init(grpc_wakeup_fd* fd_info) { gpr_mu_lock(&g_cvfds.mu); if (!g_cvfds.free_fds) { newsize = GPR_MIN(g_cvfds.size * 2, g_cvfds.size + MAX_TABLE_RESIZE); - g_cvfds.cvfds = - (fd_node*)gpr_realloc(g_cvfds.cvfds, sizeof(fd_node) * newsize); + g_cvfds.cvfds = (grpc_fd_node*)gpr_realloc(g_cvfds.cvfds, + sizeof(grpc_fd_node) * newsize); for (i = g_cvfds.size; i < newsize; i++) { g_cvfds.cvfds[i].is_set = 0; g_cvfds.cvfds[i].cvs = nullptr; @@ -64,7 +64,7 @@ static grpc_error* cv_fd_init(grpc_wakeup_fd* fd_info) { } static grpc_error* cv_fd_wakeup(grpc_wakeup_fd* fd_info) { - cv_node* cvn; + grpc_cv_node* cvn; gpr_mu_lock(&g_cvfds.mu); g_cvfds.cvfds[GRPC_FD_TO_IDX(fd_info->read_fd)].is_set = 1; cvn = g_cvfds.cvfds[GRPC_FD_TO_IDX(fd_info->read_fd)].cvs; diff --git a/src/core/lib/iomgr/wakeup_fd_cv.h b/src/core/lib/iomgr/wakeup_fd_cv.h index 017e41bfa8..399620af76 100644 --- a/src/core/lib/iomgr/wakeup_fd_cv.h +++ b/src/core/lib/iomgr/wakeup_fd_cv.h @@ -40,27 +40,27 @@ #define GRPC_FD_TO_IDX(fd) (-(fd)-1) #define GRPC_IDX_TO_FD(idx) (-(idx)-1) -typedef struct cv_node { +typedef struct grpc_cv_node { gpr_cv* cv; - struct cv_node* next; - struct cv_node* prev; -} cv_node; + struct grpc_cv_node* next; + struct grpc_cv_node* prev; +} grpc_cv_node; -typedef struct fd_node { +typedef struct grpc_fd_node { int is_set; - cv_node* cvs; - struct fd_node* next_free; -} fd_node; + grpc_cv_node* cvs; + struct grpc_fd_node* next_free; +} grpc_fd_node; -typedef struct cv_fd_table { +typedef struct grpc_cv_fd_table { gpr_mu mu; gpr_refcount pollcount; gpr_cv shutdown_cv; - fd_node* cvfds; - fd_node* free_fds; + grpc_fd_node* cvfds; + grpc_fd_node* free_fds; unsigned int size; grpc_poll_function_type poll; -} cv_fd_table; +} grpc_cv_fd_table; extern const grpc_wakeup_fd_vtable grpc_cv_wakeup_fd_vtable; diff --git a/src/core/lib/support/debug_location.h b/src/core/lib/support/debug_location.h index 0939da595d..9b3f9220fc 100644 --- a/src/core/lib/support/debug_location.h +++ b/src/core/lib/support/debug_location.h @@ -36,7 +36,7 @@ class DebugLocation { const char* file_; const int line_; }; -#define DEBUG_LOCATION DebugLocation(__FILE__, __LINE__) +#define DEBUG_LOCATION ::grpc_core::DebugLocation(__FILE__, __LINE__) #else class DebugLocation { public: @@ -44,7 +44,7 @@ class DebugLocation { const char* file() const { return nullptr; } int line() const { return -1; } }; -#define DEBUG_LOCATION DebugLocation() +#define DEBUG_LOCATION ::grpc_core::DebugLocation() #endif } // namespace grpc_core diff --git a/src/node/health_check/package.json b/src/node/health_check/package.json deleted file mode 100644 index fca3a2a7a6..0000000000 --- a/src/node/health_check/package.json +++ /dev/null @@ -1,29 +0,0 @@ -{ - "name": "grpc-health-check", - "version": "1.7.2", - "author": "Google Inc.", - "description": "Health check service for use with gRPC", - "repository": { - "type": "git", - "url": "https://github.com/grpc/grpc.git" - }, - "bugs": "https://github.com/grpc/grpc/issues", - "contributors": [ - { - "name": "Michael Lumish", - "email": "mlumish@google.com" - } - ], - "dependencies": { - "grpc": "^1.7.2", - "lodash": "^3.9.3", - "google-protobuf": "^3.0.0" - }, - "files": [ - "LICENSE", - "health.js", - "v1" - ], - "main": "src/node/index.js", - "license": "Apache-2.0" -} diff --git a/src/node/tools/package.json b/src/node/tools/package.json deleted file mode 100644 index 99fd854067..0000000000 --- a/src/node/tools/package.json +++ /dev/null @@ -1,41 +0,0 @@ -{ - "name": "grpc-tools", - "version": "1.7.2", - "author": "Google Inc.", - "description": "Tools for developing with gRPC on Node.js", - "homepage": "https://grpc.io/", - "repository": { - "type": "git", - "url": "https://github.com/grpc/grpc.git" - }, - "bugs": "https://github.com/grpc/grpc/issues", - "contributors": [ - { - "name": "Michael Lumish", - "email": "mlumish@google.com" - } - ], - "bin": { - "grpc_tools_node_protoc": "./bin/protoc.js", - "grpc_tools_node_protoc_plugin": "./bin/protoc_plugin.js" - }, - "scripts": { - "install": "./node_modules/.bin/node-pre-gyp install" - }, - "bundledDependencies": ["node-pre-gyp"], - "binary": { - "module_name": "grpc_tools", - "host": "https://storage.googleapis.com/", - "remote_path": "grpc-precompiled-binaries/node/{name}/v{version}", - "package_name": "{platform}-{arch}.tar.gz", - "module_path": "bin" - }, - "files": [ - "index.js", - "bin/protoc.js", - "bin/protoc_plugin.js", - "bin/google/protobuf", - "LICENSE" - ], - "main": "index.js" -} diff --git a/src/objective-c/tests/run_tests.sh b/src/objective-c/tests/run_tests.sh index cf0b07e8c0..cec34787cf 100755 --- a/src/objective-c/tests/run_tests.sh +++ b/src/objective-c/tests/run_tests.sh @@ -34,36 +34,50 @@ $BINDIR/interop_server --port=5051 --max_send_message_size=8388608 --use_tls & # Kill them when this script exits. trap 'kill -9 `jobs -p` ; echo "EXIT TIME: $(date)"' EXIT -# Boot Xcode first with several retries since Xcode might fail due to a bug: -# http://www.openradar.me/29785686 -xcrun simctl list | egrep 'iPhone 6 \(' -udid=`xcrun simctl list | egrep 'iPhone 6 \(.*\) \(.*\)' | sed -E 's/ *iPhone 6 \(([^\)]*)\).*/\1/g' | head -n 1` -retries=0 -while [ $retries -lt 3 ] && ! open -a Simulator --args -CurrentDeviceUDID $udid ; do -retries=$(($retries+1)) -done -if [ $retries == 3 ]; then - echo "Xcode simulator failed to start after 3 retries." - exit 1 -fi +set -o pipefail # xcodebuild is very verbose. We filter its output and tell Bash to fail if any # element of the pipe fails. # TODO(jcanizales): Use xctool instead? Issue #2540. -set -o pipefail XCODEBUILD_FILTER='(^CompileC |^Ld |^ *[^ ]*clang |^ *cd |^ *export |^Libtool |^ *[^ ]*libtool |^CpHeader |^ *builtin-copy )' + echo "TIME: $(date)" -xcodebuild \ - -workspace Tests.xcworkspace \ - -scheme AllTests \ - -destination name="iPhone 6" \ - HOST_PORT_LOCALSSL=localhost:5051 \ - HOST_PORT_LOCAL=localhost:5050 \ - HOST_PORT_REMOTE=grpc-test.sandbox.googleapis.com \ - test \ - | egrep -v "$XCODEBUILD_FILTER" \ - | egrep -v '^$' \ - | egrep -v "(GPBDictionary|GPBArray)" - + +# Retry the test for up to 3 times when return code is 65, due to Xcode issue: +# http://www.openradar.me/29785686 +# The issue seems to be a connectivity issue to Xcode simulator so only retry +# the first xcodebuild command +retries=0 +while [ $retries -lt 3 ]; do + return_code=0 + out=$(xcodebuild \ + -workspace Tests.xcworkspace \ + -scheme AllTests \ + -destination name="iPhone 6" \ + HOST_PORT_LOCALSSL=localhost:5051 \ + HOST_PORT_LOCAL=localhost:5050 \ + HOST_PORT_REMOTE=grpc-test.sandbox.googleapis.com \ + test 2>&1 \ + | egrep -v "$XCODEBUILD_FILTER" \ + | egrep -v '^$' \ + | egrep -v "(GPBDictionary|GPBArray)" - ) || return_code=$? + if [ $return_code == 65 ] && [[ $out == *"DTXProxyChannel error 1"* ]]; then + echo "$out" + echo "Failed with code 65 (DTXProxyChannel error 1); retry." + retries=$(($retries+1)) + elif [ $return_code == 0 ]; then + echo "$out" + break + else + echo "$out" + echo "Failed with code $return_code." + exit 1 + fi +done +if [ $retries == 3 ]; then + echo "Failed with code 65 for 3 times; abort." + exit 1 +fi echo "TIME: $(date)" xcodebuild \ @@ -95,3 +109,5 @@ xcodebuild \ | egrep -v "$XCODEBUILD_FILTER" \ | egrep -v '^$' \ | egrep -v "(GPBDictionary|GPBArray)" - + +exit 0 diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index d7456a3dd1..3572737c87 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -129,12 +129,12 @@ def _abort(state, code, details): def _handle_event(event, state, response_deserializer): callbacks = [] for batch_operation in event.batch_operations: - operation_type = batch_operation.type + operation_type = batch_operation.type() state.due.remove(operation_type) if operation_type == cygrpc.OperationType.receive_initial_metadata: - state.initial_metadata = batch_operation.received_metadata + state.initial_metadata = batch_operation.initial_metadata() elif operation_type == cygrpc.OperationType.receive_message: - serialized_response = batch_operation.received_message.bytes() + serialized_response = batch_operation.message() if serialized_response is not None: response = _common.deserialize(serialized_response, response_deserializer) @@ -144,18 +144,17 @@ def _handle_event(event, state, response_deserializer): else: state.response = response elif operation_type == cygrpc.OperationType.receive_status_on_client: - state.trailing_metadata = batch_operation.received_metadata + state.trailing_metadata = batch_operation.trailing_metadata() if state.code is None: code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE.get( - batch_operation.received_status_code) + batch_operation.code()) if code is None: state.code = grpc.StatusCode.UNKNOWN state.details = _unknown_code_details( - batch_operation.received_status_code, - batch_operation.received_status_details) + code, batch_operation.details()) else: state.code = code - state.details = batch_operation.received_status_details + state.details = batch_operation.details() callbacks.extend(state.callbacks) state.callbacks = None return callbacks @@ -200,7 +199,7 @@ def _consume_request_iterator(request_iterator, state, call, _abort(state, grpc.StatusCode.INTERNAL, details) return else: - operations = (cygrpc.operation_send_message( + operations = (cygrpc.SendMessageOperation( serialized_request, _EMPTY_FLAGS),) call.start_client_batch(operations, event_handler) state.due.add(cygrpc.OperationType.send_message) @@ -216,7 +215,7 @@ def _consume_request_iterator(request_iterator, state, call, with state.condition: if state.code is None: operations = ( - cygrpc.operation_send_close_from_client(_EMPTY_FLAGS),) + cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),) call.start_client_batch(operations, event_handler) state.due.add(cygrpc.OperationType.send_close_from_client) @@ -319,7 +318,7 @@ class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call): event_handler = _event_handler(self._state, self._call, self._response_deserializer) self._call.start_client_batch( - (cygrpc.operation_receive_message(_EMPTY_FLAGS),), + (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), event_handler) self._state.due.add(cygrpc.OperationType.receive_message) elif self._state.code is grpc.StatusCode.OK: @@ -453,12 +452,12 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): else: state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None) operations = ( - cygrpc.operation_send_initial_metadata(metadata, _EMPTY_FLAGS), - cygrpc.operation_send_message(serialized_request, _EMPTY_FLAGS), - cygrpc.operation_send_close_from_client(_EMPTY_FLAGS), - cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS), - cygrpc.operation_receive_message(_EMPTY_FLAGS), - cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),) + cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS), + cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS), + cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), + cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), + cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),) return state, operations, deadline, deadline_timespec, None def _blocking(self, request, timeout, metadata, credentials): @@ -536,14 +535,14 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): self._response_deserializer) with state.condition: call.start_client_batch( - (cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),), + (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), event_handler) operations = ( - cygrpc.operation_send_initial_metadata( - metadata, _EMPTY_FLAGS), cygrpc.operation_send_message( + cygrpc.SendInitialMetadataOperation( + metadata, _EMPTY_FLAGS), cygrpc.SendMessageOperation( serialized_request, _EMPTY_FLAGS), - cygrpc.operation_send_close_from_client(_EMPTY_FLAGS), - cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),) + cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),) call_error = call.start_client_batch(operations, event_handler) if call_error != cygrpc.CallError.ok: _call_error_set_RPCstate(state, call_error, metadata) @@ -573,12 +572,11 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): call.set_credentials(credentials._credentials) with state.condition: call.start_client_batch( - (cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),), - None) + (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), None) operations = ( - cygrpc.operation_send_initial_metadata(metadata, _EMPTY_FLAGS), - cygrpc.operation_receive_message(_EMPTY_FLAGS), - cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),) + cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS), + cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),) call_error = call.start_client_batch(operations, None) _check_call_error(call_error, metadata) _consume_request_iterator(request_iterator, state, call, @@ -624,12 +622,12 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): event_handler = _event_handler(state, call, self._response_deserializer) with state.condition: call.start_client_batch( - (cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),), + (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), event_handler) operations = ( - cygrpc.operation_send_initial_metadata(metadata, _EMPTY_FLAGS), - cygrpc.operation_receive_message(_EMPTY_FLAGS), - cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),) + cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS), + cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),) call_error = call.start_client_batch(operations, event_handler) if call_error != cygrpc.CallError.ok: _call_error_set_RPCstate(state, call_error, metadata) @@ -664,11 +662,11 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): event_handler = _event_handler(state, call, self._response_deserializer) with state.condition: call.start_client_batch( - (cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),), + (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), event_handler) operations = ( - cygrpc.operation_send_initial_metadata(metadata, _EMPTY_FLAGS), - cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),) + cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),) call_error = call.start_client_batch(operations, event_handler) if call_error != cygrpc.CallError.ok: _call_error_set_RPCstate(state, call_error, metadata) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi index 6361669757..0892215b6d 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi @@ -26,16 +26,13 @@ cdef class Call: def _start_batch(self, operations, tag, retain_self): if not self.is_valid: raise ValueError("invalid call object cannot be used from Python") - cdef OperationTag operation_tag = OperationTag(tag, operations) - if retain_self: - operation_tag.operation_call = self - else: - operation_tag.operation_call = None - operation_tag.store_ops() - cpython.Py_INCREF(operation_tag) + cdef _BatchOperationTag batch_operation_tag = _BatchOperationTag( + tag, operations, self if retain_self else None) + batch_operation_tag.prepare() + cpython.Py_INCREF(batch_operation_tag) return grpc_call_start_batch( - self.c_call, operation_tag.c_ops, operation_tag.c_nops, - <cpython.PyObject *>operation_tag, NULL) + self.c_call, batch_operation_tag.c_ops, batch_operation_tag.c_nops, + <cpython.PyObject *>batch_operation_tag, NULL) def start_client_batch(self, operations, tag): # We don't reference this call in the operations tag because diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi index 644df674cc..443d534d7e 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi @@ -76,12 +76,12 @@ cdef class Channel: def watch_connectivity_state( self, grpc_connectivity_state last_observed_state, Timespec deadline not None, CompletionQueue queue not None, tag): - cdef OperationTag operation_tag = OperationTag(tag, None) - cpython.Py_INCREF(operation_tag) + cdef _ConnectivityTag connectivity_tag = _ConnectivityTag(tag) + cpython.Py_INCREF(connectivity_tag) with nogil: grpc_channel_watch_connectivity_state( self.c_channel, last_observed_state, deadline.c_time, - queue.c_completion_queue, <cpython.PyObject *>operation_tag) + queue.c_completion_queue, <cpython.PyObject *>connectivity_tag) def target(self): cdef char *target = NULL diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi index 140fc357b9..e259789b35 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi @@ -37,42 +37,20 @@ cdef class CompletionQueue: self.is_shutdown = False cdef _interpret_event(self, grpc_event event): - cdef OperationTag tag = None - cdef object user_tag = None - cdef Call operation_call = None - cdef CallDetails request_call_details = None - cdef object request_metadata = None - cdef object batch_operations = None + cdef _Tag tag = None if event.type == GRPC_QUEUE_TIMEOUT: - return Event( - event.type, False, None, None, None, None, False, None) + # NOTE(nathaniel): For now we coopt ConnectivityEvent here. + return ConnectivityEvent(GRPC_QUEUE_TIMEOUT, False, None) elif event.type == GRPC_QUEUE_SHUTDOWN: self.is_shutdown = True - return Event( - event.type, True, None, None, None, None, False, None) + # NOTE(nathaniel): For now we coopt ConnectivityEvent here. + return ConnectivityEvent(GRPC_QUEUE_TIMEOUT, True, None) else: - if event.tag != NULL: - tag = <OperationTag>event.tag - # We receive event tags only after they've been inc-ref'd elsewhere in - # the code. - cpython.Py_DECREF(tag) - if tag.shutting_down_server is not None: - tag.shutting_down_server.notify_shutdown_complete() - user_tag = tag.user_tag - operation_call = tag.operation_call - request_call_details = tag.request_call_details - if tag.is_new_request: - request_metadata = _metadata(&tag._c_request_metadata) - grpc_metadata_array_destroy(&tag._c_request_metadata) - batch_operations = tag.release_ops() - if tag.is_new_request: - # Stuff in the tag not explicitly handled by us needs to live through - # the life of the call - operation_call.references.extend(tag.references) - return Event( - event.type, event.success, user_tag, operation_call, - request_call_details, request_metadata, tag.is_new_request, - batch_operations) + tag = <_Tag>event.tag + # We receive event tags only after they've been inc-ref'd elsewhere in + # the code. + cpython.Py_DECREF(tag) + return tag.event(event) def poll(self, Timespec deadline=None): # We name this 'poll' to avoid problems with CPython's expectations for diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/event.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/event.pxd.pxi new file mode 100644 index 0000000000..686199ecf4 --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/event.pxd.pxi @@ -0,0 +1,45 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef class ConnectivityEvent: + + cdef readonly grpc_completion_type completion_type + cdef readonly bint success + cdef readonly object tag + + +cdef class RequestCallEvent: + + cdef readonly grpc_completion_type completion_type + cdef readonly bint success + cdef readonly object tag + cdef readonly Call call + cdef readonly CallDetails call_details + cdef readonly tuple invocation_metadata + + +cdef class BatchOperationEvent: + + cdef readonly grpc_completion_type completion_type + cdef readonly bint success + cdef readonly object tag + cdef readonly object batch_operations + + +cdef class ServerShutdownEvent: + + cdef readonly grpc_completion_type completion_type + cdef readonly bint success + cdef readonly object tag diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/event.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/event.pyx.pxi new file mode 100644 index 0000000000..af26d27318 --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/event.pyx.pxi @@ -0,0 +1,55 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef class ConnectivityEvent: + + def __cinit__( + self, grpc_completion_type completion_type, bint success, object tag): + self.completion_type = completion_type + self.success = success + self.tag = tag + + +cdef class RequestCallEvent: + + def __cinit__( + self, grpc_completion_type completion_type, bint success, object tag, + Call call, CallDetails call_details, tuple invocation_metadata): + self.completion_type = completion_type + self.success = success + self.tag = tag + self.call = call + self.call_details = call_details + self.invocation_metadata = invocation_metadata + + +cdef class BatchOperationEvent: + + def __cinit__( + self, grpc_completion_type completion_type, bint success, object tag, + object batch_operations): + self.completion_type = completion_type + self.success = success + self.tag = tag + self.batch_operations = batch_operations + + +cdef class ServerShutdownEvent: + + def __cinit__( + self, grpc_completion_type completion_type, bint success, object tag): + self.completion_type = completion_type + self.success = success + self.tag = tag diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi index 6a72bbf693..6ee833697d 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi @@ -17,6 +17,7 @@ cimport libc.time # Typedef types with approximately the same semantics to provide their names to # Cython +ctypedef unsigned char uint8_t ctypedef int int32_t ctypedef unsigned uint32_t ctypedef long int64_t @@ -25,6 +26,7 @@ ctypedef long int64_t cdef extern from "grpc/support/alloc.h": void *gpr_malloc(size_t size) nogil + void *gpr_zalloc(size_t size) nogil void gpr_free(void *ptr) nogil void *gpr_realloc(void *p, size_t size) nogil @@ -183,6 +185,18 @@ cdef extern from "grpc/grpc.h": size_t arguments_length "num_args" grpc_arg *arguments "args" + ctypedef enum grpc_compression_level: + GRPC_COMPRESS_LEVEL_NONE + GRPC_COMPRESS_LEVEL_LOW + GRPC_COMPRESS_LEVEL_MED + GRPC_COMPRESS_LEVEL_HIGH + + ctypedef enum grpc_stream_compression_level: + GRPC_STREAM_COMPRESS_LEVEL_NONE + GRPC_STREAM_COMPRESS_LEVEL_LOW + GRPC_STREAM_COMPRESS_LEVEL_MED + GRPC_STREAM_COMPRESS_LEVEL_HIGH + ctypedef enum grpc_call_error: GRPC_CALL_OK GRPC_CALL_ERROR @@ -258,9 +272,19 @@ cdef extern from "grpc/grpc.h": GRPC_OP_RECV_STATUS_ON_CLIENT GRPC_OP_RECV_CLOSE_ON_SERVER + ctypedef struct grpc_op_send_initial_metadata_maybe_compression_level: + uint8_t is_set + grpc_compression_level level + + ctypedef struct grpc_op_send_initial_metadata_maybe_stream_compression_level: + uint8_t is_set + grpc_stream_compression_level level + ctypedef struct grpc_op_data_send_initial_metadata: size_t count grpc_metadata *metadata + grpc_op_send_initial_metadata_maybe_compression_level maybe_compression_level + grpc_op_send_initial_metadata_maybe_stream_compression_level maybe_stream_compression_level ctypedef struct grpc_op_data_send_status_from_server: size_t trailing_metadata_count diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/operation.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/operation.pxd.pxi new file mode 100644 index 0000000000..bfbe27785b --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/operation.pxd.pxi @@ -0,0 +1,109 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef class Operation: + + cdef void c(self) + cdef void un_c(self) + + # TODO(https://github.com/grpc/grpc/issues/7950): Eliminate this! + cdef grpc_op c_op + + +cdef class SendInitialMetadataOperation(Operation): + + cdef readonly object _initial_metadata; + cdef readonly int _flags + cdef grpc_metadata *_c_initial_metadata + cdef size_t _c_initial_metadata_count + + cdef void c(self) + cdef void un_c(self) + + +cdef class SendMessageOperation(Operation): + + cdef readonly bytes _message + cdef readonly int _flags + cdef grpc_byte_buffer *_c_message_byte_buffer + + cdef void c(self) + cdef void un_c(self) + + +cdef class SendCloseFromClientOperation(Operation): + + cdef readonly int _flags + + cdef void c(self) + cdef void un_c(self) + + +cdef class SendStatusFromServerOperation(Operation): + + cdef readonly object _trailing_metadata + cdef readonly object _code + cdef readonly object _details + cdef readonly int _flags + cdef grpc_metadata *_c_trailing_metadata + cdef size_t _c_trailing_metadata_count + cdef grpc_slice _c_details + + cdef void c(self) + cdef void un_c(self) + + +cdef class ReceiveInitialMetadataOperation(Operation): + + cdef readonly int _flags + cdef tuple _initial_metadata + cdef grpc_metadata_array _c_initial_metadata + + cdef void c(self) + cdef void un_c(self) + + +cdef class ReceiveMessageOperation(Operation): + + cdef readonly int _flags + cdef grpc_byte_buffer *_c_message_byte_buffer + cdef bytes _message + + cdef void c(self) + cdef void un_c(self) + + +cdef class ReceiveStatusOnClientOperation(Operation): + + cdef readonly int _flags + cdef grpc_metadata_array _c_trailing_metadata + cdef grpc_status_code _c_code + cdef grpc_slice _c_details + cdef tuple _trailing_metadata + cdef object _code + cdef str _details + + cdef void c(self) + cdef void un_c(self) + + +cdef class ReceiveCloseOnServerOperation(Operation): + + cdef readonly int _flags + cdef object _cancelled + cdef int _c_cancelled + + cdef void c(self) + cdef void un_c(self) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/operation.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/operation.pyx.pxi new file mode 100644 index 0000000000..3c91abf722 --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/operation.pyx.pxi @@ -0,0 +1,238 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef class Operation: + + cdef void c(self): + raise NotImplementedError() + + cdef void un_c(self): + raise NotImplementedError() + + +cdef class SendInitialMetadataOperation(Operation): + + def __cinit__(self, initial_metadata, flags): + self._initial_metadata = initial_metadata + self._flags = flags + + def type(self): + return GRPC_OP_SEND_INITIAL_METADATA + + cdef void c(self): + self.c_op.type = GRPC_OP_SEND_INITIAL_METADATA + self.c_op.flags = self._flags + _store_c_metadata( + self._initial_metadata, &self._c_initial_metadata, + &self._c_initial_metadata_count) + self.c_op.data.send_initial_metadata.metadata = self._c_initial_metadata + self.c_op.data.send_initial_metadata.count = self._c_initial_metadata_count + self.c_op.data.send_initial_metadata.maybe_compression_level.is_set = 0 + self.c_op.data.send_initial_metadata.maybe_stream_compression_level.is_set = 0 + + cdef void un_c(self): + _release_c_metadata( + self._c_initial_metadata, self._c_initial_metadata_count) + + +cdef class SendMessageOperation(Operation): + + def __cinit__(self, bytes message, int flags): + self._message = message + self._flags = flags + + def type(self): + return GRPC_OP_SEND_MESSAGE + + cdef void c(self): + self.c_op.type = GRPC_OP_SEND_MESSAGE + self.c_op.flags = self._flags + cdef grpc_slice message_slice = grpc_slice_from_copied_buffer( + self._message, len(self._message)) + self._c_message_byte_buffer = grpc_raw_byte_buffer_create( + &message_slice, 1) + grpc_slice_unref(message_slice) + self.c_op.data.send_message.send_message = self._c_message_byte_buffer + + cdef void un_c(self): + grpc_byte_buffer_destroy(self._c_message_byte_buffer) + + +cdef class SendCloseFromClientOperation(Operation): + + def __cinit__(self, int flags): + self._flags = flags + + def type(self): + return GRPC_OP_SEND_CLOSE_FROM_CLIENT + + cdef void c(self): + self.c_op.type = GRPC_OP_SEND_CLOSE_FROM_CLIENT + self.c_op.flags = self._flags + + cdef void un_c(self): + pass + + +cdef class SendStatusFromServerOperation(Operation): + + def __cinit__(self, trailing_metadata, code, object details, int flags): + self._trailing_metadata = trailing_metadata + self._code = code + self._details = details + self._flags = flags + + def type(self): + return GRPC_OP_SEND_STATUS_FROM_SERVER + + cdef void c(self): + self.c_op.type = GRPC_OP_SEND_STATUS_FROM_SERVER + self.c_op.flags = self._flags + _store_c_metadata( + self._trailing_metadata, &self._c_trailing_metadata, + &self._c_trailing_metadata_count) + self.c_op.data.send_status_from_server.trailing_metadata = ( + self._c_trailing_metadata) + self.c_op.data.send_status_from_server.trailing_metadata_count = ( + self._c_trailing_metadata_count) + self.c_op.data.send_status_from_server.status = self._code + self._c_details = _slice_from_bytes(_encode(self._details)) + self.c_op.data.send_status_from_server.status_details = &self._c_details + + cdef void un_c(self): + grpc_slice_unref(self._c_details) + _release_c_metadata( + self._c_trailing_metadata, self._c_trailing_metadata_count) + + +cdef class ReceiveInitialMetadataOperation(Operation): + + def __cinit__(self, flags): + self._flags = flags + + def type(self): + return GRPC_OP_RECV_INITIAL_METADATA + + cdef void c(self): + self.c_op.type = GRPC_OP_RECV_INITIAL_METADATA + self.c_op.flags = self._flags + grpc_metadata_array_init(&self._c_initial_metadata) + self.c_op.data.receive_initial_metadata.receive_initial_metadata = ( + &self._c_initial_metadata) + + cdef void un_c(self): + self._initial_metadata = _metadata(&self._c_initial_metadata) + grpc_metadata_array_destroy(&self._c_initial_metadata) + + def initial_metadata(self): + return self._initial_metadata + + +cdef class ReceiveMessageOperation(Operation): + + def __cinit__(self, flags): + self._flags = flags + + def type(self): + return GRPC_OP_RECV_MESSAGE + + cdef void c(self): + self.c_op.type = GRPC_OP_RECV_MESSAGE + self.c_op.flags = self._flags + self.c_op.data.receive_message.receive_message = ( + &self._c_message_byte_buffer) + + cdef void un_c(self): + cdef grpc_byte_buffer_reader message_reader + cdef bint message_reader_status + cdef grpc_slice message_slice + cdef size_t message_slice_length + cdef void *message_slice_pointer + if self._c_message_byte_buffer != NULL: + message_reader_status = grpc_byte_buffer_reader_init( + &message_reader, self._c_message_byte_buffer) + if message_reader_status: + message = bytearray() + while grpc_byte_buffer_reader_next(&message_reader, &message_slice): + message_slice_pointer = grpc_slice_start_ptr(message_slice) + message_slice_length = grpc_slice_length(message_slice) + message += (<char *>message_slice_pointer)[:message_slice_length] + grpc_slice_unref(message_slice) + grpc_byte_buffer_reader_destroy(&message_reader) + self._message = bytes(message) + else: + self._message = None + grpc_byte_buffer_destroy(self._c_message_byte_buffer) + else: + self._message = None + + def message(self): + return self._message + + +cdef class ReceiveStatusOnClientOperation(Operation): + + def __cinit__(self, flags): + self._flags = flags + + def type(self): + return GRPC_OP_RECV_STATUS_ON_CLIENT + + cdef void c(self): + self.c_op.type = GRPC_OP_RECV_STATUS_ON_CLIENT + self.c_op.flags = self._flags + grpc_metadata_array_init(&self._c_trailing_metadata) + self.c_op.data.receive_status_on_client.trailing_metadata = ( + &self._c_trailing_metadata) + self.c_op.data.receive_status_on_client.status = ( + &self._c_code) + self.c_op.data.receive_status_on_client.status_details = ( + &self._c_details) + + cdef void un_c(self): + self._trailing_metadata = _metadata(&self._c_trailing_metadata) + grpc_metadata_array_destroy(&self._c_trailing_metadata) + self._code = self._c_code + self._details = _decode(_slice_bytes(self._c_details)) + grpc_slice_unref(self._c_details) + + def trailing_metadata(self): + return self._trailing_metadata + + def code(self): + return self._code + + def details(self): + return self._details + + +cdef class ReceiveCloseOnServerOperation(Operation): + + def __cinit__(self, flags): + self._flags = flags + + def type(self): + return GRPC_OP_RECV_CLOSE_ON_SERVER + + cdef void c(self): + self.c_op.type = GRPC_OP_RECV_CLOSE_ON_SERVER + self.c_op.flags = self._flags + self.c_op.data.receive_close_on_server.cancelled = &self._c_cancelled + + cdef void un_c(self): + self._cancelled = bool(self._c_cancelled) + + def cancelled(self): + return self._cancelled diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi index 594fdb1a8b..7b2482d947 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi @@ -28,48 +28,6 @@ cdef class CallDetails: cdef grpc_call_details c_details -cdef class OperationTag: - - cdef object user_tag - cdef list references - # This allows CompletionQueue to notify the Python Server object that the - # underlying GRPC core server has shutdown - cdef Server shutting_down_server - cdef Call operation_call - cdef CallDetails request_call_details - cdef grpc_metadata_array _c_request_metadata - cdef grpc_op *c_ops - cdef size_t c_nops - cdef readonly object _operations - cdef bint is_new_request - - cdef void store_ops(self) - cdef object release_ops(self) - - -cdef class Event: - - cdef readonly grpc_completion_type type - cdef readonly bint success - cdef readonly object tag - - # For Server.request_call - cdef readonly bint is_new_request - cdef readonly CallDetails request_call_details - cdef readonly object request_metadata - - # For server calls - cdef readonly Call operation_call - - # For Call.start_batch - cdef readonly object batch_operations - - -cdef class ByteBuffer: - - cdef grpc_byte_buffer *c_byte_buffer - - cdef class SslPemKeyCertPair: cdef grpc_ssl_pem_key_cert_pair c_pair @@ -89,22 +47,6 @@ cdef class ChannelArgs: cdef list args -cdef class Operation: - - cdef grpc_op c_op - cdef bint _c_metadata_needs_release - cdef size_t _c_metadata_count - cdef grpc_metadata *_c_metadata - cdef ByteBuffer _received_message - cdef bint _c_metadata_array_needs_destruction - cdef grpc_metadata_array _c_metadata_array - cdef grpc_status_code _received_status_code - cdef grpc_slice _status_details - cdef int _received_cancelled - cdef readonly bint is_valid - cdef object references - - cdef class CompressionOptions: cdef grpc_compression_options c_options diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi index 26eaf50eb4..bc2cd0338e 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi @@ -218,111 +218,6 @@ cdef class CallDetails: return timespec -cdef class OperationTag: - - def __cinit__(self, user_tag, operations): - self.user_tag = user_tag - self.references = [] - self._operations = operations - - cdef void store_ops(self): - self.c_nops = 0 if self._operations is None else len(self._operations) - if 0 < self.c_nops: - self.c_ops = <grpc_op *>gpr_malloc(sizeof(grpc_op) * self.c_nops) - for index in range(self.c_nops): - self.c_ops[index] = (<Operation>(self._operations[index])).c_op - - cdef object release_ops(self): - if 0 < self.c_nops: - for index, operation in enumerate(self._operations): - (<Operation>operation).c_op = self.c_ops[index] - gpr_free(self.c_ops) - return self._operations - else: - return () - - -cdef class Event: - - def __cinit__(self, grpc_completion_type type, bint success, - object tag, Call operation_call, - CallDetails request_call_details, - object request_metadata, - bint is_new_request, - object batch_operations): - self.type = type - self.success = success - self.tag = tag - self.operation_call = operation_call - self.request_call_details = request_call_details - self.request_metadata = request_metadata - self.batch_operations = batch_operations - self.is_new_request = is_new_request - - -cdef class ByteBuffer: - - def __cinit__(self, bytes data): - grpc_init() - if data is None: - self.c_byte_buffer = NULL - return - - cdef char *c_data = data - cdef grpc_slice data_slice - cdef size_t data_length = len(data) - with nogil: - data_slice = grpc_slice_from_copied_buffer(c_data, data_length) - with nogil: - self.c_byte_buffer = grpc_raw_byte_buffer_create( - &data_slice, 1) - with nogil: - grpc_slice_unref(data_slice) - - def bytes(self): - cdef grpc_byte_buffer_reader reader - cdef grpc_slice data_slice - cdef size_t data_slice_length - cdef void *data_slice_pointer - cdef bint reader_status - if self.c_byte_buffer != NULL: - with nogil: - reader_status = grpc_byte_buffer_reader_init( - &reader, self.c_byte_buffer) - if not reader_status: - return None - result = bytearray() - with nogil: - while grpc_byte_buffer_reader_next(&reader, &data_slice): - data_slice_pointer = grpc_slice_start_ptr(data_slice) - data_slice_length = grpc_slice_length(data_slice) - with gil: - result += (<char *>data_slice_pointer)[:data_slice_length] - grpc_slice_unref(data_slice) - with nogil: - grpc_byte_buffer_reader_destroy(&reader) - return bytes(result) - else: - return None - - def __len__(self): - cdef size_t result - if self.c_byte_buffer != NULL: - with nogil: - result = grpc_byte_buffer_length(self.c_byte_buffer) - return result - else: - return 0 - - def __str__(self): - return self.bytes() - - def __dealloc__(self): - if self.c_byte_buffer != NULL: - grpc_byte_buffer_destroy(self.c_byte_buffer) - grpc_shutdown() - - cdef class SslPemKeyCertPair: def __cinit__(self, bytes private_key, bytes certificate_chain): @@ -407,185 +302,6 @@ cdef class ChannelArgs: return self.args[i] -cdef class Operation: - - def __cinit__(self): - grpc_init() - self.references = [] - self._c_metadata_needs_release = False - self._c_metadata_array_needs_destruction = False - self._status_details = grpc_empty_slice() - self.is_valid = False - - @property - def type(self): - return self.c_op.type - - @property - def flags(self): - return self.c_op.flags - - @property - def has_status(self): - return self.c_op.type == GRPC_OP_RECV_STATUS_ON_CLIENT - - @property - def received_message(self): - if self.c_op.type != GRPC_OP_RECV_MESSAGE: - raise TypeError("self must be an operation receiving a message") - return self._received_message - - @property - def received_message_or_none(self): - if self.c_op.type != GRPC_OP_RECV_MESSAGE: - return None - return self._received_message - - @property - def received_metadata(self): - if (self.c_op.type != GRPC_OP_RECV_INITIAL_METADATA and - self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT): - raise TypeError("self must be an operation receiving metadata") - return _metadata(&self._c_metadata_array) - - @property - def received_status_code(self): - if self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT: - raise TypeError("self must be an operation receiving a status code") - return self._received_status_code - - @property - def received_status_code_or_none(self): - if self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT: - return None - return self._received_status_code - - @property - def received_status_details(self): - if self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT: - raise TypeError("self must be an operation receiving status details") - return _slice_bytes(self._status_details) - - @property - def received_status_details_or_none(self): - if self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT: - return None - return _slice_bytes(self._status_details) - - @property - def received_cancelled(self): - if self.c_op.type != GRPC_OP_RECV_CLOSE_ON_SERVER: - raise TypeError("self must be an operation receiving cancellation " - "information") - return False if self._received_cancelled == 0 else True - - @property - def received_cancelled_or_none(self): - if self.c_op.type != GRPC_OP_RECV_CLOSE_ON_SERVER: - return None - return False if self._received_cancelled == 0 else True - - def __dealloc__(self): - if self._c_metadata_needs_release: - _release_c_metadata(self._c_metadata, self._c_metadata_count) - if self._c_metadata_array_needs_destruction: - grpc_metadata_array_destroy(&self._c_metadata_array) - grpc_slice_unref(self._status_details) - grpc_shutdown() - -def operation_send_initial_metadata(metadata, int flags): - cdef Operation op = Operation() - op.c_op.type = GRPC_OP_SEND_INITIAL_METADATA - op.c_op.flags = flags - _store_c_metadata(metadata, &op._c_metadata, &op._c_metadata_count) - op._c_metadata_needs_release = True - op.c_op.data.send_initial_metadata.count = op._c_metadata_count - op.c_op.data.send_initial_metadata.metadata = op._c_metadata - op.is_valid = True - return op - -def operation_send_message(data, int flags): - cdef Operation op = Operation() - op.c_op.type = GRPC_OP_SEND_MESSAGE - op.c_op.flags = flags - byte_buffer = ByteBuffer(data) - op.c_op.data.send_message.send_message = byte_buffer.c_byte_buffer - op.references.append(byte_buffer) - op.is_valid = True - return op - -def operation_send_close_from_client(int flags): - cdef Operation op = Operation() - op.c_op.type = GRPC_OP_SEND_CLOSE_FROM_CLIENT - op.c_op.flags = flags - op.is_valid = True - return op - -def operation_send_status_from_server( - metadata, grpc_status_code code, bytes details, int flags): - cdef Operation op = Operation() - op.c_op.type = GRPC_OP_SEND_STATUS_FROM_SERVER - op.c_op.flags = flags - _store_c_metadata(metadata, &op._c_metadata, &op._c_metadata_count) - op._c_metadata_needs_release = True - op.c_op.data.send_status_from_server.trailing_metadata_count = ( - op._c_metadata_count) - op.c_op.data.send_status_from_server.trailing_metadata = op._c_metadata - op.c_op.data.send_status_from_server.status = code - grpc_slice_unref(op._status_details) - op._status_details = _slice_from_bytes(details) - op.c_op.data.send_status_from_server.status_details = &op._status_details - op.is_valid = True - return op - -def operation_receive_initial_metadata(int flags): - cdef Operation op = Operation() - op.c_op.type = GRPC_OP_RECV_INITIAL_METADATA - op.c_op.flags = flags - grpc_metadata_array_init(&op._c_metadata_array) - op.c_op.data.receive_initial_metadata.receive_initial_metadata = ( - &op._c_metadata_array) - op._c_metadata_array_needs_destruction = True - op.is_valid = True - return op - -def operation_receive_message(int flags): - cdef Operation op = Operation() - op.c_op.type = GRPC_OP_RECV_MESSAGE - op.c_op.flags = flags - op._received_message = ByteBuffer(None) - # n.b. the c_op.data.receive_message field needs to be deleted by us, - # anyway, so we just let that be handled by the ByteBuffer() we allocated - # the line before. - op.c_op.data.receive_message.receive_message = ( - &op._received_message.c_byte_buffer) - op.is_valid = True - return op - -def operation_receive_status_on_client(int flags): - cdef Operation op = Operation() - op.c_op.type = GRPC_OP_RECV_STATUS_ON_CLIENT - op.c_op.flags = flags - grpc_metadata_array_init(&op._c_metadata_array) - op.c_op.data.receive_status_on_client.trailing_metadata = ( - &op._c_metadata_array) - op._c_metadata_array_needs_destruction = True - op.c_op.data.receive_status_on_client.status = ( - &op._received_status_code) - op.c_op.data.receive_status_on_client.status_details = ( - &op._status_details) - op.is_valid = True - return op - -def operation_receive_close_on_server(int flags): - cdef Operation op = Operation() - op.c_op.type = GRPC_OP_RECV_CLOSE_ON_SERVER - op.c_op.flags = flags - op.c_op.data.receive_close_on_server.cancelled = &op._received_cancelled - op.is_valid = True - return op - - cdef class CompressionOptions: def __cinit__(self): diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi index f8d7892858..c19beccde6 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi @@ -78,19 +78,15 @@ cdef class Server: raise ValueError("server must be started and not shutting down") if server_queue not in self.registered_completion_queues: raise ValueError("server_queue must be a registered completion queue") - cdef OperationTag operation_tag = OperationTag(tag, None) - operation_tag.operation_call = Call() - operation_tag.request_call_details = CallDetails() - grpc_metadata_array_init(&operation_tag._c_request_metadata) - operation_tag.references.extend([self, call_queue, server_queue]) - operation_tag.is_new_request = True - cpython.Py_INCREF(operation_tag) + cdef _RequestCallTag request_call_tag = _RequestCallTag(tag) + request_call_tag.prepare() + cpython.Py_INCREF(request_call_tag) return grpc_server_request_call( - self.c_server, &operation_tag.operation_call.c_call, - &operation_tag.request_call_details.c_details, - &operation_tag._c_request_metadata, + self.c_server, &request_call_tag.call.c_call, + &request_call_tag.call_details.c_details, + &request_call_tag.c_invocation_metadata, call_queue.c_completion_queue, server_queue.c_completion_queue, - <cpython.PyObject *>operation_tag) + <cpython.PyObject *>request_call_tag) def register_completion_queue( self, CompletionQueue queue not None): @@ -131,16 +127,14 @@ cdef class Server: cdef _c_shutdown(self, CompletionQueue queue, tag): self.is_shutting_down = True - operation_tag = OperationTag(tag, None) - operation_tag.shutting_down_server = self - cpython.Py_INCREF(operation_tag) + cdef _ServerShutdownTag server_shutdown_tag = _ServerShutdownTag(tag, self) + cpython.Py_INCREF(server_shutdown_tag) with nogil: grpc_server_shutdown_and_notify( self.c_server, queue.c_completion_queue, - <cpython.PyObject *>operation_tag) + <cpython.PyObject *>server_shutdown_tag) def shutdown(self, CompletionQueue queue not None, tag): - cdef OperationTag operation_tag if queue.is_shutting_down: raise ValueError("queue must be live") elif not self.is_started: @@ -153,7 +147,8 @@ cdef class Server: self._c_shutdown(queue, tag) cdef notify_shutdown_complete(self): - # called only by a completion queue on receiving our shutdown operation tag + # called only after our server shutdown tag has emerged from a completion + # queue. self.is_shutdown = True def cancel_all_calls(self): diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/tag.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/tag.pxd.pxi new file mode 100644 index 0000000000..f9a3b5e8f4 --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/tag.pxd.pxi @@ -0,0 +1,58 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef class _Tag: + + cdef object event(self, grpc_event c_event) + + +cdef class _ConnectivityTag(_Tag): + + cdef readonly object _user_tag + + cdef ConnectivityEvent event(self, grpc_event c_event) + + +cdef class _RequestCallTag(_Tag): + + cdef readonly object _user_tag + cdef Call call + cdef CallDetails call_details + cdef grpc_metadata_array c_invocation_metadata + + cdef void prepare(self) + cdef RequestCallEvent event(self, grpc_event c_event) + + +cdef class _BatchOperationTag(_Tag): + + cdef object _user_tag + cdef readonly object _operations + cdef readonly object _retained_call + cdef grpc_op *c_ops + cdef size_t c_nops + + cdef void prepare(self) + cdef BatchOperationEvent event(self, grpc_event c_event) + + +cdef class _ServerShutdownTag(_Tag): + + cdef readonly object _user_tag + # This allows CompletionQueue to notify the Python Server object that the + # underlying GRPC core server has shutdown + cdef readonly Server _shutting_down_server + + cdef ServerShutdownEvent event(self, grpc_event c_event) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/tag.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/tag.pyx.pxi new file mode 100644 index 0000000000..aaca458442 --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/tag.pyx.pxi @@ -0,0 +1,87 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef class _Tag: + + cdef object event(self, grpc_event c_event): + raise NotImplementedError() + + +cdef class _ConnectivityTag(_Tag): + + def __cinit__(self, user_tag): + self._user_tag = user_tag + + cdef ConnectivityEvent event(self, grpc_event c_event): + return ConnectivityEvent(c_event.type, c_event.success, self._user_tag) + + +cdef class _RequestCallTag(_Tag): + + def __cinit__(self, user_tag): + self._user_tag = user_tag + self.call = None + self.call_details = None + + cdef void prepare(self): + self.call = Call() + self.call_details = CallDetails() + grpc_metadata_array_init(&self.c_invocation_metadata) + + cdef RequestCallEvent event(self, grpc_event c_event): + cdef tuple invocation_metadata = _metadata(&self.c_invocation_metadata) + grpc_metadata_array_destroy(&self.c_invocation_metadata) + return RequestCallEvent( + c_event.type, c_event.success, self._user_tag, self.call, + self.call_details, invocation_metadata) + + +cdef class _BatchOperationTag: + + def __cinit__(self, user_tag, operations, call): + self._user_tag = user_tag + self._operations = operations + self._retained_call = call + + cdef void prepare(self): + self.c_nops = 0 if self._operations is None else len(self._operations) + if 0 < self.c_nops: + self.c_ops = <grpc_op *>gpr_malloc(sizeof(grpc_op) * self.c_nops) + for index, operation in enumerate(self._operations): + (<Operation>operation).c() + self.c_ops[index] = (<Operation>operation).c_op + + cdef BatchOperationEvent event(self, grpc_event c_event): + if 0 < self.c_nops: + for index, operation in enumerate(self._operations): + (<Operation>operation).c_op = self.c_ops[index] + (<Operation>operation).un_c() + gpr_free(self.c_ops) + return BatchOperationEvent( + c_event.type, c_event.success, self._user_tag, self._operations) + else: + return BatchOperationEvent( + c_event.type, c_event.success, self._user_tag, ()) + + +cdef class _ServerShutdownTag(_Tag): + + def __cinit__(self, user_tag, shutting_down_server): + self._user_tag = user_tag + self._shutting_down_server = shutting_down_server + + cdef ServerShutdownEvent event(self, grpc_event c_event): + self._shutting_down_server.notify_shutdown_complete() + return ServerShutdownEvent(c_event.type, c_event.success, self._user_tag)
\ No newline at end of file diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pxd b/src/python/grpcio/grpc/_cython/cygrpc.pxd index 6fc5638d5d..b32fa518fc 100644 --- a/src/python/grpcio/grpc/_cython/cygrpc.pxd +++ b/src/python/grpcio/grpc/_cython/cygrpc.pxd @@ -18,7 +18,10 @@ include "_cygrpc/call.pxd.pxi" include "_cygrpc/channel.pxd.pxi" include "_cygrpc/credentials.pxd.pxi" include "_cygrpc/completion_queue.pxd.pxi" +include "_cygrpc/event.pxd.pxi" include "_cygrpc/metadata.pxd.pxi" +include "_cygrpc/operation.pxd.pxi" include "_cygrpc/records.pxd.pxi" include "_cygrpc/security.pxd.pxi" include "_cygrpc/server.pxd.pxi" +include "_cygrpc/tag.pxd.pxi" diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pyx b/src/python/grpcio/grpc/_cython/cygrpc.pyx index d605229822..5106394708 100644 --- a/src/python/grpcio/grpc/_cython/cygrpc.pyx +++ b/src/python/grpcio/grpc/_cython/cygrpc.pyx @@ -25,10 +25,13 @@ include "_cygrpc/call.pyx.pxi" include "_cygrpc/channel.pyx.pxi" include "_cygrpc/credentials.pyx.pxi" include "_cygrpc/completion_queue.pyx.pxi" +include "_cygrpc/event.pyx.pxi" include "_cygrpc/metadata.pyx.pxi" +include "_cygrpc/operation.pyx.pxi" include "_cygrpc/records.pyx.pxi" include "_cygrpc/security.pyx.pxi" include "_cygrpc/server.pyx.pxi" +include "_cygrpc/tag.pyx.pxi" # # initialize gRPC diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py index 02d3af8706..22244b9cec 100644 --- a/src/python/grpcio/grpc/_server.py +++ b/src/python/grpcio/grpc/_server.py @@ -50,7 +50,7 @@ _UNEXPECTED_EXIT_SERVER_GRACE = 1.0 def _serialized_request(request_event): - return request_event.batch_operations[0].received_message.bytes() + return request_event.batch_operations[0].message() def _application_code(code): @@ -130,13 +130,13 @@ def _abort(state, call, code, details): effective_code = _abortion_code(state, code) effective_details = details if state.details is None else state.details if state.initial_metadata_allowed: - operations = (cygrpc.operation_send_initial_metadata( - (), _EMPTY_FLAGS), cygrpc.operation_send_status_from_server( + operations = (cygrpc.SendInitialMetadataOperation( + None, _EMPTY_FLAGS), cygrpc.SendStatusFromServerOperation( state.trailing_metadata, effective_code, effective_details, _EMPTY_FLAGS),) token = _SEND_INITIAL_METADATA_AND_SEND_STATUS_FROM_SERVER_TOKEN else: - operations = (cygrpc.operation_send_status_from_server( + operations = (cygrpc.SendStatusFromServerOperation( state.trailing_metadata, effective_code, effective_details, _EMPTY_FLAGS),) token = _SEND_STATUS_FROM_SERVER_TOKEN @@ -150,8 +150,7 @@ def _receive_close_on_server(state): def receive_close_on_server(receive_close_on_server_event): with state.condition: - if receive_close_on_server_event.batch_operations[ - 0].received_cancelled: + if receive_close_on_server_event.batch_operations[0].cancelled(): state.client = _CANCELLED elif state.client is _OPEN: state.client = _CLOSED @@ -218,11 +217,10 @@ class _Context(grpc.ServicerContext): def time_remaining(self): return max( - float(self._rpc_event.request_call_details.deadline) - time.time(), - 0) + float(self._rpc_event.call_details.deadline) - time.time(), 0) def cancel(self): - self._rpc_event.operation_call.cancel() + self._rpc_event.call.cancel() def add_callback(self, callback): with self._state.condition: @@ -237,23 +235,23 @@ class _Context(grpc.ServicerContext): self._state.disable_next_compression = True def invocation_metadata(self): - return self._rpc_event.request_metadata + return self._rpc_event.invocation_metadata def peer(self): - return _common.decode(self._rpc_event.operation_call.peer()) + return _common.decode(self._rpc_event.call.peer()) def peer_identities(self): - return cygrpc.peer_identities(self._rpc_event.operation_call) + return cygrpc.peer_identities(self._rpc_event.call) def peer_identity_key(self): - id_key = cygrpc.peer_identity_key(self._rpc_event.operation_call) + id_key = cygrpc.peer_identity_key(self._rpc_event.call) return id_key if id_key is None else _common.decode(id_key) def auth_context(self): return { _common.decode(key): value for key, value in six.iteritems( - cygrpc.auth_context(self._rpc_event.operation_call)) + cygrpc.auth_context(self._rpc_event.call)) } def send_initial_metadata(self, initial_metadata): @@ -262,9 +260,9 @@ class _Context(grpc.ServicerContext): _raise_rpc_error(self._state) else: if self._state.initial_metadata_allowed: - operation = cygrpc.operation_send_initial_metadata( + operation = cygrpc.SendInitialMetadataOperation( initial_metadata, _EMPTY_FLAGS) - self._rpc_event.operation_call.start_server_batch( + self._rpc_event.call.start_server_batch( (operation,), _send_initial_metadata(self._state)) self._state.initial_metadata_allowed = False self._state.due.add(_SEND_INITIAL_METADATA_TOKEN) @@ -305,7 +303,7 @@ class _RequestIterator(object): raise StopIteration() else: self._call.start_server_batch( - (cygrpc.operation_receive_message(_EMPTY_FLAGS),), + (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), _receive_message(self._state, self._call, self._request_deserializer)) self._state.due.add(_RECEIVE_MESSAGE_TOKEN) @@ -347,9 +345,9 @@ def _unary_request(rpc_event, state, request_deserializer): if state.client is _CANCELLED or state.statused: return None else: - rpc_event.operation_call.start_server_batch( - (cygrpc.operation_receive_message(_EMPTY_FLAGS),), - _receive_message(state, rpc_event.operation_call, + rpc_event.call.start_server_batch( + (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), + _receive_message(state, rpc_event.call, request_deserializer)) state.due.add(_RECEIVE_MESSAGE_TOKEN) while True: @@ -357,8 +355,8 @@ def _unary_request(rpc_event, state, request_deserializer): if state.request is None: if state.client is _CLOSED: details = '"{}" requires exactly one request message.'.format( - rpc_event.request_call_details.method) - _abort(state, rpc_event.operation_call, + rpc_event.call_details.method) + _abort(state, rpc_event.call, cygrpc.StatusCode.unimplemented, _common.encode(details)) return None @@ -379,13 +377,13 @@ def _call_behavior(rpc_event, state, behavior, argument, request_deserializer): except Exception as exception: # pylint: disable=broad-except with state.condition: if exception is state.abortion: - _abort(state, rpc_event.operation_call, - cygrpc.StatusCode.unknown, b'RPC Aborted') + _abort(state, rpc_event.call, cygrpc.StatusCode.unknown, + b'RPC Aborted') elif exception not in state.rpc_errors: details = 'Exception calling application: {}'.format(exception) logging.exception(details) - _abort(state, rpc_event.operation_call, - cygrpc.StatusCode.unknown, _common.encode(details)) + _abort(state, rpc_event.call, cygrpc.StatusCode.unknown, + _common.encode(details)) return None, False @@ -397,13 +395,13 @@ def _take_response_from_response_iterator(rpc_event, state, response_iterator): except Exception as exception: # pylint: disable=broad-except with state.condition: if exception is state.abortion: - _abort(state, rpc_event.operation_call, - cygrpc.StatusCode.unknown, b'RPC Aborted') + _abort(state, rpc_event.call, cygrpc.StatusCode.unknown, + b'RPC Aborted') elif exception not in state.rpc_errors: details = 'Exception iterating responses: {}'.format(exception) logging.exception(details) - _abort(state, rpc_event.operation_call, - cygrpc.StatusCode.unknown, _common.encode(details)) + _abort(state, rpc_event.call, cygrpc.StatusCode.unknown, + _common.encode(details)) return None, False @@ -411,7 +409,7 @@ def _serialize_response(rpc_event, state, response, response_serializer): serialized_response = _common.serialize(response, response_serializer) if serialized_response is None: with state.condition: - _abort(state, rpc_event.operation_call, cygrpc.StatusCode.internal, + _abort(state, rpc_event.call, cygrpc.StatusCode.internal, b'Failed to serialize response!') return None else: @@ -424,17 +422,18 @@ def _send_response(rpc_event, state, serialized_response): return False else: if state.initial_metadata_allowed: - operations = (cygrpc.operation_send_initial_metadata( - (), _EMPTY_FLAGS), cygrpc.operation_send_message( - serialized_response, _EMPTY_FLAGS),) + operations = (cygrpc.SendInitialMetadataOperation(None, + _EMPTY_FLAGS), + cygrpc.SendMessageOperation(serialized_response, + _EMPTY_FLAGS),) state.initial_metadata_allowed = False token = _SEND_INITIAL_METADATA_AND_SEND_MESSAGE_TOKEN else: - operations = (cygrpc.operation_send_message(serialized_response, - _EMPTY_FLAGS),) + operations = (cygrpc.SendMessageOperation(serialized_response, + _EMPTY_FLAGS),) token = _SEND_MESSAGE_TOKEN - rpc_event.operation_call.start_server_batch( - operations, _send_message(state, token)) + rpc_event.call.start_server_batch(operations, + _send_message(state, token)) state.due.add(token) while True: state.condition.wait() @@ -448,17 +447,17 @@ def _status(rpc_event, state, serialized_response): code = _completion_code(state) details = _details(state) operations = [ - cygrpc.operation_send_status_from_server( + cygrpc.SendStatusFromServerOperation( state.trailing_metadata, code, details, _EMPTY_FLAGS), ] if state.initial_metadata_allowed: operations.append( - cygrpc.operation_send_initial_metadata((), _EMPTY_FLAGS)) + cygrpc.SendInitialMetadataOperation(None, _EMPTY_FLAGS)) if serialized_response is not None: operations.append( - cygrpc.operation_send_message(serialized_response, - _EMPTY_FLAGS)) - rpc_event.operation_call.start_server_batch( + cygrpc.SendMessageOperation(serialized_response, + _EMPTY_FLAGS)) + rpc_event.call.start_server_batch( operations, _send_status_from_server(state, _SEND_STATUS_FROM_SERVER_TOKEN)) state.statused = True @@ -525,7 +524,7 @@ def _handle_unary_stream(rpc_event, state, method_handler, thread_pool): def _handle_stream_unary(rpc_event, state, method_handler, thread_pool): - request_iterator = _RequestIterator(state, rpc_event.operation_call, + request_iterator = _RequestIterator(state, rpc_event.call, method_handler.request_deserializer) return thread_pool.submit( _unary_response_in_pool, rpc_event, state, method_handler.stream_unary, @@ -534,7 +533,7 @@ def _handle_stream_unary(rpc_event, state, method_handler, thread_pool): def _handle_stream_stream(rpc_event, state, method_handler, thread_pool): - request_iterator = _RequestIterator(state, rpc_event.operation_call, + request_iterator = _RequestIterator(state, rpc_event.call, method_handler.request_deserializer) return thread_pool.submit( _stream_response_in_pool, rpc_event, state, @@ -552,8 +551,8 @@ def _find_method_handler(rpc_event, generic_handlers, interceptor_pipeline): return None handler_call_details = _HandlerCallDetails( - _common.decode(rpc_event.request_call_details.method), - rpc_event.request_metadata) + _common.decode(rpc_event.call_details.method), + rpc_event.invocation_metadata) if interceptor_pipeline is not None: return interceptor_pipeline.execute(query_handlers, @@ -563,21 +562,21 @@ def _find_method_handler(rpc_event, generic_handlers, interceptor_pipeline): def _reject_rpc(rpc_event, status, details): - operations = (cygrpc.operation_send_initial_metadata((), _EMPTY_FLAGS), - cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS), - cygrpc.operation_send_status_from_server((), status, details, - _EMPTY_FLAGS),) + operations = (cygrpc.SendInitialMetadataOperation(None, _EMPTY_FLAGS), + cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS), + cygrpc.SendStatusFromServerOperation(None, status, details, + _EMPTY_FLAGS),) rpc_state = _RPCState() - rpc_event.operation_call.start_server_batch( - operations, lambda ignored_event: (rpc_state, (),)) + rpc_event.call.start_server_batch(operations, + lambda ignored_event: (rpc_state, (),)) return rpc_state def _handle_with_method_handler(rpc_event, method_handler, thread_pool): state = _RPCState() with state.condition: - rpc_event.operation_call.start_server_batch( - (cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),), + rpc_event.call.start_server_batch( + (cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),), _receive_close_on_server(state)) state.due.add(_RECEIVE_CLOSE_ON_SERVER_TOKEN) if method_handler.request_streaming: @@ -600,7 +599,7 @@ def _handle_call(rpc_event, generic_handlers, interceptor_pipeline, thread_pool, concurrency_exceeded): if not rpc_event.success: return None, None - if rpc_event.request_call_details.method is not None: + if rpc_event.call_details.method is not None: try: method_handler = _find_method_handler(rpc_event, generic_handlers, interceptor_pipeline) diff --git a/src/python/grpcio_tests/tests/health_check/_health_servicer_test.py b/src/python/grpcio_tests/tests/health_check/_health_servicer_test.py index ac31e72409..3cbbb8de33 100644 --- a/src/python/grpcio_tests/tests/health_check/_health_servicer_test.py +++ b/src/python/grpcio_tests/tests/health_check/_health_servicer_test.py @@ -16,12 +16,11 @@ import unittest import grpc -from grpc.framework.foundation import logging_pool from grpc_health.v1 import health from grpc_health.v1 import health_pb2 from grpc_health.v1 import health_pb2_grpc -from tests.unit.framework.common import test_constants +from tests.unit import test_common class HealthServicerTest(unittest.TestCase): @@ -35,8 +34,7 @@ class HealthServicerTest(unittest.TestCase): health_pb2.HealthCheckResponse.UNKNOWN) servicer.set('grpc.test.TestServiceNotServing', health_pb2.HealthCheckResponse.NOT_SERVING) - server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) - self._server = grpc.server(server_pool) + self._server = test_common.test_server() port = self._server.add_insecure_port('[::]:0') health_pb2_grpc.add_HealthServicer_to_server(servicer, self._server) self._server.start() diff --git a/src/python/grpcio_tests/tests/interop/_insecure_intraop_test.py b/src/python/grpcio_tests/tests/interop/_insecure_intraop_test.py index 4136739f05..8d464b2d4b 100644 --- a/src/python/grpcio_tests/tests/interop/_insecure_intraop_test.py +++ b/src/python/grpcio_tests/tests/interop/_insecure_intraop_test.py @@ -13,7 +13,6 @@ # limitations under the License. """Insecure client-server interoperability as a unit test.""" -from concurrent import futures import unittest import grpc @@ -22,13 +21,14 @@ from src.proto.grpc.testing import test_pb2_grpc from tests.interop import _intraop_test_case from tests.interop import methods from tests.interop import server +from tests.unit import test_common class InsecureIntraopTest(_intraop_test_case.IntraopTestCase, unittest.TestCase): def setUp(self): - self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) + self.server = test_common.test_server() test_pb2_grpc.add_TestServiceServicer_to_server(methods.TestService(), self.server) port = self.server.add_insecure_port('[::]:0') diff --git a/src/python/grpcio_tests/tests/interop/_secure_intraop_test.py b/src/python/grpcio_tests/tests/interop/_secure_intraop_test.py index 6514d77371..6ce8b3715b 100644 --- a/src/python/grpcio_tests/tests/interop/_secure_intraop_test.py +++ b/src/python/grpcio_tests/tests/interop/_secure_intraop_test.py @@ -13,7 +13,6 @@ # limitations under the License. """Secure client-server interoperability as a unit test.""" -from concurrent import futures import unittest import grpc @@ -22,6 +21,7 @@ from src.proto.grpc.testing import test_pb2_grpc from tests.interop import _intraop_test_case from tests.interop import methods from tests.interop import resources +from tests.unit import test_common _SERVER_HOST_OVERRIDE = 'foo.test.google.fr' @@ -29,7 +29,7 @@ _SERVER_HOST_OVERRIDE = 'foo.test.google.fr' class SecureIntraopTest(_intraop_test_case.IntraopTestCase, unittest.TestCase): def setUp(self): - self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) + self.server = test_common.test_server() test_pb2_grpc.add_TestServiceServicer_to_server(methods.TestService(), self.server) port = self.server.add_secure_port( diff --git a/src/python/grpcio_tests/tests/interop/server.py b/src/python/grpcio_tests/tests/interop/server.py index eeb41a21d2..dd4f5146e9 100644 --- a/src/python/grpcio_tests/tests/interop/server.py +++ b/src/python/grpcio_tests/tests/interop/server.py @@ -23,6 +23,7 @@ from src.proto.grpc.testing import test_pb2_grpc from tests.interop import methods from tests.interop import resources +from tests.unit import test_common _ONE_DAY_IN_SECONDS = 60 * 60 * 24 @@ -38,7 +39,7 @@ def serve(): help='require a secure connection') args = parser.parse_args() - server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) + server = test_common.test_server() test_pb2_grpc.add_TestServiceServicer_to_server(methods.TestService(), server) if args.use_tls: diff --git a/src/python/grpcio_tests/tests/protoc_plugin/_python_plugin_test.py b/src/python/grpcio_tests/tests/protoc_plugin/_python_plugin_test.py index 5b84001aab..8fc539e641 100644 --- a/src/python/grpcio_tests/tests/protoc_plugin/_python_plugin_test.py +++ b/src/python/grpcio_tests/tests/protoc_plugin/_python_plugin_test.py @@ -13,7 +13,6 @@ # limitations under the License. import collections -from concurrent import futures import contextlib import distutils.spawn import errno @@ -28,6 +27,7 @@ import unittest from six import moves import grpc +from tests.unit import test_common from tests.unit.framework.common import test_constants import tests.protoc_plugin.protos.payload.test_payload_pb2 as payload_pb2 @@ -155,8 +155,7 @@ def _CreateService(): def HalfDuplexCall(self, request_iter, context): return servicer_methods.HalfDuplexCall(request_iter, context) - server = grpc.server( - futures.ThreadPoolExecutor(max_workers=test_constants.POOL_SIZE)) + server = test_common.test_server() getattr(service_pb2_grpc, ADD_SERVICER_TO_SERVER_IDENTIFIER)(Servicer(), server) port = server.add_insecure_port('[::]:0') @@ -177,8 +176,7 @@ def _CreateIncompleteService(): class Servicer(getattr(service_pb2_grpc, SERVICER_IDENTIFIER)): pass - server = grpc.server( - futures.ThreadPoolExecutor(max_workers=test_constants.POOL_SIZE)) + server = test_common.test_server() getattr(service_pb2_grpc, ADD_SERVICER_TO_SERVER_IDENTIFIER)(Servicer(), server) port = server.add_insecure_port('[::]:0') diff --git a/src/python/grpcio_tests/tests/protoc_plugin/_split_definitions_test.py b/src/python/grpcio_tests/tests/protoc_plugin/_split_definitions_test.py index 7868cdbfb3..c732e55108 100644 --- a/src/python/grpcio_tests/tests/protoc_plugin/_split_definitions_test.py +++ b/src/python/grpcio_tests/tests/protoc_plugin/_split_definitions_test.py @@ -13,7 +13,6 @@ # limitations under the License. import abc -from concurrent import futures import contextlib import importlib import os @@ -29,7 +28,7 @@ import six import grpc from grpc_tools import protoc -from tests.unit.framework.common import test_constants +from tests.unit import test_common _MESSAGES_IMPORT = b'import "messages.proto";' _SPLIT_NAMESPACE = b'package grpc_protoc_plugin.invocation_testing.split;' @@ -256,9 +255,7 @@ class _Test(six.with_metaclass(abc.ABCMeta, unittest.TestCase)): self._protoc() for services_module in self._services_modules(): - server = grpc.server( - futures.ThreadPoolExecutor( - max_workers=test_constants.POOL_SIZE)) + server = test_common.test_server() services_module.add_TestServiceServicer_to_server( _Servicer(self._messages_pb2.Response), server) port = server.add_insecure_port('[::]:0') diff --git a/src/python/grpcio_tests/tests/qps/qps_worker.py b/src/python/grpcio_tests/tests/qps/qps_worker.py index 3e46c0b8c2..54f69db109 100644 --- a/src/python/grpcio_tests/tests/qps/qps_worker.py +++ b/src/python/grpcio_tests/tests/qps/qps_worker.py @@ -16,15 +16,15 @@ import argparse import time -from concurrent import futures import grpc from src.proto.grpc.testing import services_pb2_grpc from tests.qps import worker_server +from tests.unit import test_common def run_worker_server(port): - server = grpc.server(futures.ThreadPoolExecutor(max_workers=5)) + server = test_common.test_server() servicer = worker_server.WorkerServer() services_pb2_grpc.add_WorkerServiceServicer_to_server(servicer, server) server.add_insecure_port('[::]:{}'.format(port)) diff --git a/src/python/grpcio_tests/tests/qps/worker_server.py b/src/python/grpcio_tests/tests/qps/worker_server.py index adb10cbcec..fef4fb0459 100644 --- a/src/python/grpcio_tests/tests/qps/worker_server.py +++ b/src/python/grpcio_tests/tests/qps/worker_server.py @@ -28,6 +28,7 @@ from tests.qps import benchmark_server from tests.qps import client_runner from tests.qps import histogram from tests.unit import resources +from tests.unit import test_common class WorkerServer(services_pb2_grpc.WorkerServiceServicer): @@ -68,8 +69,7 @@ class WorkerServer(services_pb2_grpc.WorkerServiceServicer): server_threads = multiprocessing.cpu_count() * 5 else: server_threads = config.async_server_threads - server = grpc.server( - futures.ThreadPoolExecutor(max_workers=server_threads)) + server = test_common.test_server(max_workers=server_threads) if config.server_type == control_pb2.ASYNC_SERVER: servicer = benchmark_server.BenchmarkServer() services_pb2_grpc.add_BenchmarkServiceServicer_to_server(servicer, diff --git a/src/python/grpcio_tests/tests/reflection/_reflection_servicer_test.py b/src/python/grpcio_tests/tests/reflection/_reflection_servicer_test.py index a86743fa5a..86037e258a 100644 --- a/src/python/grpcio_tests/tests/reflection/_reflection_servicer_test.py +++ b/src/python/grpcio_tests/tests/reflection/_reflection_servicer_test.py @@ -16,7 +16,6 @@ import unittest import grpc -from grpc.framework.foundation import logging_pool from grpc_reflection.v1alpha import reflection from grpc_reflection.v1alpha import reflection_pb2 from grpc_reflection.v1alpha import reflection_pb2_grpc @@ -27,7 +26,7 @@ from google.protobuf import descriptor_pb2 from src.proto.grpc.testing import empty_pb2 from src.proto.grpc.testing.proto2 import empty2_extensions_pb2 -from tests.unit.framework.common import test_constants +from tests.unit import test_common _EMPTY_PROTO_FILE_NAME = 'src/proto/grpc/testing/empty.proto' _EMPTY_PROTO_SYMBOL_NAME = 'grpc.testing.Empty' @@ -46,8 +45,7 @@ def _file_descriptor_to_proto(descriptor): class ReflectionServicerTest(unittest.TestCase): def setUp(self): - server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) - self._server = grpc.server(server_pool) + self._server = test_common.test_server() reflection.enable_server_reflection(_SERVICE_NAMES, self._server) port = self._server.add_insecure_port('[::]:0') self._server.start() diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json index 3bf5308749..e033c1063f 100644 --- a/src/python/grpcio_tests/tests/tests.json +++ b/src/python/grpcio_tests/tests/tests.json @@ -34,6 +34,7 @@ "unit._cython._no_messages_server_completion_queue_per_call_test.Test", "unit._cython._no_messages_single_server_completion_queue_test.Test", "unit._cython._read_some_but_not_all_responses_test.ReadSomeButNotAllResponsesTest", + "unit._cython._server_test.Test", "unit._cython.cygrpc_test.InsecureServerInsecureClient", "unit._cython.cygrpc_test.SecureServerSecureClient", "unit._cython.cygrpc_test.TypeSmokeTest", diff --git a/src/python/grpcio_tests/tests/unit/_auth_context_test.py b/src/python/grpcio_tests/tests/unit/_auth_context_test.py index c6a0a23549..ebc04a71e0 100644 --- a/src/python/grpcio_tests/tests/unit/_auth_context_test.py +++ b/src/python/grpcio_tests/tests/unit/_auth_context_test.py @@ -18,11 +18,9 @@ import unittest import grpc from grpc import _channel -from grpc.framework.foundation import logging_pool import six from tests.unit import test_common -from tests.unit.framework.common import test_constants from tests.unit import resources _REQUEST = b'\x00\x00\x00' @@ -55,12 +53,12 @@ def handle_unary_unary(request, servicer_context): class AuthContextTest(unittest.TestCase): def testInsecure(self): - server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) handler = grpc.method_handlers_generic_handler('test', { 'UnaryUnary': grpc.unary_unary_rpc_method_handler(handle_unary_unary) }) - server = grpc.server(server_pool, (handler,)) + server = test_common.test_server() + server.add_generic_rpc_handlers((handler,)) port = server.add_insecure_port('[::]:0') server.start() @@ -74,12 +72,12 @@ class AuthContextTest(unittest.TestCase): self.assertDictEqual({}, auth_data[_AUTH_CTX]) def testSecureNoCert(self): - server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) handler = grpc.method_handlers_generic_handler('test', { 'UnaryUnary': grpc.unary_unary_rpc_method_handler(handle_unary_unary) }) - server = grpc.server(server_pool, (handler,)) + server = test_common.test_server() + server.add_generic_rpc_handlers((handler,)) server_cred = grpc.ssl_server_credentials(_SERVER_CERTS) port = server.add_secure_port('[::]:0', server_cred) server.start() @@ -101,12 +99,12 @@ class AuthContextTest(unittest.TestCase): }, auth_data[_AUTH_CTX]) def testSecureClientCert(self): - server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) handler = grpc.method_handlers_generic_handler('test', { 'UnaryUnary': grpc.unary_unary_rpc_method_handler(handle_unary_unary) }) - server = grpc.server(server_pool, (handler,)) + server = test_common.test_server() + server.add_generic_rpc_handlers((handler,)) server_cred = grpc.ssl_server_credentials( _SERVER_CERTS, root_certificates=_TEST_ROOT_CERTIFICATES, diff --git a/src/python/grpcio_tests/tests/unit/_channel_connectivity_test.py b/src/python/grpcio_tests/tests/unit/_channel_connectivity_test.py index f8c61270ca..f9eb0011dc 100644 --- a/src/python/grpcio_tests/tests/unit/_channel_connectivity_test.py +++ b/src/python/grpcio_tests/tests/unit/_channel_connectivity_test.py @@ -83,7 +83,7 @@ class ChannelConnectivityTest(unittest.TestCase): def test_immediately_connectable_channel_connectivity(self): thread_pool = _thread_pool.RecordingThreadPool(max_workers=None) - server = grpc.server(thread_pool) + server = grpc.server(thread_pool, options=(('grpc.so_reuseport', 0),)) port = server.add_insecure_port('[::]:0') server.start() first_callback = _Callback() @@ -125,7 +125,7 @@ class ChannelConnectivityTest(unittest.TestCase): def test_reachable_then_unreachable_channel_connectivity(self): thread_pool = _thread_pool.RecordingThreadPool(max_workers=None) - server = grpc.server(thread_pool) + server = grpc.server(thread_pool, options=(('grpc.so_reuseport', 0),)) port = server.add_insecure_port('[::]:0') server.start() callback = _Callback() diff --git a/src/python/grpcio_tests/tests/unit/_channel_ready_future_test.py b/src/python/grpcio_tests/tests/unit/_channel_ready_future_test.py index bdd2d86169..30b486079c 100644 --- a/src/python/grpcio_tests/tests/unit/_channel_ready_future_test.py +++ b/src/python/grpcio_tests/tests/unit/_channel_ready_future_test.py @@ -61,7 +61,7 @@ class ChannelReadyFutureTest(unittest.TestCase): def test_immediately_connectable_channel_connectivity(self): thread_pool = _thread_pool.RecordingThreadPool(max_workers=None) - server = grpc.server(thread_pool) + server = grpc.server(thread_pool, options=(('grpc.so_reuseport', 0),)) port = server.add_insecure_port('[::]:0') server.start() channel = grpc.insecure_channel('localhost:{}'.format(port)) diff --git a/src/python/grpcio_tests/tests/unit/_compression_test.py b/src/python/grpcio_tests/tests/unit/_compression_test.py index e576a5aca9..93e599d8f8 100644 --- a/src/python/grpcio_tests/tests/unit/_compression_test.py +++ b/src/python/grpcio_tests/tests/unit/_compression_test.py @@ -17,7 +17,6 @@ import unittest import grpc from grpc import _grpcio_metadata -from grpc.framework.foundation import logging_pool from tests.unit import test_common from tests.unit.framework.common import test_constants @@ -72,9 +71,8 @@ class _GenericHandler(grpc.GenericRpcHandler): class CompressionTest(unittest.TestCase): def setUp(self): - self._server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) - self._server = grpc.server( - self._server_pool, handlers=(_GenericHandler(),)) + self._server = test_common.test_server() + self._server.add_generic_rpc_handlers((_GenericHandler(),)) self._port = self._server.add_insecure_port('[::]:0') self._server.start() diff --git a/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py b/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py index a8a7175cc7..cdb3572453 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py @@ -53,7 +53,7 @@ class _Handler(object): self._state = state self._lock = threading.Lock() self._completion_queue = completion_queue - self._call = rpc_event.operation_call + self._call = rpc_event.call def __call__(self): with self._state.condition: @@ -65,10 +65,10 @@ class _Handler(object): with self._lock: self._call.start_server_batch( - (cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),), + (cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),), _RECEIVE_CLOSE_ON_SERVER_TAG) self._call.start_server_batch( - (cygrpc.operation_receive_message(_EMPTY_FLAGS),), + (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), _RECEIVE_MESSAGE_TAG) first_event = self._completion_queue.poll() if _is_cancellation_event(first_event): @@ -76,10 +76,10 @@ class _Handler(object): else: with self._lock: operations = ( - cygrpc.operation_send_initial_metadata(_EMPTY_METADATA, - _EMPTY_FLAGS), - cygrpc.operation_send_message(b'\x79\x57', _EMPTY_FLAGS), - cygrpc.operation_send_status_from_server( + cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA, + _EMPTY_FLAGS), + cygrpc.SendMessageOperation(b'\x79\x57', _EMPTY_FLAGS), + cygrpc.SendStatusFromServerOperation( _EMPTY_METADATA, cygrpc.StatusCode.ok, b'test details!', _EMPTY_FLAGS),) self._call.start_server_batch(operations, @@ -141,7 +141,8 @@ class CancelManyCallsTest(unittest.TestCase): test_constants.THREAD_CONCURRENCY) server_completion_queue = cygrpc.CompletionQueue() - server = cygrpc.Server(cygrpc.ChannelArgs([])) + server = cygrpc.Server( + cygrpc.ChannelArgs([cygrpc.ChannelArg(b'grpc.so_reuseport', 0)])) server.register_completion_queue(server_completion_queue) port = server.add_http2_port(b'[::]:0') server.start() @@ -169,13 +170,13 @@ class CancelManyCallsTest(unittest.TestCase): None, _EMPTY_FLAGS, client_completion_queue, b'/twinkies', None, _INFINITE_FUTURE) operations = ( - cygrpc.operation_send_initial_metadata(_EMPTY_METADATA, - _EMPTY_FLAGS), - cygrpc.operation_send_message(b'\x45\x56', _EMPTY_FLAGS), - cygrpc.operation_send_close_from_client(_EMPTY_FLAGS), - cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS), - cygrpc.operation_receive_message(_EMPTY_FLAGS), - cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),) + cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA, + _EMPTY_FLAGS), + cygrpc.SendMessageOperation(b'\x45\x56', _EMPTY_FLAGS), + cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), + cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), + cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),) tag = 'client_complete_call_{0:04d}_tag'.format(index) client_call.start_client_batch(operations, tag) client_due.add(tag) diff --git a/src/python/grpcio_tests/tests/unit/_cython/_common.py b/src/python/grpcio_tests/tests/unit/_cython/_common.py index 96f0f1589b..c5acd36bf2 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_common.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_common.py @@ -88,7 +88,8 @@ class RpcTest(object): def setUp(self): self.server_completion_queue = cygrpc.CompletionQueue() - self.server = cygrpc.Server(cygrpc.ChannelArgs([])) + self.server = cygrpc.Server( + cygrpc.ChannelArgs([cygrpc.ChannelArg(b'grpc.so_reuseport', 0)])) self.server.register_completion_queue(self.server_completion_queue) port = self.server.add_http2_port(b'[::]:0') self.server.start() diff --git a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py index d08003af44..583136cf23 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py @@ -49,18 +49,19 @@ class Test(_common.RpcTest, unittest.TestCase): with self.client_condition: client_receive_initial_metadata_start_batch_result = ( client_call.start_client_batch([ - cygrpc.operation_receive_initial_metadata( - _common.EMPTY_FLAGS), + cygrpc.ReceiveInitialMetadataOperation(_common.EMPTY_FLAGS), ], client_receive_initial_metadata_tag)) + self.assertEqual(cygrpc.CallError.ok, + client_receive_initial_metadata_start_batch_result) client_complete_rpc_start_batch_result = client_call.start_client_batch( [ - cygrpc.operation_send_initial_metadata( + cygrpc.SendInitialMetadataOperation( _common.INVOCATION_METADATA, _common.EMPTY_FLAGS), - cygrpc.operation_send_close_from_client( - _common.EMPTY_FLAGS), - cygrpc.operation_receive_status_on_client( - _common.EMPTY_FLAGS), + cygrpc.SendCloseFromClientOperation(_common.EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_common.EMPTY_FLAGS), ], client_complete_rpc_tag) + self.assertEqual(cygrpc.CallError.ok, + client_complete_rpc_start_batch_result) self.client_driver.add_due({ client_receive_initial_metadata_tag, client_complete_rpc_tag, @@ -71,8 +72,8 @@ class Test(_common.RpcTest, unittest.TestCase): with server_call_condition: server_send_initial_metadata_start_batch_result = ( - server_request_call_event.operation_call.start_server_batch([ - cygrpc.operation_send_initial_metadata( + server_request_call_event.call.start_server_batch([ + cygrpc.SendInitialMetadataOperation( _common.INITIAL_METADATA, _common.EMPTY_FLAGS), ], server_send_initial_metadata_tag)) server_call_driver.add_due({ @@ -83,10 +84,9 @@ class Test(_common.RpcTest, unittest.TestCase): with server_call_condition: server_complete_rpc_start_batch_result = ( - server_request_call_event.operation_call.start_server_batch([ - cygrpc.operation_receive_close_on_server( - _common.EMPTY_FLAGS), - cygrpc.operation_send_status_from_server( + server_request_call_event.call.start_server_batch([ + cygrpc.ReceiveCloseOnServerOperation(_common.EMPTY_FLAGS), + cygrpc.SendStatusFromServerOperation( _common.TRAILING_METADATA, cygrpc.StatusCode.ok, b'test details', _common.EMPTY_FLAGS), ], server_complete_rpc_tag)) @@ -101,23 +101,24 @@ class Test(_common.RpcTest, unittest.TestCase): client_complete_rpc_event = self.client_driver.event_with_tag( client_complete_rpc_tag) - return (_common.OperationResult(server_request_call_start_batch_result, - server_request_call_event.type, - server_request_call_event.success), + return (_common.OperationResult( + server_request_call_start_batch_result, + server_request_call_event.completion_type, + server_request_call_event.success), _common.OperationResult( + client_receive_initial_metadata_start_batch_result, + client_receive_initial_metadata_event.completion_type, + client_receive_initial_metadata_event.success), _common.OperationResult( - client_receive_initial_metadata_start_batch_result, - client_receive_initial_metadata_event.type, - client_receive_initial_metadata_event.success), - _common.OperationResult(client_complete_rpc_start_batch_result, - client_complete_rpc_event.type, - client_complete_rpc_event.success), + client_complete_rpc_start_batch_result, + client_complete_rpc_event.completion_type, + client_complete_rpc_event.success), _common.OperationResult( + server_send_initial_metadata_start_batch_result, + server_send_initial_metadata_event.completion_type, + server_send_initial_metadata_event.success), _common.OperationResult( - server_send_initial_metadata_start_batch_result, - server_send_initial_metadata_event.type, - server_send_initial_metadata_event.success), - _common.OperationResult(server_complete_rpc_start_batch_result, - server_complete_rpc_event.type, - server_complete_rpc_event.success),) + server_complete_rpc_start_batch_result, + server_complete_rpc_event.completion_type, + server_complete_rpc_event.success),) def test_rpcs(self): expecteds = [(_common.SUCCESSFUL_OPERATION_RESULT,) * diff --git a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py index d0166a2b29..c5cf606c90 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py @@ -44,17 +44,14 @@ class Test(_common.RpcTest, unittest.TestCase): with self.client_condition: client_receive_initial_metadata_start_batch_result = ( client_call.start_client_batch([ - cygrpc.operation_receive_initial_metadata( - _common.EMPTY_FLAGS), + cygrpc.ReceiveInitialMetadataOperation(_common.EMPTY_FLAGS), ], client_receive_initial_metadata_tag)) client_complete_rpc_start_batch_result = client_call.start_client_batch( [ - cygrpc.operation_send_initial_metadata( + cygrpc.SendInitialMetadataOperation( _common.INVOCATION_METADATA, _common.EMPTY_FLAGS), - cygrpc.operation_send_close_from_client( - _common.EMPTY_FLAGS), - cygrpc.operation_receive_status_on_client( - _common.EMPTY_FLAGS), + cygrpc.SendCloseFromClientOperation(_common.EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_common.EMPTY_FLAGS), ], client_complete_rpc_tag) self.client_driver.add_due({ client_receive_initial_metadata_tag, @@ -66,8 +63,8 @@ class Test(_common.RpcTest, unittest.TestCase): with self.server_condition: server_send_initial_metadata_start_batch_result = ( - server_request_call_event.operation_call.start_server_batch([ - cygrpc.operation_send_initial_metadata( + server_request_call_event.call.start_server_batch([ + cygrpc.SendInitialMetadataOperation( _common.INITIAL_METADATA, _common.EMPTY_FLAGS), ], server_send_initial_metadata_tag)) self.server_driver.add_due({ @@ -78,12 +75,11 @@ class Test(_common.RpcTest, unittest.TestCase): with self.server_condition: server_complete_rpc_start_batch_result = ( - server_request_call_event.operation_call.start_server_batch([ - cygrpc.operation_receive_close_on_server( - _common.EMPTY_FLAGS), - cygrpc.operation_send_status_from_server( + server_request_call_event.call.start_server_batch([ + cygrpc.ReceiveCloseOnServerOperation(_common.EMPTY_FLAGS), + cygrpc.SendStatusFromServerOperation( _common.TRAILING_METADATA, cygrpc.StatusCode.ok, - b'test details', _common.EMPTY_FLAGS), + 'test details', _common.EMPTY_FLAGS), ], server_complete_rpc_tag)) self.server_driver.add_due({ server_complete_rpc_tag, @@ -96,23 +92,24 @@ class Test(_common.RpcTest, unittest.TestCase): client_complete_rpc_event = self.client_driver.event_with_tag( client_complete_rpc_tag) - return (_common.OperationResult(server_request_call_start_batch_result, - server_request_call_event.type, - server_request_call_event.success), + return (_common.OperationResult( + server_request_call_start_batch_result, + server_request_call_event.completion_type, + server_request_call_event.success), _common.OperationResult( + client_receive_initial_metadata_start_batch_result, + client_receive_initial_metadata_event.completion_type, + client_receive_initial_metadata_event.success), _common.OperationResult( - client_receive_initial_metadata_start_batch_result, - client_receive_initial_metadata_event.type, - client_receive_initial_metadata_event.success), - _common.OperationResult(client_complete_rpc_start_batch_result, - client_complete_rpc_event.type, - client_complete_rpc_event.success), + client_complete_rpc_start_batch_result, + client_complete_rpc_event.completion_type, + client_complete_rpc_event.success), _common.OperationResult( + server_send_initial_metadata_start_batch_result, + server_send_initial_metadata_event.completion_type, + server_send_initial_metadata_event.success), _common.OperationResult( - server_send_initial_metadata_start_batch_result, - server_send_initial_metadata_event.type, - server_send_initial_metadata_event.success), - _common.OperationResult(server_complete_rpc_start_batch_result, - server_complete_rpc_event.type, - server_complete_rpc_event.success),) + server_complete_rpc_start_batch_result, + server_complete_rpc_event.completion_type, + server_complete_rpc_event.success),) def test_rpcs(self): expecteds = [(_common.SUCCESSFUL_OPERATION_RESULT,) * diff --git a/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py b/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py index 1deb15ba03..a5ec54ee59 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py @@ -112,7 +112,8 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase): def testReadSomeButNotAllResponses(self): server_completion_queue = cygrpc.CompletionQueue() - server = cygrpc.Server(cygrpc.ChannelArgs([])) + server = cygrpc.Server( + cygrpc.ChannelArgs([cygrpc.ChannelArg(b'grpc.so_reuseport', 0)])) server.register_completion_queue(server_completion_queue) port = server.add_http2_port(b'[::]:0') server.start() @@ -158,15 +159,15 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase): with client_condition: client_receive_initial_metadata_start_batch_result = ( client_call.start_client_batch([ - cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS), + cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), ], client_receive_initial_metadata_tag)) client_due.add(client_receive_initial_metadata_tag) client_complete_rpc_start_batch_result = ( client_call.start_client_batch([ - cygrpc.operation_send_initial_metadata(_EMPTY_METADATA, - _EMPTY_FLAGS), - cygrpc.operation_send_close_from_client(_EMPTY_FLAGS), - cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS), + cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA, + _EMPTY_FLAGS), + cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), ], client_complete_rpc_tag)) client_due.add(client_complete_rpc_tag) @@ -174,13 +175,13 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase): with server_call_condition: server_send_initial_metadata_start_batch_result = ( - server_rpc_event.operation_call.start_server_batch([ - cygrpc.operation_send_initial_metadata(_EMPTY_METADATA, - _EMPTY_FLAGS), + server_rpc_event.call.start_server_batch([ + cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA, + _EMPTY_FLAGS), ], server_send_initial_metadata_tag)) server_send_first_message_start_batch_result = ( - server_rpc_event.operation_call.start_server_batch([ - cygrpc.operation_send_message(b'\x07', _EMPTY_FLAGS), + server_rpc_event.call.start_server_batch([ + cygrpc.SendMessageOperation(b'\x07', _EMPTY_FLAGS), ], server_send_first_message_tag)) server_send_initial_metadata_event = server_call_driver.event_with_tag( server_send_initial_metadata_tag) @@ -188,13 +189,13 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase): server_send_first_message_tag) with server_call_condition: server_send_second_message_start_batch_result = ( - server_rpc_event.operation_call.start_server_batch([ - cygrpc.operation_send_message(b'\x07', _EMPTY_FLAGS), + server_rpc_event.call.start_server_batch([ + cygrpc.SendMessageOperation(b'\x07', _EMPTY_FLAGS), ], server_send_second_message_tag)) server_complete_rpc_start_batch_result = ( - server_rpc_event.operation_call.start_server_batch([ - cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS), - cygrpc.operation_send_status_from_server( + server_rpc_event.call.start_server_batch([ + cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS), + cygrpc.SendStatusFromServerOperation( (), cygrpc.StatusCode.ok, b'test details', _EMPTY_FLAGS), ], server_complete_rpc_tag)) @@ -208,7 +209,7 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase): client_receive_first_message_tag = 'client_receive_first_message_tag' client_receive_first_message_start_batch_result = ( client_call.start_client_batch([ - cygrpc.operation_receive_message(_EMPTY_FLAGS), + cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), ], client_receive_first_message_tag)) client_due.add(client_receive_first_message_tag) client_receive_first_message_event = client_driver.event_with_tag( @@ -231,9 +232,8 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase): self.assertEqual(cygrpc.CallError.ok, client_call_cancel_result) self.assertIs(server_rpc_tag, server_rpc_event.tag) self.assertEqual(cygrpc.CompletionType.operation_complete, - server_rpc_event.type) - self.assertIsInstance(server_rpc_event.operation_call, cygrpc.Call) - self.assertEqual(0, len(server_rpc_event.batch_operations)) + server_rpc_event.completion_type) + self.assertIsInstance(server_rpc_event.call, cygrpc.Call) if __name__ == '__main__': diff --git a/src/python/grpcio_tests/tests/unit/_cython/_server_test.py b/src/python/grpcio_tests/tests/unit/_cython/_server_test.py new file mode 100644 index 0000000000..12bf40be6b --- /dev/null +++ b/src/python/grpcio_tests/tests/unit/_cython/_server_test.py @@ -0,0 +1,49 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Test servers at the level of the Cython API.""" + +import threading +import time +import unittest + +from grpc._cython import cygrpc + + +class Test(unittest.TestCase): + + def test_lonely_server(self): + server_call_completion_queue = cygrpc.CompletionQueue() + server_shutdown_completion_queue = cygrpc.CompletionQueue() + server = cygrpc.Server(cygrpc.ChannelArgs([])) + server.register_completion_queue(server_call_completion_queue) + server.register_completion_queue(server_shutdown_completion_queue) + port = server.add_http2_port(b'[::]:0') + server.start() + + server_request_call_tag = 'server_request_call_tag' + server_request_call_start_batch_result = server.request_call( + server_call_completion_queue, server_call_completion_queue, + server_request_call_tag) + + time.sleep(4) + + server_shutdown_tag = 'server_shutdown_tag' + server_shutdown_result = server.shutdown( + server_shutdown_completion_queue, server_shutdown_tag) + server_request_call_event = server_call_completion_queue.poll() + server_shutdown_event = server_shutdown_completion_queue.poll() + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py index 4eda685486..5453735f11 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py @@ -35,11 +35,6 @@ def _metadata_plugin(context, callback): class TypeSmokeTest(unittest.TestCase): - def testOperationFlags(self): - operation = cygrpc.operation_send_message(b'asdf', - cygrpc.WriteFlag.no_compress) - self.assertEqual(cygrpc.WriteFlag.no_compress, operation.flags) - def testTimespec(self): now = time.time() now_timespec_a = cygrpc.Timespec(now) @@ -60,7 +55,8 @@ class TypeSmokeTest(unittest.TestCase): del completion_queue def testServerUpDown(self): - server = cygrpc.Server(cygrpc.ChannelArgs([])) + server = cygrpc.Server( + cygrpc.ChannelArgs([cygrpc.ChannelArg(b'grpc.so_reuseport', 0)])) del server def testChannelUpDown(self): @@ -72,7 +68,8 @@ class TypeSmokeTest(unittest.TestCase): b'test plugin name!') def testServerStartNoExplicitShutdown(self): - server = cygrpc.Server(cygrpc.ChannelArgs([])) + server = cygrpc.Server( + cygrpc.ChannelArgs([cygrpc.ChannelArg(b'grpc.so_reuseport', 0)])) completion_queue = cygrpc.CompletionQueue() server.register_completion_queue(completion_queue) port = server.add_http2_port(b'[::]:0') @@ -82,14 +79,16 @@ class TypeSmokeTest(unittest.TestCase): def testServerStartShutdown(self): completion_queue = cygrpc.CompletionQueue() - server = cygrpc.Server(cygrpc.ChannelArgs([])) + server = cygrpc.Server( + cygrpc.ChannelArgs([cygrpc.ChannelArg(b'grpc.so_reuseport', 0)])) server.add_http2_port(b'[::]:0') server.register_completion_queue(completion_queue) server.start() shutdown_tag = object() server.shutdown(completion_queue, shutdown_tag) event = completion_queue.poll() - self.assertEqual(cygrpc.CompletionType.operation_complete, event.type) + self.assertEqual(cygrpc.CompletionType.operation_complete, + event.completion_type) self.assertIs(shutdown_tag, event.tag) del server del completion_queue @@ -99,7 +98,8 @@ class ServerClientMixin(object): def setUpMixin(self, server_credentials, client_credentials, host_override): self.server_completion_queue = cygrpc.CompletionQueue() - self.server = cygrpc.Server(cygrpc.ChannelArgs([])) + self.server = cygrpc.Server( + cygrpc.ChannelArgs([cygrpc.ChannelArg(b'grpc.so_reuseport', 0)])) self.server.register_completion_queue(self.server_completion_queue) if server_credentials: self.port = self.server.add_http2_port(b'[::]:0', @@ -148,7 +148,7 @@ class ServerClientMixin(object): self.assertEqual(cygrpc.CallError.ok, call_result) event = queue.poll(deadline) self.assertEqual(cygrpc.CompletionType.operation_complete, - event.type) + event.completion_type) self.assertTrue(event.success) self.assertIs(tag, event.tag) except Exception as error: @@ -170,7 +170,7 @@ class ServerClientMixin(object): SERVER_TRAILING_METADATA_KEY = 'california_is_in_a_drought' SERVER_TRAILING_METADATA_VALUE = 'zomg it is' SERVER_STATUS_CODE = cygrpc.StatusCode.ok - SERVER_STATUS_DETAILS = b'our work is never over' + SERVER_STATUS_DETAILS = 'our work is never over' REQUEST = b'in death a member of project mayhem has a name' RESPONSE = b'his name is robert paulson' METHOD = b'twinkies' @@ -192,13 +192,13 @@ class ServerClientMixin(object): (CLIENT_METADATA_ASCII_KEY, CLIENT_METADATA_ASCII_VALUE,), (CLIENT_METADATA_BIN_KEY, CLIENT_METADATA_BIN_VALUE,),) client_start_batch_result = client_call.start_client_batch([ - cygrpc.operation_send_initial_metadata(client_initial_metadata, - _EMPTY_FLAGS), - cygrpc.operation_send_message(REQUEST, _EMPTY_FLAGS), - cygrpc.operation_send_close_from_client(_EMPTY_FLAGS), - cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS), - cygrpc.operation_receive_message(_EMPTY_FLAGS), - cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS) + cygrpc.SendInitialMetadataOperation(client_initial_metadata, + _EMPTY_FLAGS), + cygrpc.SendMessageOperation(REQUEST, _EMPTY_FLAGS), + cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), + cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), + cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), ], client_call_tag) self.assertEqual(cygrpc.CallError.ok, client_start_batch_result) client_event_future = test_utilities.CompletionQueuePollFuture( @@ -206,33 +206,31 @@ class ServerClientMixin(object): request_event = self.server_completion_queue.poll(cygrpc_deadline) self.assertEqual(cygrpc.CompletionType.operation_complete, - request_event.type) - self.assertIsInstance(request_event.operation_call, cygrpc.Call) + request_event.completion_type) + self.assertIsInstance(request_event.call, cygrpc.Call) self.assertIs(server_request_tag, request_event.tag) - self.assertEqual(0, len(request_event.batch_operations)) self.assertTrue( test_common.metadata_transmitted(client_initial_metadata, - request_event.request_metadata)) - self.assertEqual(METHOD, request_event.request_call_details.method) - self.assertEqual(self.expected_host, - request_event.request_call_details.host) + request_event.invocation_metadata)) + self.assertEqual(METHOD, request_event.call_details.method) + self.assertEqual(self.expected_host, request_event.call_details.host) self.assertLess( - abs(DEADLINE - float(request_event.request_call_details.deadline)), + abs(DEADLINE - float(request_event.call_details.deadline)), DEADLINE_TOLERANCE) server_call_tag = object() - server_call = request_event.operation_call + server_call = request_event.call server_initial_metadata = ( (SERVER_INITIAL_METADATA_KEY, SERVER_INITIAL_METADATA_VALUE,),) server_trailing_metadata = ( (SERVER_TRAILING_METADATA_KEY, SERVER_TRAILING_METADATA_VALUE,),) server_start_batch_result = server_call.start_server_batch([ - cygrpc.operation_send_initial_metadata( + cygrpc.SendInitialMetadataOperation( server_initial_metadata, - _EMPTY_FLAGS), cygrpc.operation_receive_message(_EMPTY_FLAGS), - cygrpc.operation_send_message(RESPONSE, _EMPTY_FLAGS), - cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS), - cygrpc.operation_send_status_from_server( + _EMPTY_FLAGS), cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), + cygrpc.SendMessageOperation(RESPONSE, _EMPTY_FLAGS), + cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS), + cygrpc.SendStatusFromServerOperation( server_trailing_metadata, SERVER_STATUS_CODE, SERVER_STATUS_DETAILS, _EMPTY_FLAGS) ], server_call_tag) @@ -245,25 +243,24 @@ class ServerClientMixin(object): found_client_op_types = set() for client_result in client_event.batch_operations: # we expect each op type to be unique - self.assertNotIn(client_result.type, found_client_op_types) - found_client_op_types.add(client_result.type) - if client_result.type == cygrpc.OperationType.receive_initial_metadata: + self.assertNotIn(client_result.type(), found_client_op_types) + found_client_op_types.add(client_result.type()) + if client_result.type( + ) == cygrpc.OperationType.receive_initial_metadata: self.assertTrue( test_common.metadata_transmitted( server_initial_metadata, - client_result.received_metadata)) - elif client_result.type == cygrpc.OperationType.receive_message: - self.assertEqual(RESPONSE, - client_result.received_message.bytes()) - elif client_result.type == cygrpc.OperationType.receive_status_on_client: + client_result.initial_metadata())) + elif client_result.type() == cygrpc.OperationType.receive_message: + self.assertEqual(RESPONSE, client_result.message()) + elif client_result.type( + ) == cygrpc.OperationType.receive_status_on_client: self.assertTrue( test_common.metadata_transmitted( server_trailing_metadata, - client_result.received_metadata)) - self.assertEqual(SERVER_STATUS_DETAILS, - client_result.received_status_details) - self.assertEqual(SERVER_STATUS_CODE, - client_result.received_status_code) + client_result.trailing_metadata())) + self.assertEqual(SERVER_STATUS_DETAILS, client_result.details()) + self.assertEqual(SERVER_STATUS_CODE, client_result.code()) self.assertEqual( set([ cygrpc.OperationType.send_initial_metadata, @@ -277,13 +274,13 @@ class ServerClientMixin(object): self.assertEqual(5, len(server_event.batch_operations)) found_server_op_types = set() for server_result in server_event.batch_operations: - self.assertNotIn(client_result.type, found_server_op_types) - found_server_op_types.add(server_result.type) - if server_result.type == cygrpc.OperationType.receive_message: - self.assertEqual(REQUEST, - server_result.received_message.bytes()) - elif server_result.type == cygrpc.OperationType.receive_close_on_server: - self.assertFalse(server_result.received_cancelled) + self.assertNotIn(client_result.type(), found_server_op_types) + found_server_op_types.add(server_result.type()) + if server_result.type() == cygrpc.OperationType.receive_message: + self.assertEqual(REQUEST, server_result.message()) + elif server_result.type( + ) == cygrpc.OperationType.receive_close_on_server: + self.assertFalse(server_result.cancelled()) self.assertEqual( set([ cygrpc.OperationType.send_initial_metadata, @@ -319,13 +316,12 @@ class ServerClientMixin(object): cygrpc_deadline, description) client_event_future = perform_client_operations([ - cygrpc.operation_send_initial_metadata(empty_metadata, - _EMPTY_FLAGS), - cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS), + cygrpc.SendInitialMetadataOperation(empty_metadata, _EMPTY_FLAGS), + cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), ], "Client prologue") request_event = self.server_completion_queue.poll(cygrpc_deadline) - server_call = request_event.operation_call + server_call = request_event.call def perform_server_operations(operations, description): return self._perform_operations(operations, server_call, @@ -333,8 +329,7 @@ class ServerClientMixin(object): cygrpc_deadline, description) server_event_future = perform_server_operations([ - cygrpc.operation_send_initial_metadata(empty_metadata, - _EMPTY_FLAGS), + cygrpc.SendInitialMetadataOperation(empty_metadata, _EMPTY_FLAGS), ], "Server prologue") client_event_future.result() # force completion @@ -343,12 +338,12 @@ class ServerClientMixin(object): # Messaging for _ in range(10): client_event_future = perform_client_operations([ - cygrpc.operation_send_message(b'', _EMPTY_FLAGS), - cygrpc.operation_receive_message(_EMPTY_FLAGS), + cygrpc.SendMessageOperation(b'', _EMPTY_FLAGS), + cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), ], "Client message") server_event_future = perform_server_operations([ - cygrpc.operation_send_message(b'', _EMPTY_FLAGS), - cygrpc.operation_receive_message(_EMPTY_FLAGS), + cygrpc.SendMessageOperation(b'', _EMPTY_FLAGS), + cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), ], "Server receive") client_event_future.result() # force completion @@ -356,13 +351,13 @@ class ServerClientMixin(object): # Epilogue client_event_future = perform_client_operations([ - cygrpc.operation_send_close_from_client(_EMPTY_FLAGS), - cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS) + cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS) ], "Client epilogue") server_event_future = perform_server_operations([ - cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS), - cygrpc.operation_send_status_from_server( + cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS), + cygrpc.SendStatusFromServerOperation( empty_metadata, cygrpc.StatusCode.ok, b'', _EMPTY_FLAGS) ], "Server epilogue") diff --git a/src/python/grpcio_tests/tests/unit/_empty_message_test.py b/src/python/grpcio_tests/tests/unit/_empty_message_test.py index 62077e7677..14695bc13f 100644 --- a/src/python/grpcio_tests/tests/unit/_empty_message_test.py +++ b/src/python/grpcio_tests/tests/unit/_empty_message_test.py @@ -15,8 +15,8 @@ import unittest import grpc -from grpc.framework.foundation import logging_pool +from tests.unit import test_common from tests.unit.framework.common import test_constants _REQUEST = b'' @@ -87,9 +87,8 @@ class _GenericHandler(grpc.GenericRpcHandler): class EmptyMessageTest(unittest.TestCase): def setUp(self): - self._server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) - self._server = grpc.server( - self._server_pool, handlers=(_GenericHandler(),)) + self._server = test_common.test_server() + self._server.add_generic_rpc_handlers((_GenericHandler(),)) port = self._server.add_insecure_port('[::]:0') self._server.start() self._channel = grpc.insecure_channel('localhost:%d' % port) diff --git a/src/python/grpcio_tests/tests/unit/_exit_scenarios.py b/src/python/grpcio_tests/tests/unit/_exit_scenarios.py index 7c13dab756..0a0239a63d 100644 --- a/src/python/grpcio_tests/tests/unit/_exit_scenarios.py +++ b/src/python/grpcio_tests/tests/unit/_exit_scenarios.py @@ -168,11 +168,11 @@ if __name__ == '__main__': args = parser.parse_args() if args.scenario == UNSTARTED_SERVER: - server = grpc.server(DaemonPool()) + server = grpc.server(DaemonPool(), options=(('grpc.so_reuseport', 0),)) if args.wait_for_interrupt: time.sleep(WAIT_TIME) elif args.scenario == RUNNING_SERVER: - server = grpc.server(DaemonPool()) + server = grpc.server(DaemonPool(), options=(('grpc.so_reuseport', 0),)) port = server.add_insecure_port('[::]:0') server.start() if args.wait_for_interrupt: @@ -187,7 +187,7 @@ if __name__ == '__main__': if args.wait_for_interrupt: time.sleep(WAIT_TIME) elif args.scenario == POLL_CONNECTIVITY: - server = grpc.server(DaemonPool()) + server = grpc.server(DaemonPool(), options=(('grpc.so_reuseport', 0),)) port = server.add_insecure_port('[::]:0') server.start() channel = grpc.insecure_channel('localhost:%d' % port) @@ -201,7 +201,7 @@ if __name__ == '__main__': else: handler = GenericHandler() - server = grpc.server(DaemonPool()) + server = grpc.server(DaemonPool(), options=(('grpc.so_reuseport', 0),)) port = server.add_insecure_port('[::]:0') server.add_generic_rpc_handlers((handler,)) server.start() diff --git a/src/python/grpcio_tests/tests/unit/_interceptor_test.py b/src/python/grpcio_tests/tests/unit/_interceptor_test.py index cf875ed7da..2aee298df2 100644 --- a/src/python/grpcio_tests/tests/unit/_interceptor_test.py +++ b/src/python/grpcio_tests/tests/unit/_interceptor_test.py @@ -22,6 +22,7 @@ from concurrent import futures import grpc from grpc.framework.foundation import logging_pool +from tests.unit import test_common from tests.unit.framework.common import test_constants from tests.unit.framework.common import test_control @@ -304,6 +305,7 @@ class InterceptorTest(unittest.TestCase): self._server = grpc.server( self._server_pool, + options=(('grpc.so_reuseport', 0),), interceptors=(_LoggingInterceptor('s1', self._record), conditional_interceptor, _LoggingInterceptor('s2', self._record),)) diff --git a/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py b/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py index 2a1a49ce74..b46d176d04 100644 --- a/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py +++ b/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py @@ -15,11 +15,10 @@ import itertools import threading import unittest -from concurrent import futures import grpc -from grpc.framework.foundation import logging_pool +from tests.unit import test_common from tests.unit.framework.common import test_constants from tests.unit.framework.common import test_control @@ -191,9 +190,8 @@ class InvocationDefectsTest(unittest.TestCase): def setUp(self): self._control = test_control.PauseFailControl() self._handler = _Handler(self._control) - self._server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) - self._server = grpc.server(self._server_pool) + self._server = test_common.test_server() port = self._server.add_insecure_port('[::]:0') self._server.add_generic_rpc_handlers((_GenericHandler(self._handler),)) self._server.start() diff --git a/src/python/grpcio_tests/tests/unit/_metadata_code_details_test.py b/src/python/grpcio_tests/tests/unit/_metadata_code_details_test.py index cb59cd3769..ec67f99fbc 100644 --- a/src/python/grpcio_tests/tests/unit/_metadata_code_details_test.py +++ b/src/python/grpcio_tests/tests/unit/_metadata_code_details_test.py @@ -17,7 +17,6 @@ import threading import unittest import grpc -from grpc.framework.foundation import logging_pool from tests.unit import test_common from tests.unit.framework.common import test_constants @@ -186,9 +185,9 @@ class MetadataCodeDetailsTest(unittest.TestCase): def setUp(self): self._servicer = _Servicer() - self._server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) - self._server = grpc.server( - self._server_pool, handlers=(_generic_handler(self._servicer),)) + self._server = test_common.test_server() + self._server.add_generic_rpc_handlers( + (_generic_handler(self._servicer),)) port = self._server.add_insecure_port('[::]:0') self._server.start() diff --git a/src/python/grpcio_tests/tests/unit/_metadata_test.py b/src/python/grpcio_tests/tests/unit/_metadata_test.py index 0669486892..f2dac7bdc5 100644 --- a/src/python/grpcio_tests/tests/unit/_metadata_test.py +++ b/src/python/grpcio_tests/tests/unit/_metadata_test.py @@ -18,7 +18,6 @@ import weakref import grpc from grpc import _channel -from grpc.framework.foundation import logging_pool from tests.unit import test_common from tests.unit.framework.common import test_constants @@ -146,9 +145,9 @@ class _GenericHandler(grpc.GenericRpcHandler): class MetadataTest(unittest.TestCase): def setUp(self): - self._server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) - self._server = grpc.server( - self._server_pool, handlers=(_GenericHandler(weakref.proxy(self)),)) + self._server = test_common.test_server() + self._server.add_generic_rpc_handlers( + (_GenericHandler(weakref.proxy(self)),)) port = self._server.add_insecure_port('[::]:0') self._server.start() self._channel = grpc.insecure_channel( diff --git a/src/python/grpcio_tests/tests/unit/_reconnect_test.py b/src/python/grpcio_tests/tests/unit/_reconnect_test.py index 53fd1c2ca4..10aee9fb4f 100644 --- a/src/python/grpcio_tests/tests/unit/_reconnect_test.py +++ b/src/python/grpcio_tests/tests/unit/_reconnect_test.py @@ -13,6 +13,7 @@ # limitations under the License. """Tests that a channel will reconnect if a connection is dropped""" +import socket import unittest import grpc @@ -30,6 +31,44 @@ def _handle_unary_unary(unused_request, unused_servicer_context): return _RESPONSE +def _get_reuse_socket_option(): + try: + return socket.SO_REUSEPORT + except AttributeError: + # SO_REUSEPORT is unavailable on Windows, but SO_REUSEADDR + # allows forcibly re-binding to a port + return socket.SO_REUSEADDR + + +def _pick_and_bind_port(sock_opt): + # Reserve a port, when we restart the server we want + # to hold onto the port + port = 0 + for address_family in (socket.AF_INET6, socket.AF_INET): + try: + s = socket.socket(address_family, socket.SOCK_STREAM) + except socket.error: + continue # this address family is unavailable + s.setsockopt(socket.SOL_SOCKET, sock_opt, 1) + try: + s.bind(('localhost', port)) + # for socket.SOCK_STREAM sockets, it is necessary to call + # listen to get the desired behavior. + s.listen(1) + port = s.getsockname()[1] + except socket.error: + # port was not available on the current address family + # try again + port = 0 + break + finally: + s.close() + if s: + return port if port != 0 else _pick_and_bind_port(sock_opt) + else: + return None # no address family was available + + class ReconnectTest(unittest.TestCase): def test_reconnect(self): @@ -38,8 +77,12 @@ class ReconnectTest(unittest.TestCase): 'UnaryUnary': grpc.unary_unary_rpc_method_handler(_handle_unary_unary) }) + sock_opt = _get_reuse_socket_option() + port = _pick_and_bind_port(sock_opt) + self.assertIsNotNone(port) + server = grpc.server(server_pool, (handler,)) - port = server.add_insecure_port('[::]:0') + server.add_insecure_port('[::]:{}'.format(port)) server.start() channel = grpc.insecure_channel('localhost:%d' % port) multi_callable = channel.unary_unary(_UNARY_UNARY) diff --git a/src/python/grpcio_tests/tests/unit/_resource_exhausted_test.py b/src/python/grpcio_tests/tests/unit/_resource_exhausted_test.py index e425a0adfe..df4b129018 100644 --- a/src/python/grpcio_tests/tests/unit/_resource_exhausted_test.py +++ b/src/python/grpcio_tests/tests/unit/_resource_exhausted_test.py @@ -139,6 +139,7 @@ class ResourceExhaustedTest(unittest.TestCase): self._server = grpc.server( self._server_pool, handlers=(_GenericHandler(self._trigger),), + options=(('grpc.so_reuseport', 0),), maximum_concurrent_rpcs=test_constants.THREAD_CONCURRENCY) port = self._server.add_insecure_port('[::]:0') self._server.start() diff --git a/src/python/grpcio_tests/tests/unit/_rpc_test.py b/src/python/grpcio_tests/tests/unit/_rpc_test.py index 74d8541808..1515a87d93 100644 --- a/src/python/grpcio_tests/tests/unit/_rpc_test.py +++ b/src/python/grpcio_tests/tests/unit/_rpc_test.py @@ -21,6 +21,7 @@ from concurrent import futures import grpc from grpc.framework.foundation import logging_pool +from tests.unit import test_common from tests.unit.framework.common import test_constants from tests.unit.framework.common import test_control @@ -169,9 +170,8 @@ class RPCTest(unittest.TestCase): def setUp(self): self._control = test_control.PauseFailControl() self._handler = _Handler(self._control) - self._server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) - self._server = grpc.server(self._server_pool) + self._server = test_common.test_server() port = self._server.add_insecure_port('[::]:0') self._server.add_generic_rpc_handlers((_GenericHandler(self._handler),)) self._server.start() @@ -180,7 +180,6 @@ class RPCTest(unittest.TestCase): def tearDown(self): self._server.stop(None) - self._server_pool.shutdown(wait=True) def testUnrecognizedMethod(self): request = b'abc' diff --git a/src/python/grpcio_tests/tests/unit/_server_ssl_cert_config_test.py b/src/python/grpcio_tests/tests/unit/_server_ssl_cert_config_test.py index 005d16ea75..2c513da5d0 100644 --- a/src/python/grpcio_tests/tests/unit/_server_ssl_cert_config_test.py +++ b/src/python/grpcio_tests/tests/unit/_server_ssl_cert_config_test.py @@ -40,6 +40,7 @@ from concurrent import futures import grpc from tests.unit import resources +from tests.unit import test_common from tests.testing import _application_common from tests.testing import _server_application from tests.testing.proto import services_pb2_grpc @@ -135,7 +136,7 @@ class _ServerSSLCertReloadTest( raise NotImplementedError() def setUp(self): - self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) + self.server = test_common.test_server() services_pb2_grpc.add_FirstServiceServicer_to_server( _server_application.FirstServiceServicer(), self.server) switch_cert_on_client_num = 10 @@ -407,7 +408,7 @@ class ServerSSLCertReloadTestCertConfigReuse(_ServerSSLCertReloadTest): return True def setUp(self): - self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) + self.server = test_common.test_server() services_pb2_grpc.add_FirstServiceServicer_to_server( _server_application.FirstServiceServicer(), self.server) self.cert_config_A = grpc.ssl_server_certificate_configuration( diff --git a/src/python/grpcio_tests/tests/unit/test_common.py b/src/python/grpcio_tests/tests/unit/test_common.py index ed71cc996b..6334a32b0e 100644 --- a/src/python/grpcio_tests/tests/unit/test_common.py +++ b/src/python/grpcio_tests/tests/unit/test_common.py @@ -15,6 +15,7 @@ import collections +from concurrent import futures import grpc import six @@ -82,3 +83,13 @@ def test_secure_channel(target, channel_credentials, server_host_override): channel = grpc.secure_channel(target, channel_credentials, ( ('grpc.ssl_target_name_override', server_host_override,),)) return channel + + +def test_server(max_workers=10): + """Creates an insecure grpc server. + + These servers have SO_REUSEPORT disabled to prevent cross-talk. + """ + return grpc.server( + futures.ThreadPoolExecutor(max_workers=max_workers), + options=(('grpc.so_reuseport', 0),)) |