From 1efb6017ec1edb26706895dfe71f4d72ea387cf4 Mon Sep 17 00:00:00 2001 From: Ken Payson Date: Wed, 8 Jun 2016 13:06:44 -0700 Subject: Initial Python3 support Notable Changes: -Convert all str types to byte types at cython layer (ascii encoding) -Use six for packages that have different names in Python2/Python3 -By default, unit tests are compiled/run in Python2.7 and Python3.4 -Ensure MACOSX_BUILD_TARGET is at least 10.7 --- src/python/grpcio/commands.py | 2 +- src/python/grpcio/grpc/_channel.py | 10 ++-- src/python/grpcio/grpc/_common.py | 13 +++++ .../grpcio/grpc/_cython/_cygrpc/call.pyx.pxi | 7 +-- .../grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi | 24 ++------ .../grpc/_cython/_cygrpc/credentials.pxd.pxi | 2 +- .../grpc/_cython/_cygrpc/credentials.pyx.pxi | 53 ++++------------- .../grpc/_cython/_cygrpc/grpc_string.pyx.pxi | 39 +++++++++++++ .../grpcio/grpc/_cython/_cygrpc/records.pyx.pxi | 68 +++++----------------- .../grpcio/grpc/_cython/_cygrpc/server.pyx.pxi | 7 +-- src/python/grpcio/grpc/_cython/cygrpc.pyx | 1 + src/python/grpcio/grpc/_links/service.py | 6 +- src/python/grpcio/grpc/_server.py | 14 ++--- src/python/grpcio/grpc/beta/_client_adaptations.py | 17 +++--- src/python/grpcio/grpc/beta/_server_adaptations.py | 46 +++++++++------ src/python/grpcio/tests/qps/benchmark_client.py | 5 +- src/python/grpcio/tests/qps/client_runner.py | 9 ++- src/python/grpcio/tests/stress/client.py | 6 +- .../tests/unit/_adapter/_intermediary_low_test.py | 43 +++++++------- src/python/grpcio/tests/unit/_adapter/_low_test.py | 16 ++--- .../grpcio/tests/unit/_cython/cygrpc_test.py | 2 +- .../grpcio/tests/unit/_links/_transmission_test.py | 2 +- src/python/grpcio/tests/unit/_rpc_test.py | 8 ++- .../grpcio/tests/unit/beta/_beta_features_test.py | 4 +- .../grpcio/tests/unit/beta/_not_found_test.py | 2 +- .../unit/framework/interfaces/base/test_cases.py | 6 +- src/python/grpcio/tests/unit/test_common.py | 4 ++ 27 files changed, 194 insertions(+), 222 deletions(-) create mode 100644 src/python/grpcio/grpc/_cython/_cygrpc/grpc_string.pyx.pxi (limited to 'src') diff --git a/src/python/grpcio/commands.py b/src/python/grpcio/commands.py index 295dab2d27..3e974eba0a 100644 --- a/src/python/grpcio/commands.py +++ b/src/python/grpcio/commands.py @@ -191,7 +191,7 @@ class BuildProtoModules(setuptools.Command): except subprocess.CalledProcessError as e: sys.stderr.write( 'warning: Command:\n{}\nMessage:\n{}\nOutput:\n{}'.format( - command, e.message, e.output)) + command, str(e), e.output)) # Generated proto directories dont include __init__.py, but # these are needed for python package resolution diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index d9eb5a4b77..555a92a363 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -85,7 +85,7 @@ def _deadline(timeout): def _unknown_code_details(unknown_cygrpc_code, details): - return b'Server sent unknown code {} and details "{}"'.format( + return 'Server sent unknown code {} and details "{}"'.format( unknown_cygrpc_code, details) @@ -142,7 +142,7 @@ def _handle_event(event, state, response_deserializer): response = _common.deserialize( serialized_response, response_deserializer) if response is None: - details = b'Exception deserializing response!' + details = 'Exception deserializing response!' _abort(state, grpc.StatusCode.INTERNAL, details) else: state.response = response @@ -186,7 +186,7 @@ def _consume_request_iterator( if state.code is None and not state.cancelled: if serialized_request is None: call.cancel() - details = b'Exception serializing request!' + details = 'Exception serializing request!' _abort(state, grpc.StatusCode.INTERNAL, details) return else: @@ -230,7 +230,7 @@ class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call): if self._state.code is None: self._call.cancel() self._state.cancelled = True - _abort(self._state, grpc.StatusCode.CANCELLED, b'Cancelled!') + _abort(self._state, grpc.StatusCode.CANCELLED, 'Cancelled!') self._state.condition.notify_all() return False @@ -402,7 +402,7 @@ def _start_unary_request(request, timeout, request_serializer): if serialized_request is None: state = _RPCState( (), _EMPTY_METADATA, _EMPTY_METADATA, grpc.StatusCode.INTERNAL, - b'Exception serializing request!') + 'Exception serializing request!') rendezvous = _Rendezvous(state, None, None, deadline) return deadline, deadline_timespec, None, rendezvous else: diff --git a/src/python/grpcio/grpc/_common.py b/src/python/grpcio/grpc/_common.py index a3fb66cd07..b8688a0149 100644 --- a/src/python/grpcio/grpc/_common.py +++ b/src/python/grpcio/grpc/_common.py @@ -97,3 +97,16 @@ def serialize(message, serializer): def deserialize(serialized_message, deserializer): return _transform(serialized_message, deserializer, 'Exception deserializing message!') + + +def _encode(s): + if isinstance(s, bytes): + return s + else: + return s.encode('ascii') + + +def fully_qualified_method(group, method): + group = _encode(group) + method = _encode(method) + return b'/' + group + b'/' + method diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi index 1bfe6344e0..a09bbc75fe 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi @@ -55,6 +55,7 @@ cdef class Call: def cancel( self, grpc_status_code error_code=GRPC_STATUS__DO_NOT_USE, details=None): + details = str_to_bytes(details) if not self.is_valid: raise ValueError("invalid call object cannot be used from Python") if (details is None) != (error_code == GRPC_STATUS__DO_NOT_USE): @@ -63,12 +64,6 @@ cdef class Call: cdef grpc_call_error result cdef char *c_details = NULL if error_code != GRPC_STATUS__DO_NOT_USE: - if isinstance(details, bytes): - pass - elif isinstance(details, basestring): - details = details.encode() - else: - raise TypeError("expected details to be str or bytes") self.references.append(details) c_details = details with nogil: diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi index c26bc083cf..866cff0d01 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi @@ -34,18 +34,13 @@ cdef class Channel: def __cinit__(self, target, ChannelArgs arguments=None, ChannelCredentials channel_credentials=None): + target = str_to_bytes(target) cdef grpc_channel_args *c_arguments = NULL cdef char *c_target = NULL self.c_channel = NULL self.references = [] if arguments is not None: c_arguments = &arguments.c_args - if isinstance(target, bytes): - pass - elif isinstance(target, basestring): - target = target.encode() - else: - raise TypeError("expected target to be str or bytes") c_target = target if channel_credentials is None: with nogil: @@ -62,25 +57,14 @@ cdef class Channel: def create_call(self, Call parent, int flags, CompletionQueue queue not None, method, host, Timespec deadline not None): + method = str_to_bytes(method) + host = str_to_bytes(host) if queue.is_shutting_down: raise ValueError("queue must not be shutting down or shutdown") - if isinstance(method, bytes): - pass - elif isinstance(method, basestring): - method = method.encode() - else: - raise TypeError("expected method to be str or bytes") cdef char *method_c_string = method cdef char *host_c_string = NULL - if host is None: - pass - elif isinstance(host, bytes): + if host is not None: host_c_string = host - elif isinstance(host, basestring): - host = host.encode() - host_c_string = host - else: - raise TypeError("expected host to be str, bytes, or None") cdef Call operation_call = Call() operation_call.references = [self, method, host, queue] cdef grpc_call *parent_call = NULL diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pxd.pxi index 19a59e08f3..d377a67520 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pxd.pxi @@ -54,7 +54,7 @@ cdef class ServerCredentials: cdef class CredentialsMetadataPlugin: cdef object plugin_callback - cdef str plugin_name + cdef bytes plugin_name cdef grpc_metadata_credentials_plugin make_c_plugin(self) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi index 1ba86457af..470382d609 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi @@ -82,7 +82,7 @@ cdef class ServerCredentials: cdef class CredentialsMetadataPlugin: - def __cinit__(self, object plugin_callback, str name): + def __cinit__(self, object plugin_callback, name): """ Args: plugin_callback (callable): Callback accepting a service URL (str/bytes) @@ -93,6 +93,7 @@ cdef class CredentialsMetadataPlugin: successful). name (str): Plugin name. """ + name = str_to_bytes(name) if not callable(plugin_callback): raise ValueError('expected callable plugin_callback') self.plugin_callback = plugin_callback @@ -129,7 +130,8 @@ cdef void plugin_get_metadata( grpc_credentials_plugin_metadata_cb cb, void *user_data) with gil: def python_callback( Metadata metadata, grpc_status_code status, - const char *error_details): + error_details): + error_details = str_to_bytes(error_details) cb(user_data, metadata.c_metadata_array.metadata, metadata.c_metadata_array.count, status, error_details) cdef CredentialsMetadataPlugin self = state @@ -148,14 +150,7 @@ def channel_credentials_google_default(): def channel_credentials_ssl(pem_root_certificates, SslPemKeyCertPair ssl_pem_key_cert_pair): - if pem_root_certificates is None: - pass - elif isinstance(pem_root_certificates, bytes): - pass - elif isinstance(pem_root_certificates, basestring): - pem_root_certificates = pem_root_certificates.encode() - else: - raise TypeError("expected str or bytes for pem_root_certificates") + pem_root_certificates = str_to_bytes(pem_root_certificates) cdef ChannelCredentials credentials = ChannelCredentials() cdef const char *c_pem_root_certificates = NULL if pem_root_certificates is not None: @@ -207,12 +202,7 @@ def call_credentials_google_compute_engine(): def call_credentials_service_account_jwt_access( json_key, Timespec token_lifetime not None): - if isinstance(json_key, bytes): - pass - elif isinstance(json_key, basestring): - json_key = json_key.encode() - else: - raise TypeError("expected json_key to be str or bytes") + json_key = str_to_bytes(json_key) cdef CallCredentials credentials = CallCredentials() cdef char *json_key_c_string = json_key with nogil: @@ -223,12 +213,7 @@ def call_credentials_service_account_jwt_access( return credentials def call_credentials_google_refresh_token(json_refresh_token): - if isinstance(json_refresh_token, bytes): - pass - elif isinstance(json_refresh_token, basestring): - json_refresh_token = json_refresh_token.encode() - else: - raise TypeError("expected json_refresh_token to be str or bytes") + json_refresh_token = str_to_bytes(json_refresh_token) cdef CallCredentials credentials = CallCredentials() cdef char *json_refresh_token_c_string = json_refresh_token with nogil: @@ -238,18 +223,8 @@ def call_credentials_google_refresh_token(json_refresh_token): return credentials def call_credentials_google_iam(authorization_token, authority_selector): - if isinstance(authorization_token, bytes): - pass - elif isinstance(authorization_token, basestring): - authorization_token = authorization_token.encode() - else: - raise TypeError("expected authorization_token to be str or bytes") - if isinstance(authority_selector, bytes): - pass - elif isinstance(authority_selector, basestring): - authority_selector = authority_selector.encode() - else: - raise TypeError("expected authority_selector to be str or bytes") + authorization_token = str_to_bytes(authorization_token) + authority_selector = str_to_bytes(authority_selector) cdef CallCredentials credentials = CallCredentials() cdef char *authorization_token_c_string = authorization_token cdef char *authority_selector_c_string = authority_selector @@ -272,16 +247,10 @@ def call_credentials_metadata_plugin(CredentialsMetadataPlugin plugin): def server_credentials_ssl(pem_root_certs, pem_key_cert_pairs, bint force_client_auth): + pem_root_certs = str_to_bytes(pem_root_certs) cdef char *c_pem_root_certs = NULL - if pem_root_certs is None: - pass - elif isinstance(pem_root_certs, bytes): - c_pem_root_certs = pem_root_certs - elif isinstance(pem_root_certs, basestring): - pem_root_certs = pem_root_certs.encode() + if pem_root_certs is not None: c_pem_root_certs = pem_root_certs - else: - raise TypeError("expected pem_root_certs to be str or bytes") pem_key_cert_pairs = list(pem_key_cert_pairs) for pair in pem_key_cert_pairs: if not isinstance(pair, SslPemKeyCertPair): diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc_string.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_string.pyx.pxi new file mode 100644 index 0000000000..274f038e0a --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_string.pyx.pxi @@ -0,0 +1,39 @@ +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + +# This function will ascii encode unicode string inputs if neccesary. +# In Python3, unicode strings are the default str type. +cdef bytes str_to_bytes(object s): + if s is None or isinstance(s, bytes): + return s + elif isinstance(s, unicode): + return s.encode('ascii') + else: + raise TypeError('Expected bytes, str, or unicode, not {}'.format(type(s))) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi index e0219b0086..2e52953c0a 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi @@ -235,18 +235,13 @@ cdef class ByteBuffer: if data is None: self.c_byte_buffer = NULL return - if isinstance(data, bytes): - pass - elif isinstance(data, basestring): - data = data.encode() - elif isinstance(data, ByteBuffer): + if isinstance(data, ByteBuffer): data = (data).bytes() if data is None: self.c_byte_buffer = NULL return else: - raise TypeError("expected value to be of type str, bytes, or " - "ByteBuffer, not {}".format(type(data))) + data = str_to_bytes(data) cdef char *c_data = data cdef gpr_slice data_slice @@ -302,19 +297,8 @@ cdef class ByteBuffer: cdef class SslPemKeyCertPair: def __cinit__(self, private_key, certificate_chain): - if isinstance(private_key, bytes): - self.private_key = private_key - elif isinstance(private_key, basestring): - self.private_key = private_key.encode() - else: - raise TypeError("expected private_key to be of type str or bytes") - if isinstance(certificate_chain, bytes): - self.certificate_chain = certificate_chain - elif isinstance(certificate_chain, basestring): - self.certificate_chain = certificate_chain.encode() - else: - raise TypeError("expected certificate_chain to be of type str or bytes " - "or int") + self.private_key = str_to_bytes(private_key) + self.certificate_chain = str_to_bytes(certificate_chain) self.c_pair.private_key = self.private_key self.c_pair.certificate_chain = self.certificate_chain @@ -322,27 +306,16 @@ cdef class SslPemKeyCertPair: cdef class ChannelArg: def __cinit__(self, key, value): - if isinstance(key, bytes): - self.key = key - elif isinstance(key, basestring): - self.key = key.encode() - else: - raise TypeError("expected key to be of type str or bytes") - if isinstance(value, bytes): - self.value = value - self.c_arg.type = GRPC_ARG_STRING - self.c_arg.value.string = self.value - elif isinstance(value, basestring): - self.value = value.encode() - self.c_arg.type = GRPC_ARG_STRING - self.c_arg.value.string = self.value - elif isinstance(value, int): + self.key = str_to_bytes(key) + self.c_arg.key = self.key + if isinstance(value, int): self.value = int(value) self.c_arg.type = GRPC_ARG_INTEGER self.c_arg.value.integer = self.value else: - raise TypeError("expected value to be of type str or bytes or int") - self.c_arg.key = self.key + self.value = str_to_bytes(value) + self.c_arg.type = GRPC_ARG_STRING + self.c_arg.value.string = self.value cdef class ChannelArgs: @@ -375,18 +348,8 @@ cdef class ChannelArgs: cdef class Metadatum: def __cinit__(self, key, value): - if isinstance(key, bytes): - self._key = key - elif isinstance(key, basestring): - self._key = key.encode() - else: - raise TypeError("expected key to be of type str or bytes") - if isinstance(value, bytes): - self._value = value - elif isinstance(value, basestring): - self._value = value.encode() - else: - raise TypeError("expected value to be of type str or bytes") + self._key = str_to_bytes(key) + self._value = str_to_bytes(value) self.c_metadata.key = self._key self.c_metadata.value = self._value self.c_metadata.value_length = len(self._value) @@ -601,12 +564,7 @@ def operation_send_close_from_client(int flags): def operation_send_status_from_server( Metadata metadata, grpc_status_code code, details, int flags): - if isinstance(details, bytes): - pass - elif isinstance(details, basestring): - details = details.encode() - else: - raise TypeError("expected a str or bytes object for details") + details = str_to_bytes(details) cdef Operation op = Operation() op.c_op.type = GRPC_OP_SEND_STATUS_FROM_SERVER op.c_op.flags = flags diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi index 55948755b2..c8a73e65d6 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi @@ -103,12 +103,7 @@ cdef class Server: def add_http2_port(self, address, ServerCredentials server_credentials=None): - if isinstance(address, bytes): - pass - elif isinstance(address, basestring): - address = address.encode() - else: - raise TypeError("expected address to be a str or bytes") + address = str_to_bytes(address) self.references.append(address) cdef int result cdef char *address_c_string = address diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pyx b/src/python/grpcio/grpc/_cython/cygrpc.pyx index 8823ea5ef5..cf146f5a04 100644 --- a/src/python/grpcio/grpc/_cython/cygrpc.pyx +++ b/src/python/grpcio/grpc/_cython/cygrpc.pyx @@ -35,6 +35,7 @@ import sys # TODO(atash): figure out why the coverage tool gets confused about the Cython # coverage plugin when the following files don't have a '.pxi' suffix. +include "grpc/_cython/_cygrpc/grpc_string.pyx.pxi" include "grpc/_cython/_cygrpc/call.pyx.pxi" include "grpc/_cython/_cygrpc/channel.pyx.pxi" include "grpc/_cython/_cygrpc/credentials.pyx.pxi" diff --git a/src/python/grpcio/grpc/_links/service.py b/src/python/grpcio/grpc/_links/service.py index 11310e2240..5fc4994ca0 100644 --- a/src/python/grpcio/grpc/_links/service.py +++ b/src/python/grpcio/grpc/_links/service.py @@ -33,6 +33,7 @@ import abc import enum import logging import threading +import six import time from grpc._adapter import _intermediary_low @@ -177,7 +178,10 @@ class _Kernel(object): call = service_acceptance.call call.accept(self._completion_queue, call) try: - group, method = service_acceptance.method.split(b'/')[1:3] + service_method = service_acceptance.method + if six.PY3: + service_method = service_method.decode('latin1') + group, method = service_method.split('/')[1:3] except ValueError: logging.info('Illegal path "%s"!', service_acceptance.method) return diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py index aae9f48ae6..f4f6720497 100644 --- a/src/python/grpcio/grpc/_server.py +++ b/src/python/grpcio/grpc/_server.py @@ -85,7 +85,7 @@ def _abortion_code(state, code): def _details(state): - return b'' if state.details is None else state.details + return '' if state.details is None else state.details class _HandlerCallDetails( @@ -189,7 +189,7 @@ def _receive_message(state, call, request_deserializer): if request is None: _abort( state, call, cygrpc.StatusCode.internal, - b'Exception deserializing request!') + 'Exception deserializing request!') else: state.request = request state.condition.notify_all() @@ -340,7 +340,7 @@ def _unary_request(rpc_event, state, request_deserializer): state.condition.wait() if state.request is None: if state.client is _CLOSED: - details = b'"{}" requires exactly one request message.'.format( + details = '"{}" requires exactly one request message.'.format( rpc_event.request_call_details.method) # TODO(5992#issuecomment-220761992): really, what status code? _abort( @@ -363,7 +363,7 @@ def _call_behavior(rpc_event, state, behavior, argument, request_deserializer): except Exception as e: # pylint: disable=broad-except with state.condition: if e not in state.rpc_errors: - details = b'Exception calling application: {}'.format(e) + details = 'Exception calling application: {}'.format(e) logging.exception(details) _abort( state, rpc_event.operation_call, cygrpc.StatusCode.unknown, details) @@ -378,7 +378,7 @@ def _take_response_from_response_iterator(rpc_event, state, response_iterator): except Exception as e: # pylint: disable=broad-except with state.condition: if e not in state.rpc_errors: - details = b'Exception iterating responses: {}'.format(e) + details = 'Exception iterating responses: {}'.format(e) logging.exception(details) _abort( state, rpc_event.operation_call, cygrpc.StatusCode.unknown, details) @@ -391,7 +391,7 @@ def _serialize_response(rpc_event, state, response, response_serializer): with state.condition: _abort( state, rpc_event.operation_call, cygrpc.StatusCode.internal, - b'Failed to serialize response!') + 'Failed to serialize response!') return None else: return serialized_response @@ -544,7 +544,7 @@ def _handle_unrecognized_method(rpc_event): cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS), cygrpc.operation_send_status_from_server( _EMPTY_METADATA, cygrpc.StatusCode.unimplemented, - b'Method not found!', _EMPTY_FLAGS), + 'Method not found!', _EMPTY_FLAGS), ) rpc_state = _RPCState() rpc_event.operation_call.start_batch( diff --git a/src/python/grpcio/grpc/beta/_client_adaptations.py b/src/python/grpcio/grpc/beta/_client_adaptations.py index 621fcf2174..024808c540 100644 --- a/src/python/grpcio/grpc/beta/_client_adaptations.py +++ b/src/python/grpcio/grpc/beta/_client_adaptations.py @@ -30,6 +30,7 @@ """Translates gRPC's client-side API into gRPC's client-side Beta API.""" import grpc +from grpc import _common from grpc._cython import cygrpc from grpc.beta import interfaces from grpc.framework.common import cardinality @@ -48,10 +49,6 @@ _STATUS_CODE_TO_ABORTION_KIND_AND_ABORTION_ERROR_CLASS = { } -def _fully_qualified_method(group, method): - return b'/{}/{}'.format(group, method) - - def _effective_metadata(metadata, metadata_transformer): non_none_metadata = () if metadata is None else metadata if metadata_transformer is None: @@ -184,7 +181,7 @@ def _blocking_unary_unary( metadata_transformer, request, request_serializer, response_deserializer): try: multi_callable = channel.unary_unary( - _fully_qualified_method(group, method), + _common.fully_qualified_method(group, method), request_serializer=request_serializer, response_deserializer=response_deserializer) effective_metadata = _effective_metadata(metadata, metadata_transformer) @@ -205,7 +202,7 @@ def _future_unary_unary( channel, group, method, timeout, protocol_options, metadata, metadata_transformer, request, request_serializer, response_deserializer): multi_callable = channel.unary_unary( - _fully_qualified_method(group, method), + _common.fully_qualified_method(group, method), request_serializer=request_serializer, response_deserializer=response_deserializer) effective_metadata = _effective_metadata(metadata, metadata_transformer) @@ -219,7 +216,7 @@ def _unary_stream( channel, group, method, timeout, protocol_options, metadata, metadata_transformer, request, request_serializer, response_deserializer): multi_callable = channel.unary_stream( - _fully_qualified_method(group, method), + _common.fully_qualified_method(group, method), request_serializer=request_serializer, response_deserializer=response_deserializer) effective_metadata = _effective_metadata(metadata, metadata_transformer) @@ -235,7 +232,7 @@ def _blocking_stream_unary( response_deserializer): try: multi_callable = channel.stream_unary( - _fully_qualified_method(group, method), + _common.fully_qualified_method(group, method), request_serializer=request_serializer, response_deserializer=response_deserializer) effective_metadata = _effective_metadata(metadata, metadata_transformer) @@ -257,7 +254,7 @@ def _future_stream_unary( metadata_transformer, request_iterator, request_serializer, response_deserializer): multi_callable = channel.stream_unary( - _fully_qualified_method(group, method), + _common.fully_qualified_method(group, method), request_serializer=request_serializer, response_deserializer=response_deserializer) effective_metadata = _effective_metadata(metadata, metadata_transformer) @@ -272,7 +269,7 @@ def _stream_stream( metadata_transformer, request_iterator, request_serializer, response_deserializer): multi_callable = channel.stream_stream( - _fully_qualified_method(group, method), + _common.fully_qualified_method(group, method), request_serializer=request_serializer, response_deserializer=response_deserializer) effective_metadata = _effective_metadata(metadata, metadata_transformer) diff --git a/src/python/grpcio/grpc/beta/_server_adaptations.py b/src/python/grpcio/grpc/beta/_server_adaptations.py index 52eadf2315..79e6ca87eb 100644 --- a/src/python/grpcio/grpc/beta/_server_adaptations.py +++ b/src/python/grpcio/grpc/beta/_server_adaptations.py @@ -33,6 +33,7 @@ import collections import threading import grpc +from grpc import _common from grpc.beta import interfaces from grpc.framework.common import cardinality from grpc.framework.common import style @@ -287,36 +288,43 @@ def _simple_method_handler( None, _adapt_stream_stream_event(implementation.stream_stream_event)) +def _flatten_method_pair_map(method_pair_map): + method_pair_map = method_pair_map or {} + flat_map = {} + for method_pair in method_pair_map: + method = _common.fully_qualified_method(method_pair[0], method_pair[1]) + flat_map[method] = method_pair_map[method_pair] + return flat_map + + class _GenericRpcHandler(grpc.GenericRpcHandler): def __init__( self, method_implementations, multi_method_implementation, request_deserializers, response_serializers): - self._method_implementations = method_implementations + self._method_implementations = _flatten_method_pair_map( + method_implementations) + self._request_deserializers = _flatten_method_pair_map( + request_deserializers) + self._response_serializers = _flatten_method_pair_map( + response_serializers) self._multi_method_implementation = multi_method_implementation - self._request_deserializers = request_deserializers or {} - self._response_serializers = response_serializers or {} def service(self, handler_call_details): - try: - group_name, method_name = handler_call_details.method.split(b'/')[1:3] - except ValueError: + method_implementation = self._method_implementations.get( + handler_call_details.method) + if method_implementation is not None: + return _simple_method_handler( + method_implementation, + self._request_deserializers.get(handler_call_details.method), + self._response_serializers.get(handler_call_details.method)) + elif self._multi_method_implementation is None: return None else: - method_implementation = self._method_implementations.get( - (group_name, method_name,)) - if method_implementation is not None: - return _simple_method_handler( - method_implementation, - self._request_deserializers.get((group_name, method_name,)), - self._response_serializers.get((group_name, method_name,))) - elif self._multi_method_implementation is None: + try: + return None #TODO(nathaniel): call the multimethod. + except face.NoSuchMethodError: return None - else: - try: - return None #TODO(nathaniel): call the multimethod. - except face.NoSuchMethodError: - return None class _Server(interfaces.Server): diff --git a/src/python/grpcio/tests/qps/benchmark_client.py b/src/python/grpcio/tests/qps/benchmark_client.py index e2922347f9..1b100bb168 100644 --- a/src/python/grpcio/tests/qps/benchmark_client.py +++ b/src/python/grpcio/tests/qps/benchmark_client.py @@ -31,12 +31,9 @@ import abc import time -try: - import Queue as queue # Python 2.x -except ImportError: - import queue # Python 3 from concurrent import futures +from six.moves import queue from grpc.beta import implementations from grpc.framework.interfaces.face import face diff --git a/src/python/grpcio/tests/qps/client_runner.py b/src/python/grpcio/tests/qps/client_runner.py index 2d1d981733..1fd58687ad 100644 --- a/src/python/grpcio/tests/qps/client_runner.py +++ b/src/python/grpcio/tests/qps/client_runner.py @@ -34,7 +34,7 @@ ClientRunner invokes either periodically or in response to some event. """ import abc -import thread +import threading import time @@ -61,15 +61,18 @@ class OpenLoopClientRunner(ClientRunner): super(OpenLoopClientRunner, self).__init__(client) self._is_running = False self._interval_generator = interval_generator + self._dispatch_thread = threading.Thread( + target=self._dispatch_requests, args=()) def start(self): self._is_running = True self._client.start() - thread.start_new_thread(self._dispatch_requests, ()) - + self._dispatch_thread.start() + def stop(self): self._is_running = False self._client.stop() + self._dispatch_thread.join() self._client = None def _dispatch_requests(self): diff --git a/src/python/grpcio/tests/stress/client.py b/src/python/grpcio/tests/stress/client.py index e2e016760c..0de2532cd8 100644 --- a/src/python/grpcio/tests/stress/client.py +++ b/src/python/grpcio/tests/stress/client.py @@ -30,10 +30,10 @@ """Entry point for running stress tests.""" import argparse -import Queue import threading from grpc.beta import implementations +from six.moves import queue from src.proto.grpc.testing import metrics_pb2 from src.proto.grpc.testing import test_pb2 @@ -94,7 +94,7 @@ def run_test(args): test_cases = _parse_weighted_test_cases(args.test_cases) test_servers = args.server_addresses.split(',') # Propagate any client exceptions with a queue - exception_queue = Queue.Queue() + exception_queue = queue.Queue() stop_event = threading.Event() hist = histogram.Histogram(1, 1) runners = [] @@ -121,7 +121,7 @@ def run_test(args): if timeout_secs < 0: timeout_secs = None raise exception_queue.get(block=True, timeout=timeout_secs) - except Queue.Empty: + except queue.Empty: # No exceptions thrown, success pass finally: diff --git a/src/python/grpcio/tests/unit/_adapter/_intermediary_low_test.py b/src/python/grpcio/tests/unit/_adapter/_intermediary_low_test.py index 22d4b019c7..09ebdeff33 100644 --- a/src/python/grpcio/tests/unit/_adapter/_intermediary_low_test.py +++ b/src/python/grpcio/tests/unit/_adapter/_intermediary_low_test.py @@ -164,15 +164,15 @@ class EchoTest(unittest.TestCase): self.assertIsNotNone(service_accepted) self.assertIs(service_accepted.kind, _low.Event.Kind.SERVICE_ACCEPTED) self.assertIs(service_accepted.tag, service_tag) - self.assertEqual(method, service_accepted.service_acceptance.method) - self.assertEqual(self.host, service_accepted.service_acceptance.host) + self.assertEqual(method.encode(), service_accepted.service_acceptance.method) + self.assertEqual(self.host.encode(), service_accepted.service_acceptance.host) self.assertIsNotNone(service_accepted.service_acceptance.call) metadata = dict(service_accepted.metadata) - self.assertIn(client_metadata_key, metadata) - self.assertEqual(client_metadata_value, metadata[client_metadata_key]) - self.assertIn(client_binary_metadata_key, metadata) + self.assertIn(client_metadata_key.encode(), metadata) + self.assertEqual(client_metadata_value.encode(), metadata[client_metadata_key.encode()]) + self.assertIn(client_binary_metadata_key.encode(), metadata) self.assertEqual(client_binary_metadata_value, - metadata[client_binary_metadata_key]) + metadata[client_binary_metadata_key.encode()]) server_call = service_accepted.service_acceptance.call server_call.accept(self.server_completion_queue, finish_tag) server_call.add_metadata(server_leading_metadata_key, @@ -186,12 +186,12 @@ class EchoTest(unittest.TestCase): self.assertEqual(_low.Event.Kind.METADATA_ACCEPTED, metadata_accepted.kind) self.assertEqual(metadata_tag, metadata_accepted.tag) metadata = dict(metadata_accepted.metadata) - self.assertIn(server_leading_metadata_key, metadata) - self.assertEqual(server_leading_metadata_value, - metadata[server_leading_metadata_key]) - self.assertIn(server_leading_binary_metadata_key, metadata) + self.assertIn(server_leading_metadata_key.encode(), metadata) + self.assertEqual(server_leading_metadata_value.encode(), + metadata[server_leading_metadata_key.encode()]) + self.assertIn(server_leading_binary_metadata_key.encode(), metadata) self.assertEqual(server_leading_binary_metadata_value, - metadata[server_leading_binary_metadata_key]) + metadata[server_leading_binary_metadata_key.encode()]) for datum in test_data: client_call.write(datum, write_tag, _low.WriteFlags.WRITE_NO_COMPRESS) @@ -277,17 +277,17 @@ class EchoTest(unittest.TestCase): self.assertIsNone(read_accepted.bytes) self.assertEqual(_low.Event.Kind.FINISH, finish_accepted.kind) self.assertEqual(finish_tag, finish_accepted.tag) - self.assertEqual(_low.Status(_low.Code.OK, details), finish_accepted.status) + self.assertEqual(_low.Status(_low.Code.OK, details.encode()), finish_accepted.status) metadata = dict(finish_accepted.metadata) - self.assertIn(server_trailing_metadata_key, metadata) - self.assertEqual(server_trailing_metadata_value, - metadata[server_trailing_metadata_key]) - self.assertIn(server_trailing_binary_metadata_key, metadata) + self.assertIn(server_trailing_metadata_key.encode(), metadata) + self.assertEqual(server_trailing_metadata_value.encode(), + metadata[server_trailing_metadata_key.encode()]) + self.assertIn(server_trailing_binary_metadata_key.encode(), metadata) self.assertEqual(server_trailing_binary_metadata_value, - metadata[server_trailing_binary_metadata_key]) + metadata[server_trailing_binary_metadata_key.encode()]) self.assertSetEqual(set(key for key, _ in finish_accepted.metadata), - set((server_trailing_metadata_key, - server_trailing_binary_metadata_key,))) + set((server_trailing_metadata_key.encode(), + server_trailing_binary_metadata_key.encode(),))) self.assertSequenceEqual(test_data, server_data) self.assertSequenceEqual(test_data, client_data) @@ -302,7 +302,8 @@ class EchoTest(unittest.TestCase): self._perform_echo_test([_BYTE_SEQUENCE]) def testManyOneByteEchoes(self): - self._perform_echo_test(_BYTE_SEQUENCE) + self._perform_echo_test( + [_BYTE_SEQUENCE[i:i+1] for i in range(len(_BYTE_SEQUENCE))]) def testManyManyByteEchoes(self): self._perform_echo_test(_BYTE_SEQUENCE_SEQUENCE) @@ -409,7 +410,7 @@ class CancellationTest(unittest.TestCase): finish_event = self.client_events.get() self.assertEqual(_low.Event.Kind.FINISH, finish_event.kind) - self.assertEqual(_low.Status(_low.Code.CANCELLED, 'Cancelled'), + self.assertEqual(_low.Status(_low.Code.CANCELLED, b'Cancelled'), finish_event.status) self.assertSequenceEqual(test_data, server_data) diff --git a/src/python/grpcio/tests/unit/_adapter/_low_test.py b/src/python/grpcio/tests/unit/_adapter/_low_test.py index ec46617996..e09a1f2564 100644 --- a/src/python/grpcio/tests/unit/_adapter/_low_test.py +++ b/src/python/grpcio/tests/unit/_adapter/_low_test.py @@ -148,11 +148,11 @@ class InsecureServerInsecureClient(unittest.TestCase): # Check that Python's user agent string is a part of the full user agent # string received_initial_metadata_dict = dict(received_initial_metadata) - self.assertIn('user-agent', received_initial_metadata_dict) - self.assertIn('Python-gRPC-{}'.format(_grpcio_metadata.__version__), - received_initial_metadata_dict['user-agent']) - self.assertEqual(method, request_event.call_details.method) - self.assertEqual(host, request_event.call_details.host) + self.assertIn(b'user-agent', received_initial_metadata_dict) + self.assertIn('Python-gRPC-{}'.format(_grpcio_metadata.__version__).encode(), + received_initial_metadata_dict[b'user-agent']) + self.assertEqual(method.encode(), request_event.call_details.method) + self.assertEqual(host.encode(), request_event.call_details.host) self.assertLess(abs(deadline - request_event.call_details.deadline), deadline_tolerance) @@ -198,12 +198,12 @@ class InsecureServerInsecureClient(unittest.TestCase): test_common.metadata_transmitted(server_initial_metadata, client_result.initial_metadata)) elif client_result.type == _types.OpType.RECV_MESSAGE: - self.assertEqual(response, client_result.message) + self.assertEqual(response.encode(), client_result.message) elif client_result.type == _types.OpType.RECV_STATUS_ON_CLIENT: self.assertTrue( test_common.metadata_transmitted(server_trailing_metadata, client_result.trailing_metadata)) - self.assertEqual(server_status_details, client_result.status.details) + self.assertEqual(server_status_details.encode(), client_result.status.details) self.assertEqual(server_status_code, client_result.status.code) self.assertEqual(set([ _types.OpType.SEND_INITIAL_METADATA, @@ -220,7 +220,7 @@ class InsecureServerInsecureClient(unittest.TestCase): self.assertNotIn(client_result.type, found_server_op_types) found_server_op_types.add(server_result.type) if server_result.type == _types.OpType.RECV_MESSAGE: - self.assertEqual(request, server_result.message) + self.assertEqual(request.encode(), server_result.message) elif server_result.type == _types.OpType.RECV_CLOSE_ON_SERVER: self.assertFalse(server_result.cancelled) self.assertEqual(set([ diff --git a/src/python/grpcio/tests/unit/_cython/cygrpc_test.py b/src/python/grpcio/tests/unit/_cython/cygrpc_test.py index 4039c1b99b..a006a20ce3 100644 --- a/src/python/grpcio/tests/unit/_cython/cygrpc_test.py +++ b/src/python/grpcio/tests/unit/_cython/cygrpc_test.py @@ -37,7 +37,7 @@ from tests.unit import test_common from tests.unit import resources -_SSL_HOST_OVERRIDE = 'foo.test.google.fr' +_SSL_HOST_OVERRIDE = b'foo.test.google.fr' _CALL_CREDENTIALS_METADATA_KEY = 'call-creds-key' _CALL_CREDENTIALS_METADATA_VALUE = 'call-creds-value' _EMPTY_FLAGS = 0 diff --git a/src/python/grpcio/tests/unit/_links/_transmission_test.py b/src/python/grpcio/tests/unit/_links/_transmission_test.py index 888684d197..1f6edd18ca 100644 --- a/src/python/grpcio/tests/unit/_links/_transmission_test.py +++ b/src/python/grpcio/tests/unit/_links/_transmission_test.py @@ -153,7 +153,7 @@ class RoundTripTest(unittest.TestCase): invocation_mate.tickets()[-1].termination, links.Ticket.Termination.COMPLETION) self.assertIs(invocation_mate.tickets()[-1].code, test_code) - self.assertEqual(invocation_mate.tickets()[-1].message, test_message) + self.assertEqual(invocation_mate.tickets()[-1].message, test_message.encode()) def _perform_scenario_test(self, scenario): test_operation_id = object() diff --git a/src/python/grpcio/tests/unit/_rpc_test.py b/src/python/grpcio/tests/unit/_rpc_test.py index 1c7a14c5d0..b33bff490c 100644 --- a/src/python/grpcio/tests/unit/_rpc_test.py +++ b/src/python/grpcio/tests/unit/_rpc_test.py @@ -29,6 +29,8 @@ """Test of gRPC Python's application-layer API.""" +from __future__ import division + import itertools import threading import unittest @@ -41,9 +43,9 @@ from tests.unit.framework.common import test_constants from tests.unit.framework.common import test_control _SERIALIZE_REQUEST = lambda bytestring: bytestring * 2 -_DESERIALIZE_REQUEST = lambda bytestring: bytestring[len(bytestring) / 2:] +_DESERIALIZE_REQUEST = lambda bytestring: bytestring[len(bytestring) // 2:] _SERIALIZE_RESPONSE = lambda bytestring: bytestring * 3 -_DESERIALIZE_RESPONSE = lambda bytestring: bytestring[:len(bytestring) / 3] +_DESERIALIZE_RESPONSE = lambda bytestring: bytestring[:len(bytestring) // 3] _UNARY_UNARY = b'/test/UnaryUnary' _UNARY_STREAM = b'/test/UnaryStream' @@ -189,7 +191,7 @@ class RPCTest(unittest.TestCase): self._server.add_generic_rpc_handlers((_GenericHandler(self._handler),)) self._server.start() - self._channel = grpc.insecure_channel(b'localhost:%d' % port) + self._channel = grpc.insecure_channel('localhost:%d' % port) # TODO(nathaniel): Why is this necessary, and only in some development # environments? diff --git a/src/python/grpcio/tests/unit/beta/_beta_features_test.py b/src/python/grpcio/tests/unit/beta/_beta_features_test.py index bb2893a21b..3a9701b8eb 100644 --- a/src/python/grpcio/tests/unit/beta/_beta_features_test.py +++ b/src/python/grpcio/tests/unit/beta/_beta_features_test.py @@ -42,8 +42,8 @@ from tests.unit.framework.common import test_constants _SERVER_HOST_OVERRIDE = 'foo.test.google.fr' -_PER_RPC_CREDENTIALS_METADATA_KEY = 'my-call-credentials-metadata-key' -_PER_RPC_CREDENTIALS_METADATA_VALUE = 'my-call-credentials-metadata-value' +_PER_RPC_CREDENTIALS_METADATA_KEY = b'my-call-credentials-metadata-key' +_PER_RPC_CREDENTIALS_METADATA_VALUE = b'my-call-credentials-metadata-value' _GROUP = 'group' _UNARY_UNARY = 'unary-unary' diff --git a/src/python/grpcio/tests/unit/beta/_not_found_test.py b/src/python/grpcio/tests/unit/beta/_not_found_test.py index 44fcd1e13c..37b8c49120 100644 --- a/src/python/grpcio/tests/unit/beta/_not_found_test.py +++ b/src/python/grpcio/tests/unit/beta/_not_found_test.py @@ -61,7 +61,7 @@ class NotFoundTest(unittest.TestCase): def test_future_stream_unary_not_found(self): rpc_future = self._generic_stub.future_stream_unary( - 'grupe', 'mevvod', b'def', test_constants.LONG_TIMEOUT) + 'grupe', 'mevvod', [b'def'], test_constants.LONG_TIMEOUT) with self.assertRaises(face.LocalError) as exception_assertion_context: rpc_future.result() self.assertIs( diff --git a/src/python/grpcio/tests/unit/framework/interfaces/base/test_cases.py b/src/python/grpcio/tests/unit/framework/interfaces/base/test_cases.py index 4f8e26c9a2..5d16bf98be 100644 --- a/src/python/grpcio/tests/unit/framework/interfaces/base/test_cases.py +++ b/src/python/grpcio/tests/unit/framework/interfaces/base/test_cases.py @@ -29,6 +29,8 @@ """Tests of the base interface of RPC Framework.""" +from __future__ import division + import logging import random import threading @@ -54,13 +56,13 @@ class _Serialization(test_interfaces.Serialization): return request + request def deserialize_request(self, serialized_request): - return serialized_request[:len(serialized_request) / 2] + return serialized_request[:len(serialized_request) // 2] def serialize_response(self, response): return response * 3 def deserialize_response(self, serialized_response): - return serialized_response[2 * len(serialized_response) / 3:] + return serialized_response[2 * len(serialized_response) // 3:] def _advance(quadruples, operator, controller): diff --git a/src/python/grpcio/tests/unit/test_common.py b/src/python/grpcio/tests/unit/test_common.py index 7b4d20dccb..b779f65e7e 100644 --- a/src/python/grpcio/tests/unit/test_common.py +++ b/src/python/grpcio/tests/unit/test_common.py @@ -61,6 +61,10 @@ def metadata_transmitted(original_metadata, transmitted_metadata): original = collections.defaultdict(list) for key_value_pair in original_metadata: key, value = tuple(key_value_pair) + if not isinstance(key, bytes): + key = key.encode() + if not isinstance(value, bytes): + value = value.encode() original[key].append(value) transmitted = collections.defaultdict(list) for key_value_pair in transmitted_metadata: -- cgit v1.2.3