aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio
diff options
context:
space:
mode:
Diffstat (limited to 'src/python/grpcio')
-rw-r--r--src/python/grpcio/README.rst52
-rw-r--r--src/python/grpcio/commands.py3
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi41
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi47
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi26
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi78
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi148
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi94
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi52
-rw-r--r--src/python/grpcio/grpc/_cython/cygrpc.pyx11
-rw-r--r--src/python/grpcio/grpc/_cython/imports.generated.h2
-rw-r--r--src/python/grpcio/grpc_core_dependencies.py1
-rw-r--r--src/python/grpcio/precompiled.py13
-rw-r--r--src/python/grpcio/tests/_runner.py84
-rw-r--r--src/python/grpcio/tests/tests.json9
-rw-r--r--src/python/grpcio/tests/unit/framework/interfaces/face/_event_invocation_synchronous_event_service.py381
-rw-r--r--src/python/grpcio/tests/unit/framework/interfaces/face/test_cases.py4
17 files changed, 394 insertions, 652 deletions
diff --git a/src/python/grpcio/README.rst b/src/python/grpcio/README.rst
index f3e962c197..33a462b66f 100644
--- a/src/python/grpcio/README.rst
+++ b/src/python/grpcio/README.rst
@@ -6,7 +6,7 @@ Package for gRPC Python.
Installation
------------
-gRPC Python is available for Linux and Mac OS X running Python 2.7.
+gRPC Python is available for Linux, Mac OS X, and Windows running Python 2.7.
From PyPI
~~~~~~~~~
@@ -23,21 +23,26 @@ Else system wide (on Ubuntu)...
$ sudo pip install grpcio
+n.b. On Windows and on Mac OS X one *must* have a recent release of :code:`pip`
+to retrieve the proper wheel from PyPI. Be sure to upgrade to the latest
+version!
+
From Source
~~~~~~~~~~~
Building from source requires that you have the Python headers (usually a
-package named `python-dev`).
+package named :code:`python-dev`).
::
- $ export REPO_ROOT=grpc
+ $ export REPO_ROOT=grpc # REPO_ROOT can be any directory of your choice
$ git clone https://github.com/grpc/grpc.git $REPO_ROOT
$ cd $REPO_ROOT
- $ pip install .
-Note that `$REPO_ROOT` can be assigned to whatever directory name floats your
-fancy.
+ # For the next two commands do `sudo pip install` if you get permission-denied errors
+ $ pip install -rrequirements.txt
+ $ GRPC_PYTHON_BUILD_WITH_CYTHON=1 pip install .
+
Troubleshooting
~~~~~~~~~~~~~~~
@@ -45,10 +50,43 @@ Troubleshooting
Help, I ...
* **... see a** :code:`pkg_resources.VersionConflict` **when I try to install
- grpc!**
+ grpc**
This is likely because :code:`pip` doesn't own the offending dependency,
which in turn is likely because your operating system's package manager owns
it. You'll need to force the installation of the dependency:
:code:`pip install --ignore-installed $OFFENDING_DEPENDENCY`
+
+ For example, if you get an error like the following:
+
+ ::
+
+ Traceback (most recent call last):
+ File "<string>", line 17, in <module>
+ ...
+ File "/usr/lib/python2.7/dist-packages/pkg_resources.py", line 509, in find
+ raise VersionConflict(dist, req)
+ pkg_resources.VersionConflict: (six 1.8.0 (/usr/lib/python2.7/dist-packages), Requirement.parse('six>=1.10'))
+
+ You can fix it by doing:
+
+ ::
+
+ sudo pip install --ignore-installed six
+
+* **... see the following error on some platforms**
+
+ ::
+
+ /tmp/pip-build-U8pSsr/cython/Cython/Plex/Scanners.c:4:20: fatal error: Python.h: No such file or directory
+ #include "Python.h"
+ ^
+ compilation terminated.
+
+ You can fix it by installing `python-dev` package. i.e
+
+ ::
+
+ sudo apt-get install python-dev
+
diff --git a/src/python/grpcio/commands.py b/src/python/grpcio/commands.py
index 0e7f02a271..1d43547419 100644
--- a/src/python/grpcio/commands.py
+++ b/src/python/grpcio/commands.py
@@ -119,8 +119,7 @@ class SphinxDocumentation(setuptools.Command):
import sphinx
import sphinx.apidoc
metadata = self.distribution.metadata
- src_dir = os.path.join(
- PYTHON_STEM, self.distribution.package_dir[''], 'grpc')
+ src_dir = os.path.join(PYTHON_STEM, 'grpc')
sys.path.append(src_dir)
sphinx.apidoc.main([
'', '--force', '--full', '-H', metadata.name, '-A', metadata.author,
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi
index 80f4da51e8..d1b9c98ffc 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi
@@ -1,4 +1,4 @@
-# Copyright 2015, Google Inc.
+# Copyright 2015-2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
@@ -40,14 +40,17 @@ cdef class Call:
def start_batch(self, operations, tag):
if not self.is_valid:
raise ValueError("invalid call object cannot be used from Python")
+ cdef grpc_call_error result
cdef Operations cy_operations = Operations(operations)
cdef OperationTag operation_tag = OperationTag(tag)
operation_tag.operation_call = self
operation_tag.batch_operations = cy_operations
cpython.Py_INCREF(operation_tag)
- return grpc_call_start_batch(
- self.c_call, cy_operations.c_ops, cy_operations.c_nops,
- <cpython.PyObject *>operation_tag, NULL)
+ with nogil:
+ result = grpc_call_start_batch(
+ self.c_call, cy_operations.c_ops, cy_operations.c_nops,
+ <cpython.PyObject *>operation_tag, NULL)
+ return result
def cancel(
self, grpc_status_code error_code=GRPC_STATUS__DO_NOT_USE,
@@ -57,6 +60,8 @@ cdef class Call:
if (details is None) != (error_code == GRPC_STATUS__DO_NOT_USE):
raise ValueError("if error_code is specified, so must details "
"(and vice-versa)")
+ cdef grpc_call_error result
+ cdef char *c_details = NULL
if error_code != GRPC_STATUS__DO_NOT_USE:
if isinstance(details, bytes):
pass
@@ -65,25 +70,37 @@ cdef class Call:
else:
raise TypeError("expected details to be str or bytes")
self.references.append(details)
- return grpc_call_cancel_with_status(
- self.c_call, error_code, details, NULL)
+ c_details = details
+ with nogil:
+ result = grpc_call_cancel_with_status(
+ self.c_call, error_code, c_details, NULL)
+ return result
else:
- return grpc_call_cancel(self.c_call, NULL)
+ with nogil:
+ result = grpc_call_cancel(self.c_call, NULL)
+ return result
def set_credentials(
self, CallCredentials call_credentials not None):
- return grpc_call_set_credentials(
- self.c_call, call_credentials.c_credentials)
+ cdef grpc_call_error result
+ with nogil:
+ result = grpc_call_set_credentials(
+ self.c_call, call_credentials.c_credentials)
+ return result
def peer(self):
- cdef char *peer = grpc_call_get_peer(self.c_call)
+ cdef char *peer = NULL
+ with nogil:
+ peer = grpc_call_get_peer(self.c_call)
result = <bytes>peer
- gpr_free(peer)
+ with nogil:
+ gpr_free(peer)
return result
def __dealloc__(self):
if self.c_call != NULL:
- grpc_call_destroy(self.c_call)
+ with nogil:
+ grpc_call_destroy(self.c_call)
# The object *should* always be valid from Python. Used for debugging.
@property
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
index 1f1833d5ec..d612c90791 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
@@ -35,6 +35,7 @@ cdef class Channel:
def __cinit__(self, target, ChannelArgs arguments=None,
ChannelCredentials channel_credentials=None):
cdef grpc_channel_args *c_arguments = NULL
+ cdef char *c_target = NULL
self.c_channel = NULL
self.references = []
if arguments is not None:
@@ -45,12 +46,15 @@ cdef class Channel:
target = target.encode()
else:
raise TypeError("expected target to be str or bytes")
+ c_target = target
if channel_credentials is None:
- self.c_channel = grpc_insecure_channel_create(target, c_arguments,
- NULL)
+ with nogil:
+ self.c_channel = grpc_insecure_channel_create(c_target, c_arguments,
+ NULL)
else:
- self.c_channel = grpc_secure_channel_create(
- channel_credentials.c_credentials, target, c_arguments, NULL)
+ with nogil:
+ self.c_channel = grpc_secure_channel_create(
+ channel_credentials.c_credentials, c_target, c_arguments, NULL)
self.references.append(channel_credentials)
self.references.append(target)
self.references.append(arguments)
@@ -66,6 +70,7 @@ cdef class Channel:
method = method.encode()
else:
raise TypeError("expected method to be str or bytes")
+ cdef char *method_c_string = method
cdef char *host_c_string = NULL
if host is None:
pass
@@ -81,32 +86,40 @@ cdef class Channel:
cdef grpc_call *parent_call = NULL
if parent is not None:
parent_call = parent.c_call
- operation_call.c_call = grpc_channel_create_call(
- self.c_channel, parent_call, flags,
- queue.c_completion_queue, method, host_c_string, deadline.c_time,
- NULL)
+ with nogil:
+ operation_call.c_call = grpc_channel_create_call(
+ self.c_channel, parent_call, flags,
+ queue.c_completion_queue, method_c_string, host_c_string,
+ deadline.c_time, NULL)
return operation_call
def check_connectivity_state(self, bint try_to_connect):
- return grpc_channel_check_connectivity_state(self.c_channel,
- try_to_connect)
+ cdef grpc_connectivity_state result
+ with nogil:
+ result = grpc_channel_check_connectivity_state(self.c_channel,
+ try_to_connect)
+ return result
def watch_connectivity_state(
self, grpc_connectivity_state last_observed_state,
Timespec deadline not None, CompletionQueue queue not None, tag):
cdef OperationTag operation_tag = OperationTag(tag)
- operation_tag.references = [self, queue]
cpython.Py_INCREF(operation_tag)
- grpc_channel_watch_connectivity_state(
- self.c_channel, last_observed_state, deadline.c_time,
- queue.c_completion_queue, <cpython.PyObject *>operation_tag)
+ with nogil:
+ grpc_channel_watch_connectivity_state(
+ self.c_channel, last_observed_state, deadline.c_time,
+ queue.c_completion_queue, <cpython.PyObject *>operation_tag)
def target(self):
- cdef char * target = grpc_channel_get_target(self.c_channel)
+ cdef char *target = NULL
+ with nogil:
+ target = grpc_channel_get_target(self.c_channel)
result = <bytes>target
- gpr_free(target)
+ with nogil:
+ gpr_free(target)
return result
def __dealloc__(self):
if self.c_channel != NULL:
- grpc_channel_destroy(self.c_channel)
+ with nogil:
+ grpc_channel_destroy(self.c_channel)
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
index c139147114..09e47d4222 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
@@ -36,7 +36,8 @@ import time
cdef class CompletionQueue:
def __cinit__(self):
- self.c_completion_queue = grpc_completion_queue_create(NULL)
+ with nogil:
+ self.c_completion_queue = grpc_completion_queue_create(NULL)
self.is_shutting_down = False
self.is_shutdown = False
self.pluck_condition = threading.Condition()
@@ -82,8 +83,9 @@ cdef class CompletionQueue:
def poll(self, Timespec deadline=None):
# We name this 'poll' to avoid problems with CPython's expectations for
# 'special' methods (like next and __next__).
- cdef gpr_timespec c_deadline = gpr_inf_future(
- GPR_CLOCK_REALTIME)
+ cdef gpr_timespec c_deadline
+ with nogil:
+ c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME)
if deadline is not None:
c_deadline = deadline.c_time
cdef grpc_event event
@@ -123,7 +125,8 @@ cdef class CompletionQueue:
return self._interpret_event(event)
def shutdown(self):
- grpc_completion_queue_shutdown(self.c_completion_queue)
+ with nogil:
+ grpc_completion_queue_shutdown(self.c_completion_queue)
self.is_shutting_down = True
def clear(self):
@@ -133,14 +136,19 @@ cdef class CompletionQueue:
pass
def __dealloc__(self):
- cdef gpr_timespec c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME)
+ cdef gpr_timespec c_deadline
+ with nogil:
+ c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME)
if self.c_completion_queue != NULL:
# Ensure shutdown
if not self.is_shutting_down:
- grpc_completion_queue_shutdown(self.c_completion_queue)
+ with nogil:
+ grpc_completion_queue_shutdown(self.c_completion_queue)
# Pump the queue
while not self.is_shutdown:
- event = grpc_completion_queue_next(
- self.c_completion_queue, c_deadline, NULL)
+ with nogil:
+ event = grpc_completion_queue_next(
+ self.c_completion_queue, c_deadline, NULL)
self._interpret_event(event)
- grpc_completion_queue_destroy(self.c_completion_queue)
+ with nogil:
+ grpc_completion_queue_destroy(self.c_completion_queue)
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi
index 3f439c8900..1d7adca23e 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi
@@ -1,4 +1,4 @@
-# Copyright 2015, Google Inc.
+# Copyright 2015-2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
@@ -46,7 +46,8 @@ cdef class ChannelCredentials:
def __dealloc__(self):
if self.c_credentials != NULL:
- grpc_channel_credentials_release(self.c_credentials)
+ with nogil:
+ grpc_channel_credentials_release(self.c_credentials)
cdef class CallCredentials:
@@ -63,7 +64,8 @@ cdef class CallCredentials:
def __dealloc__(self):
if self.c_credentials != NULL:
- grpc_call_credentials_release(self.c_credentials)
+ with nogil:
+ grpc_call_credentials_release(self.c_credentials)
cdef class ServerCredentials:
@@ -74,7 +76,8 @@ cdef class ServerCredentials:
def __dealloc__(self):
if self.c_credentials != NULL:
- grpc_server_credentials_release(self.c_credentials)
+ with nogil:
+ grpc_server_credentials_release(self.c_credentials)
cdef class CredentialsMetadataPlugin:
@@ -139,7 +142,8 @@ cdef void plugin_destroy_c_plugin_state(void *state):
def channel_credentials_google_default():
cdef ChannelCredentials credentials = ChannelCredentials();
- credentials.c_credentials = grpc_google_default_credentials_create()
+ with nogil:
+ credentials.c_credentials = grpc_google_default_credentials_create()
return credentials
def channel_credentials_ssl(pem_root_certificates,
@@ -158,12 +162,14 @@ def channel_credentials_ssl(pem_root_certificates,
c_pem_root_certificates = pem_root_certificates
credentials.references.append(pem_root_certificates)
if ssl_pem_key_cert_pair is not None:
- credentials.c_credentials = grpc_ssl_credentials_create(
- c_pem_root_certificates, &ssl_pem_key_cert_pair.c_pair, NULL)
+ with nogil:
+ credentials.c_credentials = grpc_ssl_credentials_create(
+ c_pem_root_certificates, &ssl_pem_key_cert_pair.c_pair, NULL)
credentials.references.append(ssl_pem_key_cert_pair)
else:
- credentials.c_credentials = grpc_ssl_credentials_create(
- c_pem_root_certificates, NULL, NULL)
+ with nogil:
+ credentials.c_credentials = grpc_ssl_credentials_create(
+ c_pem_root_certificates, NULL, NULL)
return credentials
def channel_credentials_composite(
@@ -172,8 +178,9 @@ def channel_credentials_composite(
if not credentials_1.is_valid or not credentials_2.is_valid:
raise ValueError("passed credentials must both be valid")
cdef ChannelCredentials credentials = ChannelCredentials()
- credentials.c_credentials = grpc_composite_channel_credentials_create(
- credentials_1.c_credentials, credentials_2.c_credentials, NULL)
+ with nogil:
+ credentials.c_credentials = grpc_composite_channel_credentials_create(
+ credentials_1.c_credentials, credentials_2.c_credentials, NULL)
credentials.references.append(credentials_1)
credentials.references.append(credentials_2)
return credentials
@@ -184,16 +191,18 @@ def call_credentials_composite(
if not credentials_1.is_valid or not credentials_2.is_valid:
raise ValueError("passed credentials must both be valid")
cdef CallCredentials credentials = CallCredentials()
- credentials.c_credentials = grpc_composite_call_credentials_create(
- credentials_1.c_credentials, credentials_2.c_credentials, NULL)
+ with nogil:
+ credentials.c_credentials = grpc_composite_call_credentials_create(
+ credentials_1.c_credentials, credentials_2.c_credentials, NULL)
credentials.references.append(credentials_1)
credentials.references.append(credentials_2)
return credentials
def call_credentials_google_compute_engine():
cdef CallCredentials credentials = CallCredentials()
- credentials.c_credentials = (
- grpc_google_compute_engine_credentials_create(NULL))
+ with nogil:
+ credentials.c_credentials = (
+ grpc_google_compute_engine_credentials_create(NULL))
return credentials
def call_credentials_service_account_jwt_access(
@@ -205,9 +214,11 @@ def call_credentials_service_account_jwt_access(
else:
raise TypeError("expected json_key to be str or bytes")
cdef CallCredentials credentials = CallCredentials()
- credentials.c_credentials = (
- grpc_service_account_jwt_access_credentials_create(
- json_key, token_lifetime.c_time, NULL))
+ cdef char *json_key_c_string = json_key
+ with nogil:
+ credentials.c_credentials = (
+ grpc_service_account_jwt_access_credentials_create(
+ json_key_c_string, token_lifetime.c_time, NULL))
credentials.references.append(json_key)
return credentials
@@ -219,8 +230,10 @@ def call_credentials_google_refresh_token(json_refresh_token):
else:
raise TypeError("expected json_refresh_token to be str or bytes")
cdef CallCredentials credentials = CallCredentials()
- credentials.c_credentials = grpc_google_refresh_token_credentials_create(
- json_refresh_token, NULL)
+ cdef char *json_refresh_token_c_string = json_refresh_token
+ with nogil:
+ credentials.c_credentials = grpc_google_refresh_token_credentials_create(
+ json_refresh_token_c_string, NULL)
credentials.references.append(json_refresh_token)
return credentials
@@ -238,17 +251,21 @@ def call_credentials_google_iam(authorization_token, authority_selector):
else:
raise TypeError("expected authority_selector to be str or bytes")
cdef CallCredentials credentials = CallCredentials()
- credentials.c_credentials = grpc_google_iam_credentials_create(
- authorization_token, authority_selector, NULL)
+ cdef char *authorization_token_c_string = authorization_token
+ cdef char *authority_selector_c_string = authority_selector
+ with nogil:
+ credentials.c_credentials = grpc_google_iam_credentials_create(
+ authorization_token_c_string, authority_selector_c_string, NULL)
credentials.references.append(authorization_token)
credentials.references.append(authority_selector)
return credentials
def call_credentials_metadata_plugin(CredentialsMetadataPlugin plugin):
cdef CallCredentials credentials = CallCredentials()
- credentials.c_credentials = (
- grpc_metadata_credentials_create_from_plugin(plugin.make_c_plugin(),
- NULL))
+ cdef grpc_metadata_credentials_plugin c_plugin = plugin.make_c_plugin()
+ with nogil:
+ credentials.c_credentials = (
+ grpc_metadata_credentials_create_from_plugin(c_plugin, NULL))
# TODO(atash): the following held reference is *probably* never necessary
credentials.references.append(plugin)
return credentials
@@ -274,11 +291,12 @@ def server_credentials_ssl(pem_root_certs, pem_key_cert_pairs,
credentials.references.append(pem_key_cert_pairs)
credentials.references.append(pem_root_certs)
credentials.c_ssl_pem_key_cert_pairs_count = len(pem_key_cert_pairs)
- credentials.c_ssl_pem_key_cert_pairs = (
- <grpc_ssl_pem_key_cert_pair *>gpr_malloc(
- sizeof(grpc_ssl_pem_key_cert_pair) *
- credentials.c_ssl_pem_key_cert_pairs_count
- ))
+ with nogil:
+ credentials.c_ssl_pem_key_cert_pairs = (
+ <grpc_ssl_pem_key_cert_pair *>gpr_malloc(
+ sizeof(grpc_ssl_pem_key_cert_pair) *
+ credentials.c_ssl_pem_key_cert_pairs_count
+ ))
for i in range(credentials.c_ssl_pem_key_cert_pairs_count):
credentials.c_ssl_pem_key_cert_pairs[i] = (
(<SslPemKeyCertPair>pem_key_cert_pairs[i]).c_pair)
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
index dbf0045710..61165cb021 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
@@ -38,27 +38,27 @@ cdef extern from "grpc/_cython/loader.h":
int pygrpc_load_core(char*)
- void *gpr_malloc(size_t size)
- void gpr_free(void *ptr)
- void *gpr_realloc(void *p, size_t size)
+ void *gpr_malloc(size_t size) nogil
+ void gpr_free(void *ptr) nogil
+ void *gpr_realloc(void *p, size_t size) nogil
ctypedef struct gpr_slice:
# don't worry about writing out the members of gpr_slice; we never access
# them directly.
pass
- gpr_slice gpr_slice_ref(gpr_slice s)
- void gpr_slice_unref(gpr_slice s)
- gpr_slice gpr_slice_new(void *p, size_t len, void (*destroy)(void *))
+ gpr_slice gpr_slice_ref(gpr_slice s) nogil
+ void gpr_slice_unref(gpr_slice s) nogil
+ gpr_slice gpr_slice_new(void *p, size_t len, void (*destroy)(void *)) nogil
gpr_slice gpr_slice_new_with_len(
- void *p, size_t len, void (*destroy)(void *, size_t))
- gpr_slice gpr_slice_malloc(size_t length)
- gpr_slice gpr_slice_from_copied_string(const char *source)
- gpr_slice gpr_slice_from_copied_buffer(const char *source, size_t len)
+ void *p, size_t len, void (*destroy)(void *, size_t)) nogil
+ gpr_slice gpr_slice_malloc(size_t length) nogil
+ gpr_slice gpr_slice_from_copied_string(const char *source) nogil
+ gpr_slice gpr_slice_from_copied_buffer(const char *source, size_t len) nogil
# Declare functions for function-like macros (because Cython)...
- void *gpr_slice_start_ptr "GPR_SLICE_START_PTR" (gpr_slice s)
- size_t gpr_slice_length "GPR_SLICE_LENGTH" (gpr_slice s)
+ void *gpr_slice_start_ptr "GPR_SLICE_START_PTR" (gpr_slice s) nogil
+ size_t gpr_slice_length "GPR_SLICE_LENGTH" (gpr_slice s) nogil
ctypedef enum gpr_clock_type:
GPR_CLOCK_MONOTONIC
@@ -71,14 +71,14 @@ cdef extern from "grpc/_cython/loader.h":
int32_t nanoseconds "tv_nsec"
gpr_clock_type clock_type
- gpr_timespec gpr_time_0(gpr_clock_type type)
- gpr_timespec gpr_inf_future(gpr_clock_type type)
- gpr_timespec gpr_inf_past(gpr_clock_type type)
+ gpr_timespec gpr_time_0(gpr_clock_type type) nogil
+ gpr_timespec gpr_inf_future(gpr_clock_type type) nogil
+ gpr_timespec gpr_inf_past(gpr_clock_type type) nogil
- gpr_timespec gpr_now(gpr_clock_type clock)
+ gpr_timespec gpr_now(gpr_clock_type clock) nogil
gpr_timespec gpr_convert_clock_type(gpr_timespec t,
- gpr_clock_type target_clock)
+ gpr_clock_type target_clock) nogil
ctypedef enum grpc_status_code:
GRPC_STATUS_OK
@@ -114,15 +114,15 @@ cdef extern from "grpc/_cython/loader.h":
pass
grpc_byte_buffer *grpc_raw_byte_buffer_create(gpr_slice *slices,
- size_t nslices)
- size_t grpc_byte_buffer_length(grpc_byte_buffer *bb)
- void grpc_byte_buffer_destroy(grpc_byte_buffer *byte_buffer)
+ size_t nslices) nogil
+ size_t grpc_byte_buffer_length(grpc_byte_buffer *bb) nogil
+ void grpc_byte_buffer_destroy(grpc_byte_buffer *byte_buffer) nogil
void grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader,
- grpc_byte_buffer *buffer)
+ grpc_byte_buffer *buffer) nogil
int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader *reader,
- gpr_slice *slice)
- void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader)
+ gpr_slice *slice) nogil
+ void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader) nogil
const char *GRPC_ARG_PRIMARY_USER_AGENT_STRING
const char *GRPC_ARG_ENABLE_CENSUS
@@ -221,8 +221,8 @@ cdef extern from "grpc/_cython/loader.h":
size_t capacity
grpc_metadata *metadata
- void grpc_metadata_array_init(grpc_metadata_array *array)
- void grpc_metadata_array_destroy(grpc_metadata_array *array)
+ void grpc_metadata_array_init(grpc_metadata_array *array) nogil
+ void grpc_metadata_array_destroy(grpc_metadata_array *array) nogil
ctypedef struct grpc_call_details:
char *method
@@ -231,8 +231,8 @@ cdef extern from "grpc/_cython/loader.h":
size_t host_capacity
gpr_timespec deadline
- void grpc_call_details_init(grpc_call_details *details)
- void grpc_call_details_destroy(grpc_call_details *details)
+ void grpc_call_details_init(grpc_call_details *details) nogil
+ void grpc_call_details_destroy(grpc_call_details *details) nogil
ctypedef enum grpc_op_type:
GRPC_OP_SEND_INITIAL_METADATA
@@ -277,61 +277,62 @@ cdef extern from "grpc/_cython/loader.h":
uint32_t flags
grpc_op_data data
- void grpc_init()
- void grpc_shutdown()
+ void grpc_init() nogil
+ void grpc_shutdown() nogil
- grpc_completion_queue *grpc_completion_queue_create(void *reserved)
+ grpc_completion_queue *grpc_completion_queue_create(void *reserved) nogil
grpc_event grpc_completion_queue_next(grpc_completion_queue *cq,
gpr_timespec deadline,
void *reserved) nogil
grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cq, void *tag,
gpr_timespec deadline,
void *reserved) nogil
- void grpc_completion_queue_shutdown(grpc_completion_queue *cq)
- void grpc_completion_queue_destroy(grpc_completion_queue *cq)
+ void grpc_completion_queue_shutdown(grpc_completion_queue *cq) nogil
+ void grpc_completion_queue_destroy(grpc_completion_queue *cq) nogil
- grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
- size_t nops, void *tag, void *reserved)
- grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved)
+ grpc_call_error grpc_call_start_batch(
+ grpc_call *call, const grpc_op *ops, size_t nops, void *tag,
+ void *reserved) nogil
+ grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) nogil
grpc_call_error grpc_call_cancel_with_status(grpc_call *call,
grpc_status_code status,
const char *description,
- void *reserved)
- char *grpc_call_get_peer(grpc_call *call)
- void grpc_call_destroy(grpc_call *call)
+ void *reserved) nogil
+ char *grpc_call_get_peer(grpc_call *call) nogil
+ void grpc_call_destroy(grpc_call *call) nogil
grpc_channel *grpc_insecure_channel_create(const char *target,
const grpc_channel_args *args,
- void *reserved)
- grpc_call *grpc_channel_create_call(grpc_channel *channel,
- grpc_call *parent_call,
- uint32_t propagation_mask,
- grpc_completion_queue *completion_queue,
- const char *method, const char *host,
- gpr_timespec deadline, void *reserved)
+ void *reserved) nogil
+ grpc_call *grpc_channel_create_call(
+ grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask,
+ grpc_completion_queue *completion_queue, const char *method,
+ const char *host, gpr_timespec deadline, void *reserved) nogil
grpc_connectivity_state grpc_channel_check_connectivity_state(
- grpc_channel *channel, int try_to_connect)
+ grpc_channel *channel, int try_to_connect) nogil
void grpc_channel_watch_connectivity_state(
grpc_channel *channel, grpc_connectivity_state last_observed_state,
- gpr_timespec deadline, grpc_completion_queue *cq, void *tag)
- char *grpc_channel_get_target(grpc_channel *channel)
- void grpc_channel_destroy(grpc_channel *channel)
+ gpr_timespec deadline, grpc_completion_queue *cq, void *tag) nogil
+ char *grpc_channel_get_target(grpc_channel *channel) nogil
+ void grpc_channel_destroy(grpc_channel *channel) nogil
- grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved)
+ grpc_server *grpc_server_create(
+ const grpc_channel_args *args, void *reserved) nogil
grpc_call_error grpc_server_request_call(
grpc_server *server, grpc_call **call, grpc_call_details *details,
grpc_metadata_array *request_metadata, grpc_completion_queue
*cq_bound_to_call, grpc_completion_queue *cq_for_notification, void
- *tag_new)
+ *tag_new) nogil
void grpc_server_register_completion_queue(grpc_server *server,
grpc_completion_queue *cq,
- void *reserved)
- int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr)
- void grpc_server_start(grpc_server *server)
+ void *reserved) nogil
+ int grpc_server_add_insecure_http2_port(
+ grpc_server *server, const char *addr) nogil
+ void grpc_server_start(grpc_server *server) nogil
void grpc_server_shutdown_and_notify(
- grpc_server *server, grpc_completion_queue *cq, void *tag)
- void grpc_server_cancel_all_calls(grpc_server *server)
- void grpc_server_destroy(grpc_server *server)
+ grpc_server *server, grpc_completion_queue *cq, void *tag) nogil
+ void grpc_server_cancel_all_calls(grpc_server *server) nogil
+ void grpc_server_destroy(grpc_server *server) nogil
ctypedef struct grpc_ssl_pem_key_cert_pair:
const char *private_key
@@ -347,35 +348,36 @@ cdef extern from "grpc/_cython/loader.h":
ctypedef void (*grpc_ssl_roots_override_callback)(char **pem_root_certs)
- void grpc_set_ssl_roots_override_callback(grpc_ssl_roots_override_callback cb)
+ void grpc_set_ssl_roots_override_callback(
+ grpc_ssl_roots_override_callback cb) nogil
- grpc_channel_credentials *grpc_google_default_credentials_create()
+ grpc_channel_credentials *grpc_google_default_credentials_create() nogil
grpc_channel_credentials *grpc_ssl_credentials_create(
const char *pem_root_certs, grpc_ssl_pem_key_cert_pair *pem_key_cert_pair,
- void *reserved)
+ void *reserved) nogil
grpc_channel_credentials *grpc_composite_channel_credentials_create(
grpc_channel_credentials *creds1, grpc_call_credentials *creds2,
- void *reserved)
- void grpc_channel_credentials_release(grpc_channel_credentials *creds)
+ void *reserved) nogil
+ void grpc_channel_credentials_release(grpc_channel_credentials *creds) nogil
grpc_call_credentials *grpc_composite_call_credentials_create(
grpc_call_credentials *creds1, grpc_call_credentials *creds2,
- void *reserved)
+ void *reserved) nogil
grpc_call_credentials *grpc_google_compute_engine_credentials_create(
- void *reserved)
+ void *reserved) nogil
grpc_call_credentials *grpc_service_account_jwt_access_credentials_create(
const char *json_key,
- gpr_timespec token_lifetime, void *reserved)
+ gpr_timespec token_lifetime, void *reserved) nogil
grpc_call_credentials *grpc_google_refresh_token_credentials_create(
- const char *json_refresh_token, void *reserved)
+ const char *json_refresh_token, void *reserved) nogil
grpc_call_credentials *grpc_google_iam_credentials_create(
const char *authorization_token, const char *authority_selector,
- void *reserved)
- void grpc_call_credentials_release(grpc_call_credentials *creds)
+ void *reserved) nogil
+ void grpc_call_credentials_release(grpc_call_credentials *creds) nogil
grpc_channel *grpc_secure_channel_create(
grpc_channel_credentials *creds, const char *target,
- const grpc_channel_args *args, void *reserved)
+ const grpc_channel_args *args, void *reserved) nogil
ctypedef struct grpc_server_credentials:
# We don't care about the internals (and in fact don't know them)
@@ -385,13 +387,13 @@ cdef extern from "grpc/_cython/loader.h":
const char *pem_root_certs,
grpc_ssl_pem_key_cert_pair *pem_key_cert_pairs,
size_t num_key_cert_pairs, int force_client_auth, void *reserved)
- void grpc_server_credentials_release(grpc_server_credentials *creds)
+ void grpc_server_credentials_release(grpc_server_credentials *creds) nogil
int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
- grpc_server_credentials *creds)
+ grpc_server_credentials *creds) nogil
grpc_call_error grpc_call_set_credentials(grpc_call *call,
- grpc_call_credentials *creds)
+ grpc_call_credentials *creds) nogil
ctypedef struct grpc_auth_context:
# We don't care about the internals (and in fact don't know them)
@@ -415,4 +417,4 @@ cdef extern from "grpc/_cython/loader.h":
const char *type
grpc_call_credentials *grpc_metadata_credentials_create_from_plugin(
- grpc_metadata_credentials_plugin plugin, void *reserved)
+ grpc_metadata_credentials_plugin plugin, void *reserved) nogil
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
index fa4ea99ea9..851389a261 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
@@ -107,15 +107,18 @@ cdef class Timespec:
def __cinit__(self, time):
if time is None:
- self.c_time = gpr_now(GPR_CLOCK_REALTIME)
+ with nogil:
+ self.c_time = gpr_now(GPR_CLOCK_REALTIME)
return
if isinstance(time, int):
time = float(time)
if isinstance(time, float):
if time == float("+inf"):
- self.c_time = gpr_inf_future(GPR_CLOCK_REALTIME)
+ with nogil:
+ self.c_time = gpr_inf_future(GPR_CLOCK_REALTIME)
elif time == float("-inf"):
- self.c_time = gpr_inf_past(GPR_CLOCK_REALTIME)
+ with nogil:
+ self.c_time = gpr_inf_past(GPR_CLOCK_REALTIME)
else:
self.c_time.seconds = time
self.c_time.nanoseconds = (time - float(self.c_time.seconds)) * 1e9
@@ -131,8 +134,10 @@ cdef class Timespec:
# TODO(atash) ensure that everywhere a Timespec is created that it's
# converted to GPR_CLOCK_REALTIME then and not every time someone wants to
# read values off in Python.
- cdef gpr_timespec real_time = (
- gpr_convert_clock_type(self.c_time, GPR_CLOCK_REALTIME))
+ cdef gpr_timespec real_time
+ with nogil:
+ real_time = (
+ gpr_convert_clock_type(self.c_time, GPR_CLOCK_REALTIME))
return real_time.seconds
@property
@@ -158,10 +163,12 @@ cdef class Timespec:
cdef class CallDetails:
def __cinit__(self):
- grpc_call_details_init(&self.c_details)
+ with nogil:
+ grpc_call_details_init(&self.c_details)
def __dealloc__(self):
- grpc_call_details_destroy(&self.c_details)
+ with nogil:
+ grpc_call_details_destroy(&self.c_details)
@property
def method(self):
@@ -229,10 +236,15 @@ cdef class ByteBuffer:
"ByteBuffer, not {}".format(type(data)))
cdef char *c_data = data
- data_slice = gpr_slice_from_copied_buffer(c_data, len(data))
- self.c_byte_buffer = grpc_raw_byte_buffer_create(
- &data_slice, 1)
- gpr_slice_unref(data_slice)
+ cdef gpr_slice data_slice
+ cdef size_t data_length = len(data)
+ with nogil:
+ data_slice = gpr_slice_from_copied_buffer(c_data, data_length)
+ with nogil:
+ self.c_byte_buffer = grpc_raw_byte_buffer_create(
+ &data_slice, 1)
+ with nogil:
+ gpr_slice_unref(data_slice)
def bytes(self):
cdef grpc_byte_buffer_reader reader
@@ -240,20 +252,27 @@ cdef class ByteBuffer:
cdef size_t data_slice_length
cdef void *data_slice_pointer
if self.c_byte_buffer != NULL:
- grpc_byte_buffer_reader_init(&reader, self.c_byte_buffer)
+ with nogil:
+ grpc_byte_buffer_reader_init(&reader, self.c_byte_buffer)
result = b""
- while grpc_byte_buffer_reader_next(&reader, &data_slice):
- data_slice_pointer = gpr_slice_start_ptr(data_slice)
- data_slice_length = gpr_slice_length(data_slice)
- result += (<char *>data_slice_pointer)[:data_slice_length]
- grpc_byte_buffer_reader_destroy(&reader)
+ with nogil:
+ while grpc_byte_buffer_reader_next(&reader, &data_slice):
+ data_slice_pointer = gpr_slice_start_ptr(data_slice)
+ data_slice_length = gpr_slice_length(data_slice)
+ with gil:
+ result += (<char *>data_slice_pointer)[:data_slice_length]
+ with nogil:
+ grpc_byte_buffer_reader_destroy(&reader)
return result
else:
return None
def __len__(self):
+ cdef size_t result
if self.c_byte_buffer != NULL:
- return grpc_byte_buffer_length(self.c_byte_buffer)
+ with nogil:
+ result = grpc_byte_buffer_length(self.c_byte_buffer)
+ return result
else:
return 0
@@ -262,7 +281,8 @@ cdef class ByteBuffer:
def __dealloc__(self):
if self.c_byte_buffer != NULL:
- grpc_byte_buffer_destroy(self.c_byte_buffer)
+ with nogil:
+ grpc_byte_buffer_destroy(self.c_byte_buffer)
cdef class SslPemKeyCertPair:
@@ -319,14 +339,15 @@ cdef class ChannelArgs:
if not isinstance(arg, ChannelArg):
raise TypeError("expected list of ChannelArg")
self.c_args.arguments_length = len(self.args)
- self.c_args.arguments = <grpc_arg *>gpr_malloc(
- self.c_args.arguments_length*sizeof(grpc_arg)
- )
+ with nogil:
+ self.c_args.arguments = <grpc_arg *>gpr_malloc(
+ self.c_args.arguments_length*sizeof(grpc_arg))
for i in range(self.c_args.arguments_length):
self.c_args.arguments[i] = (<ChannelArg>self.args[i]).c_arg
def __dealloc__(self):
- gpr_free(self.c_args.arguments)
+ with nogil:
+ gpr_free(self.c_args.arguments)
def __len__(self):
# self.args is never stale; it's only updated from this file
@@ -407,21 +428,24 @@ cdef class Metadata:
for metadatum in metadata:
if not isinstance(metadatum, Metadatum):
raise TypeError("expected list of Metadatum")
- grpc_metadata_array_init(&self.c_metadata_array)
+ with nogil:
+ grpc_metadata_array_init(&self.c_metadata_array)
self.c_metadata_array.count = len(self.metadata)
self.c_metadata_array.capacity = len(self.metadata)
- self.c_metadata_array.metadata = <grpc_metadata *>gpr_malloc(
- self.c_metadata_array.count*sizeof(grpc_metadata)
- )
+ with nogil:
+ self.c_metadata_array.metadata = <grpc_metadata *>gpr_malloc(
+ self.c_metadata_array.count*sizeof(grpc_metadata)
+ )
for i in range(self.c_metadata_array.count):
self.c_metadata_array.metadata[i] = (
(<Metadatum>self.metadata[i]).c_metadata)
def __dealloc__(self):
# this frees the allocated memory for the grpc_metadata_array (although
- # it'd be nice if that were documented somewhere...) TODO(atash): document
- # this in the C core
- grpc_metadata_array_destroy(&self.c_metadata_array)
+ # it'd be nice if that were documented somewhere...)
+ # TODO(atash): document this in the C core
+ with nogil:
+ grpc_metadata_array_destroy(&self.c_metadata_array)
def __len__(self):
return self.c_metadata_array.count
@@ -526,7 +550,8 @@ cdef class Operation:
# Python. The remaining one(s) are primitive fields filled in by GRPC core.
# This means that we need to clean up after receive_status_on_client.
if self.c_op.type == GRPC_OP_RECV_STATUS_ON_CLIENT:
- gpr_free(self._received_status_details)
+ with nogil:
+ gpr_free(self._received_status_details)
def operation_send_initial_metadata(Metadata metadata):
cdef Operation op = Operation()
@@ -648,8 +673,8 @@ cdef class Operations:
if not isinstance(operation, Operation):
raise TypeError("expected operations to be iterable of Operation")
self.c_nops = len(self.operations)
- self.c_ops = <grpc_op *>gpr_malloc(
- sizeof(grpc_op)*self.c_nops)
+ with nogil:
+ self.c_ops = <grpc_op *>gpr_malloc(sizeof(grpc_op)*self.c_nops)
for i in range(self.c_nops):
self.c_ops[i] = (<Operation>(self.operations[i])).c_op
@@ -661,7 +686,8 @@ cdef class Operations:
return self.operations[i]
def __dealloc__(self):
- gpr_free(self.c_ops)
+ with nogil:
+ gpr_free(self.c_ops)
def __iter__(self):
return _OperationsIterator(self)
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
index fe93da6c12..a098f11da2 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
@@ -41,7 +41,8 @@ cdef class Server:
if arguments is not None:
c_arguments = &arguments.c_args
self.references.append(arguments)
- self.c_server = grpc_server_create(c_arguments, NULL)
+ with nogil:
+ self.c_server = grpc_server_create(c_arguments, NULL)
self.is_started = False
self.is_shutting_down = False
self.is_shutdown = False
@@ -53,6 +54,7 @@ cdef class Server:
raise ValueError("server must be started and not shutting down")
if server_queue not in self.registered_completion_queues:
raise ValueError("server_queue must be a registered completion queue")
+ cdef grpc_call_error result
cdef OperationTag operation_tag = OperationTag(tag)
operation_tag.operation_call = Call()
operation_tag.request_call_details = CallDetails()
@@ -61,19 +63,22 @@ cdef class Server:
operation_tag.is_new_request = True
operation_tag.batch_operations = Operations([])
cpython.Py_INCREF(operation_tag)
- return grpc_server_request_call(
- self.c_server, &operation_tag.operation_call.c_call,
- &operation_tag.request_call_details.c_details,
- &operation_tag.request_metadata.c_metadata_array,
- call_queue.c_completion_queue, server_queue.c_completion_queue,
- <cpython.PyObject *>operation_tag)
+ with nogil:
+ result = grpc_server_request_call(
+ self.c_server, &operation_tag.operation_call.c_call,
+ &operation_tag.request_call_details.c_details,
+ &operation_tag.request_metadata.c_metadata_array,
+ call_queue.c_completion_queue, server_queue.c_completion_queue,
+ <cpython.PyObject *>operation_tag)
+ return result
def register_completion_queue(
self, CompletionQueue queue not None):
if self.is_started:
raise ValueError("cannot register completion queues after start")
- grpc_server_register_completion_queue(
- self.c_server, queue.c_completion_queue, NULL)
+ with nogil:
+ grpc_server_register_completion_queue(
+ self.c_server, queue.c_completion_queue, NULL)
self.registered_completion_queues.append(queue)
def start(self):
@@ -82,7 +87,8 @@ cdef class Server:
self.backup_shutdown_queue = CompletionQueue()
self.register_completion_queue(self.backup_shutdown_queue)
self.is_started = True
- grpc_server_start(self.c_server)
+ with nogil:
+ grpc_server_start(self.c_server)
# Ensure the core has gotten a chance to do the start-up work
self.backup_shutdown_queue.pluck(None, Timespec(None))
@@ -95,22 +101,28 @@ cdef class Server:
else:
raise TypeError("expected address to be a str or bytes")
self.references.append(address)
+ cdef int result
+ cdef char *address_c_string = address
if server_credentials is not None:
self.references.append(server_credentials)
- return grpc_server_add_secure_http2_port(
- self.c_server, address, server_credentials.c_credentials)
+ with nogil:
+ result = grpc_server_add_secure_http2_port(
+ self.c_server, address_c_string, server_credentials.c_credentials)
else:
- return grpc_server_add_insecure_http2_port(self.c_server, address)
+ with nogil:
+ result = grpc_server_add_insecure_http2_port(self.c_server,
+ address_c_string)
+ return result
cdef _c_shutdown(self, CompletionQueue queue, tag):
self.is_shutting_down = True
operation_tag = OperationTag(tag)
operation_tag.shutting_down_server = self
- operation_tag.references.extend([self, queue])
cpython.Py_INCREF(operation_tag)
- grpc_server_shutdown_and_notify(
- self.c_server, queue.c_completion_queue,
- <cpython.PyObject *>operation_tag)
+ with nogil:
+ grpc_server_shutdown_and_notify(
+ self.c_server, queue.c_completion_queue,
+ <cpython.PyObject *>operation_tag)
def shutdown(self, CompletionQueue queue not None, tag):
cdef OperationTag operation_tag
@@ -135,7 +147,8 @@ cdef class Server:
elif self.is_shutdown:
return
else:
- grpc_server_cancel_all_calls(self.c_server)
+ with nogil:
+ grpc_server_cancel_all_calls(self.c_server)
def __dealloc__(self):
if self.c_server != NULL:
@@ -154,5 +167,6 @@ cdef class Server:
# much but repeatedly release the GIL and wait
while not self.is_shutdown:
time.sleep(0)
- grpc_server_destroy(self.c_server)
+ with nogil:
+ grpc_server_destroy(self.c_server)
diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pyx b/src/python/grpcio/grpc/_cython/cygrpc.pyx
index 30cc7a132b..8a0f171ee7 100644
--- a/src/python/grpcio/grpc/_cython/cygrpc.pyx
+++ b/src/python/grpcio/grpc/_cython/cygrpc.pyx
@@ -57,14 +57,17 @@ cdef class _ModuleState:
'grpc._cython', '_windows/grpc_c.64.python')
if not pygrpc_load_core(filename):
raise ImportError('failed to load core gRPC library')
- grpc_init()
+ with nogil:
+ grpc_init()
self.is_loaded = True
- grpc_set_ssl_roots_override_callback(
- <grpc_ssl_roots_override_callback>ssl_roots_override_callback)
+ with nogil:
+ grpc_set_ssl_roots_override_callback(
+ <grpc_ssl_roots_override_callback>ssl_roots_override_callback)
def __dealloc__(self):
if self.is_loaded:
- grpc_shutdown()
+ with nogil:
+ grpc_shutdown()
_module_state = _ModuleState()
diff --git a/src/python/grpcio/grpc/_cython/imports.generated.h b/src/python/grpcio/grpc/_cython/imports.generated.h
index b70dcccd17..4d18369e1f 100644
--- a/src/python/grpcio/grpc/_cython/imports.generated.h
+++ b/src/python/grpcio/grpc/_cython/imports.generated.h
@@ -166,7 +166,7 @@ extern grpc_compression_algorithm_parse_type grpc_compression_algorithm_parse_im
typedef int(*grpc_compression_algorithm_name_type)(grpc_compression_algorithm algorithm, char **name);
extern grpc_compression_algorithm_name_type grpc_compression_algorithm_name_import;
#define grpc_compression_algorithm_name grpc_compression_algorithm_name_import
-typedef grpc_compression_algorithm(*grpc_compression_algorithm_for_level_type)(grpc_compression_level level);
+typedef grpc_compression_algorithm(*grpc_compression_algorithm_for_level_type)(grpc_compression_level level, uint32_t accepted_encodings);
extern grpc_compression_algorithm_for_level_type grpc_compression_algorithm_for_level_import;
#define grpc_compression_algorithm_for_level grpc_compression_algorithm_for_level_import
typedef void(*grpc_compression_options_init_type)(grpc_compression_options *opts);
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index a543791f5c..31e16e0491 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -34,6 +34,7 @@ CORE_SOURCE_FILES = [
'src/core/profiling/stap_timers.c',
'src/core/support/alloc.c',
'src/core/support/avl.c',
+ 'src/core/support/backoff.c',
'src/core/support/cmdline.c',
'src/core/support/cpu_iphone.c',
'src/core/support/cpu_linux.c',
diff --git a/src/python/grpcio/precompiled.py b/src/python/grpcio/precompiled.py
index ae2a0c835a..d34250b02c 100644
--- a/src/python/grpcio/precompiled.py
+++ b/src/python/grpcio/precompiled.py
@@ -31,6 +31,7 @@ import os
import platform
import shutil
import sys
+import sysconfig
import setuptools
@@ -51,9 +52,15 @@ USE_PRECOMPILED_BINARIES = bool(int(os.environ.get(
def _tagged_ext_name(base):
uname = platform.uname()
- tags = '-'.join((grpc_version.VERSION, uname[0], uname[4]))
- flavor = 'ucs2' if sys.maxunicode == 65535 else 'ucs4'
- return '{base}-{tags}-{flavor}'.format(base=base, tags=tags, flavor=flavor)
+ tags = (
+ grpc_version.VERSION,
+ 'py{}'.format(sysconfig.get_python_version()),
+ uname[0],
+ uname[4],
+ )
+ ucs = 'ucs{}'.format(sysconfig.get_config_var('Py_UNICODE_SIZE'))
+ return '{base}-{tags}-{ucs}'.format(
+ base=base, tags='-'.join(tags), ucs=ucs)
class BuildTaggedExt(setuptools.Command):
diff --git a/src/python/grpcio/tests/_runner.py b/src/python/grpcio/tests/_runner.py
index 38a5432e79..3b5ca03dd9 100644
--- a/src/python/grpcio/tests/_runner.py
+++ b/src/python/grpcio/tests/_runner.py
@@ -35,6 +35,7 @@ import os
import select
import signal
import sys
+import tempfile
import threading
import time
import unittest
@@ -44,60 +45,46 @@ from tests import _loader
from tests import _result
-class CapturePipe(object):
- """A context-manager pipe to redirect output to a byte array.
+class CaptureFile(object):
+ """A context-managed file to redirect output to a byte array.
+
+ Use by invoking `start` (`__enter__`) and at some point invoking `stop`
+ (`__exit__`). At any point after the initial call to `start` call `output` to
+ get the current redirected output. Note that we don't currently use file
+ locking, so calling `output` between calls to `start` and `stop` may muddle
+ the result (you should only be doing this during a Python-handled interrupt as
+ a last ditch effort to provide output to the user).
Attributes:
- _redirect_fd (int): File descriptor of file to redirect writes from.
+ _redirected_fd (int): File descriptor of file to redirect writes from.
_saved_fd (int): A copy of the original value of the redirected file
descriptor.
- _read_thread (threading.Thread or None): Thread upon which reads through the
- pipe are performed. Only non-None when self is started.
- _read_fd (int or None): File descriptor of the read end of the redirect
- pipe. Only non-None when self is started.
- _write_fd (int or None): File descriptor of the write end of the redirect
- pipe. Only non-None when self is started.
- output (bytearray or None): Redirected output from writes to the redirected
- file descriptor. Only valid during and after self has started.
+ _into_file (TemporaryFile or None): File to which writes are redirected.
+ Only non-None when self is started.
"""
def __init__(self, fd):
- self._redirect_fd = fd
- self._saved_fd = os.dup(self._redirect_fd)
- self._read_thread = None
- self._read_fd = None
- self._write_fd = None
- self.output = None
+ self._redirected_fd = fd
+ self._saved_fd = os.dup(self._redirected_fd)
+ self._into_file = None
+
+ def output(self):
+ """Get all output from the redirected-to file if it exists."""
+ if self._into_file:
+ self._into_file.seek(0)
+ return bytes(self._into_file.read())
+ else:
+ return bytes()
def start(self):
"""Start redirection of writes to the file descriptor."""
- self._read_fd, self._write_fd = os.pipe()
- os.dup2(self._write_fd, self._redirect_fd)
- flags = fcntl.fcntl(self._read_fd, fcntl.F_GETFL)
- fcntl.fcntl(self._read_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
- self._read_thread = threading.Thread(target=self._read)
- self._read_thread.start()
+ self._into_file = tempfile.TemporaryFile()
+ os.dup2(self._into_file.fileno(), self._redirected_fd)
def stop(self):
"""Stop redirection of writes to the file descriptor."""
- os.close(self._write_fd)
- os.dup2(self._saved_fd, self._redirect_fd) # auto-close self._redirect_fd
- self._read_thread.join()
- self._read_thread = None
- # we waited for the read thread to finish, so _read_fd has been read and we
- # can close it.
- os.close(self._read_fd)
-
- def _read(self):
- """Read-thread target for self."""
- self.output = bytearray()
- while True:
- select.select([self._read_fd], [], [])
- read_bytes = os.read(self._read_fd, 1024)
- if read_bytes:
- self.output.extend(read_bytes)
- else:
- break
+ # n.b. this dup2 call auto-closes self._redirected_fd
+ os.dup2(self._saved_fd, self._redirected_fd)
def write_bypass(self, value):
"""Bypass the redirection and write directly to the original file.
@@ -144,7 +131,7 @@ class Runner(object):
def run(self, suite):
"""See setuptools' test_runner setup argument for information."""
# only run test cases with id starting with given prefix
- testcase_filter = os.getenv('GPRC_PYTHON_TESTRUNNER_FILTER')
+ testcase_filter = os.getenv('GRPC_PYTHON_TESTRUNNER_FILTER')
filtered_cases = []
for case in _loader.iterate_suite_cases(suite):
if not testcase_filter or case.id().startswith(testcase_filter):
@@ -159,8 +146,8 @@ class Runner(object):
result_out = StringIO.StringIO()
result = _result.TerminalResult(
result_out, id_map=lambda case: case_id_by_case[case])
- stdout_pipe = CapturePipe(sys.stdout.fileno())
- stderr_pipe = CapturePipe(sys.stderr.fileno())
+ stdout_pipe = CaptureFile(sys.stdout.fileno())
+ stderr_pipe = CaptureFile(sys.stderr.fileno())
kill_flag = [False]
def sigint_handler(signal_number, frame):
@@ -171,7 +158,8 @@ class Runner(object):
def fault_handler(signal_number, frame):
stdout_pipe.write_bypass(
'Received fault signal {}\nstdout:\n{}\n\nstderr:{}\n'
- .format(signal_number, stdout_pipe.output, stderr_pipe.output))
+ .format(signal_number, stdout_pipe.output(),
+ stderr_pipe.output()))
os._exit(1)
def check_kill_self():
@@ -180,9 +168,9 @@ class Runner(object):
result.stopTestRun()
stdout_pipe.write_bypass(result_out.getvalue())
stdout_pipe.write_bypass(
- '\ninterrupted stdout:\n{}\n'.format(stdout_pipe.output))
+ '\ninterrupted stdout:\n{}\n'.format(stdout_pipe.output()))
stderr_pipe.write_bypass(
- '\ninterrupted stderr:\n{}\n'.format(stderr_pipe.output))
+ '\ninterrupted stderr:\n{}\n'.format(stderr_pipe.output()))
os._exit(1)
signal.signal(signal.SIGINT, sigint_handler)
signal.signal(signal.SIGSEGV, fault_handler)
@@ -212,7 +200,7 @@ class Runner(object):
# re-raise the exception after forcing the with-block to end
raise
result.set_output(
- augmented_case.case, stdout_pipe.output, stderr_pipe.output)
+ augmented_case.case, stdout_pipe.output(), stderr_pipe.output())
sys.stdout.write(result_out.getvalue())
sys.stdout.flush()
result_out.truncate(0)
diff --git a/src/python/grpcio/tests/tests.json b/src/python/grpcio/tests/tests.json
index 388d040d5c..84870aaa5c 100644
--- a/src/python/grpcio/tests/tests.json
+++ b/src/python/grpcio/tests/tests.json
@@ -12,31 +12,22 @@
"_core_over_links_base_interface_test.SyncEasyTest",
"_core_over_links_base_interface_test.SyncPeasyTest",
"_crust_over_core_face_interface_test.DynamicInvokerBlockingInvocationInlineServiceTest",
- "_crust_over_core_face_interface_test.DynamicInvokerEventInvocationSynchronousEventServiceTest",
"_crust_over_core_face_interface_test.DynamicInvokerFutureInvocationAsynchronousEventServiceTest",
"_crust_over_core_face_interface_test.GenericInvokerBlockingInvocationInlineServiceTest",
- "_crust_over_core_face_interface_test.GenericInvokerEventInvocationSynchronousEventServiceTest",
"_crust_over_core_face_interface_test.GenericInvokerFutureInvocationAsynchronousEventServiceTest",
"_crust_over_core_face_interface_test.MultiCallableInvokerBlockingInvocationInlineServiceTest",
- "_crust_over_core_face_interface_test.MultiCallableInvokerEventInvocationSynchronousEventServiceTest",
"_crust_over_core_face_interface_test.MultiCallableInvokerFutureInvocationAsynchronousEventServiceTest",
"_crust_over_core_over_links_face_interface_test.DynamicInvokerBlockingInvocationInlineServiceTest",
- "_crust_over_core_over_links_face_interface_test.DynamicInvokerEventInvocationSynchronousEventServiceTest",
"_crust_over_core_over_links_face_interface_test.DynamicInvokerFutureInvocationAsynchronousEventServiceTest",
"_crust_over_core_over_links_face_interface_test.GenericInvokerBlockingInvocationInlineServiceTest",
- "_crust_over_core_over_links_face_interface_test.GenericInvokerEventInvocationSynchronousEventServiceTest",
"_crust_over_core_over_links_face_interface_test.GenericInvokerFutureInvocationAsynchronousEventServiceTest",
"_crust_over_core_over_links_face_interface_test.MultiCallableInvokerBlockingInvocationInlineServiceTest",
- "_crust_over_core_over_links_face_interface_test.MultiCallableInvokerEventInvocationSynchronousEventServiceTest",
"_crust_over_core_over_links_face_interface_test.MultiCallableInvokerFutureInvocationAsynchronousEventServiceTest",
"_face_interface_test.DynamicInvokerBlockingInvocationInlineServiceTest",
- "_face_interface_test.DynamicInvokerEventInvocationSynchronousEventServiceTest",
"_face_interface_test.DynamicInvokerFutureInvocationAsynchronousEventServiceTest",
"_face_interface_test.GenericInvokerBlockingInvocationInlineServiceTest",
- "_face_interface_test.GenericInvokerEventInvocationSynchronousEventServiceTest",
"_face_interface_test.GenericInvokerFutureInvocationAsynchronousEventServiceTest",
"_face_interface_test.MultiCallableInvokerBlockingInvocationInlineServiceTest",
- "_face_interface_test.MultiCallableInvokerEventInvocationSynchronousEventServiceTest",
"_face_interface_test.MultiCallableInvokerFutureInvocationAsynchronousEventServiceTest",
"_implementations_test.ChannelCredentialsTest",
"_insecure_interop_test.InsecureInteropTest",
diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/_event_invocation_synchronous_event_service.py b/src/python/grpcio/tests/unit/framework/interfaces/face/_event_invocation_synchronous_event_service.py
deleted file mode 100644
index 34db6c3e55..0000000000
--- a/src/python/grpcio/tests/unit/framework/interfaces/face/_event_invocation_synchronous_event_service.py
+++ /dev/null
@@ -1,381 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""Test code for the Face layer of RPC Framework."""
-
-import abc
-import unittest
-
-# test_interfaces is referenced from specification in this module.
-from grpc.framework.interfaces.face import face
-from tests.unit.framework.common import test_constants
-from tests.unit.framework.common import test_control
-from tests.unit.framework.common import test_coverage
-from tests.unit.framework.interfaces.face import _3069_test_constant
-from tests.unit.framework.interfaces.face import _digest
-from tests.unit.framework.interfaces.face import _receiver
-from tests.unit.framework.interfaces.face import _stock_service
-from tests.unit.framework.interfaces.face import test_interfaces # pylint: disable=unused-import
-
-
-class TestCase(test_coverage.Coverage, unittest.TestCase):
- """A test of the Face layer of RPC Framework.
-
- Concrete subclasses must have an "implementation" attribute of type
- test_interfaces.Implementation and an "invoker_constructor" attribute of type
- _invocation.InvokerConstructor.
- """
- __metaclass__ = abc.ABCMeta
-
- NAME = 'EventInvocationSynchronousEventServiceTest'
-
- def setUp(self):
- """See unittest.TestCase.setUp for full specification.
-
- Overriding implementations must call this implementation.
- """
- self._control = test_control.PauseFailControl()
- self._digest = _digest.digest(
- _stock_service.STOCK_TEST_SERVICE, self._control, None)
-
- generic_stub, dynamic_stubs, self._memo = self.implementation.instantiate(
- self._digest.methods, self._digest.event_method_implementations, None)
- self._invoker = self.invoker_constructor.construct_invoker(
- generic_stub, dynamic_stubs, self._digest.methods)
-
- def tearDown(self):
- """See unittest.TestCase.tearDown for full specification.
-
- Overriding implementations must call this implementation.
- """
- self._invoker = None
- self.implementation.destantiate(self._memo)
-
- def testSuccessfulUnaryRequestUnaryResponse(self):
- for (group, method), test_messages_sequence in (
- self._digest.unary_unary_messages_sequences.iteritems()):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
- receiver = _receiver.Receiver()
-
- self._invoker.event(group, method)(
- request, receiver, receiver.abort, test_constants.LONG_TIMEOUT)
- receiver.block_until_terminated()
- response = receiver.unary_response()
-
- test_messages.verify(request, response, self)
-
- def testSuccessfulUnaryRequestStreamResponse(self):
- for (group, method), test_messages_sequence in (
- self._digest.unary_stream_messages_sequences.iteritems()):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
- receiver = _receiver.Receiver()
-
- self._invoker.event(group, method)(
- request, receiver, receiver.abort, test_constants.LONG_TIMEOUT)
- receiver.block_until_terminated()
- responses = receiver.stream_responses()
-
- test_messages.verify(request, responses, self)
-
- def testSuccessfulStreamRequestUnaryResponse(self):
- for (group, method), test_messages_sequence in (
- self._digest.stream_unary_messages_sequences.iteritems()):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
- receiver = _receiver.Receiver()
-
- call_consumer = self._invoker.event(group, method)(
- receiver, receiver.abort, test_constants.LONG_TIMEOUT)
- for request in requests:
- call_consumer.consume(request)
- call_consumer.terminate()
- receiver.block_until_terminated()
- response = receiver.unary_response()
-
- test_messages.verify(requests, response, self)
-
- def testSuccessfulStreamRequestStreamResponse(self):
- for (group, method), test_messages_sequence in (
- self._digest.stream_stream_messages_sequences.iteritems()):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
- receiver = _receiver.Receiver()
-
- call_consumer = self._invoker.event(group, method)(
- receiver, receiver.abort, test_constants.LONG_TIMEOUT)
- for request in requests:
- call_consumer.consume(request)
- call_consumer.terminate()
- receiver.block_until_terminated()
- responses = receiver.stream_responses()
-
- test_messages.verify(requests, responses, self)
-
- def testSequentialInvocations(self):
- # pylint: disable=cell-var-from-loop
- for (group, method), test_messages_sequence in (
- self._digest.unary_unary_messages_sequences.iteritems()):
- for test_messages in test_messages_sequence:
- first_request = test_messages.request()
- second_request = test_messages.request()
- second_receiver = _receiver.Receiver()
-
- def make_second_invocation():
- self._invoker.event(group, method)(
- second_request, second_receiver, second_receiver.abort,
- test_constants.LONG_TIMEOUT)
-
- class FirstReceiver(_receiver.Receiver):
-
- def complete(self, terminal_metadata, code, details):
- super(FirstReceiver, self).complete(
- terminal_metadata, code, details)
- make_second_invocation()
-
- first_receiver = FirstReceiver()
-
- self._invoker.event(group, method)(
- first_request, first_receiver, first_receiver.abort,
- test_constants.LONG_TIMEOUT)
- second_receiver.block_until_terminated()
-
- first_response = first_receiver.unary_response()
- second_response = second_receiver.unary_response()
- test_messages.verify(first_request, first_response, self)
- test_messages.verify(second_request, second_response, self)
-
- def testParallelInvocations(self):
- for (group, method), test_messages_sequence in (
- self._digest.unary_unary_messages_sequences.iteritems()):
- for test_messages in test_messages_sequence:
- first_request = test_messages.request()
- first_receiver = _receiver.Receiver()
- second_request = test_messages.request()
- second_receiver = _receiver.Receiver()
-
- self._invoker.event(group, method)(
- first_request, first_receiver, first_receiver.abort,
- test_constants.LONG_TIMEOUT)
- self._invoker.event(group, method)(
- second_request, second_receiver, second_receiver.abort,
- test_constants.LONG_TIMEOUT)
- first_receiver.block_until_terminated()
- second_receiver.block_until_terminated()
-
- first_response = first_receiver.unary_response()
- second_response = second_receiver.unary_response()
- test_messages.verify(first_request, first_response, self)
- test_messages.verify(second_request, second_response, self)
-
- @unittest.skip('TODO(nathaniel): implement.')
- def testWaitingForSomeButNotAllParallelInvocations(self):
- raise NotImplementedError()
-
- def testCancelledUnaryRequestUnaryResponse(self):
- for (group, method), test_messages_sequence in (
- self._digest.unary_unary_messages_sequences.iteritems()):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
- receiver = _receiver.Receiver()
-
- with self._control.pause():
- call = self._invoker.event(group, method)(
- request, receiver, receiver.abort, test_constants.LONG_TIMEOUT)
- call.cancel()
- receiver.block_until_terminated()
-
- self.assertIs(face.Abortion.Kind.CANCELLED, receiver.abortion().kind)
-
- def testCancelledUnaryRequestStreamResponse(self):
- for (group, method), test_messages_sequence in (
- self._digest.unary_stream_messages_sequences.iteritems()):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
- receiver = _receiver.Receiver()
-
- call = self._invoker.event(group, method)(
- request, receiver, receiver.abort, test_constants.LONG_TIMEOUT)
- call.cancel()
- receiver.block_until_terminated()
-
- self.assertIs(face.Abortion.Kind.CANCELLED, receiver.abortion().kind)
-
- def testCancelledStreamRequestUnaryResponse(self):
- for (group, method), test_messages_sequence in (
- self._digest.stream_unary_messages_sequences.iteritems()):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
- receiver = _receiver.Receiver()
-
- call_consumer = self._invoker.event(group, method)(
- receiver, receiver.abort, test_constants.LONG_TIMEOUT)
- for request in requests:
- call_consumer.consume(request)
- call_consumer.cancel()
- receiver.block_until_terminated()
-
- self.assertIs(face.Abortion.Kind.CANCELLED, receiver.abortion().kind)
-
- def testCancelledStreamRequestStreamResponse(self):
- for (group, method), test_messages_sequence in (
- self._digest.stream_stream_messages_sequences.iteritems()):
- for unused_test_messages in test_messages_sequence:
- receiver = _receiver.Receiver()
-
- call_consumer = self._invoker.event(group, method)(
- receiver, receiver.abort, test_constants.LONG_TIMEOUT)
- call_consumer.cancel()
- receiver.block_until_terminated()
-
- self.assertIs(face.Abortion.Kind.CANCELLED, receiver.abortion().kind)
-
- def testExpiredUnaryRequestUnaryResponse(self):
- for (group, method), test_messages_sequence in (
- self._digest.unary_unary_messages_sequences.iteritems()):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
- receiver = _receiver.Receiver()
-
- with self._control.pause():
- self._invoker.event(group, method)(
- request, receiver, receiver.abort,
- _3069_test_constant.REALLY_SHORT_TIMEOUT)
- receiver.block_until_terminated()
-
- self.assertIs(face.Abortion.Kind.EXPIRED, receiver.abortion().kind)
-
- def testExpiredUnaryRequestStreamResponse(self):
- for (group, method), test_messages_sequence in (
- self._digest.unary_stream_messages_sequences.iteritems()):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
- receiver = _receiver.Receiver()
-
- with self._control.pause():
- self._invoker.event(group, method)(
- request, receiver, receiver.abort,
- _3069_test_constant.REALLY_SHORT_TIMEOUT)
- receiver.block_until_terminated()
-
- self.assertIs(face.Abortion.Kind.EXPIRED, receiver.abortion().kind)
-
- def testExpiredStreamRequestUnaryResponse(self):
- for (group, method), test_messages_sequence in (
- self._digest.stream_unary_messages_sequences.iteritems()):
- for unused_test_messages in test_messages_sequence:
- receiver = _receiver.Receiver()
-
- self._invoker.event(group, method)(
- receiver, receiver.abort, _3069_test_constant.REALLY_SHORT_TIMEOUT)
- receiver.block_until_terminated()
-
- self.assertIs(face.Abortion.Kind.EXPIRED, receiver.abortion().kind)
-
- def testExpiredStreamRequestStreamResponse(self):
- for (group, method), test_messages_sequence in (
- self._digest.stream_stream_messages_sequences.iteritems()):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
- receiver = _receiver.Receiver()
-
- call_consumer = self._invoker.event(group, method)(
- receiver, receiver.abort, _3069_test_constant.REALLY_SHORT_TIMEOUT)
- for request in requests:
- call_consumer.consume(request)
- receiver.block_until_terminated()
-
- self.assertIs(face.Abortion.Kind.EXPIRED, receiver.abortion().kind)
-
- def testFailedUnaryRequestUnaryResponse(self):
- for (group, method), test_messages_sequence in (
- self._digest.unary_unary_messages_sequences.iteritems()):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
- receiver = _receiver.Receiver()
-
- with self._control.fail():
- self._invoker.event(group, method)(
- request, receiver, receiver.abort, test_constants.LONG_TIMEOUT)
- receiver.block_until_terminated()
-
- self.assertIs(
- face.Abortion.Kind.REMOTE_FAILURE, receiver.abortion().kind)
-
- def testFailedUnaryRequestStreamResponse(self):
- for (group, method), test_messages_sequence in (
- self._digest.unary_stream_messages_sequences.iteritems()):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
- receiver = _receiver.Receiver()
-
- with self._control.fail():
- self._invoker.event(group, method)(
- request, receiver, receiver.abort, test_constants.LONG_TIMEOUT)
- receiver.block_until_terminated()
-
- self.assertIs(
- face.Abortion.Kind.REMOTE_FAILURE, receiver.abortion().kind)
-
- def testFailedStreamRequestUnaryResponse(self):
- for (group, method), test_messages_sequence in (
- self._digest.stream_unary_messages_sequences.iteritems()):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
- receiver = _receiver.Receiver()
-
- with self._control.fail():
- call_consumer = self._invoker.event(group, method)(
- receiver, receiver.abort, test_constants.LONG_TIMEOUT)
- for request in requests:
- call_consumer.consume(request)
- call_consumer.terminate()
- receiver.block_until_terminated()
-
- self.assertIs(
- face.Abortion.Kind.REMOTE_FAILURE, receiver.abortion().kind)
-
- def testFailedStreamRequestStreamResponse(self):
- for (group, method), test_messages_sequence in (
- self._digest.stream_stream_messages_sequences.iteritems()):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
- receiver = _receiver.Receiver()
-
- with self._control.fail():
- call_consumer = self._invoker.event(group, method)(
- receiver, receiver.abort, test_constants.LONG_TIMEOUT)
- for request in requests:
- call_consumer.consume(request)
- call_consumer.terminate()
- receiver.block_until_terminated()
-
- self.assertIs(
- face.Abortion.Kind.REMOTE_FAILURE, receiver.abortion().kind)
diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/test_cases.py b/src/python/grpcio/tests/unit/framework/interfaces/face/test_cases.py
index 462829b660..06b9d77e52 100644
--- a/src/python/grpcio/tests/unit/framework/interfaces/face/test_cases.py
+++ b/src/python/grpcio/tests/unit/framework/interfaces/face/test_cases.py
@@ -1,4 +1,4 @@
-# Copyright 2015, Google Inc.
+# Copyright 2015-2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
@@ -34,14 +34,12 @@ import unittest # pylint: disable=unused-import
# test_interfaces is referenced from specification in this module.
from tests.unit.framework.interfaces.face import _blocking_invocation_inline_service
-from tests.unit.framework.interfaces.face import _event_invocation_synchronous_event_service
from tests.unit.framework.interfaces.face import _future_invocation_asynchronous_event_service
from tests.unit.framework.interfaces.face import _invocation
from tests.unit.framework.interfaces.face import test_interfaces # pylint: disable=unused-import
_TEST_CASE_SUPERCLASSES = (
_blocking_invocation_inline_service.TestCase,
- _event_invocation_synchronous_event_service.TestCase,
_future_invocation_asynchronous_event_service.TestCase,
)