aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar ncteisen <ncteisen@gmail.com>2018-01-02 11:44:03 -0800
committerGravatar ncteisen <ncteisen@gmail.com>2018-01-02 11:44:03 -0800
commit24902641d434005bee6641e62fb2b57fc0b6bd87 (patch)
treed9ab96b6531e30ca28044d571f2941f05cad52cb /src
parentd230ad086a894cc963235b27814e19bf686eb7aa (diff)
parent63392f682e21543099926251b642cdcd0be2a17f (diff)
Merge branch 'master' of https://github.com/grpc/grpc into flow-control-part4
Diffstat (limited to 'src')
-rw-r--r--src/compiler/cpp_generator.cc15
-rw-r--r--src/compiler/cpp_generator.h4
-rw-r--r--src/compiler/cpp_plugin.cc2
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc25
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc23
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc23
-rw-r--r--src/core/ext/filters/client_channel/subchannel.cc117
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.cc32
-rw-r--r--src/core/lib/backoff/backoff.cc78
-rw-r--r--src/core/lib/backoff/backoff.h110
-rw-r--r--src/core/lib/iomgr/error.cc2
-rw-r--r--src/core/lib/iomgr/error.h5
-rw-r--r--src/core/lib/iomgr/ev_epoll1_linux.cc2
-rw-r--r--src/core/lib/iomgr/ev_epollex_linux.cc2
-rw-r--r--src/core/lib/iomgr/ev_epollsig_linux.cc2
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.cc28
-rw-r--r--src/core/lib/iomgr/tcp_client_posix.cc9
-rw-r--r--src/core/lib/iomgr/udp_server.cc20
-rw-r--r--src/core/lib/iomgr/udp_server.h6
-rw-r--r--src/core/lib/iomgr/wakeup_fd_cv.cc8
-rw-r--r--src/core/lib/iomgr/wakeup_fd_cv.h24
-rw-r--r--src/core/lib/support/debug_location.h4
-rw-r--r--src/node/health_check/package.json29
-rw-r--r--src/node/tools/package.json41
-rwxr-xr-xsrc/objective-c/tests/run_tests.sh64
-rw-r--r--src/python/grpcio/grpc/_channel.py66
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi15
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi6
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi42
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/event.pxd.pxi45
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/event.pyx.pxi55
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi24
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/operation.pxd.pxi109
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/operation.pyx.pxi238
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi58
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi284
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi29
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/tag.pxd.pxi58
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/tag.pyx.pxi87
-rw-r--r--src/python/grpcio/grpc/_cython/cygrpc.pxd3
-rw-r--r--src/python/grpcio/grpc/_cython/cygrpc.pyx3
-rw-r--r--src/python/grpcio/grpc/_server.py111
-rw-r--r--src/python/grpcio_tests/tests/health_check/_health_servicer_test.py6
-rw-r--r--src/python/grpcio_tests/tests/interop/_insecure_intraop_test.py4
-rw-r--r--src/python/grpcio_tests/tests/interop/_secure_intraop_test.py4
-rw-r--r--src/python/grpcio_tests/tests/interop/server.py3
-rw-r--r--src/python/grpcio_tests/tests/protoc_plugin/_python_plugin_test.py8
-rw-r--r--src/python/grpcio_tests/tests/protoc_plugin/_split_definitions_test.py7
-rw-r--r--src/python/grpcio_tests/tests/qps/qps_worker.py4
-rw-r--r--src/python/grpcio_tests/tests/qps/worker_server.py4
-rw-r--r--src/python/grpcio_tests/tests/reflection/_reflection_servicer_test.py6
-rw-r--r--src/python/grpcio_tests/tests/tests.json1
-rw-r--r--src/python/grpcio_tests/tests/unit/_auth_context_test.py14
-rw-r--r--src/python/grpcio_tests/tests/unit/_channel_connectivity_test.py4
-rw-r--r--src/python/grpcio_tests/tests/unit/_channel_ready_future_test.py2
-rw-r--r--src/python/grpcio_tests/tests/unit/_compression_test.py6
-rw-r--r--src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py31
-rw-r--r--src/python/grpcio_tests/tests/unit/_cython/_common.py3
-rw-r--r--src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py57
-rw-r--r--src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py55
-rw-r--r--src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py40
-rw-r--r--src/python/grpcio_tests/tests/unit/_cython/_server_test.py49
-rw-r--r--src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py129
-rw-r--r--src/python/grpcio_tests/tests/unit/_empty_message_test.py7
-rw-r--r--src/python/grpcio_tests/tests/unit/_exit_scenarios.py8
-rw-r--r--src/python/grpcio_tests/tests/unit/_interceptor_test.py2
-rw-r--r--src/python/grpcio_tests/tests/unit/_invocation_defects_test.py6
-rw-r--r--src/python/grpcio_tests/tests/unit/_metadata_code_details_test.py7
-rw-r--r--src/python/grpcio_tests/tests/unit/_metadata_test.py7
-rw-r--r--src/python/grpcio_tests/tests/unit/_reconnect_test.py45
-rw-r--r--src/python/grpcio_tests/tests/unit/_resource_exhausted_test.py1
-rw-r--r--src/python/grpcio_tests/tests/unit/_rpc_test.py5
-rw-r--r--src/python/grpcio_tests/tests/unit/_server_ssl_cert_config_test.py5
-rw-r--r--src/python/grpcio_tests/tests/unit/test_common.py11
74 files changed, 1405 insertions, 1044 deletions
diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc
index 965d91b68b..f35bfd9ab9 100644
--- a/src/compiler/cpp_generator.cc
+++ b/src/compiler/cpp_generator.cc
@@ -1568,11 +1568,24 @@ grpc::string GetMockIncludes(grpc_generator::File* file,
static const char* headers_strs[] = {
"grpc++/impl/codegen/async_stream.h",
"grpc++/impl/codegen/sync_stream.h",
- "gmock/gmock.h",
};
std::vector<grpc::string> headers(headers_strs, array_end(headers_strs));
PrintIncludes(printer.get(), headers, params);
+ std::vector<grpc::string> gmock_header;
+ if (params.gmock_search_path.empty()) {
+ gmock_header.push_back("gmock/gmock.h");
+ PrintIncludes(printer.get(), gmock_header, params);
+ } else {
+ gmock_header.push_back("gmock.h");
+ // Copy a params to generate gmock header.
+ Parameters gmock_params(params);
+ // We use local includes when a gmock_search_path is given
+ gmock_params.use_system_headers = false;
+ gmock_params.grpc_search_path = params.gmock_search_path;
+ PrintIncludes(printer.get(), gmock_header, gmock_params);
+ }
+
if (!file->package().empty()) {
std::vector<grpc::string> parts = file->package_parts();
diff --git a/src/compiler/cpp_generator.h b/src/compiler/cpp_generator.h
index a93376acef..300a27c589 100644
--- a/src/compiler/cpp_generator.h
+++ b/src/compiler/cpp_generator.h
@@ -50,8 +50,10 @@ struct Parameters {
bool use_system_headers;
// Prefix to any grpc include
grpc::string grpc_search_path;
- // Generate GMOCK code to facilitate unit testing.
+ // Generate Google Mock code to facilitate unit testing.
bool generate_mock_code;
+ // Google Mock search path, when non-empty, local includes will be used.
+ grpc::string gmock_search_path;
};
// Return the prologue of the generated header file.
diff --git a/src/compiler/cpp_plugin.cc b/src/compiler/cpp_plugin.cc
index adac0e23d3..661282f880 100644
--- a/src/compiler/cpp_plugin.cc
+++ b/src/compiler/cpp_plugin.cc
@@ -78,6 +78,8 @@ class CppGrpcGenerator : public grpc::protobuf::compiler::CodeGenerator {
*error = grpc::string("Invalid parameter: ") + *parameter_string;
return false;
}
+ } else if (param[0] == "gmock_search_path") {
+ generator_parameters.gmock_search_path = param[1];
} else {
*error = grpc::string("Unknown parameter: ") + *parameter_string;
return false;
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index 3c64213fb9..dd6fc602ab 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -113,13 +113,13 @@
#include "src/core/lib/slice/slice_hash_table.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
+#include "src/core/lib/support/manual_constructor.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/transport/static_metadata.h"
-#define GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS 20
#define GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS 120
@@ -408,7 +408,7 @@ typedef struct glb_lb_policy {
grpc_slice lb_call_status_details;
/** LB call retry backoff state */
- grpc_backoff lb_call_backoff_state;
+ grpc_core::ManualConstructor<grpc_core::BackOff> lb_call_backoff;
/** LB call retry timer */
grpc_timer lb_call_retry_timer;
@@ -1167,7 +1167,7 @@ static void start_picking_locked(glb_lb_policy* glb_policy) {
}
glb_policy->started_picking = true;
- grpc_backoff_reset(&glb_policy->lb_call_backoff_state);
+ glb_policy->lb_call_backoff->Reset();
query_for_backends_locked(glb_policy);
}
@@ -1302,8 +1302,7 @@ static void maybe_restart_lb_call(glb_lb_policy* glb_policy) {
glb_policy->updating_lb_call = false;
} else if (!glb_policy->shutting_down) {
/* if we aren't shutting down, restart the LB client call after some time */
- grpc_millis next_try = grpc_backoff_step(&glb_policy->lb_call_backoff_state)
- .next_attempt_start_time;
+ grpc_millis next_try = glb_policy->lb_call_backoff->Step();
if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_DEBUG, "[grpclb %p] Connection to LB server lost...",
glb_policy);
@@ -1463,12 +1462,14 @@ static void lb_call_init_locked(glb_lb_policy* glb_policy) {
lb_on_response_received_locked, glb_policy,
grpc_combiner_scheduler(glb_policy->base.combiner));
- grpc_backoff_init(&glb_policy->lb_call_backoff_state,
- GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS * 1000,
- GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER,
- GRPC_GRPCLB_RECONNECT_JITTER,
- GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS * 1000,
- GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
+ grpc_core::BackOff::Options backoff_options;
+ backoff_options
+ .set_initial_backoff(GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS * 1000)
+ .set_multiplier(GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER)
+ .set_jitter(GRPC_GRPCLB_RECONNECT_JITTER)
+ .set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
+
+ glb_policy->lb_call_backoff.Init(backoff_options);
glb_policy->seen_initial_response = false;
glb_policy->last_client_load_report_counters_were_zero = false;
@@ -1572,7 +1573,7 @@ static void lb_on_response_received_locked(void* arg, grpc_error* error) {
memset(ops, 0, sizeof(ops));
grpc_op* op = ops;
if (glb_policy->lb_response_payload != nullptr) {
- grpc_backoff_reset(&glb_policy->lb_call_backoff_state);
+ glb_policy->lb_call_backoff->Reset();
/* Received data from the LB server. Look inside
* glb_policy->lb_response_payload, for a serverlist. */
grpc_byte_buffer_reader bbr;
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
index 4ec4477c82..4659a5f3ed 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
@@ -40,10 +40,10 @@
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/support/env.h"
+#include "src/core/lib/support/manual_constructor.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/transport/service_config.h"
-#define GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS 1
#define GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS 120
@@ -89,7 +89,7 @@ typedef struct {
bool have_retry_timer;
grpc_timer retry_timer;
/** retry backoff state */
- grpc_backoff backoff_state;
+ grpc_core::ManualConstructor<grpc_core::BackOff> backoff;
/** currently resolving addresses */
grpc_lb_addresses* lb_addresses;
@@ -131,7 +131,7 @@ static void dns_ares_shutdown_locked(grpc_resolver* resolver) {
static void dns_ares_channel_saw_error_locked(grpc_resolver* resolver) {
ares_dns_resolver* r = (ares_dns_resolver*)resolver;
if (!r->resolving) {
- grpc_backoff_reset(&r->backoff_state);
+ r->backoff->Reset();
dns_ares_start_resolving_locked(r);
}
}
@@ -264,8 +264,7 @@ static void dns_ares_on_resolved_locked(void* arg, grpc_error* error) {
} else {
const char* msg = grpc_error_string(error);
gpr_log(GPR_DEBUG, "dns resolution failed: %s", msg);
- grpc_millis next_try =
- grpc_backoff_step(&r->backoff_state).next_attempt_start_time;
+ grpc_millis next_try = r->backoff->Step();
grpc_millis timeout = next_try - grpc_core::ExecCtx::Get()->Now();
gpr_log(GPR_INFO, "dns resolution failed (will retry): %s",
grpc_error_string(error));
@@ -298,7 +297,7 @@ static void dns_ares_next_locked(grpc_resolver* resolver,
r->next_completion = on_complete;
r->target_result = target_result;
if (r->resolved_version == 0 && !r->resolving) {
- grpc_backoff_reset(&r->backoff_state);
+ r->backoff->Reset();
dns_ares_start_resolving_locked(r);
} else {
dns_ares_maybe_finish_next_locked(r);
@@ -368,11 +367,13 @@ static grpc_resolver* dns_ares_create(grpc_resolver_args* args,
if (args->pollset_set != nullptr) {
grpc_pollset_set_add_pollset_set(r->interested_parties, args->pollset_set);
}
- grpc_backoff_init(
- &r->backoff_state, GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * 1000,
- GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER, GRPC_DNS_RECONNECT_JITTER,
- GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS * 1000,
- GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
+ grpc_core::BackOff::Options backoff_options;
+ backoff_options
+ .set_initial_backoff(GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * 1000)
+ .set_multiplier(GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER)
+ .set_jitter(GRPC_DNS_RECONNECT_JITTER)
+ .set_max_backoff(GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
+ r->backoff.Init(grpc_core::BackOff(backoff_options));
GRPC_CLOSURE_INIT(&r->dns_ares_on_retry_timer_locked,
dns_ares_on_retry_timer_locked, r,
grpc_combiner_scheduler(r->base.combiner));
diff --git a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc
index 77698e97aa..1c2cfc08e7 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc
@@ -33,9 +33,9 @@
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/support/env.h"
+#include "src/core/lib/support/manual_constructor.h"
#include "src/core/lib/support/string.h"
-#define GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS 1
#define GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS 120
@@ -70,7 +70,7 @@ typedef struct {
grpc_timer retry_timer;
grpc_closure on_retry;
/** retry backoff state */
- grpc_backoff backoff_state;
+ grpc_core::ManualConstructor<grpc_core::BackOff> backoff;
/** currently resolving addresses */
grpc_resolved_addresses* addresses;
@@ -106,7 +106,7 @@ static void dns_shutdown_locked(grpc_resolver* resolver) {
static void dns_channel_saw_error_locked(grpc_resolver* resolver) {
dns_resolver* r = (dns_resolver*)resolver;
if (!r->resolving) {
- grpc_backoff_reset(&r->backoff_state);
+ r->backoff->Reset();
dns_start_resolving_locked(r);
}
}
@@ -119,7 +119,7 @@ static void dns_next_locked(grpc_resolver* resolver,
r->next_completion = on_complete;
r->target_result = target_result;
if (r->resolved_version == 0 && !r->resolving) {
- grpc_backoff_reset(&r->backoff_state);
+ r->backoff->Reset();
dns_start_resolving_locked(r);
} else {
dns_maybe_finish_next_locked(r);
@@ -161,8 +161,7 @@ static void dns_on_resolved_locked(void* arg, grpc_error* error) {
grpc_resolved_addresses_destroy(r->addresses);
grpc_lb_addresses_destroy(addresses);
} else {
- grpc_millis next_try =
- grpc_backoff_step(&r->backoff_state).next_attempt_start_time;
+ grpc_millis next_try = r->backoff->Step();
grpc_millis timeout = next_try - grpc_core::ExecCtx::Get()->Now();
gpr_log(GPR_INFO, "dns resolution failed (will retry): %s",
grpc_error_string(error));
@@ -244,11 +243,13 @@ static grpc_resolver* dns_create(grpc_resolver_args* args,
if (args->pollset_set != nullptr) {
grpc_pollset_set_add_pollset_set(r->interested_parties, args->pollset_set);
}
- grpc_backoff_init(
- &r->backoff_state, GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * 1000,
- GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER, GRPC_DNS_RECONNECT_JITTER,
- GRPC_DNS_MIN_CONNECT_TIMEOUT_SECONDS * 1000,
- GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
+ grpc_core::BackOff::Options backoff_options;
+ backoff_options
+ .set_initial_backoff(GRPC_DNS_INITIAL_CONNECT_BACKOFF_SECONDS * 1000)
+ .set_multiplier(GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER)
+ .set_jitter(GRPC_DNS_RECONNECT_JITTER)
+ .set_max_backoff(GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
+ r->backoff.Init(grpc_core::BackOff(backoff_options));
return &r->base;
}
diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc
index 84a5ace31d..f07394d29b 100644
--- a/src/core/ext/filters/client_channel/subchannel.cc
+++ b/src/core/ext/filters/client_channel/subchannel.cc
@@ -20,7 +20,9 @@
#include <inttypes.h>
#include <limits.h>
-#include <string.h>
+
+#include <algorithm>
+#include <cstring>
#include <grpc/support/alloc.h>
#include <grpc/support/avl.h>
@@ -39,6 +41,7 @@
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/lib/support/manual_constructor.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/transport/connectivity_state.h"
@@ -48,7 +51,7 @@
#define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER 1.6
-#define GRPC_SUBCHANNEL_RECONNECT_MIN_BACKOFF_SECONDS 20
+#define GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS 20
#define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2
@@ -118,8 +121,9 @@ struct grpc_subchannel {
external_state_watcher root_external_state_watcher;
/** backoff state */
- grpc_backoff backoff_state;
- grpc_backoff_result backoff_result;
+ grpc_core::ManualConstructor<grpc_core::BackOff> backoff;
+ grpc_millis next_attempt_deadline;
+ grpc_millis min_connect_timeout_ms;
/** do we have an active alarm? */
bool have_alarm;
@@ -274,6 +278,54 @@ void grpc_subchannel_weak_unref(
}
}
+static void parse_args_for_backoff_values(
+ const grpc_channel_args* args, grpc_core::BackOff::Options* backoff_options,
+ grpc_millis* min_connect_timeout_ms) {
+ grpc_millis initial_backoff_ms =
+ GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000;
+ *min_connect_timeout_ms =
+ GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS * 1000;
+ grpc_millis max_backoff_ms =
+ GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000;
+ bool fixed_reconnect_backoff = false;
+ if (args != nullptr) {
+ for (size_t i = 0; i < args->num_args; i++) {
+ if (0 == strcmp(args->args[i].key,
+ "grpc.testing.fixed_reconnect_backoff_ms")) {
+ fixed_reconnect_backoff = true;
+ initial_backoff_ms = *min_connect_timeout_ms = max_backoff_ms =
+ grpc_channel_arg_get_integer(
+ &args->args[i],
+ {static_cast<int>(initial_backoff_ms), 100, INT_MAX});
+ } else if (0 ==
+ strcmp(args->args[i].key, GRPC_ARG_MIN_RECONNECT_BACKOFF_MS)) {
+ fixed_reconnect_backoff = false;
+ *min_connect_timeout_ms = grpc_channel_arg_get_integer(
+ &args->args[i],
+ {static_cast<int>(*min_connect_timeout_ms), 100, INT_MAX});
+ } else if (0 ==
+ strcmp(args->args[i].key, GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) {
+ fixed_reconnect_backoff = false;
+ max_backoff_ms = grpc_channel_arg_get_integer(
+ &args->args[i], {static_cast<int>(max_backoff_ms), 100, INT_MAX});
+ } else if (0 == strcmp(args->args[i].key,
+ GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS)) {
+ fixed_reconnect_backoff = false;
+ initial_backoff_ms = grpc_channel_arg_get_integer(
+ &args->args[i],
+ {static_cast<int>(initial_backoff_ms), 100, INT_MAX});
+ }
+ }
+ }
+ backoff_options->set_initial_backoff(initial_backoff_ms)
+ .set_multiplier(fixed_reconnect_backoff
+ ? 1.0
+ : GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER)
+ .set_jitter(fixed_reconnect_backoff ? 0.0
+ : GRPC_SUBCHANNEL_RECONNECT_JITTER)
+ .set_max_backoff(max_backoff_ms);
+}
+
grpc_subchannel* grpc_subchannel_create(grpc_connector* connector,
const grpc_subchannel_args* args) {
grpc_subchannel_key* key = grpc_subchannel_key_create(args);
@@ -324,43 +376,10 @@ grpc_subchannel* grpc_subchannel_create(grpc_connector* connector,
grpc_schedule_on_exec_ctx);
grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE,
"subchannel");
- int initial_backoff_ms =
- GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000;
- int min_backoff_ms = GRPC_SUBCHANNEL_RECONNECT_MIN_BACKOFF_SECONDS * 1000;
- int max_backoff_ms = GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000;
- bool fixed_reconnect_backoff = false;
- if (c->args) {
- for (size_t i = 0; i < c->args->num_args; i++) {
- if (0 == strcmp(c->args->args[i].key,
- "grpc.testing.fixed_reconnect_backoff_ms")) {
- fixed_reconnect_backoff = true;
- initial_backoff_ms = min_backoff_ms = max_backoff_ms =
- grpc_channel_arg_get_integer(&c->args->args[i],
- {initial_backoff_ms, 100, INT_MAX});
- } else if (0 == strcmp(c->args->args[i].key,
- GRPC_ARG_MIN_RECONNECT_BACKOFF_MS)) {
- fixed_reconnect_backoff = false;
- min_backoff_ms = grpc_channel_arg_get_integer(
- &c->args->args[i], {min_backoff_ms, 100, INT_MAX});
- } else if (0 == strcmp(c->args->args[i].key,
- GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) {
- fixed_reconnect_backoff = false;
- max_backoff_ms = grpc_channel_arg_get_integer(
- &c->args->args[i], {max_backoff_ms, 100, INT_MAX});
- } else if (0 == strcmp(c->args->args[i].key,
- GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS)) {
- fixed_reconnect_backoff = false;
- initial_backoff_ms = grpc_channel_arg_get_integer(
- &c->args->args[i], {initial_backoff_ms, 100, INT_MAX});
- }
- }
- }
- grpc_backoff_init(
- &c->backoff_state, initial_backoff_ms,
- fixed_reconnect_backoff ? 1.0
- : GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER,
- fixed_reconnect_backoff ? 0.0 : GRPC_SUBCHANNEL_RECONNECT_JITTER,
- min_backoff_ms, max_backoff_ms);
+ grpc_core::BackOff::Options backoff_options;
+ parse_args_for_backoff_values(args->args, &backoff_options,
+ &c->min_connect_timeout_ms);
+ c->backoff.Init(backoff_options);
gpr_mu_init(&c->mu);
return grpc_subchannel_index_register(key, c);
@@ -368,11 +387,11 @@ grpc_subchannel* grpc_subchannel_create(grpc_connector* connector,
static void continue_connect_locked(grpc_subchannel* c) {
grpc_connect_in_args args;
-
args.interested_parties = c->pollset_set;
- args.deadline = c->backoff_result.current_deadline;
+ const grpc_millis min_deadline =
+ c->min_connect_timeout_ms + grpc_core::ExecCtx::Get()->Now();
+ args.deadline = std::max(c->next_attempt_deadline, min_deadline);
args.channel_args = c->args;
-
grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_CONNECTING,
GRPC_ERROR_NONE, "state_change");
grpc_connector_connect(c->connector, &args, &c->connecting_result,
@@ -416,7 +435,7 @@ static void on_alarm(void* arg, grpc_error* error) {
}
if (error == GRPC_ERROR_NONE) {
gpr_log(GPR_INFO, "Failed to connect to channel, retrying");
- c->backoff_result = grpc_backoff_step(&c->backoff_state);
+ c->next_attempt_deadline = c->backoff->Step();
continue_connect_locked(c);
gpr_mu_unlock(&c->mu);
} else {
@@ -452,22 +471,20 @@ static void maybe_start_connecting_locked(grpc_subchannel* c) {
if (!c->backoff_begun) {
c->backoff_begun = true;
- c->backoff_result = grpc_backoff_begin(&c->backoff_state);
+ c->next_attempt_deadline = c->backoff->Begin();
continue_connect_locked(c);
} else {
GPR_ASSERT(!c->have_alarm);
c->have_alarm = true;
const grpc_millis time_til_next =
- c->backoff_result.next_attempt_start_time -
- grpc_core::ExecCtx::Get()->Now();
+ c->next_attempt_deadline - grpc_core::ExecCtx::Get()->Now();
if (time_til_next <= 0) {
gpr_log(GPR_INFO, "Retry immediately");
} else {
gpr_log(GPR_INFO, "Retry in %" PRIdPTR " milliseconds", time_til_next);
}
GRPC_CLOSURE_INIT(&c->on_alarm, on_alarm, c, grpc_schedule_on_exec_ctx);
- grpc_timer_init(&c->alarm, c->backoff_result.next_attempt_start_time,
- &c->on_alarm);
+ grpc_timer_init(&c->alarm, c->next_attempt_deadline, &c->on_alarm);
}
}
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
index c13c4056c1..5ee1f08e61 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
@@ -79,7 +79,9 @@ static int g_default_server_keepalive_time_ms =
DEFAULT_SERVER_KEEPALIVE_TIME_MS;
static int g_default_server_keepalive_timeout_ms =
DEFAULT_SERVER_KEEPALIVE_TIMEOUT_MS;
-static bool g_default_keepalive_permit_without_calls =
+static bool g_default_client_keepalive_permit_without_calls =
+ DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS;
+static bool g_default_server_keepalive_permit_without_calls =
DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS;
static int g_default_min_sent_ping_interval_without_data_ms =
@@ -346,6 +348,8 @@ static void init_transport(grpc_chttp2_transport* t,
t->keepalive_timeout = g_default_client_keepalive_timeout_ms == INT_MAX
? GRPC_MILLIS_INF_FUTURE
: g_default_client_keepalive_timeout_ms;
+ t->keepalive_permit_without_calls =
+ g_default_client_keepalive_permit_without_calls;
} else {
t->keepalive_time = g_default_server_keepalive_time_ms == INT_MAX
? GRPC_MILLIS_INF_FUTURE
@@ -353,8 +357,9 @@ static void init_transport(grpc_chttp2_transport* t,
t->keepalive_timeout = g_default_server_keepalive_timeout_ms == INT_MAX
? GRPC_MILLIS_INF_FUTURE
: g_default_server_keepalive_timeout_ms;
+ t->keepalive_permit_without_calls =
+ g_default_server_keepalive_permit_without_calls;
}
- t->keepalive_permit_without_calls = g_default_keepalive_permit_without_calls;
t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY;
@@ -2550,7 +2555,9 @@ void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args,
for (i = 0; i < args->num_args; i++) {
if (0 == strcmp(args->args[i].key, GRPC_ARG_KEEPALIVE_TIME_MS)) {
const int value = grpc_channel_arg_get_integer(
- &args->args[i], {g_default_client_keepalive_time_ms, 1, INT_MAX});
+ &args->args[i], {is_client ? g_default_client_keepalive_time_ms
+ : g_default_server_keepalive_time_ms,
+ 1, INT_MAX});
if (is_client) {
g_default_client_keepalive_time_ms = value;
} else {
@@ -2559,8 +2566,9 @@ void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args,
} else if (0 ==
strcmp(args->args[i].key, GRPC_ARG_KEEPALIVE_TIMEOUT_MS)) {
const int value = grpc_channel_arg_get_integer(
- &args->args[i],
- {g_default_client_keepalive_timeout_ms, 0, INT_MAX});
+ &args->args[i], {is_client ? g_default_client_keepalive_timeout_ms
+ : g_default_server_keepalive_timeout_ms,
+ 0, INT_MAX});
if (is_client) {
g_default_client_keepalive_timeout_ms = value;
} else {
@@ -2568,10 +2576,16 @@ void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args,
}
} else if (0 == strcmp(args->args[i].key,
GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)) {
- g_default_keepalive_permit_without_calls =
- (uint32_t)grpc_channel_arg_get_integer(
- &args->args[i],
- {g_default_keepalive_permit_without_calls, 0, 1});
+ const bool value = (uint32_t)grpc_channel_arg_get_integer(
+ &args->args[i],
+ {is_client ? g_default_client_keepalive_permit_without_calls
+ : g_default_server_keepalive_timeout_ms,
+ 0, 1});
+ if (is_client) {
+ g_default_client_keepalive_permit_without_calls = value;
+ } else {
+ g_default_server_keepalive_permit_without_calls = value;
+ }
} else if (0 ==
strcmp(args->args[i].key, GRPC_ARG_HTTP2_MAX_PING_STRIKES)) {
g_default_max_ping_strikes = grpc_channel_arg_get_integer(
diff --git a/src/core/lib/backoff/backoff.cc b/src/core/lib/backoff/backoff.cc
index da3b9b1b2d..41f625a636 100644
--- a/src/core/lib/backoff/backoff.cc
+++ b/src/core/lib/backoff/backoff.cc
@@ -18,61 +18,53 @@
#include "src/core/lib/backoff/backoff.h"
+#include <algorithm>
+
#include <grpc/support/useful.h>
-void grpc_backoff_init(grpc_backoff* backoff, grpc_millis initial_backoff,
- double multiplier, double jitter,
- grpc_millis min_connect_timeout,
- grpc_millis max_backoff) {
- backoff->initial_backoff = initial_backoff;
- backoff->multiplier = multiplier;
- backoff->jitter = jitter;
- backoff->min_connect_timeout = min_connect_timeout;
- backoff->max_backoff = max_backoff;
- backoff->rng_state = (uint32_t)gpr_now(GPR_CLOCK_REALTIME).tv_nsec;
-}
+namespace grpc_core {
-grpc_backoff_result grpc_backoff_begin(grpc_backoff* backoff) {
- backoff->current_backoff = backoff->initial_backoff;
- const grpc_millis initial_timeout =
- GPR_MAX(backoff->initial_backoff, backoff->min_connect_timeout);
- const grpc_millis now = grpc_core::ExecCtx::Get()->Now();
- const grpc_backoff_result result = {now + initial_timeout,
- now + backoff->current_backoff};
- return result;
-}
+namespace {
-/* Generate a random number between 0 and 1. */
-static double generate_uniform_random_number(uint32_t* rng_state) {
- *rng_state = (1103515245 * *rng_state + 12345) % ((uint32_t)1 << 31);
- return *rng_state / (double)((uint32_t)1 << 31);
+/* Generate a random number between 0 and 1. We roll our own RNG because seeding
+ * rand() modifies a global variable we have no control over. */
+double generate_uniform_random_number(uint32_t* rng_state) {
+ constexpr uint32_t two_raise_31 = uint32_t(1) << 31;
+ *rng_state = (1103515245 * *rng_state + 12345) % two_raise_31;
+ return *rng_state / static_cast<double>(two_raise_31);
}
-static double generate_uniform_random_number_between(uint32_t* rng_state,
- double a, double b) {
+double generate_uniform_random_number_between(uint32_t* rng_state, double a,
+ double b) {
if (a == b) return a;
if (a > b) GPR_SWAP(double, a, b); // make sure a < b
const double range = b - a;
return a + generate_uniform_random_number(rng_state) * range;
}
+} // namespace
-grpc_backoff_result grpc_backoff_step(grpc_backoff* backoff) {
- backoff->current_backoff = (grpc_millis)(GPR_MIN(
- backoff->current_backoff * backoff->multiplier, backoff->max_backoff));
- const double jitter = generate_uniform_random_number_between(
- &backoff->rng_state, -backoff->jitter * backoff->current_backoff,
- backoff->jitter * backoff->current_backoff);
- const grpc_millis current_timeout =
- GPR_MAX((grpc_millis)(backoff->current_backoff + jitter),
- backoff->min_connect_timeout);
- const grpc_millis next_timeout = GPR_MIN(
- (grpc_millis)(backoff->current_backoff + jitter), backoff->max_backoff);
- const grpc_millis now = grpc_core::ExecCtx::Get()->Now();
- const grpc_backoff_result result = {now + current_timeout,
- now + next_timeout};
- return result;
+BackOff::BackOff(const Options& options) : options_(options) {
+ rng_state_ = static_cast<uint32_t>(gpr_now(GPR_CLOCK_REALTIME).tv_nsec);
+}
+
+grpc_millis BackOff::Begin() {
+ current_backoff_ = options_.initial_backoff();
+ return current_backoff_ + grpc_core::ExecCtx::Get()->Now();
}
-void grpc_backoff_reset(grpc_backoff* backoff) {
- backoff->current_backoff = backoff->initial_backoff;
+grpc_millis BackOff::Step() {
+ current_backoff_ =
+ (grpc_millis)(std::min(current_backoff_ * options_.multiplier(),
+ (double)options_.max_backoff()));
+ const double jitter = generate_uniform_random_number_between(
+ &rng_state_, -options_.jitter() * current_backoff_,
+ options_.jitter() * current_backoff_);
+ const grpc_millis next_timeout = (grpc_millis)(current_backoff_ + jitter);
+ return next_timeout + grpc_core::ExecCtx::Get()->Now();
}
+
+void BackOff::Reset() { current_backoff_ = options_.initial_backoff(); }
+
+void BackOff::SetRandomSeed(uint32_t seed) { rng_state_ = seed; }
+
+} // namespace grpc_core
diff --git a/src/core/lib/backoff/backoff.h b/src/core/lib/backoff/backoff.h
index f61d14ec95..84ef9b82e4 100644
--- a/src/core/lib/backoff/backoff.h
+++ b/src/core/lib/backoff/backoff.h
@@ -21,53 +21,69 @@
#include "src/core/lib/iomgr/exec_ctx.h"
-typedef struct {
- /// const: how long to wait after the first failure before retrying
- grpc_millis initial_backoff;
-
- /// const: factor with which to multiply backoff after a failed retry
- double multiplier;
-
- /// const: amount to randomize backoffs
- double jitter;
-
- /// const: minimum time between retries
- grpc_millis min_connect_timeout;
-
- /// const: maximum time between retries
- grpc_millis max_backoff;
-
+namespace grpc_core {
+
+/// Implementation of the backoff mechanism described in
+/// doc/connection-backoff.md
+class BackOff {
+ public:
+ class Options;
+
+ /// Initialize backoff machinery - does not need to be destroyed
+ explicit BackOff(const Options& options);
+
+ /// Begin retry loop: returns the deadline to be used for the next attempt,
+ /// following the backoff strategy.
+ grpc_millis Begin();
+ /// Step a retry loop: returns the deadline to be used for the next attempt,
+ /// following the backoff strategy.
+ grpc_millis Step();
+ /// Reset the backoff, so the next grpc_backoff_step will be a
+ /// grpc_backoff_begin.
+ void Reset();
+
+ void SetRandomSeed(unsigned int seed);
+
+ class Options {
+ public:
+ Options& set_initial_backoff(grpc_millis initial_backoff) {
+ initial_backoff_ = initial_backoff;
+ return *this;
+ }
+ Options& set_multiplier(double multiplier) {
+ multiplier_ = multiplier;
+ return *this;
+ }
+ Options& set_jitter(double jitter) {
+ jitter_ = jitter;
+ return *this;
+ }
+ Options& set_max_backoff(grpc_millis max_backoff) {
+ max_backoff_ = max_backoff;
+ return *this;
+ }
+ /// how long to wait after the first failure before retrying
+ grpc_millis initial_backoff() const { return initial_backoff_; }
+ /// factor with which to multiply backoff after a failed retry
+ double multiplier() const { return multiplier_; }
+ /// amount to randomize backoffs
+ double jitter() const { return jitter_; }
+ /// maximum time between retries
+ grpc_millis max_backoff() const { return max_backoff_; }
+
+ private:
+ grpc_millis initial_backoff_;
+ double multiplier_;
+ double jitter_;
+ grpc_millis max_backoff_;
+ }; // class Options
+
+ private:
+ const Options options_;
/// current delay before retries
- grpc_millis current_backoff;
-
- /// random number generator
- uint32_t rng_state;
-} grpc_backoff;
-
-typedef struct {
- /// Deadline to be used for the current attempt.
- grpc_millis current_deadline;
-
- /// Deadline to be used for the next attempt, following the backoff strategy.
- grpc_millis next_attempt_start_time;
-} grpc_backoff_result;
-
-/// Initialize backoff machinery - does not need to be destroyed
-void grpc_backoff_init(grpc_backoff* backoff, grpc_millis initial_backoff,
- double multiplier, double jitter,
- grpc_millis min_connect_timeout,
- grpc_millis max_backoff);
-
-/// Begin retry loop: returns the deadlines to be used for the current attempt
-/// and the subsequent retry, if any.
-grpc_backoff_result grpc_backoff_begin(grpc_backoff* backoff);
-
-/// Step a retry loop: returns the deadlines to be used for the current attempt
-/// and the subsequent retry, if any.
-grpc_backoff_result grpc_backoff_step(grpc_backoff* backoff);
-
-/// Reset the backoff, so the next grpc_backoff_step will be a
-/// grpc_backoff_begin.
-void grpc_backoff_reset(grpc_backoff* backoff);
+ grpc_millis current_backoff_;
+ uint32_t rng_state_;
+};
+} // namespace grpc_core
#endif /* GRPC_CORE_LIB_BACKOFF_BACKOFF_H */
diff --git a/src/core/lib/iomgr/error.cc b/src/core/lib/iomgr/error.cc
index 42cd7c455d..67c3caf5ee 100644
--- a/src/core/lib/iomgr/error.cc
+++ b/src/core/lib/iomgr/error.cc
@@ -749,7 +749,7 @@ const char* grpc_error_string(grpc_error* err) {
if (!gpr_atm_rel_cas(&err->atomics.error_string, 0, (gpr_atm)out)) {
gpr_free(out);
- out = (char*)gpr_atm_no_barrier_load(&err->atomics.error_string);
+ out = (char*)gpr_atm_acq_load(&err->atomics.error_string);
}
GPR_TIMER_END("grpc_error_string", 0);
diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h
index 4759ee0791..8c72a439f6 100644
--- a/src/core/lib/iomgr/error.h
+++ b/src/core/lib/iomgr/error.h
@@ -165,6 +165,8 @@ void grpc_error_unref(grpc_error* err);
grpc_error* grpc_error_set_int(grpc_error* src, grpc_error_ints which,
intptr_t value) GRPC_MUST_USE_RESULT;
bool grpc_error_get_int(grpc_error* error, grpc_error_ints which, intptr_t* p);
+/// This call takes ownership of the slice; the error is responsible for
+/// eventually unref-ing it.
grpc_error* grpc_error_set_str(grpc_error* src, grpc_error_strs which,
grpc_slice str) GRPC_MUST_USE_RESULT;
/// Returns false if the specified string is not set.
@@ -174,7 +176,8 @@ bool grpc_error_get_str(grpc_error* error, grpc_error_strs which,
/// Add a child error: an error that is believed to have contributed to this
/// error occurring. Allows root causing high level errors from lower level
-/// errors that contributed to them.
+/// errors that contributed to them. The src error takes ownership of the
+/// child error.
grpc_error* grpc_error_add_child(grpc_error* src,
grpc_error* child) GRPC_MUST_USE_RESULT;
grpc_error* grpc_os_error(const char* file, int line, int err,
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc
index ae9d47ece5..1ab7e516de 100644
--- a/src/core/lib/iomgr/ev_epoll1_linux.cc
+++ b/src/core/lib/iomgr/ev_epoll1_linux.cc
@@ -1232,8 +1232,6 @@ const grpc_event_engine_vtable* grpc_init_epoll1_linux(bool explicit_request) {
/* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return
* NULL */
const grpc_event_engine_vtable* grpc_init_epoll1_linux(bool explicit_request) {
- gpr_log(GPR_ERROR,
- "Skipping epoll1 becuase GRPC_LINUX_EPOLL is not defined.");
return nullptr;
}
#endif /* defined(GRPC_POSIX_SOCKET) */
diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc
index b2817156a8..5f5f45a7a5 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.cc
+++ b/src/core/lib/iomgr/ev_epollex_linux.cc
@@ -1449,8 +1449,6 @@ const grpc_event_engine_vtable* grpc_init_epollex_linux(
* NULL */
const grpc_event_engine_vtable* grpc_init_epollex_linux(
bool explicitly_requested) {
- gpr_log(GPR_ERROR,
- "Skipping epollex becuase GRPC_LINUX_EPOLL is not defined.");
return nullptr;
}
#endif /* defined(GRPC_POSIX_SOCKET) */
diff --git a/src/core/lib/iomgr/ev_epollsig_linux.cc b/src/core/lib/iomgr/ev_epollsig_linux.cc
index 7a8962f4a8..8072a6cbed 100644
--- a/src/core/lib/iomgr/ev_epollsig_linux.cc
+++ b/src/core/lib/iomgr/ev_epollsig_linux.cc
@@ -1732,8 +1732,6 @@ const grpc_event_engine_vtable* grpc_init_epollsig_linux(
* NULL */
const grpc_event_engine_vtable* grpc_init_epollsig_linux(
bool explicit_request) {
- gpr_log(GPR_ERROR,
- "Skipping epollsig becuase GRPC_LINUX_EPOLL is not defined.");
return nullptr;
}
#endif /* defined(GRPC_POSIX_SOCKET) */
diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc
index 53de94fb6e..7ea1dfaa80 100644
--- a/src/core/lib/iomgr/ev_poll_posix.cc
+++ b/src/core/lib/iomgr/ev_poll_posix.cc
@@ -71,6 +71,7 @@ struct grpc_fd {
int shutdown;
int closed;
int released;
+ gpr_atm pollhup;
grpc_error* shutdown_error;
/* The watcher list.
@@ -242,7 +243,7 @@ struct grpc_pollset_set {
typedef struct poll_result {
gpr_refcount refcount;
- cv_node* watchers;
+ grpc_cv_node* watchers;
int watchcount;
struct pollfd* fds;
nfds_t nfds;
@@ -273,7 +274,7 @@ typedef struct poll_hash_table {
} poll_hash_table;
poll_hash_table poll_cache;
-cv_fd_table g_cvfds;
+grpc_cv_fd_table g_cvfds;
/*******************************************************************************
* fd_posix.c
@@ -335,6 +336,7 @@ static grpc_fd* fd_create(int fd, const char* name) {
r->on_done_closure = nullptr;
r->closed = 0;
r->released = 0;
+ gpr_atm_no_barrier_store(&r->pollhup, 0);
r->read_notifier_pollset = nullptr;
char* name2;
@@ -950,7 +952,8 @@ static grpc_error* pollset_work(grpc_pollset* pollset,
pfds[0].events = POLLIN;
pfds[0].revents = 0;
for (i = 0; i < pollset->fd_count; i++) {
- if (fd_is_orphaned(pollset->fds[i])) {
+ if (fd_is_orphaned(pollset->fds[i]) ||
+ gpr_atm_no_barrier_load(&pollset->fds[i]->pollhup) == 1) {
GRPC_FD_UNREF(pollset->fds[i], "multipoller");
} else {
pollset->fds[fd_count++] = pollset->fds[i];
@@ -1017,6 +1020,12 @@ static grpc_error* pollset_work(grpc_pollset* pollset,
pfds[i].fd, (pfds[i].revents & POLLIN_CHECK) != 0,
(pfds[i].revents & POLLOUT_CHECK) != 0, pfds[i].revents);
}
+ /* This is a mitigation to prevent poll() from spinning on a
+ ** POLLHUP https://github.com/grpc/grpc/pull/13665
+ */
+ if (pfds[i].revents & POLLHUP) {
+ gpr_atm_no_barrier_store(&watchers[i].fd->pollhup, 1);
+ }
fd_end_poll(&watchers[i], pfds[i].revents & POLLIN_CHECK,
pfds[i].revents & POLLOUT_CHECK, pollset);
}
@@ -1435,7 +1444,7 @@ static void decref_poll_result(poll_result* res) {
}
}
-void remove_cvn(cv_node** head, cv_node* target) {
+void remove_cvn(grpc_cv_node** head, grpc_cv_node* target) {
if (target->next) {
target->next->prev = target->prev;
}
@@ -1460,7 +1469,7 @@ static void run_poll(void* args) {
result->completed = 1;
result->retval = retval;
result->err = errno;
- cv_node* watcher = result->watchers;
+ grpc_cv_node* watcher = result->watchers;
while (watcher) {
gpr_cv_signal(watcher->cv);
watcher = watcher->next;
@@ -1494,17 +1503,17 @@ static void run_poll(void* args) {
static int cvfd_poll(struct pollfd* fds, nfds_t nfds, int timeout) {
unsigned int i;
int res, idx;
- cv_node* pollcv;
+ grpc_cv_node* pollcv;
int skip_poll = 0;
nfds_t nsockfds = 0;
poll_result* result = nullptr;
gpr_mu_lock(&g_cvfds.mu);
- pollcv = (cv_node*)gpr_malloc(sizeof(cv_node));
+ pollcv = (grpc_cv_node*)gpr_malloc(sizeof(grpc_cv_node));
pollcv->next = nullptr;
gpr_cv pollcv_cv;
gpr_cv_init(&pollcv_cv);
pollcv->cv = &pollcv_cv;
- cv_node* fd_cvs = (cv_node*)gpr_malloc(nfds * sizeof(cv_node));
+ grpc_cv_node* fd_cvs = (grpc_cv_node*)gpr_malloc(nfds * sizeof(grpc_cv_node));
for (i = 0; i < nfds; i++) {
fds[i].revents = 0;
@@ -1600,7 +1609,8 @@ static void global_cv_fd_table_init() {
gpr_cv_init(&g_cvfds.shutdown_cv);
gpr_ref_init(&g_cvfds.pollcount, 1);
g_cvfds.size = CV_DEFAULT_TABLE_SIZE;
- g_cvfds.cvfds = (fd_node*)gpr_malloc(sizeof(fd_node) * CV_DEFAULT_TABLE_SIZE);
+ g_cvfds.cvfds =
+ (grpc_fd_node*)gpr_malloc(sizeof(grpc_fd_node) * CV_DEFAULT_TABLE_SIZE);
g_cvfds.free_fds = nullptr;
thread_grace = gpr_time_from_millis(POLLCV_THREAD_GRACE_MS, GPR_TIMESPAN);
for (int i = 0; i < CV_DEFAULT_TABLE_SIZE; i++) {
diff --git a/src/core/lib/iomgr/tcp_client_posix.cc b/src/core/lib/iomgr/tcp_client_posix.cc
index 24ccab14b2..8cd5f8d618 100644
--- a/src/core/lib/iomgr/tcp_client_posix.cc
+++ b/src/core/lib/iomgr/tcp_client_posix.cc
@@ -212,6 +212,9 @@ finish:
fd = nullptr;
}
done = (--ac->refs == 0);
+ // Create a copy of the data from "ac" to be accessed after the unlock, as
+ // "ac" and its contents may be deallocated by the time they are read.
+ const grpc_slice addr_str_slice = grpc_slice_from_copied_string(ac->addr_str);
gpr_mu_unlock(&ac->mu);
if (error != GRPC_ERROR_NONE) {
char* error_descr;
@@ -225,9 +228,13 @@ finish:
gpr_free(error_descr);
gpr_free(desc);
error = grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS,
- grpc_slice_from_copied_string(ac->addr_str));
+ addr_str_slice /* takes ownership */);
+ } else {
+ grpc_slice_unref(addr_str_slice);
}
if (done) {
+ // This is safe even outside the lock, because "done", the sentinel, is
+ // populated *inside* the lock.
gpr_mu_destroy(&ac->mu);
gpr_free(ac->addr_str);
grpc_channel_args_destroy(ac->channel_args);
diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc
index 55e0b165ec..4a97f3353d 100644
--- a/src/core/lib/iomgr/udp_server.cc
+++ b/src/core/lib/iomgr/udp_server.cc
@@ -72,6 +72,7 @@ struct grpc_udp_listener {
grpc_udp_server_read_cb read_cb;
grpc_udp_server_write_cb write_cb;
grpc_udp_server_orphan_cb orphan_cb;
+ grpc_udp_server_start_cb start_cb;
// To be scheduled on another thread to actually read/write.
grpc_closure do_read_closure;
grpc_closure do_write_closure;
@@ -353,7 +354,7 @@ static void do_read(void* arg, grpc_error* error) {
* read lock if available. */
gpr_mu_lock(&sp->server->mu);
/* Tell the registered callback that data is available to read. */
- if (!sp->already_shutdown && sp->read_cb(sp->emfd, sp->server->user_data)) {
+ if (!sp->already_shutdown && sp->read_cb(sp->emfd)) {
/* There maybe more packets to read. Schedule read_more_cb_ closure to run
* after finishing this event loop. */
GRPC_CLOSURE_SCHED(&sp->do_read_closure, GRPC_ERROR_NONE);
@@ -383,7 +384,7 @@ static void on_read(void* arg, grpc_error* error) {
/* Read once. If there is more data to read, off load the work to another
* thread to finish. */
GPR_ASSERT(sp->read_cb);
- if (sp->read_cb(sp->emfd, sp->server->user_data)) {
+ if (sp->read_cb(sp->emfd)) {
/* There maybe more packets to read. Schedule read_more_cb_ closure to run
* after finishing this event loop. */
GRPC_CLOSURE_INIT(&sp->do_read_closure, do_read, arg,
@@ -411,7 +412,7 @@ void fd_notify_on_write_wrapper(void* arg, grpc_error* error) {
static void do_write(void* arg, grpc_error* error) {
grpc_udp_listener* sp = reinterpret_cast<grpc_udp_listener*>(arg);
- gpr_mu_lock(&(sp->server->mu));
+ gpr_mu_lock(&sp->server->mu);
if (sp->already_shutdown) {
// If fd has been shutdown, don't write any more and re-arm notification.
grpc_fd_notify_on_write(sp->emfd, &sp->write_closure);
@@ -429,7 +430,7 @@ static void do_write(void* arg, grpc_error* error) {
static void on_write(void* arg, grpc_error* error) {
grpc_udp_listener* sp = (grpc_udp_listener*)arg;
- gpr_mu_lock(&(sp->server->mu));
+ gpr_mu_lock(&sp->server->mu);
if (error != GRPC_ERROR_NONE) {
if (0 == --sp->server->active_ports && sp->server->shutdown) {
gpr_mu_unlock(&sp->server->mu);
@@ -450,6 +451,7 @@ static void on_write(void* arg, grpc_error* error) {
static int add_socket_to_server(grpc_udp_server* s, int fd,
const grpc_resolved_address* addr,
+ grpc_udp_server_start_cb start_cb,
grpc_udp_server_read_cb read_cb,
grpc_udp_server_write_cb write_cb,
grpc_udp_server_orphan_cb orphan_cb) {
@@ -480,6 +482,7 @@ static int add_socket_to_server(grpc_udp_server* s, int fd,
sp->read_cb = read_cb;
sp->write_cb = write_cb;
sp->orphan_cb = orphan_cb;
+ sp->start_cb = start_cb;
sp->orphan_notified = false;
sp->already_shutdown = false;
GPR_ASSERT(sp->emfd);
@@ -492,6 +495,7 @@ static int add_socket_to_server(grpc_udp_server* s, int fd,
int grpc_udp_server_add_port(grpc_udp_server* s,
const grpc_resolved_address* addr,
+ grpc_udp_server_start_cb start_cb,
grpc_udp_server_read_cb read_cb,
grpc_udp_server_write_cb write_cb,
grpc_udp_server_orphan_cb orphan_cb) {
@@ -541,8 +545,8 @@ int grpc_udp_server_add_port(grpc_udp_server* s,
// TODO(rjshade): Test and propagate the returned grpc_error*:
GRPC_ERROR_UNREF(grpc_create_dualstack_socket_using_factory(
s->socket_factory, addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd));
- allocated_port1 =
- add_socket_to_server(s, fd, addr, read_cb, write_cb, orphan_cb);
+ allocated_port1 = add_socket_to_server(s, fd, addr, start_cb, read_cb,
+ write_cb, orphan_cb);
if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) {
goto done;
}
@@ -565,7 +569,7 @@ int grpc_udp_server_add_port(grpc_udp_server* s,
addr = &addr4_copy;
}
allocated_port2 =
- add_socket_to_server(s, fd, addr, read_cb, write_cb, orphan_cb);
+ add_socket_to_server(s, fd, addr, start_cb, read_cb, write_cb, orphan_cb);
done:
gpr_free(allocated_addr);
@@ -587,6 +591,7 @@ int grpc_udp_server_get_fd(grpc_udp_server* s, unsigned port_index) {
void grpc_udp_server_start(grpc_udp_server* s, grpc_pollset** pollsets,
size_t pollset_count, void* user_data) {
+ gpr_log(GPR_DEBUG, "grpc_udp_server_start");
size_t i;
gpr_mu_lock(&s->mu);
grpc_udp_listener* sp;
@@ -596,6 +601,7 @@ void grpc_udp_server_start(grpc_udp_server* s, grpc_pollset** pollsets,
sp = s->head;
while (sp != nullptr) {
+ sp->start_cb(sp->emfd, sp->server->user_data);
for (i = 0; i < pollset_count; i++) {
grpc_pollset_add_fd(pollsets[i], sp->emfd);
}
diff --git a/src/core/lib/iomgr/udp_server.h b/src/core/lib/iomgr/udp_server.h
index 02e3acb7f5..a469ab9be5 100644
--- a/src/core/lib/iomgr/udp_server.h
+++ b/src/core/lib/iomgr/udp_server.h
@@ -30,9 +30,12 @@ struct grpc_server;
/* Forward decl of grpc_udp_server */
typedef struct grpc_udp_server grpc_udp_server;
+/* Called when grpc server starts to listening on the grpc_fd. */
+typedef void (*grpc_udp_server_start_cb)(grpc_fd* emfd, void* user_data);
+
/* Called when data is available to read from the socket.
* Return true if there is more data to read from fd. */
-typedef bool (*grpc_udp_server_read_cb)(grpc_fd* emfd, void* user_data);
+typedef bool (*grpc_udp_server_read_cb)(grpc_fd* emfd);
/* Called when the socket is writeable. The given closure should be scheduled
* when the socket becomes blocked next time. */
@@ -65,6 +68,7 @@ int grpc_udp_server_get_fd(grpc_udp_server* s, unsigned port_index);
all of the multiple socket port matching logic in one place */
int grpc_udp_server_add_port(grpc_udp_server* s,
const grpc_resolved_address* addr,
+ grpc_udp_server_start_cb start_cb,
grpc_udp_server_read_cb read_cb,
grpc_udp_server_write_cb write_cb,
grpc_udp_server_orphan_cb orphan_cb);
diff --git a/src/core/lib/iomgr/wakeup_fd_cv.cc b/src/core/lib/iomgr/wakeup_fd_cv.cc
index 5c1f16d3fc..c785114212 100644
--- a/src/core/lib/iomgr/wakeup_fd_cv.cc
+++ b/src/core/lib/iomgr/wakeup_fd_cv.cc
@@ -34,7 +34,7 @@
#define MAX_TABLE_RESIZE 256
-extern cv_fd_table g_cvfds;
+extern grpc_cv_fd_table g_cvfds;
static grpc_error* cv_fd_init(grpc_wakeup_fd* fd_info) {
unsigned int i, newsize;
@@ -42,8 +42,8 @@ static grpc_error* cv_fd_init(grpc_wakeup_fd* fd_info) {
gpr_mu_lock(&g_cvfds.mu);
if (!g_cvfds.free_fds) {
newsize = GPR_MIN(g_cvfds.size * 2, g_cvfds.size + MAX_TABLE_RESIZE);
- g_cvfds.cvfds =
- (fd_node*)gpr_realloc(g_cvfds.cvfds, sizeof(fd_node) * newsize);
+ g_cvfds.cvfds = (grpc_fd_node*)gpr_realloc(g_cvfds.cvfds,
+ sizeof(grpc_fd_node) * newsize);
for (i = g_cvfds.size; i < newsize; i++) {
g_cvfds.cvfds[i].is_set = 0;
g_cvfds.cvfds[i].cvs = nullptr;
@@ -64,7 +64,7 @@ static grpc_error* cv_fd_init(grpc_wakeup_fd* fd_info) {
}
static grpc_error* cv_fd_wakeup(grpc_wakeup_fd* fd_info) {
- cv_node* cvn;
+ grpc_cv_node* cvn;
gpr_mu_lock(&g_cvfds.mu);
g_cvfds.cvfds[GRPC_FD_TO_IDX(fd_info->read_fd)].is_set = 1;
cvn = g_cvfds.cvfds[GRPC_FD_TO_IDX(fd_info->read_fd)].cvs;
diff --git a/src/core/lib/iomgr/wakeup_fd_cv.h b/src/core/lib/iomgr/wakeup_fd_cv.h
index 017e41bfa8..399620af76 100644
--- a/src/core/lib/iomgr/wakeup_fd_cv.h
+++ b/src/core/lib/iomgr/wakeup_fd_cv.h
@@ -40,27 +40,27 @@
#define GRPC_FD_TO_IDX(fd) (-(fd)-1)
#define GRPC_IDX_TO_FD(idx) (-(idx)-1)
-typedef struct cv_node {
+typedef struct grpc_cv_node {
gpr_cv* cv;
- struct cv_node* next;
- struct cv_node* prev;
-} cv_node;
+ struct grpc_cv_node* next;
+ struct grpc_cv_node* prev;
+} grpc_cv_node;
-typedef struct fd_node {
+typedef struct grpc_fd_node {
int is_set;
- cv_node* cvs;
- struct fd_node* next_free;
-} fd_node;
+ grpc_cv_node* cvs;
+ struct grpc_fd_node* next_free;
+} grpc_fd_node;
-typedef struct cv_fd_table {
+typedef struct grpc_cv_fd_table {
gpr_mu mu;
gpr_refcount pollcount;
gpr_cv shutdown_cv;
- fd_node* cvfds;
- fd_node* free_fds;
+ grpc_fd_node* cvfds;
+ grpc_fd_node* free_fds;
unsigned int size;
grpc_poll_function_type poll;
-} cv_fd_table;
+} grpc_cv_fd_table;
extern const grpc_wakeup_fd_vtable grpc_cv_wakeup_fd_vtable;
diff --git a/src/core/lib/support/debug_location.h b/src/core/lib/support/debug_location.h
index 0939da595d..9b3f9220fc 100644
--- a/src/core/lib/support/debug_location.h
+++ b/src/core/lib/support/debug_location.h
@@ -36,7 +36,7 @@ class DebugLocation {
const char* file_;
const int line_;
};
-#define DEBUG_LOCATION DebugLocation(__FILE__, __LINE__)
+#define DEBUG_LOCATION ::grpc_core::DebugLocation(__FILE__, __LINE__)
#else
class DebugLocation {
public:
@@ -44,7 +44,7 @@ class DebugLocation {
const char* file() const { return nullptr; }
int line() const { return -1; }
};
-#define DEBUG_LOCATION DebugLocation()
+#define DEBUG_LOCATION ::grpc_core::DebugLocation()
#endif
} // namespace grpc_core
diff --git a/src/node/health_check/package.json b/src/node/health_check/package.json
deleted file mode 100644
index fca3a2a7a6..0000000000
--- a/src/node/health_check/package.json
+++ /dev/null
@@ -1,29 +0,0 @@
-{
- "name": "grpc-health-check",
- "version": "1.7.2",
- "author": "Google Inc.",
- "description": "Health check service for use with gRPC",
- "repository": {
- "type": "git",
- "url": "https://github.com/grpc/grpc.git"
- },
- "bugs": "https://github.com/grpc/grpc/issues",
- "contributors": [
- {
- "name": "Michael Lumish",
- "email": "mlumish@google.com"
- }
- ],
- "dependencies": {
- "grpc": "^1.7.2",
- "lodash": "^3.9.3",
- "google-protobuf": "^3.0.0"
- },
- "files": [
- "LICENSE",
- "health.js",
- "v1"
- ],
- "main": "src/node/index.js",
- "license": "Apache-2.0"
-}
diff --git a/src/node/tools/package.json b/src/node/tools/package.json
deleted file mode 100644
index 99fd854067..0000000000
--- a/src/node/tools/package.json
+++ /dev/null
@@ -1,41 +0,0 @@
-{
- "name": "grpc-tools",
- "version": "1.7.2",
- "author": "Google Inc.",
- "description": "Tools for developing with gRPC on Node.js",
- "homepage": "https://grpc.io/",
- "repository": {
- "type": "git",
- "url": "https://github.com/grpc/grpc.git"
- },
- "bugs": "https://github.com/grpc/grpc/issues",
- "contributors": [
- {
- "name": "Michael Lumish",
- "email": "mlumish@google.com"
- }
- ],
- "bin": {
- "grpc_tools_node_protoc": "./bin/protoc.js",
- "grpc_tools_node_protoc_plugin": "./bin/protoc_plugin.js"
- },
- "scripts": {
- "install": "./node_modules/.bin/node-pre-gyp install"
- },
- "bundledDependencies": ["node-pre-gyp"],
- "binary": {
- "module_name": "grpc_tools",
- "host": "https://storage.googleapis.com/",
- "remote_path": "grpc-precompiled-binaries/node/{name}/v{version}",
- "package_name": "{platform}-{arch}.tar.gz",
- "module_path": "bin"
- },
- "files": [
- "index.js",
- "bin/protoc.js",
- "bin/protoc_plugin.js",
- "bin/google/protobuf",
- "LICENSE"
- ],
- "main": "index.js"
-}
diff --git a/src/objective-c/tests/run_tests.sh b/src/objective-c/tests/run_tests.sh
index cf0b07e8c0..cec34787cf 100755
--- a/src/objective-c/tests/run_tests.sh
+++ b/src/objective-c/tests/run_tests.sh
@@ -34,36 +34,50 @@ $BINDIR/interop_server --port=5051 --max_send_message_size=8388608 --use_tls &
# Kill them when this script exits.
trap 'kill -9 `jobs -p` ; echo "EXIT TIME: $(date)"' EXIT
-# Boot Xcode first with several retries since Xcode might fail due to a bug:
-# http://www.openradar.me/29785686
-xcrun simctl list | egrep 'iPhone 6 \('
-udid=`xcrun simctl list | egrep 'iPhone 6 \(.*\) \(.*\)' | sed -E 's/ *iPhone 6 \(([^\)]*)\).*/\1/g' | head -n 1`
-retries=0
-while [ $retries -lt 3 ] && ! open -a Simulator --args -CurrentDeviceUDID $udid ; do
-retries=$(($retries+1))
-done
-if [ $retries == 3 ]; then
- echo "Xcode simulator failed to start after 3 retries."
- exit 1
-fi
+set -o pipefail
# xcodebuild is very verbose. We filter its output and tell Bash to fail if any
# element of the pipe fails.
# TODO(jcanizales): Use xctool instead? Issue #2540.
-set -o pipefail
XCODEBUILD_FILTER='(^CompileC |^Ld |^ *[^ ]*clang |^ *cd |^ *export |^Libtool |^ *[^ ]*libtool |^CpHeader |^ *builtin-copy )'
+
echo "TIME: $(date)"
-xcodebuild \
- -workspace Tests.xcworkspace \
- -scheme AllTests \
- -destination name="iPhone 6" \
- HOST_PORT_LOCALSSL=localhost:5051 \
- HOST_PORT_LOCAL=localhost:5050 \
- HOST_PORT_REMOTE=grpc-test.sandbox.googleapis.com \
- test \
- | egrep -v "$XCODEBUILD_FILTER" \
- | egrep -v '^$' \
- | egrep -v "(GPBDictionary|GPBArray)" -
+
+# Retry the test for up to 3 times when return code is 65, due to Xcode issue:
+# http://www.openradar.me/29785686
+# The issue seems to be a connectivity issue to Xcode simulator so only retry
+# the first xcodebuild command
+retries=0
+while [ $retries -lt 3 ]; do
+ return_code=0
+ out=$(xcodebuild \
+ -workspace Tests.xcworkspace \
+ -scheme AllTests \
+ -destination name="iPhone 6" \
+ HOST_PORT_LOCALSSL=localhost:5051 \
+ HOST_PORT_LOCAL=localhost:5050 \
+ HOST_PORT_REMOTE=grpc-test.sandbox.googleapis.com \
+ test 2>&1 \
+ | egrep -v "$XCODEBUILD_FILTER" \
+ | egrep -v '^$' \
+ | egrep -v "(GPBDictionary|GPBArray)" - ) || return_code=$?
+ if [ $return_code == 65 ] && [[ $out == *"DTXProxyChannel error 1"* ]]; then
+ echo "$out"
+ echo "Failed with code 65 (DTXProxyChannel error 1); retry."
+ retries=$(($retries+1))
+ elif [ $return_code == 0 ]; then
+ echo "$out"
+ break
+ else
+ echo "$out"
+ echo "Failed with code $return_code."
+ exit 1
+ fi
+done
+if [ $retries == 3 ]; then
+ echo "Failed with code 65 for 3 times; abort."
+ exit 1
+fi
echo "TIME: $(date)"
xcodebuild \
@@ -95,3 +109,5 @@ xcodebuild \
| egrep -v "$XCODEBUILD_FILTER" \
| egrep -v '^$' \
| egrep -v "(GPBDictionary|GPBArray)" -
+
+exit 0
diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py
index d7456a3dd1..3572737c87 100644
--- a/src/python/grpcio/grpc/_channel.py
+++ b/src/python/grpcio/grpc/_channel.py
@@ -129,12 +129,12 @@ def _abort(state, code, details):
def _handle_event(event, state, response_deserializer):
callbacks = []
for batch_operation in event.batch_operations:
- operation_type = batch_operation.type
+ operation_type = batch_operation.type()
state.due.remove(operation_type)
if operation_type == cygrpc.OperationType.receive_initial_metadata:
- state.initial_metadata = batch_operation.received_metadata
+ state.initial_metadata = batch_operation.initial_metadata()
elif operation_type == cygrpc.OperationType.receive_message:
- serialized_response = batch_operation.received_message.bytes()
+ serialized_response = batch_operation.message()
if serialized_response is not None:
response = _common.deserialize(serialized_response,
response_deserializer)
@@ -144,18 +144,17 @@ def _handle_event(event, state, response_deserializer):
else:
state.response = response
elif operation_type == cygrpc.OperationType.receive_status_on_client:
- state.trailing_metadata = batch_operation.received_metadata
+ state.trailing_metadata = batch_operation.trailing_metadata()
if state.code is None:
code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE.get(
- batch_operation.received_status_code)
+ batch_operation.code())
if code is None:
state.code = grpc.StatusCode.UNKNOWN
state.details = _unknown_code_details(
- batch_operation.received_status_code,
- batch_operation.received_status_details)
+ code, batch_operation.details())
else:
state.code = code
- state.details = batch_operation.received_status_details
+ state.details = batch_operation.details()
callbacks.extend(state.callbacks)
state.callbacks = None
return callbacks
@@ -200,7 +199,7 @@ def _consume_request_iterator(request_iterator, state, call,
_abort(state, grpc.StatusCode.INTERNAL, details)
return
else:
- operations = (cygrpc.operation_send_message(
+ operations = (cygrpc.SendMessageOperation(
serialized_request, _EMPTY_FLAGS),)
call.start_client_batch(operations, event_handler)
state.due.add(cygrpc.OperationType.send_message)
@@ -216,7 +215,7 @@ def _consume_request_iterator(request_iterator, state, call,
with state.condition:
if state.code is None:
operations = (
- cygrpc.operation_send_close_from_client(_EMPTY_FLAGS),)
+ cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),)
call.start_client_batch(operations, event_handler)
state.due.add(cygrpc.OperationType.send_close_from_client)
@@ -319,7 +318,7 @@ class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call):
event_handler = _event_handler(self._state, self._call,
self._response_deserializer)
self._call.start_client_batch(
- (cygrpc.operation_receive_message(_EMPTY_FLAGS),),
+ (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
event_handler)
self._state.due.add(cygrpc.OperationType.receive_message)
elif self._state.code is grpc.StatusCode.OK:
@@ -453,12 +452,12 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
else:
state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None)
operations = (
- cygrpc.operation_send_initial_metadata(metadata, _EMPTY_FLAGS),
- cygrpc.operation_send_message(serialized_request, _EMPTY_FLAGS),
- cygrpc.operation_send_close_from_client(_EMPTY_FLAGS),
- cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),
- cygrpc.operation_receive_message(_EMPTY_FLAGS),
- cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),)
+ cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS),
+ cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
+ cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
+ cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
+ cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
+ cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),)
return state, operations, deadline, deadline_timespec, None
def _blocking(self, request, timeout, metadata, credentials):
@@ -536,14 +535,14 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
self._response_deserializer)
with state.condition:
call.start_client_batch(
- (cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),),
+ (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
event_handler)
operations = (
- cygrpc.operation_send_initial_metadata(
- metadata, _EMPTY_FLAGS), cygrpc.operation_send_message(
+ cygrpc.SendInitialMetadataOperation(
+ metadata, _EMPTY_FLAGS), cygrpc.SendMessageOperation(
serialized_request, _EMPTY_FLAGS),
- cygrpc.operation_send_close_from_client(_EMPTY_FLAGS),
- cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),)
+ cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
+ cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),)
call_error = call.start_client_batch(operations, event_handler)
if call_error != cygrpc.CallError.ok:
_call_error_set_RPCstate(state, call_error, metadata)
@@ -573,12 +572,11 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
call.set_credentials(credentials._credentials)
with state.condition:
call.start_client_batch(
- (cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),),
- None)
+ (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), None)
operations = (
- cygrpc.operation_send_initial_metadata(metadata, _EMPTY_FLAGS),
- cygrpc.operation_receive_message(_EMPTY_FLAGS),
- cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),)
+ cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS),
+ cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
+ cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),)
call_error = call.start_client_batch(operations, None)
_check_call_error(call_error, metadata)
_consume_request_iterator(request_iterator, state, call,
@@ -624,12 +622,12 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
event_handler = _event_handler(state, call, self._response_deserializer)
with state.condition:
call.start_client_batch(
- (cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),),
+ (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
event_handler)
operations = (
- cygrpc.operation_send_initial_metadata(metadata, _EMPTY_FLAGS),
- cygrpc.operation_receive_message(_EMPTY_FLAGS),
- cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),)
+ cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS),
+ cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
+ cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),)
call_error = call.start_client_batch(operations, event_handler)
if call_error != cygrpc.CallError.ok:
_call_error_set_RPCstate(state, call_error, metadata)
@@ -664,11 +662,11 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
event_handler = _event_handler(state, call, self._response_deserializer)
with state.condition:
call.start_client_batch(
- (cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),),
+ (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
event_handler)
operations = (
- cygrpc.operation_send_initial_metadata(metadata, _EMPTY_FLAGS),
- cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),)
+ cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS),
+ cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),)
call_error = call.start_client_batch(operations, event_handler)
if call_error != cygrpc.CallError.ok:
_call_error_set_RPCstate(state, call_error, metadata)
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi
index 6361669757..0892215b6d 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi
@@ -26,16 +26,13 @@ cdef class Call:
def _start_batch(self, operations, tag, retain_self):
if not self.is_valid:
raise ValueError("invalid call object cannot be used from Python")
- cdef OperationTag operation_tag = OperationTag(tag, operations)
- if retain_self:
- operation_tag.operation_call = self
- else:
- operation_tag.operation_call = None
- operation_tag.store_ops()
- cpython.Py_INCREF(operation_tag)
+ cdef _BatchOperationTag batch_operation_tag = _BatchOperationTag(
+ tag, operations, self if retain_self else None)
+ batch_operation_tag.prepare()
+ cpython.Py_INCREF(batch_operation_tag)
return grpc_call_start_batch(
- self.c_call, operation_tag.c_ops, operation_tag.c_nops,
- <cpython.PyObject *>operation_tag, NULL)
+ self.c_call, batch_operation_tag.c_ops, batch_operation_tag.c_nops,
+ <cpython.PyObject *>batch_operation_tag, NULL)
def start_client_batch(self, operations, tag):
# We don't reference this call in the operations tag because
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
index 644df674cc..443d534d7e 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
@@ -76,12 +76,12 @@ cdef class Channel:
def watch_connectivity_state(
self, grpc_connectivity_state last_observed_state,
Timespec deadline not None, CompletionQueue queue not None, tag):
- cdef OperationTag operation_tag = OperationTag(tag, None)
- cpython.Py_INCREF(operation_tag)
+ cdef _ConnectivityTag connectivity_tag = _ConnectivityTag(tag)
+ cpython.Py_INCREF(connectivity_tag)
with nogil:
grpc_channel_watch_connectivity_state(
self.c_channel, last_observed_state, deadline.c_time,
- queue.c_completion_queue, <cpython.PyObject *>operation_tag)
+ queue.c_completion_queue, <cpython.PyObject *>connectivity_tag)
def target(self):
cdef char *target = NULL
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
index 140fc357b9..e259789b35 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
@@ -37,42 +37,20 @@ cdef class CompletionQueue:
self.is_shutdown = False
cdef _interpret_event(self, grpc_event event):
- cdef OperationTag tag = None
- cdef object user_tag = None
- cdef Call operation_call = None
- cdef CallDetails request_call_details = None
- cdef object request_metadata = None
- cdef object batch_operations = None
+ cdef _Tag tag = None
if event.type == GRPC_QUEUE_TIMEOUT:
- return Event(
- event.type, False, None, None, None, None, False, None)
+ # NOTE(nathaniel): For now we coopt ConnectivityEvent here.
+ return ConnectivityEvent(GRPC_QUEUE_TIMEOUT, False, None)
elif event.type == GRPC_QUEUE_SHUTDOWN:
self.is_shutdown = True
- return Event(
- event.type, True, None, None, None, None, False, None)
+ # NOTE(nathaniel): For now we coopt ConnectivityEvent here.
+ return ConnectivityEvent(GRPC_QUEUE_TIMEOUT, True, None)
else:
- if event.tag != NULL:
- tag = <OperationTag>event.tag
- # We receive event tags only after they've been inc-ref'd elsewhere in
- # the code.
- cpython.Py_DECREF(tag)
- if tag.shutting_down_server is not None:
- tag.shutting_down_server.notify_shutdown_complete()
- user_tag = tag.user_tag
- operation_call = tag.operation_call
- request_call_details = tag.request_call_details
- if tag.is_new_request:
- request_metadata = _metadata(&tag._c_request_metadata)
- grpc_metadata_array_destroy(&tag._c_request_metadata)
- batch_operations = tag.release_ops()
- if tag.is_new_request:
- # Stuff in the tag not explicitly handled by us needs to live through
- # the life of the call
- operation_call.references.extend(tag.references)
- return Event(
- event.type, event.success, user_tag, operation_call,
- request_call_details, request_metadata, tag.is_new_request,
- batch_operations)
+ tag = <_Tag>event.tag
+ # We receive event tags only after they've been inc-ref'd elsewhere in
+ # the code.
+ cpython.Py_DECREF(tag)
+ return tag.event(event)
def poll(self, Timespec deadline=None):
# We name this 'poll' to avoid problems with CPython's expectations for
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/event.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/event.pxd.pxi
new file mode 100644
index 0000000000..686199ecf4
--- /dev/null
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/event.pxd.pxi
@@ -0,0 +1,45 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+cdef class ConnectivityEvent:
+
+ cdef readonly grpc_completion_type completion_type
+ cdef readonly bint success
+ cdef readonly object tag
+
+
+cdef class RequestCallEvent:
+
+ cdef readonly grpc_completion_type completion_type
+ cdef readonly bint success
+ cdef readonly object tag
+ cdef readonly Call call
+ cdef readonly CallDetails call_details
+ cdef readonly tuple invocation_metadata
+
+
+cdef class BatchOperationEvent:
+
+ cdef readonly grpc_completion_type completion_type
+ cdef readonly bint success
+ cdef readonly object tag
+ cdef readonly object batch_operations
+
+
+cdef class ServerShutdownEvent:
+
+ cdef readonly grpc_completion_type completion_type
+ cdef readonly bint success
+ cdef readonly object tag
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/event.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/event.pyx.pxi
new file mode 100644
index 0000000000..af26d27318
--- /dev/null
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/event.pyx.pxi
@@ -0,0 +1,55 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+cdef class ConnectivityEvent:
+
+ def __cinit__(
+ self, grpc_completion_type completion_type, bint success, object tag):
+ self.completion_type = completion_type
+ self.success = success
+ self.tag = tag
+
+
+cdef class RequestCallEvent:
+
+ def __cinit__(
+ self, grpc_completion_type completion_type, bint success, object tag,
+ Call call, CallDetails call_details, tuple invocation_metadata):
+ self.completion_type = completion_type
+ self.success = success
+ self.tag = tag
+ self.call = call
+ self.call_details = call_details
+ self.invocation_metadata = invocation_metadata
+
+
+cdef class BatchOperationEvent:
+
+ def __cinit__(
+ self, grpc_completion_type completion_type, bint success, object tag,
+ object batch_operations):
+ self.completion_type = completion_type
+ self.success = success
+ self.tag = tag
+ self.batch_operations = batch_operations
+
+
+cdef class ServerShutdownEvent:
+
+ def __cinit__(
+ self, grpc_completion_type completion_type, bint success, object tag):
+ self.completion_type = completion_type
+ self.success = success
+ self.tag = tag
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
index 6a72bbf693..6ee833697d 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
@@ -17,6 +17,7 @@ cimport libc.time
# Typedef types with approximately the same semantics to provide their names to
# Cython
+ctypedef unsigned char uint8_t
ctypedef int int32_t
ctypedef unsigned uint32_t
ctypedef long int64_t
@@ -25,6 +26,7 @@ ctypedef long int64_t
cdef extern from "grpc/support/alloc.h":
void *gpr_malloc(size_t size) nogil
+ void *gpr_zalloc(size_t size) nogil
void gpr_free(void *ptr) nogil
void *gpr_realloc(void *p, size_t size) nogil
@@ -183,6 +185,18 @@ cdef extern from "grpc/grpc.h":
size_t arguments_length "num_args"
grpc_arg *arguments "args"
+ ctypedef enum grpc_compression_level:
+ GRPC_COMPRESS_LEVEL_NONE
+ GRPC_COMPRESS_LEVEL_LOW
+ GRPC_COMPRESS_LEVEL_MED
+ GRPC_COMPRESS_LEVEL_HIGH
+
+ ctypedef enum grpc_stream_compression_level:
+ GRPC_STREAM_COMPRESS_LEVEL_NONE
+ GRPC_STREAM_COMPRESS_LEVEL_LOW
+ GRPC_STREAM_COMPRESS_LEVEL_MED
+ GRPC_STREAM_COMPRESS_LEVEL_HIGH
+
ctypedef enum grpc_call_error:
GRPC_CALL_OK
GRPC_CALL_ERROR
@@ -258,9 +272,19 @@ cdef extern from "grpc/grpc.h":
GRPC_OP_RECV_STATUS_ON_CLIENT
GRPC_OP_RECV_CLOSE_ON_SERVER
+ ctypedef struct grpc_op_send_initial_metadata_maybe_compression_level:
+ uint8_t is_set
+ grpc_compression_level level
+
+ ctypedef struct grpc_op_send_initial_metadata_maybe_stream_compression_level:
+ uint8_t is_set
+ grpc_stream_compression_level level
+
ctypedef struct grpc_op_data_send_initial_metadata:
size_t count
grpc_metadata *metadata
+ grpc_op_send_initial_metadata_maybe_compression_level maybe_compression_level
+ grpc_op_send_initial_metadata_maybe_stream_compression_level maybe_stream_compression_level
ctypedef struct grpc_op_data_send_status_from_server:
size_t trailing_metadata_count
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/operation.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/operation.pxd.pxi
new file mode 100644
index 0000000000..bfbe27785b
--- /dev/null
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/operation.pxd.pxi
@@ -0,0 +1,109 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+cdef class Operation:
+
+ cdef void c(self)
+ cdef void un_c(self)
+
+ # TODO(https://github.com/grpc/grpc/issues/7950): Eliminate this!
+ cdef grpc_op c_op
+
+
+cdef class SendInitialMetadataOperation(Operation):
+
+ cdef readonly object _initial_metadata;
+ cdef readonly int _flags
+ cdef grpc_metadata *_c_initial_metadata
+ cdef size_t _c_initial_metadata_count
+
+ cdef void c(self)
+ cdef void un_c(self)
+
+
+cdef class SendMessageOperation(Operation):
+
+ cdef readonly bytes _message
+ cdef readonly int _flags
+ cdef grpc_byte_buffer *_c_message_byte_buffer
+
+ cdef void c(self)
+ cdef void un_c(self)
+
+
+cdef class SendCloseFromClientOperation(Operation):
+
+ cdef readonly int _flags
+
+ cdef void c(self)
+ cdef void un_c(self)
+
+
+cdef class SendStatusFromServerOperation(Operation):
+
+ cdef readonly object _trailing_metadata
+ cdef readonly object _code
+ cdef readonly object _details
+ cdef readonly int _flags
+ cdef grpc_metadata *_c_trailing_metadata
+ cdef size_t _c_trailing_metadata_count
+ cdef grpc_slice _c_details
+
+ cdef void c(self)
+ cdef void un_c(self)
+
+
+cdef class ReceiveInitialMetadataOperation(Operation):
+
+ cdef readonly int _flags
+ cdef tuple _initial_metadata
+ cdef grpc_metadata_array _c_initial_metadata
+
+ cdef void c(self)
+ cdef void un_c(self)
+
+
+cdef class ReceiveMessageOperation(Operation):
+
+ cdef readonly int _flags
+ cdef grpc_byte_buffer *_c_message_byte_buffer
+ cdef bytes _message
+
+ cdef void c(self)
+ cdef void un_c(self)
+
+
+cdef class ReceiveStatusOnClientOperation(Operation):
+
+ cdef readonly int _flags
+ cdef grpc_metadata_array _c_trailing_metadata
+ cdef grpc_status_code _c_code
+ cdef grpc_slice _c_details
+ cdef tuple _trailing_metadata
+ cdef object _code
+ cdef str _details
+
+ cdef void c(self)
+ cdef void un_c(self)
+
+
+cdef class ReceiveCloseOnServerOperation(Operation):
+
+ cdef readonly int _flags
+ cdef object _cancelled
+ cdef int _c_cancelled
+
+ cdef void c(self)
+ cdef void un_c(self)
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/operation.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/operation.pyx.pxi
new file mode 100644
index 0000000000..3c91abf722
--- /dev/null
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/operation.pyx.pxi
@@ -0,0 +1,238 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+cdef class Operation:
+
+ cdef void c(self):
+ raise NotImplementedError()
+
+ cdef void un_c(self):
+ raise NotImplementedError()
+
+
+cdef class SendInitialMetadataOperation(Operation):
+
+ def __cinit__(self, initial_metadata, flags):
+ self._initial_metadata = initial_metadata
+ self._flags = flags
+
+ def type(self):
+ return GRPC_OP_SEND_INITIAL_METADATA
+
+ cdef void c(self):
+ self.c_op.type = GRPC_OP_SEND_INITIAL_METADATA
+ self.c_op.flags = self._flags
+ _store_c_metadata(
+ self._initial_metadata, &self._c_initial_metadata,
+ &self._c_initial_metadata_count)
+ self.c_op.data.send_initial_metadata.metadata = self._c_initial_metadata
+ self.c_op.data.send_initial_metadata.count = self._c_initial_metadata_count
+ self.c_op.data.send_initial_metadata.maybe_compression_level.is_set = 0
+ self.c_op.data.send_initial_metadata.maybe_stream_compression_level.is_set = 0
+
+ cdef void un_c(self):
+ _release_c_metadata(
+ self._c_initial_metadata, self._c_initial_metadata_count)
+
+
+cdef class SendMessageOperation(Operation):
+
+ def __cinit__(self, bytes message, int flags):
+ self._message = message
+ self._flags = flags
+
+ def type(self):
+ return GRPC_OP_SEND_MESSAGE
+
+ cdef void c(self):
+ self.c_op.type = GRPC_OP_SEND_MESSAGE
+ self.c_op.flags = self._flags
+ cdef grpc_slice message_slice = grpc_slice_from_copied_buffer(
+ self._message, len(self._message))
+ self._c_message_byte_buffer = grpc_raw_byte_buffer_create(
+ &message_slice, 1)
+ grpc_slice_unref(message_slice)
+ self.c_op.data.send_message.send_message = self._c_message_byte_buffer
+
+ cdef void un_c(self):
+ grpc_byte_buffer_destroy(self._c_message_byte_buffer)
+
+
+cdef class SendCloseFromClientOperation(Operation):
+
+ def __cinit__(self, int flags):
+ self._flags = flags
+
+ def type(self):
+ return GRPC_OP_SEND_CLOSE_FROM_CLIENT
+
+ cdef void c(self):
+ self.c_op.type = GRPC_OP_SEND_CLOSE_FROM_CLIENT
+ self.c_op.flags = self._flags
+
+ cdef void un_c(self):
+ pass
+
+
+cdef class SendStatusFromServerOperation(Operation):
+
+ def __cinit__(self, trailing_metadata, code, object details, int flags):
+ self._trailing_metadata = trailing_metadata
+ self._code = code
+ self._details = details
+ self._flags = flags
+
+ def type(self):
+ return GRPC_OP_SEND_STATUS_FROM_SERVER
+
+ cdef void c(self):
+ self.c_op.type = GRPC_OP_SEND_STATUS_FROM_SERVER
+ self.c_op.flags = self._flags
+ _store_c_metadata(
+ self._trailing_metadata, &self._c_trailing_metadata,
+ &self._c_trailing_metadata_count)
+ self.c_op.data.send_status_from_server.trailing_metadata = (
+ self._c_trailing_metadata)
+ self.c_op.data.send_status_from_server.trailing_metadata_count = (
+ self._c_trailing_metadata_count)
+ self.c_op.data.send_status_from_server.status = self._code
+ self._c_details = _slice_from_bytes(_encode(self._details))
+ self.c_op.data.send_status_from_server.status_details = &self._c_details
+
+ cdef void un_c(self):
+ grpc_slice_unref(self._c_details)
+ _release_c_metadata(
+ self._c_trailing_metadata, self._c_trailing_metadata_count)
+
+
+cdef class ReceiveInitialMetadataOperation(Operation):
+
+ def __cinit__(self, flags):
+ self._flags = flags
+
+ def type(self):
+ return GRPC_OP_RECV_INITIAL_METADATA
+
+ cdef void c(self):
+ self.c_op.type = GRPC_OP_RECV_INITIAL_METADATA
+ self.c_op.flags = self._flags
+ grpc_metadata_array_init(&self._c_initial_metadata)
+ self.c_op.data.receive_initial_metadata.receive_initial_metadata = (
+ &self._c_initial_metadata)
+
+ cdef void un_c(self):
+ self._initial_metadata = _metadata(&self._c_initial_metadata)
+ grpc_metadata_array_destroy(&self._c_initial_metadata)
+
+ def initial_metadata(self):
+ return self._initial_metadata
+
+
+cdef class ReceiveMessageOperation(Operation):
+
+ def __cinit__(self, flags):
+ self._flags = flags
+
+ def type(self):
+ return GRPC_OP_RECV_MESSAGE
+
+ cdef void c(self):
+ self.c_op.type = GRPC_OP_RECV_MESSAGE
+ self.c_op.flags = self._flags
+ self.c_op.data.receive_message.receive_message = (
+ &self._c_message_byte_buffer)
+
+ cdef void un_c(self):
+ cdef grpc_byte_buffer_reader message_reader
+ cdef bint message_reader_status
+ cdef grpc_slice message_slice
+ cdef size_t message_slice_length
+ cdef void *message_slice_pointer
+ if self._c_message_byte_buffer != NULL:
+ message_reader_status = grpc_byte_buffer_reader_init(
+ &message_reader, self._c_message_byte_buffer)
+ if message_reader_status:
+ message = bytearray()
+ while grpc_byte_buffer_reader_next(&message_reader, &message_slice):
+ message_slice_pointer = grpc_slice_start_ptr(message_slice)
+ message_slice_length = grpc_slice_length(message_slice)
+ message += (<char *>message_slice_pointer)[:message_slice_length]
+ grpc_slice_unref(message_slice)
+ grpc_byte_buffer_reader_destroy(&message_reader)
+ self._message = bytes(message)
+ else:
+ self._message = None
+ grpc_byte_buffer_destroy(self._c_message_byte_buffer)
+ else:
+ self._message = None
+
+ def message(self):
+ return self._message
+
+
+cdef class ReceiveStatusOnClientOperation(Operation):
+
+ def __cinit__(self, flags):
+ self._flags = flags
+
+ def type(self):
+ return GRPC_OP_RECV_STATUS_ON_CLIENT
+
+ cdef void c(self):
+ self.c_op.type = GRPC_OP_RECV_STATUS_ON_CLIENT
+ self.c_op.flags = self._flags
+ grpc_metadata_array_init(&self._c_trailing_metadata)
+ self.c_op.data.receive_status_on_client.trailing_metadata = (
+ &self._c_trailing_metadata)
+ self.c_op.data.receive_status_on_client.status = (
+ &self._c_code)
+ self.c_op.data.receive_status_on_client.status_details = (
+ &self._c_details)
+
+ cdef void un_c(self):
+ self._trailing_metadata = _metadata(&self._c_trailing_metadata)
+ grpc_metadata_array_destroy(&self._c_trailing_metadata)
+ self._code = self._c_code
+ self._details = _decode(_slice_bytes(self._c_details))
+ grpc_slice_unref(self._c_details)
+
+ def trailing_metadata(self):
+ return self._trailing_metadata
+
+ def code(self):
+ return self._code
+
+ def details(self):
+ return self._details
+
+
+cdef class ReceiveCloseOnServerOperation(Operation):
+
+ def __cinit__(self, flags):
+ self._flags = flags
+
+ def type(self):
+ return GRPC_OP_RECV_CLOSE_ON_SERVER
+
+ cdef void c(self):
+ self.c_op.type = GRPC_OP_RECV_CLOSE_ON_SERVER
+ self.c_op.flags = self._flags
+ self.c_op.data.receive_close_on_server.cancelled = &self._c_cancelled
+
+ cdef void un_c(self):
+ self._cancelled = bool(self._c_cancelled)
+
+ def cancelled(self):
+ return self._cancelled
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi
index 594fdb1a8b..7b2482d947 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi
@@ -28,48 +28,6 @@ cdef class CallDetails:
cdef grpc_call_details c_details
-cdef class OperationTag:
-
- cdef object user_tag
- cdef list references
- # This allows CompletionQueue to notify the Python Server object that the
- # underlying GRPC core server has shutdown
- cdef Server shutting_down_server
- cdef Call operation_call
- cdef CallDetails request_call_details
- cdef grpc_metadata_array _c_request_metadata
- cdef grpc_op *c_ops
- cdef size_t c_nops
- cdef readonly object _operations
- cdef bint is_new_request
-
- cdef void store_ops(self)
- cdef object release_ops(self)
-
-
-cdef class Event:
-
- cdef readonly grpc_completion_type type
- cdef readonly bint success
- cdef readonly object tag
-
- # For Server.request_call
- cdef readonly bint is_new_request
- cdef readonly CallDetails request_call_details
- cdef readonly object request_metadata
-
- # For server calls
- cdef readonly Call operation_call
-
- # For Call.start_batch
- cdef readonly object batch_operations
-
-
-cdef class ByteBuffer:
-
- cdef grpc_byte_buffer *c_byte_buffer
-
-
cdef class SslPemKeyCertPair:
cdef grpc_ssl_pem_key_cert_pair c_pair
@@ -89,22 +47,6 @@ cdef class ChannelArgs:
cdef list args
-cdef class Operation:
-
- cdef grpc_op c_op
- cdef bint _c_metadata_needs_release
- cdef size_t _c_metadata_count
- cdef grpc_metadata *_c_metadata
- cdef ByteBuffer _received_message
- cdef bint _c_metadata_array_needs_destruction
- cdef grpc_metadata_array _c_metadata_array
- cdef grpc_status_code _received_status_code
- cdef grpc_slice _status_details
- cdef int _received_cancelled
- cdef readonly bint is_valid
- cdef object references
-
-
cdef class CompressionOptions:
cdef grpc_compression_options c_options
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
index 26eaf50eb4..bc2cd0338e 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
@@ -218,111 +218,6 @@ cdef class CallDetails:
return timespec
-cdef class OperationTag:
-
- def __cinit__(self, user_tag, operations):
- self.user_tag = user_tag
- self.references = []
- self._operations = operations
-
- cdef void store_ops(self):
- self.c_nops = 0 if self._operations is None else len(self._operations)
- if 0 < self.c_nops:
- self.c_ops = <grpc_op *>gpr_malloc(sizeof(grpc_op) * self.c_nops)
- for index in range(self.c_nops):
- self.c_ops[index] = (<Operation>(self._operations[index])).c_op
-
- cdef object release_ops(self):
- if 0 < self.c_nops:
- for index, operation in enumerate(self._operations):
- (<Operation>operation).c_op = self.c_ops[index]
- gpr_free(self.c_ops)
- return self._operations
- else:
- return ()
-
-
-cdef class Event:
-
- def __cinit__(self, grpc_completion_type type, bint success,
- object tag, Call operation_call,
- CallDetails request_call_details,
- object request_metadata,
- bint is_new_request,
- object batch_operations):
- self.type = type
- self.success = success
- self.tag = tag
- self.operation_call = operation_call
- self.request_call_details = request_call_details
- self.request_metadata = request_metadata
- self.batch_operations = batch_operations
- self.is_new_request = is_new_request
-
-
-cdef class ByteBuffer:
-
- def __cinit__(self, bytes data):
- grpc_init()
- if data is None:
- self.c_byte_buffer = NULL
- return
-
- cdef char *c_data = data
- cdef grpc_slice data_slice
- cdef size_t data_length = len(data)
- with nogil:
- data_slice = grpc_slice_from_copied_buffer(c_data, data_length)
- with nogil:
- self.c_byte_buffer = grpc_raw_byte_buffer_create(
- &data_slice, 1)
- with nogil:
- grpc_slice_unref(data_slice)
-
- def bytes(self):
- cdef grpc_byte_buffer_reader reader
- cdef grpc_slice data_slice
- cdef size_t data_slice_length
- cdef void *data_slice_pointer
- cdef bint reader_status
- if self.c_byte_buffer != NULL:
- with nogil:
- reader_status = grpc_byte_buffer_reader_init(
- &reader, self.c_byte_buffer)
- if not reader_status:
- return None
- result = bytearray()
- with nogil:
- while grpc_byte_buffer_reader_next(&reader, &data_slice):
- data_slice_pointer = grpc_slice_start_ptr(data_slice)
- data_slice_length = grpc_slice_length(data_slice)
- with gil:
- result += (<char *>data_slice_pointer)[:data_slice_length]
- grpc_slice_unref(data_slice)
- with nogil:
- grpc_byte_buffer_reader_destroy(&reader)
- return bytes(result)
- else:
- return None
-
- def __len__(self):
- cdef size_t result
- if self.c_byte_buffer != NULL:
- with nogil:
- result = grpc_byte_buffer_length(self.c_byte_buffer)
- return result
- else:
- return 0
-
- def __str__(self):
- return self.bytes()
-
- def __dealloc__(self):
- if self.c_byte_buffer != NULL:
- grpc_byte_buffer_destroy(self.c_byte_buffer)
- grpc_shutdown()
-
-
cdef class SslPemKeyCertPair:
def __cinit__(self, bytes private_key, bytes certificate_chain):
@@ -407,185 +302,6 @@ cdef class ChannelArgs:
return self.args[i]
-cdef class Operation:
-
- def __cinit__(self):
- grpc_init()
- self.references = []
- self._c_metadata_needs_release = False
- self._c_metadata_array_needs_destruction = False
- self._status_details = grpc_empty_slice()
- self.is_valid = False
-
- @property
- def type(self):
- return self.c_op.type
-
- @property
- def flags(self):
- return self.c_op.flags
-
- @property
- def has_status(self):
- return self.c_op.type == GRPC_OP_RECV_STATUS_ON_CLIENT
-
- @property
- def received_message(self):
- if self.c_op.type != GRPC_OP_RECV_MESSAGE:
- raise TypeError("self must be an operation receiving a message")
- return self._received_message
-
- @property
- def received_message_or_none(self):
- if self.c_op.type != GRPC_OP_RECV_MESSAGE:
- return None
- return self._received_message
-
- @property
- def received_metadata(self):
- if (self.c_op.type != GRPC_OP_RECV_INITIAL_METADATA and
- self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT):
- raise TypeError("self must be an operation receiving metadata")
- return _metadata(&self._c_metadata_array)
-
- @property
- def received_status_code(self):
- if self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT:
- raise TypeError("self must be an operation receiving a status code")
- return self._received_status_code
-
- @property
- def received_status_code_or_none(self):
- if self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT:
- return None
- return self._received_status_code
-
- @property
- def received_status_details(self):
- if self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT:
- raise TypeError("self must be an operation receiving status details")
- return _slice_bytes(self._status_details)
-
- @property
- def received_status_details_or_none(self):
- if self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT:
- return None
- return _slice_bytes(self._status_details)
-
- @property
- def received_cancelled(self):
- if self.c_op.type != GRPC_OP_RECV_CLOSE_ON_SERVER:
- raise TypeError("self must be an operation receiving cancellation "
- "information")
- return False if self._received_cancelled == 0 else True
-
- @property
- def received_cancelled_or_none(self):
- if self.c_op.type != GRPC_OP_RECV_CLOSE_ON_SERVER:
- return None
- return False if self._received_cancelled == 0 else True
-
- def __dealloc__(self):
- if self._c_metadata_needs_release:
- _release_c_metadata(self._c_metadata, self._c_metadata_count)
- if self._c_metadata_array_needs_destruction:
- grpc_metadata_array_destroy(&self._c_metadata_array)
- grpc_slice_unref(self._status_details)
- grpc_shutdown()
-
-def operation_send_initial_metadata(metadata, int flags):
- cdef Operation op = Operation()
- op.c_op.type = GRPC_OP_SEND_INITIAL_METADATA
- op.c_op.flags = flags
- _store_c_metadata(metadata, &op._c_metadata, &op._c_metadata_count)
- op._c_metadata_needs_release = True
- op.c_op.data.send_initial_metadata.count = op._c_metadata_count
- op.c_op.data.send_initial_metadata.metadata = op._c_metadata
- op.is_valid = True
- return op
-
-def operation_send_message(data, int flags):
- cdef Operation op = Operation()
- op.c_op.type = GRPC_OP_SEND_MESSAGE
- op.c_op.flags = flags
- byte_buffer = ByteBuffer(data)
- op.c_op.data.send_message.send_message = byte_buffer.c_byte_buffer
- op.references.append(byte_buffer)
- op.is_valid = True
- return op
-
-def operation_send_close_from_client(int flags):
- cdef Operation op = Operation()
- op.c_op.type = GRPC_OP_SEND_CLOSE_FROM_CLIENT
- op.c_op.flags = flags
- op.is_valid = True
- return op
-
-def operation_send_status_from_server(
- metadata, grpc_status_code code, bytes details, int flags):
- cdef Operation op = Operation()
- op.c_op.type = GRPC_OP_SEND_STATUS_FROM_SERVER
- op.c_op.flags = flags
- _store_c_metadata(metadata, &op._c_metadata, &op._c_metadata_count)
- op._c_metadata_needs_release = True
- op.c_op.data.send_status_from_server.trailing_metadata_count = (
- op._c_metadata_count)
- op.c_op.data.send_status_from_server.trailing_metadata = op._c_metadata
- op.c_op.data.send_status_from_server.status = code
- grpc_slice_unref(op._status_details)
- op._status_details = _slice_from_bytes(details)
- op.c_op.data.send_status_from_server.status_details = &op._status_details
- op.is_valid = True
- return op
-
-def operation_receive_initial_metadata(int flags):
- cdef Operation op = Operation()
- op.c_op.type = GRPC_OP_RECV_INITIAL_METADATA
- op.c_op.flags = flags
- grpc_metadata_array_init(&op._c_metadata_array)
- op.c_op.data.receive_initial_metadata.receive_initial_metadata = (
- &op._c_metadata_array)
- op._c_metadata_array_needs_destruction = True
- op.is_valid = True
- return op
-
-def operation_receive_message(int flags):
- cdef Operation op = Operation()
- op.c_op.type = GRPC_OP_RECV_MESSAGE
- op.c_op.flags = flags
- op._received_message = ByteBuffer(None)
- # n.b. the c_op.data.receive_message field needs to be deleted by us,
- # anyway, so we just let that be handled by the ByteBuffer() we allocated
- # the line before.
- op.c_op.data.receive_message.receive_message = (
- &op._received_message.c_byte_buffer)
- op.is_valid = True
- return op
-
-def operation_receive_status_on_client(int flags):
- cdef Operation op = Operation()
- op.c_op.type = GRPC_OP_RECV_STATUS_ON_CLIENT
- op.c_op.flags = flags
- grpc_metadata_array_init(&op._c_metadata_array)
- op.c_op.data.receive_status_on_client.trailing_metadata = (
- &op._c_metadata_array)
- op._c_metadata_array_needs_destruction = True
- op.c_op.data.receive_status_on_client.status = (
- &op._received_status_code)
- op.c_op.data.receive_status_on_client.status_details = (
- &op._status_details)
- op.is_valid = True
- return op
-
-def operation_receive_close_on_server(int flags):
- cdef Operation op = Operation()
- op.c_op.type = GRPC_OP_RECV_CLOSE_ON_SERVER
- op.c_op.flags = flags
- op.c_op.data.receive_close_on_server.cancelled = &op._received_cancelled
- op.is_valid = True
- return op
-
-
cdef class CompressionOptions:
def __cinit__(self):
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
index f8d7892858..c19beccde6 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
@@ -78,19 +78,15 @@ cdef class Server:
raise ValueError("server must be started and not shutting down")
if server_queue not in self.registered_completion_queues:
raise ValueError("server_queue must be a registered completion queue")
- cdef OperationTag operation_tag = OperationTag(tag, None)
- operation_tag.operation_call = Call()
- operation_tag.request_call_details = CallDetails()
- grpc_metadata_array_init(&operation_tag._c_request_metadata)
- operation_tag.references.extend([self, call_queue, server_queue])
- operation_tag.is_new_request = True
- cpython.Py_INCREF(operation_tag)
+ cdef _RequestCallTag request_call_tag = _RequestCallTag(tag)
+ request_call_tag.prepare()
+ cpython.Py_INCREF(request_call_tag)
return grpc_server_request_call(
- self.c_server, &operation_tag.operation_call.c_call,
- &operation_tag.request_call_details.c_details,
- &operation_tag._c_request_metadata,
+ self.c_server, &request_call_tag.call.c_call,
+ &request_call_tag.call_details.c_details,
+ &request_call_tag.c_invocation_metadata,
call_queue.c_completion_queue, server_queue.c_completion_queue,
- <cpython.PyObject *>operation_tag)
+ <cpython.PyObject *>request_call_tag)
def register_completion_queue(
self, CompletionQueue queue not None):
@@ -131,16 +127,14 @@ cdef class Server:
cdef _c_shutdown(self, CompletionQueue queue, tag):
self.is_shutting_down = True
- operation_tag = OperationTag(tag, None)
- operation_tag.shutting_down_server = self
- cpython.Py_INCREF(operation_tag)
+ cdef _ServerShutdownTag server_shutdown_tag = _ServerShutdownTag(tag, self)
+ cpython.Py_INCREF(server_shutdown_tag)
with nogil:
grpc_server_shutdown_and_notify(
self.c_server, queue.c_completion_queue,
- <cpython.PyObject *>operation_tag)
+ <cpython.PyObject *>server_shutdown_tag)
def shutdown(self, CompletionQueue queue not None, tag):
- cdef OperationTag operation_tag
if queue.is_shutting_down:
raise ValueError("queue must be live")
elif not self.is_started:
@@ -153,7 +147,8 @@ cdef class Server:
self._c_shutdown(queue, tag)
cdef notify_shutdown_complete(self):
- # called only by a completion queue on receiving our shutdown operation tag
+ # called only after our server shutdown tag has emerged from a completion
+ # queue.
self.is_shutdown = True
def cancel_all_calls(self):
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/tag.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/tag.pxd.pxi
new file mode 100644
index 0000000000..f9a3b5e8f4
--- /dev/null
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/tag.pxd.pxi
@@ -0,0 +1,58 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+cdef class _Tag:
+
+ cdef object event(self, grpc_event c_event)
+
+
+cdef class _ConnectivityTag(_Tag):
+
+ cdef readonly object _user_tag
+
+ cdef ConnectivityEvent event(self, grpc_event c_event)
+
+
+cdef class _RequestCallTag(_Tag):
+
+ cdef readonly object _user_tag
+ cdef Call call
+ cdef CallDetails call_details
+ cdef grpc_metadata_array c_invocation_metadata
+
+ cdef void prepare(self)
+ cdef RequestCallEvent event(self, grpc_event c_event)
+
+
+cdef class _BatchOperationTag(_Tag):
+
+ cdef object _user_tag
+ cdef readonly object _operations
+ cdef readonly object _retained_call
+ cdef grpc_op *c_ops
+ cdef size_t c_nops
+
+ cdef void prepare(self)
+ cdef BatchOperationEvent event(self, grpc_event c_event)
+
+
+cdef class _ServerShutdownTag(_Tag):
+
+ cdef readonly object _user_tag
+ # This allows CompletionQueue to notify the Python Server object that the
+ # underlying GRPC core server has shutdown
+ cdef readonly Server _shutting_down_server
+
+ cdef ServerShutdownEvent event(self, grpc_event c_event)
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/tag.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/tag.pyx.pxi
new file mode 100644
index 0000000000..aaca458442
--- /dev/null
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/tag.pyx.pxi
@@ -0,0 +1,87 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+cdef class _Tag:
+
+ cdef object event(self, grpc_event c_event):
+ raise NotImplementedError()
+
+
+cdef class _ConnectivityTag(_Tag):
+
+ def __cinit__(self, user_tag):
+ self._user_tag = user_tag
+
+ cdef ConnectivityEvent event(self, grpc_event c_event):
+ return ConnectivityEvent(c_event.type, c_event.success, self._user_tag)
+
+
+cdef class _RequestCallTag(_Tag):
+
+ def __cinit__(self, user_tag):
+ self._user_tag = user_tag
+ self.call = None
+ self.call_details = None
+
+ cdef void prepare(self):
+ self.call = Call()
+ self.call_details = CallDetails()
+ grpc_metadata_array_init(&self.c_invocation_metadata)
+
+ cdef RequestCallEvent event(self, grpc_event c_event):
+ cdef tuple invocation_metadata = _metadata(&self.c_invocation_metadata)
+ grpc_metadata_array_destroy(&self.c_invocation_metadata)
+ return RequestCallEvent(
+ c_event.type, c_event.success, self._user_tag, self.call,
+ self.call_details, invocation_metadata)
+
+
+cdef class _BatchOperationTag:
+
+ def __cinit__(self, user_tag, operations, call):
+ self._user_tag = user_tag
+ self._operations = operations
+ self._retained_call = call
+
+ cdef void prepare(self):
+ self.c_nops = 0 if self._operations is None else len(self._operations)
+ if 0 < self.c_nops:
+ self.c_ops = <grpc_op *>gpr_malloc(sizeof(grpc_op) * self.c_nops)
+ for index, operation in enumerate(self._operations):
+ (<Operation>operation).c()
+ self.c_ops[index] = (<Operation>operation).c_op
+
+ cdef BatchOperationEvent event(self, grpc_event c_event):
+ if 0 < self.c_nops:
+ for index, operation in enumerate(self._operations):
+ (<Operation>operation).c_op = self.c_ops[index]
+ (<Operation>operation).un_c()
+ gpr_free(self.c_ops)
+ return BatchOperationEvent(
+ c_event.type, c_event.success, self._user_tag, self._operations)
+ else:
+ return BatchOperationEvent(
+ c_event.type, c_event.success, self._user_tag, ())
+
+
+cdef class _ServerShutdownTag(_Tag):
+
+ def __cinit__(self, user_tag, shutting_down_server):
+ self._user_tag = user_tag
+ self._shutting_down_server = shutting_down_server
+
+ cdef ServerShutdownEvent event(self, grpc_event c_event):
+ self._shutting_down_server.notify_shutdown_complete()
+ return ServerShutdownEvent(c_event.type, c_event.success, self._user_tag) \ No newline at end of file
diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pxd b/src/python/grpcio/grpc/_cython/cygrpc.pxd
index 6fc5638d5d..b32fa518fc 100644
--- a/src/python/grpcio/grpc/_cython/cygrpc.pxd
+++ b/src/python/grpcio/grpc/_cython/cygrpc.pxd
@@ -18,7 +18,10 @@ include "_cygrpc/call.pxd.pxi"
include "_cygrpc/channel.pxd.pxi"
include "_cygrpc/credentials.pxd.pxi"
include "_cygrpc/completion_queue.pxd.pxi"
+include "_cygrpc/event.pxd.pxi"
include "_cygrpc/metadata.pxd.pxi"
+include "_cygrpc/operation.pxd.pxi"
include "_cygrpc/records.pxd.pxi"
include "_cygrpc/security.pxd.pxi"
include "_cygrpc/server.pxd.pxi"
+include "_cygrpc/tag.pxd.pxi"
diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pyx b/src/python/grpcio/grpc/_cython/cygrpc.pyx
index d605229822..5106394708 100644
--- a/src/python/grpcio/grpc/_cython/cygrpc.pyx
+++ b/src/python/grpcio/grpc/_cython/cygrpc.pyx
@@ -25,10 +25,13 @@ include "_cygrpc/call.pyx.pxi"
include "_cygrpc/channel.pyx.pxi"
include "_cygrpc/credentials.pyx.pxi"
include "_cygrpc/completion_queue.pyx.pxi"
+include "_cygrpc/event.pyx.pxi"
include "_cygrpc/metadata.pyx.pxi"
+include "_cygrpc/operation.pyx.pxi"
include "_cygrpc/records.pyx.pxi"
include "_cygrpc/security.pyx.pxi"
include "_cygrpc/server.pyx.pxi"
+include "_cygrpc/tag.pyx.pxi"
#
# initialize gRPC
diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py
index 02d3af8706..22244b9cec 100644
--- a/src/python/grpcio/grpc/_server.py
+++ b/src/python/grpcio/grpc/_server.py
@@ -50,7 +50,7 @@ _UNEXPECTED_EXIT_SERVER_GRACE = 1.0
def _serialized_request(request_event):
- return request_event.batch_operations[0].received_message.bytes()
+ return request_event.batch_operations[0].message()
def _application_code(code):
@@ -130,13 +130,13 @@ def _abort(state, call, code, details):
effective_code = _abortion_code(state, code)
effective_details = details if state.details is None else state.details
if state.initial_metadata_allowed:
- operations = (cygrpc.operation_send_initial_metadata(
- (), _EMPTY_FLAGS), cygrpc.operation_send_status_from_server(
+ operations = (cygrpc.SendInitialMetadataOperation(
+ None, _EMPTY_FLAGS), cygrpc.SendStatusFromServerOperation(
state.trailing_metadata, effective_code, effective_details,
_EMPTY_FLAGS),)
token = _SEND_INITIAL_METADATA_AND_SEND_STATUS_FROM_SERVER_TOKEN
else:
- operations = (cygrpc.operation_send_status_from_server(
+ operations = (cygrpc.SendStatusFromServerOperation(
state.trailing_metadata, effective_code, effective_details,
_EMPTY_FLAGS),)
token = _SEND_STATUS_FROM_SERVER_TOKEN
@@ -150,8 +150,7 @@ def _receive_close_on_server(state):
def receive_close_on_server(receive_close_on_server_event):
with state.condition:
- if receive_close_on_server_event.batch_operations[
- 0].received_cancelled:
+ if receive_close_on_server_event.batch_operations[0].cancelled():
state.client = _CANCELLED
elif state.client is _OPEN:
state.client = _CLOSED
@@ -218,11 +217,10 @@ class _Context(grpc.ServicerContext):
def time_remaining(self):
return max(
- float(self._rpc_event.request_call_details.deadline) - time.time(),
- 0)
+ float(self._rpc_event.call_details.deadline) - time.time(), 0)
def cancel(self):
- self._rpc_event.operation_call.cancel()
+ self._rpc_event.call.cancel()
def add_callback(self, callback):
with self._state.condition:
@@ -237,23 +235,23 @@ class _Context(grpc.ServicerContext):
self._state.disable_next_compression = True
def invocation_metadata(self):
- return self._rpc_event.request_metadata
+ return self._rpc_event.invocation_metadata
def peer(self):
- return _common.decode(self._rpc_event.operation_call.peer())
+ return _common.decode(self._rpc_event.call.peer())
def peer_identities(self):
- return cygrpc.peer_identities(self._rpc_event.operation_call)
+ return cygrpc.peer_identities(self._rpc_event.call)
def peer_identity_key(self):
- id_key = cygrpc.peer_identity_key(self._rpc_event.operation_call)
+ id_key = cygrpc.peer_identity_key(self._rpc_event.call)
return id_key if id_key is None else _common.decode(id_key)
def auth_context(self):
return {
_common.decode(key): value
for key, value in six.iteritems(
- cygrpc.auth_context(self._rpc_event.operation_call))
+ cygrpc.auth_context(self._rpc_event.call))
}
def send_initial_metadata(self, initial_metadata):
@@ -262,9 +260,9 @@ class _Context(grpc.ServicerContext):
_raise_rpc_error(self._state)
else:
if self._state.initial_metadata_allowed:
- operation = cygrpc.operation_send_initial_metadata(
+ operation = cygrpc.SendInitialMetadataOperation(
initial_metadata, _EMPTY_FLAGS)
- self._rpc_event.operation_call.start_server_batch(
+ self._rpc_event.call.start_server_batch(
(operation,), _send_initial_metadata(self._state))
self._state.initial_metadata_allowed = False
self._state.due.add(_SEND_INITIAL_METADATA_TOKEN)
@@ -305,7 +303,7 @@ class _RequestIterator(object):
raise StopIteration()
else:
self._call.start_server_batch(
- (cygrpc.operation_receive_message(_EMPTY_FLAGS),),
+ (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
_receive_message(self._state, self._call,
self._request_deserializer))
self._state.due.add(_RECEIVE_MESSAGE_TOKEN)
@@ -347,9 +345,9 @@ def _unary_request(rpc_event, state, request_deserializer):
if state.client is _CANCELLED or state.statused:
return None
else:
- rpc_event.operation_call.start_server_batch(
- (cygrpc.operation_receive_message(_EMPTY_FLAGS),),
- _receive_message(state, rpc_event.operation_call,
+ rpc_event.call.start_server_batch(
+ (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
+ _receive_message(state, rpc_event.call,
request_deserializer))
state.due.add(_RECEIVE_MESSAGE_TOKEN)
while True:
@@ -357,8 +355,8 @@ def _unary_request(rpc_event, state, request_deserializer):
if state.request is None:
if state.client is _CLOSED:
details = '"{}" requires exactly one request message.'.format(
- rpc_event.request_call_details.method)
- _abort(state, rpc_event.operation_call,
+ rpc_event.call_details.method)
+ _abort(state, rpc_event.call,
cygrpc.StatusCode.unimplemented,
_common.encode(details))
return None
@@ -379,13 +377,13 @@ def _call_behavior(rpc_event, state, behavior, argument, request_deserializer):
except Exception as exception: # pylint: disable=broad-except
with state.condition:
if exception is state.abortion:
- _abort(state, rpc_event.operation_call,
- cygrpc.StatusCode.unknown, b'RPC Aborted')
+ _abort(state, rpc_event.call, cygrpc.StatusCode.unknown,
+ b'RPC Aborted')
elif exception not in state.rpc_errors:
details = 'Exception calling application: {}'.format(exception)
logging.exception(details)
- _abort(state, rpc_event.operation_call,
- cygrpc.StatusCode.unknown, _common.encode(details))
+ _abort(state, rpc_event.call, cygrpc.StatusCode.unknown,
+ _common.encode(details))
return None, False
@@ -397,13 +395,13 @@ def _take_response_from_response_iterator(rpc_event, state, response_iterator):
except Exception as exception: # pylint: disable=broad-except
with state.condition:
if exception is state.abortion:
- _abort(state, rpc_event.operation_call,
- cygrpc.StatusCode.unknown, b'RPC Aborted')
+ _abort(state, rpc_event.call, cygrpc.StatusCode.unknown,
+ b'RPC Aborted')
elif exception not in state.rpc_errors:
details = 'Exception iterating responses: {}'.format(exception)
logging.exception(details)
- _abort(state, rpc_event.operation_call,
- cygrpc.StatusCode.unknown, _common.encode(details))
+ _abort(state, rpc_event.call, cygrpc.StatusCode.unknown,
+ _common.encode(details))
return None, False
@@ -411,7 +409,7 @@ def _serialize_response(rpc_event, state, response, response_serializer):
serialized_response = _common.serialize(response, response_serializer)
if serialized_response is None:
with state.condition:
- _abort(state, rpc_event.operation_call, cygrpc.StatusCode.internal,
+ _abort(state, rpc_event.call, cygrpc.StatusCode.internal,
b'Failed to serialize response!')
return None
else:
@@ -424,17 +422,18 @@ def _send_response(rpc_event, state, serialized_response):
return False
else:
if state.initial_metadata_allowed:
- operations = (cygrpc.operation_send_initial_metadata(
- (), _EMPTY_FLAGS), cygrpc.operation_send_message(
- serialized_response, _EMPTY_FLAGS),)
+ operations = (cygrpc.SendInitialMetadataOperation(None,
+ _EMPTY_FLAGS),
+ cygrpc.SendMessageOperation(serialized_response,
+ _EMPTY_FLAGS),)
state.initial_metadata_allowed = False
token = _SEND_INITIAL_METADATA_AND_SEND_MESSAGE_TOKEN
else:
- operations = (cygrpc.operation_send_message(serialized_response,
- _EMPTY_FLAGS),)
+ operations = (cygrpc.SendMessageOperation(serialized_response,
+ _EMPTY_FLAGS),)
token = _SEND_MESSAGE_TOKEN
- rpc_event.operation_call.start_server_batch(
- operations, _send_message(state, token))
+ rpc_event.call.start_server_batch(operations,
+ _send_message(state, token))
state.due.add(token)
while True:
state.condition.wait()
@@ -448,17 +447,17 @@ def _status(rpc_event, state, serialized_response):
code = _completion_code(state)
details = _details(state)
operations = [
- cygrpc.operation_send_status_from_server(
+ cygrpc.SendStatusFromServerOperation(
state.trailing_metadata, code, details, _EMPTY_FLAGS),
]
if state.initial_metadata_allowed:
operations.append(
- cygrpc.operation_send_initial_metadata((), _EMPTY_FLAGS))
+ cygrpc.SendInitialMetadataOperation(None, _EMPTY_FLAGS))
if serialized_response is not None:
operations.append(
- cygrpc.operation_send_message(serialized_response,
- _EMPTY_FLAGS))
- rpc_event.operation_call.start_server_batch(
+ cygrpc.SendMessageOperation(serialized_response,
+ _EMPTY_FLAGS))
+ rpc_event.call.start_server_batch(
operations,
_send_status_from_server(state, _SEND_STATUS_FROM_SERVER_TOKEN))
state.statused = True
@@ -525,7 +524,7 @@ def _handle_unary_stream(rpc_event, state, method_handler, thread_pool):
def _handle_stream_unary(rpc_event, state, method_handler, thread_pool):
- request_iterator = _RequestIterator(state, rpc_event.operation_call,
+ request_iterator = _RequestIterator(state, rpc_event.call,
method_handler.request_deserializer)
return thread_pool.submit(
_unary_response_in_pool, rpc_event, state, method_handler.stream_unary,
@@ -534,7 +533,7 @@ def _handle_stream_unary(rpc_event, state, method_handler, thread_pool):
def _handle_stream_stream(rpc_event, state, method_handler, thread_pool):
- request_iterator = _RequestIterator(state, rpc_event.operation_call,
+ request_iterator = _RequestIterator(state, rpc_event.call,
method_handler.request_deserializer)
return thread_pool.submit(
_stream_response_in_pool, rpc_event, state,
@@ -552,8 +551,8 @@ def _find_method_handler(rpc_event, generic_handlers, interceptor_pipeline):
return None
handler_call_details = _HandlerCallDetails(
- _common.decode(rpc_event.request_call_details.method),
- rpc_event.request_metadata)
+ _common.decode(rpc_event.call_details.method),
+ rpc_event.invocation_metadata)
if interceptor_pipeline is not None:
return interceptor_pipeline.execute(query_handlers,
@@ -563,21 +562,21 @@ def _find_method_handler(rpc_event, generic_handlers, interceptor_pipeline):
def _reject_rpc(rpc_event, status, details):
- operations = (cygrpc.operation_send_initial_metadata((), _EMPTY_FLAGS),
- cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),
- cygrpc.operation_send_status_from_server((), status, details,
- _EMPTY_FLAGS),)
+ operations = (cygrpc.SendInitialMetadataOperation(None, _EMPTY_FLAGS),
+ cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),
+ cygrpc.SendStatusFromServerOperation(None, status, details,
+ _EMPTY_FLAGS),)
rpc_state = _RPCState()
- rpc_event.operation_call.start_server_batch(
- operations, lambda ignored_event: (rpc_state, (),))
+ rpc_event.call.start_server_batch(operations,
+ lambda ignored_event: (rpc_state, (),))
return rpc_state
def _handle_with_method_handler(rpc_event, method_handler, thread_pool):
state = _RPCState()
with state.condition:
- rpc_event.operation_call.start_server_batch(
- (cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),),
+ rpc_event.call.start_server_batch(
+ (cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),),
_receive_close_on_server(state))
state.due.add(_RECEIVE_CLOSE_ON_SERVER_TOKEN)
if method_handler.request_streaming:
@@ -600,7 +599,7 @@ def _handle_call(rpc_event, generic_handlers, interceptor_pipeline, thread_pool,
concurrency_exceeded):
if not rpc_event.success:
return None, None
- if rpc_event.request_call_details.method is not None:
+ if rpc_event.call_details.method is not None:
try:
method_handler = _find_method_handler(rpc_event, generic_handlers,
interceptor_pipeline)
diff --git a/src/python/grpcio_tests/tests/health_check/_health_servicer_test.py b/src/python/grpcio_tests/tests/health_check/_health_servicer_test.py
index ac31e72409..3cbbb8de33 100644
--- a/src/python/grpcio_tests/tests/health_check/_health_servicer_test.py
+++ b/src/python/grpcio_tests/tests/health_check/_health_servicer_test.py
@@ -16,12 +16,11 @@
import unittest
import grpc
-from grpc.framework.foundation import logging_pool
from grpc_health.v1 import health
from grpc_health.v1 import health_pb2
from grpc_health.v1 import health_pb2_grpc
-from tests.unit.framework.common import test_constants
+from tests.unit import test_common
class HealthServicerTest(unittest.TestCase):
@@ -35,8 +34,7 @@ class HealthServicerTest(unittest.TestCase):
health_pb2.HealthCheckResponse.UNKNOWN)
servicer.set('grpc.test.TestServiceNotServing',
health_pb2.HealthCheckResponse.NOT_SERVING)
- server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
- self._server = grpc.server(server_pool)
+ self._server = test_common.test_server()
port = self._server.add_insecure_port('[::]:0')
health_pb2_grpc.add_HealthServicer_to_server(servicer, self._server)
self._server.start()
diff --git a/src/python/grpcio_tests/tests/interop/_insecure_intraop_test.py b/src/python/grpcio_tests/tests/interop/_insecure_intraop_test.py
index 4136739f05..8d464b2d4b 100644
--- a/src/python/grpcio_tests/tests/interop/_insecure_intraop_test.py
+++ b/src/python/grpcio_tests/tests/interop/_insecure_intraop_test.py
@@ -13,7 +13,6 @@
# limitations under the License.
"""Insecure client-server interoperability as a unit test."""
-from concurrent import futures
import unittest
import grpc
@@ -22,13 +21,14 @@ from src.proto.grpc.testing import test_pb2_grpc
from tests.interop import _intraop_test_case
from tests.interop import methods
from tests.interop import server
+from tests.unit import test_common
class InsecureIntraopTest(_intraop_test_case.IntraopTestCase,
unittest.TestCase):
def setUp(self):
- self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+ self.server = test_common.test_server()
test_pb2_grpc.add_TestServiceServicer_to_server(methods.TestService(),
self.server)
port = self.server.add_insecure_port('[::]:0')
diff --git a/src/python/grpcio_tests/tests/interop/_secure_intraop_test.py b/src/python/grpcio_tests/tests/interop/_secure_intraop_test.py
index 6514d77371..6ce8b3715b 100644
--- a/src/python/grpcio_tests/tests/interop/_secure_intraop_test.py
+++ b/src/python/grpcio_tests/tests/interop/_secure_intraop_test.py
@@ -13,7 +13,6 @@
# limitations under the License.
"""Secure client-server interoperability as a unit test."""
-from concurrent import futures
import unittest
import grpc
@@ -22,6 +21,7 @@ from src.proto.grpc.testing import test_pb2_grpc
from tests.interop import _intraop_test_case
from tests.interop import methods
from tests.interop import resources
+from tests.unit import test_common
_SERVER_HOST_OVERRIDE = 'foo.test.google.fr'
@@ -29,7 +29,7 @@ _SERVER_HOST_OVERRIDE = 'foo.test.google.fr'
class SecureIntraopTest(_intraop_test_case.IntraopTestCase, unittest.TestCase):
def setUp(self):
- self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+ self.server = test_common.test_server()
test_pb2_grpc.add_TestServiceServicer_to_server(methods.TestService(),
self.server)
port = self.server.add_secure_port(
diff --git a/src/python/grpcio_tests/tests/interop/server.py b/src/python/grpcio_tests/tests/interop/server.py
index eeb41a21d2..dd4f5146e9 100644
--- a/src/python/grpcio_tests/tests/interop/server.py
+++ b/src/python/grpcio_tests/tests/interop/server.py
@@ -23,6 +23,7 @@ from src.proto.grpc.testing import test_pb2_grpc
from tests.interop import methods
from tests.interop import resources
+from tests.unit import test_common
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
@@ -38,7 +39,7 @@ def serve():
help='require a secure connection')
args = parser.parse_args()
- server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+ server = test_common.test_server()
test_pb2_grpc.add_TestServiceServicer_to_server(methods.TestService(),
server)
if args.use_tls:
diff --git a/src/python/grpcio_tests/tests/protoc_plugin/_python_plugin_test.py b/src/python/grpcio_tests/tests/protoc_plugin/_python_plugin_test.py
index 5b84001aab..8fc539e641 100644
--- a/src/python/grpcio_tests/tests/protoc_plugin/_python_plugin_test.py
+++ b/src/python/grpcio_tests/tests/protoc_plugin/_python_plugin_test.py
@@ -13,7 +13,6 @@
# limitations under the License.
import collections
-from concurrent import futures
import contextlib
import distutils.spawn
import errno
@@ -28,6 +27,7 @@ import unittest
from six import moves
import grpc
+from tests.unit import test_common
from tests.unit.framework.common import test_constants
import tests.protoc_plugin.protos.payload.test_payload_pb2 as payload_pb2
@@ -155,8 +155,7 @@ def _CreateService():
def HalfDuplexCall(self, request_iter, context):
return servicer_methods.HalfDuplexCall(request_iter, context)
- server = grpc.server(
- futures.ThreadPoolExecutor(max_workers=test_constants.POOL_SIZE))
+ server = test_common.test_server()
getattr(service_pb2_grpc, ADD_SERVICER_TO_SERVER_IDENTIFIER)(Servicer(),
server)
port = server.add_insecure_port('[::]:0')
@@ -177,8 +176,7 @@ def _CreateIncompleteService():
class Servicer(getattr(service_pb2_grpc, SERVICER_IDENTIFIER)):
pass
- server = grpc.server(
- futures.ThreadPoolExecutor(max_workers=test_constants.POOL_SIZE))
+ server = test_common.test_server()
getattr(service_pb2_grpc, ADD_SERVICER_TO_SERVER_IDENTIFIER)(Servicer(),
server)
port = server.add_insecure_port('[::]:0')
diff --git a/src/python/grpcio_tests/tests/protoc_plugin/_split_definitions_test.py b/src/python/grpcio_tests/tests/protoc_plugin/_split_definitions_test.py
index 7868cdbfb3..c732e55108 100644
--- a/src/python/grpcio_tests/tests/protoc_plugin/_split_definitions_test.py
+++ b/src/python/grpcio_tests/tests/protoc_plugin/_split_definitions_test.py
@@ -13,7 +13,6 @@
# limitations under the License.
import abc
-from concurrent import futures
import contextlib
import importlib
import os
@@ -29,7 +28,7 @@ import six
import grpc
from grpc_tools import protoc
-from tests.unit.framework.common import test_constants
+from tests.unit import test_common
_MESSAGES_IMPORT = b'import "messages.proto";'
_SPLIT_NAMESPACE = b'package grpc_protoc_plugin.invocation_testing.split;'
@@ -256,9 +255,7 @@ class _Test(six.with_metaclass(abc.ABCMeta, unittest.TestCase)):
self._protoc()
for services_module in self._services_modules():
- server = grpc.server(
- futures.ThreadPoolExecutor(
- max_workers=test_constants.POOL_SIZE))
+ server = test_common.test_server()
services_module.add_TestServiceServicer_to_server(
_Servicer(self._messages_pb2.Response), server)
port = server.add_insecure_port('[::]:0')
diff --git a/src/python/grpcio_tests/tests/qps/qps_worker.py b/src/python/grpcio_tests/tests/qps/qps_worker.py
index 3e46c0b8c2..54f69db109 100644
--- a/src/python/grpcio_tests/tests/qps/qps_worker.py
+++ b/src/python/grpcio_tests/tests/qps/qps_worker.py
@@ -16,15 +16,15 @@
import argparse
import time
-from concurrent import futures
import grpc
from src.proto.grpc.testing import services_pb2_grpc
from tests.qps import worker_server
+from tests.unit import test_common
def run_worker_server(port):
- server = grpc.server(futures.ThreadPoolExecutor(max_workers=5))
+ server = test_common.test_server()
servicer = worker_server.WorkerServer()
services_pb2_grpc.add_WorkerServiceServicer_to_server(servicer, server)
server.add_insecure_port('[::]:{}'.format(port))
diff --git a/src/python/grpcio_tests/tests/qps/worker_server.py b/src/python/grpcio_tests/tests/qps/worker_server.py
index adb10cbcec..fef4fb0459 100644
--- a/src/python/grpcio_tests/tests/qps/worker_server.py
+++ b/src/python/grpcio_tests/tests/qps/worker_server.py
@@ -28,6 +28,7 @@ from tests.qps import benchmark_server
from tests.qps import client_runner
from tests.qps import histogram
from tests.unit import resources
+from tests.unit import test_common
class WorkerServer(services_pb2_grpc.WorkerServiceServicer):
@@ -68,8 +69,7 @@ class WorkerServer(services_pb2_grpc.WorkerServiceServicer):
server_threads = multiprocessing.cpu_count() * 5
else:
server_threads = config.async_server_threads
- server = grpc.server(
- futures.ThreadPoolExecutor(max_workers=server_threads))
+ server = test_common.test_server(max_workers=server_threads)
if config.server_type == control_pb2.ASYNC_SERVER:
servicer = benchmark_server.BenchmarkServer()
services_pb2_grpc.add_BenchmarkServiceServicer_to_server(servicer,
diff --git a/src/python/grpcio_tests/tests/reflection/_reflection_servicer_test.py b/src/python/grpcio_tests/tests/reflection/_reflection_servicer_test.py
index a86743fa5a..86037e258a 100644
--- a/src/python/grpcio_tests/tests/reflection/_reflection_servicer_test.py
+++ b/src/python/grpcio_tests/tests/reflection/_reflection_servicer_test.py
@@ -16,7 +16,6 @@
import unittest
import grpc
-from grpc.framework.foundation import logging_pool
from grpc_reflection.v1alpha import reflection
from grpc_reflection.v1alpha import reflection_pb2
from grpc_reflection.v1alpha import reflection_pb2_grpc
@@ -27,7 +26,7 @@ from google.protobuf import descriptor_pb2
from src.proto.grpc.testing import empty_pb2
from src.proto.grpc.testing.proto2 import empty2_extensions_pb2
-from tests.unit.framework.common import test_constants
+from tests.unit import test_common
_EMPTY_PROTO_FILE_NAME = 'src/proto/grpc/testing/empty.proto'
_EMPTY_PROTO_SYMBOL_NAME = 'grpc.testing.Empty'
@@ -46,8 +45,7 @@ def _file_descriptor_to_proto(descriptor):
class ReflectionServicerTest(unittest.TestCase):
def setUp(self):
- server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
- self._server = grpc.server(server_pool)
+ self._server = test_common.test_server()
reflection.enable_server_reflection(_SERVICE_NAMES, self._server)
port = self._server.add_insecure_port('[::]:0')
self._server.start()
diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json
index 3bf5308749..e033c1063f 100644
--- a/src/python/grpcio_tests/tests/tests.json
+++ b/src/python/grpcio_tests/tests/tests.json
@@ -34,6 +34,7 @@
"unit._cython._no_messages_server_completion_queue_per_call_test.Test",
"unit._cython._no_messages_single_server_completion_queue_test.Test",
"unit._cython._read_some_but_not_all_responses_test.ReadSomeButNotAllResponsesTest",
+ "unit._cython._server_test.Test",
"unit._cython.cygrpc_test.InsecureServerInsecureClient",
"unit._cython.cygrpc_test.SecureServerSecureClient",
"unit._cython.cygrpc_test.TypeSmokeTest",
diff --git a/src/python/grpcio_tests/tests/unit/_auth_context_test.py b/src/python/grpcio_tests/tests/unit/_auth_context_test.py
index c6a0a23549..ebc04a71e0 100644
--- a/src/python/grpcio_tests/tests/unit/_auth_context_test.py
+++ b/src/python/grpcio_tests/tests/unit/_auth_context_test.py
@@ -18,11 +18,9 @@ import unittest
import grpc
from grpc import _channel
-from grpc.framework.foundation import logging_pool
import six
from tests.unit import test_common
-from tests.unit.framework.common import test_constants
from tests.unit import resources
_REQUEST = b'\x00\x00\x00'
@@ -55,12 +53,12 @@ def handle_unary_unary(request, servicer_context):
class AuthContextTest(unittest.TestCase):
def testInsecure(self):
- server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
handler = grpc.method_handlers_generic_handler('test', {
'UnaryUnary':
grpc.unary_unary_rpc_method_handler(handle_unary_unary)
})
- server = grpc.server(server_pool, (handler,))
+ server = test_common.test_server()
+ server.add_generic_rpc_handlers((handler,))
port = server.add_insecure_port('[::]:0')
server.start()
@@ -74,12 +72,12 @@ class AuthContextTest(unittest.TestCase):
self.assertDictEqual({}, auth_data[_AUTH_CTX])
def testSecureNoCert(self):
- server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
handler = grpc.method_handlers_generic_handler('test', {
'UnaryUnary':
grpc.unary_unary_rpc_method_handler(handle_unary_unary)
})
- server = grpc.server(server_pool, (handler,))
+ server = test_common.test_server()
+ server.add_generic_rpc_handlers((handler,))
server_cred = grpc.ssl_server_credentials(_SERVER_CERTS)
port = server.add_secure_port('[::]:0', server_cred)
server.start()
@@ -101,12 +99,12 @@ class AuthContextTest(unittest.TestCase):
}, auth_data[_AUTH_CTX])
def testSecureClientCert(self):
- server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
handler = grpc.method_handlers_generic_handler('test', {
'UnaryUnary':
grpc.unary_unary_rpc_method_handler(handle_unary_unary)
})
- server = grpc.server(server_pool, (handler,))
+ server = test_common.test_server()
+ server.add_generic_rpc_handlers((handler,))
server_cred = grpc.ssl_server_credentials(
_SERVER_CERTS,
root_certificates=_TEST_ROOT_CERTIFICATES,
diff --git a/src/python/grpcio_tests/tests/unit/_channel_connectivity_test.py b/src/python/grpcio_tests/tests/unit/_channel_connectivity_test.py
index f8c61270ca..f9eb0011dc 100644
--- a/src/python/grpcio_tests/tests/unit/_channel_connectivity_test.py
+++ b/src/python/grpcio_tests/tests/unit/_channel_connectivity_test.py
@@ -83,7 +83,7 @@ class ChannelConnectivityTest(unittest.TestCase):
def test_immediately_connectable_channel_connectivity(self):
thread_pool = _thread_pool.RecordingThreadPool(max_workers=None)
- server = grpc.server(thread_pool)
+ server = grpc.server(thread_pool, options=(('grpc.so_reuseport', 0),))
port = server.add_insecure_port('[::]:0')
server.start()
first_callback = _Callback()
@@ -125,7 +125,7 @@ class ChannelConnectivityTest(unittest.TestCase):
def test_reachable_then_unreachable_channel_connectivity(self):
thread_pool = _thread_pool.RecordingThreadPool(max_workers=None)
- server = grpc.server(thread_pool)
+ server = grpc.server(thread_pool, options=(('grpc.so_reuseport', 0),))
port = server.add_insecure_port('[::]:0')
server.start()
callback = _Callback()
diff --git a/src/python/grpcio_tests/tests/unit/_channel_ready_future_test.py b/src/python/grpcio_tests/tests/unit/_channel_ready_future_test.py
index bdd2d86169..30b486079c 100644
--- a/src/python/grpcio_tests/tests/unit/_channel_ready_future_test.py
+++ b/src/python/grpcio_tests/tests/unit/_channel_ready_future_test.py
@@ -61,7 +61,7 @@ class ChannelReadyFutureTest(unittest.TestCase):
def test_immediately_connectable_channel_connectivity(self):
thread_pool = _thread_pool.RecordingThreadPool(max_workers=None)
- server = grpc.server(thread_pool)
+ server = grpc.server(thread_pool, options=(('grpc.so_reuseport', 0),))
port = server.add_insecure_port('[::]:0')
server.start()
channel = grpc.insecure_channel('localhost:{}'.format(port))
diff --git a/src/python/grpcio_tests/tests/unit/_compression_test.py b/src/python/grpcio_tests/tests/unit/_compression_test.py
index e576a5aca9..93e599d8f8 100644
--- a/src/python/grpcio_tests/tests/unit/_compression_test.py
+++ b/src/python/grpcio_tests/tests/unit/_compression_test.py
@@ -17,7 +17,6 @@ import unittest
import grpc
from grpc import _grpcio_metadata
-from grpc.framework.foundation import logging_pool
from tests.unit import test_common
from tests.unit.framework.common import test_constants
@@ -72,9 +71,8 @@ class _GenericHandler(grpc.GenericRpcHandler):
class CompressionTest(unittest.TestCase):
def setUp(self):
- self._server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
- self._server = grpc.server(
- self._server_pool, handlers=(_GenericHandler(),))
+ self._server = test_common.test_server()
+ self._server.add_generic_rpc_handlers((_GenericHandler(),))
self._port = self._server.add_insecure_port('[::]:0')
self._server.start()
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py b/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py
index a8a7175cc7..cdb3572453 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py
@@ -53,7 +53,7 @@ class _Handler(object):
self._state = state
self._lock = threading.Lock()
self._completion_queue = completion_queue
- self._call = rpc_event.operation_call
+ self._call = rpc_event.call
def __call__(self):
with self._state.condition:
@@ -65,10 +65,10 @@ class _Handler(object):
with self._lock:
self._call.start_server_batch(
- (cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),),
+ (cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),),
_RECEIVE_CLOSE_ON_SERVER_TAG)
self._call.start_server_batch(
- (cygrpc.operation_receive_message(_EMPTY_FLAGS),),
+ (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
_RECEIVE_MESSAGE_TAG)
first_event = self._completion_queue.poll()
if _is_cancellation_event(first_event):
@@ -76,10 +76,10 @@ class _Handler(object):
else:
with self._lock:
operations = (
- cygrpc.operation_send_initial_metadata(_EMPTY_METADATA,
- _EMPTY_FLAGS),
- cygrpc.operation_send_message(b'\x79\x57', _EMPTY_FLAGS),
- cygrpc.operation_send_status_from_server(
+ cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA,
+ _EMPTY_FLAGS),
+ cygrpc.SendMessageOperation(b'\x79\x57', _EMPTY_FLAGS),
+ cygrpc.SendStatusFromServerOperation(
_EMPTY_METADATA, cygrpc.StatusCode.ok, b'test details!',
_EMPTY_FLAGS),)
self._call.start_server_batch(operations,
@@ -141,7 +141,8 @@ class CancelManyCallsTest(unittest.TestCase):
test_constants.THREAD_CONCURRENCY)
server_completion_queue = cygrpc.CompletionQueue()
- server = cygrpc.Server(cygrpc.ChannelArgs([]))
+ server = cygrpc.Server(
+ cygrpc.ChannelArgs([cygrpc.ChannelArg(b'grpc.so_reuseport', 0)]))
server.register_completion_queue(server_completion_queue)
port = server.add_http2_port(b'[::]:0')
server.start()
@@ -169,13 +170,13 @@ class CancelManyCallsTest(unittest.TestCase):
None, _EMPTY_FLAGS, client_completion_queue, b'/twinkies',
None, _INFINITE_FUTURE)
operations = (
- cygrpc.operation_send_initial_metadata(_EMPTY_METADATA,
- _EMPTY_FLAGS),
- cygrpc.operation_send_message(b'\x45\x56', _EMPTY_FLAGS),
- cygrpc.operation_send_close_from_client(_EMPTY_FLAGS),
- cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),
- cygrpc.operation_receive_message(_EMPTY_FLAGS),
- cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),)
+ cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA,
+ _EMPTY_FLAGS),
+ cygrpc.SendMessageOperation(b'\x45\x56', _EMPTY_FLAGS),
+ cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
+ cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
+ cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
+ cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),)
tag = 'client_complete_call_{0:04d}_tag'.format(index)
client_call.start_client_batch(operations, tag)
client_due.add(tag)
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_common.py b/src/python/grpcio_tests/tests/unit/_cython/_common.py
index 96f0f1589b..c5acd36bf2 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/_common.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/_common.py
@@ -88,7 +88,8 @@ class RpcTest(object):
def setUp(self):
self.server_completion_queue = cygrpc.CompletionQueue()
- self.server = cygrpc.Server(cygrpc.ChannelArgs([]))
+ self.server = cygrpc.Server(
+ cygrpc.ChannelArgs([cygrpc.ChannelArg(b'grpc.so_reuseport', 0)]))
self.server.register_completion_queue(self.server_completion_queue)
port = self.server.add_http2_port(b'[::]:0')
self.server.start()
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py
index d08003af44..583136cf23 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py
@@ -49,18 +49,19 @@ class Test(_common.RpcTest, unittest.TestCase):
with self.client_condition:
client_receive_initial_metadata_start_batch_result = (
client_call.start_client_batch([
- cygrpc.operation_receive_initial_metadata(
- _common.EMPTY_FLAGS),
+ cygrpc.ReceiveInitialMetadataOperation(_common.EMPTY_FLAGS),
], client_receive_initial_metadata_tag))
+ self.assertEqual(cygrpc.CallError.ok,
+ client_receive_initial_metadata_start_batch_result)
client_complete_rpc_start_batch_result = client_call.start_client_batch(
[
- cygrpc.operation_send_initial_metadata(
+ cygrpc.SendInitialMetadataOperation(
_common.INVOCATION_METADATA, _common.EMPTY_FLAGS),
- cygrpc.operation_send_close_from_client(
- _common.EMPTY_FLAGS),
- cygrpc.operation_receive_status_on_client(
- _common.EMPTY_FLAGS),
+ cygrpc.SendCloseFromClientOperation(_common.EMPTY_FLAGS),
+ cygrpc.ReceiveStatusOnClientOperation(_common.EMPTY_FLAGS),
], client_complete_rpc_tag)
+ self.assertEqual(cygrpc.CallError.ok,
+ client_complete_rpc_start_batch_result)
self.client_driver.add_due({
client_receive_initial_metadata_tag,
client_complete_rpc_tag,
@@ -71,8 +72,8 @@ class Test(_common.RpcTest, unittest.TestCase):
with server_call_condition:
server_send_initial_metadata_start_batch_result = (
- server_request_call_event.operation_call.start_server_batch([
- cygrpc.operation_send_initial_metadata(
+ server_request_call_event.call.start_server_batch([
+ cygrpc.SendInitialMetadataOperation(
_common.INITIAL_METADATA, _common.EMPTY_FLAGS),
], server_send_initial_metadata_tag))
server_call_driver.add_due({
@@ -83,10 +84,9 @@ class Test(_common.RpcTest, unittest.TestCase):
with server_call_condition:
server_complete_rpc_start_batch_result = (
- server_request_call_event.operation_call.start_server_batch([
- cygrpc.operation_receive_close_on_server(
- _common.EMPTY_FLAGS),
- cygrpc.operation_send_status_from_server(
+ server_request_call_event.call.start_server_batch([
+ cygrpc.ReceiveCloseOnServerOperation(_common.EMPTY_FLAGS),
+ cygrpc.SendStatusFromServerOperation(
_common.TRAILING_METADATA, cygrpc.StatusCode.ok,
b'test details', _common.EMPTY_FLAGS),
], server_complete_rpc_tag))
@@ -101,23 +101,24 @@ class Test(_common.RpcTest, unittest.TestCase):
client_complete_rpc_event = self.client_driver.event_with_tag(
client_complete_rpc_tag)
- return (_common.OperationResult(server_request_call_start_batch_result,
- server_request_call_event.type,
- server_request_call_event.success),
+ return (_common.OperationResult(
+ server_request_call_start_batch_result,
+ server_request_call_event.completion_type,
+ server_request_call_event.success), _common.OperationResult(
+ client_receive_initial_metadata_start_batch_result,
+ client_receive_initial_metadata_event.completion_type,
+ client_receive_initial_metadata_event.success),
_common.OperationResult(
- client_receive_initial_metadata_start_batch_result,
- client_receive_initial_metadata_event.type,
- client_receive_initial_metadata_event.success),
- _common.OperationResult(client_complete_rpc_start_batch_result,
- client_complete_rpc_event.type,
- client_complete_rpc_event.success),
+ client_complete_rpc_start_batch_result,
+ client_complete_rpc_event.completion_type,
+ client_complete_rpc_event.success), _common.OperationResult(
+ server_send_initial_metadata_start_batch_result,
+ server_send_initial_metadata_event.completion_type,
+ server_send_initial_metadata_event.success),
_common.OperationResult(
- server_send_initial_metadata_start_batch_result,
- server_send_initial_metadata_event.type,
- server_send_initial_metadata_event.success),
- _common.OperationResult(server_complete_rpc_start_batch_result,
- server_complete_rpc_event.type,
- server_complete_rpc_event.success),)
+ server_complete_rpc_start_batch_result,
+ server_complete_rpc_event.completion_type,
+ server_complete_rpc_event.success),)
def test_rpcs(self):
expecteds = [(_common.SUCCESSFUL_OPERATION_RESULT,) *
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py
index d0166a2b29..c5cf606c90 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py
@@ -44,17 +44,14 @@ class Test(_common.RpcTest, unittest.TestCase):
with self.client_condition:
client_receive_initial_metadata_start_batch_result = (
client_call.start_client_batch([
- cygrpc.operation_receive_initial_metadata(
- _common.EMPTY_FLAGS),
+ cygrpc.ReceiveInitialMetadataOperation(_common.EMPTY_FLAGS),
], client_receive_initial_metadata_tag))
client_complete_rpc_start_batch_result = client_call.start_client_batch(
[
- cygrpc.operation_send_initial_metadata(
+ cygrpc.SendInitialMetadataOperation(
_common.INVOCATION_METADATA, _common.EMPTY_FLAGS),
- cygrpc.operation_send_close_from_client(
- _common.EMPTY_FLAGS),
- cygrpc.operation_receive_status_on_client(
- _common.EMPTY_FLAGS),
+ cygrpc.SendCloseFromClientOperation(_common.EMPTY_FLAGS),
+ cygrpc.ReceiveStatusOnClientOperation(_common.EMPTY_FLAGS),
], client_complete_rpc_tag)
self.client_driver.add_due({
client_receive_initial_metadata_tag,
@@ -66,8 +63,8 @@ class Test(_common.RpcTest, unittest.TestCase):
with self.server_condition:
server_send_initial_metadata_start_batch_result = (
- server_request_call_event.operation_call.start_server_batch([
- cygrpc.operation_send_initial_metadata(
+ server_request_call_event.call.start_server_batch([
+ cygrpc.SendInitialMetadataOperation(
_common.INITIAL_METADATA, _common.EMPTY_FLAGS),
], server_send_initial_metadata_tag))
self.server_driver.add_due({
@@ -78,12 +75,11 @@ class Test(_common.RpcTest, unittest.TestCase):
with self.server_condition:
server_complete_rpc_start_batch_result = (
- server_request_call_event.operation_call.start_server_batch([
- cygrpc.operation_receive_close_on_server(
- _common.EMPTY_FLAGS),
- cygrpc.operation_send_status_from_server(
+ server_request_call_event.call.start_server_batch([
+ cygrpc.ReceiveCloseOnServerOperation(_common.EMPTY_FLAGS),
+ cygrpc.SendStatusFromServerOperation(
_common.TRAILING_METADATA, cygrpc.StatusCode.ok,
- b'test details', _common.EMPTY_FLAGS),
+ 'test details', _common.EMPTY_FLAGS),
], server_complete_rpc_tag))
self.server_driver.add_due({
server_complete_rpc_tag,
@@ -96,23 +92,24 @@ class Test(_common.RpcTest, unittest.TestCase):
client_complete_rpc_event = self.client_driver.event_with_tag(
client_complete_rpc_tag)
- return (_common.OperationResult(server_request_call_start_batch_result,
- server_request_call_event.type,
- server_request_call_event.success),
+ return (_common.OperationResult(
+ server_request_call_start_batch_result,
+ server_request_call_event.completion_type,
+ server_request_call_event.success), _common.OperationResult(
+ client_receive_initial_metadata_start_batch_result,
+ client_receive_initial_metadata_event.completion_type,
+ client_receive_initial_metadata_event.success),
_common.OperationResult(
- client_receive_initial_metadata_start_batch_result,
- client_receive_initial_metadata_event.type,
- client_receive_initial_metadata_event.success),
- _common.OperationResult(client_complete_rpc_start_batch_result,
- client_complete_rpc_event.type,
- client_complete_rpc_event.success),
+ client_complete_rpc_start_batch_result,
+ client_complete_rpc_event.completion_type,
+ client_complete_rpc_event.success), _common.OperationResult(
+ server_send_initial_metadata_start_batch_result,
+ server_send_initial_metadata_event.completion_type,
+ server_send_initial_metadata_event.success),
_common.OperationResult(
- server_send_initial_metadata_start_batch_result,
- server_send_initial_metadata_event.type,
- server_send_initial_metadata_event.success),
- _common.OperationResult(server_complete_rpc_start_batch_result,
- server_complete_rpc_event.type,
- server_complete_rpc_event.success),)
+ server_complete_rpc_start_batch_result,
+ server_complete_rpc_event.completion_type,
+ server_complete_rpc_event.success),)
def test_rpcs(self):
expecteds = [(_common.SUCCESSFUL_OPERATION_RESULT,) *
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py b/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py
index 1deb15ba03..a5ec54ee59 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py
@@ -112,7 +112,8 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase):
def testReadSomeButNotAllResponses(self):
server_completion_queue = cygrpc.CompletionQueue()
- server = cygrpc.Server(cygrpc.ChannelArgs([]))
+ server = cygrpc.Server(
+ cygrpc.ChannelArgs([cygrpc.ChannelArg(b'grpc.so_reuseport', 0)]))
server.register_completion_queue(server_completion_queue)
port = server.add_http2_port(b'[::]:0')
server.start()
@@ -158,15 +159,15 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase):
with client_condition:
client_receive_initial_metadata_start_batch_result = (
client_call.start_client_batch([
- cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),
+ cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
], client_receive_initial_metadata_tag))
client_due.add(client_receive_initial_metadata_tag)
client_complete_rpc_start_batch_result = (
client_call.start_client_batch([
- cygrpc.operation_send_initial_metadata(_EMPTY_METADATA,
- _EMPTY_FLAGS),
- cygrpc.operation_send_close_from_client(_EMPTY_FLAGS),
- cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),
+ cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA,
+ _EMPTY_FLAGS),
+ cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
+ cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
], client_complete_rpc_tag))
client_due.add(client_complete_rpc_tag)
@@ -174,13 +175,13 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase):
with server_call_condition:
server_send_initial_metadata_start_batch_result = (
- server_rpc_event.operation_call.start_server_batch([
- cygrpc.operation_send_initial_metadata(_EMPTY_METADATA,
- _EMPTY_FLAGS),
+ server_rpc_event.call.start_server_batch([
+ cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA,
+ _EMPTY_FLAGS),
], server_send_initial_metadata_tag))
server_send_first_message_start_batch_result = (
- server_rpc_event.operation_call.start_server_batch([
- cygrpc.operation_send_message(b'\x07', _EMPTY_FLAGS),
+ server_rpc_event.call.start_server_batch([
+ cygrpc.SendMessageOperation(b'\x07', _EMPTY_FLAGS),
], server_send_first_message_tag))
server_send_initial_metadata_event = server_call_driver.event_with_tag(
server_send_initial_metadata_tag)
@@ -188,13 +189,13 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase):
server_send_first_message_tag)
with server_call_condition:
server_send_second_message_start_batch_result = (
- server_rpc_event.operation_call.start_server_batch([
- cygrpc.operation_send_message(b'\x07', _EMPTY_FLAGS),
+ server_rpc_event.call.start_server_batch([
+ cygrpc.SendMessageOperation(b'\x07', _EMPTY_FLAGS),
], server_send_second_message_tag))
server_complete_rpc_start_batch_result = (
- server_rpc_event.operation_call.start_server_batch([
- cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),
- cygrpc.operation_send_status_from_server(
+ server_rpc_event.call.start_server_batch([
+ cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),
+ cygrpc.SendStatusFromServerOperation(
(), cygrpc.StatusCode.ok, b'test details',
_EMPTY_FLAGS),
], server_complete_rpc_tag))
@@ -208,7 +209,7 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase):
client_receive_first_message_tag = 'client_receive_first_message_tag'
client_receive_first_message_start_batch_result = (
client_call.start_client_batch([
- cygrpc.operation_receive_message(_EMPTY_FLAGS),
+ cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
], client_receive_first_message_tag))
client_due.add(client_receive_first_message_tag)
client_receive_first_message_event = client_driver.event_with_tag(
@@ -231,9 +232,8 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase):
self.assertEqual(cygrpc.CallError.ok, client_call_cancel_result)
self.assertIs(server_rpc_tag, server_rpc_event.tag)
self.assertEqual(cygrpc.CompletionType.operation_complete,
- server_rpc_event.type)
- self.assertIsInstance(server_rpc_event.operation_call, cygrpc.Call)
- self.assertEqual(0, len(server_rpc_event.batch_operations))
+ server_rpc_event.completion_type)
+ self.assertIsInstance(server_rpc_event.call, cygrpc.Call)
if __name__ == '__main__':
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_server_test.py b/src/python/grpcio_tests/tests/unit/_cython/_server_test.py
new file mode 100644
index 0000000000..12bf40be6b
--- /dev/null
+++ b/src/python/grpcio_tests/tests/unit/_cython/_server_test.py
@@ -0,0 +1,49 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Test servers at the level of the Cython API."""
+
+import threading
+import time
+import unittest
+
+from grpc._cython import cygrpc
+
+
+class Test(unittest.TestCase):
+
+ def test_lonely_server(self):
+ server_call_completion_queue = cygrpc.CompletionQueue()
+ server_shutdown_completion_queue = cygrpc.CompletionQueue()
+ server = cygrpc.Server(cygrpc.ChannelArgs([]))
+ server.register_completion_queue(server_call_completion_queue)
+ server.register_completion_queue(server_shutdown_completion_queue)
+ port = server.add_http2_port(b'[::]:0')
+ server.start()
+
+ server_request_call_tag = 'server_request_call_tag'
+ server_request_call_start_batch_result = server.request_call(
+ server_call_completion_queue, server_call_completion_queue,
+ server_request_call_tag)
+
+ time.sleep(4)
+
+ server_shutdown_tag = 'server_shutdown_tag'
+ server_shutdown_result = server.shutdown(
+ server_shutdown_completion_queue, server_shutdown_tag)
+ server_request_call_event = server_call_completion_queue.poll()
+ server_shutdown_event = server_shutdown_completion_queue.poll()
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py
index 4eda685486..5453735f11 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py
@@ -35,11 +35,6 @@ def _metadata_plugin(context, callback):
class TypeSmokeTest(unittest.TestCase):
- def testOperationFlags(self):
- operation = cygrpc.operation_send_message(b'asdf',
- cygrpc.WriteFlag.no_compress)
- self.assertEqual(cygrpc.WriteFlag.no_compress, operation.flags)
-
def testTimespec(self):
now = time.time()
now_timespec_a = cygrpc.Timespec(now)
@@ -60,7 +55,8 @@ class TypeSmokeTest(unittest.TestCase):
del completion_queue
def testServerUpDown(self):
- server = cygrpc.Server(cygrpc.ChannelArgs([]))
+ server = cygrpc.Server(
+ cygrpc.ChannelArgs([cygrpc.ChannelArg(b'grpc.so_reuseport', 0)]))
del server
def testChannelUpDown(self):
@@ -72,7 +68,8 @@ class TypeSmokeTest(unittest.TestCase):
b'test plugin name!')
def testServerStartNoExplicitShutdown(self):
- server = cygrpc.Server(cygrpc.ChannelArgs([]))
+ server = cygrpc.Server(
+ cygrpc.ChannelArgs([cygrpc.ChannelArg(b'grpc.so_reuseport', 0)]))
completion_queue = cygrpc.CompletionQueue()
server.register_completion_queue(completion_queue)
port = server.add_http2_port(b'[::]:0')
@@ -82,14 +79,16 @@ class TypeSmokeTest(unittest.TestCase):
def testServerStartShutdown(self):
completion_queue = cygrpc.CompletionQueue()
- server = cygrpc.Server(cygrpc.ChannelArgs([]))
+ server = cygrpc.Server(
+ cygrpc.ChannelArgs([cygrpc.ChannelArg(b'grpc.so_reuseport', 0)]))
server.add_http2_port(b'[::]:0')
server.register_completion_queue(completion_queue)
server.start()
shutdown_tag = object()
server.shutdown(completion_queue, shutdown_tag)
event = completion_queue.poll()
- self.assertEqual(cygrpc.CompletionType.operation_complete, event.type)
+ self.assertEqual(cygrpc.CompletionType.operation_complete,
+ event.completion_type)
self.assertIs(shutdown_tag, event.tag)
del server
del completion_queue
@@ -99,7 +98,8 @@ class ServerClientMixin(object):
def setUpMixin(self, server_credentials, client_credentials, host_override):
self.server_completion_queue = cygrpc.CompletionQueue()
- self.server = cygrpc.Server(cygrpc.ChannelArgs([]))
+ self.server = cygrpc.Server(
+ cygrpc.ChannelArgs([cygrpc.ChannelArg(b'grpc.so_reuseport', 0)]))
self.server.register_completion_queue(self.server_completion_queue)
if server_credentials:
self.port = self.server.add_http2_port(b'[::]:0',
@@ -148,7 +148,7 @@ class ServerClientMixin(object):
self.assertEqual(cygrpc.CallError.ok, call_result)
event = queue.poll(deadline)
self.assertEqual(cygrpc.CompletionType.operation_complete,
- event.type)
+ event.completion_type)
self.assertTrue(event.success)
self.assertIs(tag, event.tag)
except Exception as error:
@@ -170,7 +170,7 @@ class ServerClientMixin(object):
SERVER_TRAILING_METADATA_KEY = 'california_is_in_a_drought'
SERVER_TRAILING_METADATA_VALUE = 'zomg it is'
SERVER_STATUS_CODE = cygrpc.StatusCode.ok
- SERVER_STATUS_DETAILS = b'our work is never over'
+ SERVER_STATUS_DETAILS = 'our work is never over'
REQUEST = b'in death a member of project mayhem has a name'
RESPONSE = b'his name is robert paulson'
METHOD = b'twinkies'
@@ -192,13 +192,13 @@ class ServerClientMixin(object):
(CLIENT_METADATA_ASCII_KEY, CLIENT_METADATA_ASCII_VALUE,),
(CLIENT_METADATA_BIN_KEY, CLIENT_METADATA_BIN_VALUE,),)
client_start_batch_result = client_call.start_client_batch([
- cygrpc.operation_send_initial_metadata(client_initial_metadata,
- _EMPTY_FLAGS),
- cygrpc.operation_send_message(REQUEST, _EMPTY_FLAGS),
- cygrpc.operation_send_close_from_client(_EMPTY_FLAGS),
- cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),
- cygrpc.operation_receive_message(_EMPTY_FLAGS),
- cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS)
+ cygrpc.SendInitialMetadataOperation(client_initial_metadata,
+ _EMPTY_FLAGS),
+ cygrpc.SendMessageOperation(REQUEST, _EMPTY_FLAGS),
+ cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
+ cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
+ cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
+ cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
], client_call_tag)
self.assertEqual(cygrpc.CallError.ok, client_start_batch_result)
client_event_future = test_utilities.CompletionQueuePollFuture(
@@ -206,33 +206,31 @@ class ServerClientMixin(object):
request_event = self.server_completion_queue.poll(cygrpc_deadline)
self.assertEqual(cygrpc.CompletionType.operation_complete,
- request_event.type)
- self.assertIsInstance(request_event.operation_call, cygrpc.Call)
+ request_event.completion_type)
+ self.assertIsInstance(request_event.call, cygrpc.Call)
self.assertIs(server_request_tag, request_event.tag)
- self.assertEqual(0, len(request_event.batch_operations))
self.assertTrue(
test_common.metadata_transmitted(client_initial_metadata,
- request_event.request_metadata))
- self.assertEqual(METHOD, request_event.request_call_details.method)
- self.assertEqual(self.expected_host,
- request_event.request_call_details.host)
+ request_event.invocation_metadata))
+ self.assertEqual(METHOD, request_event.call_details.method)
+ self.assertEqual(self.expected_host, request_event.call_details.host)
self.assertLess(
- abs(DEADLINE - float(request_event.request_call_details.deadline)),
+ abs(DEADLINE - float(request_event.call_details.deadline)),
DEADLINE_TOLERANCE)
server_call_tag = object()
- server_call = request_event.operation_call
+ server_call = request_event.call
server_initial_metadata = (
(SERVER_INITIAL_METADATA_KEY, SERVER_INITIAL_METADATA_VALUE,),)
server_trailing_metadata = (
(SERVER_TRAILING_METADATA_KEY, SERVER_TRAILING_METADATA_VALUE,),)
server_start_batch_result = server_call.start_server_batch([
- cygrpc.operation_send_initial_metadata(
+ cygrpc.SendInitialMetadataOperation(
server_initial_metadata,
- _EMPTY_FLAGS), cygrpc.operation_receive_message(_EMPTY_FLAGS),
- cygrpc.operation_send_message(RESPONSE, _EMPTY_FLAGS),
- cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),
- cygrpc.operation_send_status_from_server(
+ _EMPTY_FLAGS), cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
+ cygrpc.SendMessageOperation(RESPONSE, _EMPTY_FLAGS),
+ cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),
+ cygrpc.SendStatusFromServerOperation(
server_trailing_metadata, SERVER_STATUS_CODE,
SERVER_STATUS_DETAILS, _EMPTY_FLAGS)
], server_call_tag)
@@ -245,25 +243,24 @@ class ServerClientMixin(object):
found_client_op_types = set()
for client_result in client_event.batch_operations:
# we expect each op type to be unique
- self.assertNotIn(client_result.type, found_client_op_types)
- found_client_op_types.add(client_result.type)
- if client_result.type == cygrpc.OperationType.receive_initial_metadata:
+ self.assertNotIn(client_result.type(), found_client_op_types)
+ found_client_op_types.add(client_result.type())
+ if client_result.type(
+ ) == cygrpc.OperationType.receive_initial_metadata:
self.assertTrue(
test_common.metadata_transmitted(
server_initial_metadata,
- client_result.received_metadata))
- elif client_result.type == cygrpc.OperationType.receive_message:
- self.assertEqual(RESPONSE,
- client_result.received_message.bytes())
- elif client_result.type == cygrpc.OperationType.receive_status_on_client:
+ client_result.initial_metadata()))
+ elif client_result.type() == cygrpc.OperationType.receive_message:
+ self.assertEqual(RESPONSE, client_result.message())
+ elif client_result.type(
+ ) == cygrpc.OperationType.receive_status_on_client:
self.assertTrue(
test_common.metadata_transmitted(
server_trailing_metadata,
- client_result.received_metadata))
- self.assertEqual(SERVER_STATUS_DETAILS,
- client_result.received_status_details)
- self.assertEqual(SERVER_STATUS_CODE,
- client_result.received_status_code)
+ client_result.trailing_metadata()))
+ self.assertEqual(SERVER_STATUS_DETAILS, client_result.details())
+ self.assertEqual(SERVER_STATUS_CODE, client_result.code())
self.assertEqual(
set([
cygrpc.OperationType.send_initial_metadata,
@@ -277,13 +274,13 @@ class ServerClientMixin(object):
self.assertEqual(5, len(server_event.batch_operations))
found_server_op_types = set()
for server_result in server_event.batch_operations:
- self.assertNotIn(client_result.type, found_server_op_types)
- found_server_op_types.add(server_result.type)
- if server_result.type == cygrpc.OperationType.receive_message:
- self.assertEqual(REQUEST,
- server_result.received_message.bytes())
- elif server_result.type == cygrpc.OperationType.receive_close_on_server:
- self.assertFalse(server_result.received_cancelled)
+ self.assertNotIn(client_result.type(), found_server_op_types)
+ found_server_op_types.add(server_result.type())
+ if server_result.type() == cygrpc.OperationType.receive_message:
+ self.assertEqual(REQUEST, server_result.message())
+ elif server_result.type(
+ ) == cygrpc.OperationType.receive_close_on_server:
+ self.assertFalse(server_result.cancelled())
self.assertEqual(
set([
cygrpc.OperationType.send_initial_metadata,
@@ -319,13 +316,12 @@ class ServerClientMixin(object):
cygrpc_deadline, description)
client_event_future = perform_client_operations([
- cygrpc.operation_send_initial_metadata(empty_metadata,
- _EMPTY_FLAGS),
- cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),
+ cygrpc.SendInitialMetadataOperation(empty_metadata, _EMPTY_FLAGS),
+ cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
], "Client prologue")
request_event = self.server_completion_queue.poll(cygrpc_deadline)
- server_call = request_event.operation_call
+ server_call = request_event.call
def perform_server_operations(operations, description):
return self._perform_operations(operations, server_call,
@@ -333,8 +329,7 @@ class ServerClientMixin(object):
cygrpc_deadline, description)
server_event_future = perform_server_operations([
- cygrpc.operation_send_initial_metadata(empty_metadata,
- _EMPTY_FLAGS),
+ cygrpc.SendInitialMetadataOperation(empty_metadata, _EMPTY_FLAGS),
], "Server prologue")
client_event_future.result() # force completion
@@ -343,12 +338,12 @@ class ServerClientMixin(object):
# Messaging
for _ in range(10):
client_event_future = perform_client_operations([
- cygrpc.operation_send_message(b'', _EMPTY_FLAGS),
- cygrpc.operation_receive_message(_EMPTY_FLAGS),
+ cygrpc.SendMessageOperation(b'', _EMPTY_FLAGS),
+ cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
], "Client message")
server_event_future = perform_server_operations([
- cygrpc.operation_send_message(b'', _EMPTY_FLAGS),
- cygrpc.operation_receive_message(_EMPTY_FLAGS),
+ cygrpc.SendMessageOperation(b'', _EMPTY_FLAGS),
+ cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
], "Server receive")
client_event_future.result() # force completion
@@ -356,13 +351,13 @@ class ServerClientMixin(object):
# Epilogue
client_event_future = perform_client_operations([
- cygrpc.operation_send_close_from_client(_EMPTY_FLAGS),
- cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS)
+ cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
+ cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS)
], "Client epilogue")
server_event_future = perform_server_operations([
- cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),
- cygrpc.operation_send_status_from_server(
+ cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),
+ cygrpc.SendStatusFromServerOperation(
empty_metadata, cygrpc.StatusCode.ok, b'', _EMPTY_FLAGS)
], "Server epilogue")
diff --git a/src/python/grpcio_tests/tests/unit/_empty_message_test.py b/src/python/grpcio_tests/tests/unit/_empty_message_test.py
index 62077e7677..14695bc13f 100644
--- a/src/python/grpcio_tests/tests/unit/_empty_message_test.py
+++ b/src/python/grpcio_tests/tests/unit/_empty_message_test.py
@@ -15,8 +15,8 @@
import unittest
import grpc
-from grpc.framework.foundation import logging_pool
+from tests.unit import test_common
from tests.unit.framework.common import test_constants
_REQUEST = b''
@@ -87,9 +87,8 @@ class _GenericHandler(grpc.GenericRpcHandler):
class EmptyMessageTest(unittest.TestCase):
def setUp(self):
- self._server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
- self._server = grpc.server(
- self._server_pool, handlers=(_GenericHandler(),))
+ self._server = test_common.test_server()
+ self._server.add_generic_rpc_handlers((_GenericHandler(),))
port = self._server.add_insecure_port('[::]:0')
self._server.start()
self._channel = grpc.insecure_channel('localhost:%d' % port)
diff --git a/src/python/grpcio_tests/tests/unit/_exit_scenarios.py b/src/python/grpcio_tests/tests/unit/_exit_scenarios.py
index 7c13dab756..0a0239a63d 100644
--- a/src/python/grpcio_tests/tests/unit/_exit_scenarios.py
+++ b/src/python/grpcio_tests/tests/unit/_exit_scenarios.py
@@ -168,11 +168,11 @@ if __name__ == '__main__':
args = parser.parse_args()
if args.scenario == UNSTARTED_SERVER:
- server = grpc.server(DaemonPool())
+ server = grpc.server(DaemonPool(), options=(('grpc.so_reuseport', 0),))
if args.wait_for_interrupt:
time.sleep(WAIT_TIME)
elif args.scenario == RUNNING_SERVER:
- server = grpc.server(DaemonPool())
+ server = grpc.server(DaemonPool(), options=(('grpc.so_reuseport', 0),))
port = server.add_insecure_port('[::]:0')
server.start()
if args.wait_for_interrupt:
@@ -187,7 +187,7 @@ if __name__ == '__main__':
if args.wait_for_interrupt:
time.sleep(WAIT_TIME)
elif args.scenario == POLL_CONNECTIVITY:
- server = grpc.server(DaemonPool())
+ server = grpc.server(DaemonPool(), options=(('grpc.so_reuseport', 0),))
port = server.add_insecure_port('[::]:0')
server.start()
channel = grpc.insecure_channel('localhost:%d' % port)
@@ -201,7 +201,7 @@ if __name__ == '__main__':
else:
handler = GenericHandler()
- server = grpc.server(DaemonPool())
+ server = grpc.server(DaemonPool(), options=(('grpc.so_reuseport', 0),))
port = server.add_insecure_port('[::]:0')
server.add_generic_rpc_handlers((handler,))
server.start()
diff --git a/src/python/grpcio_tests/tests/unit/_interceptor_test.py b/src/python/grpcio_tests/tests/unit/_interceptor_test.py
index cf875ed7da..2aee298df2 100644
--- a/src/python/grpcio_tests/tests/unit/_interceptor_test.py
+++ b/src/python/grpcio_tests/tests/unit/_interceptor_test.py
@@ -22,6 +22,7 @@ from concurrent import futures
import grpc
from grpc.framework.foundation import logging_pool
+from tests.unit import test_common
from tests.unit.framework.common import test_constants
from tests.unit.framework.common import test_control
@@ -304,6 +305,7 @@ class InterceptorTest(unittest.TestCase):
self._server = grpc.server(
self._server_pool,
+ options=(('grpc.so_reuseport', 0),),
interceptors=(_LoggingInterceptor('s1', self._record),
conditional_interceptor,
_LoggingInterceptor('s2', self._record),))
diff --git a/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py b/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py
index 2a1a49ce74..b46d176d04 100644
--- a/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py
+++ b/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py
@@ -15,11 +15,10 @@
import itertools
import threading
import unittest
-from concurrent import futures
import grpc
-from grpc.framework.foundation import logging_pool
+from tests.unit import test_common
from tests.unit.framework.common import test_constants
from tests.unit.framework.common import test_control
@@ -191,9 +190,8 @@ class InvocationDefectsTest(unittest.TestCase):
def setUp(self):
self._control = test_control.PauseFailControl()
self._handler = _Handler(self._control)
- self._server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
- self._server = grpc.server(self._server_pool)
+ self._server = test_common.test_server()
port = self._server.add_insecure_port('[::]:0')
self._server.add_generic_rpc_handlers((_GenericHandler(self._handler),))
self._server.start()
diff --git a/src/python/grpcio_tests/tests/unit/_metadata_code_details_test.py b/src/python/grpcio_tests/tests/unit/_metadata_code_details_test.py
index cb59cd3769..ec67f99fbc 100644
--- a/src/python/grpcio_tests/tests/unit/_metadata_code_details_test.py
+++ b/src/python/grpcio_tests/tests/unit/_metadata_code_details_test.py
@@ -17,7 +17,6 @@ import threading
import unittest
import grpc
-from grpc.framework.foundation import logging_pool
from tests.unit import test_common
from tests.unit.framework.common import test_constants
@@ -186,9 +185,9 @@ class MetadataCodeDetailsTest(unittest.TestCase):
def setUp(self):
self._servicer = _Servicer()
- self._server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
- self._server = grpc.server(
- self._server_pool, handlers=(_generic_handler(self._servicer),))
+ self._server = test_common.test_server()
+ self._server.add_generic_rpc_handlers(
+ (_generic_handler(self._servicer),))
port = self._server.add_insecure_port('[::]:0')
self._server.start()
diff --git a/src/python/grpcio_tests/tests/unit/_metadata_test.py b/src/python/grpcio_tests/tests/unit/_metadata_test.py
index 0669486892..f2dac7bdc5 100644
--- a/src/python/grpcio_tests/tests/unit/_metadata_test.py
+++ b/src/python/grpcio_tests/tests/unit/_metadata_test.py
@@ -18,7 +18,6 @@ import weakref
import grpc
from grpc import _channel
-from grpc.framework.foundation import logging_pool
from tests.unit import test_common
from tests.unit.framework.common import test_constants
@@ -146,9 +145,9 @@ class _GenericHandler(grpc.GenericRpcHandler):
class MetadataTest(unittest.TestCase):
def setUp(self):
- self._server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
- self._server = grpc.server(
- self._server_pool, handlers=(_GenericHandler(weakref.proxy(self)),))
+ self._server = test_common.test_server()
+ self._server.add_generic_rpc_handlers(
+ (_GenericHandler(weakref.proxy(self)),))
port = self._server.add_insecure_port('[::]:0')
self._server.start()
self._channel = grpc.insecure_channel(
diff --git a/src/python/grpcio_tests/tests/unit/_reconnect_test.py b/src/python/grpcio_tests/tests/unit/_reconnect_test.py
index 53fd1c2ca4..10aee9fb4f 100644
--- a/src/python/grpcio_tests/tests/unit/_reconnect_test.py
+++ b/src/python/grpcio_tests/tests/unit/_reconnect_test.py
@@ -13,6 +13,7 @@
# limitations under the License.
"""Tests that a channel will reconnect if a connection is dropped"""
+import socket
import unittest
import grpc
@@ -30,6 +31,44 @@ def _handle_unary_unary(unused_request, unused_servicer_context):
return _RESPONSE
+def _get_reuse_socket_option():
+ try:
+ return socket.SO_REUSEPORT
+ except AttributeError:
+ # SO_REUSEPORT is unavailable on Windows, but SO_REUSEADDR
+ # allows forcibly re-binding to a port
+ return socket.SO_REUSEADDR
+
+
+def _pick_and_bind_port(sock_opt):
+ # Reserve a port, when we restart the server we want
+ # to hold onto the port
+ port = 0
+ for address_family in (socket.AF_INET6, socket.AF_INET):
+ try:
+ s = socket.socket(address_family, socket.SOCK_STREAM)
+ except socket.error:
+ continue # this address family is unavailable
+ s.setsockopt(socket.SOL_SOCKET, sock_opt, 1)
+ try:
+ s.bind(('localhost', port))
+ # for socket.SOCK_STREAM sockets, it is necessary to call
+ # listen to get the desired behavior.
+ s.listen(1)
+ port = s.getsockname()[1]
+ except socket.error:
+ # port was not available on the current address family
+ # try again
+ port = 0
+ break
+ finally:
+ s.close()
+ if s:
+ return port if port != 0 else _pick_and_bind_port(sock_opt)
+ else:
+ return None # no address family was available
+
+
class ReconnectTest(unittest.TestCase):
def test_reconnect(self):
@@ -38,8 +77,12 @@ class ReconnectTest(unittest.TestCase):
'UnaryUnary':
grpc.unary_unary_rpc_method_handler(_handle_unary_unary)
})
+ sock_opt = _get_reuse_socket_option()
+ port = _pick_and_bind_port(sock_opt)
+ self.assertIsNotNone(port)
+
server = grpc.server(server_pool, (handler,))
- port = server.add_insecure_port('[::]:0')
+ server.add_insecure_port('[::]:{}'.format(port))
server.start()
channel = grpc.insecure_channel('localhost:%d' % port)
multi_callable = channel.unary_unary(_UNARY_UNARY)
diff --git a/src/python/grpcio_tests/tests/unit/_resource_exhausted_test.py b/src/python/grpcio_tests/tests/unit/_resource_exhausted_test.py
index e425a0adfe..df4b129018 100644
--- a/src/python/grpcio_tests/tests/unit/_resource_exhausted_test.py
+++ b/src/python/grpcio_tests/tests/unit/_resource_exhausted_test.py
@@ -139,6 +139,7 @@ class ResourceExhaustedTest(unittest.TestCase):
self._server = grpc.server(
self._server_pool,
handlers=(_GenericHandler(self._trigger),),
+ options=(('grpc.so_reuseport', 0),),
maximum_concurrent_rpcs=test_constants.THREAD_CONCURRENCY)
port = self._server.add_insecure_port('[::]:0')
self._server.start()
diff --git a/src/python/grpcio_tests/tests/unit/_rpc_test.py b/src/python/grpcio_tests/tests/unit/_rpc_test.py
index 74d8541808..1515a87d93 100644
--- a/src/python/grpcio_tests/tests/unit/_rpc_test.py
+++ b/src/python/grpcio_tests/tests/unit/_rpc_test.py
@@ -21,6 +21,7 @@ from concurrent import futures
import grpc
from grpc.framework.foundation import logging_pool
+from tests.unit import test_common
from tests.unit.framework.common import test_constants
from tests.unit.framework.common import test_control
@@ -169,9 +170,8 @@ class RPCTest(unittest.TestCase):
def setUp(self):
self._control = test_control.PauseFailControl()
self._handler = _Handler(self._control)
- self._server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
- self._server = grpc.server(self._server_pool)
+ self._server = test_common.test_server()
port = self._server.add_insecure_port('[::]:0')
self._server.add_generic_rpc_handlers((_GenericHandler(self._handler),))
self._server.start()
@@ -180,7 +180,6 @@ class RPCTest(unittest.TestCase):
def tearDown(self):
self._server.stop(None)
- self._server_pool.shutdown(wait=True)
def testUnrecognizedMethod(self):
request = b'abc'
diff --git a/src/python/grpcio_tests/tests/unit/_server_ssl_cert_config_test.py b/src/python/grpcio_tests/tests/unit/_server_ssl_cert_config_test.py
index 005d16ea75..2c513da5d0 100644
--- a/src/python/grpcio_tests/tests/unit/_server_ssl_cert_config_test.py
+++ b/src/python/grpcio_tests/tests/unit/_server_ssl_cert_config_test.py
@@ -40,6 +40,7 @@ from concurrent import futures
import grpc
from tests.unit import resources
+from tests.unit import test_common
from tests.testing import _application_common
from tests.testing import _server_application
from tests.testing.proto import services_pb2_grpc
@@ -135,7 +136,7 @@ class _ServerSSLCertReloadTest(
raise NotImplementedError()
def setUp(self):
- self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+ self.server = test_common.test_server()
services_pb2_grpc.add_FirstServiceServicer_to_server(
_server_application.FirstServiceServicer(), self.server)
switch_cert_on_client_num = 10
@@ -407,7 +408,7 @@ class ServerSSLCertReloadTestCertConfigReuse(_ServerSSLCertReloadTest):
return True
def setUp(self):
- self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+ self.server = test_common.test_server()
services_pb2_grpc.add_FirstServiceServicer_to_server(
_server_application.FirstServiceServicer(), self.server)
self.cert_config_A = grpc.ssl_server_certificate_configuration(
diff --git a/src/python/grpcio_tests/tests/unit/test_common.py b/src/python/grpcio_tests/tests/unit/test_common.py
index ed71cc996b..6334a32b0e 100644
--- a/src/python/grpcio_tests/tests/unit/test_common.py
+++ b/src/python/grpcio_tests/tests/unit/test_common.py
@@ -15,6 +15,7 @@
import collections
+from concurrent import futures
import grpc
import six
@@ -82,3 +83,13 @@ def test_secure_channel(target, channel_credentials, server_host_override):
channel = grpc.secure_channel(target, channel_credentials, (
('grpc.ssl_target_name_override', server_host_override,),))
return channel
+
+
+def test_server(max_workers=10):
+ """Creates an insecure grpc server.
+
+ These servers have SO_REUSEPORT disabled to prevent cross-talk.
+ """
+ return grpc.server(
+ futures.ThreadPoolExecutor(max_workers=max_workers),
+ options=(('grpc.so_reuseport', 0),))