diff options
Diffstat (limited to 'src/python/grpcio')
17 files changed, 394 insertions, 652 deletions
diff --git a/src/python/grpcio/README.rst b/src/python/grpcio/README.rst index f3e962c197..33a462b66f 100644 --- a/src/python/grpcio/README.rst +++ b/src/python/grpcio/README.rst @@ -6,7 +6,7 @@ Package for gRPC Python. Installation ------------ -gRPC Python is available for Linux and Mac OS X running Python 2.7. +gRPC Python is available for Linux, Mac OS X, and Windows running Python 2.7. From PyPI ~~~~~~~~~ @@ -23,21 +23,26 @@ Else system wide (on Ubuntu)... $ sudo pip install grpcio +n.b. On Windows and on Mac OS X one *must* have a recent release of :code:`pip` +to retrieve the proper wheel from PyPI. Be sure to upgrade to the latest +version! + From Source ~~~~~~~~~~~ Building from source requires that you have the Python headers (usually a -package named `python-dev`). +package named :code:`python-dev`). :: - $ export REPO_ROOT=grpc + $ export REPO_ROOT=grpc # REPO_ROOT can be any directory of your choice $ git clone https://github.com/grpc/grpc.git $REPO_ROOT $ cd $REPO_ROOT - $ pip install . -Note that `$REPO_ROOT` can be assigned to whatever directory name floats your -fancy. + # For the next two commands do `sudo pip install` if you get permission-denied errors + $ pip install -rrequirements.txt + $ GRPC_PYTHON_BUILD_WITH_CYTHON=1 pip install . + Troubleshooting ~~~~~~~~~~~~~~~ @@ -45,10 +50,43 @@ Troubleshooting Help, I ... * **... see a** :code:`pkg_resources.VersionConflict` **when I try to install - grpc!** + grpc** This is likely because :code:`pip` doesn't own the offending dependency, which in turn is likely because your operating system's package manager owns it. You'll need to force the installation of the dependency: :code:`pip install --ignore-installed $OFFENDING_DEPENDENCY` + + For example, if you get an error like the following: + + :: + + Traceback (most recent call last): + File "<string>", line 17, in <module> + ... + File "/usr/lib/python2.7/dist-packages/pkg_resources.py", line 509, in find + raise VersionConflict(dist, req) + pkg_resources.VersionConflict: (six 1.8.0 (/usr/lib/python2.7/dist-packages), Requirement.parse('six>=1.10')) + + You can fix it by doing: + + :: + + sudo pip install --ignore-installed six + +* **... see the following error on some platforms** + + :: + + /tmp/pip-build-U8pSsr/cython/Cython/Plex/Scanners.c:4:20: fatal error: Python.h: No such file or directory + #include "Python.h" + ^ + compilation terminated. + + You can fix it by installing `python-dev` package. i.e + + :: + + sudo apt-get install python-dev + diff --git a/src/python/grpcio/commands.py b/src/python/grpcio/commands.py index 0e7f02a271..1d43547419 100644 --- a/src/python/grpcio/commands.py +++ b/src/python/grpcio/commands.py @@ -119,8 +119,7 @@ class SphinxDocumentation(setuptools.Command): import sphinx import sphinx.apidoc metadata = self.distribution.metadata - src_dir = os.path.join( - PYTHON_STEM, self.distribution.package_dir[''], 'grpc') + src_dir = os.path.join(PYTHON_STEM, 'grpc') sys.path.append(src_dir) sphinx.apidoc.main([ '', '--force', '--full', '-H', metadata.name, '-A', metadata.author, diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi index 80f4da51e8..d1b9c98ffc 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi @@ -1,4 +1,4 @@ -# Copyright 2015, Google Inc. +# Copyright 2015-2016, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without @@ -40,14 +40,17 @@ cdef class Call: def start_batch(self, operations, tag): if not self.is_valid: raise ValueError("invalid call object cannot be used from Python") + cdef grpc_call_error result cdef Operations cy_operations = Operations(operations) cdef OperationTag operation_tag = OperationTag(tag) operation_tag.operation_call = self operation_tag.batch_operations = cy_operations cpython.Py_INCREF(operation_tag) - return grpc_call_start_batch( - self.c_call, cy_operations.c_ops, cy_operations.c_nops, - <cpython.PyObject *>operation_tag, NULL) + with nogil: + result = grpc_call_start_batch( + self.c_call, cy_operations.c_ops, cy_operations.c_nops, + <cpython.PyObject *>operation_tag, NULL) + return result def cancel( self, grpc_status_code error_code=GRPC_STATUS__DO_NOT_USE, @@ -57,6 +60,8 @@ cdef class Call: if (details is None) != (error_code == GRPC_STATUS__DO_NOT_USE): raise ValueError("if error_code is specified, so must details " "(and vice-versa)") + cdef grpc_call_error result + cdef char *c_details = NULL if error_code != GRPC_STATUS__DO_NOT_USE: if isinstance(details, bytes): pass @@ -65,25 +70,37 @@ cdef class Call: else: raise TypeError("expected details to be str or bytes") self.references.append(details) - return grpc_call_cancel_with_status( - self.c_call, error_code, details, NULL) + c_details = details + with nogil: + result = grpc_call_cancel_with_status( + self.c_call, error_code, c_details, NULL) + return result else: - return grpc_call_cancel(self.c_call, NULL) + with nogil: + result = grpc_call_cancel(self.c_call, NULL) + return result def set_credentials( self, CallCredentials call_credentials not None): - return grpc_call_set_credentials( - self.c_call, call_credentials.c_credentials) + cdef grpc_call_error result + with nogil: + result = grpc_call_set_credentials( + self.c_call, call_credentials.c_credentials) + return result def peer(self): - cdef char *peer = grpc_call_get_peer(self.c_call) + cdef char *peer = NULL + with nogil: + peer = grpc_call_get_peer(self.c_call) result = <bytes>peer - gpr_free(peer) + with nogil: + gpr_free(peer) return result def __dealloc__(self): if self.c_call != NULL: - grpc_call_destroy(self.c_call) + with nogil: + grpc_call_destroy(self.c_call) # The object *should* always be valid from Python. Used for debugging. @property diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi index 1f1833d5ec..d612c90791 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi @@ -35,6 +35,7 @@ cdef class Channel: def __cinit__(self, target, ChannelArgs arguments=None, ChannelCredentials channel_credentials=None): cdef grpc_channel_args *c_arguments = NULL + cdef char *c_target = NULL self.c_channel = NULL self.references = [] if arguments is not None: @@ -45,12 +46,15 @@ cdef class Channel: target = target.encode() else: raise TypeError("expected target to be str or bytes") + c_target = target if channel_credentials is None: - self.c_channel = grpc_insecure_channel_create(target, c_arguments, - NULL) + with nogil: + self.c_channel = grpc_insecure_channel_create(c_target, c_arguments, + NULL) else: - self.c_channel = grpc_secure_channel_create( - channel_credentials.c_credentials, target, c_arguments, NULL) + with nogil: + self.c_channel = grpc_secure_channel_create( + channel_credentials.c_credentials, c_target, c_arguments, NULL) self.references.append(channel_credentials) self.references.append(target) self.references.append(arguments) @@ -66,6 +70,7 @@ cdef class Channel: method = method.encode() else: raise TypeError("expected method to be str or bytes") + cdef char *method_c_string = method cdef char *host_c_string = NULL if host is None: pass @@ -81,32 +86,40 @@ cdef class Channel: cdef grpc_call *parent_call = NULL if parent is not None: parent_call = parent.c_call - operation_call.c_call = grpc_channel_create_call( - self.c_channel, parent_call, flags, - queue.c_completion_queue, method, host_c_string, deadline.c_time, - NULL) + with nogil: + operation_call.c_call = grpc_channel_create_call( + self.c_channel, parent_call, flags, + queue.c_completion_queue, method_c_string, host_c_string, + deadline.c_time, NULL) return operation_call def check_connectivity_state(self, bint try_to_connect): - return grpc_channel_check_connectivity_state(self.c_channel, - try_to_connect) + cdef grpc_connectivity_state result + with nogil: + result = grpc_channel_check_connectivity_state(self.c_channel, + try_to_connect) + return result def watch_connectivity_state( self, grpc_connectivity_state last_observed_state, Timespec deadline not None, CompletionQueue queue not None, tag): cdef OperationTag operation_tag = OperationTag(tag) - operation_tag.references = [self, queue] cpython.Py_INCREF(operation_tag) - grpc_channel_watch_connectivity_state( - self.c_channel, last_observed_state, deadline.c_time, - queue.c_completion_queue, <cpython.PyObject *>operation_tag) + with nogil: + grpc_channel_watch_connectivity_state( + self.c_channel, last_observed_state, deadline.c_time, + queue.c_completion_queue, <cpython.PyObject *>operation_tag) def target(self): - cdef char * target = grpc_channel_get_target(self.c_channel) + cdef char *target = NULL + with nogil: + target = grpc_channel_get_target(self.c_channel) result = <bytes>target - gpr_free(target) + with nogil: + gpr_free(target) return result def __dealloc__(self): if self.c_channel != NULL: - grpc_channel_destroy(self.c_channel) + with nogil: + grpc_channel_destroy(self.c_channel) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi index c139147114..09e47d4222 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi @@ -36,7 +36,8 @@ import time cdef class CompletionQueue: def __cinit__(self): - self.c_completion_queue = grpc_completion_queue_create(NULL) + with nogil: + self.c_completion_queue = grpc_completion_queue_create(NULL) self.is_shutting_down = False self.is_shutdown = False self.pluck_condition = threading.Condition() @@ -82,8 +83,9 @@ cdef class CompletionQueue: def poll(self, Timespec deadline=None): # We name this 'poll' to avoid problems with CPython's expectations for # 'special' methods (like next and __next__). - cdef gpr_timespec c_deadline = gpr_inf_future( - GPR_CLOCK_REALTIME) + cdef gpr_timespec c_deadline + with nogil: + c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME) if deadline is not None: c_deadline = deadline.c_time cdef grpc_event event @@ -123,7 +125,8 @@ cdef class CompletionQueue: return self._interpret_event(event) def shutdown(self): - grpc_completion_queue_shutdown(self.c_completion_queue) + with nogil: + grpc_completion_queue_shutdown(self.c_completion_queue) self.is_shutting_down = True def clear(self): @@ -133,14 +136,19 @@ cdef class CompletionQueue: pass def __dealloc__(self): - cdef gpr_timespec c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME) + cdef gpr_timespec c_deadline + with nogil: + c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME) if self.c_completion_queue != NULL: # Ensure shutdown if not self.is_shutting_down: - grpc_completion_queue_shutdown(self.c_completion_queue) + with nogil: + grpc_completion_queue_shutdown(self.c_completion_queue) # Pump the queue while not self.is_shutdown: - event = grpc_completion_queue_next( - self.c_completion_queue, c_deadline, NULL) + with nogil: + event = grpc_completion_queue_next( + self.c_completion_queue, c_deadline, NULL) self._interpret_event(event) - grpc_completion_queue_destroy(self.c_completion_queue) + with nogil: + grpc_completion_queue_destroy(self.c_completion_queue) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi index 3f439c8900..1d7adca23e 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi @@ -1,4 +1,4 @@ -# Copyright 2015, Google Inc. +# Copyright 2015-2016, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without @@ -46,7 +46,8 @@ cdef class ChannelCredentials: def __dealloc__(self): if self.c_credentials != NULL: - grpc_channel_credentials_release(self.c_credentials) + with nogil: + grpc_channel_credentials_release(self.c_credentials) cdef class CallCredentials: @@ -63,7 +64,8 @@ cdef class CallCredentials: def __dealloc__(self): if self.c_credentials != NULL: - grpc_call_credentials_release(self.c_credentials) + with nogil: + grpc_call_credentials_release(self.c_credentials) cdef class ServerCredentials: @@ -74,7 +76,8 @@ cdef class ServerCredentials: def __dealloc__(self): if self.c_credentials != NULL: - grpc_server_credentials_release(self.c_credentials) + with nogil: + grpc_server_credentials_release(self.c_credentials) cdef class CredentialsMetadataPlugin: @@ -139,7 +142,8 @@ cdef void plugin_destroy_c_plugin_state(void *state): def channel_credentials_google_default(): cdef ChannelCredentials credentials = ChannelCredentials(); - credentials.c_credentials = grpc_google_default_credentials_create() + with nogil: + credentials.c_credentials = grpc_google_default_credentials_create() return credentials def channel_credentials_ssl(pem_root_certificates, @@ -158,12 +162,14 @@ def channel_credentials_ssl(pem_root_certificates, c_pem_root_certificates = pem_root_certificates credentials.references.append(pem_root_certificates) if ssl_pem_key_cert_pair is not None: - credentials.c_credentials = grpc_ssl_credentials_create( - c_pem_root_certificates, &ssl_pem_key_cert_pair.c_pair, NULL) + with nogil: + credentials.c_credentials = grpc_ssl_credentials_create( + c_pem_root_certificates, &ssl_pem_key_cert_pair.c_pair, NULL) credentials.references.append(ssl_pem_key_cert_pair) else: - credentials.c_credentials = grpc_ssl_credentials_create( - c_pem_root_certificates, NULL, NULL) + with nogil: + credentials.c_credentials = grpc_ssl_credentials_create( + c_pem_root_certificates, NULL, NULL) return credentials def channel_credentials_composite( @@ -172,8 +178,9 @@ def channel_credentials_composite( if not credentials_1.is_valid or not credentials_2.is_valid: raise ValueError("passed credentials must both be valid") cdef ChannelCredentials credentials = ChannelCredentials() - credentials.c_credentials = grpc_composite_channel_credentials_create( - credentials_1.c_credentials, credentials_2.c_credentials, NULL) + with nogil: + credentials.c_credentials = grpc_composite_channel_credentials_create( + credentials_1.c_credentials, credentials_2.c_credentials, NULL) credentials.references.append(credentials_1) credentials.references.append(credentials_2) return credentials @@ -184,16 +191,18 @@ def call_credentials_composite( if not credentials_1.is_valid or not credentials_2.is_valid: raise ValueError("passed credentials must both be valid") cdef CallCredentials credentials = CallCredentials() - credentials.c_credentials = grpc_composite_call_credentials_create( - credentials_1.c_credentials, credentials_2.c_credentials, NULL) + with nogil: + credentials.c_credentials = grpc_composite_call_credentials_create( + credentials_1.c_credentials, credentials_2.c_credentials, NULL) credentials.references.append(credentials_1) credentials.references.append(credentials_2) return credentials def call_credentials_google_compute_engine(): cdef CallCredentials credentials = CallCredentials() - credentials.c_credentials = ( - grpc_google_compute_engine_credentials_create(NULL)) + with nogil: + credentials.c_credentials = ( + grpc_google_compute_engine_credentials_create(NULL)) return credentials def call_credentials_service_account_jwt_access( @@ -205,9 +214,11 @@ def call_credentials_service_account_jwt_access( else: raise TypeError("expected json_key to be str or bytes") cdef CallCredentials credentials = CallCredentials() - credentials.c_credentials = ( - grpc_service_account_jwt_access_credentials_create( - json_key, token_lifetime.c_time, NULL)) + cdef char *json_key_c_string = json_key + with nogil: + credentials.c_credentials = ( + grpc_service_account_jwt_access_credentials_create( + json_key_c_string, token_lifetime.c_time, NULL)) credentials.references.append(json_key) return credentials @@ -219,8 +230,10 @@ def call_credentials_google_refresh_token(json_refresh_token): else: raise TypeError("expected json_refresh_token to be str or bytes") cdef CallCredentials credentials = CallCredentials() - credentials.c_credentials = grpc_google_refresh_token_credentials_create( - json_refresh_token, NULL) + cdef char *json_refresh_token_c_string = json_refresh_token + with nogil: + credentials.c_credentials = grpc_google_refresh_token_credentials_create( + json_refresh_token_c_string, NULL) credentials.references.append(json_refresh_token) return credentials @@ -238,17 +251,21 @@ def call_credentials_google_iam(authorization_token, authority_selector): else: raise TypeError("expected authority_selector to be str or bytes") cdef CallCredentials credentials = CallCredentials() - credentials.c_credentials = grpc_google_iam_credentials_create( - authorization_token, authority_selector, NULL) + cdef char *authorization_token_c_string = authorization_token + cdef char *authority_selector_c_string = authority_selector + with nogil: + credentials.c_credentials = grpc_google_iam_credentials_create( + authorization_token_c_string, authority_selector_c_string, NULL) credentials.references.append(authorization_token) credentials.references.append(authority_selector) return credentials def call_credentials_metadata_plugin(CredentialsMetadataPlugin plugin): cdef CallCredentials credentials = CallCredentials() - credentials.c_credentials = ( - grpc_metadata_credentials_create_from_plugin(plugin.make_c_plugin(), - NULL)) + cdef grpc_metadata_credentials_plugin c_plugin = plugin.make_c_plugin() + with nogil: + credentials.c_credentials = ( + grpc_metadata_credentials_create_from_plugin(c_plugin, NULL)) # TODO(atash): the following held reference is *probably* never necessary credentials.references.append(plugin) return credentials @@ -274,11 +291,12 @@ def server_credentials_ssl(pem_root_certs, pem_key_cert_pairs, credentials.references.append(pem_key_cert_pairs) credentials.references.append(pem_root_certs) credentials.c_ssl_pem_key_cert_pairs_count = len(pem_key_cert_pairs) - credentials.c_ssl_pem_key_cert_pairs = ( - <grpc_ssl_pem_key_cert_pair *>gpr_malloc( - sizeof(grpc_ssl_pem_key_cert_pair) * - credentials.c_ssl_pem_key_cert_pairs_count - )) + with nogil: + credentials.c_ssl_pem_key_cert_pairs = ( + <grpc_ssl_pem_key_cert_pair *>gpr_malloc( + sizeof(grpc_ssl_pem_key_cert_pair) * + credentials.c_ssl_pem_key_cert_pairs_count + )) for i in range(credentials.c_ssl_pem_key_cert_pairs_count): credentials.c_ssl_pem_key_cert_pairs[i] = ( (<SslPemKeyCertPair>pem_key_cert_pairs[i]).c_pair) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi index dbf0045710..61165cb021 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi @@ -38,27 +38,27 @@ cdef extern from "grpc/_cython/loader.h": int pygrpc_load_core(char*) - void *gpr_malloc(size_t size) - void gpr_free(void *ptr) - void *gpr_realloc(void *p, size_t size) + void *gpr_malloc(size_t size) nogil + void gpr_free(void *ptr) nogil + void *gpr_realloc(void *p, size_t size) nogil ctypedef struct gpr_slice: # don't worry about writing out the members of gpr_slice; we never access # them directly. pass - gpr_slice gpr_slice_ref(gpr_slice s) - void gpr_slice_unref(gpr_slice s) - gpr_slice gpr_slice_new(void *p, size_t len, void (*destroy)(void *)) + gpr_slice gpr_slice_ref(gpr_slice s) nogil + void gpr_slice_unref(gpr_slice s) nogil + gpr_slice gpr_slice_new(void *p, size_t len, void (*destroy)(void *)) nogil gpr_slice gpr_slice_new_with_len( - void *p, size_t len, void (*destroy)(void *, size_t)) - gpr_slice gpr_slice_malloc(size_t length) - gpr_slice gpr_slice_from_copied_string(const char *source) - gpr_slice gpr_slice_from_copied_buffer(const char *source, size_t len) + void *p, size_t len, void (*destroy)(void *, size_t)) nogil + gpr_slice gpr_slice_malloc(size_t length) nogil + gpr_slice gpr_slice_from_copied_string(const char *source) nogil + gpr_slice gpr_slice_from_copied_buffer(const char *source, size_t len) nogil # Declare functions for function-like macros (because Cython)... - void *gpr_slice_start_ptr "GPR_SLICE_START_PTR" (gpr_slice s) - size_t gpr_slice_length "GPR_SLICE_LENGTH" (gpr_slice s) + void *gpr_slice_start_ptr "GPR_SLICE_START_PTR" (gpr_slice s) nogil + size_t gpr_slice_length "GPR_SLICE_LENGTH" (gpr_slice s) nogil ctypedef enum gpr_clock_type: GPR_CLOCK_MONOTONIC @@ -71,14 +71,14 @@ cdef extern from "grpc/_cython/loader.h": int32_t nanoseconds "tv_nsec" gpr_clock_type clock_type - gpr_timespec gpr_time_0(gpr_clock_type type) - gpr_timespec gpr_inf_future(gpr_clock_type type) - gpr_timespec gpr_inf_past(gpr_clock_type type) + gpr_timespec gpr_time_0(gpr_clock_type type) nogil + gpr_timespec gpr_inf_future(gpr_clock_type type) nogil + gpr_timespec gpr_inf_past(gpr_clock_type type) nogil - gpr_timespec gpr_now(gpr_clock_type clock) + gpr_timespec gpr_now(gpr_clock_type clock) nogil gpr_timespec gpr_convert_clock_type(gpr_timespec t, - gpr_clock_type target_clock) + gpr_clock_type target_clock) nogil ctypedef enum grpc_status_code: GRPC_STATUS_OK @@ -114,15 +114,15 @@ cdef extern from "grpc/_cython/loader.h": pass grpc_byte_buffer *grpc_raw_byte_buffer_create(gpr_slice *slices, - size_t nslices) - size_t grpc_byte_buffer_length(grpc_byte_buffer *bb) - void grpc_byte_buffer_destroy(grpc_byte_buffer *byte_buffer) + size_t nslices) nogil + size_t grpc_byte_buffer_length(grpc_byte_buffer *bb) nogil + void grpc_byte_buffer_destroy(grpc_byte_buffer *byte_buffer) nogil void grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader, - grpc_byte_buffer *buffer) + grpc_byte_buffer *buffer) nogil int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader *reader, - gpr_slice *slice) - void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader) + gpr_slice *slice) nogil + void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader) nogil const char *GRPC_ARG_PRIMARY_USER_AGENT_STRING const char *GRPC_ARG_ENABLE_CENSUS @@ -221,8 +221,8 @@ cdef extern from "grpc/_cython/loader.h": size_t capacity grpc_metadata *metadata - void grpc_metadata_array_init(grpc_metadata_array *array) - void grpc_metadata_array_destroy(grpc_metadata_array *array) + void grpc_metadata_array_init(grpc_metadata_array *array) nogil + void grpc_metadata_array_destroy(grpc_metadata_array *array) nogil ctypedef struct grpc_call_details: char *method @@ -231,8 +231,8 @@ cdef extern from "grpc/_cython/loader.h": size_t host_capacity gpr_timespec deadline - void grpc_call_details_init(grpc_call_details *details) - void grpc_call_details_destroy(grpc_call_details *details) + void grpc_call_details_init(grpc_call_details *details) nogil + void grpc_call_details_destroy(grpc_call_details *details) nogil ctypedef enum grpc_op_type: GRPC_OP_SEND_INITIAL_METADATA @@ -277,61 +277,62 @@ cdef extern from "grpc/_cython/loader.h": uint32_t flags grpc_op_data data - void grpc_init() - void grpc_shutdown() + void grpc_init() nogil + void grpc_shutdown() nogil - grpc_completion_queue *grpc_completion_queue_create(void *reserved) + grpc_completion_queue *grpc_completion_queue_create(void *reserved) nogil grpc_event grpc_completion_queue_next(grpc_completion_queue *cq, gpr_timespec deadline, void *reserved) nogil grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cq, void *tag, gpr_timespec deadline, void *reserved) nogil - void grpc_completion_queue_shutdown(grpc_completion_queue *cq) - void grpc_completion_queue_destroy(grpc_completion_queue *cq) + void grpc_completion_queue_shutdown(grpc_completion_queue *cq) nogil + void grpc_completion_queue_destroy(grpc_completion_queue *cq) nogil - grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, - size_t nops, void *tag, void *reserved) - grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) + grpc_call_error grpc_call_start_batch( + grpc_call *call, const grpc_op *ops, size_t nops, void *tag, + void *reserved) nogil + grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) nogil grpc_call_error grpc_call_cancel_with_status(grpc_call *call, grpc_status_code status, const char *description, - void *reserved) - char *grpc_call_get_peer(grpc_call *call) - void grpc_call_destroy(grpc_call *call) + void *reserved) nogil + char *grpc_call_get_peer(grpc_call *call) nogil + void grpc_call_destroy(grpc_call *call) nogil grpc_channel *grpc_insecure_channel_create(const char *target, const grpc_channel_args *args, - void *reserved) - grpc_call *grpc_channel_create_call(grpc_channel *channel, - grpc_call *parent_call, - uint32_t propagation_mask, - grpc_completion_queue *completion_queue, - const char *method, const char *host, - gpr_timespec deadline, void *reserved) + void *reserved) nogil + grpc_call *grpc_channel_create_call( + grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask, + grpc_completion_queue *completion_queue, const char *method, + const char *host, gpr_timespec deadline, void *reserved) nogil grpc_connectivity_state grpc_channel_check_connectivity_state( - grpc_channel *channel, int try_to_connect) + grpc_channel *channel, int try_to_connect) nogil void grpc_channel_watch_connectivity_state( grpc_channel *channel, grpc_connectivity_state last_observed_state, - gpr_timespec deadline, grpc_completion_queue *cq, void *tag) - char *grpc_channel_get_target(grpc_channel *channel) - void grpc_channel_destroy(grpc_channel *channel) + gpr_timespec deadline, grpc_completion_queue *cq, void *tag) nogil + char *grpc_channel_get_target(grpc_channel *channel) nogil + void grpc_channel_destroy(grpc_channel *channel) nogil - grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) + grpc_server *grpc_server_create( + const grpc_channel_args *args, void *reserved) nogil grpc_call_error grpc_server_request_call( grpc_server *server, grpc_call **call, grpc_call_details *details, grpc_metadata_array *request_metadata, grpc_completion_queue *cq_bound_to_call, grpc_completion_queue *cq_for_notification, void - *tag_new) + *tag_new) nogil void grpc_server_register_completion_queue(grpc_server *server, grpc_completion_queue *cq, - void *reserved) - int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) - void grpc_server_start(grpc_server *server) + void *reserved) nogil + int grpc_server_add_insecure_http2_port( + grpc_server *server, const char *addr) nogil + void grpc_server_start(grpc_server *server) nogil void grpc_server_shutdown_and_notify( - grpc_server *server, grpc_completion_queue *cq, void *tag) - void grpc_server_cancel_all_calls(grpc_server *server) - void grpc_server_destroy(grpc_server *server) + grpc_server *server, grpc_completion_queue *cq, void *tag) nogil + void grpc_server_cancel_all_calls(grpc_server *server) nogil + void grpc_server_destroy(grpc_server *server) nogil ctypedef struct grpc_ssl_pem_key_cert_pair: const char *private_key @@ -347,35 +348,36 @@ cdef extern from "grpc/_cython/loader.h": ctypedef void (*grpc_ssl_roots_override_callback)(char **pem_root_certs) - void grpc_set_ssl_roots_override_callback(grpc_ssl_roots_override_callback cb) + void grpc_set_ssl_roots_override_callback( + grpc_ssl_roots_override_callback cb) nogil - grpc_channel_credentials *grpc_google_default_credentials_create() + grpc_channel_credentials *grpc_google_default_credentials_create() nogil grpc_channel_credentials *grpc_ssl_credentials_create( const char *pem_root_certs, grpc_ssl_pem_key_cert_pair *pem_key_cert_pair, - void *reserved) + void *reserved) nogil grpc_channel_credentials *grpc_composite_channel_credentials_create( grpc_channel_credentials *creds1, grpc_call_credentials *creds2, - void *reserved) - void grpc_channel_credentials_release(grpc_channel_credentials *creds) + void *reserved) nogil + void grpc_channel_credentials_release(grpc_channel_credentials *creds) nogil grpc_call_credentials *grpc_composite_call_credentials_create( grpc_call_credentials *creds1, grpc_call_credentials *creds2, - void *reserved) + void *reserved) nogil grpc_call_credentials *grpc_google_compute_engine_credentials_create( - void *reserved) + void *reserved) nogil grpc_call_credentials *grpc_service_account_jwt_access_credentials_create( const char *json_key, - gpr_timespec token_lifetime, void *reserved) + gpr_timespec token_lifetime, void *reserved) nogil grpc_call_credentials *grpc_google_refresh_token_credentials_create( - const char *json_refresh_token, void *reserved) + const char *json_refresh_token, void *reserved) nogil grpc_call_credentials *grpc_google_iam_credentials_create( const char *authorization_token, const char *authority_selector, - void *reserved) - void grpc_call_credentials_release(grpc_call_credentials *creds) + void *reserved) nogil + void grpc_call_credentials_release(grpc_call_credentials *creds) nogil grpc_channel *grpc_secure_channel_create( grpc_channel_credentials *creds, const char *target, - const grpc_channel_args *args, void *reserved) + const grpc_channel_args *args, void *reserved) nogil ctypedef struct grpc_server_credentials: # We don't care about the internals (and in fact don't know them) @@ -385,13 +387,13 @@ cdef extern from "grpc/_cython/loader.h": const char *pem_root_certs, grpc_ssl_pem_key_cert_pair *pem_key_cert_pairs, size_t num_key_cert_pairs, int force_client_auth, void *reserved) - void grpc_server_credentials_release(grpc_server_credentials *creds) + void grpc_server_credentials_release(grpc_server_credentials *creds) nogil int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, - grpc_server_credentials *creds) + grpc_server_credentials *creds) nogil grpc_call_error grpc_call_set_credentials(grpc_call *call, - grpc_call_credentials *creds) + grpc_call_credentials *creds) nogil ctypedef struct grpc_auth_context: # We don't care about the internals (and in fact don't know them) @@ -415,4 +417,4 @@ cdef extern from "grpc/_cython/loader.h": const char *type grpc_call_credentials *grpc_metadata_credentials_create_from_plugin( - grpc_metadata_credentials_plugin plugin, void *reserved) + grpc_metadata_credentials_plugin plugin, void *reserved) nogil diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi index fa4ea99ea9..851389a261 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi @@ -107,15 +107,18 @@ cdef class Timespec: def __cinit__(self, time): if time is None: - self.c_time = gpr_now(GPR_CLOCK_REALTIME) + with nogil: + self.c_time = gpr_now(GPR_CLOCK_REALTIME) return if isinstance(time, int): time = float(time) if isinstance(time, float): if time == float("+inf"): - self.c_time = gpr_inf_future(GPR_CLOCK_REALTIME) + with nogil: + self.c_time = gpr_inf_future(GPR_CLOCK_REALTIME) elif time == float("-inf"): - self.c_time = gpr_inf_past(GPR_CLOCK_REALTIME) + with nogil: + self.c_time = gpr_inf_past(GPR_CLOCK_REALTIME) else: self.c_time.seconds = time self.c_time.nanoseconds = (time - float(self.c_time.seconds)) * 1e9 @@ -131,8 +134,10 @@ cdef class Timespec: # TODO(atash) ensure that everywhere a Timespec is created that it's # converted to GPR_CLOCK_REALTIME then and not every time someone wants to # read values off in Python. - cdef gpr_timespec real_time = ( - gpr_convert_clock_type(self.c_time, GPR_CLOCK_REALTIME)) + cdef gpr_timespec real_time + with nogil: + real_time = ( + gpr_convert_clock_type(self.c_time, GPR_CLOCK_REALTIME)) return real_time.seconds @property @@ -158,10 +163,12 @@ cdef class Timespec: cdef class CallDetails: def __cinit__(self): - grpc_call_details_init(&self.c_details) + with nogil: + grpc_call_details_init(&self.c_details) def __dealloc__(self): - grpc_call_details_destroy(&self.c_details) + with nogil: + grpc_call_details_destroy(&self.c_details) @property def method(self): @@ -229,10 +236,15 @@ cdef class ByteBuffer: "ByteBuffer, not {}".format(type(data))) cdef char *c_data = data - data_slice = gpr_slice_from_copied_buffer(c_data, len(data)) - self.c_byte_buffer = grpc_raw_byte_buffer_create( - &data_slice, 1) - gpr_slice_unref(data_slice) + cdef gpr_slice data_slice + cdef size_t data_length = len(data) + with nogil: + data_slice = gpr_slice_from_copied_buffer(c_data, data_length) + with nogil: + self.c_byte_buffer = grpc_raw_byte_buffer_create( + &data_slice, 1) + with nogil: + gpr_slice_unref(data_slice) def bytes(self): cdef grpc_byte_buffer_reader reader @@ -240,20 +252,27 @@ cdef class ByteBuffer: cdef size_t data_slice_length cdef void *data_slice_pointer if self.c_byte_buffer != NULL: - grpc_byte_buffer_reader_init(&reader, self.c_byte_buffer) + with nogil: + grpc_byte_buffer_reader_init(&reader, self.c_byte_buffer) result = b"" - while grpc_byte_buffer_reader_next(&reader, &data_slice): - data_slice_pointer = gpr_slice_start_ptr(data_slice) - data_slice_length = gpr_slice_length(data_slice) - result += (<char *>data_slice_pointer)[:data_slice_length] - grpc_byte_buffer_reader_destroy(&reader) + with nogil: + while grpc_byte_buffer_reader_next(&reader, &data_slice): + data_slice_pointer = gpr_slice_start_ptr(data_slice) + data_slice_length = gpr_slice_length(data_slice) + with gil: + result += (<char *>data_slice_pointer)[:data_slice_length] + with nogil: + grpc_byte_buffer_reader_destroy(&reader) return result else: return None def __len__(self): + cdef size_t result if self.c_byte_buffer != NULL: - return grpc_byte_buffer_length(self.c_byte_buffer) + with nogil: + result = grpc_byte_buffer_length(self.c_byte_buffer) + return result else: return 0 @@ -262,7 +281,8 @@ cdef class ByteBuffer: def __dealloc__(self): if self.c_byte_buffer != NULL: - grpc_byte_buffer_destroy(self.c_byte_buffer) + with nogil: + grpc_byte_buffer_destroy(self.c_byte_buffer) cdef class SslPemKeyCertPair: @@ -319,14 +339,15 @@ cdef class ChannelArgs: if not isinstance(arg, ChannelArg): raise TypeError("expected list of ChannelArg") self.c_args.arguments_length = len(self.args) - self.c_args.arguments = <grpc_arg *>gpr_malloc( - self.c_args.arguments_length*sizeof(grpc_arg) - ) + with nogil: + self.c_args.arguments = <grpc_arg *>gpr_malloc( + self.c_args.arguments_length*sizeof(grpc_arg)) for i in range(self.c_args.arguments_length): self.c_args.arguments[i] = (<ChannelArg>self.args[i]).c_arg def __dealloc__(self): - gpr_free(self.c_args.arguments) + with nogil: + gpr_free(self.c_args.arguments) def __len__(self): # self.args is never stale; it's only updated from this file @@ -407,21 +428,24 @@ cdef class Metadata: for metadatum in metadata: if not isinstance(metadatum, Metadatum): raise TypeError("expected list of Metadatum") - grpc_metadata_array_init(&self.c_metadata_array) + with nogil: + grpc_metadata_array_init(&self.c_metadata_array) self.c_metadata_array.count = len(self.metadata) self.c_metadata_array.capacity = len(self.metadata) - self.c_metadata_array.metadata = <grpc_metadata *>gpr_malloc( - self.c_metadata_array.count*sizeof(grpc_metadata) - ) + with nogil: + self.c_metadata_array.metadata = <grpc_metadata *>gpr_malloc( + self.c_metadata_array.count*sizeof(grpc_metadata) + ) for i in range(self.c_metadata_array.count): self.c_metadata_array.metadata[i] = ( (<Metadatum>self.metadata[i]).c_metadata) def __dealloc__(self): # this frees the allocated memory for the grpc_metadata_array (although - # it'd be nice if that were documented somewhere...) TODO(atash): document - # this in the C core - grpc_metadata_array_destroy(&self.c_metadata_array) + # it'd be nice if that were documented somewhere...) + # TODO(atash): document this in the C core + with nogil: + grpc_metadata_array_destroy(&self.c_metadata_array) def __len__(self): return self.c_metadata_array.count @@ -526,7 +550,8 @@ cdef class Operation: # Python. The remaining one(s) are primitive fields filled in by GRPC core. # This means that we need to clean up after receive_status_on_client. if self.c_op.type == GRPC_OP_RECV_STATUS_ON_CLIENT: - gpr_free(self._received_status_details) + with nogil: + gpr_free(self._received_status_details) def operation_send_initial_metadata(Metadata metadata): cdef Operation op = Operation() @@ -648,8 +673,8 @@ cdef class Operations: if not isinstance(operation, Operation): raise TypeError("expected operations to be iterable of Operation") self.c_nops = len(self.operations) - self.c_ops = <grpc_op *>gpr_malloc( - sizeof(grpc_op)*self.c_nops) + with nogil: + self.c_ops = <grpc_op *>gpr_malloc(sizeof(grpc_op)*self.c_nops) for i in range(self.c_nops): self.c_ops[i] = (<Operation>(self.operations[i])).c_op @@ -661,7 +686,8 @@ cdef class Operations: return self.operations[i] def __dealloc__(self): - gpr_free(self.c_ops) + with nogil: + gpr_free(self.c_ops) def __iter__(self): return _OperationsIterator(self) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi index fe93da6c12..a098f11da2 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi @@ -41,7 +41,8 @@ cdef class Server: if arguments is not None: c_arguments = &arguments.c_args self.references.append(arguments) - self.c_server = grpc_server_create(c_arguments, NULL) + with nogil: + self.c_server = grpc_server_create(c_arguments, NULL) self.is_started = False self.is_shutting_down = False self.is_shutdown = False @@ -53,6 +54,7 @@ cdef class Server: raise ValueError("server must be started and not shutting down") if server_queue not in self.registered_completion_queues: raise ValueError("server_queue must be a registered completion queue") + cdef grpc_call_error result cdef OperationTag operation_tag = OperationTag(tag) operation_tag.operation_call = Call() operation_tag.request_call_details = CallDetails() @@ -61,19 +63,22 @@ cdef class Server: operation_tag.is_new_request = True operation_tag.batch_operations = Operations([]) cpython.Py_INCREF(operation_tag) - return grpc_server_request_call( - self.c_server, &operation_tag.operation_call.c_call, - &operation_tag.request_call_details.c_details, - &operation_tag.request_metadata.c_metadata_array, - call_queue.c_completion_queue, server_queue.c_completion_queue, - <cpython.PyObject *>operation_tag) + with nogil: + result = grpc_server_request_call( + self.c_server, &operation_tag.operation_call.c_call, + &operation_tag.request_call_details.c_details, + &operation_tag.request_metadata.c_metadata_array, + call_queue.c_completion_queue, server_queue.c_completion_queue, + <cpython.PyObject *>operation_tag) + return result def register_completion_queue( self, CompletionQueue queue not None): if self.is_started: raise ValueError("cannot register completion queues after start") - grpc_server_register_completion_queue( - self.c_server, queue.c_completion_queue, NULL) + with nogil: + grpc_server_register_completion_queue( + self.c_server, queue.c_completion_queue, NULL) self.registered_completion_queues.append(queue) def start(self): @@ -82,7 +87,8 @@ cdef class Server: self.backup_shutdown_queue = CompletionQueue() self.register_completion_queue(self.backup_shutdown_queue) self.is_started = True - grpc_server_start(self.c_server) + with nogil: + grpc_server_start(self.c_server) # Ensure the core has gotten a chance to do the start-up work self.backup_shutdown_queue.pluck(None, Timespec(None)) @@ -95,22 +101,28 @@ cdef class Server: else: raise TypeError("expected address to be a str or bytes") self.references.append(address) + cdef int result + cdef char *address_c_string = address if server_credentials is not None: self.references.append(server_credentials) - return grpc_server_add_secure_http2_port( - self.c_server, address, server_credentials.c_credentials) + with nogil: + result = grpc_server_add_secure_http2_port( + self.c_server, address_c_string, server_credentials.c_credentials) else: - return grpc_server_add_insecure_http2_port(self.c_server, address) + with nogil: + result = grpc_server_add_insecure_http2_port(self.c_server, + address_c_string) + return result cdef _c_shutdown(self, CompletionQueue queue, tag): self.is_shutting_down = True operation_tag = OperationTag(tag) operation_tag.shutting_down_server = self - operation_tag.references.extend([self, queue]) cpython.Py_INCREF(operation_tag) - grpc_server_shutdown_and_notify( - self.c_server, queue.c_completion_queue, - <cpython.PyObject *>operation_tag) + with nogil: + grpc_server_shutdown_and_notify( + self.c_server, queue.c_completion_queue, + <cpython.PyObject *>operation_tag) def shutdown(self, CompletionQueue queue not None, tag): cdef OperationTag operation_tag @@ -135,7 +147,8 @@ cdef class Server: elif self.is_shutdown: return else: - grpc_server_cancel_all_calls(self.c_server) + with nogil: + grpc_server_cancel_all_calls(self.c_server) def __dealloc__(self): if self.c_server != NULL: @@ -154,5 +167,6 @@ cdef class Server: # much but repeatedly release the GIL and wait while not self.is_shutdown: time.sleep(0) - grpc_server_destroy(self.c_server) + with nogil: + grpc_server_destroy(self.c_server) diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pyx b/src/python/grpcio/grpc/_cython/cygrpc.pyx index 30cc7a132b..8a0f171ee7 100644 --- a/src/python/grpcio/grpc/_cython/cygrpc.pyx +++ b/src/python/grpcio/grpc/_cython/cygrpc.pyx @@ -57,14 +57,17 @@ cdef class _ModuleState: 'grpc._cython', '_windows/grpc_c.64.python') if not pygrpc_load_core(filename): raise ImportError('failed to load core gRPC library') - grpc_init() + with nogil: + grpc_init() self.is_loaded = True - grpc_set_ssl_roots_override_callback( - <grpc_ssl_roots_override_callback>ssl_roots_override_callback) + with nogil: + grpc_set_ssl_roots_override_callback( + <grpc_ssl_roots_override_callback>ssl_roots_override_callback) def __dealloc__(self): if self.is_loaded: - grpc_shutdown() + with nogil: + grpc_shutdown() _module_state = _ModuleState() diff --git a/src/python/grpcio/grpc/_cython/imports.generated.h b/src/python/grpcio/grpc/_cython/imports.generated.h index b70dcccd17..4d18369e1f 100644 --- a/src/python/grpcio/grpc/_cython/imports.generated.h +++ b/src/python/grpcio/grpc/_cython/imports.generated.h @@ -166,7 +166,7 @@ extern grpc_compression_algorithm_parse_type grpc_compression_algorithm_parse_im typedef int(*grpc_compression_algorithm_name_type)(grpc_compression_algorithm algorithm, char **name); extern grpc_compression_algorithm_name_type grpc_compression_algorithm_name_import; #define grpc_compression_algorithm_name grpc_compression_algorithm_name_import -typedef grpc_compression_algorithm(*grpc_compression_algorithm_for_level_type)(grpc_compression_level level); +typedef grpc_compression_algorithm(*grpc_compression_algorithm_for_level_type)(grpc_compression_level level, uint32_t accepted_encodings); extern grpc_compression_algorithm_for_level_type grpc_compression_algorithm_for_level_import; #define grpc_compression_algorithm_for_level grpc_compression_algorithm_for_level_import typedef void(*grpc_compression_options_init_type)(grpc_compression_options *opts); diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index a543791f5c..31e16e0491 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -34,6 +34,7 @@ CORE_SOURCE_FILES = [ 'src/core/profiling/stap_timers.c', 'src/core/support/alloc.c', 'src/core/support/avl.c', + 'src/core/support/backoff.c', 'src/core/support/cmdline.c', 'src/core/support/cpu_iphone.c', 'src/core/support/cpu_linux.c', diff --git a/src/python/grpcio/precompiled.py b/src/python/grpcio/precompiled.py index ae2a0c835a..d34250b02c 100644 --- a/src/python/grpcio/precompiled.py +++ b/src/python/grpcio/precompiled.py @@ -31,6 +31,7 @@ import os import platform import shutil import sys +import sysconfig import setuptools @@ -51,9 +52,15 @@ USE_PRECOMPILED_BINARIES = bool(int(os.environ.get( def _tagged_ext_name(base): uname = platform.uname() - tags = '-'.join((grpc_version.VERSION, uname[0], uname[4])) - flavor = 'ucs2' if sys.maxunicode == 65535 else 'ucs4' - return '{base}-{tags}-{flavor}'.format(base=base, tags=tags, flavor=flavor) + tags = ( + grpc_version.VERSION, + 'py{}'.format(sysconfig.get_python_version()), + uname[0], + uname[4], + ) + ucs = 'ucs{}'.format(sysconfig.get_config_var('Py_UNICODE_SIZE')) + return '{base}-{tags}-{ucs}'.format( + base=base, tags='-'.join(tags), ucs=ucs) class BuildTaggedExt(setuptools.Command): diff --git a/src/python/grpcio/tests/_runner.py b/src/python/grpcio/tests/_runner.py index 38a5432e79..3b5ca03dd9 100644 --- a/src/python/grpcio/tests/_runner.py +++ b/src/python/grpcio/tests/_runner.py @@ -35,6 +35,7 @@ import os import select import signal import sys +import tempfile import threading import time import unittest @@ -44,60 +45,46 @@ from tests import _loader from tests import _result -class CapturePipe(object): - """A context-manager pipe to redirect output to a byte array. +class CaptureFile(object): + """A context-managed file to redirect output to a byte array. + + Use by invoking `start` (`__enter__`) and at some point invoking `stop` + (`__exit__`). At any point after the initial call to `start` call `output` to + get the current redirected output. Note that we don't currently use file + locking, so calling `output` between calls to `start` and `stop` may muddle + the result (you should only be doing this during a Python-handled interrupt as + a last ditch effort to provide output to the user). Attributes: - _redirect_fd (int): File descriptor of file to redirect writes from. + _redirected_fd (int): File descriptor of file to redirect writes from. _saved_fd (int): A copy of the original value of the redirected file descriptor. - _read_thread (threading.Thread or None): Thread upon which reads through the - pipe are performed. Only non-None when self is started. - _read_fd (int or None): File descriptor of the read end of the redirect - pipe. Only non-None when self is started. - _write_fd (int or None): File descriptor of the write end of the redirect - pipe. Only non-None when self is started. - output (bytearray or None): Redirected output from writes to the redirected - file descriptor. Only valid during and after self has started. + _into_file (TemporaryFile or None): File to which writes are redirected. + Only non-None when self is started. """ def __init__(self, fd): - self._redirect_fd = fd - self._saved_fd = os.dup(self._redirect_fd) - self._read_thread = None - self._read_fd = None - self._write_fd = None - self.output = None + self._redirected_fd = fd + self._saved_fd = os.dup(self._redirected_fd) + self._into_file = None + + def output(self): + """Get all output from the redirected-to file if it exists.""" + if self._into_file: + self._into_file.seek(0) + return bytes(self._into_file.read()) + else: + return bytes() def start(self): """Start redirection of writes to the file descriptor.""" - self._read_fd, self._write_fd = os.pipe() - os.dup2(self._write_fd, self._redirect_fd) - flags = fcntl.fcntl(self._read_fd, fcntl.F_GETFL) - fcntl.fcntl(self._read_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) - self._read_thread = threading.Thread(target=self._read) - self._read_thread.start() + self._into_file = tempfile.TemporaryFile() + os.dup2(self._into_file.fileno(), self._redirected_fd) def stop(self): """Stop redirection of writes to the file descriptor.""" - os.close(self._write_fd) - os.dup2(self._saved_fd, self._redirect_fd) # auto-close self._redirect_fd - self._read_thread.join() - self._read_thread = None - # we waited for the read thread to finish, so _read_fd has been read and we - # can close it. - os.close(self._read_fd) - - def _read(self): - """Read-thread target for self.""" - self.output = bytearray() - while True: - select.select([self._read_fd], [], []) - read_bytes = os.read(self._read_fd, 1024) - if read_bytes: - self.output.extend(read_bytes) - else: - break + # n.b. this dup2 call auto-closes self._redirected_fd + os.dup2(self._saved_fd, self._redirected_fd) def write_bypass(self, value): """Bypass the redirection and write directly to the original file. @@ -144,7 +131,7 @@ class Runner(object): def run(self, suite): """See setuptools' test_runner setup argument for information.""" # only run test cases with id starting with given prefix - testcase_filter = os.getenv('GPRC_PYTHON_TESTRUNNER_FILTER') + testcase_filter = os.getenv('GRPC_PYTHON_TESTRUNNER_FILTER') filtered_cases = [] for case in _loader.iterate_suite_cases(suite): if not testcase_filter or case.id().startswith(testcase_filter): @@ -159,8 +146,8 @@ class Runner(object): result_out = StringIO.StringIO() result = _result.TerminalResult( result_out, id_map=lambda case: case_id_by_case[case]) - stdout_pipe = CapturePipe(sys.stdout.fileno()) - stderr_pipe = CapturePipe(sys.stderr.fileno()) + stdout_pipe = CaptureFile(sys.stdout.fileno()) + stderr_pipe = CaptureFile(sys.stderr.fileno()) kill_flag = [False] def sigint_handler(signal_number, frame): @@ -171,7 +158,8 @@ class Runner(object): def fault_handler(signal_number, frame): stdout_pipe.write_bypass( 'Received fault signal {}\nstdout:\n{}\n\nstderr:{}\n' - .format(signal_number, stdout_pipe.output, stderr_pipe.output)) + .format(signal_number, stdout_pipe.output(), + stderr_pipe.output())) os._exit(1) def check_kill_self(): @@ -180,9 +168,9 @@ class Runner(object): result.stopTestRun() stdout_pipe.write_bypass(result_out.getvalue()) stdout_pipe.write_bypass( - '\ninterrupted stdout:\n{}\n'.format(stdout_pipe.output)) + '\ninterrupted stdout:\n{}\n'.format(stdout_pipe.output())) stderr_pipe.write_bypass( - '\ninterrupted stderr:\n{}\n'.format(stderr_pipe.output)) + '\ninterrupted stderr:\n{}\n'.format(stderr_pipe.output())) os._exit(1) signal.signal(signal.SIGINT, sigint_handler) signal.signal(signal.SIGSEGV, fault_handler) @@ -212,7 +200,7 @@ class Runner(object): # re-raise the exception after forcing the with-block to end raise result.set_output( - augmented_case.case, stdout_pipe.output, stderr_pipe.output) + augmented_case.case, stdout_pipe.output(), stderr_pipe.output()) sys.stdout.write(result_out.getvalue()) sys.stdout.flush() result_out.truncate(0) diff --git a/src/python/grpcio/tests/tests.json b/src/python/grpcio/tests/tests.json index 388d040d5c..84870aaa5c 100644 --- a/src/python/grpcio/tests/tests.json +++ b/src/python/grpcio/tests/tests.json @@ -12,31 +12,22 @@ "_core_over_links_base_interface_test.SyncEasyTest", "_core_over_links_base_interface_test.SyncPeasyTest", "_crust_over_core_face_interface_test.DynamicInvokerBlockingInvocationInlineServiceTest", - "_crust_over_core_face_interface_test.DynamicInvokerEventInvocationSynchronousEventServiceTest", "_crust_over_core_face_interface_test.DynamicInvokerFutureInvocationAsynchronousEventServiceTest", "_crust_over_core_face_interface_test.GenericInvokerBlockingInvocationInlineServiceTest", - "_crust_over_core_face_interface_test.GenericInvokerEventInvocationSynchronousEventServiceTest", "_crust_over_core_face_interface_test.GenericInvokerFutureInvocationAsynchronousEventServiceTest", "_crust_over_core_face_interface_test.MultiCallableInvokerBlockingInvocationInlineServiceTest", - "_crust_over_core_face_interface_test.MultiCallableInvokerEventInvocationSynchronousEventServiceTest", "_crust_over_core_face_interface_test.MultiCallableInvokerFutureInvocationAsynchronousEventServiceTest", "_crust_over_core_over_links_face_interface_test.DynamicInvokerBlockingInvocationInlineServiceTest", - "_crust_over_core_over_links_face_interface_test.DynamicInvokerEventInvocationSynchronousEventServiceTest", "_crust_over_core_over_links_face_interface_test.DynamicInvokerFutureInvocationAsynchronousEventServiceTest", "_crust_over_core_over_links_face_interface_test.GenericInvokerBlockingInvocationInlineServiceTest", - "_crust_over_core_over_links_face_interface_test.GenericInvokerEventInvocationSynchronousEventServiceTest", "_crust_over_core_over_links_face_interface_test.GenericInvokerFutureInvocationAsynchronousEventServiceTest", "_crust_over_core_over_links_face_interface_test.MultiCallableInvokerBlockingInvocationInlineServiceTest", - "_crust_over_core_over_links_face_interface_test.MultiCallableInvokerEventInvocationSynchronousEventServiceTest", "_crust_over_core_over_links_face_interface_test.MultiCallableInvokerFutureInvocationAsynchronousEventServiceTest", "_face_interface_test.DynamicInvokerBlockingInvocationInlineServiceTest", - "_face_interface_test.DynamicInvokerEventInvocationSynchronousEventServiceTest", "_face_interface_test.DynamicInvokerFutureInvocationAsynchronousEventServiceTest", "_face_interface_test.GenericInvokerBlockingInvocationInlineServiceTest", - "_face_interface_test.GenericInvokerEventInvocationSynchronousEventServiceTest", "_face_interface_test.GenericInvokerFutureInvocationAsynchronousEventServiceTest", "_face_interface_test.MultiCallableInvokerBlockingInvocationInlineServiceTest", - "_face_interface_test.MultiCallableInvokerEventInvocationSynchronousEventServiceTest", "_face_interface_test.MultiCallableInvokerFutureInvocationAsynchronousEventServiceTest", "_implementations_test.ChannelCredentialsTest", "_insecure_interop_test.InsecureInteropTest", diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/_event_invocation_synchronous_event_service.py b/src/python/grpcio/tests/unit/framework/interfaces/face/_event_invocation_synchronous_event_service.py deleted file mode 100644 index 34db6c3e55..0000000000 --- a/src/python/grpcio/tests/unit/framework/interfaces/face/_event_invocation_synchronous_event_service.py +++ /dev/null @@ -1,381 +0,0 @@ -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -"""Test code for the Face layer of RPC Framework.""" - -import abc -import unittest - -# test_interfaces is referenced from specification in this module. -from grpc.framework.interfaces.face import face -from tests.unit.framework.common import test_constants -from tests.unit.framework.common import test_control -from tests.unit.framework.common import test_coverage -from tests.unit.framework.interfaces.face import _3069_test_constant -from tests.unit.framework.interfaces.face import _digest -from tests.unit.framework.interfaces.face import _receiver -from tests.unit.framework.interfaces.face import _stock_service -from tests.unit.framework.interfaces.face import test_interfaces # pylint: disable=unused-import - - -class TestCase(test_coverage.Coverage, unittest.TestCase): - """A test of the Face layer of RPC Framework. - - Concrete subclasses must have an "implementation" attribute of type - test_interfaces.Implementation and an "invoker_constructor" attribute of type - _invocation.InvokerConstructor. - """ - __metaclass__ = abc.ABCMeta - - NAME = 'EventInvocationSynchronousEventServiceTest' - - def setUp(self): - """See unittest.TestCase.setUp for full specification. - - Overriding implementations must call this implementation. - """ - self._control = test_control.PauseFailControl() - self._digest = _digest.digest( - _stock_service.STOCK_TEST_SERVICE, self._control, None) - - generic_stub, dynamic_stubs, self._memo = self.implementation.instantiate( - self._digest.methods, self._digest.event_method_implementations, None) - self._invoker = self.invoker_constructor.construct_invoker( - generic_stub, dynamic_stubs, self._digest.methods) - - def tearDown(self): - """See unittest.TestCase.tearDown for full specification. - - Overriding implementations must call this implementation. - """ - self._invoker = None - self.implementation.destantiate(self._memo) - - def testSuccessfulUnaryRequestUnaryResponse(self): - for (group, method), test_messages_sequence in ( - self._digest.unary_unary_messages_sequences.iteritems()): - for test_messages in test_messages_sequence: - request = test_messages.request() - receiver = _receiver.Receiver() - - self._invoker.event(group, method)( - request, receiver, receiver.abort, test_constants.LONG_TIMEOUT) - receiver.block_until_terminated() - response = receiver.unary_response() - - test_messages.verify(request, response, self) - - def testSuccessfulUnaryRequestStreamResponse(self): - for (group, method), test_messages_sequence in ( - self._digest.unary_stream_messages_sequences.iteritems()): - for test_messages in test_messages_sequence: - request = test_messages.request() - receiver = _receiver.Receiver() - - self._invoker.event(group, method)( - request, receiver, receiver.abort, test_constants.LONG_TIMEOUT) - receiver.block_until_terminated() - responses = receiver.stream_responses() - - test_messages.verify(request, responses, self) - - def testSuccessfulStreamRequestUnaryResponse(self): - for (group, method), test_messages_sequence in ( - self._digest.stream_unary_messages_sequences.iteritems()): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - receiver = _receiver.Receiver() - - call_consumer = self._invoker.event(group, method)( - receiver, receiver.abort, test_constants.LONG_TIMEOUT) - for request in requests: - call_consumer.consume(request) - call_consumer.terminate() - receiver.block_until_terminated() - response = receiver.unary_response() - - test_messages.verify(requests, response, self) - - def testSuccessfulStreamRequestStreamResponse(self): - for (group, method), test_messages_sequence in ( - self._digest.stream_stream_messages_sequences.iteritems()): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - receiver = _receiver.Receiver() - - call_consumer = self._invoker.event(group, method)( - receiver, receiver.abort, test_constants.LONG_TIMEOUT) - for request in requests: - call_consumer.consume(request) - call_consumer.terminate() - receiver.block_until_terminated() - responses = receiver.stream_responses() - - test_messages.verify(requests, responses, self) - - def testSequentialInvocations(self): - # pylint: disable=cell-var-from-loop - for (group, method), test_messages_sequence in ( - self._digest.unary_unary_messages_sequences.iteritems()): - for test_messages in test_messages_sequence: - first_request = test_messages.request() - second_request = test_messages.request() - second_receiver = _receiver.Receiver() - - def make_second_invocation(): - self._invoker.event(group, method)( - second_request, second_receiver, second_receiver.abort, - test_constants.LONG_TIMEOUT) - - class FirstReceiver(_receiver.Receiver): - - def complete(self, terminal_metadata, code, details): - super(FirstReceiver, self).complete( - terminal_metadata, code, details) - make_second_invocation() - - first_receiver = FirstReceiver() - - self._invoker.event(group, method)( - first_request, first_receiver, first_receiver.abort, - test_constants.LONG_TIMEOUT) - second_receiver.block_until_terminated() - - first_response = first_receiver.unary_response() - second_response = second_receiver.unary_response() - test_messages.verify(first_request, first_response, self) - test_messages.verify(second_request, second_response, self) - - def testParallelInvocations(self): - for (group, method), test_messages_sequence in ( - self._digest.unary_unary_messages_sequences.iteritems()): - for test_messages in test_messages_sequence: - first_request = test_messages.request() - first_receiver = _receiver.Receiver() - second_request = test_messages.request() - second_receiver = _receiver.Receiver() - - self._invoker.event(group, method)( - first_request, first_receiver, first_receiver.abort, - test_constants.LONG_TIMEOUT) - self._invoker.event(group, method)( - second_request, second_receiver, second_receiver.abort, - test_constants.LONG_TIMEOUT) - first_receiver.block_until_terminated() - second_receiver.block_until_terminated() - - first_response = first_receiver.unary_response() - second_response = second_receiver.unary_response() - test_messages.verify(first_request, first_response, self) - test_messages.verify(second_request, second_response, self) - - @unittest.skip('TODO(nathaniel): implement.') - def testWaitingForSomeButNotAllParallelInvocations(self): - raise NotImplementedError() - - def testCancelledUnaryRequestUnaryResponse(self): - for (group, method), test_messages_sequence in ( - self._digest.unary_unary_messages_sequences.iteritems()): - for test_messages in test_messages_sequence: - request = test_messages.request() - receiver = _receiver.Receiver() - - with self._control.pause(): - call = self._invoker.event(group, method)( - request, receiver, receiver.abort, test_constants.LONG_TIMEOUT) - call.cancel() - receiver.block_until_terminated() - - self.assertIs(face.Abortion.Kind.CANCELLED, receiver.abortion().kind) - - def testCancelledUnaryRequestStreamResponse(self): - for (group, method), test_messages_sequence in ( - self._digest.unary_stream_messages_sequences.iteritems()): - for test_messages in test_messages_sequence: - request = test_messages.request() - receiver = _receiver.Receiver() - - call = self._invoker.event(group, method)( - request, receiver, receiver.abort, test_constants.LONG_TIMEOUT) - call.cancel() - receiver.block_until_terminated() - - self.assertIs(face.Abortion.Kind.CANCELLED, receiver.abortion().kind) - - def testCancelledStreamRequestUnaryResponse(self): - for (group, method), test_messages_sequence in ( - self._digest.stream_unary_messages_sequences.iteritems()): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - receiver = _receiver.Receiver() - - call_consumer = self._invoker.event(group, method)( - receiver, receiver.abort, test_constants.LONG_TIMEOUT) - for request in requests: - call_consumer.consume(request) - call_consumer.cancel() - receiver.block_until_terminated() - - self.assertIs(face.Abortion.Kind.CANCELLED, receiver.abortion().kind) - - def testCancelledStreamRequestStreamResponse(self): - for (group, method), test_messages_sequence in ( - self._digest.stream_stream_messages_sequences.iteritems()): - for unused_test_messages in test_messages_sequence: - receiver = _receiver.Receiver() - - call_consumer = self._invoker.event(group, method)( - receiver, receiver.abort, test_constants.LONG_TIMEOUT) - call_consumer.cancel() - receiver.block_until_terminated() - - self.assertIs(face.Abortion.Kind.CANCELLED, receiver.abortion().kind) - - def testExpiredUnaryRequestUnaryResponse(self): - for (group, method), test_messages_sequence in ( - self._digest.unary_unary_messages_sequences.iteritems()): - for test_messages in test_messages_sequence: - request = test_messages.request() - receiver = _receiver.Receiver() - - with self._control.pause(): - self._invoker.event(group, method)( - request, receiver, receiver.abort, - _3069_test_constant.REALLY_SHORT_TIMEOUT) - receiver.block_until_terminated() - - self.assertIs(face.Abortion.Kind.EXPIRED, receiver.abortion().kind) - - def testExpiredUnaryRequestStreamResponse(self): - for (group, method), test_messages_sequence in ( - self._digest.unary_stream_messages_sequences.iteritems()): - for test_messages in test_messages_sequence: - request = test_messages.request() - receiver = _receiver.Receiver() - - with self._control.pause(): - self._invoker.event(group, method)( - request, receiver, receiver.abort, - _3069_test_constant.REALLY_SHORT_TIMEOUT) - receiver.block_until_terminated() - - self.assertIs(face.Abortion.Kind.EXPIRED, receiver.abortion().kind) - - def testExpiredStreamRequestUnaryResponse(self): - for (group, method), test_messages_sequence in ( - self._digest.stream_unary_messages_sequences.iteritems()): - for unused_test_messages in test_messages_sequence: - receiver = _receiver.Receiver() - - self._invoker.event(group, method)( - receiver, receiver.abort, _3069_test_constant.REALLY_SHORT_TIMEOUT) - receiver.block_until_terminated() - - self.assertIs(face.Abortion.Kind.EXPIRED, receiver.abortion().kind) - - def testExpiredStreamRequestStreamResponse(self): - for (group, method), test_messages_sequence in ( - self._digest.stream_stream_messages_sequences.iteritems()): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - receiver = _receiver.Receiver() - - call_consumer = self._invoker.event(group, method)( - receiver, receiver.abort, _3069_test_constant.REALLY_SHORT_TIMEOUT) - for request in requests: - call_consumer.consume(request) - receiver.block_until_terminated() - - self.assertIs(face.Abortion.Kind.EXPIRED, receiver.abortion().kind) - - def testFailedUnaryRequestUnaryResponse(self): - for (group, method), test_messages_sequence in ( - self._digest.unary_unary_messages_sequences.iteritems()): - for test_messages in test_messages_sequence: - request = test_messages.request() - receiver = _receiver.Receiver() - - with self._control.fail(): - self._invoker.event(group, method)( - request, receiver, receiver.abort, test_constants.LONG_TIMEOUT) - receiver.block_until_terminated() - - self.assertIs( - face.Abortion.Kind.REMOTE_FAILURE, receiver.abortion().kind) - - def testFailedUnaryRequestStreamResponse(self): - for (group, method), test_messages_sequence in ( - self._digest.unary_stream_messages_sequences.iteritems()): - for test_messages in test_messages_sequence: - request = test_messages.request() - receiver = _receiver.Receiver() - - with self._control.fail(): - self._invoker.event(group, method)( - request, receiver, receiver.abort, test_constants.LONG_TIMEOUT) - receiver.block_until_terminated() - - self.assertIs( - face.Abortion.Kind.REMOTE_FAILURE, receiver.abortion().kind) - - def testFailedStreamRequestUnaryResponse(self): - for (group, method), test_messages_sequence in ( - self._digest.stream_unary_messages_sequences.iteritems()): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - receiver = _receiver.Receiver() - - with self._control.fail(): - call_consumer = self._invoker.event(group, method)( - receiver, receiver.abort, test_constants.LONG_TIMEOUT) - for request in requests: - call_consumer.consume(request) - call_consumer.terminate() - receiver.block_until_terminated() - - self.assertIs( - face.Abortion.Kind.REMOTE_FAILURE, receiver.abortion().kind) - - def testFailedStreamRequestStreamResponse(self): - for (group, method), test_messages_sequence in ( - self._digest.stream_stream_messages_sequences.iteritems()): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - receiver = _receiver.Receiver() - - with self._control.fail(): - call_consumer = self._invoker.event(group, method)( - receiver, receiver.abort, test_constants.LONG_TIMEOUT) - for request in requests: - call_consumer.consume(request) - call_consumer.terminate() - receiver.block_until_terminated() - - self.assertIs( - face.Abortion.Kind.REMOTE_FAILURE, receiver.abortion().kind) diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/test_cases.py b/src/python/grpcio/tests/unit/framework/interfaces/face/test_cases.py index 462829b660..06b9d77e52 100644 --- a/src/python/grpcio/tests/unit/framework/interfaces/face/test_cases.py +++ b/src/python/grpcio/tests/unit/framework/interfaces/face/test_cases.py @@ -1,4 +1,4 @@ -# Copyright 2015, Google Inc. +# Copyright 2015-2016, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without @@ -34,14 +34,12 @@ import unittest # pylint: disable=unused-import # test_interfaces is referenced from specification in this module. from tests.unit.framework.interfaces.face import _blocking_invocation_inline_service -from tests.unit.framework.interfaces.face import _event_invocation_synchronous_event_service from tests.unit.framework.interfaces.face import _future_invocation_asynchronous_event_service from tests.unit.framework.interfaces.face import _invocation from tests.unit.framework.interfaces.face import test_interfaces # pylint: disable=unused-import _TEST_CASE_SUPERCLASSES = ( _blocking_invocation_inline_service.TestCase, - _event_invocation_synchronous_event_service.TestCase, _future_invocation_asynchronous_event_service.TestCase, ) |