diff options
author | 2018-01-09 13:49:37 -0800 | |
---|---|---|
committer | 2018-01-09 13:49:37 -0800 | |
commit | b04efac6a5ac0842bc2b6c3e72b786b646644ac0 (patch) | |
tree | ca70be08a55ea3aeade97e226b232b1786c63656 /src/python | |
parent | b928fd496efbe2265169077bcfdf684f7ad6aea3 (diff) | |
parent | 0ea629c61ec70a35075e800bc3f85651f00e746f (diff) |
Merge branch 'master' of https://github.com/grpc/grpc into channel-tracing
Diffstat (limited to 'src/python')
98 files changed, 2161 insertions, 1382 deletions
diff --git a/src/python/grpcio/commands.py b/src/python/grpcio/commands.py index 5f28e9101f..4c2ebaeaea 100644 --- a/src/python/grpcio/commands.py +++ b/src/python/grpcio/commands.py @@ -104,8 +104,8 @@ def _get_grpc_custom_bdist(decorated_basename, target_bdist_basename): with open(bdist_path, 'w') as bdist_file: bdist_file.write(bdist_data) except IOError as error: - raise CommandError('{}\n\nCould not write grpcio bdist: {}' - .format(traceback.format_exc(), error.message)) + raise CommandError('{}\n\nCould not write grpcio bdist: {}'.format( + traceback.format_exc(), error.message)) return bdist_path @@ -141,7 +141,8 @@ class SphinxDocumentation(setuptools.Command): with open(glossary_filepath, 'a') as glossary_filepath: glossary_filepath.write(API_GLOSSARY) sphinx.main( - ['', os.path.join('doc', 'src'), os.path.join('doc', 'build')]) + ['', os.path.join('doc', 'src'), + os.path.join('doc', 'build')]) class BuildProjectMetadata(setuptools.Command): @@ -189,10 +190,11 @@ def check_and_update_cythonization(extensions): for source in extension.sources: base, file_ext = os.path.splitext(source) if file_ext == '.pyx': - generated_pyx_source = next((base + gen_ext - for gen_ext in ('.c', '.cpp',) - if os.path.isfile(base + gen_ext)), - None) + generated_pyx_source = next( + (base + gen_ext for gen_ext in ( + '.c', + '.cpp', + ) if os.path.isfile(base + gen_ext)), None) if generated_pyx_source: generated_pyx_sources.append(generated_pyx_source) else: @@ -299,10 +301,10 @@ class Gather(setuptools.Command): """Command to gather project dependencies.""" description = 'gather dependencies for grpcio' - user_options = [ - ('test', 't', 'flag indicating to gather test dependencies'), - ('install', 'i', 'flag indicating to gather install dependencies') - ] + user_options = [('test', 't', + 'flag indicating to gather test dependencies'), + ('install', 'i', + 'flag indicating to gather install dependencies')] def initialize_options(self): self.test = False diff --git a/src/python/grpcio/grpc/__init__.py b/src/python/grpcio/grpc/__init__.py index 8b913ac949..db410d307b 100644 --- a/src/python/grpcio/grpc/__init__.py +++ b/src/python/grpcio/grpc/__init__.py @@ -1376,8 +1376,8 @@ def metadata_call_credentials(metadata_plugin, name=None): A CallCredentials. """ from grpc import _plugin_wrapping # pylint: disable=cyclic-import - return _plugin_wrapping.metadata_plugin_call_credentials(metadata_plugin, - name) + return _plugin_wrapping.metadata_plugin_call_credentials( + metadata_plugin, name) def access_token_call_credentials(access_token): @@ -1631,25 +1631,57 @@ def server(thread_pool, ################################### __all__ ################################# __all__ = ( - 'FutureTimeoutError', 'FutureCancelledError', 'Future', - 'ChannelConnectivity', 'StatusCode', 'RpcError', 'RpcContext', 'Call', - 'ChannelCredentials', 'CallCredentials', 'AuthMetadataContext', - 'AuthMetadataPluginCallback', 'AuthMetadataPlugin', 'ClientCallDetails', - 'ServerCertificateConfiguration', 'ServerCredentials', - 'UnaryUnaryMultiCallable', 'UnaryStreamMultiCallable', - 'StreamUnaryMultiCallable', 'StreamStreamMultiCallable', - 'UnaryUnaryClientInterceptor', 'UnaryStreamClientInterceptor', - 'StreamUnaryClientInterceptor', 'StreamStreamClientInterceptor', 'Channel', - 'ServicerContext', 'RpcMethodHandler', 'HandlerCallDetails', - 'GenericRpcHandler', 'ServiceRpcHandler', 'Server', 'ServerInterceptor', - 'unary_unary_rpc_method_handler', 'unary_stream_rpc_method_handler', - 'stream_unary_rpc_method_handler', 'stream_stream_rpc_method_handler', - 'method_handlers_generic_handler', 'ssl_channel_credentials', - 'metadata_call_credentials', 'access_token_call_credentials', - 'composite_call_credentials', 'composite_channel_credentials', - 'ssl_server_credentials', 'ssl_server_certificate_configuration', - 'dynamic_ssl_server_credentials', 'channel_ready_future', - 'insecure_channel', 'secure_channel', 'intercept_channel', 'server',) + 'FutureTimeoutError', + 'FutureCancelledError', + 'Future', + 'ChannelConnectivity', + 'StatusCode', + 'RpcError', + 'RpcContext', + 'Call', + 'ChannelCredentials', + 'CallCredentials', + 'AuthMetadataContext', + 'AuthMetadataPluginCallback', + 'AuthMetadataPlugin', + 'ClientCallDetails', + 'ServerCertificateConfiguration', + 'ServerCredentials', + 'UnaryUnaryMultiCallable', + 'UnaryStreamMultiCallable', + 'StreamUnaryMultiCallable', + 'StreamStreamMultiCallable', + 'UnaryUnaryClientInterceptor', + 'UnaryStreamClientInterceptor', + 'StreamUnaryClientInterceptor', + 'StreamStreamClientInterceptor', + 'Channel', + 'ServicerContext', + 'RpcMethodHandler', + 'HandlerCallDetails', + 'GenericRpcHandler', + 'ServiceRpcHandler', + 'Server', + 'ServerInterceptor', + 'unary_unary_rpc_method_handler', + 'unary_stream_rpc_method_handler', + 'stream_unary_rpc_method_handler', + 'stream_stream_rpc_method_handler', + 'method_handlers_generic_handler', + 'ssl_channel_credentials', + 'metadata_call_credentials', + 'access_token_call_credentials', + 'composite_call_credentials', + 'composite_channel_credentials', + 'ssl_server_credentials', + 'ssl_server_certificate_configuration', + 'dynamic_ssl_server_credentials', + 'channel_ready_future', + 'insecure_channel', + 'secure_channel', + 'intercept_channel', + 'server', +) ############################### Extension Shims ################################ diff --git a/src/python/grpcio/grpc/_auth.py b/src/python/grpcio/grpc/_auth.py index 9a339b5900..c17824563d 100644 --- a/src/python/grpcio/grpc/_auth.py +++ b/src/python/grpcio/grpc/_auth.py @@ -54,7 +54,9 @@ class GoogleCallCredentials(grpc.AuthMetadataPlugin): if self._is_jwt: future = self._pool.submit( self._credentials.get_access_token, - additional_claims={'aud': context.service_url}) + additional_claims={ + 'aud': context.service_url + }) else: future = self._pool.submit(self._credentials.get_access_token) future.add_done_callback(_create_get_token_callback(callback)) diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index d7456a3dd1..24be042f61 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -29,24 +29,32 @@ _USER_AGENT = 'grpc-python/{}'.format(_grpcio_metadata.__version__) _EMPTY_FLAGS = 0 _INFINITE_FUTURE = cygrpc.Timespec(float('+inf')) -_UNARY_UNARY_INITIAL_DUE = (cygrpc.OperationType.send_initial_metadata, - cygrpc.OperationType.send_message, - cygrpc.OperationType.send_close_from_client, - cygrpc.OperationType.receive_initial_metadata, - cygrpc.OperationType.receive_message, - cygrpc.OperationType.receive_status_on_client,) -_UNARY_STREAM_INITIAL_DUE = (cygrpc.OperationType.send_initial_metadata, - cygrpc.OperationType.send_message, - cygrpc.OperationType.send_close_from_client, - cygrpc.OperationType.receive_initial_metadata, - cygrpc.OperationType.receive_status_on_client,) -_STREAM_UNARY_INITIAL_DUE = (cygrpc.OperationType.send_initial_metadata, - cygrpc.OperationType.receive_initial_metadata, - cygrpc.OperationType.receive_message, - cygrpc.OperationType.receive_status_on_client,) -_STREAM_STREAM_INITIAL_DUE = (cygrpc.OperationType.send_initial_metadata, - cygrpc.OperationType.receive_initial_metadata, - cygrpc.OperationType.receive_status_on_client,) +_UNARY_UNARY_INITIAL_DUE = ( + cygrpc.OperationType.send_initial_metadata, + cygrpc.OperationType.send_message, + cygrpc.OperationType.send_close_from_client, + cygrpc.OperationType.receive_initial_metadata, + cygrpc.OperationType.receive_message, + cygrpc.OperationType.receive_status_on_client, +) +_UNARY_STREAM_INITIAL_DUE = ( + cygrpc.OperationType.send_initial_metadata, + cygrpc.OperationType.send_message, + cygrpc.OperationType.send_close_from_client, + cygrpc.OperationType.receive_initial_metadata, + cygrpc.OperationType.receive_status_on_client, +) +_STREAM_UNARY_INITIAL_DUE = ( + cygrpc.OperationType.send_initial_metadata, + cygrpc.OperationType.receive_initial_metadata, + cygrpc.OperationType.receive_message, + cygrpc.OperationType.receive_status_on_client, +) +_STREAM_STREAM_INITIAL_DUE = ( + cygrpc.OperationType.send_initial_metadata, + cygrpc.OperationType.receive_initial_metadata, + cygrpc.OperationType.receive_status_on_client, +) _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = ( 'Exception calling channel subscription callback!') @@ -129,12 +137,12 @@ def _abort(state, code, details): def _handle_event(event, state, response_deserializer): callbacks = [] for batch_operation in event.batch_operations: - operation_type = batch_operation.type + operation_type = batch_operation.type() state.due.remove(operation_type) if operation_type == cygrpc.OperationType.receive_initial_metadata: - state.initial_metadata = batch_operation.received_metadata + state.initial_metadata = batch_operation.initial_metadata() elif operation_type == cygrpc.OperationType.receive_message: - serialized_response = batch_operation.received_message.bytes() + serialized_response = batch_operation.message() if serialized_response is not None: response = _common.deserialize(serialized_response, response_deserializer) @@ -144,18 +152,17 @@ def _handle_event(event, state, response_deserializer): else: state.response = response elif operation_type == cygrpc.OperationType.receive_status_on_client: - state.trailing_metadata = batch_operation.received_metadata + state.trailing_metadata = batch_operation.trailing_metadata() if state.code is None: code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE.get( - batch_operation.received_status_code) + batch_operation.code()) if code is None: state.code = grpc.StatusCode.UNKNOWN state.details = _unknown_code_details( - batch_operation.received_status_code, - batch_operation.received_status_details) + code, batch_operation.details()) else: state.code = code - state.details = batch_operation.received_status_details + state.details = batch_operation.details() callbacks.extend(state.callbacks) state.callbacks = None return callbacks @@ -200,7 +207,7 @@ def _consume_request_iterator(request_iterator, state, call, _abort(state, grpc.StatusCode.INTERNAL, details) return else: - operations = (cygrpc.operation_send_message( + operations = (cygrpc.SendMessageOperation( serialized_request, _EMPTY_FLAGS),) call.start_client_batch(operations, event_handler) state.due.add(cygrpc.OperationType.send_message) @@ -216,7 +223,7 @@ def _consume_request_iterator(request_iterator, state, call, with state.condition: if state.code is None: operations = ( - cygrpc.operation_send_close_from_client(_EMPTY_FLAGS),) + cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),) call.start_client_batch(operations, event_handler) state.due.add(cygrpc.OperationType.send_close_from_client) @@ -319,7 +326,7 @@ class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call): event_handler = _event_handler(self._state, self._call, self._response_deserializer) self._call.start_client_batch( - (cygrpc.operation_receive_message(_EMPTY_FLAGS),), + (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), event_handler) self._state.due.add(cygrpc.OperationType.receive_message) elif self._state.code is grpc.StatusCode.OK: @@ -453,12 +460,13 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): else: state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None) operations = ( - cygrpc.operation_send_initial_metadata(metadata, _EMPTY_FLAGS), - cygrpc.operation_send_message(serialized_request, _EMPTY_FLAGS), - cygrpc.operation_send_close_from_client(_EMPTY_FLAGS), - cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS), - cygrpc.operation_receive_message(_EMPTY_FLAGS), - cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),) + cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS), + cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS), + cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), + cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), + cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), + ) return state, operations, deadline, deadline_timespec, None def _blocking(self, request, timeout, metadata, credentials): @@ -536,14 +544,15 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): self._response_deserializer) with state.condition: call.start_client_batch( - (cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),), + (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), event_handler) operations = ( - cygrpc.operation_send_initial_metadata( - metadata, _EMPTY_FLAGS), cygrpc.operation_send_message( - serialized_request, _EMPTY_FLAGS), - cygrpc.operation_send_close_from_client(_EMPTY_FLAGS), - cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),) + cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS), + cygrpc.SendMessageOperation(serialized_request, + _EMPTY_FLAGS), + cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), + ) call_error = call.start_client_batch(operations, event_handler) if call_error != cygrpc.CallError.ok: _call_error_set_RPCstate(state, call_error, metadata) @@ -573,12 +582,12 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): call.set_credentials(credentials._credentials) with state.condition: call.start_client_batch( - (cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),), - None) + (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), None) operations = ( - cygrpc.operation_send_initial_metadata(metadata, _EMPTY_FLAGS), - cygrpc.operation_receive_message(_EMPTY_FLAGS), - cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),) + cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS), + cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), + ) call_error = call.start_client_batch(operations, None) _check_call_error(call_error, metadata) _consume_request_iterator(request_iterator, state, call, @@ -624,12 +633,13 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): event_handler = _event_handler(state, call, self._response_deserializer) with state.condition: call.start_client_batch( - (cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),), + (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), event_handler) operations = ( - cygrpc.operation_send_initial_metadata(metadata, _EMPTY_FLAGS), - cygrpc.operation_receive_message(_EMPTY_FLAGS), - cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),) + cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS), + cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), + ) call_error = call.start_client_batch(operations, event_handler) if call_error != cygrpc.CallError.ok: _call_error_set_RPCstate(state, call_error, metadata) @@ -664,11 +674,12 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): event_handler = _event_handler(state, call, self._response_deserializer) with state.condition: call.start_client_batch( - (cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),), + (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), event_handler) operations = ( - cygrpc.operation_send_initial_metadata(metadata, _EMPTY_FLAGS), - cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),) + cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), + ) call_error = call.start_client_batch(operations, event_handler) if call_error != cygrpc.CallError.ok: _call_error_set_RPCstate(state, call_error, metadata) @@ -789,7 +800,11 @@ def _deliver(state, initial_connectivity, initial_callbacks): def _spawn_delivery(state, callbacks): delivering_thread = threading.Thread( - target=_deliver, args=(state, state.connectivity, callbacks,)) + target=_deliver, args=( + state, + state.connectivity, + callbacks, + )) delivering_thread.start() state.delivering = True @@ -864,17 +879,16 @@ def _subscribe(state, callback, try_to_connect): def _unsubscribe(state, callback): with state.lock: - for index, (subscribed_callback, unused_connectivity - ) in enumerate(state.callbacks_and_connectivities): + for index, (subscribed_callback, unused_connectivity) in enumerate( + state.callbacks_and_connectivities): if callback == subscribed_callback: state.callbacks_and_connectivities.pop(index) break def _options(options): - return list(options) + [ - (cygrpc.ChannelArgKey.primary_user_agent_string, _USER_AGENT) - ] + return list(options) + [(cygrpc.ChannelArgKey.primary_user_agent_string, + _USER_AGENT)] class Channel(grpc.Channel): @@ -889,8 +903,8 @@ class Channel(grpc.Channel): credentials: A cygrpc.ChannelCredentials or None. """ self._channel = cygrpc.Channel( - _common.encode(target), - _common.channel_args(_options(options)), credentials) + _common.encode(target), _common.channel_args(_options(options)), + credentials) self._call_state = _ChannelCallState(self._channel) self._connectivity_state = _ChannelConnectivityState(self._channel) @@ -910,8 +924,7 @@ class Channel(grpc.Channel): request_serializer=None, response_deserializer=None): return _UnaryUnaryMultiCallable( - self._channel, - _channel_managed_call_management(self._call_state), + self._channel, _channel_managed_call_management(self._call_state), _common.encode(method), request_serializer, response_deserializer) def unary_stream(self, @@ -919,8 +932,7 @@ class Channel(grpc.Channel): request_serializer=None, response_deserializer=None): return _UnaryStreamMultiCallable( - self._channel, - _channel_managed_call_management(self._call_state), + self._channel, _channel_managed_call_management(self._call_state), _common.encode(method), request_serializer, response_deserializer) def stream_unary(self, @@ -928,8 +940,7 @@ class Channel(grpc.Channel): request_serializer=None, response_deserializer=None): return _StreamUnaryMultiCallable( - self._channel, - _channel_managed_call_management(self._call_state), + self._channel, _channel_managed_call_management(self._call_state), _common.encode(method), request_serializer, response_deserializer) def stream_stream(self, @@ -937,8 +948,7 @@ class Channel(grpc.Channel): request_serializer=None, response_deserializer=None): return _StreamStreamMultiCallable( - self._channel, - _channel_managed_call_management(self._call_state), + self._channel, _channel_managed_call_management(self._call_state), _common.encode(method), request_serializer, response_deserializer) def __del__(self): diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi index 6361669757..0892215b6d 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi @@ -26,16 +26,13 @@ cdef class Call: def _start_batch(self, operations, tag, retain_self): if not self.is_valid: raise ValueError("invalid call object cannot be used from Python") - cdef OperationTag operation_tag = OperationTag(tag, operations) - if retain_self: - operation_tag.operation_call = self - else: - operation_tag.operation_call = None - operation_tag.store_ops() - cpython.Py_INCREF(operation_tag) + cdef _BatchOperationTag batch_operation_tag = _BatchOperationTag( + tag, operations, self if retain_self else None) + batch_operation_tag.prepare() + cpython.Py_INCREF(batch_operation_tag) return grpc_call_start_batch( - self.c_call, operation_tag.c_ops, operation_tag.c_nops, - <cpython.PyObject *>operation_tag, NULL) + self.c_call, batch_operation_tag.c_ops, batch_operation_tag.c_nops, + <cpython.PyObject *>batch_operation_tag, NULL) def start_client_batch(self, operations, tag): # We don't reference this call in the operations tag because diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi index 644df674cc..443d534d7e 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi @@ -76,12 +76,12 @@ cdef class Channel: def watch_connectivity_state( self, grpc_connectivity_state last_observed_state, Timespec deadline not None, CompletionQueue queue not None, tag): - cdef OperationTag operation_tag = OperationTag(tag, None) - cpython.Py_INCREF(operation_tag) + cdef _ConnectivityTag connectivity_tag = _ConnectivityTag(tag) + cpython.Py_INCREF(connectivity_tag) with nogil: grpc_channel_watch_connectivity_state( self.c_channel, last_observed_state, deadline.c_time, - queue.c_completion_queue, <cpython.PyObject *>operation_tag) + queue.c_completion_queue, <cpython.PyObject *>connectivity_tag) def target(self): cdef char *target = NULL diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi index 140fc357b9..e259789b35 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi @@ -37,42 +37,20 @@ cdef class CompletionQueue: self.is_shutdown = False cdef _interpret_event(self, grpc_event event): - cdef OperationTag tag = None - cdef object user_tag = None - cdef Call operation_call = None - cdef CallDetails request_call_details = None - cdef object request_metadata = None - cdef object batch_operations = None + cdef _Tag tag = None if event.type == GRPC_QUEUE_TIMEOUT: - return Event( - event.type, False, None, None, None, None, False, None) + # NOTE(nathaniel): For now we coopt ConnectivityEvent here. + return ConnectivityEvent(GRPC_QUEUE_TIMEOUT, False, None) elif event.type == GRPC_QUEUE_SHUTDOWN: self.is_shutdown = True - return Event( - event.type, True, None, None, None, None, False, None) + # NOTE(nathaniel): For now we coopt ConnectivityEvent here. + return ConnectivityEvent(GRPC_QUEUE_TIMEOUT, True, None) else: - if event.tag != NULL: - tag = <OperationTag>event.tag - # We receive event tags only after they've been inc-ref'd elsewhere in - # the code. - cpython.Py_DECREF(tag) - if tag.shutting_down_server is not None: - tag.shutting_down_server.notify_shutdown_complete() - user_tag = tag.user_tag - operation_call = tag.operation_call - request_call_details = tag.request_call_details - if tag.is_new_request: - request_metadata = _metadata(&tag._c_request_metadata) - grpc_metadata_array_destroy(&tag._c_request_metadata) - batch_operations = tag.release_ops() - if tag.is_new_request: - # Stuff in the tag not explicitly handled by us needs to live through - # the life of the call - operation_call.references.extend(tag.references) - return Event( - event.type, event.success, user_tag, operation_call, - request_call_details, request_metadata, tag.is_new_request, - batch_operations) + tag = <_Tag>event.tag + # We receive event tags only after they've been inc-ref'd elsewhere in + # the code. + cpython.Py_DECREF(tag) + return tag.event(event) def poll(self, Timespec deadline=None): # We name this 'poll' to avoid problems with CPython's expectations for diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/event.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/event.pxd.pxi new file mode 100644 index 0000000000..686199ecf4 --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/event.pxd.pxi @@ -0,0 +1,45 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef class ConnectivityEvent: + + cdef readonly grpc_completion_type completion_type + cdef readonly bint success + cdef readonly object tag + + +cdef class RequestCallEvent: + + cdef readonly grpc_completion_type completion_type + cdef readonly bint success + cdef readonly object tag + cdef readonly Call call + cdef readonly CallDetails call_details + cdef readonly tuple invocation_metadata + + +cdef class BatchOperationEvent: + + cdef readonly grpc_completion_type completion_type + cdef readonly bint success + cdef readonly object tag + cdef readonly object batch_operations + + +cdef class ServerShutdownEvent: + + cdef readonly grpc_completion_type completion_type + cdef readonly bint success + cdef readonly object tag diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/event.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/event.pyx.pxi new file mode 100644 index 0000000000..af26d27318 --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/event.pyx.pxi @@ -0,0 +1,55 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef class ConnectivityEvent: + + def __cinit__( + self, grpc_completion_type completion_type, bint success, object tag): + self.completion_type = completion_type + self.success = success + self.tag = tag + + +cdef class RequestCallEvent: + + def __cinit__( + self, grpc_completion_type completion_type, bint success, object tag, + Call call, CallDetails call_details, tuple invocation_metadata): + self.completion_type = completion_type + self.success = success + self.tag = tag + self.call = call + self.call_details = call_details + self.invocation_metadata = invocation_metadata + + +cdef class BatchOperationEvent: + + def __cinit__( + self, grpc_completion_type completion_type, bint success, object tag, + object batch_operations): + self.completion_type = completion_type + self.success = success + self.tag = tag + self.batch_operations = batch_operations + + +cdef class ServerShutdownEvent: + + def __cinit__( + self, grpc_completion_type completion_type, bint success, object tag): + self.completion_type = completion_type + self.success = success + self.tag = tag diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi index 6a72bbf693..6ee833697d 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi @@ -17,6 +17,7 @@ cimport libc.time # Typedef types with approximately the same semantics to provide their names to # Cython +ctypedef unsigned char uint8_t ctypedef int int32_t ctypedef unsigned uint32_t ctypedef long int64_t @@ -25,6 +26,7 @@ ctypedef long int64_t cdef extern from "grpc/support/alloc.h": void *gpr_malloc(size_t size) nogil + void *gpr_zalloc(size_t size) nogil void gpr_free(void *ptr) nogil void *gpr_realloc(void *p, size_t size) nogil @@ -183,6 +185,18 @@ cdef extern from "grpc/grpc.h": size_t arguments_length "num_args" grpc_arg *arguments "args" + ctypedef enum grpc_compression_level: + GRPC_COMPRESS_LEVEL_NONE + GRPC_COMPRESS_LEVEL_LOW + GRPC_COMPRESS_LEVEL_MED + GRPC_COMPRESS_LEVEL_HIGH + + ctypedef enum grpc_stream_compression_level: + GRPC_STREAM_COMPRESS_LEVEL_NONE + GRPC_STREAM_COMPRESS_LEVEL_LOW + GRPC_STREAM_COMPRESS_LEVEL_MED + GRPC_STREAM_COMPRESS_LEVEL_HIGH + ctypedef enum grpc_call_error: GRPC_CALL_OK GRPC_CALL_ERROR @@ -258,9 +272,19 @@ cdef extern from "grpc/grpc.h": GRPC_OP_RECV_STATUS_ON_CLIENT GRPC_OP_RECV_CLOSE_ON_SERVER + ctypedef struct grpc_op_send_initial_metadata_maybe_compression_level: + uint8_t is_set + grpc_compression_level level + + ctypedef struct grpc_op_send_initial_metadata_maybe_stream_compression_level: + uint8_t is_set + grpc_stream_compression_level level + ctypedef struct grpc_op_data_send_initial_metadata: size_t count grpc_metadata *metadata + grpc_op_send_initial_metadata_maybe_compression_level maybe_compression_level + grpc_op_send_initial_metadata_maybe_stream_compression_level maybe_stream_compression_level ctypedef struct grpc_op_data_send_status_from_server: size_t trailing_metadata_count diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/operation.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/operation.pxd.pxi new file mode 100644 index 0000000000..bfbe27785b --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/operation.pxd.pxi @@ -0,0 +1,109 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef class Operation: + + cdef void c(self) + cdef void un_c(self) + + # TODO(https://github.com/grpc/grpc/issues/7950): Eliminate this! + cdef grpc_op c_op + + +cdef class SendInitialMetadataOperation(Operation): + + cdef readonly object _initial_metadata; + cdef readonly int _flags + cdef grpc_metadata *_c_initial_metadata + cdef size_t _c_initial_metadata_count + + cdef void c(self) + cdef void un_c(self) + + +cdef class SendMessageOperation(Operation): + + cdef readonly bytes _message + cdef readonly int _flags + cdef grpc_byte_buffer *_c_message_byte_buffer + + cdef void c(self) + cdef void un_c(self) + + +cdef class SendCloseFromClientOperation(Operation): + + cdef readonly int _flags + + cdef void c(self) + cdef void un_c(self) + + +cdef class SendStatusFromServerOperation(Operation): + + cdef readonly object _trailing_metadata + cdef readonly object _code + cdef readonly object _details + cdef readonly int _flags + cdef grpc_metadata *_c_trailing_metadata + cdef size_t _c_trailing_metadata_count + cdef grpc_slice _c_details + + cdef void c(self) + cdef void un_c(self) + + +cdef class ReceiveInitialMetadataOperation(Operation): + + cdef readonly int _flags + cdef tuple _initial_metadata + cdef grpc_metadata_array _c_initial_metadata + + cdef void c(self) + cdef void un_c(self) + + +cdef class ReceiveMessageOperation(Operation): + + cdef readonly int _flags + cdef grpc_byte_buffer *_c_message_byte_buffer + cdef bytes _message + + cdef void c(self) + cdef void un_c(self) + + +cdef class ReceiveStatusOnClientOperation(Operation): + + cdef readonly int _flags + cdef grpc_metadata_array _c_trailing_metadata + cdef grpc_status_code _c_code + cdef grpc_slice _c_details + cdef tuple _trailing_metadata + cdef object _code + cdef str _details + + cdef void c(self) + cdef void un_c(self) + + +cdef class ReceiveCloseOnServerOperation(Operation): + + cdef readonly int _flags + cdef object _cancelled + cdef int _c_cancelled + + cdef void c(self) + cdef void un_c(self) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/operation.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/operation.pyx.pxi new file mode 100644 index 0000000000..3c91abf722 --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/operation.pyx.pxi @@ -0,0 +1,238 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef class Operation: + + cdef void c(self): + raise NotImplementedError() + + cdef void un_c(self): + raise NotImplementedError() + + +cdef class SendInitialMetadataOperation(Operation): + + def __cinit__(self, initial_metadata, flags): + self._initial_metadata = initial_metadata + self._flags = flags + + def type(self): + return GRPC_OP_SEND_INITIAL_METADATA + + cdef void c(self): + self.c_op.type = GRPC_OP_SEND_INITIAL_METADATA + self.c_op.flags = self._flags + _store_c_metadata( + self._initial_metadata, &self._c_initial_metadata, + &self._c_initial_metadata_count) + self.c_op.data.send_initial_metadata.metadata = self._c_initial_metadata + self.c_op.data.send_initial_metadata.count = self._c_initial_metadata_count + self.c_op.data.send_initial_metadata.maybe_compression_level.is_set = 0 + self.c_op.data.send_initial_metadata.maybe_stream_compression_level.is_set = 0 + + cdef void un_c(self): + _release_c_metadata( + self._c_initial_metadata, self._c_initial_metadata_count) + + +cdef class SendMessageOperation(Operation): + + def __cinit__(self, bytes message, int flags): + self._message = message + self._flags = flags + + def type(self): + return GRPC_OP_SEND_MESSAGE + + cdef void c(self): + self.c_op.type = GRPC_OP_SEND_MESSAGE + self.c_op.flags = self._flags + cdef grpc_slice message_slice = grpc_slice_from_copied_buffer( + self._message, len(self._message)) + self._c_message_byte_buffer = grpc_raw_byte_buffer_create( + &message_slice, 1) + grpc_slice_unref(message_slice) + self.c_op.data.send_message.send_message = self._c_message_byte_buffer + + cdef void un_c(self): + grpc_byte_buffer_destroy(self._c_message_byte_buffer) + + +cdef class SendCloseFromClientOperation(Operation): + + def __cinit__(self, int flags): + self._flags = flags + + def type(self): + return GRPC_OP_SEND_CLOSE_FROM_CLIENT + + cdef void c(self): + self.c_op.type = GRPC_OP_SEND_CLOSE_FROM_CLIENT + self.c_op.flags = self._flags + + cdef void un_c(self): + pass + + +cdef class SendStatusFromServerOperation(Operation): + + def __cinit__(self, trailing_metadata, code, object details, int flags): + self._trailing_metadata = trailing_metadata + self._code = code + self._details = details + self._flags = flags + + def type(self): + return GRPC_OP_SEND_STATUS_FROM_SERVER + + cdef void c(self): + self.c_op.type = GRPC_OP_SEND_STATUS_FROM_SERVER + self.c_op.flags = self._flags + _store_c_metadata( + self._trailing_metadata, &self._c_trailing_metadata, + &self._c_trailing_metadata_count) + self.c_op.data.send_status_from_server.trailing_metadata = ( + self._c_trailing_metadata) + self.c_op.data.send_status_from_server.trailing_metadata_count = ( + self._c_trailing_metadata_count) + self.c_op.data.send_status_from_server.status = self._code + self._c_details = _slice_from_bytes(_encode(self._details)) + self.c_op.data.send_status_from_server.status_details = &self._c_details + + cdef void un_c(self): + grpc_slice_unref(self._c_details) + _release_c_metadata( + self._c_trailing_metadata, self._c_trailing_metadata_count) + + +cdef class ReceiveInitialMetadataOperation(Operation): + + def __cinit__(self, flags): + self._flags = flags + + def type(self): + return GRPC_OP_RECV_INITIAL_METADATA + + cdef void c(self): + self.c_op.type = GRPC_OP_RECV_INITIAL_METADATA + self.c_op.flags = self._flags + grpc_metadata_array_init(&self._c_initial_metadata) + self.c_op.data.receive_initial_metadata.receive_initial_metadata = ( + &self._c_initial_metadata) + + cdef void un_c(self): + self._initial_metadata = _metadata(&self._c_initial_metadata) + grpc_metadata_array_destroy(&self._c_initial_metadata) + + def initial_metadata(self): + return self._initial_metadata + + +cdef class ReceiveMessageOperation(Operation): + + def __cinit__(self, flags): + self._flags = flags + + def type(self): + return GRPC_OP_RECV_MESSAGE + + cdef void c(self): + self.c_op.type = GRPC_OP_RECV_MESSAGE + self.c_op.flags = self._flags + self.c_op.data.receive_message.receive_message = ( + &self._c_message_byte_buffer) + + cdef void un_c(self): + cdef grpc_byte_buffer_reader message_reader + cdef bint message_reader_status + cdef grpc_slice message_slice + cdef size_t message_slice_length + cdef void *message_slice_pointer + if self._c_message_byte_buffer != NULL: + message_reader_status = grpc_byte_buffer_reader_init( + &message_reader, self._c_message_byte_buffer) + if message_reader_status: + message = bytearray() + while grpc_byte_buffer_reader_next(&message_reader, &message_slice): + message_slice_pointer = grpc_slice_start_ptr(message_slice) + message_slice_length = grpc_slice_length(message_slice) + message += (<char *>message_slice_pointer)[:message_slice_length] + grpc_slice_unref(message_slice) + grpc_byte_buffer_reader_destroy(&message_reader) + self._message = bytes(message) + else: + self._message = None + grpc_byte_buffer_destroy(self._c_message_byte_buffer) + else: + self._message = None + + def message(self): + return self._message + + +cdef class ReceiveStatusOnClientOperation(Operation): + + def __cinit__(self, flags): + self._flags = flags + + def type(self): + return GRPC_OP_RECV_STATUS_ON_CLIENT + + cdef void c(self): + self.c_op.type = GRPC_OP_RECV_STATUS_ON_CLIENT + self.c_op.flags = self._flags + grpc_metadata_array_init(&self._c_trailing_metadata) + self.c_op.data.receive_status_on_client.trailing_metadata = ( + &self._c_trailing_metadata) + self.c_op.data.receive_status_on_client.status = ( + &self._c_code) + self.c_op.data.receive_status_on_client.status_details = ( + &self._c_details) + + cdef void un_c(self): + self._trailing_metadata = _metadata(&self._c_trailing_metadata) + grpc_metadata_array_destroy(&self._c_trailing_metadata) + self._code = self._c_code + self._details = _decode(_slice_bytes(self._c_details)) + grpc_slice_unref(self._c_details) + + def trailing_metadata(self): + return self._trailing_metadata + + def code(self): + return self._code + + def details(self): + return self._details + + +cdef class ReceiveCloseOnServerOperation(Operation): + + def __cinit__(self, flags): + self._flags = flags + + def type(self): + return GRPC_OP_RECV_CLOSE_ON_SERVER + + cdef void c(self): + self.c_op.type = GRPC_OP_RECV_CLOSE_ON_SERVER + self.c_op.flags = self._flags + self.c_op.data.receive_close_on_server.cancelled = &self._c_cancelled + + cdef void un_c(self): + self._cancelled = bool(self._c_cancelled) + + def cancelled(self): + return self._cancelled diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi index 594fdb1a8b..7b2482d947 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi @@ -28,48 +28,6 @@ cdef class CallDetails: cdef grpc_call_details c_details -cdef class OperationTag: - - cdef object user_tag - cdef list references - # This allows CompletionQueue to notify the Python Server object that the - # underlying GRPC core server has shutdown - cdef Server shutting_down_server - cdef Call operation_call - cdef CallDetails request_call_details - cdef grpc_metadata_array _c_request_metadata - cdef grpc_op *c_ops - cdef size_t c_nops - cdef readonly object _operations - cdef bint is_new_request - - cdef void store_ops(self) - cdef object release_ops(self) - - -cdef class Event: - - cdef readonly grpc_completion_type type - cdef readonly bint success - cdef readonly object tag - - # For Server.request_call - cdef readonly bint is_new_request - cdef readonly CallDetails request_call_details - cdef readonly object request_metadata - - # For server calls - cdef readonly Call operation_call - - # For Call.start_batch - cdef readonly object batch_operations - - -cdef class ByteBuffer: - - cdef grpc_byte_buffer *c_byte_buffer - - cdef class SslPemKeyCertPair: cdef grpc_ssl_pem_key_cert_pair c_pair @@ -89,22 +47,6 @@ cdef class ChannelArgs: cdef list args -cdef class Operation: - - cdef grpc_op c_op - cdef bint _c_metadata_needs_release - cdef size_t _c_metadata_count - cdef grpc_metadata *_c_metadata - cdef ByteBuffer _received_message - cdef bint _c_metadata_array_needs_destruction - cdef grpc_metadata_array _c_metadata_array - cdef grpc_status_code _received_status_code - cdef grpc_slice _status_details - cdef int _received_cancelled - cdef readonly bint is_valid - cdef object references - - cdef class CompressionOptions: cdef grpc_compression_options c_options diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi index 26eaf50eb4..bc2cd0338e 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi @@ -218,111 +218,6 @@ cdef class CallDetails: return timespec -cdef class OperationTag: - - def __cinit__(self, user_tag, operations): - self.user_tag = user_tag - self.references = [] - self._operations = operations - - cdef void store_ops(self): - self.c_nops = 0 if self._operations is None else len(self._operations) - if 0 < self.c_nops: - self.c_ops = <grpc_op *>gpr_malloc(sizeof(grpc_op) * self.c_nops) - for index in range(self.c_nops): - self.c_ops[index] = (<Operation>(self._operations[index])).c_op - - cdef object release_ops(self): - if 0 < self.c_nops: - for index, operation in enumerate(self._operations): - (<Operation>operation).c_op = self.c_ops[index] - gpr_free(self.c_ops) - return self._operations - else: - return () - - -cdef class Event: - - def __cinit__(self, grpc_completion_type type, bint success, - object tag, Call operation_call, - CallDetails request_call_details, - object request_metadata, - bint is_new_request, - object batch_operations): - self.type = type - self.success = success - self.tag = tag - self.operation_call = operation_call - self.request_call_details = request_call_details - self.request_metadata = request_metadata - self.batch_operations = batch_operations - self.is_new_request = is_new_request - - -cdef class ByteBuffer: - - def __cinit__(self, bytes data): - grpc_init() - if data is None: - self.c_byte_buffer = NULL - return - - cdef char *c_data = data - cdef grpc_slice data_slice - cdef size_t data_length = len(data) - with nogil: - data_slice = grpc_slice_from_copied_buffer(c_data, data_length) - with nogil: - self.c_byte_buffer = grpc_raw_byte_buffer_create( - &data_slice, 1) - with nogil: - grpc_slice_unref(data_slice) - - def bytes(self): - cdef grpc_byte_buffer_reader reader - cdef grpc_slice data_slice - cdef size_t data_slice_length - cdef void *data_slice_pointer - cdef bint reader_status - if self.c_byte_buffer != NULL: - with nogil: - reader_status = grpc_byte_buffer_reader_init( - &reader, self.c_byte_buffer) - if not reader_status: - return None - result = bytearray() - with nogil: - while grpc_byte_buffer_reader_next(&reader, &data_slice): - data_slice_pointer = grpc_slice_start_ptr(data_slice) - data_slice_length = grpc_slice_length(data_slice) - with gil: - result += (<char *>data_slice_pointer)[:data_slice_length] - grpc_slice_unref(data_slice) - with nogil: - grpc_byte_buffer_reader_destroy(&reader) - return bytes(result) - else: - return None - - def __len__(self): - cdef size_t result - if self.c_byte_buffer != NULL: - with nogil: - result = grpc_byte_buffer_length(self.c_byte_buffer) - return result - else: - return 0 - - def __str__(self): - return self.bytes() - - def __dealloc__(self): - if self.c_byte_buffer != NULL: - grpc_byte_buffer_destroy(self.c_byte_buffer) - grpc_shutdown() - - cdef class SslPemKeyCertPair: def __cinit__(self, bytes private_key, bytes certificate_chain): @@ -407,185 +302,6 @@ cdef class ChannelArgs: return self.args[i] -cdef class Operation: - - def __cinit__(self): - grpc_init() - self.references = [] - self._c_metadata_needs_release = False - self._c_metadata_array_needs_destruction = False - self._status_details = grpc_empty_slice() - self.is_valid = False - - @property - def type(self): - return self.c_op.type - - @property - def flags(self): - return self.c_op.flags - - @property - def has_status(self): - return self.c_op.type == GRPC_OP_RECV_STATUS_ON_CLIENT - - @property - def received_message(self): - if self.c_op.type != GRPC_OP_RECV_MESSAGE: - raise TypeError("self must be an operation receiving a message") - return self._received_message - - @property - def received_message_or_none(self): - if self.c_op.type != GRPC_OP_RECV_MESSAGE: - return None - return self._received_message - - @property - def received_metadata(self): - if (self.c_op.type != GRPC_OP_RECV_INITIAL_METADATA and - self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT): - raise TypeError("self must be an operation receiving metadata") - return _metadata(&self._c_metadata_array) - - @property - def received_status_code(self): - if self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT: - raise TypeError("self must be an operation receiving a status code") - return self._received_status_code - - @property - def received_status_code_or_none(self): - if self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT: - return None - return self._received_status_code - - @property - def received_status_details(self): - if self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT: - raise TypeError("self must be an operation receiving status details") - return _slice_bytes(self._status_details) - - @property - def received_status_details_or_none(self): - if self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT: - return None - return _slice_bytes(self._status_details) - - @property - def received_cancelled(self): - if self.c_op.type != GRPC_OP_RECV_CLOSE_ON_SERVER: - raise TypeError("self must be an operation receiving cancellation " - "information") - return False if self._received_cancelled == 0 else True - - @property - def received_cancelled_or_none(self): - if self.c_op.type != GRPC_OP_RECV_CLOSE_ON_SERVER: - return None - return False if self._received_cancelled == 0 else True - - def __dealloc__(self): - if self._c_metadata_needs_release: - _release_c_metadata(self._c_metadata, self._c_metadata_count) - if self._c_metadata_array_needs_destruction: - grpc_metadata_array_destroy(&self._c_metadata_array) - grpc_slice_unref(self._status_details) - grpc_shutdown() - -def operation_send_initial_metadata(metadata, int flags): - cdef Operation op = Operation() - op.c_op.type = GRPC_OP_SEND_INITIAL_METADATA - op.c_op.flags = flags - _store_c_metadata(metadata, &op._c_metadata, &op._c_metadata_count) - op._c_metadata_needs_release = True - op.c_op.data.send_initial_metadata.count = op._c_metadata_count - op.c_op.data.send_initial_metadata.metadata = op._c_metadata - op.is_valid = True - return op - -def operation_send_message(data, int flags): - cdef Operation op = Operation() - op.c_op.type = GRPC_OP_SEND_MESSAGE - op.c_op.flags = flags - byte_buffer = ByteBuffer(data) - op.c_op.data.send_message.send_message = byte_buffer.c_byte_buffer - op.references.append(byte_buffer) - op.is_valid = True - return op - -def operation_send_close_from_client(int flags): - cdef Operation op = Operation() - op.c_op.type = GRPC_OP_SEND_CLOSE_FROM_CLIENT - op.c_op.flags = flags - op.is_valid = True - return op - -def operation_send_status_from_server( - metadata, grpc_status_code code, bytes details, int flags): - cdef Operation op = Operation() - op.c_op.type = GRPC_OP_SEND_STATUS_FROM_SERVER - op.c_op.flags = flags - _store_c_metadata(metadata, &op._c_metadata, &op._c_metadata_count) - op._c_metadata_needs_release = True - op.c_op.data.send_status_from_server.trailing_metadata_count = ( - op._c_metadata_count) - op.c_op.data.send_status_from_server.trailing_metadata = op._c_metadata - op.c_op.data.send_status_from_server.status = code - grpc_slice_unref(op._status_details) - op._status_details = _slice_from_bytes(details) - op.c_op.data.send_status_from_server.status_details = &op._status_details - op.is_valid = True - return op - -def operation_receive_initial_metadata(int flags): - cdef Operation op = Operation() - op.c_op.type = GRPC_OP_RECV_INITIAL_METADATA - op.c_op.flags = flags - grpc_metadata_array_init(&op._c_metadata_array) - op.c_op.data.receive_initial_metadata.receive_initial_metadata = ( - &op._c_metadata_array) - op._c_metadata_array_needs_destruction = True - op.is_valid = True - return op - -def operation_receive_message(int flags): - cdef Operation op = Operation() - op.c_op.type = GRPC_OP_RECV_MESSAGE - op.c_op.flags = flags - op._received_message = ByteBuffer(None) - # n.b. the c_op.data.receive_message field needs to be deleted by us, - # anyway, so we just let that be handled by the ByteBuffer() we allocated - # the line before. - op.c_op.data.receive_message.receive_message = ( - &op._received_message.c_byte_buffer) - op.is_valid = True - return op - -def operation_receive_status_on_client(int flags): - cdef Operation op = Operation() - op.c_op.type = GRPC_OP_RECV_STATUS_ON_CLIENT - op.c_op.flags = flags - grpc_metadata_array_init(&op._c_metadata_array) - op.c_op.data.receive_status_on_client.trailing_metadata = ( - &op._c_metadata_array) - op._c_metadata_array_needs_destruction = True - op.c_op.data.receive_status_on_client.status = ( - &op._received_status_code) - op.c_op.data.receive_status_on_client.status_details = ( - &op._status_details) - op.is_valid = True - return op - -def operation_receive_close_on_server(int flags): - cdef Operation op = Operation() - op.c_op.type = GRPC_OP_RECV_CLOSE_ON_SERVER - op.c_op.flags = flags - op.c_op.data.receive_close_on_server.cancelled = &op._received_cancelled - op.is_valid = True - return op - - cdef class CompressionOptions: def __cinit__(self): diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi index f8d7892858..c19beccde6 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi @@ -78,19 +78,15 @@ cdef class Server: raise ValueError("server must be started and not shutting down") if server_queue not in self.registered_completion_queues: raise ValueError("server_queue must be a registered completion queue") - cdef OperationTag operation_tag = OperationTag(tag, None) - operation_tag.operation_call = Call() - operation_tag.request_call_details = CallDetails() - grpc_metadata_array_init(&operation_tag._c_request_metadata) - operation_tag.references.extend([self, call_queue, server_queue]) - operation_tag.is_new_request = True - cpython.Py_INCREF(operation_tag) + cdef _RequestCallTag request_call_tag = _RequestCallTag(tag) + request_call_tag.prepare() + cpython.Py_INCREF(request_call_tag) return grpc_server_request_call( - self.c_server, &operation_tag.operation_call.c_call, - &operation_tag.request_call_details.c_details, - &operation_tag._c_request_metadata, + self.c_server, &request_call_tag.call.c_call, + &request_call_tag.call_details.c_details, + &request_call_tag.c_invocation_metadata, call_queue.c_completion_queue, server_queue.c_completion_queue, - <cpython.PyObject *>operation_tag) + <cpython.PyObject *>request_call_tag) def register_completion_queue( self, CompletionQueue queue not None): @@ -131,16 +127,14 @@ cdef class Server: cdef _c_shutdown(self, CompletionQueue queue, tag): self.is_shutting_down = True - operation_tag = OperationTag(tag, None) - operation_tag.shutting_down_server = self - cpython.Py_INCREF(operation_tag) + cdef _ServerShutdownTag server_shutdown_tag = _ServerShutdownTag(tag, self) + cpython.Py_INCREF(server_shutdown_tag) with nogil: grpc_server_shutdown_and_notify( self.c_server, queue.c_completion_queue, - <cpython.PyObject *>operation_tag) + <cpython.PyObject *>server_shutdown_tag) def shutdown(self, CompletionQueue queue not None, tag): - cdef OperationTag operation_tag if queue.is_shutting_down: raise ValueError("queue must be live") elif not self.is_started: @@ -153,7 +147,8 @@ cdef class Server: self._c_shutdown(queue, tag) cdef notify_shutdown_complete(self): - # called only by a completion queue on receiving our shutdown operation tag + # called only after our server shutdown tag has emerged from a completion + # queue. self.is_shutdown = True def cancel_all_calls(self): diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/tag.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/tag.pxd.pxi new file mode 100644 index 0000000000..f9a3b5e8f4 --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/tag.pxd.pxi @@ -0,0 +1,58 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef class _Tag: + + cdef object event(self, grpc_event c_event) + + +cdef class _ConnectivityTag(_Tag): + + cdef readonly object _user_tag + + cdef ConnectivityEvent event(self, grpc_event c_event) + + +cdef class _RequestCallTag(_Tag): + + cdef readonly object _user_tag + cdef Call call + cdef CallDetails call_details + cdef grpc_metadata_array c_invocation_metadata + + cdef void prepare(self) + cdef RequestCallEvent event(self, grpc_event c_event) + + +cdef class _BatchOperationTag(_Tag): + + cdef object _user_tag + cdef readonly object _operations + cdef readonly object _retained_call + cdef grpc_op *c_ops + cdef size_t c_nops + + cdef void prepare(self) + cdef BatchOperationEvent event(self, grpc_event c_event) + + +cdef class _ServerShutdownTag(_Tag): + + cdef readonly object _user_tag + # This allows CompletionQueue to notify the Python Server object that the + # underlying GRPC core server has shutdown + cdef readonly Server _shutting_down_server + + cdef ServerShutdownEvent event(self, grpc_event c_event) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/tag.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/tag.pyx.pxi new file mode 100644 index 0000000000..aaca458442 --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/tag.pyx.pxi @@ -0,0 +1,87 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef class _Tag: + + cdef object event(self, grpc_event c_event): + raise NotImplementedError() + + +cdef class _ConnectivityTag(_Tag): + + def __cinit__(self, user_tag): + self._user_tag = user_tag + + cdef ConnectivityEvent event(self, grpc_event c_event): + return ConnectivityEvent(c_event.type, c_event.success, self._user_tag) + + +cdef class _RequestCallTag(_Tag): + + def __cinit__(self, user_tag): + self._user_tag = user_tag + self.call = None + self.call_details = None + + cdef void prepare(self): + self.call = Call() + self.call_details = CallDetails() + grpc_metadata_array_init(&self.c_invocation_metadata) + + cdef RequestCallEvent event(self, grpc_event c_event): + cdef tuple invocation_metadata = _metadata(&self.c_invocation_metadata) + grpc_metadata_array_destroy(&self.c_invocation_metadata) + return RequestCallEvent( + c_event.type, c_event.success, self._user_tag, self.call, + self.call_details, invocation_metadata) + + +cdef class _BatchOperationTag: + + def __cinit__(self, user_tag, operations, call): + self._user_tag = user_tag + self._operations = operations + self._retained_call = call + + cdef void prepare(self): + self.c_nops = 0 if self._operations is None else len(self._operations) + if 0 < self.c_nops: + self.c_ops = <grpc_op *>gpr_malloc(sizeof(grpc_op) * self.c_nops) + for index, operation in enumerate(self._operations): + (<Operation>operation).c() + self.c_ops[index] = (<Operation>operation).c_op + + cdef BatchOperationEvent event(self, grpc_event c_event): + if 0 < self.c_nops: + for index, operation in enumerate(self._operations): + (<Operation>operation).c_op = self.c_ops[index] + (<Operation>operation).un_c() + gpr_free(self.c_ops) + return BatchOperationEvent( + c_event.type, c_event.success, self._user_tag, self._operations) + else: + return BatchOperationEvent( + c_event.type, c_event.success, self._user_tag, ()) + + +cdef class _ServerShutdownTag(_Tag): + + def __cinit__(self, user_tag, shutting_down_server): + self._user_tag = user_tag + self._shutting_down_server = shutting_down_server + + cdef ServerShutdownEvent event(self, grpc_event c_event): + self._shutting_down_server.notify_shutdown_complete() + return ServerShutdownEvent(c_event.type, c_event.success, self._user_tag)
\ No newline at end of file diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pxd b/src/python/grpcio/grpc/_cython/cygrpc.pxd index 6fc5638d5d..b32fa518fc 100644 --- a/src/python/grpcio/grpc/_cython/cygrpc.pxd +++ b/src/python/grpcio/grpc/_cython/cygrpc.pxd @@ -18,7 +18,10 @@ include "_cygrpc/call.pxd.pxi" include "_cygrpc/channel.pxd.pxi" include "_cygrpc/credentials.pxd.pxi" include "_cygrpc/completion_queue.pxd.pxi" +include "_cygrpc/event.pxd.pxi" include "_cygrpc/metadata.pxd.pxi" +include "_cygrpc/operation.pxd.pxi" include "_cygrpc/records.pxd.pxi" include "_cygrpc/security.pxd.pxi" include "_cygrpc/server.pxd.pxi" +include "_cygrpc/tag.pxd.pxi" diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pyx b/src/python/grpcio/grpc/_cython/cygrpc.pyx index d605229822..5106394708 100644 --- a/src/python/grpcio/grpc/_cython/cygrpc.pyx +++ b/src/python/grpcio/grpc/_cython/cygrpc.pyx @@ -25,10 +25,13 @@ include "_cygrpc/call.pyx.pxi" include "_cygrpc/channel.pyx.pxi" include "_cygrpc/credentials.pyx.pxi" include "_cygrpc/completion_queue.pyx.pxi" +include "_cygrpc/event.pyx.pxi" include "_cygrpc/metadata.pyx.pxi" +include "_cygrpc/operation.pyx.pxi" include "_cygrpc/records.pyx.pxi" include "_cygrpc/security.pyx.pxi" include "_cygrpc/server.pyx.pxi" +include "_cygrpc/tag.pyx.pxi" # # initialize gRPC diff --git a/src/python/grpcio/grpc/_interceptor.py b/src/python/grpcio/grpc/_interceptor.py index fffb269845..56a280624f 100644 --- a/src/python/grpcio/grpc/_interceptor.py +++ b/src/python/grpcio/grpc/_interceptor.py @@ -44,9 +44,10 @@ def service_pipeline(interceptors): class _ClientCallDetails( - collections.namedtuple('_ClientCallDetails', - ('method', 'timeout', 'metadata', - 'credentials')), grpc.ClientCallDetails): + collections.namedtuple( + '_ClientCallDetails', + ('method', 'timeout', 'metadata', 'credentials')), + grpc.ClientCallDetails): pass diff --git a/src/python/grpcio/grpc/_plugin_wrapping.py b/src/python/grpcio/grpc/_plugin_wrapping.py index f7287956dc..6785e5876a 100644 --- a/src/python/grpcio/grpc/_plugin_wrapping.py +++ b/src/python/grpcio/grpc/_plugin_wrapping.py @@ -23,7 +23,9 @@ from grpc._cython import cygrpc class _AuthMetadataContext( collections.namedtuple('AuthMetadataContext', ( - 'service_url', 'method_name',)), grpc.AuthMetadataContext): + 'service_url', + 'method_name', + )), grpc.AuthMetadataContext): pass @@ -70,8 +72,9 @@ class _Plugin(object): _common.decode(service_url), _common.decode(method_name)) callback_state = _CallbackState() try: - self._metadata_plugin( - context, _AuthMetadataPluginCallback(callback_state, callback)) + self._metadata_plugin(context, + _AuthMetadataPluginCallback( + callback_state, callback)) except Exception as exception: # pylint: disable=broad-except logging.exception( 'AuthMetadataPluginCallback "%s" raised exception!', diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py index 02d3af8706..1cdb2d45b6 100644 --- a/src/python/grpcio/grpc/_server.py +++ b/src/python/grpcio/grpc/_server.py @@ -50,7 +50,7 @@ _UNEXPECTED_EXIT_SERVER_GRACE = 1.0 def _serialized_request(request_event): - return request_event.batch_operations[0].received_message.bytes() + return request_event.batch_operations[0].message() def _application_code(code): @@ -78,7 +78,9 @@ def _details(state): class _HandlerCallDetails( collections.namedtuple('_HandlerCallDetails', ( - 'method', 'invocation_metadata',)), grpc.HandlerCallDetails): + 'method', + 'invocation_metadata', + )), grpc.HandlerCallDetails): pass @@ -130,13 +132,15 @@ def _abort(state, call, code, details): effective_code = _abortion_code(state, code) effective_details = details if state.details is None else state.details if state.initial_metadata_allowed: - operations = (cygrpc.operation_send_initial_metadata( - (), _EMPTY_FLAGS), cygrpc.operation_send_status_from_server( + operations = ( + cygrpc.SendInitialMetadataOperation(None, _EMPTY_FLAGS), + cygrpc.SendStatusFromServerOperation( state.trailing_metadata, effective_code, effective_details, - _EMPTY_FLAGS),) + _EMPTY_FLAGS), + ) token = _SEND_INITIAL_METADATA_AND_SEND_STATUS_FROM_SERVER_TOKEN else: - operations = (cygrpc.operation_send_status_from_server( + operations = (cygrpc.SendStatusFromServerOperation( state.trailing_metadata, effective_code, effective_details, _EMPTY_FLAGS),) token = _SEND_STATUS_FROM_SERVER_TOKEN @@ -150,8 +154,7 @@ def _receive_close_on_server(state): def receive_close_on_server(receive_close_on_server_event): with state.condition: - if receive_close_on_server_event.batch_operations[ - 0].received_cancelled: + if receive_close_on_server_event.batch_operations[0].cancelled(): state.client = _CANCELLED elif state.client is _OPEN: state.client = _CLOSED @@ -218,11 +221,10 @@ class _Context(grpc.ServicerContext): def time_remaining(self): return max( - float(self._rpc_event.request_call_details.deadline) - time.time(), - 0) + float(self._rpc_event.call_details.deadline) - time.time(), 0) def cancel(self): - self._rpc_event.operation_call.cancel() + self._rpc_event.call.cancel() def add_callback(self, callback): with self._state.condition: @@ -237,23 +239,23 @@ class _Context(grpc.ServicerContext): self._state.disable_next_compression = True def invocation_metadata(self): - return self._rpc_event.request_metadata + return self._rpc_event.invocation_metadata def peer(self): - return _common.decode(self._rpc_event.operation_call.peer()) + return _common.decode(self._rpc_event.call.peer()) def peer_identities(self): - return cygrpc.peer_identities(self._rpc_event.operation_call) + return cygrpc.peer_identities(self._rpc_event.call) def peer_identity_key(self): - id_key = cygrpc.peer_identity_key(self._rpc_event.operation_call) + id_key = cygrpc.peer_identity_key(self._rpc_event.call) return id_key if id_key is None else _common.decode(id_key) def auth_context(self): return { _common.decode(key): value for key, value in six.iteritems( - cygrpc.auth_context(self._rpc_event.operation_call)) + cygrpc.auth_context(self._rpc_event.call)) } def send_initial_metadata(self, initial_metadata): @@ -262,9 +264,9 @@ class _Context(grpc.ServicerContext): _raise_rpc_error(self._state) else: if self._state.initial_metadata_allowed: - operation = cygrpc.operation_send_initial_metadata( + operation = cygrpc.SendInitialMetadataOperation( initial_metadata, _EMPTY_FLAGS) - self._rpc_event.operation_call.start_server_batch( + self._rpc_event.call.start_server_batch( (operation,), _send_initial_metadata(self._state)) self._state.initial_metadata_allowed = False self._state.due.add(_SEND_INITIAL_METADATA_TOKEN) @@ -305,7 +307,7 @@ class _RequestIterator(object): raise StopIteration() else: self._call.start_server_batch( - (cygrpc.operation_receive_message(_EMPTY_FLAGS),), + (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), _receive_message(self._state, self._call, self._request_deserializer)) self._state.due.add(_RECEIVE_MESSAGE_TOKEN) @@ -347,9 +349,9 @@ def _unary_request(rpc_event, state, request_deserializer): if state.client is _CANCELLED or state.statused: return None else: - rpc_event.operation_call.start_server_batch( - (cygrpc.operation_receive_message(_EMPTY_FLAGS),), - _receive_message(state, rpc_event.operation_call, + rpc_event.call.start_server_batch( + (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), + _receive_message(state, rpc_event.call, request_deserializer)) state.due.add(_RECEIVE_MESSAGE_TOKEN) while True: @@ -357,8 +359,8 @@ def _unary_request(rpc_event, state, request_deserializer): if state.request is None: if state.client is _CLOSED: details = '"{}" requires exactly one request message.'.format( - rpc_event.request_call_details.method) - _abort(state, rpc_event.operation_call, + rpc_event.call_details.method) + _abort(state, rpc_event.call, cygrpc.StatusCode.unimplemented, _common.encode(details)) return None @@ -379,13 +381,13 @@ def _call_behavior(rpc_event, state, behavior, argument, request_deserializer): except Exception as exception: # pylint: disable=broad-except with state.condition: if exception is state.abortion: - _abort(state, rpc_event.operation_call, - cygrpc.StatusCode.unknown, b'RPC Aborted') + _abort(state, rpc_event.call, cygrpc.StatusCode.unknown, + b'RPC Aborted') elif exception not in state.rpc_errors: details = 'Exception calling application: {}'.format(exception) logging.exception(details) - _abort(state, rpc_event.operation_call, - cygrpc.StatusCode.unknown, _common.encode(details)) + _abort(state, rpc_event.call, cygrpc.StatusCode.unknown, + _common.encode(details)) return None, False @@ -397,13 +399,13 @@ def _take_response_from_response_iterator(rpc_event, state, response_iterator): except Exception as exception: # pylint: disable=broad-except with state.condition: if exception is state.abortion: - _abort(state, rpc_event.operation_call, - cygrpc.StatusCode.unknown, b'RPC Aborted') + _abort(state, rpc_event.call, cygrpc.StatusCode.unknown, + b'RPC Aborted') elif exception not in state.rpc_errors: details = 'Exception iterating responses: {}'.format(exception) logging.exception(details) - _abort(state, rpc_event.operation_call, - cygrpc.StatusCode.unknown, _common.encode(details)) + _abort(state, rpc_event.call, cygrpc.StatusCode.unknown, + _common.encode(details)) return None, False @@ -411,7 +413,7 @@ def _serialize_response(rpc_event, state, response, response_serializer): serialized_response = _common.serialize(response, response_serializer) if serialized_response is None: with state.condition: - _abort(state, rpc_event.operation_call, cygrpc.StatusCode.internal, + _abort(state, rpc_event.call, cygrpc.StatusCode.internal, b'Failed to serialize response!') return None else: @@ -424,17 +426,19 @@ def _send_response(rpc_event, state, serialized_response): return False else: if state.initial_metadata_allowed: - operations = (cygrpc.operation_send_initial_metadata( - (), _EMPTY_FLAGS), cygrpc.operation_send_message( - serialized_response, _EMPTY_FLAGS),) + operations = ( + cygrpc.SendInitialMetadataOperation(None, _EMPTY_FLAGS), + cygrpc.SendMessageOperation(serialized_response, + _EMPTY_FLAGS), + ) state.initial_metadata_allowed = False token = _SEND_INITIAL_METADATA_AND_SEND_MESSAGE_TOKEN else: - operations = (cygrpc.operation_send_message(serialized_response, - _EMPTY_FLAGS),) + operations = (cygrpc.SendMessageOperation( + serialized_response, _EMPTY_FLAGS),) token = _SEND_MESSAGE_TOKEN - rpc_event.operation_call.start_server_batch( - operations, _send_message(state, token)) + rpc_event.call.start_server_batch(operations, + _send_message(state, token)) state.due.add(token) while True: state.condition.wait() @@ -448,17 +452,17 @@ def _status(rpc_event, state, serialized_response): code = _completion_code(state) details = _details(state) operations = [ - cygrpc.operation_send_status_from_server( + cygrpc.SendStatusFromServerOperation( state.trailing_metadata, code, details, _EMPTY_FLAGS), ] if state.initial_metadata_allowed: operations.append( - cygrpc.operation_send_initial_metadata((), _EMPTY_FLAGS)) + cygrpc.SendInitialMetadataOperation(None, _EMPTY_FLAGS)) if serialized_response is not None: operations.append( - cygrpc.operation_send_message(serialized_response, - _EMPTY_FLAGS)) - rpc_event.operation_call.start_server_batch( + cygrpc.SendMessageOperation(serialized_response, + _EMPTY_FLAGS)) + rpc_event.call.start_server_batch( operations, _send_status_from_server(state, _SEND_STATUS_FROM_SERVER_TOKEN)) state.statused = True @@ -525,7 +529,7 @@ def _handle_unary_stream(rpc_event, state, method_handler, thread_pool): def _handle_stream_unary(rpc_event, state, method_handler, thread_pool): - request_iterator = _RequestIterator(state, rpc_event.operation_call, + request_iterator = _RequestIterator(state, rpc_event.call, method_handler.request_deserializer) return thread_pool.submit( _unary_response_in_pool, rpc_event, state, method_handler.stream_unary, @@ -534,7 +538,7 @@ def _handle_stream_unary(rpc_event, state, method_handler, thread_pool): def _handle_stream_stream(rpc_event, state, method_handler, thread_pool): - request_iterator = _RequestIterator(state, rpc_event.operation_call, + request_iterator = _RequestIterator(state, rpc_event.call, method_handler.request_deserializer) return thread_pool.submit( _stream_response_in_pool, rpc_event, state, @@ -552,8 +556,8 @@ def _find_method_handler(rpc_event, generic_handlers, interceptor_pipeline): return None handler_call_details = _HandlerCallDetails( - _common.decode(rpc_event.request_call_details.method), - rpc_event.request_metadata) + _common.decode(rpc_event.call_details.method), + rpc_event.invocation_metadata) if interceptor_pipeline is not None: return interceptor_pipeline.execute(query_handlers, @@ -563,21 +567,23 @@ def _find_method_handler(rpc_event, generic_handlers, interceptor_pipeline): def _reject_rpc(rpc_event, status, details): - operations = (cygrpc.operation_send_initial_metadata((), _EMPTY_FLAGS), - cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS), - cygrpc.operation_send_status_from_server((), status, details, - _EMPTY_FLAGS),) + operations = ( + cygrpc.SendInitialMetadataOperation(None, _EMPTY_FLAGS), + cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS), + cygrpc.SendStatusFromServerOperation(None, status, details, + _EMPTY_FLAGS), + ) rpc_state = _RPCState() - rpc_event.operation_call.start_server_batch( - operations, lambda ignored_event: (rpc_state, (),)) + rpc_event.call.start_server_batch(operations, + lambda ignored_event: (rpc_state, (),)) return rpc_state def _handle_with_method_handler(rpc_event, method_handler, thread_pool): state = _RPCState() with state.condition: - rpc_event.operation_call.start_server_batch( - (cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),), + rpc_event.call.start_server_batch( + (cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),), _receive_close_on_server(state)) state.due.add(_RECEIVE_CLOSE_ON_SERVER_TOKEN) if method_handler.request_streaming: @@ -600,7 +606,7 @@ def _handle_call(rpc_event, generic_handlers, interceptor_pipeline, thread_pool, concurrency_exceeded): if not rpc_event.success: return None, None - if rpc_event.request_call_details.method is not None: + if rpc_event.call_details.method is not None: try: method_handler = _find_method_handler(rpc_event, generic_handlers, interceptor_pipeline) @@ -799,8 +805,8 @@ class Server(grpc.Server): return _add_insecure_port(self._state, _common.encode(address)) def add_secure_port(self, address, server_credentials): - return _add_secure_port(self._state, - _common.encode(address), server_credentials) + return _add_secure_port(self._state, _common.encode(address), + server_credentials) def start(self): _start(self._state) diff --git a/src/python/grpcio/grpc/_utilities.py b/src/python/grpcio/grpc/_utilities.py index 47cedcc867..25bd1ceae2 100644 --- a/src/python/grpcio/grpc/_utilities.py +++ b/src/python/grpcio/grpc/_utilities.py @@ -29,9 +29,15 @@ _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE = ( class RpcMethodHandler( collections.namedtuple('_RpcMethodHandler', ( - 'request_streaming', 'response_streaming', 'request_deserializer', - 'response_serializer', 'unary_unary', 'unary_stream', - 'stream_unary', 'stream_stream',)), grpc.RpcMethodHandler): + 'request_streaming', + 'response_streaming', + 'request_deserializer', + 'response_serializer', + 'unary_unary', + 'unary_stream', + 'stream_unary', + 'stream_stream', + )), grpc.RpcMethodHandler): pass diff --git a/src/python/grpcio/grpc/beta/_client_adaptations.py b/src/python/grpcio/grpc/beta/_client_adaptations.py index dcaa0eeaa2..cf200a8c13 100644 --- a/src/python/grpcio/grpc/beta/_client_adaptations.py +++ b/src/python/grpcio/grpc/beta/_client_adaptations.py @@ -51,8 +51,7 @@ def _abortion(rpc_error_call): code = rpc_error_call.code() pair = _STATUS_CODE_TO_ABORTION_KIND_AND_ABORTION_ERROR_CLASS.get(code) error_kind = face.Abortion.Kind.LOCAL_FAILURE if pair is None else pair[0] - return face.Abortion(error_kind, - rpc_error_call.initial_metadata(), + return face.Abortion(error_kind, rpc_error_call.initial_metadata(), rpc_error_call.trailing_metadata(), code, rpc_error_call.details()) @@ -441,9 +440,14 @@ class _GenericStub(face.GenericStub): metadata=None, with_call=None, protocol_options=None): - request_serializer = self._request_serializers.get((group, method,)) - response_deserializer = self._response_deserializers.get((group, - method,)) + request_serializer = self._request_serializers.get(( + group, + method, + )) + response_deserializer = self._response_deserializers.get(( + group, + method, + )) return _blocking_unary_unary(self._channel, group, method, timeout, with_call, protocol_options, metadata, self._metadata_transformer, request, @@ -456,9 +460,14 @@ class _GenericStub(face.GenericStub): timeout, metadata=None, protocol_options=None): - request_serializer = self._request_serializers.get((group, method,)) - response_deserializer = self._response_deserializers.get((group, - method,)) + request_serializer = self._request_serializers.get(( + group, + method, + )) + response_deserializer = self._response_deserializers.get(( + group, + method, + )) return _future_unary_unary(self._channel, group, method, timeout, protocol_options, metadata, self._metadata_transformer, request, @@ -471,9 +480,14 @@ class _GenericStub(face.GenericStub): timeout, metadata=None, protocol_options=None): - request_serializer = self._request_serializers.get((group, method,)) - response_deserializer = self._response_deserializers.get((group, - method,)) + request_serializer = self._request_serializers.get(( + group, + method, + )) + response_deserializer = self._response_deserializers.get(( + group, + method, + )) return _unary_stream(self._channel, group, method, timeout, protocol_options, metadata, self._metadata_transformer, request, @@ -487,9 +501,14 @@ class _GenericStub(face.GenericStub): metadata=None, with_call=None, protocol_options=None): - request_serializer = self._request_serializers.get((group, method,)) - response_deserializer = self._response_deserializers.get((group, - method,)) + request_serializer = self._request_serializers.get(( + group, + method, + )) + response_deserializer = self._response_deserializers.get(( + group, + method, + )) return _blocking_stream_unary( self._channel, group, method, timeout, with_call, protocol_options, metadata, self._metadata_transformer, request_iterator, @@ -502,9 +521,14 @@ class _GenericStub(face.GenericStub): timeout, metadata=None, protocol_options=None): - request_serializer = self._request_serializers.get((group, method,)) - response_deserializer = self._response_deserializers.get((group, - method,)) + request_serializer = self._request_serializers.get(( + group, + method, + )) + response_deserializer = self._response_deserializers.get(( + group, + method, + )) return _future_stream_unary( self._channel, group, method, timeout, protocol_options, metadata, self._metadata_transformer, request_iterator, request_serializer, @@ -517,9 +541,14 @@ class _GenericStub(face.GenericStub): timeout, metadata=None, protocol_options=None): - request_serializer = self._request_serializers.get((group, method,)) - response_deserializer = self._response_deserializers.get((group, - method,)) + request_serializer = self._request_serializers.get(( + group, + method, + )) + response_deserializer = self._response_deserializers.get(( + group, + method, + )) return _stream_stream(self._channel, group, method, timeout, protocol_options, metadata, self._metadata_transformer, request_iterator, @@ -568,33 +597,53 @@ class _GenericStub(face.GenericStub): raise NotImplementedError() def unary_unary(self, group, method): - request_serializer = self._request_serializers.get((group, method,)) - response_deserializer = self._response_deserializers.get((group, - method,)) + request_serializer = self._request_serializers.get(( + group, + method, + )) + response_deserializer = self._response_deserializers.get(( + group, + method, + )) return _UnaryUnaryMultiCallable( self._channel, group, method, self._metadata_transformer, request_serializer, response_deserializer) def unary_stream(self, group, method): - request_serializer = self._request_serializers.get((group, method,)) - response_deserializer = self._response_deserializers.get((group, - method,)) + request_serializer = self._request_serializers.get(( + group, + method, + )) + response_deserializer = self._response_deserializers.get(( + group, + method, + )) return _UnaryStreamMultiCallable( self._channel, group, method, self._metadata_transformer, request_serializer, response_deserializer) def stream_unary(self, group, method): - request_serializer = self._request_serializers.get((group, method,)) - response_deserializer = self._response_deserializers.get((group, - method,)) + request_serializer = self._request_serializers.get(( + group, + method, + )) + response_deserializer = self._response_deserializers.get(( + group, + method, + )) return _StreamUnaryMultiCallable( self._channel, group, method, self._metadata_transformer, request_serializer, response_deserializer) def stream_stream(self, group, method): - request_serializer = self._request_serializers.get((group, method,)) - response_deserializer = self._response_deserializers.get((group, - method,)) + request_serializer = self._request_serializers.get(( + group, + method, + )) + response_deserializer = self._response_deserializers.get(( + group, + method, + )) return _StreamStreamMultiCallable( self._channel, group, method, self._metadata_transformer, request_serializer, response_deserializer) @@ -624,8 +673,8 @@ class _DynamicStub(face.DynamicStub): elif method_cardinality is cardinality.Cardinality.STREAM_STREAM: return self._generic_stub.stream_stream(self._group, attr) else: - raise AttributeError('_DynamicStub object has no attribute "%s"!' % - attr) + raise AttributeError( + '_DynamicStub object has no attribute "%s"!' % attr) def __enter__(self): return self diff --git a/src/python/grpcio/grpc/beta/_metadata.py b/src/python/grpcio/grpc/beta/_metadata.py index e135f4dff4..b7c8535285 100644 --- a/src/python/grpcio/grpc/beta/_metadata.py +++ b/src/python/grpcio/grpc/beta/_metadata.py @@ -15,7 +15,10 @@ import collections -_Metadatum = collections.namedtuple('_Metadatum', ('key', 'value',)) +_Metadatum = collections.namedtuple('_Metadatum', ( + 'key', + 'value', +)) def _beta_metadatum(key, value): diff --git a/src/python/grpcio/grpc/beta/_server_adaptations.py b/src/python/grpcio/grpc/beta/_server_adaptations.py index 1c22dbe3bb..3c04fd7639 100644 --- a/src/python/grpcio/grpc/beta/_server_adaptations.py +++ b/src/python/grpcio/grpc/beta/_server_adaptations.py @@ -245,9 +245,15 @@ def _adapt_stream_stream_event(stream_stream_event): class _SimpleMethodHandler( collections.namedtuple('_MethodHandler', ( - 'request_streaming', 'response_streaming', 'request_deserializer', - 'response_serializer', 'unary_unary', 'unary_stream', - 'stream_unary', 'stream_stream',)), grpc.RpcMethodHandler): + 'request_streaming', + 'response_streaming', + 'request_deserializer', + 'response_serializer', + 'unary_unary', + 'unary_stream', + 'stream_unary', + 'stream_stream', + )), grpc.RpcMethodHandler): pass @@ -255,15 +261,17 @@ def _simple_method_handler(implementation, request_deserializer, response_serializer): if implementation.style is style.Service.INLINE: if implementation.cardinality is cardinality.Cardinality.UNARY_UNARY: - return _SimpleMethodHandler( - False, False, request_deserializer, response_serializer, - _adapt_unary_request_inline(implementation.unary_unary_inline), - None, None, None) + return _SimpleMethodHandler(False, False, request_deserializer, + response_serializer, + _adapt_unary_request_inline( + implementation.unary_unary_inline), + None, None, None) elif implementation.cardinality is cardinality.Cardinality.UNARY_STREAM: - return _SimpleMethodHandler( - False, True, request_deserializer, response_serializer, None, - _adapt_unary_request_inline(implementation.unary_stream_inline), - None, None) + return _SimpleMethodHandler(False, True, request_deserializer, + response_serializer, None, + _adapt_unary_request_inline( + implementation.unary_stream_inline), + None, None) elif implementation.cardinality is cardinality.Cardinality.STREAM_UNARY: return _SimpleMethodHandler(True, False, request_deserializer, response_serializer, None, None, @@ -278,26 +286,28 @@ def _simple_method_handler(implementation, request_deserializer, implementation.stream_stream_inline)) elif implementation.style is style.Service.EVENT: if implementation.cardinality is cardinality.Cardinality.UNARY_UNARY: - return _SimpleMethodHandler( - False, False, request_deserializer, response_serializer, - _adapt_unary_unary_event(implementation.unary_unary_event), - None, None, None) + return _SimpleMethodHandler(False, False, request_deserializer, + response_serializer, + _adapt_unary_unary_event( + implementation.unary_unary_event), + None, None, None) elif implementation.cardinality is cardinality.Cardinality.UNARY_STREAM: - return _SimpleMethodHandler( - False, True, request_deserializer, response_serializer, None, - _adapt_unary_stream_event(implementation.unary_stream_event), - None, None) + return _SimpleMethodHandler(False, True, request_deserializer, + response_serializer, None, + _adapt_unary_stream_event( + implementation.unary_stream_event), + None, None) elif implementation.cardinality is cardinality.Cardinality.STREAM_UNARY: - return _SimpleMethodHandler( - True, False, request_deserializer, response_serializer, None, - None, - _adapt_stream_unary_event(implementation.stream_unary_event), - None) + return _SimpleMethodHandler(True, False, request_deserializer, + response_serializer, None, None, + _adapt_stream_unary_event( + implementation.stream_unary_event), + None) elif implementation.cardinality is cardinality.Cardinality.STREAM_STREAM: - return _SimpleMethodHandler( - True, True, request_deserializer, response_serializer, None, - None, None, - _adapt_stream_stream_event(implementation.stream_stream_event)) + return _SimpleMethodHandler(True, True, request_deserializer, + response_serializer, None, None, None, + _adapt_stream_stream_event( + implementation.stream_stream_event)) def _flatten_method_pair_map(method_pair_map): @@ -325,10 +335,11 @@ class _GenericRpcHandler(grpc.GenericRpcHandler): method_implementation = self._method_implementations.get( handler_call_details.method) if method_implementation is not None: - return _simple_method_handler( - method_implementation, - self._request_deserializers.get(handler_call_details.method), - self._response_serializers.get(handler_call_details.method)) + return _simple_method_handler(method_implementation, + self._request_deserializers.get( + handler_call_details.method), + self._response_serializers.get( + handler_call_details.method)) elif self._multi_method_implementation is None: return None else: diff --git a/src/python/grpcio/grpc/beta/implementations.py b/src/python/grpcio/grpc/beta/implementations.py index 312daf033e..44dbd61c55 100644 --- a/src/python/grpcio/grpc/beta/implementations.py +++ b/src/python/grpcio/grpc/beta/implementations.py @@ -110,8 +110,8 @@ def insecure_channel(host, port): Returns: A Channel to the remote host through which RPCs may be conducted. """ - channel = grpc.insecure_channel(host - if port is None else '%s:%d' % (host, port)) + channel = grpc.insecure_channel(host if port is None else '%s:%d' % (host, + port)) return Channel(channel) diff --git a/src/python/grpcio/grpc/framework/foundation/callable_util.py b/src/python/grpcio/grpc/framework/foundation/callable_util.py index 5bdfda5301..b9b9c49f17 100644 --- a/src/python/grpcio/grpc/framework/foundation/callable_util.py +++ b/src/python/grpcio/grpc/framework/foundation/callable_util.py @@ -50,8 +50,8 @@ class _EasyOutcome( def _call_logging_exceptions(behavior, message, *args, **kwargs): try: - return _EasyOutcome(Outcome.Kind.RETURNED, - behavior(*args, **kwargs), None) + return _EasyOutcome(Outcome.Kind.RETURNED, behavior(*args, **kwargs), + None) except Exception as e: # pylint: disable=broad-except logging.exception(message) return _EasyOutcome(Outcome.Kind.RAISED, None, e) diff --git a/src/python/grpcio/grpc/framework/interfaces/base/utilities.py b/src/python/grpcio/grpc/framework/interfaces/base/utilities.py index a9163d8588..281db62b5d 100644 --- a/src/python/grpcio/grpc/framework/interfaces/base/utilities.py +++ b/src/python/grpcio/grpc/framework/interfaces/base/utilities.py @@ -19,15 +19,22 @@ from grpc.framework.interfaces.base import base class _Completion(base.Completion, - collections.namedtuple('_Completion', ('terminal_metadata', - 'code', 'message',))): + collections.namedtuple('_Completion', ( + 'terminal_metadata', + 'code', + 'message', + ))): """A trivial implementation of base.Completion.""" class _Subscription(base.Subscription, collections.namedtuple('_Subscription', ( - 'kind', 'termination_callback', 'allowance', 'operator', - 'protocol_receiver',))): + 'kind', + 'termination_callback', + 'allowance', + 'operator', + 'protocol_receiver', + ))): """A trivial implementation of base.Subscription.""" diff --git a/src/python/grpcio/grpc/framework/interfaces/face/face.py b/src/python/grpcio/grpc/framework/interfaces/face/face.py index 0b93ea0f49..5b47f11d0d 100644 --- a/src/python/grpcio/grpc/framework/interfaces/face/face.py +++ b/src/python/grpcio/grpc/framework/interfaces/face/face.py @@ -50,13 +50,20 @@ class NoSuchMethodError(Exception): self.method = method def __repr__(self): - return 'face.NoSuchMethodError(%s, %s)' % (self.group, self.method,) + return 'face.NoSuchMethodError(%s, %s)' % ( + self.group, + self.method, + ) class Abortion( - collections.namedtuple('Abortion', - ('kind', 'initial_metadata', 'terminal_metadata', - 'code', 'details',))): + collections.namedtuple('Abortion', ( + 'kind', + 'initial_metadata', + 'terminal_metadata', + 'code', + 'details', + ))): """A value describing RPC abortion. Attributes: diff --git a/src/python/grpcio_health_checking/health_commands.py b/src/python/grpcio_health_checking/health_commands.py index 19547358a1..933f965aa2 100644 --- a/src/python/grpcio_health_checking/health_commands.py +++ b/src/python/grpcio_health_checking/health_commands.py @@ -36,9 +36,9 @@ class CopyProtoModules(setuptools.Command): def run(self): if os.path.isfile(HEALTH_PROTO): - shutil.copyfile( - HEALTH_PROTO, - os.path.join(ROOT_DIR, 'grpc_health/v1/health.proto')) + shutil.copyfile(HEALTH_PROTO, + os.path.join(ROOT_DIR, + 'grpc_health/v1/health.proto')) class BuildPackageProtos(setuptools.Command): diff --git a/src/python/grpcio_health_checking/setup.py b/src/python/grpcio_health_checking/setup.py index c105f57509..60d309ec65 100644 --- a/src/python/grpcio_health_checking/setup.py +++ b/src/python/grpcio_health_checking/setup.py @@ -56,8 +56,10 @@ PACKAGE_DIRECTORIES = { '': '.', } -INSTALL_REQUIRES = ('protobuf>=3.5.0.post1', - 'grpcio>={version}'.format(version=grpc_version.VERSION),) +INSTALL_REQUIRES = ( + 'protobuf>=3.5.0.post1', + 'grpcio>={version}'.format(version=grpc_version.VERSION), +) try: import health_commands as _health_commands diff --git a/src/python/grpcio_reflection/grpc_reflection/v1alpha/reflection.py b/src/python/grpcio_reflection/grpc_reflection/v1alpha/reflection.py index 26a7ba8685..0c564f10e5 100644 --- a/src/python/grpcio_reflection/grpc_reflection/v1alpha/reflection.py +++ b/src/python/grpcio_reflection/grpc_reflection/v1alpha/reflection.py @@ -27,7 +27,8 @@ def _not_found_error(): return reflection_pb2.ServerReflectionResponse( error_response=reflection_pb2.ErrorResponse( error_code=grpc.StatusCode.NOT_FOUND.value[0], - error_message=grpc.StatusCode.NOT_FOUND.value[1].encode(),)) + error_message=grpc.StatusCode.NOT_FOUND.value[1].encode(), + )) def _file_descriptor_response(descriptor): @@ -101,10 +102,11 @@ class ReflectionServicer(reflection_pb2_grpc.ServerReflectionServicer): def _list_services(self): return reflection_pb2.ServerReflectionResponse( - list_services_response=reflection_pb2.ListServiceResponse(service=[ - reflection_pb2.ServiceResponse(name=service_name) - for service_name in self._service_names - ])) + list_services_response=reflection_pb2.ListServiceResponse( + service=[ + reflection_pb2.ServiceResponse(name=service_name) + for service_name in self._service_names + ])) def ServerReflectionInfo(self, request_iterator, context): # pylint: disable=unused-argument @@ -128,7 +130,8 @@ class ReflectionServicer(reflection_pb2_grpc.ServerReflectionServicer): error_response=reflection_pb2.ErrorResponse( error_code=grpc.StatusCode.INVALID_ARGUMENT.value[0], error_message=grpc.StatusCode.INVALID_ARGUMENT.value[1] - .encode(),)) + .encode(), + )) def enable_server_reflection(service_names, server, pool=None): diff --git a/src/python/grpcio_reflection/setup.py b/src/python/grpcio_reflection/setup.py index 760b89373a..10c4c38f19 100644 --- a/src/python/grpcio_reflection/setup.py +++ b/src/python/grpcio_reflection/setup.py @@ -57,8 +57,10 @@ PACKAGE_DIRECTORIES = { '': '.', } -INSTALL_REQUIRES = ('protobuf>=3.5.0.post1', - 'grpcio>={version}'.format(version=grpc_version.VERSION),) +INSTALL_REQUIRES = ( + 'protobuf>=3.5.0.post1', + 'grpcio>={version}'.format(version=grpc_version.VERSION), +) try: import reflection_commands as _reflection_commands diff --git a/src/python/grpcio_testing/grpc_testing/_channel/_multi_callable.py b/src/python/grpcio_testing/grpc_testing/_channel/_multi_callable.py index 32b2f361d3..2b2f5761f5 100644 --- a/src/python/grpcio_testing/grpc_testing/_channel/_multi_callable.py +++ b/src/python/grpcio_testing/grpc_testing/_channel/_multi_callable.py @@ -27,20 +27,20 @@ class UnaryUnary(grpc.UnaryUnaryMultiCallable): def __call__(self, request, timeout=None, metadata=None, credentials=None): rpc_handler = self._channel_handler.invoke_rpc( - self._method_full_rpc_name, - _common.fuss_with_metadata(metadata), [request], True, timeout) + self._method_full_rpc_name, _common.fuss_with_metadata(metadata), + [request], True, timeout) return _invocation.blocking_unary_response(rpc_handler) def with_call(self, request, timeout=None, metadata=None, credentials=None): rpc_handler = self._channel_handler.invoke_rpc( - self._method_full_rpc_name, - _common.fuss_with_metadata(metadata), [request], True, timeout) + self._method_full_rpc_name, _common.fuss_with_metadata(metadata), + [request], True, timeout) return _invocation.blocking_unary_response_with_call(rpc_handler) def future(self, request, timeout=None, metadata=None, credentials=None): rpc_handler = self._channel_handler.invoke_rpc( - self._method_full_rpc_name, - _common.fuss_with_metadata(metadata), [request], True, timeout) + self._method_full_rpc_name, _common.fuss_with_metadata(metadata), + [request], True, timeout) return _invocation.future_call(rpc_handler) @@ -52,8 +52,8 @@ class UnaryStream(grpc.StreamStreamMultiCallable): def __call__(self, request, timeout=None, metadata=None, credentials=None): rpc_handler = self._channel_handler.invoke_rpc( - self._method_full_rpc_name, - _common.fuss_with_metadata(metadata), [request], True, timeout) + self._method_full_rpc_name, _common.fuss_with_metadata(metadata), + [request], True, timeout) return _invocation.ResponseIteratorCall(rpc_handler) @@ -69,8 +69,8 @@ class StreamUnary(grpc.StreamUnaryMultiCallable): metadata=None, credentials=None): rpc_handler = self._channel_handler.invoke_rpc( - self._method_full_rpc_name, - _common.fuss_with_metadata(metadata), [], False, timeout) + self._method_full_rpc_name, _common.fuss_with_metadata(metadata), + [], False, timeout) _invocation.consume_requests(request_iterator, rpc_handler) return _invocation.blocking_unary_response(rpc_handler) @@ -80,8 +80,8 @@ class StreamUnary(grpc.StreamUnaryMultiCallable): metadata=None, credentials=None): rpc_handler = self._channel_handler.invoke_rpc( - self._method_full_rpc_name, - _common.fuss_with_metadata(metadata), [], False, timeout) + self._method_full_rpc_name, _common.fuss_with_metadata(metadata), + [], False, timeout) _invocation.consume_requests(request_iterator, rpc_handler) return _invocation.blocking_unary_response_with_call(rpc_handler) @@ -91,8 +91,8 @@ class StreamUnary(grpc.StreamUnaryMultiCallable): metadata=None, credentials=None): rpc_handler = self._channel_handler.invoke_rpc( - self._method_full_rpc_name, - _common.fuss_with_metadata(metadata), [], False, timeout) + self._method_full_rpc_name, _common.fuss_with_metadata(metadata), + [], False, timeout) _invocation.consume_requests(request_iterator, rpc_handler) return _invocation.future_call(rpc_handler) @@ -109,8 +109,8 @@ class StreamStream(grpc.StreamStreamMultiCallable): metadata=None, credentials=None): rpc_handler = self._channel_handler.invoke_rpc( - self._method_full_rpc_name, - _common.fuss_with_metadata(metadata), [], False, timeout) + self._method_full_rpc_name, _common.fuss_with_metadata(metadata), + [], False, timeout) _invocation.consume_requests(request_iterator, rpc_handler) return _invocation.ResponseIteratorCall(rpc_handler) diff --git a/src/python/grpcio_testing/grpc_testing/_channel/_rpc_state.py b/src/python/grpcio_testing/grpc_testing/_channel/_rpc_state.py index ee0233002d..009f675e49 100644 --- a/src/python/grpcio_testing/grpc_testing/_channel/_rpc_state.py +++ b/src/python/grpcio_testing/grpc_testing/_channel/_rpc_state.py @@ -179,8 +179,8 @@ class State(_common.ChannelRpcHandler): elif self._code is None: self._condition.wait() else: - raise ValueError( - 'Status code unexpectedly {}!'.format(self._code)) + raise ValueError('Status code unexpectedly {}!'.format( + self._code)) def is_active(self): raise NotImplementedError() diff --git a/src/python/grpcio_testing/grpc_testing/_common.py b/src/python/grpcio_testing/grpc_testing/_common.py index 05327b0ac2..cebad31b5c 100644 --- a/src/python/grpcio_testing/grpc_testing/_common.py +++ b/src/python/grpcio_testing/grpc_testing/_common.py @@ -20,9 +20,10 @@ import six def _fuss(tuplified_metadata): - return tuplified_metadata + ( - ('grpc.metadata_added_by_runtime', - 'gRPC is allowed to add metadata in transmission and does so.',),) + return tuplified_metadata + (( + 'grpc.metadata_added_by_runtime', + 'gRPC is allowed to add metadata in transmission and does so.', + ),) FUSSED_EMPTY_METADATA = _fuss(()) @@ -46,9 +47,12 @@ def rpc_names(service_descriptors): class ChannelRpcRead( - collections.namedtuple( - 'ChannelRpcRead', - ('response', 'trailing_metadata', 'code', 'details',))): + collections.namedtuple('ChannelRpcRead', ( + 'response', + 'trailing_metadata', + 'code', + 'details', + ))): pass @@ -100,8 +104,11 @@ class ChannelHandler(six.with_metaclass(abc.ABCMeta)): class ServerRpcRead( - collections.namedtuple('ServerRpcRead', - ('request', 'requests_closed', 'terminated',))): + collections.namedtuple('ServerRpcRead', ( + 'request', + 'requests_closed', + 'terminated', + ))): pass diff --git a/src/python/grpcio_testing/grpc_testing/_server/_handler.py b/src/python/grpcio_testing/grpc_testing/_server/_handler.py index 5e4730e087..d4f50f6863 100644 --- a/src/python/grpcio_testing/grpc_testing/_server/_handler.py +++ b/src/python/grpcio_testing/grpc_testing/_server/_handler.py @@ -170,8 +170,12 @@ class _Handler(Handler): if self._unary_response is None: if self._responses: self._unary_response = self._responses.pop(0) - return (self._unary_response, self._trailing_metadata, - self._code, self._details,) + return ( + self._unary_response, + self._trailing_metadata, + self._code, + self._details, + ) def stream_response_termination(self): with self._condition: diff --git a/src/python/grpcio_testing/grpc_testing/_server/_server.py b/src/python/grpcio_testing/grpc_testing/_server/_server.py index 3e358e50a9..c7effb6b55 100644 --- a/src/python/grpcio_testing/grpc_testing/_server/_server.py +++ b/src/python/grpcio_testing/grpc_testing/_server/_server.py @@ -76,7 +76,11 @@ class _Serverish(_common.Serverish): rpc, self._time, deadline) service_thread = threading.Thread( target=service_behavior, - args=(implementation, rpc, servicer_context,)) + args=( + implementation, + rpc, + servicer_context, + )) service_thread.start() def invoke_unary_unary(self, method_descriptor, handler, diff --git a/src/python/grpcio_testing/grpc_testing/_time.py b/src/python/grpcio_testing/grpc_testing/_time.py index 3b1ab4bcd8..afbdad3524 100644 --- a/src/python/grpcio_testing/grpc_testing/_time.py +++ b/src/python/grpcio_testing/grpc_testing/_time.py @@ -46,9 +46,11 @@ class _State(object): class _Delta( - collections.namedtuple('_Delta', - ('mature_behaviors', 'earliest_mature_time', - 'earliest_immature_time',))): + collections.namedtuple('_Delta', ( + 'mature_behaviors', + 'earliest_mature_time', + 'earliest_immature_time', + ))): pass diff --git a/src/python/grpcio_testing/setup.py b/src/python/grpcio_testing/setup.py index fa40424f6a..5a9d593ec1 100644 --- a/src/python/grpcio_testing/setup.py +++ b/src/python/grpcio_testing/setup.py @@ -28,8 +28,10 @@ PACKAGE_DIRECTORIES = { '': '.', } -INSTALL_REQUIRES = ('protobuf>=3.5.0.post1', - 'grpcio>={version}'.format(version=grpc_version.VERSION),) +INSTALL_REQUIRES = ( + 'protobuf>=3.5.0.post1', + 'grpcio>={version}'.format(version=grpc_version.VERSION), +) setuptools.setup( name='grpcio-testing', diff --git a/src/python/grpcio_tests/setup.py b/src/python/grpcio_tests/setup.py index aeb4ea9c53..250df65803 100644 --- a/src/python/grpcio_tests/setup.py +++ b/src/python/grpcio_tests/setup.py @@ -99,4 +99,5 @@ setuptools.setup( tests_require=TESTS_REQUIRE, test_suite=TEST_SUITE, test_loader=TEST_LOADER, - test_runner=TEST_RUNNER,) + test_runner=TEST_RUNNER, +) diff --git a/src/python/grpcio_tests/tests/_loader.py b/src/python/grpcio_tests/tests/_loader.py index 281a23c16e..31680916b4 100644 --- a/src/python/grpcio_tests/tests/_loader.py +++ b/src/python/grpcio_tests/tests/_loader.py @@ -101,5 +101,5 @@ def iterate_suite_cases(suite): elif isinstance(item, unittest.TestCase): yield item else: - raise ValueError( - 'unexpected suite item of type {}'.format(type(item))) + raise ValueError('unexpected suite item of type {}'.format( + type(item))) diff --git a/src/python/grpcio_tests/tests/_result.py b/src/python/grpcio_tests/tests/_result.py index f26fdefc97..9907c4e1f9 100644 --- a/src/python/grpcio_tests/tests/_result.py +++ b/src/python/grpcio_tests/tests/_result.py @@ -215,7 +215,8 @@ class AugmentedResult(unittest.TestResult): Args: filter (callable): A unary predicate to filter over CaseResult objects. """ - return (self.cases[case_id] for case_id in self.cases + return (self.cases[case_id] + for case_id in self.cases if filter(self.cases[case_id])) @@ -285,8 +286,8 @@ class TerminalResult(CoverageResult): def startTestRun(self): """See unittest.TestResult.startTestRun.""" super(TerminalResult, self).startTestRun() - self.out.write(_Colors.HEADER + 'Testing gRPC Python...\n' + - _Colors.END) + self.out.write( + _Colors.HEADER + 'Testing gRPC Python...\n' + _Colors.END) def stopTestRun(self): """See unittest.TestResult.stopTestRun.""" @@ -297,43 +298,43 @@ class TerminalResult(CoverageResult): def addError(self, test, error): """See unittest.TestResult.addError.""" super(TerminalResult, self).addError(test, error) - self.out.write(_Colors.FAIL + 'ERROR {}\n'.format(test.id()) + - _Colors.END) + self.out.write( + _Colors.FAIL + 'ERROR {}\n'.format(test.id()) + _Colors.END) self.out.flush() def addFailure(self, test, error): """See unittest.TestResult.addFailure.""" super(TerminalResult, self).addFailure(test, error) - self.out.write(_Colors.FAIL + 'FAILURE {}\n'.format(test.id()) + - _Colors.END) + self.out.write( + _Colors.FAIL + 'FAILURE {}\n'.format(test.id()) + _Colors.END) self.out.flush() def addSuccess(self, test): """See unittest.TestResult.addSuccess.""" super(TerminalResult, self).addSuccess(test) - self.out.write(_Colors.OK + 'SUCCESS {}\n'.format(test.id()) + - _Colors.END) + self.out.write( + _Colors.OK + 'SUCCESS {}\n'.format(test.id()) + _Colors.END) self.out.flush() def addSkip(self, test, reason): """See unittest.TestResult.addSkip.""" super(TerminalResult, self).addSkip(test, reason) - self.out.write(_Colors.INFO + 'SKIP {}\n'.format(test.id()) + - _Colors.END) + self.out.write( + _Colors.INFO + 'SKIP {}\n'.format(test.id()) + _Colors.END) self.out.flush() def addExpectedFailure(self, test, error): """See unittest.TestResult.addExpectedFailure.""" super(TerminalResult, self).addExpectedFailure(test, error) - self.out.write(_Colors.INFO + 'FAILURE_OK {}\n'.format(test.id()) + - _Colors.END) + self.out.write( + _Colors.INFO + 'FAILURE_OK {}\n'.format(test.id()) + _Colors.END) self.out.flush() def addUnexpectedSuccess(self, test): """See unittest.TestResult.addUnexpectedSuccess.""" super(TerminalResult, self).addUnexpectedSuccess(test) - self.out.write(_Colors.INFO + 'UNEXPECTED_OK {}\n'.format(test.id()) + - _Colors.END) + self.out.write( + _Colors.INFO + 'UNEXPECTED_OK {}\n'.format(test.id()) + _Colors.END) self.out.flush() diff --git a/src/python/grpcio_tests/tests/_runner.py b/src/python/grpcio_tests/tests/_runner.py index 8fb4a0e09b..8e27dc6c6d 100644 --- a/src/python/grpcio_tests/tests/_runner.py +++ b/src/python/grpcio_tests/tests/_runner.py @@ -181,8 +181,8 @@ class Runner(object): # Run the tests result.startTestRun() for augmented_case in augmented_cases: - sys.stdout.write( - 'Running {}\n'.format(augmented_case.case.id())) + sys.stdout.write('Running {}\n'.format( + augmented_case.case.id())) sys.stdout.flush() case_thread = threading.Thread( target=augmented_case.case.run, args=(result,)) @@ -196,8 +196,8 @@ class Runner(object): except: # re-raise the exception after forcing the with-block to end raise - result.set_output(augmented_case.case, - stdout_pipe.output(), stderr_pipe.output()) + result.set_output(augmented_case.case, stdout_pipe.output(), + stderr_pipe.output()) sys.stdout.write(result_out.getvalue()) sys.stdout.flush() result_out.truncate(0) diff --git a/src/python/grpcio_tests/tests/http2/negative_http2_client.py b/src/python/grpcio_tests/tests/http2/negative_http2_client.py index 8dab5b67f1..e4076827b6 100644 --- a/src/python/grpcio_tests/tests/http2/negative_http2_client.py +++ b/src/python/grpcio_tests/tests/http2/negative_http2_client.py @@ -32,14 +32,14 @@ def _validate_payload_type_and_length(response, expected_type, expected_length): def _expect_status_code(call, expected_code): if call.code() != expected_code: - raise ValueError('expected code %s, got %s' % - (expected_code, call.code())) + raise ValueError('expected code %s, got %s' % (expected_code, + call.code())) def _expect_status_details(call, expected_details): if call.details() != expected_details: - raise ValueError('expected message %s, got %s' % - (expected_details, call.details())) + raise ValueError('expected message %s, got %s' % (expected_details, + call.details())) def _validate_status_code_and_details(call, expected_code, expected_details): diff --git a/src/python/grpcio_tests/tests/interop/_intraop_test_case.py b/src/python/grpcio_tests/tests/interop/_intraop_test_case.py index ce456a679b..007db7ab41 100644 --- a/src/python/grpcio_tests/tests/interop/_intraop_test_case.py +++ b/src/python/grpcio_tests/tests/interop/_intraop_test_case.py @@ -39,8 +39,8 @@ class IntraopTestCase(object): methods.TestCase.PING_PONG.test_interoperability(self.stub, None) def testCancelAfterBegin(self): - methods.TestCase.CANCEL_AFTER_BEGIN.test_interoperability(self.stub, - None) + methods.TestCase.CANCEL_AFTER_BEGIN.test_interoperability( + self.stub, None) def testCancelAfterFirstResponse(self): methods.TestCase.CANCEL_AFTER_FIRST_RESPONSE.test_interoperability( diff --git a/src/python/grpcio_tests/tests/interop/_secure_intraop_test.py b/src/python/grpcio_tests/tests/interop/_secure_intraop_test.py index 6ce8b3715b..c89135998d 100644 --- a/src/python/grpcio_tests/tests/interop/_secure_intraop_test.py +++ b/src/python/grpcio_tests/tests/interop/_secure_intraop_test.py @@ -34,15 +34,16 @@ class SecureIntraopTest(_intraop_test_case.IntraopTestCase, unittest.TestCase): self.server) port = self.server.add_secure_port( '[::]:0', - grpc.ssl_server_credentials( - [(resources.private_key(), resources.certificate_chain())])) + grpc.ssl_server_credentials([(resources.private_key(), + resources.certificate_chain())])) self.server.start() self.stub = test_pb2_grpc.TestServiceStub( grpc.secure_channel('localhost:{}'.format(port), grpc.ssl_channel_credentials( - resources.test_root_certificates()), ( - ('grpc.ssl_target_name_override', - _SERVER_HOST_OVERRIDE,),))) + resources.test_root_certificates()), (( + 'grpc.ssl_target_name_override', + _SERVER_HOST_OVERRIDE, + ),))) if __name__ == '__main__': diff --git a/src/python/grpcio_tests/tests/interop/client.py b/src/python/grpcio_tests/tests/interop/client.py index 383b5f033d..3780ed9020 100644 --- a/src/python/grpcio_tests/tests/interop/client.py +++ b/src/python/grpcio_tests/tests/interop/client.py @@ -104,8 +104,10 @@ def _stub(args): channel_credentials = grpc.composite_channel_credentials( channel_credentials, call_credentials) - channel = grpc.secure_channel(target, channel_credentials, ( - ('grpc.ssl_target_name_override', args.server_host_override,),)) + channel = grpc.secure_channel(target, channel_credentials, (( + 'grpc.ssl_target_name_override', + args.server_host_override, + ),)) else: channel = grpc.insecure_channel(target) if args.test_case == "unimplemented_service": diff --git a/src/python/grpcio_tests/tests/interop/methods.py b/src/python/grpcio_tests/tests/interop/methods.py index ae9a50dd9b..b728ffd704 100644 --- a/src/python/grpcio_tests/tests/interop/methods.py +++ b/src/python/grpcio_tests/tests/interop/methods.py @@ -62,9 +62,10 @@ class TestService(test_pb2_grpc.TestServiceServicer): def UnaryCall(self, request, context): _maybe_echo_metadata(context) _maybe_echo_status_and_message(request, context) - return messages_pb2.SimpleResponse(payload=messages_pb2.Payload( - type=messages_pb2.COMPRESSABLE, - body=b'\x00' * request.response_size)) + return messages_pb2.SimpleResponse( + payload=messages_pb2.Payload( + type=messages_pb2.COMPRESSABLE, + body=b'\x00' * request.response_size)) def StreamingOutputCall(self, request, context): _maybe_echo_status_and_message(request, context) @@ -100,14 +101,14 @@ class TestService(test_pb2_grpc.TestServiceServicer): def _expect_status_code(call, expected_code): if call.code() != expected_code: - raise ValueError('expected code %s, got %s' % - (expected_code, call.code())) + raise ValueError('expected code %s, got %s' % (expected_code, + call.code())) def _expect_status_details(call, expected_details): if call.details() != expected_details: - raise ValueError('expected message %s, got %s' % - (expected_details, call.details())) + raise ValueError('expected message %s, got %s' % (expected_details, + call.details())) def _validate_status_code_and_details(call, expected_code, expected_details): @@ -152,26 +153,38 @@ def _large_unary(stub): def _client_streaming(stub): - payload_body_sizes = (27182, 8, 1828, 45904,) + payload_body_sizes = ( + 27182, + 8, + 1828, + 45904, + ) payloads = (messages_pb2.Payload(body=b'\x00' * size) for size in payload_body_sizes) requests = (messages_pb2.StreamingInputCallRequest(payload=payload) for payload in payloads) response = stub.StreamingInputCall(requests) if response.aggregated_payload_size != 74922: - raise ValueError('incorrect size %d!' % - response.aggregated_payload_size) + raise ValueError( + 'incorrect size %d!' % response.aggregated_payload_size) def _server_streaming(stub): - sizes = (31415, 9, 2653, 58979,) + sizes = ( + 31415, + 9, + 2653, + 58979, + ) request = messages_pb2.StreamingOutputCallRequest( response_type=messages_pb2.COMPRESSABLE, - response_parameters=(messages_pb2.ResponseParameters(size=sizes[0]), - messages_pb2.ResponseParameters(size=sizes[1]), - messages_pb2.ResponseParameters(size=sizes[2]), - messages_pb2.ResponseParameters(size=sizes[3]),)) + response_parameters=( + messages_pb2.ResponseParameters(size=sizes[0]), + messages_pb2.ResponseParameters(size=sizes[1]), + messages_pb2.ResponseParameters(size=sizes[2]), + messages_pb2.ResponseParameters(size=sizes[3]), + )) response_iterator = stub.StreamingOutputCall(request) for index, response in enumerate(response_iterator): _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE, @@ -218,8 +231,18 @@ class _Pipe(object): def _ping_pong(stub): - request_response_sizes = (31415, 9, 2653, 58979,) - request_payload_sizes = (27182, 8, 1828, 45904,) + request_response_sizes = ( + 31415, + 9, + 2653, + 58979, + ) + request_payload_sizes = ( + 27182, + 8, + 1828, + 45904, + ) with _Pipe() as pipe: response_iterator = stub.FullDuplexCall(pipe) @@ -247,8 +270,18 @@ def _cancel_after_begin(stub): def _cancel_after_first_response(stub): - request_response_sizes = (31415, 9, 2653, 58979,) - request_payload_sizes = (27182, 8, 1828, 45904,) + request_response_sizes = ( + 31415, + 9, + 2653, + 58979, + ) + request_payload_sizes = ( + 27182, + 8, + 1828, + 45904, + ) with _Pipe() as pipe: response_iterator = stub.FullDuplexCall(pipe) @@ -331,14 +364,14 @@ def _status_code_and_message(stub): def _unimplemented_method(test_service_stub): - response_future = ( - test_service_stub.UnimplementedCall.future(empty_pb2.Empty())) + response_future = (test_service_stub.UnimplementedCall.future( + empty_pb2.Empty())) _expect_status_code(response_future, grpc.StatusCode.UNIMPLEMENTED) def _unimplemented_service(unimplemented_service_stub): - response_future = ( - unimplemented_service_stub.UnimplementedCall.future(empty_pb2.Empty())) + response_future = (unimplemented_service_stub.UnimplementedCall.future( + empty_pb2.Empty())) _expect_status_code(response_future, grpc.StatusCode.UNIMPLEMENTED) @@ -392,11 +425,12 @@ def _oauth2_auth_token(stub, args): wanted_email = json.load(open(json_key_filename, 'rb'))['client_email'] response = _large_unary_common_behavior(stub, True, True, None) if wanted_email != response.username: - raise ValueError('expected username %s, got %s' % - (wanted_email, response.username)) + raise ValueError('expected username %s, got %s' % (wanted_email, + response.username)) if args.oauth_scope.find(response.oauth_scope) == -1: - raise ValueError('expected to find oauth scope "{}" in received "{}"'. - format(response.oauth_scope, args.oauth_scope)) + raise ValueError( + 'expected to find oauth scope "{}" in received "{}"'.format( + response.oauth_scope, args.oauth_scope)) def _jwt_token_creds(stub, args): @@ -404,8 +438,8 @@ def _jwt_token_creds(stub, args): wanted_email = json.load(open(json_key_filename, 'rb'))['client_email'] response = _large_unary_common_behavior(stub, True, False, None) if wanted_email != response.username: - raise ValueError('expected username %s, got %s' % - (wanted_email, response.username)) + raise ValueError('expected username %s, got %s' % (wanted_email, + response.username)) def _per_rpc_creds(stub, args): @@ -419,8 +453,8 @@ def _per_rpc_creds(stub, args): request=google_auth_transport_requests.Request())) response = _large_unary_common_behavior(stub, True, False, call_credentials) if wanted_email != response.username: - raise ValueError('expected username %s, got %s' % - (wanted_email, response.username)) + raise ValueError('expected username %s, got %s' % (wanted_email, + response.username)) @enum.unique @@ -479,5 +513,5 @@ class TestCase(enum.Enum): elif self is TestCase.PER_RPC_CREDS: _per_rpc_creds(stub, args) else: - raise NotImplementedError('Test case "%s" not implemented!' % - self.name) + raise NotImplementedError( + 'Test case "%s" not implemented!' % self.name) diff --git a/src/python/grpcio_tests/tests/interop/server.py b/src/python/grpcio_tests/tests/interop/server.py index dd4f5146e9..0810de2394 100644 --- a/src/python/grpcio_tests/tests/interop/server.py +++ b/src/python/grpcio_tests/tests/interop/server.py @@ -45,8 +45,8 @@ def serve(): if args.use_tls: private_key = resources.private_key() certificate_chain = resources.certificate_chain() - credentials = grpc.ssl_server_credentials(( - (private_key, certificate_chain),)) + credentials = grpc.ssl_server_credentials(((private_key, + certificate_chain),)) server.add_secure_port('[::]:{}'.format(args.port), credentials) else: server.add_insecure_port('[::]:{}'.format(args.port)) diff --git a/src/python/grpcio_tests/tests/protoc_plugin/_python_plugin_test.py b/src/python/grpcio_tests/tests/protoc_plugin/_python_plugin_test.py index 8fc539e641..6d85f43130 100644 --- a/src/python/grpcio_tests/tests/protoc_plugin/_python_plugin_test.py +++ b/src/python/grpcio_tests/tests/protoc_plugin/_python_plugin_test.py @@ -119,8 +119,11 @@ class _ServicerMethods(object): class _Service( - collections.namedtuple('_Service', ('servicer_methods', 'server', - 'stub',))): + collections.namedtuple('_Service', ( + 'servicer_methods', + 'server', + 'stub', + ))): """A live and running service. Attributes: @@ -297,8 +300,8 @@ class PythonPluginTest(unittest.TestCase): responses = service.stub.StreamingOutputCall(request) expected_responses = service.servicer_methods.StreamingOutputCall( request, 'not a real RpcContext!') - for expected_response, response in moves.zip_longest(expected_responses, - responses): + for expected_response, response in moves.zip_longest( + expected_responses, responses): self.assertEqual(expected_response, response) def testStreamingOutputCallExpired(self): @@ -388,8 +391,8 @@ class PythonPluginTest(unittest.TestCase): responses = service.stub.FullDuplexCall(_full_duplex_request_iterator()) expected_responses = service.servicer_methods.FullDuplexCall( _full_duplex_request_iterator(), 'not a real RpcContext!') - for expected_response, response in moves.zip_longest(expected_responses, - responses): + for expected_response, response in moves.zip_longest( + expected_responses, responses): self.assertEqual(expected_response, response) def testFullDuplexCallExpired(self): @@ -439,8 +442,8 @@ class PythonPluginTest(unittest.TestCase): responses = service.stub.HalfDuplexCall(half_duplex_request_iterator()) expected_responses = service.servicer_methods.HalfDuplexCall( half_duplex_request_iterator(), 'not a real RpcContext!') - for expected_response, response in moves.zip_longest(expected_responses, - responses): + for expected_response, response in moves.zip_longest( + expected_responses, responses): self.assertEqual(expected_response, response) def testHalfDuplexCallWedged(self): diff --git a/src/python/grpcio_tests/tests/protoc_plugin/_split_definitions_test.py b/src/python/grpcio_tests/tests/protoc_plugin/_split_definitions_test.py index c732e55108..ab33775ad3 100644 --- a/src/python/grpcio_tests/tests/protoc_plugin/_split_definitions_test.py +++ b/src/python/grpcio_tests/tests/protoc_plugin/_split_definitions_test.py @@ -64,8 +64,8 @@ def _massage_proto_content(proto_content, test_name_bytes, messages_proto_relative_file_name_bytes): package_substitution = (b'package grpc_protoc_plugin.invocation_testing.' + test_name_bytes + b';') - common_namespace_substituted = proto_content.replace(_COMMON_NAMESPACE, - package_substitution) + common_namespace_substituted = proto_content.replace( + _COMMON_NAMESPACE, package_substitution) split_namespace_substituted = common_namespace_substituted.replace( _SPLIT_NAMESPACE, package_substitution) message_import_replaced = split_namespace_substituted.replace( @@ -163,8 +163,12 @@ class _GrpcBeforeProtoProtocStyle(object): return pb2_grpc_protoc_exit_code, pb2_protoc_exit_code, -_PROTOC_STYLES = (_Mid2016ProtocStyle(), _SingleProtocExecutionProtocStyle(), - _ProtoBeforeGrpcProtocStyle(), _GrpcBeforeProtoProtocStyle(),) +_PROTOC_STYLES = ( + _Mid2016ProtocStyle(), + _SingleProtocExecutionProtocStyle(), + _ProtoBeforeGrpcProtocStyle(), + _GrpcBeforeProtoProtocStyle(), +) @unittest.skipIf(platform.python_implementation() == 'PyPy', @@ -180,18 +184,22 @@ class _Test(six.with_metaclass(abc.ABCMeta, unittest.TestCase)): os.makedirs(self._python_out) proto_directories_and_names = { - (self.MESSAGES_PROTO_RELATIVE_DIRECTORY_NAMES, - self.MESSAGES_PROTO_FILE_NAME,), - (self.SERVICES_PROTO_RELATIVE_DIRECTORY_NAMES, - self.SERVICES_PROTO_FILE_NAME,), + ( + self.MESSAGES_PROTO_RELATIVE_DIRECTORY_NAMES, + self.MESSAGES_PROTO_FILE_NAME, + ), + ( + self.SERVICES_PROTO_RELATIVE_DIRECTORY_NAMES, + self.SERVICES_PROTO_FILE_NAME, + ), } messages_proto_relative_file_name_forward_slashes = '/'.join( - self.MESSAGES_PROTO_RELATIVE_DIRECTORY_NAMES + ( - self.MESSAGES_PROTO_FILE_NAME,)) - _create_directory_tree(self._proto_path, ( - relative_proto_directory_names - for relative_proto_directory_names, _ in proto_directories_and_names - )) + self.MESSAGES_PROTO_RELATIVE_DIRECTORY_NAMES + + (self.MESSAGES_PROTO_FILE_NAME,)) + _create_directory_tree(self._proto_path, + (relative_proto_directory_names + for relative_proto_directory_names, _ in + proto_directories_and_names)) self._absolute_proto_file_names = set() for relative_directory_names, file_name in proto_directories_and_names: absolute_proto_file_name = path.join( @@ -200,8 +208,7 @@ class _Test(six.with_metaclass(abc.ABCMeta, unittest.TestCase)): 'tests.protoc_plugin.protos.invocation_testing', path.join(*relative_directory_names + (file_name,))) massaged_proto_content = _massage_proto_content( - raw_proto_content, - self.NAME.encode(), + raw_proto_content, self.NAME.encode(), messages_proto_relative_file_name_forward_slashes.encode()) with open(absolute_proto_file_name, 'wb') as proto_file: proto_file.write(massaged_proto_content) @@ -275,7 +282,9 @@ def _create_test_case_class(split_proto, protoc_style): if split_proto: attributes['MESSAGES_PROTO_RELATIVE_DIRECTORY_NAMES'] = ( - 'split_messages', 'sub',) + 'split_messages', + 'sub', + ) attributes['MESSAGES_PROTO_FILE_NAME'] = 'messages.proto' attributes['SERVICES_PROTO_RELATIVE_DIRECTORY_NAMES'] = ( 'split_services',) @@ -301,7 +310,10 @@ def _create_test_case_class(split_proto, protoc_style): def _create_test_case_classes(): - for split_proto in (False, True,): + for split_proto in ( + False, + True, + ): for protoc_style in _PROTOC_STYLES: yield _create_test_case_class(split_proto, protoc_style) diff --git a/src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py b/src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py index 424b153ff8..ad0ecf0079 100644 --- a/src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py +++ b/src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py @@ -36,10 +36,28 @@ _RELATIVE_PROTO_PATH = 'relative_proto_path' _RELATIVE_PYTHON_OUT = 'relative_python_out' _PROTO_FILES_PATH_COMPONENTS = ( - ('beta_grpc_plugin_test', 'payload', 'test_payload.proto',), - ('beta_grpc_plugin_test', 'requests', 'r', 'test_requests.proto',), - ('beta_grpc_plugin_test', 'responses', 'test_responses.proto',), - ('beta_grpc_plugin_test', 'service', 'test_service.proto',),) + ( + 'beta_grpc_plugin_test', + 'payload', + 'test_payload.proto', + ), + ( + 'beta_grpc_plugin_test', + 'requests', + 'r', + 'test_requests.proto', + ), + ( + 'beta_grpc_plugin_test', + 'responses', + 'test_responses.proto', + ), + ( + 'beta_grpc_plugin_test', + 'service', + 'test_service.proto', + ), +) _PAYLOAD_PB2 = 'beta_grpc_plugin_test.payload.test_payload_pb2' _REQUESTS_PB2 = 'beta_grpc_plugin_test.requests.r.test_requests_pb2' diff --git a/src/python/grpcio_tests/tests/qps/benchmark_client.py b/src/python/grpcio_tests/tests/qps/benchmark_client.py index 17fa61ea36..e6392a8b8c 100644 --- a/src/python/grpcio_tests/tests/qps/benchmark_client.py +++ b/src/python/grpcio_tests/tests/qps/benchmark_client.py @@ -155,7 +155,8 @@ class _SyncStream(object): _TIMEOUT) for _ in response_stream: self._handle_response( - self, time.time() - self._send_time_queue.get_nowait()) + self, + time.time() - self._send_time_queue.get_nowait()) def stop(self): self._is_streaming = False diff --git a/src/python/grpcio_tests/tests/qps/worker_server.py b/src/python/grpcio_tests/tests/qps/worker_server.py index fef4fb0459..41e2403c8f 100644 --- a/src/python/grpcio_tests/tests/qps/worker_server.py +++ b/src/python/grpcio_tests/tests/qps/worker_server.py @@ -72,8 +72,8 @@ class WorkerServer(services_pb2_grpc.WorkerServiceServicer): server = test_common.test_server(max_workers=server_threads) if config.server_type == control_pb2.ASYNC_SERVER: servicer = benchmark_server.BenchmarkServer() - services_pb2_grpc.add_BenchmarkServiceServicer_to_server(servicer, - server) + services_pb2_grpc.add_BenchmarkServiceServicer_to_server( + servicer, server) elif config.server_type == control_pb2.ASYNC_GENERIC_SERVER: resp_size = config.payload_config.bytebuf_params.resp_size servicer = benchmark_server.GenericBenchmarkServer(resp_size) @@ -87,12 +87,12 @@ class WorkerServer(services_pb2_grpc.WorkerServiceServicer): 'grpc.testing.BenchmarkService', method_implementations) server.add_generic_rpc_handlers((handler,)) else: - raise Exception( - 'Unsupported server type {}'.format(config.server_type)) + raise Exception('Unsupported server type {}'.format( + config.server_type)) if config.HasField('security_params'): # Use SSL - server_creds = grpc.ssl_server_credentials(( - (resources.private_key(), resources.certificate_chain()),)) + server_creds = grpc.ssl_server_credentials( + ((resources.private_key(), resources.certificate_chain()),)) port = server.add_secure_port('[::]:{}'.format(config.port), server_creds) else: @@ -156,8 +156,8 @@ class WorkerServer(services_pb2_grpc.WorkerServiceServicer): else: raise Exception('Async streaming client not supported') else: - raise Exception( - 'Unsupported client type {}'.format(config.client_type)) + raise Exception('Unsupported client type {}'.format( + config.client_type)) # In multi-channel tests, we split the load across all channels load_factor = float(config.client_channels) diff --git a/src/python/grpcio_tests/tests/reflection/_reflection_servicer_test.py b/src/python/grpcio_tests/tests/reflection/_reflection_servicer_test.py index 86037e258a..7ffdba6a67 100644 --- a/src/python/grpcio_tests/tests/reflection/_reflection_servicer_test.py +++ b/src/python/grpcio_tests/tests/reflection/_reflection_servicer_test.py @@ -33,7 +33,13 @@ _EMPTY_PROTO_SYMBOL_NAME = 'grpc.testing.Empty' _SERVICE_NAMES = ('Angstrom', 'Bohr', 'Curie', 'Dyson', 'Einstein', 'Feynman', 'Galilei') _EMPTY_EXTENSIONS_SYMBOL_NAME = 'grpc.testing.proto2.EmptyWithExtensions' -_EMPTY_EXTENSIONS_NUMBERS = (124, 125, 126, 127, 128,) +_EMPTY_EXTENSIONS_NUMBERS = ( + 124, + 125, + 126, + 127, + 128, +) def _file_descriptor_to_proto(descriptor): @@ -54,10 +60,12 @@ class ReflectionServicerTest(unittest.TestCase): self._stub = reflection_pb2_grpc.ServerReflectionStub(channel) def testFileByName(self): - requests = (reflection_pb2.ServerReflectionRequest( - file_by_filename=_EMPTY_PROTO_FILE_NAME), - reflection_pb2.ServerReflectionRequest( - file_by_filename='i-donut-exist'),) + requests = ( + reflection_pb2.ServerReflectionRequest( + file_by_filename=_EMPTY_PROTO_FILE_NAME), + reflection_pb2.ServerReflectionRequest( + file_by_filename='i-donut-exist'), + ) responses = tuple(self._stub.ServerReflectionInfo(iter(requests))) expected_responses = ( reflection_pb2.ServerReflectionResponse( @@ -70,14 +78,18 @@ class ReflectionServicerTest(unittest.TestCase): error_response=reflection_pb2.ErrorResponse( error_code=grpc.StatusCode.NOT_FOUND.value[0], error_message=grpc.StatusCode.NOT_FOUND.value[1].encode(), - )),) + )), + ) self.assertSequenceEqual(expected_responses, responses) def testFileBySymbol(self): - requests = (reflection_pb2.ServerReflectionRequest( - file_containing_symbol=_EMPTY_PROTO_SYMBOL_NAME - ), reflection_pb2.ServerReflectionRequest( - file_containing_symbol='i.donut.exist.co.uk.org.net.me.name.foo'),) + requests = ( + reflection_pb2.ServerReflectionRequest( + file_containing_symbol=_EMPTY_PROTO_SYMBOL_NAME), + reflection_pb2.ServerReflectionRequest( + file_containing_symbol='i.donut.exist.co.uk.org.net.me.name.foo' + ), + ) responses = tuple(self._stub.ServerReflectionInfo(iter(requests))) expected_responses = ( reflection_pb2.ServerReflectionResponse( @@ -90,18 +102,23 @@ class ReflectionServicerTest(unittest.TestCase): error_response=reflection_pb2.ErrorResponse( error_code=grpc.StatusCode.NOT_FOUND.value[0], error_message=grpc.StatusCode.NOT_FOUND.value[1].encode(), - )),) + )), + ) self.assertSequenceEqual(expected_responses, responses) def testFileContainingExtension(self): - requests = (reflection_pb2.ServerReflectionRequest( - file_containing_extension=reflection_pb2.ExtensionRequest( - containing_type=_EMPTY_EXTENSIONS_SYMBOL_NAME, - extension_number=125,), - ), reflection_pb2.ServerReflectionRequest( - file_containing_extension=reflection_pb2.ExtensionRequest( - containing_type='i.donut.exist.co.uk.org.net.me.name.foo', - extension_number=55,),),) + requests = ( + reflection_pb2.ServerReflectionRequest( + file_containing_extension=reflection_pb2.ExtensionRequest( + containing_type=_EMPTY_EXTENSIONS_SYMBOL_NAME, + extension_number=125, + ),), + reflection_pb2.ServerReflectionRequest( + file_containing_extension=reflection_pb2.ExtensionRequest( + containing_type='i.donut.exist.co.uk.org.net.me.name.foo', + extension_number=55, + ),), + ) responses = tuple(self._stub.ServerReflectionInfo(iter(requests))) expected_responses = ( reflection_pb2.ServerReflectionResponse( @@ -114,14 +131,18 @@ class ReflectionServicerTest(unittest.TestCase): error_response=reflection_pb2.ErrorResponse( error_code=grpc.StatusCode.NOT_FOUND.value[0], error_message=grpc.StatusCode.NOT_FOUND.value[1].encode(), - )),) + )), + ) self.assertSequenceEqual(expected_responses, responses) def testExtensionNumbersOfType(self): - requests = (reflection_pb2.ServerReflectionRequest( - all_extension_numbers_of_type=_EMPTY_EXTENSIONS_SYMBOL_NAME - ), reflection_pb2.ServerReflectionRequest( - all_extension_numbers_of_type='i.donut.exist.co.uk.net.name.foo'),) + requests = ( + reflection_pb2.ServerReflectionRequest( + all_extension_numbers_of_type=_EMPTY_EXTENSIONS_SYMBOL_NAME), + reflection_pb2.ServerReflectionRequest( + all_extension_numbers_of_type='i.donut.exist.co.uk.net.name.foo' + ), + ) responses = tuple(self._stub.ServerReflectionInfo(iter(requests))) expected_responses = ( reflection_pb2.ServerReflectionResponse( @@ -135,12 +156,12 @@ class ReflectionServicerTest(unittest.TestCase): error_response=reflection_pb2.ErrorResponse( error_code=grpc.StatusCode.NOT_FOUND.value[0], error_message=grpc.StatusCode.NOT_FOUND.value[1].encode(), - )),) + )), + ) self.assertSequenceEqual(expected_responses, responses) def testListServices(self): - requests = (reflection_pb2.ServerReflectionRequest( - list_services='',),) + requests = (reflection_pb2.ServerReflectionRequest(list_services='',),) responses = tuple(self._stub.ServerReflectionInfo(iter(requests))) expected_responses = (reflection_pb2.ServerReflectionResponse( valid_host='', diff --git a/src/python/grpcio_tests/tests/stress/client.py b/src/python/grpcio_tests/tests/stress/client.py index 40caa3926a..41f2e1b6c2 100644 --- a/src/python/grpcio_tests/tests/stress/client.py +++ b/src/python/grpcio_tests/tests/stress/client.py @@ -102,8 +102,10 @@ def _get_channel(target, args): root_certificates = None # will load default roots. channel_credentials = grpc.ssl_channel_credentials( root_certificates=root_certificates) - options = (('grpc.ssl_target_name_override', - args.server_host_override,),) + options = (( + 'grpc.ssl_target_name_override', + args.server_host_override, + ),) channel = grpc.secure_channel( target, channel_credentials, options=options) else: diff --git a/src/python/grpcio_tests/tests/testing/_client_application.py b/src/python/grpcio_tests/tests/testing/_client_application.py index aff32fb4dc..7d0d74c8c4 100644 --- a/src/python/grpcio_tests/tests/testing/_client_application.py +++ b/src/python/grpcio_tests/tests/testing/_client_application.py @@ -235,8 +235,8 @@ def run(scenario, channel): elif scenario is Scenario.INFINITE_REQUEST_STREAM: return _run_infinite_request_stream(stub) except grpc.RpcError as rpc_error: - return Outcome(Outcome.Kind.RPC_ERROR, - rpc_error.code(), rpc_error.details()) + return Outcome(Outcome.Kind.RPC_ERROR, rpc_error.code(), + rpc_error.details()) _IMPLEMENTATIONS = { @@ -256,5 +256,5 @@ def run(scenario, channel): try: return _IMPLEMENTATIONS[scenario](stub) except grpc.RpcError as rpc_error: - return Outcome(Outcome.Kind.RPC_ERROR, - rpc_error.code(), rpc_error.details()) + return Outcome(Outcome.Kind.RPC_ERROR, rpc_error.code(), + rpc_error.details()) diff --git a/src/python/grpcio_tests/tests/testing/_client_test.py b/src/python/grpcio_tests/tests/testing/_client_test.py index 172f386d7b..5b051c3939 100644 --- a/src/python/grpcio_tests/tests/testing/_client_test.py +++ b/src/python/grpcio_tests/tests/testing/_client_test.py @@ -193,8 +193,10 @@ class ClientTest(unittest.TestCase): rpc.take_request() rpc.take_request() rpc.requests_closed() - rpc.send_initial_metadata(( - ('my_metadata_key', 'My Metadata Value!',),)) + rpc.send_initial_metadata((( + 'my_metadata_key', + 'My Metadata Value!', + ),)) for rpc in rpcs[:-1]: rpc.terminate(_application_common.STREAM_UNARY_RESPONSE, (), grpc.StatusCode.OK, '') diff --git a/src/python/grpcio_tests/tests/testing/_server_application.py b/src/python/grpcio_tests/tests/testing/_server_application.py index 06f09c8cb4..02769ca68d 100644 --- a/src/python/grpcio_tests/tests/testing/_server_application.py +++ b/src/python/grpcio_tests/tests/testing/_server_application.py @@ -41,8 +41,10 @@ class FirstServiceServicer(services_pb2_grpc.FirstServiceServicer): yield services_pb2.Strange() def StreUn(self, request_iterator, context): - context.send_initial_metadata(( - ('server_application_metadata_key', 'Hi there!',),)) + context.send_initial_metadata((( + 'server_application_metadata_key', + 'Hi there!', + ),)) for request in request_iterator: if request != _application_common.STREAM_UNARY_REQUEST: context.set_code(grpc.StatusCode.INVALID_ARGUMENT) diff --git a/src/python/grpcio_tests/tests/testing/_server_test.py b/src/python/grpcio_tests/tests/testing/_server_test.py index 7897bcce01..4f4abd7708 100644 --- a/src/python/grpcio_tests/tests/testing/_server_test.py +++ b/src/python/grpcio_tests/tests/testing/_server_test.py @@ -110,14 +110,19 @@ class FirstServiceServicerTest(unittest.TestCase): second_termination = rpc.termination() third_termination = rpc.termination() - for later_initial_metadata in (second_initial_metadata, - third_initial_metadata,): + for later_initial_metadata in ( + second_initial_metadata, + third_initial_metadata, + ): self.assertEqual(first_initial_metadata, later_initial_metadata) response = first_termination[0] terminal_metadata = first_termination[1] code = first_termination[2] details = first_termination[3] - for later_termination in (second_termination, third_termination,): + for later_termination in ( + second_termination, + third_termination, + ): self.assertEqual(response, later_termination[0]) self.assertEqual(terminal_metadata, later_termination[1]) self.assertIs(code, later_termination[2]) diff --git a/src/python/grpcio_tests/tests/testing/_time_test.py b/src/python/grpcio_tests/tests/testing/_time_test.py index 797394ae20..9dfe36fb83 100644 --- a/src/python/grpcio_tests/tests/testing/_time_test.py +++ b/src/python/grpcio_tests/tests/testing/_time_test.py @@ -105,8 +105,8 @@ class TimeTest(object): test_event.set, _QUANTUM * (2 + random.random())) for _ in range(_MANY): background_noise_futures.append( - self._time.call_in(threading.Event().set, _QUANTUM * 1000 * - random.random())) + self._time.call_in(threading.Event().set, + _QUANTUM * 1000 * random.random())) self._time.sleep_for(_QUANTUM) cancelled = set() for test_event, test_future in possibly_cancelled_futures.items(): diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json index 3bf5308749..e033c1063f 100644 --- a/src/python/grpcio_tests/tests/tests.json +++ b/src/python/grpcio_tests/tests/tests.json @@ -34,6 +34,7 @@ "unit._cython._no_messages_server_completion_queue_per_call_test.Test", "unit._cython._no_messages_single_server_completion_queue_test.Test", "unit._cython._read_some_but_not_all_responses_test.ReadSomeButNotAllResponsesTest", + "unit._cython._server_test.Test", "unit._cython.cygrpc_test.InsecureServerInsecureClient", "unit._cython.cygrpc_test.SecureServerSecureClient", "unit._cython.cygrpc_test.TypeSmokeTest", diff --git a/src/python/grpcio_tests/tests/unit/_api_test.py b/src/python/grpcio_tests/tests/unit/_api_test.py index d6f4447532..f6245be77d 100644 --- a/src/python/grpcio_tests/tests/unit/_api_test.py +++ b/src/python/grpcio_tests/tests/unit/_api_test.py @@ -26,28 +26,57 @@ class AllTest(unittest.TestCase): def testAll(self): expected_grpc_code_elements = ( - 'FutureTimeoutError', 'FutureCancelledError', 'Future', - 'ChannelConnectivity', 'StatusCode', 'RpcError', 'RpcContext', - 'Call', 'ChannelCredentials', 'CallCredentials', - 'AuthMetadataContext', 'AuthMetadataPluginCallback', - 'AuthMetadataPlugin', 'ServerCertificateConfiguration', - 'ServerCredentials', 'UnaryUnaryMultiCallable', - 'UnaryStreamMultiCallable', 'StreamUnaryMultiCallable', - 'StreamStreamMultiCallable', 'UnaryUnaryClientInterceptor', - 'UnaryStreamClientInterceptor', 'StreamUnaryClientInterceptor', - 'StreamStreamClientInterceptor', 'Channel', 'ServicerContext', - 'RpcMethodHandler', 'HandlerCallDetails', 'GenericRpcHandler', - 'ServiceRpcHandler', 'Server', 'ServerInterceptor', - 'unary_unary_rpc_method_handler', 'unary_stream_rpc_method_handler', - 'stream_unary_rpc_method_handler', 'ClientCallDetails', + 'FutureTimeoutError', + 'FutureCancelledError', + 'Future', + 'ChannelConnectivity', + 'StatusCode', + 'RpcError', + 'RpcContext', + 'Call', + 'ChannelCredentials', + 'CallCredentials', + 'AuthMetadataContext', + 'AuthMetadataPluginCallback', + 'AuthMetadataPlugin', + 'ServerCertificateConfiguration', + 'ServerCredentials', + 'UnaryUnaryMultiCallable', + 'UnaryStreamMultiCallable', + 'StreamUnaryMultiCallable', + 'StreamStreamMultiCallable', + 'UnaryUnaryClientInterceptor', + 'UnaryStreamClientInterceptor', + 'StreamUnaryClientInterceptor', + 'StreamStreamClientInterceptor', + 'Channel', + 'ServicerContext', + 'RpcMethodHandler', + 'HandlerCallDetails', + 'GenericRpcHandler', + 'ServiceRpcHandler', + 'Server', + 'ServerInterceptor', + 'unary_unary_rpc_method_handler', + 'unary_stream_rpc_method_handler', + 'stream_unary_rpc_method_handler', + 'ClientCallDetails', 'stream_stream_rpc_method_handler', - 'method_handlers_generic_handler', 'ssl_channel_credentials', - 'metadata_call_credentials', 'access_token_call_credentials', - 'composite_call_credentials', 'composite_channel_credentials', - 'ssl_server_credentials', 'ssl_server_certificate_configuration', - 'dynamic_ssl_server_credentials', 'channel_ready_future', - 'insecure_channel', 'secure_channel', 'intercept_channel', - 'server',) + 'method_handlers_generic_handler', + 'ssl_channel_credentials', + 'metadata_call_credentials', + 'access_token_call_credentials', + 'composite_call_credentials', + 'composite_channel_credentials', + 'ssl_server_credentials', + 'ssl_server_certificate_configuration', + 'dynamic_ssl_server_credentials', + 'channel_ready_future', + 'insecure_channel', + 'secure_channel', + 'intercept_channel', + 'server', + ) six.assertCountEqual(self, expected_grpc_code_elements, _from_grpc_import_star.GRPC_ELEMENTS) @@ -56,12 +85,13 @@ class AllTest(unittest.TestCase): class ChannelConnectivityTest(unittest.TestCase): def testChannelConnectivity(self): - self.assertSequenceEqual( - (grpc.ChannelConnectivity.IDLE, grpc.ChannelConnectivity.CONNECTING, - grpc.ChannelConnectivity.READY, - grpc.ChannelConnectivity.TRANSIENT_FAILURE, - grpc.ChannelConnectivity.SHUTDOWN,), - tuple(grpc.ChannelConnectivity)) + self.assertSequenceEqual(( + grpc.ChannelConnectivity.IDLE, + grpc.ChannelConnectivity.CONNECTING, + grpc.ChannelConnectivity.READY, + grpc.ChannelConnectivity.TRANSIENT_FAILURE, + grpc.ChannelConnectivity.SHUTDOWN, + ), tuple(grpc.ChannelConnectivity)) class ChannelTest(unittest.TestCase): diff --git a/src/python/grpcio_tests/tests/unit/_auth_context_test.py b/src/python/grpcio_tests/tests/unit/_auth_context_test.py index ebc04a71e0..468869a03e 100644 --- a/src/python/grpcio_tests/tests/unit/_auth_context_test.py +++ b/src/python/grpcio_tests/tests/unit/_auth_context_test.py @@ -29,8 +29,12 @@ _RESPONSE = b'\x00\x00\x00' _UNARY_UNARY = '/test/UnaryUnary' _SERVER_HOST_OVERRIDE = 'foo.test.google.fr' -_CLIENT_IDS = (b'*.test.google.fr', b'waterzooi.test.google.be', - b'*.test.youtube.com', b'192.168.1.3',) +_CLIENT_IDS = ( + b'*.test.google.fr', + b'waterzooi.test.google.be', + b'*.test.youtube.com', + b'192.168.1.3', +) _ID = 'id' _ID_KEY = 'id_key' _AUTH_CTX = 'auth_ctx' @@ -39,7 +43,10 @@ _PRIVATE_KEY = resources.private_key() _CERTIFICATE_CHAIN = resources.certificate_chain() _TEST_ROOT_CERTIFICATES = resources.test_root_certificates() _SERVER_CERTS = ((_PRIVATE_KEY, _CERTIFICATE_CHAIN),) -_PROPERTY_OPTIONS = (('grpc.ssl_target_name_override', _SERVER_HOST_OVERRIDE,),) +_PROPERTY_OPTIONS = (( + 'grpc.ssl_target_name_override', + _SERVER_HOST_OVERRIDE, +),) def handle_unary_unary(request, servicer_context): diff --git a/src/python/grpcio_tests/tests/unit/_channel_args_test.py b/src/python/grpcio_tests/tests/unit/_channel_args_test.py index 0a6b512866..1a2d2c0117 100644 --- a/src/python/grpcio_tests/tests/unit/_channel_args_test.py +++ b/src/python/grpcio_tests/tests/unit/_channel_args_test.py @@ -24,8 +24,13 @@ class TestPointerWrapper(object): return 123456 -TEST_CHANNEL_ARGS = (('arg1', b'bytes_val'), ('arg2', 'str_val'), ('arg3', 1), - (b'arg4', 'str_val'), ('arg6', TestPointerWrapper()),) +TEST_CHANNEL_ARGS = ( + ('arg1', b'bytes_val'), + ('arg2', 'str_val'), + ('arg3', 1), + (b'arg4', 'str_val'), + ('arg6', TestPointerWrapper()), +) class ChannelArgsTest(unittest.TestCase): diff --git a/src/python/grpcio_tests/tests/unit/_compression_test.py b/src/python/grpcio_tests/tests/unit/_compression_test.py index 93e599d8f8..7550cd39ba 100644 --- a/src/python/grpcio_tests/tests/unit/_compression_test.py +++ b/src/python/grpcio_tests/tests/unit/_compression_test.py @@ -26,16 +26,16 @@ _STREAM_STREAM = '/test/StreamStream' def handle_unary(request, servicer_context): - servicer_context.send_initial_metadata( - [('grpc-internal-encoding-request', 'gzip')]) + servicer_context.send_initial_metadata([('grpc-internal-encoding-request', + 'gzip')]) return request def handle_stream(request_iterator, servicer_context): # TODO(issue:#6891) We should be able to remove this loop, # and replace with return; yield - servicer_context.send_initial_metadata( - [('grpc-internal-encoding-request', 'gzip')]) + servicer_context.send_initial_metadata([('grpc-internal-encoding-request', + 'gzip')]) for request in request_iterator: yield request diff --git a/src/python/grpcio_tests/tests/unit/_credentials_test.py b/src/python/grpcio_tests/tests/unit/_credentials_test.py index 097898b987..f487fe66a2 100644 --- a/src/python/grpcio_tests/tests/unit/_credentials_test.py +++ b/src/python/grpcio_tests/tests/unit/_credentials_test.py @@ -26,8 +26,8 @@ class CredentialsTest(unittest.TestCase): third = grpc.access_token_call_credentials('ghi') first_and_second = grpc.composite_call_credentials(first, second) - first_second_and_third = grpc.composite_call_credentials(first, second, - third) + first_second_and_third = grpc.composite_call_credentials( + first, second, third) self.assertIsInstance(first_and_second, grpc.CallCredentials) self.assertIsInstance(first_second_and_third, grpc.CallCredentials) diff --git a/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py b/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py index 7df8e2fde6..b81d6fbc61 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py @@ -53,7 +53,7 @@ class _Handler(object): self._state = state self._lock = threading.Lock() self._completion_queue = completion_queue - self._call = rpc_event.operation_call + self._call = rpc_event.call def __call__(self): with self._state.condition: @@ -65,10 +65,10 @@ class _Handler(object): with self._lock: self._call.start_server_batch( - (cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),), + (cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),), _RECEIVE_CLOSE_ON_SERVER_TAG) self._call.start_server_batch( - (cygrpc.operation_receive_message(_EMPTY_FLAGS),), + (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), _RECEIVE_MESSAGE_TAG) first_event = self._completion_queue.poll() if _is_cancellation_event(first_event): @@ -76,12 +76,13 @@ class _Handler(object): else: with self._lock: operations = ( - cygrpc.operation_send_initial_metadata(_EMPTY_METADATA, - _EMPTY_FLAGS), - cygrpc.operation_send_message(b'\x79\x57', _EMPTY_FLAGS), - cygrpc.operation_send_status_from_server( + cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA, + _EMPTY_FLAGS), + cygrpc.SendMessageOperation(b'\x79\x57', _EMPTY_FLAGS), + cygrpc.SendStatusFromServerOperation( _EMPTY_METADATA, cygrpc.StatusCode.ok, b'test details!', - _EMPTY_FLAGS),) + _EMPTY_FLAGS), + ) self._call.start_server_batch(operations, _SERVER_COMPLETE_CALL_TAG) self._completion_queue.poll() @@ -151,8 +152,12 @@ class CancelManyCallsTest(unittest.TestCase): state = _State() - server_thread_args = (state, server, server_completion_queue, - server_thread_pool,) + server_thread_args = ( + state, + server, + server_completion_queue, + server_thread_pool, + ) server_thread = threading.Thread(target=_serve, args=server_thread_args) server_thread.start() @@ -170,13 +175,14 @@ class CancelManyCallsTest(unittest.TestCase): None, _EMPTY_FLAGS, client_completion_queue, b'/twinkies', None, _INFINITE_FUTURE) operations = ( - cygrpc.operation_send_initial_metadata(_EMPTY_METADATA, - _EMPTY_FLAGS), - cygrpc.operation_send_message(b'\x45\x56', _EMPTY_FLAGS), - cygrpc.operation_send_close_from_client(_EMPTY_FLAGS), - cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS), - cygrpc.operation_receive_message(_EMPTY_FLAGS), - cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),) + cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA, + _EMPTY_FLAGS), + cygrpc.SendMessageOperation(b'\x45\x56', _EMPTY_FLAGS), + cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), + cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), + cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), + ) tag = 'client_complete_call_{0:04d}_tag'.format(index) client_call.start_client_batch(operations, tag) client_due.add(tag) @@ -193,8 +199,8 @@ class CancelManyCallsTest(unittest.TestCase): state.condition.notify_all() break - client_driver.events(test_constants.RPC_CONCURRENCY * - _SUCCESS_CALL_FRACTION) + client_driver.events( + test_constants.RPC_CONCURRENCY * _SUCCESS_CALL_FRACTION) with client_condition: for client_call in client_calls: client_call.cancel() diff --git a/src/python/grpcio_tests/tests/unit/_cython/_channel_test.py b/src/python/grpcio_tests/tests/unit/_cython/_channel_test.py index 1d57ea7ec1..4eeb34b92e 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_channel_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_channel_test.py @@ -56,7 +56,10 @@ class ChannelTest(unittest.TestCase): def test_single_channel_lonely_connectivity(self): channel, completion_queue = _channel_and_completion_queue() - _in_parallel(_connectivity_loop, (channel, completion_queue,)) + _in_parallel(_connectivity_loop, ( + channel, + completion_queue, + )) completion_queue.shutdown() def test_multiple_channels_lonely_connectivity(self): diff --git a/src/python/grpcio_tests/tests/unit/_cython/_common.py b/src/python/grpcio_tests/tests/unit/_cython/_common.py index c5acd36bf2..ffd226fa95 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_common.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_common.py @@ -23,14 +23,20 @@ RPC_COUNT = 4000 INFINITE_FUTURE = cygrpc.Timespec(float('+inf')) EMPTY_FLAGS = 0 -INVOCATION_METADATA = (('client-md-key', 'client-md-key'), - ('client-md-key-bin', b'\x00\x01' * 3000),) +INVOCATION_METADATA = ( + ('client-md-key', 'client-md-key'), + ('client-md-key-bin', b'\x00\x01' * 3000), +) -INITIAL_METADATA = (('server-initial-md-key', 'server-initial-md-value'), - ('server-initial-md-key-bin', b'\x00\x02' * 3000),) +INITIAL_METADATA = ( + ('server-initial-md-key', 'server-initial-md-value'), + ('server-initial-md-key-bin', b'\x00\x02' * 3000), +) -TRAILING_METADATA = (('server-trailing-md-key', 'server-trailing-md-value'), - ('server-trailing-md-key-bin', b'\x00\x03' * 3000),) +TRAILING_METADATA = ( + ('server-trailing-md-key', 'server-trailing-md-value'), + ('server-trailing-md-key-bin', b'\x00\x03' * 3000), +) class QueueDriver(object): @@ -76,7 +82,10 @@ def execute_many_times(behavior): class OperationResult( collections.namedtuple('OperationResult', ( - 'start_batch_result', 'completion_type', 'success',))): + 'start_batch_result', + 'completion_type', + 'success', + ))): pass diff --git a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py index d08003af44..4ef4ad33e5 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py @@ -49,18 +49,19 @@ class Test(_common.RpcTest, unittest.TestCase): with self.client_condition: client_receive_initial_metadata_start_batch_result = ( client_call.start_client_batch([ - cygrpc.operation_receive_initial_metadata( - _common.EMPTY_FLAGS), + cygrpc.ReceiveInitialMetadataOperation(_common.EMPTY_FLAGS), ], client_receive_initial_metadata_tag)) + self.assertEqual(cygrpc.CallError.ok, + client_receive_initial_metadata_start_batch_result) client_complete_rpc_start_batch_result = client_call.start_client_batch( [ - cygrpc.operation_send_initial_metadata( + cygrpc.SendInitialMetadataOperation( _common.INVOCATION_METADATA, _common.EMPTY_FLAGS), - cygrpc.operation_send_close_from_client( - _common.EMPTY_FLAGS), - cygrpc.operation_receive_status_on_client( - _common.EMPTY_FLAGS), + cygrpc.SendCloseFromClientOperation(_common.EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_common.EMPTY_FLAGS), ], client_complete_rpc_tag) + self.assertEqual(cygrpc.CallError.ok, + client_complete_rpc_start_batch_result) self.client_driver.add_due({ client_receive_initial_metadata_tag, client_complete_rpc_tag, @@ -71,8 +72,8 @@ class Test(_common.RpcTest, unittest.TestCase): with server_call_condition: server_send_initial_metadata_start_batch_result = ( - server_request_call_event.operation_call.start_server_batch([ - cygrpc.operation_send_initial_metadata( + server_request_call_event.call.start_server_batch([ + cygrpc.SendInitialMetadataOperation( _common.INITIAL_METADATA, _common.EMPTY_FLAGS), ], server_send_initial_metadata_tag)) server_call_driver.add_due({ @@ -83,10 +84,9 @@ class Test(_common.RpcTest, unittest.TestCase): with server_call_condition: server_complete_rpc_start_batch_result = ( - server_request_call_event.operation_call.start_server_batch([ - cygrpc.operation_receive_close_on_server( - _common.EMPTY_FLAGS), - cygrpc.operation_send_status_from_server( + server_request_call_event.call.start_server_batch([ + cygrpc.ReceiveCloseOnServerOperation(_common.EMPTY_FLAGS), + cygrpc.SendStatusFromServerOperation( _common.TRAILING_METADATA, cygrpc.StatusCode.ok, b'test details', _common.EMPTY_FLAGS), ], server_complete_rpc_tag)) @@ -101,27 +101,29 @@ class Test(_common.RpcTest, unittest.TestCase): client_complete_rpc_event = self.client_driver.event_with_tag( client_complete_rpc_tag) - return (_common.OperationResult(server_request_call_start_batch_result, - server_request_call_event.type, - server_request_call_event.success), - _common.OperationResult( - client_receive_initial_metadata_start_batch_result, - client_receive_initial_metadata_event.type, - client_receive_initial_metadata_event.success), - _common.OperationResult(client_complete_rpc_start_batch_result, - client_complete_rpc_event.type, - client_complete_rpc_event.success), - _common.OperationResult( - server_send_initial_metadata_start_batch_result, - server_send_initial_metadata_event.type, - server_send_initial_metadata_event.success), - _common.OperationResult(server_complete_rpc_start_batch_result, - server_complete_rpc_event.type, - server_complete_rpc_event.success),) + return ( + _common.OperationResult(server_request_call_start_batch_result, + server_request_call_event.completion_type, + server_request_call_event.success), + _common.OperationResult( + client_receive_initial_metadata_start_batch_result, + client_receive_initial_metadata_event.completion_type, + client_receive_initial_metadata_event.success), + _common.OperationResult(client_complete_rpc_start_batch_result, + client_complete_rpc_event.completion_type, + client_complete_rpc_event.success), + _common.OperationResult( + server_send_initial_metadata_start_batch_result, + server_send_initial_metadata_event.completion_type, + server_send_initial_metadata_event.success), + _common.OperationResult(server_complete_rpc_start_batch_result, + server_complete_rpc_event.completion_type, + server_complete_rpc_event.success), + ) def test_rpcs(self): - expecteds = [(_common.SUCCESSFUL_OPERATION_RESULT,) * - 5] * _common.RPC_COUNT + expecteds = [( + _common.SUCCESSFUL_OPERATION_RESULT,) * 5] * _common.RPC_COUNT actuallys = _common.execute_many_times(self._do_rpcs) self.assertSequenceEqual(expecteds, actuallys) diff --git a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py index d0166a2b29..85395c9680 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py @@ -44,17 +44,14 @@ class Test(_common.RpcTest, unittest.TestCase): with self.client_condition: client_receive_initial_metadata_start_batch_result = ( client_call.start_client_batch([ - cygrpc.operation_receive_initial_metadata( - _common.EMPTY_FLAGS), + cygrpc.ReceiveInitialMetadataOperation(_common.EMPTY_FLAGS), ], client_receive_initial_metadata_tag)) client_complete_rpc_start_batch_result = client_call.start_client_batch( [ - cygrpc.operation_send_initial_metadata( + cygrpc.SendInitialMetadataOperation( _common.INVOCATION_METADATA, _common.EMPTY_FLAGS), - cygrpc.operation_send_close_from_client( - _common.EMPTY_FLAGS), - cygrpc.operation_receive_status_on_client( - _common.EMPTY_FLAGS), + cygrpc.SendCloseFromClientOperation(_common.EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_common.EMPTY_FLAGS), ], client_complete_rpc_tag) self.client_driver.add_due({ client_receive_initial_metadata_tag, @@ -66,8 +63,8 @@ class Test(_common.RpcTest, unittest.TestCase): with self.server_condition: server_send_initial_metadata_start_batch_result = ( - server_request_call_event.operation_call.start_server_batch([ - cygrpc.operation_send_initial_metadata( + server_request_call_event.call.start_server_batch([ + cygrpc.SendInitialMetadataOperation( _common.INITIAL_METADATA, _common.EMPTY_FLAGS), ], server_send_initial_metadata_tag)) self.server_driver.add_due({ @@ -78,12 +75,11 @@ class Test(_common.RpcTest, unittest.TestCase): with self.server_condition: server_complete_rpc_start_batch_result = ( - server_request_call_event.operation_call.start_server_batch([ - cygrpc.operation_receive_close_on_server( - _common.EMPTY_FLAGS), - cygrpc.operation_send_status_from_server( + server_request_call_event.call.start_server_batch([ + cygrpc.ReceiveCloseOnServerOperation(_common.EMPTY_FLAGS), + cygrpc.SendStatusFromServerOperation( _common.TRAILING_METADATA, cygrpc.StatusCode.ok, - b'test details', _common.EMPTY_FLAGS), + 'test details', _common.EMPTY_FLAGS), ], server_complete_rpc_tag)) self.server_driver.add_due({ server_complete_rpc_tag, @@ -96,27 +92,29 @@ class Test(_common.RpcTest, unittest.TestCase): client_complete_rpc_event = self.client_driver.event_with_tag( client_complete_rpc_tag) - return (_common.OperationResult(server_request_call_start_batch_result, - server_request_call_event.type, - server_request_call_event.success), - _common.OperationResult( - client_receive_initial_metadata_start_batch_result, - client_receive_initial_metadata_event.type, - client_receive_initial_metadata_event.success), - _common.OperationResult(client_complete_rpc_start_batch_result, - client_complete_rpc_event.type, - client_complete_rpc_event.success), - _common.OperationResult( - server_send_initial_metadata_start_batch_result, - server_send_initial_metadata_event.type, - server_send_initial_metadata_event.success), - _common.OperationResult(server_complete_rpc_start_batch_result, - server_complete_rpc_event.type, - server_complete_rpc_event.success),) + return ( + _common.OperationResult(server_request_call_start_batch_result, + server_request_call_event.completion_type, + server_request_call_event.success), + _common.OperationResult( + client_receive_initial_metadata_start_batch_result, + client_receive_initial_metadata_event.completion_type, + client_receive_initial_metadata_event.success), + _common.OperationResult(client_complete_rpc_start_batch_result, + client_complete_rpc_event.completion_type, + client_complete_rpc_event.success), + _common.OperationResult( + server_send_initial_metadata_start_batch_result, + server_send_initial_metadata_event.completion_type, + server_send_initial_metadata_event.success), + _common.OperationResult(server_complete_rpc_start_batch_result, + server_complete_rpc_event.completion_type, + server_complete_rpc_event.success), + ) def test_rpcs(self): - expecteds = [(_common.SUCCESSFUL_OPERATION_RESULT,) * - 5] * _common.RPC_COUNT + expecteds = [( + _common.SUCCESSFUL_OPERATION_RESULT,) * 5] * _common.RPC_COUNT actuallys = _common.execute_many_times(self._do_rpcs) self.assertSequenceEqual(expecteds, actuallys) diff --git a/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py b/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py index c7d19058da..82ef25b2a7 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py @@ -137,9 +137,12 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase): server_send_first_message_tag = 'server_send_first_message_tag' server_send_second_message_tag = 'server_send_second_message_tag' server_complete_rpc_tag = 'server_complete_rpc_tag' - server_call_due = set( - (server_send_initial_metadata_tag, server_send_first_message_tag, - server_send_second_message_tag, server_complete_rpc_tag,)) + server_call_due = set(( + server_send_initial_metadata_tag, + server_send_first_message_tag, + server_send_second_message_tag, + server_complete_rpc_tag, + )) server_call_completion_queue = cygrpc.CompletionQueue() server_call_driver = _QueueDriver(server_call_condition, server_call_completion_queue, @@ -159,15 +162,15 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase): with client_condition: client_receive_initial_metadata_start_batch_result = ( client_call.start_client_batch([ - cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS), + cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), ], client_receive_initial_metadata_tag)) client_due.add(client_receive_initial_metadata_tag) client_complete_rpc_start_batch_result = ( client_call.start_client_batch([ - cygrpc.operation_send_initial_metadata(_EMPTY_METADATA, - _EMPTY_FLAGS), - cygrpc.operation_send_close_from_client(_EMPTY_FLAGS), - cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS), + cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA, + _EMPTY_FLAGS), + cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), ], client_complete_rpc_tag)) client_due.add(client_complete_rpc_tag) @@ -175,13 +178,13 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase): with server_call_condition: server_send_initial_metadata_start_batch_result = ( - server_rpc_event.operation_call.start_server_batch([ - cygrpc.operation_send_initial_metadata(_EMPTY_METADATA, - _EMPTY_FLAGS), + server_rpc_event.call.start_server_batch([ + cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA, + _EMPTY_FLAGS), ], server_send_initial_metadata_tag)) server_send_first_message_start_batch_result = ( - server_rpc_event.operation_call.start_server_batch([ - cygrpc.operation_send_message(b'\x07', _EMPTY_FLAGS), + server_rpc_event.call.start_server_batch([ + cygrpc.SendMessageOperation(b'\x07', _EMPTY_FLAGS), ], server_send_first_message_tag)) server_send_initial_metadata_event = server_call_driver.event_with_tag( server_send_initial_metadata_tag) @@ -189,13 +192,13 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase): server_send_first_message_tag) with server_call_condition: server_send_second_message_start_batch_result = ( - server_rpc_event.operation_call.start_server_batch([ - cygrpc.operation_send_message(b'\x07', _EMPTY_FLAGS), + server_rpc_event.call.start_server_batch([ + cygrpc.SendMessageOperation(b'\x07', _EMPTY_FLAGS), ], server_send_second_message_tag)) server_complete_rpc_start_batch_result = ( - server_rpc_event.operation_call.start_server_batch([ - cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS), - cygrpc.operation_send_status_from_server( + server_rpc_event.call.start_server_batch([ + cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS), + cygrpc.SendStatusFromServerOperation( (), cygrpc.StatusCode.ok, b'test details', _EMPTY_FLAGS), ], server_complete_rpc_tag)) @@ -209,7 +212,7 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase): client_receive_first_message_tag = 'client_receive_first_message_tag' client_receive_first_message_start_batch_result = ( client_call.start_client_batch([ - cygrpc.operation_receive_message(_EMPTY_FLAGS), + cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), ], client_receive_first_message_tag)) client_due.add(client_receive_first_message_tag) client_receive_first_message_event = client_driver.event_with_tag( @@ -232,9 +235,8 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase): self.assertEqual(cygrpc.CallError.ok, client_call_cancel_result) self.assertIs(server_rpc_tag, server_rpc_event.tag) self.assertEqual(cygrpc.CompletionType.operation_complete, - server_rpc_event.type) - self.assertIsInstance(server_rpc_event.operation_call, cygrpc.Call) - self.assertEqual(0, len(server_rpc_event.batch_operations)) + server_rpc_event.completion_type) + self.assertIsInstance(server_rpc_event.call, cygrpc.Call) if __name__ == '__main__': diff --git a/src/python/grpcio_tests/tests/unit/_cython/_server_test.py b/src/python/grpcio_tests/tests/unit/_cython/_server_test.py new file mode 100644 index 0000000000..12bf40be6b --- /dev/null +++ b/src/python/grpcio_tests/tests/unit/_cython/_server_test.py @@ -0,0 +1,49 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Test servers at the level of the Cython API.""" + +import threading +import time +import unittest + +from grpc._cython import cygrpc + + +class Test(unittest.TestCase): + + def test_lonely_server(self): + server_call_completion_queue = cygrpc.CompletionQueue() + server_shutdown_completion_queue = cygrpc.CompletionQueue() + server = cygrpc.Server(cygrpc.ChannelArgs([])) + server.register_completion_queue(server_call_completion_queue) + server.register_completion_queue(server_shutdown_completion_queue) + port = server.add_http2_port(b'[::]:0') + server.start() + + server_request_call_tag = 'server_request_call_tag' + server_request_call_start_batch_result = server.request_call( + server_call_completion_queue, server_call_completion_queue, + server_request_call_tag) + + time.sleep(4) + + server_shutdown_tag = 'server_shutdown_tag' + server_shutdown_result = server.shutdown( + server_shutdown_completion_queue, server_shutdown_tag) + server_request_call_event = server_call_completion_queue.poll() + server_shutdown_event = server_shutdown_completion_queue.poll() + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py index 33a35ae235..5f9b74ba98 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py @@ -29,17 +29,14 @@ _EMPTY_FLAGS = 0 def _metadata_plugin(context, callback): - callback(((_CALL_CREDENTIALS_METADATA_KEY, - _CALL_CREDENTIALS_METADATA_VALUE,),), cygrpc.StatusCode.ok, b'') + callback((( + _CALL_CREDENTIALS_METADATA_KEY, + _CALL_CREDENTIALS_METADATA_VALUE, + ),), cygrpc.StatusCode.ok, b'') class TypeSmokeTest(unittest.TestCase): - def testOperationFlags(self): - operation = cygrpc.operation_send_message(b'asdf', - cygrpc.WriteFlag.no_compress) - self.assertEqual(cygrpc.WriteFlag.no_compress, operation.flags) - def testTimespec(self): now = time.time() now_timespec_a = cygrpc.Timespec(now) @@ -92,7 +89,8 @@ class TypeSmokeTest(unittest.TestCase): shutdown_tag = object() server.shutdown(completion_queue, shutdown_tag) event = completion_queue.poll() - self.assertEqual(cygrpc.CompletionType.operation_complete, event.type) + self.assertEqual(cygrpc.CompletionType.operation_complete, + event.completion_type) self.assertIs(shutdown_tag, event.tag) del server del completion_queue @@ -117,13 +115,12 @@ class ServerClientMixin(object): cygrpc.ChannelArg(cygrpc.ChannelArgKey.ssl_target_name_override, host_override) ]) - self.client_channel = cygrpc.Channel( - 'localhost:{}'.format(self.port).encode(), - client_channel_arguments, client_credentials) + self.client_channel = cygrpc.Channel('localhost:{}'.format( + self.port).encode(), client_channel_arguments, + client_credentials) else: - self.client_channel = cygrpc.Channel( - 'localhost:{}'.format(self.port).encode(), - cygrpc.ChannelArgs([])) + self.client_channel = cygrpc.Channel('localhost:{}'.format( + self.port).encode(), cygrpc.ChannelArgs([])) if host_override: self.host_argument = None # default host self.expected_host = host_override @@ -152,12 +149,12 @@ class ServerClientMixin(object): self.assertEqual(cygrpc.CallError.ok, call_result) event = queue.poll(deadline) self.assertEqual(cygrpc.CompletionType.operation_complete, - event.type) + event.completion_type) self.assertTrue(event.success) self.assertIs(tag, event.tag) except Exception as error: - raise Exception( - "Error in '{}': {}".format(description, error.message)) + raise Exception("Error in '{}': {}".format( + description, error.message)) return event return test_utilities.SimpleFuture(performer) @@ -174,7 +171,7 @@ class ServerClientMixin(object): SERVER_TRAILING_METADATA_KEY = 'california_is_in_a_drought' SERVER_TRAILING_METADATA_VALUE = 'zomg it is' SERVER_STATUS_CODE = cygrpc.StatusCode.ok - SERVER_STATUS_DETAILS = b'our work is never over' + SERVER_STATUS_DETAILS = 'our work is never over' REQUEST = b'in death a member of project mayhem has a name' RESPONSE = b'his name is robert paulson' METHOD = b'twinkies' @@ -193,16 +190,23 @@ class ServerClientMixin(object): None, 0, self.client_completion_queue, METHOD, self.host_argument, cygrpc_deadline) client_initial_metadata = ( - (CLIENT_METADATA_ASCII_KEY, CLIENT_METADATA_ASCII_VALUE,), - (CLIENT_METADATA_BIN_KEY, CLIENT_METADATA_BIN_VALUE,),) + ( + CLIENT_METADATA_ASCII_KEY, + CLIENT_METADATA_ASCII_VALUE, + ), + ( + CLIENT_METADATA_BIN_KEY, + CLIENT_METADATA_BIN_VALUE, + ), + ) client_start_batch_result = client_call.start_client_batch([ - cygrpc.operation_send_initial_metadata(client_initial_metadata, - _EMPTY_FLAGS), - cygrpc.operation_send_message(REQUEST, _EMPTY_FLAGS), - cygrpc.operation_send_close_from_client(_EMPTY_FLAGS), - cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS), - cygrpc.operation_receive_message(_EMPTY_FLAGS), - cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS) + cygrpc.SendInitialMetadataOperation(client_initial_metadata, + _EMPTY_FLAGS), + cygrpc.SendMessageOperation(REQUEST, _EMPTY_FLAGS), + cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), + cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), + cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), ], client_call_tag) self.assertEqual(cygrpc.CallError.ok, client_start_batch_result) client_event_future = test_utilities.CompletionQueuePollFuture( @@ -210,33 +214,35 @@ class ServerClientMixin(object): request_event = self.server_completion_queue.poll(cygrpc_deadline) self.assertEqual(cygrpc.CompletionType.operation_complete, - request_event.type) - self.assertIsInstance(request_event.operation_call, cygrpc.Call) + request_event.completion_type) + self.assertIsInstance(request_event.call, cygrpc.Call) self.assertIs(server_request_tag, request_event.tag) - self.assertEqual(0, len(request_event.batch_operations)) self.assertTrue( test_common.metadata_transmitted(client_initial_metadata, - request_event.request_metadata)) - self.assertEqual(METHOD, request_event.request_call_details.method) - self.assertEqual(self.expected_host, - request_event.request_call_details.host) + request_event.invocation_metadata)) + self.assertEqual(METHOD, request_event.call_details.method) + self.assertEqual(self.expected_host, request_event.call_details.host) self.assertLess( - abs(DEADLINE - float(request_event.request_call_details.deadline)), + abs(DEADLINE - float(request_event.call_details.deadline)), DEADLINE_TOLERANCE) server_call_tag = object() - server_call = request_event.operation_call - server_initial_metadata = ( - (SERVER_INITIAL_METADATA_KEY, SERVER_INITIAL_METADATA_VALUE,),) - server_trailing_metadata = ( - (SERVER_TRAILING_METADATA_KEY, SERVER_TRAILING_METADATA_VALUE,),) + server_call = request_event.call + server_initial_metadata = (( + SERVER_INITIAL_METADATA_KEY, + SERVER_INITIAL_METADATA_VALUE, + ),) + server_trailing_metadata = (( + SERVER_TRAILING_METADATA_KEY, + SERVER_TRAILING_METADATA_VALUE, + ),) server_start_batch_result = server_call.start_server_batch([ - cygrpc.operation_send_initial_metadata( - server_initial_metadata, - _EMPTY_FLAGS), cygrpc.operation_receive_message(_EMPTY_FLAGS), - cygrpc.operation_send_message(RESPONSE, _EMPTY_FLAGS), - cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS), - cygrpc.operation_send_status_from_server( + cygrpc.SendInitialMetadataOperation(server_initial_metadata, + _EMPTY_FLAGS), + cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), + cygrpc.SendMessageOperation(RESPONSE, _EMPTY_FLAGS), + cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS), + cygrpc.SendStatusFromServerOperation( server_trailing_metadata, SERVER_STATUS_CODE, SERVER_STATUS_DETAILS, _EMPTY_FLAGS) ], server_call_tag) @@ -249,25 +255,24 @@ class ServerClientMixin(object): found_client_op_types = set() for client_result in client_event.batch_operations: # we expect each op type to be unique - self.assertNotIn(client_result.type, found_client_op_types) - found_client_op_types.add(client_result.type) - if client_result.type == cygrpc.OperationType.receive_initial_metadata: + self.assertNotIn(client_result.type(), found_client_op_types) + found_client_op_types.add(client_result.type()) + if client_result.type( + ) == cygrpc.OperationType.receive_initial_metadata: self.assertTrue( test_common.metadata_transmitted( server_initial_metadata, - client_result.received_metadata)) - elif client_result.type == cygrpc.OperationType.receive_message: - self.assertEqual(RESPONSE, - client_result.received_message.bytes()) - elif client_result.type == cygrpc.OperationType.receive_status_on_client: + client_result.initial_metadata())) + elif client_result.type() == cygrpc.OperationType.receive_message: + self.assertEqual(RESPONSE, client_result.message()) + elif client_result.type( + ) == cygrpc.OperationType.receive_status_on_client: self.assertTrue( test_common.metadata_transmitted( server_trailing_metadata, - client_result.received_metadata)) - self.assertEqual(SERVER_STATUS_DETAILS, - client_result.received_status_details) - self.assertEqual(SERVER_STATUS_CODE, - client_result.received_status_code) + client_result.trailing_metadata())) + self.assertEqual(SERVER_STATUS_DETAILS, client_result.details()) + self.assertEqual(SERVER_STATUS_CODE, client_result.code()) self.assertEqual( set([ cygrpc.OperationType.send_initial_metadata, @@ -281,13 +286,13 @@ class ServerClientMixin(object): self.assertEqual(5, len(server_event.batch_operations)) found_server_op_types = set() for server_result in server_event.batch_operations: - self.assertNotIn(client_result.type, found_server_op_types) - found_server_op_types.add(server_result.type) - if server_result.type == cygrpc.OperationType.receive_message: - self.assertEqual(REQUEST, - server_result.received_message.bytes()) - elif server_result.type == cygrpc.OperationType.receive_close_on_server: - self.assertFalse(server_result.received_cancelled) + self.assertNotIn(client_result.type(), found_server_op_types) + found_server_op_types.add(server_result.type()) + if server_result.type() == cygrpc.OperationType.receive_message: + self.assertEqual(REQUEST, server_result.message()) + elif server_result.type( + ) == cygrpc.OperationType.receive_close_on_server: + self.assertFalse(server_result.cancelled()) self.assertEqual( set([ cygrpc.OperationType.send_initial_metadata, @@ -323,13 +328,12 @@ class ServerClientMixin(object): cygrpc_deadline, description) client_event_future = perform_client_operations([ - cygrpc.operation_send_initial_metadata(empty_metadata, - _EMPTY_FLAGS), - cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS), + cygrpc.SendInitialMetadataOperation(empty_metadata, _EMPTY_FLAGS), + cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), ], "Client prologue") request_event = self.server_completion_queue.poll(cygrpc_deadline) - server_call = request_event.operation_call + server_call = request_event.call def perform_server_operations(operations, description): return self._perform_operations(operations, server_call, @@ -337,8 +341,7 @@ class ServerClientMixin(object): cygrpc_deadline, description) server_event_future = perform_server_operations([ - cygrpc.operation_send_initial_metadata(empty_metadata, - _EMPTY_FLAGS), + cygrpc.SendInitialMetadataOperation(empty_metadata, _EMPTY_FLAGS), ], "Server prologue") client_event_future.result() # force completion @@ -347,12 +350,12 @@ class ServerClientMixin(object): # Messaging for _ in range(10): client_event_future = perform_client_operations([ - cygrpc.operation_send_message(b'', _EMPTY_FLAGS), - cygrpc.operation_receive_message(_EMPTY_FLAGS), + cygrpc.SendMessageOperation(b'', _EMPTY_FLAGS), + cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), ], "Client message") server_event_future = perform_server_operations([ - cygrpc.operation_send_message(b'', _EMPTY_FLAGS), - cygrpc.operation_receive_message(_EMPTY_FLAGS), + cygrpc.SendMessageOperation(b'', _EMPTY_FLAGS), + cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), ], "Server receive") client_event_future.result() # force completion @@ -360,13 +363,13 @@ class ServerClientMixin(object): # Epilogue client_event_future = perform_client_operations([ - cygrpc.operation_send_close_from_client(_EMPTY_FLAGS), - cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS) + cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS) ], "Client epilogue") server_event_future = perform_server_operations([ - cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS), - cygrpc.operation_send_status_from_server( + cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS), + cygrpc.SendStatusFromServerOperation( empty_metadata, cygrpc.StatusCode.ok, b'', _EMPTY_FLAGS) ], "Server epilogue") @@ -386,10 +389,11 @@ class InsecureServerInsecureClient(unittest.TestCase, ServerClientMixin): class SecureServerSecureClient(unittest.TestCase, ServerClientMixin): def setUp(self): - server_credentials = cygrpc.server_credentials_ssl(None, [ - cygrpc.SslPemKeyCertPair(resources.private_key(), - resources.certificate_chain()) - ], False) + server_credentials = cygrpc.server_credentials_ssl( + None, [ + cygrpc.SslPemKeyCertPair(resources.private_key(), + resources.certificate_chain()) + ], False) client_credentials = cygrpc.SSLChannelCredentials( resources.test_root_certificates(), None, None) self.setUpMixin(server_credentials, client_credentials, diff --git a/src/python/grpcio_tests/tests/unit/_empty_message_test.py b/src/python/grpcio_tests/tests/unit/_empty_message_test.py index 14695bc13f..c55ef61c13 100644 --- a/src/python/grpcio_tests/tests/unit/_empty_message_test.py +++ b/src/python/grpcio_tests/tests/unit/_empty_message_test.py @@ -106,13 +106,13 @@ class EmptyMessageTest(unittest.TestCase): list(response_iterator)) def testStreamUnary(self): - response = self._channel.stream_unary(_STREAM_UNARY)( - iter([_REQUEST] * test_constants.STREAM_LENGTH)) + response = self._channel.stream_unary(_STREAM_UNARY)(iter( + [_REQUEST] * test_constants.STREAM_LENGTH)) self.assertEqual(_RESPONSE, response) def testStreamStream(self): - response_iterator = self._channel.stream_stream(_STREAM_STREAM)( - iter([_REQUEST] * test_constants.STREAM_LENGTH)) + response_iterator = self._channel.stream_stream(_STREAM_STREAM)(iter( + [_REQUEST] * test_constants.STREAM_LENGTH)) self.assertSequenceEqual([_RESPONSE] * test_constants.STREAM_LENGTH, list(response_iterator)) diff --git a/src/python/grpcio_tests/tests/unit/_interceptor_test.py b/src/python/grpcio_tests/tests/unit/_interceptor_test.py index 2aee298df2..3d547b71cd 100644 --- a/src/python/grpcio_tests/tests/unit/_interceptor_test.py +++ b/src/python/grpcio_tests/tests/unit/_interceptor_test.py @@ -65,7 +65,10 @@ class _Handler(object): def handle_unary_unary(self, request, servicer_context): self._control.control() if servicer_context is not None: - servicer_context.set_trailing_metadata((('testkey', 'testvalue',),)) + servicer_context.set_trailing_metadata((( + 'testkey', + 'testvalue', + ),)) return request def handle_unary_stream(self, request, servicer_context): @@ -74,7 +77,10 @@ class _Handler(object): yield request self._control.control() if servicer_context is not None: - servicer_context.set_trailing_metadata((('testkey', 'testvalue',),)) + servicer_context.set_trailing_metadata((( + 'testkey', + 'testvalue', + ),)) def handle_stream_unary(self, request_iterator, servicer_context): if servicer_context is not None: @@ -86,13 +92,19 @@ class _Handler(object): response_elements.append(request) self._control.control() if servicer_context is not None: - servicer_context.set_trailing_metadata((('testkey', 'testvalue',),)) + servicer_context.set_trailing_metadata((( + 'testkey', + 'testvalue', + ),)) return b''.join(response_elements) def handle_stream_stream(self, request_iterator, servicer_context): self._control.control() if servicer_context is not None: - servicer_context.set_trailing_metadata((('testkey', 'testvalue',),)) + servicer_context.set_trailing_metadata((( + 'testkey', + 'testvalue', + ),)) for request in request_iterator: self._control.control() yield request @@ -162,9 +174,10 @@ def _stream_stream_multi_callable(channel): class _ClientCallDetails( - collections.namedtuple('_ClientCallDetails', - ('method', 'timeout', 'metadata', - 'credentials')), grpc.ClientCallDetails): + collections.namedtuple( + '_ClientCallDetails', + ('method', 'timeout', 'metadata', 'credentials')), + grpc.ClientCallDetails): pass @@ -262,7 +275,10 @@ def _append_request_header_interceptor(header, value): metadata = [] if client_call_details.metadata: metadata = list(client_call_details.metadata) - metadata.append((header, value,)) + metadata.append(( + header, + value, + )) client_call_details = _ClientCallDetails( client_call_details.method, client_call_details.timeout, metadata, client_call_details.credentials) @@ -306,9 +322,11 @@ class InterceptorTest(unittest.TestCase): self._server = grpc.server( self._server_pool, options=(('grpc.so_reuseport', 0),), - interceptors=(_LoggingInterceptor('s1', self._record), - conditional_interceptor, - _LoggingInterceptor('s2', self._record),)) + interceptors=( + _LoggingInterceptor('s1', self._record), + conditional_interceptor, + _LoggingInterceptor('s2', self._record), + )) port = self._server.add_insecure_port('[::]:0') self._server.add_generic_rpc_handlers((_GenericHandler(self._handler),)) self._server.start() @@ -333,8 +351,8 @@ class InterceptorTest(unittest.TestCase): interceptor = _wrap_request_iterator_stream_interceptor(triple) channel = grpc.intercept_channel(self._channel, interceptor) - requests = tuple(b'\x07\x08' - for _ in range(test_constants.STREAM_LENGTH)) + requests = tuple( + b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH)) multi_callable = _stream_stream_multi_callable(channel) response_iterator = multi_callable( @@ -365,8 +383,8 @@ class InterceptorTest(unittest.TestCase): multi_callable = _unary_unary_multi_callable(defective_channel) call_future = multi_callable.future( request, - metadata=( - ('test', 'InterceptedUnaryRequestBlockingUnaryResponse'),)) + metadata=(('test', + 'InterceptedUnaryRequestBlockingUnaryResponse'),)) self.assertIsNotNone(call_future.exception()) self.assertEqual(call_future.code(), grpc.StatusCode.INTERNAL) @@ -374,12 +392,14 @@ class InterceptorTest(unittest.TestCase): def testInterceptedHeaderManipulationWithServerSideVerification(self): request = b'\x07\x08' - channel = grpc.intercept_channel( - self._channel, _append_request_header_interceptor('secret', '42')) - channel = grpc.intercept_channel( - channel, - _LoggingInterceptor('c1', self._record), - _LoggingInterceptor('c2', self._record)) + channel = grpc.intercept_channel(self._channel, + _append_request_header_interceptor( + 'secret', '42')) + channel = grpc.intercept_channel(channel, + _LoggingInterceptor( + 'c1', self._record), + _LoggingInterceptor( + 'c2', self._record)) self._record[:] = [] @@ -401,16 +421,17 @@ class InterceptorTest(unittest.TestCase): self._record[:] = [] - channel = grpc.intercept_channel( - self._channel, - _LoggingInterceptor('c1', self._record), - _LoggingInterceptor('c2', self._record)) + channel = grpc.intercept_channel(self._channel, + _LoggingInterceptor( + 'c1', self._record), + _LoggingInterceptor( + 'c2', self._record)) multi_callable = _unary_unary_multi_callable(channel) multi_callable( request, - metadata=( - ('test', 'InterceptedUnaryRequestBlockingUnaryResponse'),)) + metadata=(('test', + 'InterceptedUnaryRequestBlockingUnaryResponse'),)) self.assertSequenceEqual(self._record, [ 'c1:intercept_unary_unary', 'c2:intercept_unary_unary', @@ -420,10 +441,11 @@ class InterceptorTest(unittest.TestCase): def testInterceptedUnaryRequestBlockingUnaryResponseWithCall(self): request = b'\x07\x08' - channel = grpc.intercept_channel( - self._channel, - _LoggingInterceptor('c1', self._record), - _LoggingInterceptor('c2', self._record)) + channel = grpc.intercept_channel(self._channel, + _LoggingInterceptor( + 'c1', self._record), + _LoggingInterceptor( + 'c2', self._record)) self._record[:] = [] @@ -443,10 +465,11 @@ class InterceptorTest(unittest.TestCase): request = b'\x07\x08' self._record[:] = [] - channel = grpc.intercept_channel( - self._channel, - _LoggingInterceptor('c1', self._record), - _LoggingInterceptor('c2', self._record)) + channel = grpc.intercept_channel(self._channel, + _LoggingInterceptor( + 'c1', self._record), + _LoggingInterceptor( + 'c2', self._record)) multi_callable = _unary_unary_multi_callable(channel) response_future = multi_callable.future( @@ -463,10 +486,11 @@ class InterceptorTest(unittest.TestCase): request = b'\x37\x58' self._record[:] = [] - channel = grpc.intercept_channel( - self._channel, - _LoggingInterceptor('c1', self._record), - _LoggingInterceptor('c2', self._record)) + channel = grpc.intercept_channel(self._channel, + _LoggingInterceptor( + 'c1', self._record), + _LoggingInterceptor( + 'c2', self._record)) multi_callable = _unary_stream_multi_callable(channel) response_iterator = multi_callable( @@ -480,21 +504,22 @@ class InterceptorTest(unittest.TestCase): ]) def testInterceptedStreamRequestBlockingUnaryResponse(self): - requests = tuple(b'\x07\x08' - for _ in range(test_constants.STREAM_LENGTH)) + requests = tuple( + b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH)) request_iterator = iter(requests) self._record[:] = [] - channel = grpc.intercept_channel( - self._channel, - _LoggingInterceptor('c1', self._record), - _LoggingInterceptor('c2', self._record)) + channel = grpc.intercept_channel(self._channel, + _LoggingInterceptor( + 'c1', self._record), + _LoggingInterceptor( + 'c2', self._record)) multi_callable = _stream_unary_multi_callable(channel) multi_callable( request_iterator, - metadata=( - ('test', 'InterceptedStreamRequestBlockingUnaryResponse'),)) + metadata=(('test', + 'InterceptedStreamRequestBlockingUnaryResponse'),)) self.assertSequenceEqual(self._record, [ 'c1:intercept_stream_unary', 'c2:intercept_stream_unary', @@ -502,15 +527,16 @@ class InterceptorTest(unittest.TestCase): ]) def testInterceptedStreamRequestBlockingUnaryResponseWithCall(self): - requests = tuple(b'\x07\x08' - for _ in range(test_constants.STREAM_LENGTH)) + requests = tuple( + b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH)) request_iterator = iter(requests) self._record[:] = [] - channel = grpc.intercept_channel( - self._channel, - _LoggingInterceptor('c1', self._record), - _LoggingInterceptor('c2', self._record)) + channel = grpc.intercept_channel(self._channel, + _LoggingInterceptor( + 'c1', self._record), + _LoggingInterceptor( + 'c2', self._record)) multi_callable = _stream_unary_multi_callable(channel) multi_callable.with_call( @@ -525,15 +551,16 @@ class InterceptorTest(unittest.TestCase): ]) def testInterceptedStreamRequestFutureUnaryResponse(self): - requests = tuple(b'\x07\x08' - for _ in range(test_constants.STREAM_LENGTH)) + requests = tuple( + b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH)) request_iterator = iter(requests) self._record[:] = [] - channel = grpc.intercept_channel( - self._channel, - _LoggingInterceptor('c1', self._record), - _LoggingInterceptor('c2', self._record)) + channel = grpc.intercept_channel(self._channel, + _LoggingInterceptor( + 'c1', self._record), + _LoggingInterceptor( + 'c2', self._record)) multi_callable = _stream_unary_multi_callable(channel) response_future = multi_callable.future( @@ -547,15 +574,16 @@ class InterceptorTest(unittest.TestCase): ]) def testInterceptedStreamRequestStreamResponse(self): - requests = tuple(b'\x77\x58' - for _ in range(test_constants.STREAM_LENGTH)) + requests = tuple( + b'\x77\x58' for _ in range(test_constants.STREAM_LENGTH)) request_iterator = iter(requests) self._record[:] = [] - channel = grpc.intercept_channel( - self._channel, - _LoggingInterceptor('c1', self._record), - _LoggingInterceptor('c2', self._record)) + channel = grpc.intercept_channel(self._channel, + _LoggingInterceptor( + 'c1', self._record), + _LoggingInterceptor( + 'c2', self._record)) multi_callable = _stream_stream_multi_callable(channel) response_iterator = multi_callable( diff --git a/src/python/grpcio_tests/tests/unit/_invalid_metadata_test.py b/src/python/grpcio_tests/tests/unit/_invalid_metadata_test.py index 8f4c075e2c..4edf0fc4ad 100644 --- a/src/python/grpcio_tests/tests/unit/_invalid_metadata_test.py +++ b/src/python/grpcio_tests/tests/unit/_invalid_metadata_test.py @@ -106,8 +106,8 @@ class InvalidMetadataTest(unittest.TestCase): self.assertEqual(response_iterator.code(), grpc.StatusCode.INTERNAL) def testStreamRequestBlockingUnaryResponse(self): - request_iterator = (b'\x07\x08' - for _ in range(test_constants.STREAM_LENGTH)) + request_iterator = ( + b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH)) metadata = (('InVaLiD', 'StreamRequestBlockingUnaryResponse'),) expected_error_details = "metadata was invalid: %s" % metadata with self.assertRaises(ValueError) as exception_context: @@ -115,8 +115,8 @@ class InvalidMetadataTest(unittest.TestCase): self.assertIn(expected_error_details, str(exception_context.exception)) def testStreamRequestBlockingUnaryResponseWithCall(self): - request_iterator = (b'\x07\x08' - for _ in range(test_constants.STREAM_LENGTH)) + request_iterator = ( + b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH)) metadata = (('InVaLiD', 'StreamRequestBlockingUnaryResponseWithCall'),) expected_error_details = "metadata was invalid: %s" % metadata multi_callable = _stream_unary_multi_callable(self._channel) @@ -125,8 +125,8 @@ class InvalidMetadataTest(unittest.TestCase): self.assertIn(expected_error_details, str(exception_context.exception)) def testStreamRequestFutureUnaryResponse(self): - request_iterator = (b'\x07\x08' - for _ in range(test_constants.STREAM_LENGTH)) + request_iterator = ( + b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH)) metadata = (('InVaLiD', 'StreamRequestFutureUnaryResponse'),) expected_error_details = "metadata was invalid: %s" % metadata response_future = self._stream_unary.future( @@ -141,8 +141,8 @@ class InvalidMetadataTest(unittest.TestCase): self.assertEqual(response_future.code(), grpc.StatusCode.INTERNAL) def testStreamRequestStreamResponse(self): - request_iterator = (b'\x07\x08' - for _ in range(test_constants.STREAM_LENGTH)) + request_iterator = ( + b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH)) metadata = (('InVaLiD', 'StreamRequestStreamResponse'),) expected_error_details = "metadata was invalid: %s" % metadata response_iterator = self._stream_stream( diff --git a/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py b/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py index b46d176d04..e40cca8b24 100644 --- a/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py +++ b/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py @@ -62,7 +62,10 @@ class _Handler(object): def handle_unary_unary(self, request, servicer_context): self._control.control() if servicer_context is not None: - servicer_context.set_trailing_metadata((('testkey', 'testvalue',),)) + servicer_context.set_trailing_metadata((( + 'testkey', + 'testvalue', + ),)) return request def handle_unary_stream(self, request, servicer_context): @@ -71,7 +74,10 @@ class _Handler(object): yield request self._control.control() if servicer_context is not None: - servicer_context.set_trailing_metadata((('testkey', 'testvalue',),)) + servicer_context.set_trailing_metadata((( + 'testkey', + 'testvalue', + ),)) def handle_stream_unary(self, request_iterator, servicer_context): if servicer_context is not None: @@ -83,13 +89,19 @@ class _Handler(object): response_elements.append(request) self._control.control() if servicer_context is not None: - servicer_context.set_trailing_metadata((('testkey', 'testvalue',),)) + servicer_context.set_trailing_metadata((( + 'testkey', + 'testvalue', + ),)) return b''.join(response_elements) def handle_stream_stream(self, request_iterator, servicer_context): self._control.control() if servicer_context is not None: - servicer_context.set_trailing_metadata((('testkey', 'testvalue',),)) + servicer_context.set_trailing_metadata((( + 'testkey', + 'testvalue', + ),)) for request in request_iterator: self._control.control() yield request @@ -208,8 +220,8 @@ class InvocationDefectsTest(unittest.TestCase): with self.assertRaises(grpc.RpcError): response = multi_callable( requests, - metadata=( - ('test', 'IterableStreamRequestBlockingUnaryResponse'),)) + metadata=(('test', + 'IterableStreamRequestBlockingUnaryResponse'),)) def testIterableStreamRequestFutureUnaryResponse(self): requests = [b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH)] diff --git a/src/python/grpcio_tests/tests/unit/_metadata_code_details_test.py b/src/python/grpcio_tests/tests/unit/_metadata_code_details_test.py index ec67f99fbc..bb6ac70497 100644 --- a/src/python/grpcio_tests/tests/unit/_metadata_code_details_test.py +++ b/src/python/grpcio_tests/tests/unit/_metadata_code_details_test.py @@ -36,16 +36,16 @@ _UNARY_STREAM = 'UnaryStream' _STREAM_UNARY = 'StreamUnary' _STREAM_STREAM = 'StreamStream' -_CLIENT_METADATA = (('client-md-key', 'client-md-key'), - ('client-md-key-bin', b'\x00\x01')) +_CLIENT_METADATA = (('client-md-key', 'client-md-key'), ('client-md-key-bin', + b'\x00\x01')) -_SERVER_INITIAL_METADATA = ( - ('server-initial-md-key', 'server-initial-md-value'), - ('server-initial-md-key-bin', b'\x00\x02')) +_SERVER_INITIAL_METADATA = (('server-initial-md-key', + 'server-initial-md-value'), + ('server-initial-md-key-bin', b'\x00\x02')) -_SERVER_TRAILING_METADATA = ( - ('server-trailing-md-key', 'server-trailing-md-value'), - ('server-trailing-md-key-bin', b'\x00\x03')) +_SERVER_TRAILING_METADATA = (('server-trailing-md-key', + 'server-trailing-md-value'), + ('server-trailing-md-key-bin', b'\x00\x03')) _NON_OK_CODE = grpc.StatusCode.NOT_FOUND _DETAILS = 'Test details!' @@ -193,17 +193,33 @@ class MetadataCodeDetailsTest(unittest.TestCase): channel = grpc.insecure_channel('localhost:{}'.format(port)) self._unary_unary = channel.unary_unary( - '/'.join(('', _SERVICE, _UNARY_UNARY,)), + '/'.join(( + '', + _SERVICE, + _UNARY_UNARY, + )), request_serializer=_REQUEST_SERIALIZER, - response_deserializer=_RESPONSE_DESERIALIZER,) - self._unary_stream = channel.unary_stream( - '/'.join(('', _SERVICE, _UNARY_STREAM,)),) - self._stream_unary = channel.stream_unary( - '/'.join(('', _SERVICE, _STREAM_UNARY,)),) + response_deserializer=_RESPONSE_DESERIALIZER, + ) + self._unary_stream = channel.unary_stream('/'.join(( + '', + _SERVICE, + _UNARY_STREAM, + )),) + self._stream_unary = channel.stream_unary('/'.join(( + '', + _SERVICE, + _STREAM_UNARY, + )),) self._stream_stream = channel.stream_stream( - '/'.join(('', _SERVICE, _STREAM_STREAM,)), + '/'.join(( + '', + _SERVICE, + _STREAM_STREAM, + )), request_serializer=_REQUEST_SERIALIZER, - response_deserializer=_RESPONSE_DESERIALIZER,) + response_deserializer=_RESPONSE_DESERIALIZER, + ) def testSuccessfulUnaryUnary(self): self._servicer.set_details(_DETAILS) diff --git a/src/python/grpcio_tests/tests/unit/_metadata_test.py b/src/python/grpcio_tests/tests/unit/_metadata_test.py index f2dac7bdc5..a918066ea4 100644 --- a/src/python/grpcio_tests/tests/unit/_metadata_test.py +++ b/src/python/grpcio_tests/tests/unit/_metadata_test.py @@ -33,18 +33,50 @@ _UNARY_STREAM = '/test/UnaryStream' _STREAM_UNARY = '/test/StreamUnary' _STREAM_STREAM = '/test/StreamStream' -_INVOCATION_METADATA = ((b'invocation-md-key', u'invocation-md-value',), - (u'invocation-md-key-bin', b'\x00\x01',),) -_EXPECTED_INVOCATION_METADATA = (('invocation-md-key', 'invocation-md-value',), - ('invocation-md-key-bin', b'\x00\x01',),) +_INVOCATION_METADATA = ( + ( + b'invocation-md-key', + u'invocation-md-value', + ), + ( + u'invocation-md-key-bin', + b'\x00\x01', + ), +) +_EXPECTED_INVOCATION_METADATA = ( + ( + 'invocation-md-key', + 'invocation-md-value', + ), + ( + 'invocation-md-key-bin', + b'\x00\x01', + ), +) _INITIAL_METADATA = ((b'initial-md-key', u'initial-md-value'), (u'initial-md-key-bin', b'\x00\x02')) -_EXPECTED_INITIAL_METADATA = (('initial-md-key', 'initial-md-value',), - ('initial-md-key-bin', b'\x00\x02',),) - -_TRAILING_METADATA = (('server-trailing-md-key', 'server-trailing-md-value',), - ('server-trailing-md-key-bin', b'\x00\x03',),) +_EXPECTED_INITIAL_METADATA = ( + ( + 'initial-md-key', + 'initial-md-value', + ), + ( + 'initial-md-key-bin', + b'\x00\x02', + ), +) + +_TRAILING_METADATA = ( + ( + 'server-trailing-md-key', + 'server-trailing-md-value', + ), + ( + 'server-trailing-md-key-bin', + b'\x00\x03', + ), +) _EXPECTED_TRAILING_METADATA = _TRAILING_METADATA @@ -146,8 +178,8 @@ class MetadataTest(unittest.TestCase): def setUp(self): self._server = test_common.test_server() - self._server.add_generic_rpc_handlers( - (_GenericHandler(weakref.proxy(self)),)) + self._server.add_generic_rpc_handlers((_GenericHandler( + weakref.proxy(self)),)) port = self._server.add_insecure_port('[::]:0') self._server.start() self._channel = grpc.insecure_channel( diff --git a/src/python/grpcio_tests/tests/unit/_rpc_test.py b/src/python/grpcio_tests/tests/unit/_rpc_test.py index 1515a87d93..54f01d9f8d 100644 --- a/src/python/grpcio_tests/tests/unit/_rpc_test.py +++ b/src/python/grpcio_tests/tests/unit/_rpc_test.py @@ -64,7 +64,10 @@ class _Handler(object): def handle_unary_unary(self, request, servicer_context): self._control.control() if servicer_context is not None: - servicer_context.set_trailing_metadata((('testkey', 'testvalue',),)) + servicer_context.set_trailing_metadata((( + 'testkey', + 'testvalue', + ),)) # TODO(https://github.com/grpc/grpc/issues/8483): test the values # returned by these methods rather than only "smoke" testing that # the return after having been called. @@ -78,7 +81,10 @@ class _Handler(object): yield request self._control.control() if servicer_context is not None: - servicer_context.set_trailing_metadata((('testkey', 'testvalue',),)) + servicer_context.set_trailing_metadata((( + 'testkey', + 'testvalue', + ),)) def handle_stream_unary(self, request_iterator, servicer_context): if servicer_context is not None: @@ -90,13 +96,19 @@ class _Handler(object): response_elements.append(request) self._control.control() if servicer_context is not None: - servicer_context.set_trailing_metadata((('testkey', 'testvalue',),)) + servicer_context.set_trailing_metadata((( + 'testkey', + 'testvalue', + ),)) return b''.join(response_elements) def handle_stream_stream(self, request_iterator, servicer_context): self._control.control() if servicer_context is not None: - servicer_context.set_trailing_metadata((('testkey', 'testvalue',),)) + servicer_context.set_trailing_metadata((( + 'testkey', + 'testvalue', + ),)) for request in request_iterator: self._control.control() yield request @@ -244,8 +256,8 @@ class RPCTest(unittest.TestCase): self.assertSequenceEqual(expected_responses, responses) def testSuccessfulStreamRequestBlockingUnaryResponse(self): - requests = tuple(b'\x07\x08' - for _ in range(test_constants.STREAM_LENGTH)) + requests = tuple( + b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH)) expected_response = self._handler.handle_stream_unary( iter(requests), None) request_iterator = iter(requests) @@ -253,14 +265,14 @@ class RPCTest(unittest.TestCase): multi_callable = _stream_unary_multi_callable(self._channel) response = multi_callable( request_iterator, - metadata=( - ('test', 'SuccessfulStreamRequestBlockingUnaryResponse'),)) + metadata=(('test', + 'SuccessfulStreamRequestBlockingUnaryResponse'),)) self.assertEqual(expected_response, response) def testSuccessfulStreamRequestBlockingUnaryResponseWithCall(self): - requests = tuple(b'\x07\x08' - for _ in range(test_constants.STREAM_LENGTH)) + requests = tuple( + b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH)) expected_response = self._handler.handle_stream_unary( iter(requests), None) request_iterator = iter(requests) @@ -276,8 +288,8 @@ class RPCTest(unittest.TestCase): self.assertIs(grpc.StatusCode.OK, call.code()) def testSuccessfulStreamRequestFutureUnaryResponse(self): - requests = tuple(b'\x07\x08' - for _ in range(test_constants.STREAM_LENGTH)) + requests = tuple( + b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH)) expected_response = self._handler.handle_stream_unary( iter(requests), None) request_iterator = iter(requests) @@ -293,8 +305,8 @@ class RPCTest(unittest.TestCase): self.assertIsNone(response_future.traceback()) def testSuccessfulStreamRequestStreamResponse(self): - requests = tuple(b'\x77\x58' - for _ in range(test_constants.STREAM_LENGTH)) + requests = tuple( + b'\x77\x58' for _ in range(test_constants.STREAM_LENGTH)) expected_responses = tuple( self._handler.handle_stream_stream(iter(requests), None)) request_iterator = iter(requests) @@ -326,8 +338,8 @@ class RPCTest(unittest.TestCase): def testConcurrentBlockingInvocations(self): pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) - requests = tuple(b'\x07\x08' - for _ in range(test_constants.STREAM_LENGTH)) + requests = tuple( + b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH)) expected_response = self._handler.handle_stream_unary( iter(requests), None) expected_responses = [expected_response @@ -342,15 +354,15 @@ class RPCTest(unittest.TestCase): request_iterator, metadata=(('test', 'ConcurrentBlockingInvocations'),)) response_futures[index] = response_future - responses = tuple(response_future.result() - for response_future in response_futures) + responses = tuple( + response_future.result() for response_future in response_futures) pool.shutdown(wait=True) self.assertSequenceEqual(expected_responses, responses) def testConcurrentFutureInvocations(self): - requests = tuple(b'\x07\x08' - for _ in range(test_constants.STREAM_LENGTH)) + requests = tuple( + b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH)) expected_response = self._handler.handle_stream_unary( iter(requests), None) expected_responses = [expected_response @@ -364,8 +376,8 @@ class RPCTest(unittest.TestCase): request_iterator, metadata=(('test', 'ConcurrentFutureInvocations'),)) response_futures[index] = response_future - responses = tuple(response_future.result() - for response_future in response_futures) + responses = tuple( + response_future.result() for response_future in response_futures) self.assertSequenceEqual(expected_responses, responses) @@ -424,14 +436,14 @@ class RPCTest(unittest.TestCase): multi_callable = _unary_stream_multi_callable(self._channel) response_iterator = multi_callable( request, - metadata=( - ('test', 'ConsumingSomeButNotAllStreamResponsesUnaryRequest'),)) + metadata=(('test', + 'ConsumingSomeButNotAllStreamResponsesUnaryRequest'),)) for _ in range(test_constants.STREAM_LENGTH // 2): next(response_iterator) def testConsumingSomeButNotAllStreamResponsesStreamRequest(self): - requests = tuple(b'\x67\x88' - for _ in range(test_constants.STREAM_LENGTH)) + requests = tuple( + b'\x67\x88' for _ in range(test_constants.STREAM_LENGTH)) request_iterator = iter(requests) multi_callable = _stream_stream_multi_callable(self._channel) @@ -443,15 +455,15 @@ class RPCTest(unittest.TestCase): next(response_iterator) def testConsumingTooManyStreamResponsesStreamRequest(self): - requests = tuple(b'\x67\x88' - for _ in range(test_constants.STREAM_LENGTH)) + requests = tuple( + b'\x67\x88' for _ in range(test_constants.STREAM_LENGTH)) request_iterator = iter(requests) multi_callable = _stream_stream_multi_callable(self._channel) response_iterator = multi_callable( request_iterator, - metadata=( - ('test', 'ConsumingTooManyStreamResponsesStreamRequest'),)) + metadata=(('test', + 'ConsumingTooManyStreamResponsesStreamRequest'),)) for _ in range(test_constants.STREAM_LENGTH): next(response_iterator) for _ in range(test_constants.STREAM_LENGTH): @@ -503,8 +515,8 @@ class RPCTest(unittest.TestCase): self.assertIsNotNone(response_iterator.trailing_metadata()) def testCancelledStreamRequestUnaryResponse(self): - requests = tuple(b'\x07\x08' - for _ in range(test_constants.STREAM_LENGTH)) + requests = tuple( + b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH)) request_iterator = iter(requests) multi_callable = _stream_unary_multi_callable(self._channel) @@ -528,8 +540,8 @@ class RPCTest(unittest.TestCase): self.assertIsNotNone(response_future.trailing_metadata()) def testCancelledStreamRequestStreamResponse(self): - requests = tuple(b'\x07\x08' - for _ in range(test_constants.STREAM_LENGTH)) + requests = tuple( + b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH)) request_iterator = iter(requests) multi_callable = _stream_stream_multi_callable(self._channel) @@ -555,8 +567,8 @@ class RPCTest(unittest.TestCase): multi_callable.with_call( request, timeout=test_constants.SHORT_TIMEOUT, - metadata=( - ('test', 'ExpiredUnaryRequestBlockingUnaryResponse'),)) + metadata=(('test', + 'ExpiredUnaryRequestBlockingUnaryResponse'),)) self.assertIsInstance(exception_context.exception, grpc.Call) self.assertIsNotNone(exception_context.exception.initial_metadata()) @@ -610,8 +622,8 @@ class RPCTest(unittest.TestCase): response_iterator.code()) def testExpiredStreamRequestBlockingUnaryResponse(self): - requests = tuple(b'\x07\x08' - for _ in range(test_constants.STREAM_LENGTH)) + requests = tuple( + b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH)) request_iterator = iter(requests) multi_callable = _stream_unary_multi_callable(self._channel) @@ -620,8 +632,8 @@ class RPCTest(unittest.TestCase): multi_callable( request_iterator, timeout=test_constants.SHORT_TIMEOUT, - metadata=( - ('test', 'ExpiredStreamRequestBlockingUnaryResponse'),)) + metadata=(('test', + 'ExpiredStreamRequestBlockingUnaryResponse'),)) self.assertIsInstance(exception_context.exception, grpc.RpcError) self.assertIsInstance(exception_context.exception, grpc.Call) @@ -632,8 +644,8 @@ class RPCTest(unittest.TestCase): self.assertIsNotNone(exception_context.exception.trailing_metadata()) def testExpiredStreamRequestFutureUnaryResponse(self): - requests = tuple(b'\x07\x18' - for _ in range(test_constants.STREAM_LENGTH)) + requests = tuple( + b'\x07\x18' for _ in range(test_constants.STREAM_LENGTH)) request_iterator = iter(requests) callback = _Callback() @@ -644,8 +656,8 @@ class RPCTest(unittest.TestCase): timeout=test_constants.SHORT_TIMEOUT, metadata=(('test', 'ExpiredStreamRequestFutureUnaryResponse'),)) with self.assertRaises(grpc.FutureTimeoutError): - response_future.result(timeout=test_constants.SHORT_TIMEOUT / - 2.0) + response_future.result( + timeout=test_constants.SHORT_TIMEOUT / 2.0) response_future.add_done_callback(callback) value_passed_to_callback = callback.value() @@ -663,8 +675,8 @@ class RPCTest(unittest.TestCase): self.assertIsNotNone(response_future.trailing_metadata()) def testExpiredStreamRequestStreamResponse(self): - requests = tuple(b'\x67\x18' - for _ in range(test_constants.STREAM_LENGTH)) + requests = tuple( + b'\x67\x18' for _ in range(test_constants.STREAM_LENGTH)) request_iterator = iter(requests) multi_callable = _stream_stream_multi_callable(self._channel) @@ -689,8 +701,8 @@ class RPCTest(unittest.TestCase): with self.assertRaises(grpc.RpcError) as exception_context: multi_callable.with_call( request, - metadata=( - ('test', 'FailedUnaryRequestBlockingUnaryResponse'),)) + metadata=(('test', + 'FailedUnaryRequestBlockingUnaryResponse'),)) self.assertIs(grpc.StatusCode.UNKNOWN, exception_context.exception.code()) @@ -734,8 +746,8 @@ class RPCTest(unittest.TestCase): exception_context.exception.code()) def testFailedStreamRequestBlockingUnaryResponse(self): - requests = tuple(b'\x47\x58' - for _ in range(test_constants.STREAM_LENGTH)) + requests = tuple( + b'\x47\x58' for _ in range(test_constants.STREAM_LENGTH)) request_iterator = iter(requests) multi_callable = _stream_unary_multi_callable(self._channel) @@ -743,15 +755,15 @@ class RPCTest(unittest.TestCase): with self.assertRaises(grpc.RpcError) as exception_context: multi_callable( request_iterator, - metadata=( - ('test', 'FailedStreamRequestBlockingUnaryResponse'),)) + metadata=(('test', + 'FailedStreamRequestBlockingUnaryResponse'),)) self.assertIs(grpc.StatusCode.UNKNOWN, exception_context.exception.code()) def testFailedStreamRequestFutureUnaryResponse(self): - requests = tuple(b'\x07\x18' - for _ in range(test_constants.STREAM_LENGTH)) + requests = tuple( + b'\x07\x18' for _ in range(test_constants.STREAM_LENGTH)) request_iterator = iter(requests) callback = _Callback() @@ -773,8 +785,8 @@ class RPCTest(unittest.TestCase): self.assertIs(response_future, value_passed_to_callback) def testFailedStreamRequestStreamResponse(self): - requests = tuple(b'\x67\x88' - for _ in range(test_constants.STREAM_LENGTH)) + requests = tuple( + b'\x67\x88' for _ in range(test_constants.STREAM_LENGTH)) request_iterator = iter(requests) multi_callable = _stream_stream_multi_callable(self._channel) @@ -805,8 +817,8 @@ class RPCTest(unittest.TestCase): request, metadata=(('test', 'IgnoredUnaryRequestStreamResponse'),)) def testIgnoredStreamRequestFutureUnaryResponse(self): - requests = tuple(b'\x07\x18' - for _ in range(test_constants.STREAM_LENGTH)) + requests = tuple( + b'\x07\x18' for _ in range(test_constants.STREAM_LENGTH)) request_iterator = iter(requests) multi_callable = _stream_unary_multi_callable(self._channel) @@ -815,8 +827,8 @@ class RPCTest(unittest.TestCase): metadata=(('test', 'IgnoredStreamRequestFutureUnaryResponse'),)) def testIgnoredStreamRequestStreamResponse(self): - requests = tuple(b'\x67\x88' - for _ in range(test_constants.STREAM_LENGTH)) + requests = tuple( + b'\x67\x88' for _ in range(test_constants.STREAM_LENGTH)) request_iterator = iter(requests) multi_callable = _stream_stream_multi_callable(self._channel) diff --git a/src/python/grpcio_tests/tests/unit/_server_ssl_cert_config_test.py b/src/python/grpcio_tests/tests/unit/_server_ssl_cert_config_test.py index 2c513da5d0..0d78034b7b 100644 --- a/src/python/grpcio_tests/tests/unit/_server_ssl_cert_config_test.py +++ b/src/python/grpcio_tests/tests/unit/_server_ssl_cert_config_test.py @@ -74,7 +74,8 @@ def _create_client_stub( expect_success, root_certificates=None, private_key=None, - certificate_chain=None,): + certificate_chain=None, +): channel = grpc.secure_channel('localhost:{}'.format(port), grpc.ssl_channel_credentials( root_certificates=root_certificates, diff --git a/src/python/grpcio_tests/tests/unit/_thread_cleanup_test.py b/src/python/grpcio_tests/tests/unit/_thread_cleanup_test.py index fe3e71d686..18f5af058a 100644 --- a/src/python/grpcio_tests/tests/unit/_thread_cleanup_test.py +++ b/src/python/grpcio_tests/tests/unit/_thread_cleanup_test.py @@ -52,7 +52,9 @@ class CleanupThreadTest(unittest.TestCase): target=target, name='test-name', args=('arg1', 'arg2'), - kwargs={'arg3': 'arg3'}) + kwargs={ + 'arg3': 'arg3' + }) cleanup_thread.start() cleanup_thread.join() self.assertEqual(cleanup_thread.name, 'test-name') diff --git a/src/python/grpcio_tests/tests/unit/beta/_beta_features_test.py b/src/python/grpcio_tests/tests/unit/beta/_beta_features_test.py index eb8dc80a6e..61c03f64ba 100644 --- a/src/python/grpcio_tests/tests/unit/beta/_beta_features_test.py +++ b/src/python/grpcio_tests/tests/unit/beta/_beta_features_test.py @@ -163,7 +163,10 @@ class BetaFeaturesTest(unittest.TestCase): self._server = implementations.server( method_implementations, options=server_options) server_credentials = implementations.ssl_server_credentials([ - (resources.private_key(), resources.certificate_chain(),), + ( + resources.private_key(), + resources.certificate_chain(), + ), ]) port = self._server.add_secure_port('[::]:0', server_credentials) self._server.start() @@ -289,7 +292,10 @@ class ContextManagementAndLifecycleTest(unittest.TestCase): self._server_options = implementations.server_options( thread_pool_size=test_constants.POOL_SIZE) self._server_credentials = implementations.ssl_server_credentials([ - (resources.private_key(), resources.certificate_chain(),), + ( + resources.private_key(), + resources.certificate_chain(), + ), ]) self._channel_credentials = implementations.ssl_channel_credentials( resources.test_root_certificates()) diff --git a/src/python/grpcio_tests/tests/unit/beta/_face_interface_test.py b/src/python/grpcio_tests/tests/unit/beta/_face_interface_test.py index e4b81e7e78..c99738e085 100644 --- a/src/python/grpcio_tests/tests/unit/beta/_face_interface_test.py +++ b/src/python/grpcio_tests/tests/unit/beta/_face_interface_test.py @@ -32,8 +32,11 @@ _SERVER_HOST_OVERRIDE = 'foo.test.google.fr' class _SerializationBehaviors( collections.namedtuple('_SerializationBehaviors', ( - 'request_serializers', 'request_deserializers', - 'response_serializers', 'response_deserializers',))): + 'request_serializers', + 'request_deserializers', + 'response_serializers', + 'response_deserializers', + ))): pass @@ -73,7 +76,10 @@ class _Implementation(test_interfaces.Implementation): server = implementations.server( method_implementations, options=server_options) server_credentials = implementations.ssl_server_credentials([ - (resources.private_key(), resources.certificate_chain(),), + ( + resources.private_key(), + resources.certificate_chain(), + ), ]) port = server.add_secure_port('[::]:0', server_credentials) server.start() @@ -116,9 +122,10 @@ class _Implementation(test_interfaces.Implementation): def load_tests(loader, tests, pattern): - return unittest.TestSuite(tests=tuple( - loader.loadTestsFromTestCase(test_case_class) - for test_case_class in test_cases.test_cases(_Implementation()))) + return unittest.TestSuite( + tests=tuple( + loader.loadTestsFromTestCase(test_case_class) + for test_case_class in test_cases.test_cases(_Implementation()))) if __name__ == '__main__': diff --git a/src/python/grpcio_tests/tests/unit/beta/_implementations_test.py b/src/python/grpcio_tests/tests/unit/beta/_implementations_test.py index 75a615eeff..5a53766d29 100644 --- a/src/python/grpcio_tests/tests/unit/beta/_implementations_test.py +++ b/src/python/grpcio_tests/tests/unit/beta/_implementations_test.py @@ -41,8 +41,8 @@ class CallCredentialsTest(unittest.TestCase): def test_google_call_credentials(self): creds = oauth2client_client.GoogleCredentials( 'token', 'client_id', 'secret', 'refresh_token', - datetime.datetime(2008, 6, 24), 'https://refresh.uri.com/', - 'user_agent') + datetime.datetime(2008, 6, + 24), 'https://refresh.uri.com/', 'user_agent') call_creds = implementations.google_call_credentials(creds) self.assertIsInstance(call_creds, implementations.CallCredentials) diff --git a/src/python/grpcio_tests/tests/unit/beta/test_utilities.py b/src/python/grpcio_tests/tests/unit/beta/test_utilities.py index 65da0f2020..c8d920d35e 100644 --- a/src/python/grpcio_tests/tests/unit/beta/test_utilities.py +++ b/src/python/grpcio_tests/tests/unit/beta/test_utilities.py @@ -33,6 +33,8 @@ def not_really_secure_channel(host, port, channel_credentials, conducted. """ target = '%s:%d' % (host, port) - channel = grpc.secure_channel(target, channel_credentials, ( - ('grpc.ssl_target_name_override', server_host_override,),)) + channel = grpc.secure_channel(target, channel_credentials, (( + 'grpc.ssl_target_name_override', + server_host_override, + ),)) return implementations.Channel(channel) diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py index 45fd321ed6..5d8679aa62 100644 --- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py +++ b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py @@ -70,8 +70,8 @@ class TestCase( self.implementation.destantiate(self._memo) def testSuccessfulUnaryRequestUnaryResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.unary_unary_messages_sequences)): + for (group, method), test_messages_sequence in (six.iteritems( + self._digest.unary_unary_messages_sequences)): for test_messages in test_messages_sequence: request = test_messages.request() @@ -81,8 +81,8 @@ class TestCase( test_messages.verify(request, response, self) def testSuccessfulUnaryRequestStreamResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.unary_stream_messages_sequences)): + for (group, method), test_messages_sequence in (six.iteritems( + self._digest.unary_stream_messages_sequences)): for test_messages in test_messages_sequence: request = test_messages.request() @@ -93,8 +93,8 @@ class TestCase( test_messages.verify(request, responses, self) def testSuccessfulStreamRequestUnaryResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.stream_unary_messages_sequences)): + for (group, method), test_messages_sequence in (six.iteritems( + self._digest.stream_unary_messages_sequences)): for test_messages in test_messages_sequence: requests = test_messages.requests() @@ -104,8 +104,8 @@ class TestCase( test_messages.verify(requests, response, self) def testSuccessfulStreamRequestStreamResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.stream_stream_messages_sequences)): + for (group, method), test_messages_sequence in (six.iteritems( + self._digest.stream_stream_messages_sequences)): for test_messages in test_messages_sequence: requests = test_messages.requests() @@ -116,8 +116,8 @@ class TestCase( test_messages.verify(requests, responses, self) def testSequentialInvocations(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.unary_unary_messages_sequences)): + for (group, method), test_messages_sequence in (six.iteritems( + self._digest.unary_unary_messages_sequences)): for test_messages in test_messages_sequence: first_request = test_messages.request() second_request = test_messages.request() @@ -134,8 +134,8 @@ class TestCase( def testParallelInvocations(self): pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.unary_unary_messages_sequences)): + for (group, method), test_messages_sequence in (six.iteritems( + self._digest.unary_unary_messages_sequences)): for test_messages in test_messages_sequence: requests = [] response_futures = [] @@ -158,8 +158,8 @@ class TestCase( def testWaitingForSomeButNotAllParallelInvocations(self): pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.unary_unary_messages_sequences)): + for (group, method), test_messages_sequence in (six.iteritems( + self._digest.unary_unary_messages_sequences)): for test_messages in test_messages_sequence: requests = [] response_futures_to_indices = {} @@ -197,8 +197,8 @@ class TestCase( raise NotImplementedError() def testExpiredUnaryRequestUnaryResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.unary_unary_messages_sequences)): + for (group, method), test_messages_sequence in (six.iteritems( + self._digest.unary_unary_messages_sequences)): for test_messages in test_messages_sequence: request = test_messages.request() @@ -208,8 +208,8 @@ class TestCase( request, _3069_test_constant.REALLY_SHORT_TIMEOUT) def testExpiredUnaryRequestStreamResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.unary_stream_messages_sequences)): + for (group, method), test_messages_sequence in (six.iteritems( + self._digest.unary_stream_messages_sequences)): for test_messages in test_messages_sequence: request = test_messages.request() @@ -220,33 +220,33 @@ class TestCase( list(response_iterator) def testExpiredStreamRequestUnaryResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.stream_unary_messages_sequences)): + for (group, method), test_messages_sequence in (six.iteritems( + self._digest.stream_unary_messages_sequences)): for test_messages in test_messages_sequence: requests = test_messages.requests() with self._control.pause(), self.assertRaises( face.ExpirationError): - self._invoker.blocking(group, method)( - iter(requests), - _3069_test_constant.REALLY_SHORT_TIMEOUT) + self._invoker.blocking( + group, method)(iter(requests), + _3069_test_constant.REALLY_SHORT_TIMEOUT) def testExpiredStreamRequestStreamResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.stream_stream_messages_sequences)): + for (group, method), test_messages_sequence in (six.iteritems( + self._digest.stream_stream_messages_sequences)): for test_messages in test_messages_sequence: requests = test_messages.requests() with self._control.pause(), self.assertRaises( face.ExpirationError): - response_iterator = self._invoker.blocking(group, method)( - iter(requests), - _3069_test_constant.REALLY_SHORT_TIMEOUT) + response_iterator = self._invoker.blocking( + group, method)(iter(requests), + _3069_test_constant.REALLY_SHORT_TIMEOUT) list(response_iterator) def testFailedUnaryRequestUnaryResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.unary_unary_messages_sequences)): + for (group, method), test_messages_sequence in (six.iteritems( + self._digest.unary_unary_messages_sequences)): for test_messages in test_messages_sequence: request = test_messages.request() @@ -255,8 +255,8 @@ class TestCase( request, test_constants.LONG_TIMEOUT) def testFailedUnaryRequestStreamResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.unary_stream_messages_sequences)): + for (group, method), test_messages_sequence in (six.iteritems( + self._digest.unary_stream_messages_sequences)): for test_messages in test_messages_sequence: request = test_messages.request() @@ -266,8 +266,8 @@ class TestCase( list(response_iterator) def testFailedStreamRequestUnaryResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.stream_unary_messages_sequences)): + for (group, method), test_messages_sequence in (six.iteritems( + self._digest.stream_unary_messages_sequences)): for test_messages in test_messages_sequence: requests = test_messages.requests() @@ -276,8 +276,8 @@ class TestCase( iter(requests), test_constants.LONG_TIMEOUT) def testFailedStreamRequestStreamResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.stream_stream_messages_sequences)): + for (group, method), test_messages_sequence in (six.iteritems( + self._digest.stream_stream_messages_sequences)): for test_messages in test_messages_sequence: requests = test_messages.requests() diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_digest.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_digest.py index 0e399c4bc4..b1c33da43a 100644 --- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_digest.py +++ b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_digest.py @@ -34,11 +34,15 @@ _IDENTITY = lambda x: x class TestServiceDigest( collections.namedtuple('TestServiceDigest', ( - 'methods', 'inline_method_implementations', - 'event_method_implementations', 'multi_method_implementation', - 'unary_unary_messages_sequences', 'unary_stream_messages_sequences', + 'methods', + 'inline_method_implementations', + 'event_method_implementations', + 'multi_method_implementation', + 'unary_unary_messages_sequences', + 'unary_stream_messages_sequences', 'stream_unary_messages_sequences', - 'stream_stream_messages_sequences',))): + 'stream_stream_messages_sequences', + ))): """A transformation of a service.TestService. Attributes: @@ -421,8 +425,8 @@ def digest(service, control, pool): events.update(stream_unary.events) events.update(stream_stream.events) - return TestServiceDigest( - methods, inlines, events, - _MultiMethodImplementation(adaptations, control, pool), - unary_unary.messages, unary_stream.messages, stream_unary.messages, - stream_stream.messages) + return TestServiceDigest(methods, inlines, events, + _MultiMethodImplementation(adaptations, control, + pool), + unary_unary.messages, unary_stream.messages, + stream_unary.messages, stream_stream.messages) diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py index bc65bf4c80..3d9b2816aa 100644 --- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py +++ b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py @@ -134,8 +134,8 @@ class TestCase( self._digest_pool.shutdown(wait=True) def testSuccessfulUnaryRequestUnaryResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.unary_unary_messages_sequences)): + for (group, method), test_messages_sequence in (six.iteritems( + self._digest.unary_unary_messages_sequences)): for test_messages in test_messages_sequence: request = test_messages.request() callback = _Callback() @@ -151,8 +151,8 @@ class TestCase( self.assertIsNone(response_future.traceback()) def testSuccessfulUnaryRequestStreamResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.unary_stream_messages_sequences)): + for (group, method), test_messages_sequence in (six.iteritems( + self._digest.unary_stream_messages_sequences)): for test_messages in test_messages_sequence: request = test_messages.request() @@ -163,8 +163,8 @@ class TestCase( test_messages.verify(request, responses, self) def testSuccessfulStreamRequestUnaryResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.stream_unary_messages_sequences)): + for (group, method), test_messages_sequence in (six.iteritems( + self._digest.stream_unary_messages_sequences)): for test_messages in test_messages_sequence: requests = test_messages.requests() request_iterator = _PauseableIterator(iter(requests)) @@ -185,8 +185,8 @@ class TestCase( self.assertIsNone(response_future.traceback()) def testSuccessfulStreamRequestStreamResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.stream_stream_messages_sequences)): + for (group, method), test_messages_sequence in (six.iteritems( + self._digest.stream_stream_messages_sequences)): for test_messages in test_messages_sequence: requests = test_messages.requests() request_iterator = _PauseableIterator(iter(requests)) @@ -201,8 +201,8 @@ class TestCase( test_messages.verify(requests, responses, self) def testSequentialInvocations(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.unary_unary_messages_sequences)): + for (group, method), test_messages_sequence in (six.iteritems( + self._digest.unary_unary_messages_sequences)): for test_messages in test_messages_sequence: first_request = test_messages.request() second_request = test_messages.request() @@ -220,8 +220,8 @@ class TestCase( test_messages.verify(second_request, second_response, self) def testParallelInvocations(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.unary_unary_messages_sequences)): + for (group, method), test_messages_sequence in (six.iteritems( + self._digest.unary_unary_messages_sequences)): for test_messages in test_messages_sequence: first_request = test_messages.request() second_request = test_messages.request() @@ -236,8 +236,8 @@ class TestCase( test_messages.verify(first_request, first_response, self) test_messages.verify(second_request, second_response, self) - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.unary_unary_messages_sequences)): + for (group, method), test_messages_sequence in (six.iteritems( + self._digest.unary_unary_messages_sequences)): for test_messages in test_messages_sequence: requests = [] response_futures = [] @@ -258,8 +258,8 @@ class TestCase( def testWaitingForSomeButNotAllParallelInvocations(self): pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.unary_unary_messages_sequences)): + for (group, method), test_messages_sequence in (six.iteritems( + self._digest.unary_unary_messages_sequences)): for test_messages in test_messages_sequence: requests = [] response_futures_to_indices = {} @@ -282,8 +282,8 @@ class TestCase( pool.shutdown(wait=True) def testCancelledUnaryRequestUnaryResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.unary_unary_messages_sequences)): + for (group, method), test_messages_sequence in (six.iteritems( + self._digest.unary_unary_messages_sequences)): for test_messages in test_messages_sequence: request = test_messages.request() callback = _Callback() @@ -305,8 +305,8 @@ class TestCase( response_future.traceback() def testCancelledUnaryRequestStreamResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.unary_stream_messages_sequences)): + for (group, method), test_messages_sequence in (six.iteritems( + self._digest.unary_stream_messages_sequences)): for test_messages in test_messages_sequence: request = test_messages.request() @@ -319,8 +319,8 @@ class TestCase( next(response_iterator) def testCancelledStreamRequestUnaryResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.stream_unary_messages_sequences)): + for (group, method), test_messages_sequence in (six.iteritems( + self._digest.stream_unary_messages_sequences)): for test_messages in test_messages_sequence: requests = test_messages.requests() callback = _Callback() @@ -342,8 +342,8 @@ class TestCase( response_future.traceback() def testCancelledStreamRequestStreamResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.stream_stream_messages_sequences)): + for (group, method), test_messages_sequence in (six.iteritems( + self._digest.stream_stream_messages_sequences)): for test_messages in test_messages_sequence: requests = test_messages.requests() @@ -356,8 +356,8 @@ class TestCase( next(response_iterator) def testExpiredUnaryRequestUnaryResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.unary_unary_messages_sequences)): + for (group, method), test_messages_sequence in (six.iteritems( + self._digest.unary_unary_messages_sequences)): for test_messages in test_messages_sequence: request = test_messages.request() callback = _Callback() @@ -376,8 +376,8 @@ class TestCase( self.assertIsNotNone(response_future.traceback()) def testExpiredUnaryRequestStreamResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.unary_stream_messages_sequences)): + for (group, method), test_messages_sequence in (six.iteritems( + self._digest.unary_stream_messages_sequences)): for test_messages in test_messages_sequence: request = test_messages.request() @@ -388,16 +388,16 @@ class TestCase( list(response_iterator) def testExpiredStreamRequestUnaryResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.stream_unary_messages_sequences)): + for (group, method), test_messages_sequence in (six.iteritems( + self._digest.stream_unary_messages_sequences)): for test_messages in test_messages_sequence: requests = test_messages.requests() callback = _Callback() with self._control.pause(): - response_future = self._invoker.future(group, method)( - iter(requests), - _3069_test_constant.REALLY_SHORT_TIMEOUT) + response_future = self._invoker.future( + group, method)(iter(requests), + _3069_test_constant.REALLY_SHORT_TIMEOUT) response_future.add_done_callback(callback) self.assertIs(callback.future(), response_future) self.assertIsInstance(response_future.exception(), @@ -409,21 +409,21 @@ class TestCase( self.assertIsNotNone(response_future.traceback()) def testExpiredStreamRequestStreamResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.stream_stream_messages_sequences)): + for (group, method), test_messages_sequence in (six.iteritems( + self._digest.stream_stream_messages_sequences)): for test_messages in test_messages_sequence: requests = test_messages.requests() with self._control.pause(): - response_iterator = self._invoker.future(group, method)( - iter(requests), - _3069_test_constant.REALLY_SHORT_TIMEOUT) + response_iterator = self._invoker.future( + group, method)(iter(requests), + _3069_test_constant.REALLY_SHORT_TIMEOUT) with self.assertRaises(face.ExpirationError): list(response_iterator) def testFailedUnaryRequestUnaryResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.unary_unary_messages_sequences)): + for (group, method), test_messages_sequence in (six.iteritems( + self._digest.unary_unary_messages_sequences)): for test_messages in test_messages_sequence: request = test_messages.request() callback = _Callback() @@ -448,8 +448,8 @@ class TestCase( self.assertIsNotNone(abortion_callback.future()) def testFailedUnaryRequestStreamResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.unary_stream_messages_sequences)): + for (group, method), test_messages_sequence in (six.iteritems( + self._digest.unary_stream_messages_sequences)): for test_messages in test_messages_sequence: request = test_messages.request() @@ -464,17 +464,17 @@ class TestCase( list(response_iterator) def testFailedStreamRequestUnaryResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.stream_unary_messages_sequences)): + for (group, method), test_messages_sequence in (six.iteritems( + self._digest.stream_unary_messages_sequences)): for test_messages in test_messages_sequence: requests = test_messages.requests() callback = _Callback() abortion_callback = _Callback() with self._control.fail(): - response_future = self._invoker.future(group, method)( - iter(requests), - _3069_test_constant.REALLY_SHORT_TIMEOUT) + response_future = self._invoker.future( + group, method)(iter(requests), + _3069_test_constant.REALLY_SHORT_TIMEOUT) response_future.add_done_callback(callback) response_future.add_abortion_callback(abortion_callback) @@ -491,8 +491,8 @@ class TestCase( self.assertIsNotNone(abortion_callback.future()) def testFailedStreamRequestStreamResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.stream_stream_messages_sequences)): + for (group, method), test_messages_sequence in (six.iteritems( + self._digest.stream_stream_messages_sequences)): for test_messages in test_messages_sequence: requests = test_messages.requests() @@ -502,7 +502,7 @@ class TestCase( # expiration of the RPC. with self._control.fail(), self.assertRaises( face.ExpirationError): - response_iterator = self._invoker.future(group, method)( - iter(requests), - _3069_test_constant.REALLY_SHORT_TIMEOUT) + response_iterator = self._invoker.future( + group, method)(iter(requests), + _3069_test_constant.REALLY_SHORT_TIMEOUT) list(response_iterator) diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_invocation.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_invocation.py index fd55f4e09f..efc93d56b0 100644 --- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_invocation.py +++ b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_invocation.py @@ -191,5 +191,8 @@ def invoker_constructors(): Returns: A sequence of InvokerConstructors. """ - return (_GenericInvokerConstructor(), _MultiCallableInvokerConstructor(), - _DynamicInvokerConstructor(),) + return ( + _GenericInvokerConstructor(), + _MultiCallableInvokerConstructor(), + _DynamicInvokerConstructor(), + ) diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_stock_service.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_stock_service.py index 69c7ac2d73..a84e02a79a 100644 --- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_stock_service.py +++ b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_stock_service.py @@ -33,8 +33,8 @@ def _get_last_trade_price(stock_request, stock_reply_callback, control, active): if active(): stock_reply_callback( stock_pb2.StockReply( - symbol=stock_request.symbol, price=_price( - stock_request.symbol))) + symbol=stock_request.symbol, + price=_price(stock_request.symbol))) else: raise abandonment.Abandoned() diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_cases.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_cases.py index d1c5b8f76b..cff4b7cdea 100644 --- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_cases.py +++ b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_cases.py @@ -24,7 +24,8 @@ from tests.unit.framework.interfaces.face import test_interfaces # pylint: disa _TEST_CASE_SUPERCLASSES = ( _blocking_invocation_inline_service.TestCase, - _future_invocation_asynchronous_event_service.TestCase,) + _future_invocation_asynchronous_event_service.TestCase, +) def test_cases(implementation): @@ -42,8 +43,9 @@ def test_cases(implementation): for invoker_constructor in _invocation.invoker_constructors(): for super_class in _TEST_CASE_SUPERCLASSES: test_case_classes.append( - type(invoker_constructor.name() + super_class.NAME, ( - super_class,), { + type( + invoker_constructor.name() + super_class.NAME, + (super_class,), { 'implementation': implementation, 'invoker_constructor': invoker_constructor, '__module__': implementation.__module__, diff --git a/src/python/grpcio_tests/tests/unit/resources.py b/src/python/grpcio_tests/tests/unit/resources.py index 11ef9e8565..51a8979f58 100644 --- a/src/python/grpcio_tests/tests/unit/resources.py +++ b/src/python/grpcio_tests/tests/unit/resources.py @@ -58,7 +58,8 @@ def cert_hier_1_client_1_key(): def cert_hier_1_client_1_cert(): return pkg_resources.resource_string( __name__, - 'credentials/certificate_hierarchy_1/intermediate/certs/client.cert.pem') + 'credentials/certificate_hierarchy_1/intermediate/certs/client.cert.pem' + ) def cert_hier_1_server_1_key(): @@ -97,7 +98,8 @@ def cert_hier_2_client_1_key(): def cert_hier_2_client_1_cert(): return pkg_resources.resource_string( __name__, - 'credentials/certificate_hierarchy_2/intermediate/certs/client.cert.pem') + 'credentials/certificate_hierarchy_2/intermediate/certs/client.cert.pem' + ) def cert_hier_2_server_1_key(): diff --git a/src/python/grpcio_tests/tests/unit/test_common.py b/src/python/grpcio_tests/tests/unit/test_common.py index 6334a32b0e..61717ae135 100644 --- a/src/python/grpcio_tests/tests/unit/test_common.py +++ b/src/python/grpcio_tests/tests/unit/test_common.py @@ -19,9 +19,21 @@ from concurrent import futures import grpc import six -INVOCATION_INITIAL_METADATA = (('0', 'abc'), ('1', 'def'), ('2', 'ghi'),) -SERVICE_INITIAL_METADATA = (('3', 'jkl'), ('4', 'mno'), ('5', 'pqr'),) -SERVICE_TERMINAL_METADATA = (('6', 'stu'), ('7', 'vwx'), ('8', 'yza'),) +INVOCATION_INITIAL_METADATA = ( + ('0', 'abc'), + ('1', 'def'), + ('2', 'ghi'), +) +SERVICE_INITIAL_METADATA = ( + ('3', 'jkl'), + ('4', 'mno'), + ('5', 'pqr'), +) +SERVICE_TERMINAL_METADATA = ( + ('6', 'stu'), + ('7', 'vwx'), + ('8', 'yza'), +) DETAILS = 'test details' @@ -80,8 +92,10 @@ def test_secure_channel(target, channel_credentials, server_host_override): An implementations.Channel to the remote host through which RPCs may be conducted. """ - channel = grpc.secure_channel(target, channel_credentials, ( - ('grpc.ssl_target_name_override', server_host_override,),)) + channel = grpc.secure_channel(target, channel_credentials, (( + 'grpc.ssl_target_name_override', + server_host_override, + ),)) return channel |