diff options
Diffstat (limited to 'src/python')
78 files changed, 1993 insertions, 3360 deletions
diff --git a/src/python/grpcio/grpc/__init__.py b/src/python/grpcio/grpc/__init__.py index 7fa7303691..b7ed0c8563 100644 --- a/src/python/grpcio/grpc/__init__.py +++ b/src/python/grpcio/grpc/__init__.py @@ -813,7 +813,11 @@ class StreamStreamMultiCallable(six.with_metaclass(abc.ABCMeta)): class Channel(six.with_metaclass(abc.ABCMeta)): - """Affords RPC invocation via generic methods on client-side.""" + """Affords RPC invocation via generic methods on client-side. + + Channel objects implement the Context Manager type, although they need not + support being entered and exited multiple times. + """ @abc.abstractmethod def subscribe(self, callback, try_to_connect=False): @@ -926,6 +930,17 @@ class Channel(six.with_metaclass(abc.ABCMeta)): """ raise NotImplementedError() + @abc.abstractmethod + def close(self): + """Closes this Channel and releases all resources held by it. + + Closing the Channel will immediately terminate all RPCs active with the + Channel and it is not valid to invoke new RPCs with the Channel. + + This method is idempotent. + """ + raise NotImplementedError() + ########################## Service-Side Context ############################## diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index 25a4210974..8cc0e981ef 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -79,27 +79,6 @@ def _wait_once_until(condition, until): condition.wait(timeout=remaining) -_INTERNAL_CALL_ERROR_MESSAGE_FORMAT = ( - 'Internal gRPC call error %d. ' + - 'Please report to https://github.com/grpc/grpc/issues') - - -def _check_call_error(call_error, metadata): - if call_error == cygrpc.CallError.invalid_metadata: - raise ValueError('metadata was invalid: %s' % metadata) - elif call_error != cygrpc.CallError.ok: - raise ValueError(_INTERNAL_CALL_ERROR_MESSAGE_FORMAT % call_error) - - -def _call_error_set_RPCstate(state, call_error, metadata): - if call_error == cygrpc.CallError.invalid_metadata: - _abort(state, grpc.StatusCode.INTERNAL, - 'metadata was invalid: %s' % metadata) - else: - _abort(state, grpc.StatusCode.INTERNAL, - _INTERNAL_CALL_ERROR_MESSAGE_FORMAT % call_error) - - class _RPCState(object): def __init__(self, due, initial_metadata, trailing_metadata, code, details): @@ -163,7 +142,7 @@ def _handle_event(event, state, response_deserializer): return callbacks -def _event_handler(state, call, response_deserializer): +def _event_handler(state, response_deserializer): def handle_event(event): with state.condition: @@ -172,40 +151,47 @@ def _event_handler(state, call, response_deserializer): done = not state.due for callback in callbacks: callback() - return call if done else None + return done return handle_event -def _consume_request_iterator(request_iterator, state, call, - request_serializer): - event_handler = _event_handler(state, call, None) +def _consume_request_iterator(request_iterator, state, call, request_serializer, + event_handler): - def consume_request_iterator(): + def consume_request_iterator(): # pylint: disable=too-many-branches while True: try: request = next(request_iterator) except StopIteration: break except Exception: # pylint: disable=broad-except - logging.exception("Exception iterating requests!") - call.cancel() - _abort(state, grpc.StatusCode.UNKNOWN, - "Exception iterating requests!") + code = grpc.StatusCode.UNKNOWN + details = 'Exception iterating requests!' + logging.exception(details) + call.cancel(_common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], + details) + _abort(state, code, details) return serialized_request = _common.serialize(request, request_serializer) with state.condition: if state.code is None and not state.cancelled: if serialized_request is None: - call.cancel() + code = grpc.StatusCode.INTERNAL # pylint: disable=redefined-variable-type details = 'Exception serializing request!' - _abort(state, grpc.StatusCode.INTERNAL, details) + call.cancel( + _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], + details) + _abort(state, code, details) return else: operations = (cygrpc.SendMessageOperation( serialized_request, _EMPTY_FLAGS),) - call.start_client_batch(operations, event_handler) - state.due.add(cygrpc.OperationType.send_message) + operating = call.operate(operations, event_handler) + if operating: + state.due.add(cygrpc.OperationType.send_message) + else: + return while True: state.condition.wait() if state.code is None: @@ -219,19 +205,12 @@ def _consume_request_iterator(request_iterator, state, call, if state.code is None: operations = ( cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),) - call.start_client_batch(operations, event_handler) - state.due.add(cygrpc.OperationType.send_close_from_client) + operating = call.operate(operations, event_handler) + if operating: + state.due.add(cygrpc.OperationType.send_close_from_client) - def stop_consumption_thread(timeout): # pylint: disable=unused-argument - with state.condition: - if state.code is None: - call.cancel() - state.cancelled = True - _abort(state, grpc.StatusCode.CANCELLED, 'Cancelled!') - state.condition.notify_all() - - consumption_thread = _common.CleanupThread( - stop_consumption_thread, target=consume_request_iterator) + consumption_thread = threading.Thread(target=consume_request_iterator) + consumption_thread.daemon = True consumption_thread.start() @@ -247,9 +226,12 @@ class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call): def cancel(self): with self._state.condition: if self._state.code is None: - self._call.cancel() + code = grpc.StatusCode.CANCELLED + details = 'Locally cancelled by application!' + self._call.cancel( + _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details) self._state.cancelled = True - _abort(self._state, grpc.StatusCode.CANCELLED, 'Cancelled!') + _abort(self._state, code, details) self._state.condition.notify_all() return False @@ -318,12 +300,13 @@ class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call): def _next(self): with self._state.condition: if self._state.code is None: - event_handler = _event_handler(self._state, self._call, + event_handler = _event_handler(self._state, self._response_deserializer) - self._call.start_client_batch( + operating = self._call.operate( (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), event_handler) - self._state.due.add(cygrpc.OperationType.receive_message) + if operating: + self._state.due.add(cygrpc.OperationType.receive_message) elif self._state.code is grpc.StatusCode.OK: raise StopIteration() else: @@ -408,9 +391,12 @@ class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call): def __del__(self): with self._state.condition: if self._state.code is None: - self._call.cancel() - self._state.cancelled = True self._state.code = grpc.StatusCode.CANCELLED + self._state.details = 'Cancelled upon garbage collection!' + self._state.cancelled = True + self._call.cancel( + _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[self._state.code], + self._state.details) self._state.condition.notify_all() @@ -437,6 +423,24 @@ def _end_unary_response_blocking(state, call, with_call, deadline): raise _Rendezvous(state, None, None, deadline) +def _stream_unary_invocation_operationses(metadata): + return ( + ( + cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS), + cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), + ), + (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), + ) + + +def _stream_unary_invocation_operationses_and_tags(metadata): + return tuple(( + operations, + None, + ) for operations in _stream_unary_invocation_operationses(metadata)) + + class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): def __init__(self, channel, managed_call, method, request_serializer, @@ -448,8 +452,8 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): self._response_deserializer = response_deserializer def _prepare(self, request, timeout, metadata): - deadline, serialized_request, rendezvous = (_start_unary_request( - request, timeout, self._request_serializer)) + deadline, serialized_request, rendezvous = _start_unary_request( + request, timeout, self._request_serializer) if serialized_request is None: return None, None, None, rendezvous else: @@ -467,48 +471,38 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): def _blocking(self, request, timeout, metadata, credentials): state, operations, deadline, rendezvous = self._prepare( request, timeout, metadata) - if rendezvous: + if state is None: raise rendezvous else: - completion_queue = cygrpc.CompletionQueue() - call = self._channel.create_call(None, 0, completion_queue, - self._method, None, deadline) - if credentials is not None: - call.set_credentials(credentials._credentials) - call_error = call.start_client_batch(operations, None) - _check_call_error(call_error, metadata) - _handle_event(completion_queue.poll(), state, - self._response_deserializer) - return state, call, deadline + call = self._channel.segregated_call( + 0, self._method, None, deadline, metadata, None + if credentials is None else credentials._credentials, (( + operations, + None, + ),)) + event = call.next_event() + _handle_event(event, state, self._response_deserializer) + return state, call, def __call__(self, request, timeout=None, metadata=None, credentials=None): - state, call, deadline = self._blocking(request, timeout, metadata, - credentials) - return _end_unary_response_blocking(state, call, False, deadline) + state, call, = self._blocking(request, timeout, metadata, credentials) + return _end_unary_response_blocking(state, call, False, None) def with_call(self, request, timeout=None, metadata=None, credentials=None): - state, call, deadline = self._blocking(request, timeout, metadata, - credentials) - return _end_unary_response_blocking(state, call, True, deadline) + state, call, = self._blocking(request, timeout, metadata, credentials) + return _end_unary_response_blocking(state, call, True, None) def future(self, request, timeout=None, metadata=None, credentials=None): state, operations, deadline, rendezvous = self._prepare( request, timeout, metadata) - if rendezvous: - return rendezvous + if state is None: + raise rendezvous else: - call, drive_call = self._managed_call(None, 0, self._method, None, - deadline) - if credentials is not None: - call.set_credentials(credentials._credentials) - event_handler = _event_handler(state, call, - self._response_deserializer) - with state.condition: - call_error = call.start_client_batch(operations, event_handler) - if call_error != cygrpc.CallError.ok: - _call_error_set_RPCstate(state, call_error, metadata) - return _Rendezvous(state, None, None, deadline) - drive_call() + event_handler = _event_handler(state, self._response_deserializer) + call = self._managed_call( + 0, self._method, None, deadline, metadata, None + if credentials is None else credentials._credentials, + (operations,), event_handler) return _Rendezvous(state, call, self._response_deserializer, deadline) @@ -524,34 +518,27 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): self._response_deserializer = response_deserializer def __call__(self, request, timeout=None, metadata=None, credentials=None): - deadline, serialized_request, rendezvous = (_start_unary_request( - request, timeout, self._request_serializer)) + deadline, serialized_request, rendezvous = _start_unary_request( + request, timeout, self._request_serializer) if serialized_request is None: raise rendezvous else: state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None) - call, drive_call = self._managed_call(None, 0, self._method, None, - deadline) - if credentials is not None: - call.set_credentials(credentials._credentials) - event_handler = _event_handler(state, call, - self._response_deserializer) - with state.condition: - call.start_client_batch( - (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), - event_handler) - operations = ( + operationses = ( + ( cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS), cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS), cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), - ) - call_error = call.start_client_batch(operations, event_handler) - if call_error != cygrpc.CallError.ok: - _call_error_set_RPCstate(state, call_error, metadata) - return _Rendezvous(state, None, None, deadline) - drive_call() + ), + (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), + ) + event_handler = _event_handler(state, self._response_deserializer) + call = self._managed_call( + 0, self._method, None, deadline, metadata, None + if credentials is None else credentials._credentials, + operationses, event_handler) return _Rendezvous(state, call, self._response_deserializer, deadline) @@ -569,49 +556,38 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): def _blocking(self, request_iterator, timeout, metadata, credentials): deadline = _deadline(timeout) state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None) - completion_queue = cygrpc.CompletionQueue() - call = self._channel.create_call(None, 0, completion_queue, - self._method, None, deadline) - if credentials is not None: - call.set_credentials(credentials._credentials) - with state.condition: - call.start_client_batch( - (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), None) - operations = ( - cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS), - cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), - cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), - ) - call_error = call.start_client_batch(operations, None) - _check_call_error(call_error, metadata) - _consume_request_iterator(request_iterator, state, call, - self._request_serializer) + call = self._channel.segregated_call( + 0, self._method, None, deadline, metadata, None + if credentials is None else credentials._credentials, + _stream_unary_invocation_operationses_and_tags(metadata)) + _consume_request_iterator(request_iterator, state, call, + self._request_serializer, None) while True: - event = completion_queue.poll() + event = call.next_event() with state.condition: _handle_event(event, state, self._response_deserializer) state.condition.notify_all() if not state.due: break - return state, call, deadline + return state, call, def __call__(self, request_iterator, timeout=None, metadata=None, credentials=None): - state, call, deadline = self._blocking(request_iterator, timeout, - metadata, credentials) - return _end_unary_response_blocking(state, call, False, deadline) + state, call, = self._blocking(request_iterator, timeout, metadata, + credentials) + return _end_unary_response_blocking(state, call, False, None) def with_call(self, request_iterator, timeout=None, metadata=None, credentials=None): - state, call, deadline = self._blocking(request_iterator, timeout, - metadata, credentials) - return _end_unary_response_blocking(state, call, True, deadline) + state, call, = self._blocking(request_iterator, timeout, metadata, + credentials) + return _end_unary_response_blocking(state, call, True, None) def future(self, request_iterator, @@ -620,27 +596,13 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): credentials=None): deadline = _deadline(timeout) state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None) - call, drive_call = self._managed_call(None, 0, self._method, None, - deadline) - if credentials is not None: - call.set_credentials(credentials._credentials) - event_handler = _event_handler(state, call, self._response_deserializer) - with state.condition: - call.start_client_batch( - (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), - event_handler) - operations = ( - cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS), - cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), - cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), - ) - call_error = call.start_client_batch(operations, event_handler) - if call_error != cygrpc.CallError.ok: - _call_error_set_RPCstate(state, call_error, metadata) - return _Rendezvous(state, None, None, deadline) - drive_call() - _consume_request_iterator(request_iterator, state, call, - self._request_serializer) + event_handler = _event_handler(state, self._response_deserializer) + call = self._managed_call( + 0, self._method, None, deadline, metadata, None + if credentials is None else credentials._credentials, + _stream_unary_invocation_operationses(metadata), event_handler) + _consume_request_iterator(request_iterator, state, call, + self._request_serializer, event_handler) return _Rendezvous(state, call, self._response_deserializer, deadline) @@ -661,26 +623,20 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): credentials=None): deadline = _deadline(timeout) state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None) - call, drive_call = self._managed_call(None, 0, self._method, None, - deadline) - if credentials is not None: - call.set_credentials(credentials._credentials) - event_handler = _event_handler(state, call, self._response_deserializer) - with state.condition: - call.start_client_batch( - (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), - event_handler) - operations = ( + operationses = ( + ( cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS), cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), - ) - call_error = call.start_client_batch(operations, event_handler) - if call_error != cygrpc.CallError.ok: - _call_error_set_RPCstate(state, call_error, metadata) - return _Rendezvous(state, None, None, deadline) - drive_call() - _consume_request_iterator(request_iterator, state, call, - self._request_serializer) + ), + (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), + ) + event_handler = _event_handler(state, self._response_deserializer) + call = self._managed_call( + 0, self._method, None, deadline, metadata, None + if credentials is None else credentials._credentials, operationses, + event_handler) + _consume_request_iterator(request_iterator, state, call, + self._request_serializer, event_handler) return _Rendezvous(state, call, self._response_deserializer, deadline) @@ -689,67 +645,63 @@ class _ChannelCallState(object): def __init__(self, channel): self.lock = threading.Lock() self.channel = channel - self.completion_queue = cygrpc.CompletionQueue() - self.managed_calls = None + self.managed_calls = 0 def _run_channel_spin_thread(state): def channel_spin(): while True: - event = state.completion_queue.poll() - completed_call = event.tag(event) - if completed_call is not None: + event = state.channel.next_call_event() + call_completed = event.tag(event) + if call_completed: with state.lock: - state.managed_calls.remove(completed_call) - if not state.managed_calls: - state.managed_calls = None + state.managed_calls -= 1 + if state.managed_calls == 0: return - def stop_channel_spin(timeout): # pylint: disable=unused-argument - with state.lock: - if state.managed_calls is not None: - for call in state.managed_calls: - call.cancel() - - channel_spin_thread = _common.CleanupThread( - stop_channel_spin, target=channel_spin) + channel_spin_thread = threading.Thread(target=channel_spin) + channel_spin_thread.daemon = True channel_spin_thread.start() def _channel_managed_call_management(state): - def create(parent, flags, method, host, deadline): - """Creates a managed cygrpc.Call and a function to call to drive it. - - If operations are successfully added to the returned cygrpc.Call, the - returned function must be called. If operations are not successfully added - to the returned cygrpc.Call, the returned function must not be called. - - Args: - parent: A cygrpc.Call to be used as the parent of the created call. - flags: An integer bitfield of call flags. - method: The RPC method. - host: A host string for the created call. - deadline: A float to be the deadline of the created call or None if the - call is to have an infinite deadline. - - Returns: - A cygrpc.Call with which to conduct an RPC and a function to call if - operations are successfully started on the call. - """ - call = state.channel.create_call(parent, flags, state.completion_queue, - method, host, deadline) - - def drive(): - with state.lock: - if state.managed_calls is None: - state.managed_calls = set((call,)) - _run_channel_spin_thread(state) - else: - state.managed_calls.add(call) + # pylint: disable=too-many-arguments + def create(flags, method, host, deadline, metadata, credentials, + operationses, event_handler): + """Creates a cygrpc.IntegratedCall. - return call, drive + Args: + flags: An integer bitfield of call flags. + method: The RPC method. + host: A host string for the created call. + deadline: A float to be the deadline of the created call or None if + the call is to have an infinite deadline. + metadata: The metadata for the call or None. + credentials: A cygrpc.CallCredentials or None. + operationses: An iterable of iterables of cygrpc.Operations to be + started on the call. + event_handler: A behavior to call to handle the events resultant from + the operations on the call. + + Returns: + A cygrpc.IntegratedCall with which to conduct an RPC. + """ + operationses_and_tags = tuple(( + operations, + event_handler, + ) for operations in operationses) + with state.lock: + call = state.channel.integrated_call(flags, method, host, deadline, + metadata, credentials, + operationses_and_tags) + if state.managed_calls == 0: + state.managed_calls = 1 + _run_channel_spin_thread(state) + else: + state.managed_calls += 1 + return call return create @@ -819,12 +771,9 @@ def _poll_connectivity(state, channel, initial_try_to_connect): callback_and_connectivity[1] = state.connectivity if callbacks: _spawn_delivery(state, callbacks) - completion_queue = cygrpc.CompletionQueue() while True: - channel.watch_connectivity_state(connectivity, - time.time() + 0.2, completion_queue, - None) - event = completion_queue.poll() + event = channel.watch_connectivity_state(connectivity, + time.time() + 0.2) with state.lock: if not state.callbacks_and_connectivities and not state.try_to_connect: state.polling = False @@ -855,10 +804,10 @@ def _moot(state): def _subscribe(state, callback, try_to_connect): with state.lock: if not state.callbacks_and_connectivities and not state.polling: - polling_thread = _common.CleanupThread( - lambda timeout: _moot(state), + polling_thread = threading.Thread( target=_poll_connectivity, args=(state, state.channel, bool(try_to_connect))) + polling_thread.daemon = True polling_thread.start() state.polling = True state.callbacks_and_connectivities.append([callback, None]) @@ -906,11 +855,6 @@ class Channel(grpc.Channel): self._call_state = _ChannelCallState(self._channel) self._connectivity_state = _ChannelConnectivityState(self._channel) - # TODO(https://github.com/grpc/grpc/issues/9884) - # Temporary work around UNAVAILABLE issues - # Remove this once c-core has retry support - _subscribe(self._connectivity_state, lambda *args: None, None) - def subscribe(self, callback, try_to_connect=None): _subscribe(self._connectivity_state, callback, try_to_connect) @@ -949,5 +893,28 @@ class Channel(grpc.Channel): self._channel, _channel_managed_call_management(self._call_state), _common.encode(method), request_serializer, response_deserializer) + def _close(self): + self._channel.close(cygrpc.StatusCode.cancelled, 'Channel closed!') + _moot(self._connectivity_state) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self._close() + return False + + def close(self): + self._close() + def __del__(self): + # TODO(https://github.com/grpc/grpc/issues/12531): Several releases + # after 1.12 (1.16 or thereabouts?) add a "self._channel.close" call + # here (or more likely, call self._close() here). We don't do this today + # because many valid use cases today allow the channel to be deleted + # immediately after stubs are created. After a sufficient period of time + # has passed for all users to be trusted to hang out to their channels + # for as long as they are in use and to close them after using them, + # then deletion of this grpc._channel.Channel instance can be made to + # effect closure of the underlying cygrpc.Channel instance. _moot(self._connectivity_state) diff --git a/src/python/grpcio/grpc/_common.py b/src/python/grpcio/grpc/_common.py index bbb69ad489..862987a0cd 100644 --- a/src/python/grpcio/grpc/_common.py +++ b/src/python/grpcio/grpc/_common.py @@ -14,8 +14,6 @@ """Shared implementation.""" import logging -import threading -import time import six @@ -101,35 +99,3 @@ def deserialize(serialized_message, deserializer): def fully_qualified_method(group, method): return '/{}/{}'.format(group, method) - - -class CleanupThread(threading.Thread): - """A threading.Thread subclass supporting custom behavior on join(). - - On Python Interpreter exit, Python will attempt to join outstanding threads - prior to garbage collection. We may need to do additional cleanup, and - we accomplish this by overriding the join() method. - """ - - def __init__(self, behavior, *args, **kwargs): - """Constructor. - - Args: - behavior (function): Function called on join() with a single - argument, timeout, indicating the maximum duration of - `behavior`, or None indicating `behavior` has no deadline. - `behavior` must be idempotent. - args: Positional arguments passed to threading.Thread constructor. - kwargs: Keyword arguments passed to threading.Thread constructor. - """ - super(CleanupThread, self).__init__(*args, **kwargs) - self._behavior = behavior - - def join(self, timeout=None): - start_time = time.time() - self._behavior(timeout) - end_time = time.time() - if timeout is not None: - timeout -= end_time - start_time - timeout = max(timeout, 0) - super(CleanupThread, self).join(timeout) diff --git a/src/python/grpcio/grpc/_cython/.gitignore b/src/python/grpcio/grpc/_cython/.gitignore index 306e3ad277..b9936e932c 100644 --- a/src/python/grpcio/grpc/_cython/.gitignore +++ b/src/python/grpcio/grpc/_cython/.gitignore @@ -1,4 +1,4 @@ -cygrpc.c +cygrpc.cpp *.a *.so *.dll diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi index 0892215b6d..2e02111ddd 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi @@ -30,9 +30,12 @@ cdef class Call: tag, operations, self if retain_self else None) batch_operation_tag.prepare() cpython.Py_INCREF(batch_operation_tag) - return grpc_call_start_batch( + cdef grpc_call_error error + with nogil: + error = grpc_call_start_batch( self.c_call, batch_operation_tag.c_ops, batch_operation_tag.c_nops, <cpython.PyObject *>batch_operation_tag, NULL) + return error def start_client_batch(self, operations, tag): # We don't reference this call in the operations tag because diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pxd.pxi index 1ba76b7f83..eefc685c0b 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pxd.pxi @@ -13,9 +13,59 @@ # limitations under the License. +cdef _check_call_error_no_metadata(c_call_error) + + +cdef _check_and_raise_call_error_no_metadata(c_call_error) + + +cdef _check_call_error(c_call_error, metadata) + + +cdef class _CallState: + + cdef grpc_call *c_call + cdef set due + + +cdef class _ChannelState: + + cdef object condition + cdef grpc_channel *c_channel + # A boolean field indicating that the channel is open (if True) or is being + # closed (i.e. a call to close is currently executing) or is closed (if + # False). + # TODO(https://github.com/grpc/grpc/issues/3064): Eliminate "is being closed" + # a state in which condition may be acquired by any thread, eliminate this + # field and just use the NULLness of c_channel as an indication that the + # channel is closed. + cdef object open + + # A dict from _BatchOperationTag to _CallState + cdef dict integrated_call_states + cdef grpc_completion_queue *c_call_completion_queue + + # A set of _CallState + cdef set segregated_call_states + + cdef set connectivity_due + cdef grpc_completion_queue *c_connectivity_completion_queue + + +cdef class IntegratedCall: + + cdef _ChannelState _channel_state + cdef _CallState _call_state + + +cdef class SegregatedCall: + + cdef _ChannelState _channel_state + cdef _CallState _call_state + cdef grpc_completion_queue *_c_completion_queue + + cdef class Channel: cdef grpc_arg_pointer_vtable _vtable - cdef grpc_channel *c_channel - cdef list references - cdef readonly _ArgumentsProcessor _arguments_processor + cdef _ChannelState _state diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi index a3966497bc..72e74e84ae 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi @@ -14,82 +14,439 @@ cimport cpython +import threading + +_INTERNAL_CALL_ERROR_MESSAGE_FORMAT = ( + 'Internal gRPC call error %d. ' + + 'Please report to https://github.com/grpc/grpc/issues') + + +cdef str _call_error_metadata(metadata): + return 'metadata was invalid: %s' % metadata + + +cdef str _call_error_no_metadata(c_call_error): + return _INTERNAL_CALL_ERROR_MESSAGE_FORMAT % c_call_error + + +cdef str _call_error(c_call_error, metadata): + if c_call_error == GRPC_CALL_ERROR_INVALID_METADATA: + return _call_error_metadata(metadata) + else: + return _call_error_no_metadata(c_call_error) + + +cdef _check_call_error_no_metadata(c_call_error): + if c_call_error != GRPC_CALL_OK: + return _INTERNAL_CALL_ERROR_MESSAGE_FORMAT % c_call_error + else: + return None + + +cdef _check_and_raise_call_error_no_metadata(c_call_error): + error = _check_call_error_no_metadata(c_call_error) + if error is not None: + raise ValueError(error) + + +cdef _check_call_error(c_call_error, metadata): + if c_call_error == GRPC_CALL_ERROR_INVALID_METADATA: + return _call_error_metadata(metadata) + else: + return _check_call_error_no_metadata(c_call_error) + + +cdef void _raise_call_error_no_metadata(c_call_error) except *: + raise ValueError(_call_error_no_metadata(c_call_error)) + + +cdef void _raise_call_error(c_call_error, metadata) except *: + raise ValueError(_call_error(c_call_error, metadata)) + + +cdef _destroy_c_completion_queue(grpc_completion_queue *c_completion_queue): + grpc_completion_queue_shutdown(c_completion_queue) + grpc_completion_queue_destroy(c_completion_queue) + + +cdef class _CallState: + + def __cinit__(self): + self.due = set() + + +cdef class _ChannelState: + + def __cinit__(self): + self.condition = threading.Condition() + self.open = True + self.integrated_call_states = {} + self.segregated_call_states = set() + self.connectivity_due = set() + + +cdef tuple _operate(grpc_call *c_call, object operations, object user_tag): + cdef grpc_call_error c_call_error + cdef _BatchOperationTag tag = _BatchOperationTag(user_tag, operations, None) + tag.prepare() + cpython.Py_INCREF(tag) + with nogil: + c_call_error = grpc_call_start_batch( + c_call, tag.c_ops, tag.c_nops, <cpython.PyObject *>tag, NULL) + return c_call_error, tag + + +cdef object _operate_from_integrated_call( + _ChannelState channel_state, _CallState call_state, object operations, + object user_tag): + cdef grpc_call_error c_call_error + cdef _BatchOperationTag tag + with channel_state.condition: + if call_state.due: + c_call_error, tag = _operate(call_state.c_call, operations, user_tag) + if c_call_error == GRPC_CALL_OK: + call_state.due.add(tag) + channel_state.integrated_call_states[tag] = call_state + return True + else: + _raise_call_error_no_metadata(c_call_error) + else: + return False + + +cdef object _operate_from_segregated_call( + _ChannelState channel_state, _CallState call_state, object operations, + object user_tag): + cdef grpc_call_error c_call_error + cdef _BatchOperationTag tag + with channel_state.condition: + if call_state.due: + c_call_error, tag = _operate(call_state.c_call, operations, user_tag) + if c_call_error == GRPC_CALL_OK: + call_state.due.add(tag) + return True + else: + _raise_call_error_no_metadata(c_call_error) + else: + return False + + +cdef _cancel( + _ChannelState channel_state, _CallState call_state, grpc_status_code code, + str details): + cdef grpc_call_error c_call_error + with channel_state.condition: + if call_state.due: + c_call_error = grpc_call_cancel_with_status( + call_state.c_call, code, _encode(details), NULL) + _check_and_raise_call_error_no_metadata(c_call_error) + + +cdef BatchOperationEvent _next_call_event( + _ChannelState channel_state, grpc_completion_queue *c_completion_queue, + on_success): + tag, event = _latent_event(c_completion_queue, None) + with channel_state.condition: + on_success(tag) + channel_state.condition.notify_all() + return event + + +# TODO(https://github.com/grpc/grpc/issues/14569): This could be a lot simpler. +cdef void _call( + _ChannelState channel_state, _CallState call_state, + grpc_completion_queue *c_completion_queue, on_success, int flags, method, + host, object deadline, CallCredentials credentials, + object operationses_and_user_tags, object metadata) except *: + """Invokes an RPC. + + Args: + channel_state: A _ChannelState with its "open" attribute set to True. RPCs + may not be invoked on a closed channel. + call_state: An empty _CallState to be altered (specifically assigned a + c_call and having its due set populated) if the RPC invocation is + successful. + c_completion_queue: A grpc_completion_queue to be used for the call's + operations. + on_success: A behavior to be called if attempting to start operations for + the call succeeds. If called the behavior will be called while holding the + channel_state condition and passed the tags associated with operations + that were successfully started for the call. + flags: Flags to be passed to gRPC Core as part of call creation. + method: The fully-qualified name of the RPC method being invoked. + host: A "host" string to be passed to gRPC Core as part of call creation. + deadline: A float for the deadline of the RPC, or None if the RPC is to have + no deadline. + credentials: A _CallCredentials for the RPC or None. + operationses_and_user_tags: A sequence of length-two sequences the first + element of which is a sequence of Operations and the second element of + which is an object to be used as a tag. A SendInitialMetadataOperation + must be present in the first element of this value. + metadata: The metadata for this call. + """ + cdef grpc_slice method_slice + cdef grpc_slice host_slice + cdef grpc_slice *host_slice_ptr + cdef grpc_call_credentials *c_call_credentials + cdef grpc_call_error c_call_error + cdef tuple error_and_wrapper_tag + cdef _BatchOperationTag wrapper_tag + with channel_state.condition: + if channel_state.open: + method_slice = _slice_from_bytes(method) + if host is None: + host_slice_ptr = NULL + else: + host_slice = _slice_from_bytes(host) + host_slice_ptr = &host_slice + call_state.c_call = grpc_channel_create_call( + channel_state.c_channel, NULL, flags, + c_completion_queue, method_slice, host_slice_ptr, + _timespec_from_time(deadline), NULL) + grpc_slice_unref(method_slice) + if host_slice_ptr: + grpc_slice_unref(host_slice) + if credentials is not None: + c_call_credentials = credentials.c() + c_call_error = grpc_call_set_credentials( + call_state.c_call, c_call_credentials) + grpc_call_credentials_release(c_call_credentials) + if c_call_error != GRPC_CALL_OK: + grpc_call_unref(call_state.c_call) + call_state.c_call = NULL + _raise_call_error_no_metadata(c_call_error) + started_tags = set() + for operations, user_tag in operationses_and_user_tags: + c_call_error, tag = _operate(call_state.c_call, operations, user_tag) + if c_call_error == GRPC_CALL_OK: + started_tags.add(tag) + else: + grpc_call_cancel(call_state.c_call, NULL) + grpc_call_unref(call_state.c_call) + call_state.c_call = NULL + _raise_call_error(c_call_error, metadata) + else: + call_state.due.update(started_tags) + on_success(started_tags) + else: + raise ValueError('Cannot invoke RPC on closed channel!') + +cdef void _process_integrated_call_tag( + _ChannelState state, _BatchOperationTag tag) except *: + cdef _CallState call_state = state.integrated_call_states.pop(tag) + call_state.due.remove(tag) + if not call_state.due: + grpc_call_unref(call_state.c_call) + call_state.c_call = NULL + + +cdef class IntegratedCall: + + def __cinit__(self, _ChannelState channel_state, _CallState call_state): + self._channel_state = channel_state + self._call_state = call_state + + def operate(self, operations, tag): + return _operate_from_integrated_call( + self._channel_state, self._call_state, operations, tag) + + def cancel(self, code, details): + _cancel(self._channel_state, self._call_state, code, details) + + +cdef IntegratedCall _integrated_call( + _ChannelState state, int flags, method, host, object deadline, + object metadata, CallCredentials credentials, operationses_and_user_tags): + call_state = _CallState() + + def on_success(started_tags): + for started_tag in started_tags: + state.integrated_call_states[started_tag] = call_state + + _call( + state, call_state, state.c_call_completion_queue, on_success, flags, + method, host, deadline, credentials, operationses_and_user_tags, metadata) + + return IntegratedCall(state, call_state) + + +cdef object _process_segregated_call_tag( + _ChannelState state, _CallState call_state, + grpc_completion_queue *c_completion_queue, _BatchOperationTag tag): + call_state.due.remove(tag) + if not call_state.due: + grpc_call_unref(call_state.c_call) + call_state.c_call = NULL + state.segregated_call_states.remove(call_state) + _destroy_c_completion_queue(c_completion_queue) + return True + else: + return False + + +cdef class SegregatedCall: + + def __cinit__(self, _ChannelState channel_state, _CallState call_state): + self._channel_state = channel_state + self._call_state = call_state + + def operate(self, operations, tag): + return _operate_from_segregated_call( + self._channel_state, self._call_state, operations, tag) + + def cancel(self, code, details): + _cancel(self._channel_state, self._call_state, code, details) + + def next_event(self): + def on_success(tag): + _process_segregated_call_tag( + self._channel_state, self._call_state, self._c_completion_queue, tag) + return _next_call_event( + self._channel_state, self._c_completion_queue, on_success) + + +cdef SegregatedCall _segregated_call( + _ChannelState state, int flags, method, host, object deadline, + object metadata, CallCredentials credentials, operationses_and_user_tags): + cdef _CallState call_state = _CallState() + cdef grpc_completion_queue *c_completion_queue = ( + grpc_completion_queue_create_for_next(NULL)) + cdef SegregatedCall segregated_call + + def on_success(started_tags): + state.segregated_call_states.add(call_state) + + try: + _call( + state, call_state, c_completion_queue, on_success, flags, method, host, + deadline, credentials, operationses_and_user_tags, metadata) + except: + _destroy_c_completion_queue(c_completion_queue) + raise + + segregated_call = SegregatedCall(state, call_state) + segregated_call._c_completion_queue = c_completion_queue + return segregated_call + + +cdef object _watch_connectivity_state( + _ChannelState state, grpc_connectivity_state last_observed_state, + object deadline): + cdef _ConnectivityTag tag = _ConnectivityTag(object()) + with state.condition: + if state.open: + cpython.Py_INCREF(tag) + grpc_channel_watch_connectivity_state( + state.c_channel, last_observed_state, _timespec_from_time(deadline), + state.c_connectivity_completion_queue, <cpython.PyObject *>tag) + state.connectivity_due.add(tag) + else: + raise ValueError('Cannot invoke RPC on closed channel!') + completed_tag, event = _latent_event( + state.c_connectivity_completion_queue, None) + with state.condition: + state.connectivity_due.remove(completed_tag) + state.condition.notify_all() + return event + + +cdef _close(_ChannelState state, grpc_status_code code, object details): + cdef _CallState call_state + encoded_details = _encode(details) + with state.condition: + if state.open: + state.open = False + for call_state in set(state.integrated_call_states.values()): + grpc_call_cancel_with_status( + call_state.c_call, code, encoded_details, NULL) + for call_state in state.segregated_call_states: + grpc_call_cancel_with_status( + call_state.c_call, code, encoded_details, NULL) + # TODO(https://github.com/grpc/grpc/issues/3064): Cancel connectivity + # watching. + + while state.integrated_call_states: + state.condition.wait() + while state.segregated_call_states: + state.condition.wait() + while state.connectivity_due: + state.condition.wait() + + _destroy_c_completion_queue(state.c_call_completion_queue) + _destroy_c_completion_queue(state.c_connectivity_completion_queue) + grpc_channel_destroy(state.c_channel) + state.c_channel = NULL + grpc_shutdown() + state.condition.notify_all() + else: + # Another call to close already completed in the past or is currently + # being executed in another thread. + while state.c_channel != NULL: + state.condition.wait() + cdef class Channel: - def __cinit__(self, bytes target, object arguments, - ChannelCredentials channel_credentials=None): + def __cinit__( + self, bytes target, object arguments, + ChannelCredentials channel_credentials): grpc_init() + self._state = _ChannelState() self._vtable.copy = &_copy_pointer self._vtable.destroy = &_destroy_pointer self._vtable.cmp = &_compare_pointer cdef _ArgumentsProcessor arguments_processor = _ArgumentsProcessor( arguments) cdef grpc_channel_args *c_arguments = arguments_processor.c(&self._vtable) - self.references = [] - c_target = target if channel_credentials is None: - self.c_channel = grpc_insecure_channel_create(c_target, c_arguments, NULL) + self._state.c_channel = grpc_insecure_channel_create( + <char *>target, c_arguments, NULL) else: c_channel_credentials = channel_credentials.c() - self.c_channel = grpc_secure_channel_create( - c_channel_credentials, c_target, c_arguments, NULL) + self._state.c_channel = grpc_secure_channel_create( + c_channel_credentials, <char *>target, c_arguments, NULL) grpc_channel_credentials_release(c_channel_credentials) - arguments_processor.un_c() - self.references.append(target) - self.references.append(arguments) - - def create_call(self, Call parent, int flags, - CompletionQueue queue not None, - method, host, object deadline): - if queue.is_shutting_down: - raise ValueError("queue must not be shutting down or shutdown") - cdef grpc_slice method_slice = _slice_from_bytes(method) - cdef grpc_slice host_slice - cdef grpc_slice *host_slice_ptr = NULL - if host is not None: - host_slice = _slice_from_bytes(host) - host_slice_ptr = &host_slice - cdef Call operation_call = Call() - operation_call.references = [self, queue] - cdef grpc_call *parent_call = NULL - if parent is not None: - parent_call = parent.c_call - operation_call.c_call = grpc_channel_create_call( - self.c_channel, parent_call, flags, - queue.c_completion_queue, method_slice, host_slice_ptr, - _timespec_from_time(deadline), NULL) - grpc_slice_unref(method_slice) - if host_slice_ptr: - grpc_slice_unref(host_slice) - return operation_call + self._state.c_call_completion_queue = ( + grpc_completion_queue_create_for_next(NULL)) + self._state.c_connectivity_completion_queue = ( + grpc_completion_queue_create_for_next(NULL)) + + def target(self): + cdef char *c_target + with self._state.condition: + c_target = grpc_channel_get_target(self._state.c_channel) + target = <bytes>c_target + gpr_free(c_target) + return target + + def integrated_call( + self, int flags, method, host, object deadline, object metadata, + CallCredentials credentials, operationses_and_tags): + return _integrated_call( + self._state, flags, method, host, deadline, metadata, credentials, + operationses_and_tags) + + def next_call_event(self): + def on_success(tag): + _process_integrated_call_tag(self._state, tag) + return _next_call_event( + self._state, self._state.c_call_completion_queue, on_success) + + def segregated_call( + self, int flags, method, host, object deadline, object metadata, + CallCredentials credentials, operationses_and_tags): + return _segregated_call( + self._state, flags, method, host, deadline, metadata, credentials, + operationses_and_tags) def check_connectivity_state(self, bint try_to_connect): - cdef grpc_connectivity_state result - with nogil: - result = grpc_channel_check_connectivity_state(self.c_channel, - try_to_connect) - return result + with self._state.condition: + return grpc_channel_check_connectivity_state( + self._state.c_channel, try_to_connect) def watch_connectivity_state( - self, grpc_connectivity_state last_observed_state, - object deadline, CompletionQueue queue not None, tag): - cdef _ConnectivityTag connectivity_tag = _ConnectivityTag(tag) - cpython.Py_INCREF(connectivity_tag) - grpc_channel_watch_connectivity_state( - self.c_channel, last_observed_state, _timespec_from_time(deadline), - queue.c_completion_queue, <cpython.PyObject *>connectivity_tag) + self, grpc_connectivity_state last_observed_state, object deadline): + return _watch_connectivity_state(self._state, last_observed_state, deadline) - def target(self): - cdef char *target = NULL - with nogil: - target = grpc_channel_get_target(self.c_channel) - result = <bytes>target - with nogil: - gpr_free(target) - return result - - def __dealloc__(self): - if self.c_channel != NULL: - grpc_channel_destroy(self.c_channel) - grpc_shutdown() + def close(self, code, details): + _close(self._state, code, details) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pxd.pxi index 5ea0287b81..9f06ce086e 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pxd.pxi @@ -13,10 +13,16 @@ # limitations under the License. +cdef grpc_event _next(grpc_completion_queue *c_completion_queue, deadline) + + +cdef _interpret_event(grpc_event c_event) + + cdef class CompletionQueue: cdef grpc_completion_queue *c_completion_queue cdef bint is_shutting_down cdef bint is_shutdown - cdef _interpret_event(self, grpc_event event) + cdef _interpret_event(self, grpc_event c_event) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi index 40496d1124..a2d765546a 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi @@ -20,6 +20,53 @@ import time cdef int _INTERRUPT_CHECK_PERIOD_MS = 200 +cdef grpc_event _next(grpc_completion_queue *c_completion_queue, deadline): + cdef gpr_timespec c_increment + cdef gpr_timespec c_timeout + cdef gpr_timespec c_deadline + c_increment = gpr_time_from_millis(_INTERRUPT_CHECK_PERIOD_MS, GPR_TIMESPAN) + if deadline is None: + c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME) + else: + c_deadline = _timespec_from_time(deadline) + + with nogil: + while True: + c_timeout = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), c_increment) + if gpr_time_cmp(c_timeout, c_deadline) > 0: + c_timeout = c_deadline + c_event = grpc_completion_queue_next(c_completion_queue, c_timeout, NULL) + if (c_event.type != GRPC_QUEUE_TIMEOUT or + gpr_time_cmp(c_timeout, c_deadline) == 0): + break + + # Handle any signals + with gil: + cpython.PyErr_CheckSignals() + return c_event + + +cdef _interpret_event(grpc_event c_event): + cdef _Tag tag + if c_event.type == GRPC_QUEUE_TIMEOUT: + # NOTE(nathaniel): For now we coopt ConnectivityEvent here. + return None, ConnectivityEvent(GRPC_QUEUE_TIMEOUT, False, None) + elif c_event.type == GRPC_QUEUE_SHUTDOWN: + # NOTE(nathaniel): For now we coopt ConnectivityEvent here. + return None, ConnectivityEvent(GRPC_QUEUE_SHUTDOWN, False, None) + else: + tag = <_Tag>c_event.tag + # We receive event tags only after they've been inc-ref'd elsewhere in + # the code. + cpython.Py_DECREF(tag) + return tag, tag.event(c_event) + + +cdef _latent_event(grpc_completion_queue *c_completion_queue, object deadline): + cdef grpc_event c_event = _next(c_completion_queue, deadline) + return _interpret_event(c_event) + + cdef class CompletionQueue: def __cinit__(self, shutdown_cq=False): @@ -36,48 +83,16 @@ cdef class CompletionQueue: self.is_shutting_down = False self.is_shutdown = False - cdef _interpret_event(self, grpc_event event): - cdef _Tag tag = None - if event.type == GRPC_QUEUE_TIMEOUT: - # NOTE(nathaniel): For now we coopt ConnectivityEvent here. - return ConnectivityEvent(GRPC_QUEUE_TIMEOUT, False, None) - elif event.type == GRPC_QUEUE_SHUTDOWN: + cdef _interpret_event(self, grpc_event c_event): + unused_tag, event = _interpret_event(c_event) + if event.completion_type == GRPC_QUEUE_SHUTDOWN: self.is_shutdown = True - # NOTE(nathaniel): For now we coopt ConnectivityEvent here. - return ConnectivityEvent(GRPC_QUEUE_TIMEOUT, True, None) - else: - tag = <_Tag>event.tag - # We receive event tags only after they've been inc-ref'd elsewhere in - # the code. - cpython.Py_DECREF(tag) - return tag.event(event) + return event + # We name this 'poll' to avoid problems with CPython's expectations for + # 'special' methods (like next and __next__). def poll(self, deadline=None): - # We name this 'poll' to avoid problems with CPython's expectations for - # 'special' methods (like next and __next__). - cdef gpr_timespec c_increment - cdef gpr_timespec c_timeout - cdef gpr_timespec c_deadline - if deadline is None: - c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME) - else: - c_deadline = _timespec_from_time(deadline) - with nogil: - c_increment = gpr_time_from_millis(_INTERRUPT_CHECK_PERIOD_MS, GPR_TIMESPAN) - - while True: - c_timeout = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), c_increment) - if gpr_time_cmp(c_timeout, c_deadline) > 0: - c_timeout = c_deadline - event = grpc_completion_queue_next( - self.c_completion_queue, c_timeout, NULL) - if event.type != GRPC_QUEUE_TIMEOUT or gpr_time_cmp(c_timeout, c_deadline) == 0: - break; - - # Handle any signals - with gil: - cpython.PyErr_CheckSignals() - return self._interpret_event(event) + return self._interpret_event(_next(self.c_completion_queue, deadline)) def shutdown(self): with nogil: diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pxd b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pxd new file mode 100644 index 0000000000..f5688d08cd --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pxd @@ -0,0 +1,152 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# distutils: language=c++ + +from libc.stdint cimport uint32_t + +cdef extern from "grpc/impl/codegen/slice.h": + struct grpc_slice_buffer: + int count + +cdef extern from "src/core/lib/iomgr/error.h": + struct grpc_error: + pass + +cdef extern from "src/core/lib/iomgr/gevent_util.h": + grpc_error* grpc_socket_error(char* error) + char* grpc_slice_buffer_start(grpc_slice_buffer* buffer, int i) + int grpc_slice_buffer_length(grpc_slice_buffer* buffer, int i) + +cdef extern from "src/core/lib/iomgr/sockaddr.h": + ctypedef struct grpc_sockaddr: + pass + +cdef extern from "src/core/lib/iomgr/resolve_address.h": + ctypedef struct grpc_resolved_addresses: + size_t naddrs + grpc_resolved_address* addrs + + ctypedef struct grpc_resolved_address: + char[128] addr + size_t len + +cdef extern from "src/core/lib/iomgr/resolve_address_custom.h": + struct grpc_custom_resolver: + pass + + struct grpc_custom_resolver_vtable: + grpc_error* (*resolve)(char* host, char* port, grpc_resolved_addresses** res); + void (*resolve_async)(grpc_custom_resolver* resolver, char* host, char* port); + + void grpc_custom_resolve_callback(grpc_custom_resolver* resolver, + grpc_resolved_addresses* result, + grpc_error* error); + +cdef extern from "src/core/lib/iomgr/tcp_custom.h": + struct grpc_custom_socket: + void* impl + # We don't care about the rest of the fields + ctypedef void (*grpc_custom_connect_callback)(grpc_custom_socket* socket, + grpc_error* error) + ctypedef void (*grpc_custom_write_callback)(grpc_custom_socket* socket, + grpc_error* error) + ctypedef void (*grpc_custom_read_callback)(grpc_custom_socket* socket, + size_t nread, grpc_error* error) + ctypedef void (*grpc_custom_accept_callback)(grpc_custom_socket* socket, + grpc_custom_socket* client, + grpc_error* error) + ctypedef void (*grpc_custom_close_callback)(grpc_custom_socket* socket) + + struct grpc_socket_vtable: + grpc_error* (*init)(grpc_custom_socket* socket, int domain); + void (*connect)(grpc_custom_socket* socket, const grpc_sockaddr* addr, + size_t len, grpc_custom_connect_callback cb); + void (*destroy)(grpc_custom_socket* socket); + void (*shutdown)(grpc_custom_socket* socket); + void (*close)(grpc_custom_socket* socket, grpc_custom_close_callback cb); + void (*write)(grpc_custom_socket* socket, grpc_slice_buffer* slices, + grpc_custom_write_callback cb); + void (*read)(grpc_custom_socket* socket, char* buffer, size_t length, + grpc_custom_read_callback cb); + grpc_error* (*getpeername)(grpc_custom_socket* socket, + const grpc_sockaddr* addr, int* len); + grpc_error* (*getsockname)(grpc_custom_socket* socket, + const grpc_sockaddr* addr, int* len); + grpc_error* (*bind)(grpc_custom_socket* socket, const grpc_sockaddr* addr, + size_t len, int flags); + grpc_error* (*listen)(grpc_custom_socket* socket); + void (*accept)(grpc_custom_socket* socket, grpc_custom_socket* client, + grpc_custom_accept_callback cb); + +cdef extern from "src/core/lib/iomgr/timer_custom.h": + struct grpc_custom_timer: + void* timer + int timeout_ms + # We don't care about the rest of the fields + + struct grpc_custom_timer_vtable: + void (*start)(grpc_custom_timer* t); + void (*stop)(grpc_custom_timer* t); + + void grpc_custom_timer_callback(grpc_custom_timer* t, grpc_error* error); + +cdef extern from "src/core/lib/iomgr/pollset_custom.h": + struct grpc_custom_poller_vtable: + void (*init)() + void (*poll)(size_t timeout_ms) + void (*kick)() + void (*shutdown)() + +cdef extern from "src/core/lib/iomgr/iomgr_custom.h": + void grpc_custom_iomgr_init(grpc_socket_vtable* socket, + grpc_custom_resolver_vtable* resolver, + grpc_custom_timer_vtable* timer, + grpc_custom_poller_vtable* poller); + +cdef extern from "src/core/lib/iomgr/sockaddr_utils.h": + int grpc_sockaddr_get_port(const grpc_resolved_address *addr); + int grpc_sockaddr_to_string(char **out, const grpc_resolved_address *addr, + int normalize); + void grpc_string_to_sockaddr(grpc_resolved_address *out, char* addr, int port); + int grpc_sockaddr_set_port(const grpc_resolved_address *resolved_addr, + int port) + const char* grpc_sockaddr_get_uri_scheme(const grpc_resolved_address* resolved_addr) + + +cdef class TimerWrapper: + + cdef grpc_custom_timer *c_timer + cdef object timer + cdef object event + +cdef class SocketWrapper: + cdef object sockopts + cdef object socket + cdef object closed + cdef grpc_custom_socket *c_socket + cdef char* c_buffer + cdef size_t len + cdef grpc_custom_socket *accepting_socket + + cdef grpc_custom_connect_callback connect_cb + cdef grpc_custom_write_callback write_cb + cdef grpc_custom_read_callback read_cb + cdef grpc_custom_accept_callback accept_cb + cdef grpc_custom_close_callback close_cb + + +cdef class ResolveWrapper: + cdef grpc_custom_resolver *c_resolver + cdef char* c_host + cdef char* c_port diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx new file mode 100644 index 0000000000..31ef671aed --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx @@ -0,0 +1,448 @@ +# Copyright 2018 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# distutils: language=c++ + +cimport cpython +from libc cimport string +from libc.stdlib cimport malloc, free +import errno +gevent_g = None +gevent_socket = None +gevent_hub = None +gevent_event = None +g_event = None +g_pool = None + +cdef grpc_error* grpc_error_none(): + return <grpc_error*>0 + +cdef grpc_error* socket_error(str syscall, str err): + error_str = "{} failed: {}".format(syscall, err) + error_bytes = str_to_bytes(error_str) + return grpc_socket_error(error_bytes) + +cdef resolved_addr_to_tuple(grpc_resolved_address* address): + cdef char* res_str + port = grpc_sockaddr_get_port(address) + str_len = grpc_sockaddr_to_string(&res_str, address, 0) + byte_str = _decode(<bytes>res_str[:str_len]) + if byte_str.endswith(':' + str(port)): + byte_str = byte_str[:(0 - len(str(port)) - 1)] + byte_str = byte_str.lstrip('[') + byte_str = byte_str.rstrip(']') + byte_str = '{}'.format(byte_str) + return byte_str, port + +cdef sockaddr_to_tuple(const grpc_sockaddr* address, size_t length): + cdef grpc_resolved_address c_addr + string.memcpy(<void*>c_addr.addr, <void*> address, length) + c_addr.len = length + return resolved_addr_to_tuple(&c_addr) + +cdef sockaddr_is_ipv4(const grpc_sockaddr* address, size_t length): + cdef grpc_resolved_address c_addr + string.memcpy(<void*>c_addr.addr, <void*> address, length) + c_addr.len = length + return grpc_sockaddr_get_uri_scheme(&c_addr) == b'ipv4' + +cdef grpc_resolved_addresses* tuples_to_resolvaddr(tups): + cdef grpc_resolved_addresses* addresses + tups_set = set((tup[4][0], tup[4][1]) for tup in tups) + addresses = <grpc_resolved_addresses*> malloc(sizeof(grpc_resolved_addresses)) + addresses.naddrs = len(tups_set) + addresses.addrs = <grpc_resolved_address*> malloc(sizeof(grpc_resolved_address) * len(tups_set)) + i = 0 + for tup in set(tups_set): + hostname = str_to_bytes(tup[0]) + grpc_string_to_sockaddr(&addresses.addrs[i], hostname, tup[1]) + i += 1 + return addresses + +def _spawn_greenlet(*args): + greenlet = g_pool.spawn(*args) + +############################### +### socket implementation ### +############################### + +cdef class SocketWrapper: + def __cinit__(self): + self.sockopts = [] + self.socket = None + self.c_socket = NULL + self.c_buffer = NULL + self.len = 0 + +cdef grpc_error* socket_init(grpc_custom_socket* socket, int domain) with gil: + sw = SocketWrapper() + sw.c_socket = socket + sw.sockopts = [] + cpython.Py_INCREF(sw) + # Python doesn't support AF_UNSPEC sockets, so we defer creation until + # bind/connect when we know what type of socket we need + sw.socket = None + sw.closed = False + sw.accepting_socket = NULL + socket.impl = <void*>sw + return grpc_error_none() + +cdef socket_connect_async_cython(SocketWrapper socket_wrapper, addr_tuple): + try: + socket_wrapper.socket.connect(addr_tuple) + socket_wrapper.connect_cb(<grpc_custom_socket*>socket_wrapper.c_socket, + grpc_error_none()) + except IOError as io_error: + socket_wrapper.connect_cb(<grpc_custom_socket*>socket_wrapper.c_socket, + socket_error("connect", str(io_error))) + g_event.set() + +def socket_connect_async(socket_wrapper, addr_tuple): + socket_connect_async_cython(socket_wrapper, addr_tuple) + +cdef void socket_connect(grpc_custom_socket* socket, const grpc_sockaddr* addr, + size_t addr_len, + grpc_custom_connect_callback cb) with gil: + py_socket = None + socket_wrapper = <SocketWrapper>socket.impl + socket_wrapper.connect_cb = cb + addr_tuple = sockaddr_to_tuple(addr, addr_len) + if sockaddr_is_ipv4(addr, addr_len): + py_socket = gevent_socket.socket(gevent_socket.AF_INET) + else: + py_socket = gevent_socket.socket(gevent_socket.AF_INET6) + applysockopts(py_socket) + socket_wrapper.socket = py_socket + _spawn_greenlet(socket_connect_async, socket_wrapper, addr_tuple) + +cdef void socket_destroy(grpc_custom_socket* socket) with gil: + cpython.Py_DECREF(<SocketWrapper>socket.impl) + +cdef void socket_shutdown(grpc_custom_socket* socket) with gil: + try: + (<SocketWrapper>socket.impl).socket.shutdown(gevent_socket.SHUT_RDWR) + except IOError as io_error: + if io_error.errno != errno.ENOTCONN: + raise io_error + +cdef void socket_close(grpc_custom_socket* socket, + grpc_custom_close_callback cb) with gil: + socket_wrapper = (<SocketWrapper>socket.impl) + if socket_wrapper.socket is not None: + socket_wrapper.socket.close() + socket_wrapper.closed = True + socket_wrapper.close_cb = cb + # Delay the close callback until the accept() call has picked it up + if socket_wrapper.accepting_socket != NULL: + return + socket_wrapper.close_cb(socket) + +def socket_sendmsg(socket, write_bytes): + try: + return socket.sendmsg(write_bytes) + except AttributeError: + # sendmsg not available on all Pythons/Platforms + return socket.send(b''.join(write_bytes)) + +cdef socket_write_async_cython(SocketWrapper socket_wrapper, write_bytes): + try: + while write_bytes: + sent_byte_count = socket_sendmsg(socket_wrapper.socket, write_bytes) + while sent_byte_count > 0: + if sent_byte_count < len(write_bytes[0]): + write_bytes[0] = write_bytes[0][sent_byte_count:] + sent_byte_count = 0 + else: + sent_byte_count -= len(write_bytes[0]) + write_bytes = write_bytes[1:] + socket_wrapper.write_cb(<grpc_custom_socket*>socket_wrapper.c_socket, + grpc_error_none()) + except IOError as io_error: + socket_wrapper.write_cb(<grpc_custom_socket*>socket_wrapper.c_socket, + socket_error("send", str(io_error))) + g_event.set() + +def socket_write_async(socket_wrapper, write_bytes): + socket_write_async_cython(socket_wrapper, write_bytes) + +cdef void socket_write(grpc_custom_socket* socket, grpc_slice_buffer* buffer, + grpc_custom_write_callback cb) with gil: + cdef char* start + sw = <SocketWrapper>socket.impl + sw.write_cb = cb + write_bytes = [] + for i in range(buffer.count): + start = grpc_slice_buffer_start(buffer, i) + length = grpc_slice_buffer_length(buffer, i) + write_bytes.append(<bytes>start[:length]) + _spawn_greenlet(socket_write_async, <SocketWrapper>socket.impl, write_bytes) + +cdef socket_read_async_cython(SocketWrapper socket_wrapper): + cdef char* buff_char_arr + try: + buff_str = socket_wrapper.socket.recv(socket_wrapper.len) + buff_char_arr = buff_str + string.memcpy(<void*>socket_wrapper.c_buffer, buff_char_arr, len(buff_str)) + socket_wrapper.read_cb(<grpc_custom_socket*>socket_wrapper.c_socket, + len(buff_str), grpc_error_none()) + except IOError as io_error: + socket_wrapper.read_cb(<grpc_custom_socket*>socket_wrapper.c_socket, + -1, socket_error("recv", str(io_error))) + g_event.set() + +def socket_read_async(socket_wrapper): + socket_read_async_cython(socket_wrapper) + +cdef void socket_read(grpc_custom_socket* socket, char* buffer, + size_t length, grpc_custom_read_callback cb) with gil: + sw = <SocketWrapper>socket.impl + sw.read_cb = cb + sw.c_buffer = buffer + sw.len = length + _spawn_greenlet(socket_read_async, sw) + +cdef grpc_error* socket_getpeername(grpc_custom_socket* socket, + const grpc_sockaddr* addr, + int* length) with gil: + cdef char* src_buf + peer = (<SocketWrapper>socket.impl).socket.getpeername() + + cdef grpc_resolved_address c_addr + hostname = str_to_bytes(peer[0]) + grpc_string_to_sockaddr(&c_addr, hostname, peer[1]) + string.memcpy(<void*>addr, <void*>c_addr.addr, c_addr.len) + length[0] = c_addr.len + return grpc_error_none() + +cdef grpc_error* socket_getsockname(grpc_custom_socket* socket, + const grpc_sockaddr* addr, + int* length) with gil: + cdef char* src_buf + cdef grpc_resolved_address c_addr + if (<SocketWrapper>socket.impl).socket is None: + peer = ('0.0.0.0', 0) + else: + peer = (<SocketWrapper>socket.impl).socket.getsockname() + hostname = str_to_bytes(peer[0]) + grpc_string_to_sockaddr(&c_addr, hostname, peer[1]) + string.memcpy(<void*>addr, <void*>c_addr.addr, c_addr.len) + length[0] = c_addr.len + return grpc_error_none() + +def applysockopts(s): + s.setsockopt(gevent_socket.SOL_SOCKET, gevent_socket.SO_REUSEADDR, 1) + s.setsockopt(gevent_socket.IPPROTO_TCP, gevent_socket.TCP_NODELAY, True) + +cdef grpc_error* socket_bind(grpc_custom_socket* socket, + const grpc_sockaddr* addr, + size_t len, int flags) with gil: + addr_tuple = sockaddr_to_tuple(addr, len) + try: + try: + py_socket = gevent_socket.socket(gevent_socket.AF_INET) + applysockopts(py_socket) + py_socket.bind(addr_tuple) + except gevent_socket.gaierror as e: + py_socket = gevent_socket.socket(gevent_socket.AF_INET6) + applysockopts(py_socket) + py_socket.bind(addr_tuple) + (<SocketWrapper>socket.impl).socket = py_socket + except IOError as io_error: + return socket_error("bind", str(io_error)) + else: + return grpc_error_none() + +cdef grpc_error* socket_listen(grpc_custom_socket* socket) with gil: + (<SocketWrapper>socket.impl).socket.listen(50) + return grpc_error_none() + +cdef void accept_callback_cython(SocketWrapper s): + try: + conn, address = s.socket.accept() + sw = SocketWrapper() + sw.closed = False + sw.c_socket = s.accepting_socket + sw.sockopts = [] + sw.socket = conn + sw.c_socket.impl = <void*>sw + sw.accepting_socket = NULL + cpython.Py_INCREF(sw) + s.accepting_socket = NULL + s.accept_cb(<grpc_custom_socket*>s.c_socket, sw.c_socket, grpc_error_none()) + except IOError as io_error: + #TODO actual error + s.accepting_socket = NULL + s.accept_cb(<grpc_custom_socket*>s.c_socket, s.accepting_socket, + socket_error("accept", str(io_error))) + if s.closed: + s.close_cb(<grpc_custom_socket*>s.c_socket) + g_event.set() + +def socket_accept_async(s): + accept_callback_cython(s) + +cdef void socket_accept(grpc_custom_socket* socket, grpc_custom_socket* client, + grpc_custom_accept_callback cb) with gil: + sw = <SocketWrapper>socket.impl + sw.accepting_socket = client + sw.accept_cb = cb + _spawn_greenlet(socket_accept_async, sw) + +##################################### +######Resolver implementation ####### +##################################### + +cdef class ResolveWrapper: + def __cinit__(self): + self.c_resolver = NULL + self.c_host = NULL + self.c_port = NULL + +cdef socket_resolve_async_cython(ResolveWrapper resolve_wrapper): + try: + res = gevent_socket.getaddrinfo(resolve_wrapper.c_host, resolve_wrapper.c_port) + grpc_custom_resolve_callback(<grpc_custom_resolver*>resolve_wrapper.c_resolver, + tuples_to_resolvaddr(res), grpc_error_none()) + except IOError as io_error: + grpc_custom_resolve_callback(<grpc_custom_resolver*>resolve_wrapper.c_resolver, + <grpc_resolved_addresses*>0, + socket_error("getaddrinfo", str(io_error))) + g_event.set() + +def socket_resolve_async_python(resolve_wrapper): + socket_resolve_async_cython(resolve_wrapper) + +cdef void socket_resolve_async(grpc_custom_resolver* r, char* host, char* port) with gil: + rw = ResolveWrapper() + rw.c_resolver = r + rw.c_host = host + rw.c_port = port + _spawn_greenlet(socket_resolve_async_python, rw) + +cdef grpc_error* socket_resolve(char* host, char* port, + grpc_resolved_addresses** res) with gil: + try: + result = gevent_socket.getaddrinfo(host, port) + res[0] = tuples_to_resolvaddr(result) + return grpc_error_none() + except IOError as io_error: + return socket_error("getaddrinfo", str(io_error)) + +############################### +### timer implementation ###### +############################### + +cdef class TimerWrapper: + def __cinit__(self, deadline): + self.timer = gevent_hub.get_hub().loop.timer(deadline) + self.event = None + + def start(self): + self.event = gevent_event.Event() + self.timer.start(self.on_finish) + + def on_finish(self): + grpc_custom_timer_callback(self.c_timer, grpc_error_none()) + self.timer.stop() + g_event.set() + + def stop(self): + self.event.set() + self.timer.stop() + +cdef void timer_start(grpc_custom_timer* t) with gil: + timer = TimerWrapper(t.timeout_ms / 1000.0) + timer.c_timer = t + t.timer = <void*>timer + timer.start() + +cdef void timer_stop(grpc_custom_timer* t) with gil: + time_wrapper = <object>t.timer + time_wrapper.stop() + +############################### +### pollset implementation ### +############################### + +cdef void init_loop() with gil: + pass + +cdef void destroy_loop() with gil: + g_pool.join() + +cdef void kick_loop() with gil: + g_event.set() + +cdef void run_loop(size_t timeout_ms) with gil: + timeout = timeout_ms / 1000.0 + if timeout_ms > 0: + g_event.wait(timeout) + g_event.clear() + +############################### +### Initializer ############### +############################### + +cdef grpc_socket_vtable gevent_socket_vtable +cdef grpc_custom_resolver_vtable gevent_resolver_vtable +cdef grpc_custom_timer_vtable gevent_timer_vtable +cdef grpc_custom_poller_vtable gevent_pollset_vtable + +def init_grpc_gevent(): + # Lazily import gevent + global gevent_socket + global gevent_g + global gevent_hub + global gevent_event + global g_event + global g_pool + import gevent + gevent_g = gevent + import gevent.socket + gevent_socket = gevent.socket + import gevent.hub + gevent_hub = gevent.hub + import gevent.event + gevent_event = gevent.event + import gevent.pool + + g_event = gevent.event.Event() + g_pool = gevent.pool.Group() + gevent_resolver_vtable.resolve = socket_resolve + gevent_resolver_vtable.resolve_async = socket_resolve_async + + gevent_socket_vtable.init = socket_init + gevent_socket_vtable.connect = socket_connect + gevent_socket_vtable.destroy = socket_destroy + gevent_socket_vtable.shutdown = socket_shutdown + gevent_socket_vtable.close = socket_close + gevent_socket_vtable.write = socket_write + gevent_socket_vtable.read = socket_read + gevent_socket_vtable.getpeername = socket_getpeername + gevent_socket_vtable.getsockname = socket_getsockname + gevent_socket_vtable.bind = socket_bind + gevent_socket_vtable.listen = socket_listen + gevent_socket_vtable.accept = socket_accept + + gevent_timer_vtable.start = timer_start + gevent_timer_vtable.stop = timer_stop + + gevent_pollset_vtable.init = init_loop + gevent_pollset_vtable.poll = run_loop + gevent_pollset_vtable.kick = kick_loop + gevent_pollset_vtable.shutdown = destroy_loop + + grpc_custom_iomgr_init(&gevent_socket_vtable, + &gevent_resolver_vtable, + &gevent_timer_vtable, + &gevent_pollset_vtable) diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pxd b/src/python/grpcio/grpc/_cython/cygrpc.pxd index b6a794c6d7..c8ace7c3cc 100644 --- a/src/python/grpcio/grpc/_cython/cygrpc.pxd +++ b/src/python/grpcio/grpc/_cython/cygrpc.pxd @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +# distutils: language=c++ include "_cygrpc/grpc.pxi" @@ -27,3 +28,5 @@ include "_cygrpc/security.pxd.pxi" include "_cygrpc/server.pxd.pxi" include "_cygrpc/tag.pxd.pxi" include "_cygrpc/time.pxd.pxi" + +include "_cygrpc/grpc_gevent.pxd" diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pyx b/src/python/grpcio/grpc/_cython/cygrpc.pyx index 2ee2e6b73e..f5f08fc983 100644 --- a/src/python/grpcio/grpc/_cython/cygrpc.pyx +++ b/src/python/grpcio/grpc/_cython/cygrpc.pyx @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +# distutils: language=c++ cimport cpython @@ -35,6 +36,8 @@ include "_cygrpc/server.pyx.pxi" include "_cygrpc/tag.pyx.pxi" include "_cygrpc/time.pyx.pxi" +include "_cygrpc/grpc_gevent.pyx" + # # initialize gRPC # diff --git a/src/python/grpcio/grpc/_grpcio_metadata.py b/src/python/grpcio/grpc/_grpcio_metadata.py index 4a69d859fc..ad53f60ad3 100644 --- a/src/python/grpcio/grpc/_grpcio_metadata.py +++ b/src/python/grpcio/grpc/_grpcio_metadata.py @@ -14,4 +14,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio/grpc/_grpcio_metadata.py.template`!!! -__version__ = """1.11.0.dev0""" +__version__ = """1.13.0.dev0""" diff --git a/src/python/grpcio/grpc/_interceptor.py b/src/python/grpcio/grpc/_interceptor.py index d029472c68..f465e35a9c 100644 --- a/src/python/grpcio/grpc/_interceptor.py +++ b/src/python/grpcio/grpc/_interceptor.py @@ -334,6 +334,19 @@ class _Channel(grpc.Channel): else: return thunk(method) + def _close(self): + self._channel.close() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self._close() + return False + + def close(self): + self._channel.close() + def intercept_channel(channel, *interceptors): for interceptor in reversed(list(interceptors)): diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py index c988e0c87c..d849cadbee 100644 --- a/src/python/grpcio/grpc/_server.py +++ b/src/python/grpcio/grpc/_server.py @@ -780,14 +780,8 @@ def _start(state): state.stage = _ServerStage.STARTED _request_call(state) - def cleanup_server(timeout): - if timeout is None: - _stop(state, _UNEXPECTED_EXIT_SERVER_GRACE).wait() - else: - _stop(state, timeout).wait() - - thread = _common.CleanupThread( - cleanup_server, target=_serve, args=(state,)) + thread = threading.Thread(target=_serve, args=(state,)) + thread.daemon = True thread.start() diff --git a/src/python/grpcio/grpc/beta/_server_adaptations.py b/src/python/grpcio/grpc/beta/_server_adaptations.py index 3c04fd7639..ccafec8951 100644 --- a/src/python/grpcio/grpc/beta/_server_adaptations.py +++ b/src/python/grpcio/grpc/beta/_server_adaptations.py @@ -168,11 +168,8 @@ def _run_request_pipe_thread(request_iterator, request_consumer, return request_consumer.terminate() - def stop_request_pipe(timeout): # pylint: disable=unused-argument - thread_joined.set() - - request_pipe_thread = _common.CleanupThread( - stop_request_pipe, target=pipe_requests) + request_pipe_thread = threading.Thread(target=pipe_requests) + request_pipe_thread.daemon = True request_pipe_thread.start() diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/__init__.py b/src/python/grpcio/grpc/experimental/__init__.py index 5fb4f3c3cf..dcec322b69 100644 --- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/__init__.py +++ b/src/python/grpcio/grpc/experimental/__init__.py @@ -1,4 +1,4 @@ -# Copyright 2015 gRPC authors. +# Copyright 2018 gRPC authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -11,3 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +"""gRPC's experimental APIs. + +These APIs are subject to be removed during any minor version release. +""" diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_3069_test_constant.py b/src/python/grpcio/grpc/experimental/gevent.py index 6eb7ba33f6..159d612b4e 100644 --- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_3069_test_constant.py +++ b/src/python/grpcio/grpc/experimental/gevent.py @@ -1,4 +1,4 @@ -# Copyright 2015 gRPC authors. +# Copyright 2018 gRPC authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -11,11 +11,17 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -"""A test constant working around issue 3069.""" +"""gRPC's Python gEvent APIs.""" -# test_constants is referenced from specification in this module. -from tests.unit.framework.common import test_constants # pylint: disable=unused-import +from grpc._cython import cygrpc as _cygrpc -# TODO(issue 3069): Replace uses of this constant with -# test_constants.SHORT_TIMEOUT. -REALLY_SHORT_TIMEOUT = 0.1 + +def init_gevent(): + """Patches gRPC's libraries to be compatible with gevent. + + This must be called AFTER the python standard lib has been patched, + but BEFORE creating and gRPC objects. + + In order for progress to be made, the application must drive the event loop. + """ + _cygrpc.init_grpc_gevent() diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index c1654358a3..bf6c2534a8 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -15,6 +15,9 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio/grpc_core_dependencies.py.template`!!! CORE_SOURCE_FILES = [ + 'third_party/address_sorting/address_sorting.c', + 'third_party/address_sorting/address_sorting_posix.c', + 'third_party/address_sorting/address_sorting_windows.c', 'src/core/lib/gpr/alloc.cc', 'src/core/lib/gpr/arena.cc', 'src/core/lib/gpr/atm.cc', @@ -60,10 +63,13 @@ CORE_SOURCE_FILES = [ 'src/core/lib/channel/channel_args.cc', 'src/core/lib/channel/channel_stack.cc', 'src/core/lib/channel/channel_stack_builder.cc', + 'src/core/lib/channel/channel_trace.cc', + 'src/core/lib/channel/channel_trace_registry.cc', 'src/core/lib/channel/connected_channel.cc', 'src/core/lib/channel/handshaker.cc', 'src/core/lib/channel/handshaker_factory.cc', 'src/core/lib/channel/handshaker_registry.cc', + 'src/core/lib/channel/status_util.cc', 'src/core/lib/compression/compression.cc', 'src/core/lib/compression/compression_internal.cc', 'src/core/lib/compression/message_compress.cc', @@ -97,6 +103,8 @@ CORE_SOURCE_FILES = [ 'src/core/lib/iomgr/gethostname_sysconf.cc', 'src/core/lib/iomgr/iocp_windows.cc', 'src/core/lib/iomgr/iomgr.cc', + 'src/core/lib/iomgr/iomgr_custom.cc', + 'src/core/lib/iomgr/iomgr_internal.cc', 'src/core/lib/iomgr/iomgr_posix.cc', 'src/core/lib/iomgr/iomgr_uv.cc', 'src/core/lib/iomgr/iomgr_windows.cc', @@ -105,12 +113,16 @@ CORE_SOURCE_FILES = [ 'src/core/lib/iomgr/lockfree_event.cc', 'src/core/lib/iomgr/network_status_tracker.cc', 'src/core/lib/iomgr/polling_entity.cc', - 'src/core/lib/iomgr/pollset_set_uv.cc', + 'src/core/lib/iomgr/pollset.cc', + 'src/core/lib/iomgr/pollset_custom.cc', + 'src/core/lib/iomgr/pollset_set.cc', + 'src/core/lib/iomgr/pollset_set_custom.cc', 'src/core/lib/iomgr/pollset_set_windows.cc', 'src/core/lib/iomgr/pollset_uv.cc', 'src/core/lib/iomgr/pollset_windows.cc', + 'src/core/lib/iomgr/resolve_address.cc', + 'src/core/lib/iomgr/resolve_address_custom.cc', 'src/core/lib/iomgr/resolve_address_posix.cc', - 'src/core/lib/iomgr/resolve_address_uv.cc', 'src/core/lib/iomgr/resolve_address_windows.cc', 'src/core/lib/iomgr/resource_quota.cc', 'src/core/lib/iomgr/sockaddr_utils.cc', @@ -122,19 +134,24 @@ CORE_SOURCE_FILES = [ 'src/core/lib/iomgr/socket_utils_uv.cc', 'src/core/lib/iomgr/socket_utils_windows.cc', 'src/core/lib/iomgr/socket_windows.cc', + 'src/core/lib/iomgr/tcp_client.cc', + 'src/core/lib/iomgr/tcp_client_custom.cc', 'src/core/lib/iomgr/tcp_client_posix.cc', - 'src/core/lib/iomgr/tcp_client_uv.cc', 'src/core/lib/iomgr/tcp_client_windows.cc', + 'src/core/lib/iomgr/tcp_custom.cc', 'src/core/lib/iomgr/tcp_posix.cc', + 'src/core/lib/iomgr/tcp_server.cc', + 'src/core/lib/iomgr/tcp_server_custom.cc', 'src/core/lib/iomgr/tcp_server_posix.cc', 'src/core/lib/iomgr/tcp_server_utils_posix_common.cc', 'src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.cc', 'src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.cc', - 'src/core/lib/iomgr/tcp_server_uv.cc', 'src/core/lib/iomgr/tcp_server_windows.cc', 'src/core/lib/iomgr/tcp_uv.cc', 'src/core/lib/iomgr/tcp_windows.cc', 'src/core/lib/iomgr/time_averaged_stats.cc', + 'src/core/lib/iomgr/timer.cc', + 'src/core/lib/iomgr/timer_custom.cc', 'src/core/lib/iomgr/timer_generic.cc', 'src/core/lib/iomgr/timer_heap.cc', 'src/core/lib/iomgr/timer_manager.cc', @@ -279,9 +296,9 @@ CORE_SOURCE_FILES = [ 'third_party/nanopb/pb_decode.c', 'third_party/nanopb/pb_encode.c', 'src/core/tsi/transport_security.cc', - 'src/core/tsi/transport_security_adapter.cc', 'src/core/ext/transport/chttp2/client/insecure/channel_create.cc', 'src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc', + 'src/core/ext/transport/chttp2/client/authority.cc', 'src/core/ext/transport/chttp2/client/chttp2_connector.cc', 'src/core/ext/filters/client_channel/backup_poller.cc', 'src/core/ext/filters/client_channel/channel_connectivity.cc', @@ -301,13 +318,15 @@ CORE_SOURCE_FILES = [ 'src/core/ext/filters/client_channel/resolver.cc', 'src/core/ext/filters/client_channel/resolver_registry.cc', 'src/core/ext/filters/client_channel/retry_throttle.cc', - 'src/core/ext/filters/client_channel/status_util.cc', 'src/core/ext/filters/client_channel/subchannel.cc', 'src/core/ext/filters/client_channel/subchannel_index.cc', 'src/core/ext/filters/client_channel/uri_parser.cc', 'src/core/ext/filters/deadline/deadline_filter.cc', 'src/core/tsi/alts_transport_security.cc', 'src/core/tsi/fake_transport_security.cc', + 'src/core/tsi/ssl/session_cache/ssl_session_boringssl.cc', + 'src/core/tsi/ssl/session_cache/ssl_session_cache.cc', + 'src/core/tsi/ssl/session_cache/ssl_session_openssl.cc', 'src/core/tsi/ssl_transport_security.cc', 'src/core/tsi/transport_security_grpc.cc', 'src/core/ext/transport/chttp2/server/chttp2_server.cc', @@ -324,7 +343,6 @@ CORE_SOURCE_FILES = [ 'src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c', 'src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc', 'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc', - 'src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc', 'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc', @@ -337,6 +355,7 @@ CORE_SOURCE_FILES = [ 'src/core/ext/census/grpc_context.cc', 'src/core/ext/filters/max_age/max_age_filter.cc', 'src/core/ext/filters/message_size/message_size_filter.cc', + 'src/core/ext/filters/http/client_authority_filter.cc', 'src/core/ext/filters/workarounds/workaround_cronet_compression_filter.cc', 'src/core/ext/filters/workarounds/workaround_utils.cc', 'src/core/plugin_registry/grpc_plugin_registry.cc', @@ -409,7 +428,6 @@ CORE_SOURCE_FILES = [ 'third_party/boringssl/crypto/cpu-intel.c', 'third_party/boringssl/crypto/cpu-ppc64le.c', 'third_party/boringssl/crypto/crypto.c', - 'third_party/boringssl/crypto/curve25519/curve25519.c', 'third_party/boringssl/crypto/curve25519/spake25519.c', 'third_party/boringssl/crypto/curve25519/x25519-x86_64.c', 'third_party/boringssl/crypto/dh/check.c', @@ -595,6 +613,7 @@ CORE_SOURCE_FILES = [ 'third_party/boringssl/ssl/tls13_server.cc', 'third_party/boringssl/ssl/tls_method.cc', 'third_party/boringssl/ssl/tls_record.cc', + 'third_party/boringssl/third_party/fiat/curve25519.c', 'third_party/zlib/adler32.c', 'third_party/zlib/compress.c', 'third_party/zlib/crc32.c', diff --git a/src/python/grpcio/grpc_version.py b/src/python/grpcio/grpc_version.py index 32e82493f3..57dc26dbeb 100644 --- a/src/python/grpcio/grpc_version.py +++ b/src/python/grpcio/grpc_version.py @@ -14,4 +14,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio/grpc_version.py.template`!!! -VERSION = '1.11.0.dev0' +VERSION = '1.13.0.dev0' diff --git a/src/python/grpcio_health_checking/grpc_version.py b/src/python/grpcio_health_checking/grpc_version.py index ad4c85cc12..ba0d4a3b6d 100644 --- a/src/python/grpcio_health_checking/grpc_version.py +++ b/src/python/grpcio_health_checking/grpc_version.py @@ -14,4 +14,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_health_checking/grpc_version.py.template`!!! -VERSION = '1.11.0.dev0' +VERSION = '1.13.0.dev0' diff --git a/src/python/grpcio_health_checking/setup.py b/src/python/grpcio_health_checking/setup.py index 60d309ec65..35c09827ba 100644 --- a/src/python/grpcio_health_checking/setup.py +++ b/src/python/grpcio_health_checking/setup.py @@ -57,7 +57,7 @@ PACKAGE_DIRECTORIES = { } INSTALL_REQUIRES = ( - 'protobuf>=3.5.0.post1', + 'protobuf>=3.5.2.post1', 'grpcio>={version}'.format(version=grpc_version.VERSION), ) diff --git a/src/python/grpcio_reflection/README.rst b/src/python/grpcio_reflection/README.rst new file mode 100644 index 0000000000..da99a44904 --- /dev/null +++ b/src/python/grpcio_reflection/README.rst @@ -0,0 +1,10 @@ +gRPC Python Reflection package +============================== + +Reference package for reflection in GRPC Python. + +Dependencies +------------ + +Depends on the `grpcio` package, available from PyPI via `pip install grpcio`. + diff --git a/src/python/grpcio_reflection/grpc_version.py b/src/python/grpcio_reflection/grpc_version.py index 6322d847b1..ea2878d9ee 100644 --- a/src/python/grpcio_reflection/grpc_version.py +++ b/src/python/grpcio_reflection/grpc_version.py @@ -14,4 +14,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_reflection/grpc_version.py.template`!!! -VERSION = '1.11.0.dev0' +VERSION = '1.13.0.dev0' diff --git a/src/python/grpcio_reflection/setup.py b/src/python/grpcio_reflection/setup.py index 10c4c38f19..589d0ff556 100644 --- a/src/python/grpcio_reflection/setup.py +++ b/src/python/grpcio_reflection/setup.py @@ -58,7 +58,7 @@ PACKAGE_DIRECTORIES = { } INSTALL_REQUIRES = ( - 'protobuf>=3.5.0.post1', + 'protobuf>=3.5.2.post1', 'grpcio>={version}'.format(version=grpc_version.VERSION), ) diff --git a/src/python/grpcio_testing/grpc_testing/_channel/_channel.py b/src/python/grpcio_testing/grpc_testing/_channel/_channel.py index b015b8d738..0c1941e6be 100644 --- a/src/python/grpcio_testing/grpc_testing/_channel/_channel.py +++ b/src/python/grpcio_testing/grpc_testing/_channel/_channel.py @@ -56,6 +56,21 @@ class TestingChannel(grpc_testing.Channel): response_deserializer=None): return _multi_callable.StreamStream(method, self._state) + def _close(self): + # TODO(https://github.com/grpc/grpc/issues/12531): Decide what + # action to take here, if any? + pass + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self._close() + return False + + def close(self): + self._close() + def take_unary_unary(self, method_descriptor): return _channel_rpc.unary_unary(self._state, method_descriptor) diff --git a/src/python/grpcio_testing/grpc_version.py b/src/python/grpcio_testing/grpc_version.py index 1e75fea12e..02f19f2283 100644 --- a/src/python/grpcio_testing/grpc_version.py +++ b/src/python/grpcio_testing/grpc_version.py @@ -14,4 +14,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_testing/grpc_version.py.template`!!! -VERSION = '1.11.0.dev0' +VERSION = '1.13.0.dev0' diff --git a/src/python/grpcio_testing/setup.py b/src/python/grpcio_testing/setup.py index 5a9d593ec1..eb480a5464 100644 --- a/src/python/grpcio_testing/setup.py +++ b/src/python/grpcio_testing/setup.py @@ -29,7 +29,7 @@ PACKAGE_DIRECTORIES = { } INSTALL_REQUIRES = ( - 'protobuf>=3.5.0.post1', + 'protobuf>=3.5.2.post1', 'grpcio>={version}'.format(version=grpc_version.VERSION), ) diff --git a/src/python/grpcio_tests/commands.py b/src/python/grpcio_tests/commands.py index 93f84572b7..42e01c18d3 100644 --- a/src/python/grpcio_tests/commands.py +++ b/src/python/grpcio_tests/commands.py @@ -108,6 +108,57 @@ class TestLite(setuptools.Command): self.distribution.fetch_build_eggs(self.distribution.tests_require) +class TestGevent(setuptools.Command): + """Command to run tests w/gevent.""" + + BANNED_TESTS = ( + # These tests send a lot of RPCs and are really slow on gevent. They will + # eventually succeed, but need to dig into performance issues. + 'unit._cython._no_messages_server_completion_queue_per_call_test.Test.test_rpcs', + 'unit._cython._no_messages_single_server_completion_queue_test.Test.test_rpcs', + # I have no idea why this doesn't work in gevent, but it shouldn't even be + # using the c-core + 'testing._client_test.ClientTest.test_infinite_request_stream_real_time', + # TODO(https://github.com/grpc/grpc/issues/14789) enable this test + 'unit._server_ssl_cert_config_test', + # TODO(https://github.com/grpc/grpc/issues/14901) enable this test + 'protoc_plugin._python_plugin_test.PythonPluginTest', + # Beta API is unsupported for gevent + 'protoc_plugin.beta_python_plugin_test', + 'unit.beta._beta_features_test', + ) + description = 'run tests with gevent. Assumes grpc/gevent are installed' + user_options = [] + + def initialize_options(self): + pass + + def finalize_options(self): + # distutils requires this override. + pass + + def run(self): + from gevent import monkey + monkey.patch_all() + + import tests + + import grpc.experimental.gevent + grpc.experimental.gevent.init_gevent() + + import gevent + + import tests + loader = tests.Loader() + loader.loadTestsFromNames(['tests']) + runner = tests.Runner() + runner.skip_tests(self.BANNED_TESTS) + result = gevent.spawn(runner.run, loader.suite) + result.join() + if not result.value.wasSuccessful(): + sys.exit('Test failure') + + class RunInterop(test.test): description = 'run interop test client/server' diff --git a/src/python/grpcio_tests/grpc_version.py b/src/python/grpcio_tests/grpc_version.py index 0cd7bd257f..9d2e41644e 100644 --- a/src/python/grpcio_tests/grpc_version.py +++ b/src/python/grpcio_tests/grpc_version.py @@ -14,4 +14,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_tests/grpc_version.py.template`!!! -VERSION = '1.11.0.dev0' +VERSION = '1.13.0.dev0' diff --git a/src/python/grpcio_tests/setup.py b/src/python/grpcio_tests/setup.py index 250df65803..98ac19d188 100644 --- a/src/python/grpcio_tests/setup.py +++ b/src/python/grpcio_tests/setup.py @@ -41,7 +41,7 @@ INSTALL_REQUIRES = ( 'grpcio>={version}'.format(version=grpc_version.VERSION), 'grpcio-tools>={version}'.format(version=grpc_version.VERSION), 'grpcio-health-checking>={version}'.format(version=grpc_version.VERSION), - 'oauth2client>=1.4.7', 'protobuf>=3.5.0.post1', 'six>=1.10', + 'oauth2client>=1.4.7', 'protobuf>=3.5.2.post1', 'six>=1.10', 'google-auth>=1.0.0', 'requests>=2.14.2') COMMAND_CLASS = { @@ -50,7 +50,8 @@ COMMAND_CLASS = { 'build_package_protos': grpc_tools.command.BuildPackageProtos, 'build_py': commands.BuildPy, 'run_interop': commands.RunInterop, - 'test_lite': commands.TestLite + 'test_lite': commands.TestLite, + 'test_gevent': commands.TestGevent, } PACKAGE_DATA = { diff --git a/src/python/grpcio_tests/tests/_loader.py b/src/python/grpcio_tests/tests/_loader.py index 31680916b4..be0af64646 100644 --- a/src/python/grpcio_tests/tests/_loader.py +++ b/src/python/grpcio_tests/tests/_loader.py @@ -54,7 +54,7 @@ class Loader(object): for module in modules: try: package_paths = module.__path__ - except: + except AttributeError: continue self.walk_packages(package_paths) coverage_context.stop() diff --git a/src/python/grpcio_tests/tests/_result.py b/src/python/grpcio_tests/tests/_result.py index 9907c4e1f9..b105f18e78 100644 --- a/src/python/grpcio_tests/tests/_result.py +++ b/src/python/grpcio_tests/tests/_result.py @@ -46,7 +46,7 @@ class CaseResult( None. """ - class Kind: + class Kind(object): UNTESTED = 'untested' RUNNING = 'running' ERROR = 'error' @@ -257,7 +257,7 @@ class CoverageResult(AugmentedResult): #coverage.Coverage().combine() -class _Colors: +class _Colors(object): """Namespaced constants for terminal color magic numbers.""" HEADER = '\033[95m' INFO = '\033[94m' diff --git a/src/python/grpcio_tests/tests/_runner.py b/src/python/grpcio_tests/tests/_runner.py index 8e27dc6c6d..eaaa027e61 100644 --- a/src/python/grpcio_tests/tests/_runner.py +++ b/src/python/grpcio_tests/tests/_runner.py @@ -117,6 +117,12 @@ class AugmentedCase(collections.namedtuple('AugmentedCase', ['case', 'id'])): class Runner(object): + def __init__(self): + self._skipped_tests = [] + + def skip_tests(self, tests): + self._skipped_tests = tests + def run(self, suite): """See setuptools' test_runner setup argument for information.""" # only run test cases with id starting with given prefix @@ -181,27 +187,31 @@ class Runner(object): # Run the tests result.startTestRun() for augmented_case in augmented_cases: - sys.stdout.write('Running {}\n'.format( - augmented_case.case.id())) - sys.stdout.flush() - case_thread = threading.Thread( - target=augmented_case.case.run, args=(result,)) - try: - with stdout_pipe, stderr_pipe: - case_thread.start() - while case_thread.is_alive(): - check_kill_self() - time.sleep(0) - case_thread.join() - except: - # re-raise the exception after forcing the with-block to end - raise - result.set_output(augmented_case.case, stdout_pipe.output(), - stderr_pipe.output()) - sys.stdout.write(result_out.getvalue()) - sys.stdout.flush() - result_out.truncate(0) - check_kill_self() + for skipped_test in self._skipped_tests: + if skipped_test in augmented_case.case.id(): + break + else: + sys.stdout.write('Running {}\n'.format( + augmented_case.case.id())) + sys.stdout.flush() + case_thread = threading.Thread( + target=augmented_case.case.run, args=(result,)) + try: + with stdout_pipe, stderr_pipe: + case_thread.start() + while case_thread.is_alive(): + check_kill_self() + time.sleep(0) + case_thread.join() + except: + # re-raise the exception after forcing the with-block to end + raise + result.set_output(augmented_case.case, stdout_pipe.output(), + stderr_pipe.output()) + sys.stdout.write(result_out.getvalue()) + sys.stdout.flush() + result_out.truncate(0) + check_kill_self() result.stopTestRun() stdout_pipe.close() stderr_pipe.close() diff --git a/src/python/grpcio_tests/tests/interop/_insecure_intraop_test.py b/src/python/grpcio_tests/tests/interop/_insecure_intraop_test.py index 8d464b2d4b..ace15bea58 100644 --- a/src/python/grpcio_tests/tests/interop/_insecure_intraop_test.py +++ b/src/python/grpcio_tests/tests/interop/_insecure_intraop_test.py @@ -36,6 +36,9 @@ class InsecureIntraopTest(_intraop_test_case.IntraopTestCase, self.stub = test_pb2_grpc.TestServiceStub( grpc.insecure_channel('localhost:{}'.format(port))) + def tearDown(self): + self.server.stop(None) + if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/interop/_secure_intraop_test.py b/src/python/grpcio_tests/tests/interop/_secure_intraop_test.py index c89135998d..e27e551ecb 100644 --- a/src/python/grpcio_tests/tests/interop/_secure_intraop_test.py +++ b/src/python/grpcio_tests/tests/interop/_secure_intraop_test.py @@ -45,6 +45,9 @@ class SecureIntraopTest(_intraop_test_case.IntraopTestCase, unittest.TestCase): _SERVER_HOST_OVERRIDE, ),))) + def tearDown(self): + self.server.stop(None) + if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/interop/client.py b/src/python/grpcio_tests/tests/interop/client.py index 3780ed9020..698c37017f 100644 --- a/src/python/grpcio_tests/tests/interop/client.py +++ b/src/python/grpcio_tests/tests/interop/client.py @@ -66,10 +66,6 @@ def _args(): return parser.parse_args() -def _application_default_credentials(): - return oauth2client_client.GoogleCredentials.get_application_default() - - def _stub(args): target = '{}:{}'.format(args.server_host, args.server_port) if args.test_case == 'oauth2_auth_token': diff --git a/src/python/grpcio_tests/tests/protoc_plugin/_python_plugin_test.py b/src/python/grpcio_tests/tests/protoc_plugin/_python_plugin_test.py index 6d85f43130..00e60b43ef 100644 --- a/src/python/grpcio_tests/tests/protoc_plugin/_python_plugin_test.py +++ b/src/python/grpcio_tests/tests/protoc_plugin/_python_plugin_test.py @@ -237,6 +237,7 @@ class PythonPluginTest(unittest.TestCase): self.assertIsNotNone(service.servicer_methods) self.assertIsNotNone(service.server) self.assertIsNotNone(service.stub) + service.server.stop(None) def testIncompleteServicer(self): service = _CreateIncompleteService() @@ -245,6 +246,7 @@ class PythonPluginTest(unittest.TestCase): service.stub.UnaryCall(request) self.assertIs(exception_context.exception.code(), grpc.StatusCode.UNIMPLEMENTED) + service.server.stop(None) def testUnaryCall(self): service = _CreateService() @@ -253,6 +255,7 @@ class PythonPluginTest(unittest.TestCase): expected_response = service.servicer_methods.UnaryCall( request, 'not a real context!') self.assertEqual(expected_response, response) + service.server.stop(None) def testUnaryCallFuture(self): service = _CreateService() @@ -264,6 +267,7 @@ class PythonPluginTest(unittest.TestCase): expected_response = service.servicer_methods.UnaryCall( request, 'not a real RpcContext!') self.assertEqual(expected_response, response) + service.server.stop(None) def testUnaryCallFutureExpired(self): service = _CreateService() @@ -276,6 +280,7 @@ class PythonPluginTest(unittest.TestCase): self.assertIs(exception_context.exception.code(), grpc.StatusCode.DEADLINE_EXCEEDED) self.assertIs(response_future.code(), grpc.StatusCode.DEADLINE_EXCEEDED) + service.server.stop(None) def testUnaryCallFutureCancelled(self): service = _CreateService() @@ -285,6 +290,7 @@ class PythonPluginTest(unittest.TestCase): response_future.cancel() self.assertTrue(response_future.cancelled()) self.assertIs(response_future.code(), grpc.StatusCode.CANCELLED) + service.server.stop(None) def testUnaryCallFutureFailed(self): service = _CreateService() @@ -293,6 +299,7 @@ class PythonPluginTest(unittest.TestCase): response_future = service.stub.UnaryCall.future(request) self.assertIsNotNone(response_future.exception()) self.assertIs(response_future.code(), grpc.StatusCode.UNKNOWN) + service.server.stop(None) def testStreamingOutputCall(self): service = _CreateService() @@ -303,6 +310,7 @@ class PythonPluginTest(unittest.TestCase): for expected_response, response in moves.zip_longest( expected_responses, responses): self.assertEqual(expected_response, response) + service.server.stop(None) def testStreamingOutputCallExpired(self): service = _CreateService() @@ -314,6 +322,7 @@ class PythonPluginTest(unittest.TestCase): list(responses) self.assertIs(exception_context.exception.code(), grpc.StatusCode.DEADLINE_EXCEEDED) + service.server.stop(None) def testStreamingOutputCallCancelled(self): service = _CreateService() @@ -324,6 +333,7 @@ class PythonPluginTest(unittest.TestCase): with self.assertRaises(grpc.RpcError) as exception_context: next(responses) self.assertIs(responses.code(), grpc.StatusCode.CANCELLED) + service.server.stop(None) def testStreamingOutputCallFailed(self): service = _CreateService() @@ -335,6 +345,7 @@ class PythonPluginTest(unittest.TestCase): next(responses) self.assertIs(exception_context.exception.code(), grpc.StatusCode.UNKNOWN) + service.server.stop(None) def testStreamingInputCall(self): service = _CreateService() @@ -343,6 +354,7 @@ class PythonPluginTest(unittest.TestCase): expected_response = service.servicer_methods.StreamingInputCall( _streaming_input_request_iterator(), 'not a real RpcContext!') self.assertEqual(expected_response, response) + service.server.stop(None) def testStreamingInputCallFuture(self): service = _CreateService() @@ -353,6 +365,7 @@ class PythonPluginTest(unittest.TestCase): expected_response = service.servicer_methods.StreamingInputCall( _streaming_input_request_iterator(), 'not a real RpcContext!') self.assertEqual(expected_response, response) + service.server.stop(None) def testStreamingInputCallFutureExpired(self): service = _CreateService() @@ -367,6 +380,7 @@ class PythonPluginTest(unittest.TestCase): grpc.StatusCode.DEADLINE_EXCEEDED) self.assertIs(exception_context.exception.code(), grpc.StatusCode.DEADLINE_EXCEEDED) + service.server.stop(None) def testStreamingInputCallFutureCancelled(self): service = _CreateService() @@ -377,6 +391,7 @@ class PythonPluginTest(unittest.TestCase): self.assertTrue(response_future.cancelled()) with self.assertRaises(grpc.FutureCancelledError): response_future.result() + service.server.stop(None) def testStreamingInputCallFutureFailed(self): service = _CreateService() @@ -385,6 +400,7 @@ class PythonPluginTest(unittest.TestCase): _streaming_input_request_iterator()) self.assertIsNotNone(response_future.exception()) self.assertIs(response_future.code(), grpc.StatusCode.UNKNOWN) + service.server.stop(None) def testFullDuplexCall(self): service = _CreateService() @@ -394,6 +410,7 @@ class PythonPluginTest(unittest.TestCase): for expected_response, response in moves.zip_longest( expected_responses, responses): self.assertEqual(expected_response, response) + service.server.stop(None) def testFullDuplexCallExpired(self): request_iterator = _full_duplex_request_iterator() @@ -405,6 +422,7 @@ class PythonPluginTest(unittest.TestCase): list(responses) self.assertIs(exception_context.exception.code(), grpc.StatusCode.DEADLINE_EXCEEDED) + service.server.stop(None) def testFullDuplexCallCancelled(self): service = _CreateService() @@ -416,6 +434,7 @@ class PythonPluginTest(unittest.TestCase): next(responses) self.assertIs(exception_context.exception.code(), grpc.StatusCode.CANCELLED) + service.server.stop(None) def testFullDuplexCallFailed(self): request_iterator = _full_duplex_request_iterator() @@ -426,6 +445,7 @@ class PythonPluginTest(unittest.TestCase): next(responses) self.assertIs(exception_context.exception.code(), grpc.StatusCode.UNKNOWN) + service.server.stop(None) def testHalfDuplexCall(self): service = _CreateService() @@ -445,6 +465,7 @@ class PythonPluginTest(unittest.TestCase): for expected_response, response in moves.zip_longest( expected_responses, responses): self.assertEqual(expected_response, response) + service.server.stop(None) def testHalfDuplexCallWedged(self): condition = threading.Condition() @@ -478,6 +499,7 @@ class PythonPluginTest(unittest.TestCase): next(responses) self.assertIs(exception_context.exception.code(), grpc.StatusCode.DEADLINE_EXCEEDED) + service.server.stop(None) if __name__ == '__main__': diff --git a/src/python/grpcio_tests/tests/protoc_plugin/_split_definitions_test.py b/src/python/grpcio_tests/tests/protoc_plugin/_split_definitions_test.py index ab33775ad3..e21ea0010a 100644 --- a/src/python/grpcio_tests/tests/protoc_plugin/_split_definitions_test.py +++ b/src/python/grpcio_tests/tests/protoc_plugin/_split_definitions_test.py @@ -271,6 +271,7 @@ class _Test(six.with_metaclass(abc.ABCMeta, unittest.TestCase)): stub = services_module.TestServiceStub(channel) response = stub.Call(self._messages_pb2.Request()) self.assertEqual(self._messages_pb2.Response(), response) + server.stop(None) def _create_test_case_class(split_proto, protoc_style): diff --git a/src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py b/src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py index ad0ecf0079..b46e53315e 100644 --- a/src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py +++ b/src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py @@ -329,9 +329,7 @@ class PythonPluginTest(unittest.TestCase): _packagify(self._python_out) - with _system_path([ - self._python_out, - ]): + with _system_path([self._python_out]): self._payload_pb2 = importlib.import_module(_PAYLOAD_PB2) self._requests_pb2 = importlib.import_module(_REQUESTS_PB2) self._responses_pb2 = importlib.import_module(_RESPONSES_PB2) diff --git a/src/python/grpcio_tests/tests/qps/benchmark_client.py b/src/python/grpcio_tests/tests/qps/benchmark_client.py index e6392a8b8c..0488450740 100644 --- a/src/python/grpcio_tests/tests/qps/benchmark_client.py +++ b/src/python/grpcio_tests/tests/qps/benchmark_client.py @@ -22,7 +22,7 @@ from six.moves import queue import grpc from src.proto.grpc.testing import messages_pb2 -from src.proto.grpc.testing import services_pb2_grpc +from src.proto.grpc.testing import benchmark_service_pb2_grpc from tests.unit import resources from tests.unit import test_common @@ -58,7 +58,8 @@ class BenchmarkClient: if config.payload_config.WhichOneof('payload') == 'simple_params': self._generic = False - self._stub = services_pb2_grpc.BenchmarkServiceStub(channel) + self._stub = benchmark_service_pb2_grpc.BenchmarkServiceStub( + channel) payload = messages_pb2.Payload( body='\0' * config.payload_config.simple_params.req_size) self._request = messages_pb2.SimpleRequest( diff --git a/src/python/grpcio_tests/tests/qps/benchmark_server.py b/src/python/grpcio_tests/tests/qps/benchmark_server.py index bb07844491..2bd89cbbdf 100644 --- a/src/python/grpcio_tests/tests/qps/benchmark_server.py +++ b/src/python/grpcio_tests/tests/qps/benchmark_server.py @@ -13,10 +13,10 @@ # limitations under the License. from src.proto.grpc.testing import messages_pb2 -from src.proto.grpc.testing import services_pb2_grpc +from src.proto.grpc.testing import benchmark_service_pb2_grpc -class BenchmarkServer(services_pb2_grpc.BenchmarkServiceServicer): +class BenchmarkServer(benchmark_service_pb2_grpc.BenchmarkServiceServicer): """Synchronous Server implementation for the Benchmark service.""" def UnaryCall(self, request, context): @@ -29,7 +29,8 @@ class BenchmarkServer(services_pb2_grpc.BenchmarkServiceServicer): yield messages_pb2.SimpleResponse(payload=payload) -class GenericBenchmarkServer(services_pb2_grpc.BenchmarkServiceServicer): +class GenericBenchmarkServer( + benchmark_service_pb2_grpc.BenchmarkServiceServicer): """Generic Server implementation for the Benchmark service.""" def __init__(self, resp_size): diff --git a/src/python/grpcio_tests/tests/qps/qps_worker.py b/src/python/grpcio_tests/tests/qps/qps_worker.py index 54f69db109..c33d013882 100644 --- a/src/python/grpcio_tests/tests/qps/qps_worker.py +++ b/src/python/grpcio_tests/tests/qps/qps_worker.py @@ -17,7 +17,7 @@ import argparse import time import grpc -from src.proto.grpc.testing import services_pb2_grpc +from src.proto.grpc.testing import worker_service_pb2_grpc from tests.qps import worker_server from tests.unit import test_common @@ -26,7 +26,8 @@ from tests.unit import test_common def run_worker_server(port): server = test_common.test_server() servicer = worker_server.WorkerServer() - services_pb2_grpc.add_WorkerServiceServicer_to_server(servicer, server) + worker_service_pb2_grpc.add_WorkerServiceServicer_to_server( + servicer, server) server.add_insecure_port('[::]:{}'.format(port)) server.start() servicer.wait_for_quit() diff --git a/src/python/grpcio_tests/tests/qps/worker_server.py b/src/python/grpcio_tests/tests/qps/worker_server.py index 41e2403c8f..db145fbf64 100644 --- a/src/python/grpcio_tests/tests/qps/worker_server.py +++ b/src/python/grpcio_tests/tests/qps/worker_server.py @@ -20,7 +20,7 @@ import time from concurrent import futures import grpc from src.proto.grpc.testing import control_pb2 -from src.proto.grpc.testing import services_pb2_grpc +from src.proto.grpc.testing import worker_service_pb2_grpc from src.proto.grpc.testing import stats_pb2 from tests.qps import benchmark_client @@ -31,7 +31,7 @@ from tests.unit import resources from tests.unit import test_common -class WorkerServer(services_pb2_grpc.WorkerServiceServicer): +class WorkerServer(worker_service_pb2_grpc.WorkerServiceServicer): """Python Worker Server implementation.""" def __init__(self): @@ -72,7 +72,7 @@ class WorkerServer(services_pb2_grpc.WorkerServiceServicer): server = test_common.test_server(max_workers=server_threads) if config.server_type == control_pb2.ASYNC_SERVER: servicer = benchmark_server.BenchmarkServer() - services_pb2_grpc.add_BenchmarkServiceServicer_to_server( + worker_service_pb2_grpc.add_BenchmarkServiceServicer_to_server( servicer, server) elif config.server_type == control_pb2.ASYNC_GENERIC_SERVER: resp_size = config.payload_config.bytebuf_params.resp_size diff --git a/src/python/grpcio_tests/tests/stress/test_runner.py b/src/python/grpcio_tests/tests/stress/test_runner.py index d5038e3ba2..764cda17fb 100644 --- a/src/python/grpcio_tests/tests/stress/test_runner.py +++ b/src/python/grpcio_tests/tests/stress/test_runner.py @@ -50,7 +50,7 @@ class TestRunner(threading.Thread): test_case.test_interoperability(self._stub, None) end_time = time.time() self._histogram.add((end_time - start_time) * 1e9) - except Exception as e: + except Exception as e: # pylint: disable=broad-except traceback.print_exc() self._exception_queue.put( Exception("An exception occured during test {}" diff --git a/src/python/grpcio_tests/tests/testing/_client_application.py b/src/python/grpcio_tests/tests/testing/_client_application.py index 7d0d74c8c4..3ddeba2373 100644 --- a/src/python/grpcio_tests/tests/testing/_client_application.py +++ b/src/python/grpcio_tests/tests/testing/_client_application.py @@ -215,30 +215,6 @@ def _run_infinite_request_stream(stub): return _UNSATISFACTORY_OUTCOME -def run(scenario, channel): - stub = services_pb2_grpc.FirstServiceStub(channel) - try: - if scenario is Scenario.UNARY_UNARY: - return _run_unary_unary(stub) - elif scenario is Scenario.UNARY_STREAM: - return _run_unary_stream(stub) - elif scenario is Scenario.STREAM_UNARY: - return _run_stream_unary(stub) - elif scenario is Scenario.STREAM_STREAM: - return _run_stream_stream(stub) - elif scenario is Scenario.CONCURRENT_STREAM_UNARY: - return _run_concurrent_stream_unary(stub) - elif scenario is Scenario.CONCURRENT_STREAM_STREAM: - return _run_concurrent_stream_stream(stub) - elif scenario is Scenario.CANCEL_UNARY_UNARY: - return _run_cancel_unary_unary(stub) - elif scenario is Scenario.INFINITE_REQUEST_STREAM: - return _run_infinite_request_stream(stub) - except grpc.RpcError as rpc_error: - return Outcome(Outcome.Kind.RPC_ERROR, rpc_error.code(), - rpc_error.details()) - - _IMPLEMENTATIONS = { Scenario.UNARY_UNARY: _run_unary_unary, Scenario.UNARY_STREAM: _run_unary_stream, diff --git a/src/python/grpcio_tests/tests/testing/_server_application.py b/src/python/grpcio_tests/tests/testing/_server_application.py index 02769ca68d..243c385daf 100644 --- a/src/python/grpcio_tests/tests/testing/_server_application.py +++ b/src/python/grpcio_tests/tests/testing/_server_application.py @@ -38,7 +38,7 @@ class FirstServiceServicer(services_pb2_grpc.FirstServiceServicer): context.set_code(grpc.StatusCode.INVALID_ARGUMENT) context.set_details('Something is wrong with your request!') return - yield services_pb2.Strange() + yield services_pb2.Strange() # pylint: disable=unreachable def StreUn(self, request_iterator, context): context.send_initial_metadata((( diff --git a/src/python/grpcio_tests/tests/testing/_server_test.py b/src/python/grpcio_tests/tests/testing/_server_test.py index 4f4abd7708..88e3a79ae5 100644 --- a/src/python/grpcio_tests/tests/testing/_server_test.py +++ b/src/python/grpcio_tests/tests/testing/_server_test.py @@ -21,13 +21,8 @@ import grpc_testing from tests.testing import _application_common from tests.testing import _application_testing_common from tests.testing import _server_application -from tests.testing.proto import services_pb2 -# TODO(https://github.com/google/protobuf/issues/3452): Drop this skip. -@unittest.skipIf( - services_pb2.DESCRIPTOR.services_by_name.get('FirstService') is None, - 'Fix protobuf issue 3452!') class FirstServiceServicerTest(unittest.TestCase): def setUp(self): diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json index e033c1063f..0d94426413 100644 --- a/src/python/grpcio_tests/tests/tests.json +++ b/src/python/grpcio_tests/tests/tests.json @@ -25,6 +25,7 @@ "unit._auth_test.AccessTokenAuthMetadataPluginTest", "unit._auth_test.GoogleCallCredentialsTest", "unit._channel_args_test.ChannelArgsTest", + "unit._channel_close_test.ChannelCloseTest", "unit._channel_connectivity_test.ChannelConnectivityTest", "unit._channel_ready_future_test.ChannelReadyFutureTest", "unit._compression_test.CompressionTest", @@ -52,16 +53,9 @@ "unit._server_ssl_cert_config_test.ServerSSLCertReloadTestCertConfigReuse", "unit._server_ssl_cert_config_test.ServerSSLCertReloadTestWithClientAuth", "unit._server_ssl_cert_config_test.ServerSSLCertReloadTestWithoutClientAuth", - "unit._thread_cleanup_test.CleanupThreadTest", "unit.beta._beta_features_test.BetaFeaturesTest", "unit.beta._beta_features_test.ContextManagementAndLifecycleTest", "unit.beta._connectivity_channel_test.ConnectivityStatesTest", - "unit.beta._face_interface_test.DynamicInvokerBlockingInvocationInlineServiceTest", - "unit.beta._face_interface_test.DynamicInvokerFutureInvocationAsynchronousEventServiceTest", - "unit.beta._face_interface_test.GenericInvokerBlockingInvocationInlineServiceTest", - "unit.beta._face_interface_test.GenericInvokerFutureInvocationAsynchronousEventServiceTest", - "unit.beta._face_interface_test.MultiCallableInvokerBlockingInvocationInlineServiceTest", - "unit.beta._face_interface_test.MultiCallableInvokerFutureInvocationAsynchronousEventServiceTest", "unit.beta._implementations_test.CallCredentialsTest", "unit.beta._implementations_test.ChannelCredentialsTest", "unit.beta._not_found_test.NotFoundTest", diff --git a/src/python/grpcio_tests/tests/unit/_auth_context_test.py b/src/python/grpcio_tests/tests/unit/_auth_context_test.py index 468869a03e..8c1a30e032 100644 --- a/src/python/grpcio_tests/tests/unit/_auth_context_test.py +++ b/src/python/grpcio_tests/tests/unit/_auth_context_test.py @@ -102,7 +102,8 @@ class AuthContextTest(unittest.TestCase): self.assertIsNone(auth_data[_ID]) self.assertIsNone(auth_data[_ID_KEY]) self.assertDictEqual({ - 'transport_security_type': [b'ssl'] + 'transport_security_type': [b'ssl'], + 'ssl_session_reused': [b'false'], }, auth_data[_AUTH_CTX]) def testSecureClientCert(self): diff --git a/src/python/grpcio_tests/tests/unit/_channel_close_test.py b/src/python/grpcio_tests/tests/unit/_channel_close_test.py new file mode 100644 index 0000000000..af3a9ee1ee --- /dev/null +++ b/src/python/grpcio_tests/tests/unit/_channel_close_test.py @@ -0,0 +1,185 @@ +# Copyright 2018 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests server and client side compression.""" + +import threading +import time +import unittest + +import grpc + +from tests.unit import test_common +from tests.unit.framework.common import test_constants + +_BEAT = 0.5 +_SOME_TIME = 5 +_MORE_TIME = 10 + + +class _MethodHandler(grpc.RpcMethodHandler): + + request_streaming = True + response_streaming = True + request_deserializer = None + response_serializer = None + + def stream_stream(self, request_iterator, servicer_context): + for request in request_iterator: + yield request * 2 + + +_METHOD_HANDLER = _MethodHandler() + + +class _GenericHandler(grpc.GenericRpcHandler): + + def service(self, handler_call_details): + return _METHOD_HANDLER + + +_GENERIC_HANDLER = _GenericHandler() + + +class _Pipe(object): + + def __init__(self, values): + self._condition = threading.Condition() + self._values = list(values) + self._open = True + + def __iter__(self): + return self + + def _next(self): + with self._condition: + while not self._values and self._open: + self._condition.wait() + if self._values: + return self._values.pop(0) + else: + raise StopIteration() + + def next(self): + return self._next() + + def __next__(self): + return self._next() + + def add(self, value): + with self._condition: + self._values.append(value) + self._condition.notify() + + def close(self): + with self._condition: + self._open = False + self._condition.notify() + + def __enter__(self): + return self + + def __exit__(self, type, value, traceback): + self.close() + + +class ChannelCloseTest(unittest.TestCase): + + def setUp(self): + self._server = test_common.test_server( + max_workers=test_constants.THREAD_CONCURRENCY) + self._server.add_generic_rpc_handlers((_GENERIC_HANDLER,)) + self._port = self._server.add_insecure_port('[::]:0') + self._server.start() + + def tearDown(self): + self._server.stop(None) + + def test_close_immediately_after_call_invocation(self): + channel = grpc.insecure_channel('localhost:{}'.format(self._port)) + multi_callable = channel.stream_stream('Meffod') + request_iterator = _Pipe(()) + response_iterator = multi_callable(request_iterator) + channel.close() + request_iterator.close() + + self.assertIs(response_iterator.code(), grpc.StatusCode.CANCELLED) + + def test_close_while_call_active(self): + channel = grpc.insecure_channel('localhost:{}'.format(self._port)) + multi_callable = channel.stream_stream('Meffod') + request_iterator = _Pipe((b'abc',)) + response_iterator = multi_callable(request_iterator) + next(response_iterator) + channel.close() + request_iterator.close() + + self.assertIs(response_iterator.code(), grpc.StatusCode.CANCELLED) + + def test_context_manager_close_while_call_active(self): + with grpc.insecure_channel('localhost:{}'.format( + self._port)) as channel: # pylint: disable=bad-continuation + multi_callable = channel.stream_stream('Meffod') + request_iterator = _Pipe((b'abc',)) + response_iterator = multi_callable(request_iterator) + next(response_iterator) + request_iterator.close() + + self.assertIs(response_iterator.code(), grpc.StatusCode.CANCELLED) + + def test_context_manager_close_while_many_calls_active(self): + with grpc.insecure_channel('localhost:{}'.format( + self._port)) as channel: # pylint: disable=bad-continuation + multi_callable = channel.stream_stream('Meffod') + request_iterators = tuple( + _Pipe((b'abc',)) + for _ in range(test_constants.THREAD_CONCURRENCY)) + response_iterators = [] + for request_iterator in request_iterators: + response_iterator = multi_callable(request_iterator) + next(response_iterator) + response_iterators.append(response_iterator) + for request_iterator in request_iterators: + request_iterator.close() + + for response_iterator in response_iterators: + self.assertIs(response_iterator.code(), grpc.StatusCode.CANCELLED) + + def test_many_concurrent_closes(self): + channel = grpc.insecure_channel('localhost:{}'.format(self._port)) + multi_callable = channel.stream_stream('Meffod') + request_iterator = _Pipe((b'abc',)) + response_iterator = multi_callable(request_iterator) + next(response_iterator) + start = time.time() + end = start + _MORE_TIME + + def sleep_some_time_then_close(): + time.sleep(_SOME_TIME) + channel.close() + + for _ in range(test_constants.THREAD_CONCURRENCY): + close_thread = threading.Thread(target=sleep_some_time_then_close) + close_thread.start() + while True: + request_iterator.add(b'def') + time.sleep(_BEAT) + if end < time.time(): + break + request_iterator.close() + + self.assertIs(response_iterator.code(), grpc.StatusCode.CANCELLED) + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/unit/_compression_test.py b/src/python/grpcio_tests/tests/unit/_compression_test.py index 7550cd39ba..0b11f03adf 100644 --- a/src/python/grpcio_tests/tests/unit/_compression_test.py +++ b/src/python/grpcio_tests/tests/unit/_compression_test.py @@ -52,9 +52,9 @@ class _MethodHandler(grpc.RpcMethodHandler): self.stream_unary = None self.stream_stream = None if self.request_streaming and self.response_streaming: - self.stream_stream = lambda x, y: handle_stream(x, y) + self.stream_stream = handle_stream elif not self.request_streaming and not self.response_streaming: - self.unary_unary = lambda x, y: handle_unary(x, y) + self.unary_unary = handle_unary class _GenericHandler(grpc.GenericRpcHandler): diff --git a/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py b/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py index 3765ce4fb0..578a3d79ad 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py @@ -19,6 +19,7 @@ import unittest from grpc._cython import cygrpc from grpc.framework.foundation import logging_pool from tests.unit.framework.common import test_constants +from tests.unit._cython import test_utilities _EMPTY_FLAGS = 0 _EMPTY_METADATA = () @@ -30,6 +31,8 @@ _RECEIVE_MESSAGE_TAG = 'receive_message' _SERVER_COMPLETE_CALL_TAG = 'server_complete_call' _SUCCESS_CALL_FRACTION = 1.0 / 8.0 +_SUCCESSFUL_CALLS = int(test_constants.RPC_CONCURRENCY * _SUCCESS_CALL_FRACTION) +_UNSUCCESSFUL_CALLS = test_constants.RPC_CONCURRENCY - _SUCCESSFUL_CALLS class _State(object): @@ -43,7 +46,7 @@ class _State(object): def _is_cancellation_event(event): return (event.tag is _RECEIVE_CLOSE_ON_SERVER_TAG and - event.batch_operations[0].received_cancelled) + event.batch_operations[0].cancelled()) class _Handler(object): @@ -150,7 +153,8 @@ class CancelManyCallsTest(unittest.TestCase): server.register_completion_queue(server_completion_queue) port = server.add_http2_port(b'[::]:0') server.start() - channel = cygrpc.Channel('localhost:{}'.format(port).encode(), None) + channel = cygrpc.Channel('localhost:{}'.format(port).encode(), None, + None) state = _State() @@ -165,31 +169,33 @@ class CancelManyCallsTest(unittest.TestCase): client_condition = threading.Condition() client_due = set() - client_completion_queue = cygrpc.CompletionQueue() - client_driver = _QueueDriver(client_condition, client_completion_queue, - client_due) - client_driver.start() with client_condition: client_calls = [] for index in range(test_constants.RPC_CONCURRENCY): - client_call = channel.create_call(None, _EMPTY_FLAGS, - client_completion_queue, - b'/twinkies', None, None) - operations = ( - cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA, - _EMPTY_FLAGS), - cygrpc.SendMessageOperation(b'\x45\x56', _EMPTY_FLAGS), - cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), - cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), - cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), - cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), - ) tag = 'client_complete_call_{0:04d}_tag'.format(index) - client_call.start_client_batch(operations, tag) + client_call = channel.integrated_call( + _EMPTY_FLAGS, b'/twinkies', None, None, _EMPTY_METADATA, + None, (( + ( + cygrpc.SendInitialMetadataOperation( + _EMPTY_METADATA, _EMPTY_FLAGS), + cygrpc.SendMessageOperation(b'\x45\x56', + _EMPTY_FLAGS), + cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), + cygrpc.ReceiveInitialMetadataOperation( + _EMPTY_FLAGS), + cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), + ), + tag, + ),)) client_due.add(tag) client_calls.append(client_call) + client_events_future = test_utilities.SimpleFuture( + lambda: tuple(channel.next_call_event() for _ in range(_SUCCESSFUL_CALLS))) + with state.condition: while True: if state.parked_handlers < test_constants.THREAD_CONCURRENCY: @@ -201,12 +207,14 @@ class CancelManyCallsTest(unittest.TestCase): state.condition.notify_all() break - client_driver.events( - test_constants.RPC_CONCURRENCY * _SUCCESS_CALL_FRACTION) + client_events_future.result() with client_condition: for client_call in client_calls: - client_call.cancel() + client_call.cancel(cygrpc.StatusCode.cancelled, 'Cancelled!') + for _ in range(_UNSUCCESSFUL_CALLS): + channel.next_call_event() + channel.close(cygrpc.StatusCode.unknown, 'Cancelled on channel close!') with state.condition: server.shutdown(server_completion_queue, _SERVER_SHUTDOWN_TAG) diff --git a/src/python/grpcio_tests/tests/unit/_cython/_channel_test.py b/src/python/grpcio_tests/tests/unit/_cython/_channel_test.py index 7305d0fa3f..d95286071d 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_channel_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_channel_test.py @@ -21,25 +21,20 @@ from grpc._cython import cygrpc from tests.unit.framework.common import test_constants -def _channel_and_completion_queue(): - channel = cygrpc.Channel(b'localhost:54321', ()) - completion_queue = cygrpc.CompletionQueue() - return channel, completion_queue +def _channel(): + return cygrpc.Channel(b'localhost:54321', (), None) -def _connectivity_loop(channel, completion_queue): +def _connectivity_loop(channel): for _ in range(100): connectivity = channel.check_connectivity_state(True) - channel.watch_connectivity_state(connectivity, - time.time() + 0.2, completion_queue, - None) - completion_queue.poll() + channel.watch_connectivity_state(connectivity, time.time() + 0.2) def _create_loop_destroy(): - channel, completion_queue = _channel_and_completion_queue() - _connectivity_loop(channel, completion_queue) - completion_queue.shutdown() + channel = _channel() + _connectivity_loop(channel) + channel.close(cygrpc.StatusCode.ok, 'Channel close!') def _in_parallel(behavior, arguments): @@ -55,12 +50,9 @@ def _in_parallel(behavior, arguments): class ChannelTest(unittest.TestCase): def test_single_channel_lonely_connectivity(self): - channel, completion_queue = _channel_and_completion_queue() - _in_parallel(_connectivity_loop, ( - channel, - completion_queue, - )) - completion_queue.shutdown() + channel = _channel() + _connectivity_loop(channel) + channel.close(cygrpc.StatusCode.ok, 'Channel close!') def test_multiple_channels_lonely_connectivity(self): _in_parallel(_create_loop_destroy, ()) diff --git a/src/python/grpcio_tests/tests/unit/_cython/_common.py b/src/python/grpcio_tests/tests/unit/_cython/_common.py index 7fd3d19b4e..d8210f36f8 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_common.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_common.py @@ -100,7 +100,8 @@ class RpcTest(object): self.server.register_completion_queue(self.server_completion_queue) port = self.server.add_http2_port(b'[::]:0') self.server.start() - self.channel = cygrpc.Channel('localhost:{}'.format(port).encode(), []) + self.channel = cygrpc.Channel('localhost:{}'.format(port).encode(), [], + None) self._server_shutdown_tag = 'server_shutdown_tag' self.server_condition = threading.Condition() diff --git a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py index 7caa98f72d..8a721788f4 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py @@ -19,6 +19,7 @@ import unittest from grpc._cython import cygrpc from tests.unit._cython import _common +from tests.unit._cython import test_utilities class Test(_common.RpcTest, unittest.TestCase): @@ -41,31 +42,27 @@ class Test(_common.RpcTest, unittest.TestCase): server_request_call_tag, }) - client_call = self.channel.create_call(None, _common.EMPTY_FLAGS, - self.client_completion_queue, - b'/twinkies', None, None) client_receive_initial_metadata_tag = 'client_receive_initial_metadata_tag' client_complete_rpc_tag = 'client_complete_rpc_tag' - with self.client_condition: - client_receive_initial_metadata_start_batch_result = ( - client_call.start_client_batch([ - cygrpc.ReceiveInitialMetadataOperation(_common.EMPTY_FLAGS), - ], client_receive_initial_metadata_tag)) - self.assertEqual(cygrpc.CallError.ok, - client_receive_initial_metadata_start_batch_result) - client_complete_rpc_start_batch_result = client_call.start_client_batch( + client_call = self.channel.integrated_call( + _common.EMPTY_FLAGS, b'/twinkies', None, None, + _common.INVOCATION_METADATA, None, [( [ - cygrpc.SendInitialMetadataOperation( - _common.INVOCATION_METADATA, _common.EMPTY_FLAGS), - cygrpc.SendCloseFromClientOperation(_common.EMPTY_FLAGS), - cygrpc.ReceiveStatusOnClientOperation(_common.EMPTY_FLAGS), - ], client_complete_rpc_tag) - self.assertEqual(cygrpc.CallError.ok, - client_complete_rpc_start_batch_result) - self.client_driver.add_due({ + cygrpc.ReceiveInitialMetadataOperation(_common.EMPTY_FLAGS), + ], client_receive_initial_metadata_tag, - client_complete_rpc_tag, - }) + )]) + client_call.operate([ + cygrpc.SendInitialMetadataOperation(_common.INVOCATION_METADATA, + _common.EMPTY_FLAGS), + cygrpc.SendCloseFromClientOperation(_common.EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_common.EMPTY_FLAGS), + ], client_complete_rpc_tag) + + client_events_future = test_utilities.SimpleFuture( + lambda: [ + self.channel.next_call_event(), + self.channel.next_call_event(),]) server_request_call_event = self.server_driver.event_with_tag( server_request_call_tag) @@ -96,20 +93,23 @@ class Test(_common.RpcTest, unittest.TestCase): server_complete_rpc_event = server_call_driver.event_with_tag( server_complete_rpc_tag) - client_receive_initial_metadata_event = self.client_driver.event_with_tag( - client_receive_initial_metadata_tag) - client_complete_rpc_event = self.client_driver.event_with_tag( - client_complete_rpc_tag) + client_events = client_events_future.result() + if client_events[0].tag is client_receive_initial_metadata_tag: + client_receive_initial_metadata_event = client_events[0] + client_complete_rpc_event = client_events[1] + else: + client_complete_rpc_event = client_events[0] + client_receive_initial_metadata_event = client_events[1] return ( _common.OperationResult(server_request_call_start_batch_result, server_request_call_event.completion_type, server_request_call_event.success), _common.OperationResult( - client_receive_initial_metadata_start_batch_result, + cygrpc.CallError.ok, client_receive_initial_metadata_event.completion_type, client_receive_initial_metadata_event.success), - _common.OperationResult(client_complete_rpc_start_batch_result, + _common.OperationResult(cygrpc.CallError.ok, client_complete_rpc_event.completion_type, client_complete_rpc_event.success), _common.OperationResult( diff --git a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py index 8582a39c01..47f39ebce2 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py @@ -19,6 +19,7 @@ import unittest from grpc._cython import cygrpc from tests.unit._cython import _common +from tests.unit._cython import test_utilities class Test(_common.RpcTest, unittest.TestCase): @@ -36,28 +37,31 @@ class Test(_common.RpcTest, unittest.TestCase): server_request_call_tag, }) - client_call = self.channel.create_call(None, _common.EMPTY_FLAGS, - self.client_completion_queue, - b'/twinkies', None, None) client_receive_initial_metadata_tag = 'client_receive_initial_metadata_tag' client_complete_rpc_tag = 'client_complete_rpc_tag' - with self.client_condition: - client_receive_initial_metadata_start_batch_result = ( - client_call.start_client_batch([ - cygrpc.ReceiveInitialMetadataOperation(_common.EMPTY_FLAGS), - ], client_receive_initial_metadata_tag)) - client_complete_rpc_start_batch_result = client_call.start_client_batch( - [ - cygrpc.SendInitialMetadataOperation( - _common.INVOCATION_METADATA, _common.EMPTY_FLAGS), - cygrpc.SendCloseFromClientOperation(_common.EMPTY_FLAGS), - cygrpc.ReceiveStatusOnClientOperation(_common.EMPTY_FLAGS), - ], client_complete_rpc_tag) - self.client_driver.add_due({ - client_receive_initial_metadata_tag, - client_complete_rpc_tag, - }) - + client_call = self.channel.integrated_call( + _common.EMPTY_FLAGS, b'/twinkies', None, None, + _common.INVOCATION_METADATA, None, [ + ( + [ + cygrpc.SendInitialMetadataOperation( + _common.INVOCATION_METADATA, _common.EMPTY_FLAGS), + cygrpc.SendCloseFromClientOperation( + _common.EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation( + _common.EMPTY_FLAGS), + ], + client_complete_rpc_tag, + ), + ]) + client_call.operate([ + cygrpc.ReceiveInitialMetadataOperation(_common.EMPTY_FLAGS), + ], client_receive_initial_metadata_tag) + + client_events_future = test_utilities.SimpleFuture( + lambda: [ + self.channel.next_call_event(), + self.channel.next_call_event(),]) server_request_call_event = self.server_driver.event_with_tag( server_request_call_tag) @@ -87,20 +91,19 @@ class Test(_common.RpcTest, unittest.TestCase): server_complete_rpc_event = self.server_driver.event_with_tag( server_complete_rpc_tag) - client_receive_initial_metadata_event = self.client_driver.event_with_tag( - client_receive_initial_metadata_tag) - client_complete_rpc_event = self.client_driver.event_with_tag( - client_complete_rpc_tag) + client_events = client_events_future.result() + client_receive_initial_metadata_event = client_events[0] + client_complete_rpc_event = client_events[1] return ( _common.OperationResult(server_request_call_start_batch_result, server_request_call_event.completion_type, server_request_call_event.success), _common.OperationResult( - client_receive_initial_metadata_start_batch_result, + cygrpc.CallError.ok, client_receive_initial_metadata_event.completion_type, client_receive_initial_metadata_event.success), - _common.OperationResult(client_complete_rpc_start_batch_result, + _common.OperationResult(cygrpc.CallError.ok, client_complete_rpc_event.completion_type, client_complete_rpc_event.success), _common.OperationResult( diff --git a/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py b/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py index bc63b54879..8a903bfaf9 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py @@ -17,6 +17,7 @@ import threading import unittest from grpc._cython import cygrpc +from tests.unit._cython import test_utilities _EMPTY_FLAGS = 0 _EMPTY_METADATA = () @@ -118,7 +119,8 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase): server.register_completion_queue(server_completion_queue) port = server.add_http2_port(b'[::]:0') server.start() - channel = cygrpc.Channel('localhost:{}'.format(port).encode(), set()) + channel = cygrpc.Channel('localhost:{}'.format(port).encode(), set(), + None) server_shutdown_tag = 'server_shutdown_tag' server_driver = _ServerDriver(server_completion_queue, @@ -127,10 +129,6 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase): client_condition = threading.Condition() client_due = set() - client_completion_queue = cygrpc.CompletionQueue() - client_driver = _QueueDriver(client_condition, client_completion_queue, - client_due) - client_driver.start() server_call_condition = threading.Condition() server_send_initial_metadata_tag = 'server_send_initial_metadata_tag' @@ -154,25 +152,28 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase): server_completion_queue, server_rpc_tag) - client_call = channel.create_call(None, _EMPTY_FLAGS, - client_completion_queue, b'/twinkies', - None, None) client_receive_initial_metadata_tag = 'client_receive_initial_metadata_tag' client_complete_rpc_tag = 'client_complete_rpc_tag' - with client_condition: - client_receive_initial_metadata_start_batch_result = ( - client_call.start_client_batch([ - cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), - ], client_receive_initial_metadata_tag)) - client_due.add(client_receive_initial_metadata_tag) - client_complete_rpc_start_batch_result = ( - client_call.start_client_batch([ - cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA, - _EMPTY_FLAGS), - cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), - cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), - ], client_complete_rpc_tag)) - client_due.add(client_complete_rpc_tag) + client_call = channel.segregated_call( + _EMPTY_FLAGS, b'/twinkies', None, None, _EMPTY_METADATA, None, ( + ( + [ + cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), + ], + client_receive_initial_metadata_tag, + ), + ( + [ + cygrpc.SendInitialMetadataOperation( + _EMPTY_METADATA, _EMPTY_FLAGS), + cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), + ], + client_complete_rpc_tag, + ), + )) + client_receive_initial_metadata_event_future = test_utilities.SimpleFuture( + client_call.next_event) server_rpc_event = server_driver.first_event() @@ -208,19 +209,20 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase): server_complete_rpc_tag) server_call_driver.events() - with client_condition: - client_receive_first_message_tag = 'client_receive_first_message_tag' - client_receive_first_message_start_batch_result = ( - client_call.start_client_batch([ - cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), - ], client_receive_first_message_tag)) - client_due.add(client_receive_first_message_tag) - client_receive_first_message_event = client_driver.event_with_tag( - client_receive_first_message_tag) + client_recieve_initial_metadata_event = client_receive_initial_metadata_event_future.result( + ) + + client_receive_first_message_tag = 'client_receive_first_message_tag' + client_call.operate([ + cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), + ], client_receive_first_message_tag) + client_receive_first_message_event = client_call.next_event() - client_call_cancel_result = client_call.cancel() - client_driver.events() + client_call_cancel_result = client_call.cancel( + cygrpc.StatusCode.cancelled, 'Cancelled during test!') + client_complete_rpc_event = client_call.next_event() + channel.close(cygrpc.StatusCode.unknown, 'Channel closed!') server.shutdown(server_completion_queue, server_shutdown_tag) server.cancel_all_calls() server_driver.events() @@ -228,11 +230,6 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase): self.assertEqual(cygrpc.CallError.ok, request_call_result) self.assertEqual(cygrpc.CallError.ok, server_send_initial_metadata_start_batch_result) - self.assertEqual(cygrpc.CallError.ok, - client_receive_initial_metadata_start_batch_result) - self.assertEqual(cygrpc.CallError.ok, - client_complete_rpc_start_batch_result) - self.assertEqual(cygrpc.CallError.ok, client_call_cancel_result) self.assertIs(server_rpc_tag, server_rpc_event.tag) self.assertEqual(cygrpc.CompletionType.operation_complete, server_rpc_event.completion_type) diff --git a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py index 9045ff58a0..724a690746 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py @@ -51,8 +51,8 @@ class TypeSmokeTest(unittest.TestCase): del server def testChannelUpDown(self): - channel = cygrpc.Channel(b'[::]:0', None) - del channel + channel = cygrpc.Channel(b'[::]:0', None, None) + channel.close(cygrpc.StatusCode.cancelled, 'Test method anyway!') def test_metadata_plugin_call_credentials_up_down(self): cygrpc.MetadataPluginCallCredentials(_metadata_plugin, @@ -121,7 +121,7 @@ class ServerClientMixin(object): client_credentials) else: self.client_channel = cygrpc.Channel('localhost:{}'.format( - self.port).encode(), set()) + self.port).encode(), set(), None) if host_override: self.host_argument = None # default host self.expected_host = host_override @@ -131,17 +131,20 @@ class ServerClientMixin(object): self.expected_host = self.host_argument def tearDownMixin(self): + self.client_channel.close(cygrpc.StatusCode.ok, 'test being torn down!') + del self.client_channel del self.server del self.client_completion_queue del self.server_completion_queue - def _perform_operations(self, operations, call, queue, deadline, - description): - """Perform the list of operations with given call, queue, and deadline. + def _perform_queue_operations(self, operations, call, queue, deadline, + description): + """Perform the operations with given call, queue, and deadline. - Invocation errors are reported with as an exception with `description` in - the message. Performs the operations asynchronously, returning a future. - """ + Invocation errors are reported with as an exception with `description` + in the message. Performs the operations asynchronously, returning a + future. + """ def performer(): tag = object() @@ -185,9 +188,6 @@ class ServerClientMixin(object): self.assertEqual(cygrpc.CallError.ok, request_call_result) client_call_tag = object() - client_call = self.client_channel.create_call( - None, 0, self.client_completion_queue, METHOD, self.host_argument, - DEADLINE) client_initial_metadata = ( ( CLIENT_METADATA_ASCII_KEY, @@ -198,18 +198,24 @@ class ServerClientMixin(object): CLIENT_METADATA_BIN_VALUE, ), ) - client_start_batch_result = client_call.start_client_batch([ - cygrpc.SendInitialMetadataOperation(client_initial_metadata, - _EMPTY_FLAGS), - cygrpc.SendMessageOperation(REQUEST, _EMPTY_FLAGS), - cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), - cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), - cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), - cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), - ], client_call_tag) - self.assertEqual(cygrpc.CallError.ok, client_start_batch_result) - client_event_future = test_utilities.CompletionQueuePollFuture( - self.client_completion_queue, DEADLINE) + client_call = self.client_channel.integrated_call( + 0, METHOD, self.host_argument, DEADLINE, client_initial_metadata, + None, [ + ( + [ + cygrpc.SendInitialMetadataOperation( + client_initial_metadata, _EMPTY_FLAGS), + cygrpc.SendMessageOperation(REQUEST, _EMPTY_FLAGS), + cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), + cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), + cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), + ], + client_call_tag, + ), + ]) + client_event_future = test_utilities.SimpleFuture( + self.client_channel.next_call_event) request_event = self.server_completion_queue.poll(deadline=DEADLINE) self.assertEqual(cygrpc.CompletionType.operation_complete, @@ -285,7 +291,7 @@ class ServerClientMixin(object): self.assertEqual(5, len(server_event.batch_operations)) found_server_op_types = set() for server_result in server_event.batch_operations: - self.assertNotIn(client_result.type(), found_server_op_types) + self.assertNotIn(server_result.type(), found_server_op_types) found_server_op_types.add(server_result.type()) if server_result.type() == cygrpc.OperationType.receive_message: self.assertEqual(REQUEST, server_result.message()) @@ -304,66 +310,76 @@ class ServerClientMixin(object): del client_call del server_call - def test6522(self): + def test_6522(self): DEADLINE = time.time() + 5 DEADLINE_TOLERANCE = 0.25 METHOD = b'twinkies' empty_metadata = () + # Prologue server_request_tag = object() self.server.request_call(self.server_completion_queue, self.server_completion_queue, server_request_tag) - client_call = self.client_channel.create_call( - None, 0, self.client_completion_queue, METHOD, self.host_argument, - DEADLINE) - - # Prologue - def perform_client_operations(operations, description): - return self._perform_operations(operations, client_call, - self.client_completion_queue, - DEADLINE, description) - - client_event_future = perform_client_operations([ - cygrpc.SendInitialMetadataOperation(empty_metadata, _EMPTY_FLAGS), - cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), - ], "Client prologue") + client_call = self.client_channel.segregated_call( + 0, METHOD, self.host_argument, DEADLINE, None, None, ([( + [ + cygrpc.SendInitialMetadataOperation(empty_metadata, + _EMPTY_FLAGS), + cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), + ], + object(), + ), ( + [ + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), + ], + object(), + )])) + + client_initial_metadata_event_future = test_utilities.SimpleFuture( + client_call.next_event) request_event = self.server_completion_queue.poll(deadline=DEADLINE) server_call = request_event.call def perform_server_operations(operations, description): - return self._perform_operations(operations, server_call, - self.server_completion_queue, - DEADLINE, description) + return self._perform_queue_operations(operations, server_call, + self.server_completion_queue, + DEADLINE, description) server_event_future = perform_server_operations([ cygrpc.SendInitialMetadataOperation(empty_metadata, _EMPTY_FLAGS), ], "Server prologue") - client_event_future.result() # force completion + client_initial_metadata_event_future.result() # force completion server_event_future.result() # Messaging for _ in range(10): - client_event_future = perform_client_operations([ + client_call.operate([ cygrpc.SendMessageOperation(b'', _EMPTY_FLAGS), cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), ], "Client message") + client_message_event_future = test_utilities.SimpleFuture( + client_call.next_event) server_event_future = perform_server_operations([ cygrpc.SendMessageOperation(b'', _EMPTY_FLAGS), cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), ], "Server receive") - client_event_future.result() # force completion + client_message_event_future.result() # force completion server_event_future.result() # Epilogue - client_event_future = perform_client_operations([ + client_call.operate([ cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), - cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS) ], "Client epilogue") + # One for ReceiveStatusOnClient, one for SendCloseFromClient. + client_events_future = test_utilities.SimpleFuture( + lambda: { + client_call.next_event(), + client_call.next_event(),}) server_event_future = perform_server_operations([ cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS), @@ -371,7 +387,7 @@ class ServerClientMixin(object): empty_metadata, cygrpc.StatusCode.ok, b'', _EMPTY_FLAGS) ], "Server epilogue") - client_event_future.result() # force completion + client_events_future.result() # force completion server_event_future.result() diff --git a/src/python/grpcio_tests/tests/unit/_cython/test_utilities.py b/src/python/grpcio_tests/tests/unit/_cython/test_utilities.py index 4a00b9ef2f..7d5eaaaa84 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/test_utilities.py +++ b/src/python/grpcio_tests/tests/unit/_cython/test_utilities.py @@ -25,7 +25,7 @@ class SimpleFuture(object): def wrapped_function(): try: self._result = function(*args, **kwargs) - except Exception as error: + except Exception as error: # pylint: disable=broad-except self._error = error self._result = None @@ -41,7 +41,7 @@ class SimpleFuture(object): self._thread.join() if self._error: # TODO(atash): re-raise exceptions in a way that preserves tracebacks - raise self._error + raise self._error # pylint: disable=raising-bad-type return self._result diff --git a/src/python/grpcio_tests/tests/unit/_exit_test.py b/src/python/grpcio_tests/tests/unit/_exit_test.py index 6e6d9de0fb..f40f3ae07c 100644 --- a/src/python/grpcio_tests/tests/unit/_exit_test.py +++ b/src/python/grpcio_tests/tests/unit/_exit_test.py @@ -49,7 +49,7 @@ def cleanup_processes(): for process in processes: try: process.kill() - except Exception: + except Exception: # pylint: disable=broad-except pass diff --git a/src/python/grpcio_tests/tests/unit/_from_grpc_import_star.py b/src/python/grpcio_tests/tests/unit/_from_grpc_import_star.py index e683131722..ad847ae03e 100644 --- a/src/python/grpcio_tests/tests/unit/_from_grpc_import_star.py +++ b/src/python/grpcio_tests/tests/unit/_from_grpc_import_star.py @@ -14,7 +14,7 @@ _BEFORE_IMPORT = tuple(globals()) -from grpc import * +from grpc import * # pylint: disable=wildcard-import _AFTER_IMPORT = tuple(globals()) diff --git a/src/python/grpcio_tests/tests/unit/_invalid_metadata_test.py b/src/python/grpcio_tests/tests/unit/_invalid_metadata_test.py index 4edf0fc4ad..f153089a24 100644 --- a/src/python/grpcio_tests/tests/unit/_invalid_metadata_test.py +++ b/src/python/grpcio_tests/tests/unit/_invalid_metadata_test.py @@ -81,29 +81,16 @@ class InvalidMetadataTest(unittest.TestCase): request = b'\x07\x08' metadata = (('InVaLiD', 'UnaryRequestFutureUnaryResponse'),) expected_error_details = "metadata was invalid: %s" % metadata - response_future = self._unary_unary.future(request, metadata=metadata) - with self.assertRaises(grpc.RpcError) as exception_context: - response_future.result() - self.assertEqual(exception_context.exception.details(), - expected_error_details) - self.assertEqual(exception_context.exception.code(), - grpc.StatusCode.INTERNAL) - self.assertEqual(response_future.details(), expected_error_details) - self.assertEqual(response_future.code(), grpc.StatusCode.INTERNAL) + with self.assertRaises(ValueError) as exception_context: + self._unary_unary.future(request, metadata=metadata) def testUnaryRequestStreamResponse(self): request = b'\x37\x58' metadata = (('InVaLiD', 'UnaryRequestStreamResponse'),) expected_error_details = "metadata was invalid: %s" % metadata - response_iterator = self._unary_stream(request, metadata=metadata) - with self.assertRaises(grpc.RpcError) as exception_context: - next(response_iterator) - self.assertEqual(exception_context.exception.details(), - expected_error_details) - self.assertEqual(exception_context.exception.code(), - grpc.StatusCode.INTERNAL) - self.assertEqual(response_iterator.details(), expected_error_details) - self.assertEqual(response_iterator.code(), grpc.StatusCode.INTERNAL) + with self.assertRaises(ValueError) as exception_context: + self._unary_stream(request, metadata=metadata) + self.assertIn(expected_error_details, str(exception_context.exception)) def testStreamRequestBlockingUnaryResponse(self): request_iterator = ( @@ -129,32 +116,18 @@ class InvalidMetadataTest(unittest.TestCase): b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH)) metadata = (('InVaLiD', 'StreamRequestFutureUnaryResponse'),) expected_error_details = "metadata was invalid: %s" % metadata - response_future = self._stream_unary.future( - request_iterator, metadata=metadata) - with self.assertRaises(grpc.RpcError) as exception_context: - response_future.result() - self.assertEqual(exception_context.exception.details(), - expected_error_details) - self.assertEqual(exception_context.exception.code(), - grpc.StatusCode.INTERNAL) - self.assertEqual(response_future.details(), expected_error_details) - self.assertEqual(response_future.code(), grpc.StatusCode.INTERNAL) + with self.assertRaises(ValueError) as exception_context: + self._stream_unary.future(request_iterator, metadata=metadata) + self.assertIn(expected_error_details, str(exception_context.exception)) def testStreamRequestStreamResponse(self): request_iterator = ( b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH)) metadata = (('InVaLiD', 'StreamRequestStreamResponse'),) expected_error_details = "metadata was invalid: %s" % metadata - response_iterator = self._stream_stream( - request_iterator, metadata=metadata) - with self.assertRaises(grpc.RpcError) as exception_context: - next(response_iterator) - self.assertEqual(exception_context.exception.details(), - expected_error_details) - self.assertEqual(exception_context.exception.code(), - grpc.StatusCode.INTERNAL) - self.assertEqual(response_iterator.details(), expected_error_details) - self.assertEqual(response_iterator.code(), grpc.StatusCode.INTERNAL) + with self.assertRaises(ValueError) as exception_context: + self._stream_stream(request_iterator, metadata=metadata) + self.assertIn(expected_error_details, str(exception_context.exception)) if __name__ == '__main__': diff --git a/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py b/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py index e40cca8b24..93a5fdf9ff 100644 --- a/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py +++ b/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py @@ -165,11 +165,13 @@ class FailAfterFewIterationsCounter(object): def __next__(self): if self._current >= self._high: - raise Exception("This is a deliberate failure in a unit test.") + raise test_control.Defect() else: self._current += 1 return self._bytestring + next = __next__ + def _unary_unary_multi_callable(channel): return channel.unary_unary(_UNARY_UNARY) diff --git a/src/python/grpcio_tests/tests/unit/_reconnect_test.py b/src/python/grpcio_tests/tests/unit/_reconnect_test.py index 8acba5a30b..a708d8d862 100644 --- a/src/python/grpcio_tests/tests/unit/_reconnect_test.py +++ b/src/python/grpcio_tests/tests/unit/_reconnect_test.py @@ -89,7 +89,10 @@ class ReconnectTest(unittest.TestCase): multi_callable = channel.unary_unary(_UNARY_UNARY) self.assertEqual(_RESPONSE, multi_callable(_REQUEST)) server.stop(None) - time.sleep(1) + # By default, the channel connectivity is checked every 5s + # GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS can be set to change + # this. + time.sleep(5.1) server = grpc.server(server_pool, (handler,)) server.add_insecure_port('[::]:{}'.format(port)) server.start() diff --git a/src/python/grpcio_tests/tests/unit/_thread_cleanup_test.py b/src/python/grpcio_tests/tests/unit/_thread_cleanup_test.py deleted file mode 100644 index 18f5af058a..0000000000 --- a/src/python/grpcio_tests/tests/unit/_thread_cleanup_test.py +++ /dev/null @@ -1,115 +0,0 @@ -# Copyright 2016 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Tests for CleanupThread.""" - -import threading -import time -import unittest - -from grpc import _common - -_SHORT_TIME = 0.5 -_LONG_TIME = 5.0 -_EPSILON = 0.5 - - -def cleanup(timeout): - if timeout is not None: - time.sleep(timeout) - else: - time.sleep(_LONG_TIME) - - -def slow_cleanup(timeout): - # Don't respect timeout - time.sleep(_LONG_TIME) - - -class CleanupThreadTest(unittest.TestCase): - - def testTargetInvocation(self): - event = threading.Event() - - def target(arg1, arg2, arg3=None): - self.assertEqual('arg1', arg1) - self.assertEqual('arg2', arg2) - self.assertEqual('arg3', arg3) - event.set() - - cleanup_thread = _common.CleanupThread( - behavior=lambda x: None, - target=target, - name='test-name', - args=('arg1', 'arg2'), - kwargs={ - 'arg3': 'arg3' - }) - cleanup_thread.start() - cleanup_thread.join() - self.assertEqual(cleanup_thread.name, 'test-name') - self.assertTrue(event.is_set()) - - def testJoinNoTimeout(self): - cleanup_thread = _common.CleanupThread(behavior=cleanup) - cleanup_thread.start() - start_time = time.time() - cleanup_thread.join() - end_time = time.time() - self.assertAlmostEqual( - _LONG_TIME, end_time - start_time, delta=_EPSILON) - - def testJoinTimeout(self): - cleanup_thread = _common.CleanupThread(behavior=cleanup) - cleanup_thread.start() - start_time = time.time() - cleanup_thread.join(_SHORT_TIME) - end_time = time.time() - self.assertAlmostEqual( - _SHORT_TIME, end_time - start_time, delta=_EPSILON) - - def testJoinTimeoutSlowBehavior(self): - cleanup_thread = _common.CleanupThread(behavior=slow_cleanup) - cleanup_thread.start() - start_time = time.time() - cleanup_thread.join(_SHORT_TIME) - end_time = time.time() - self.assertAlmostEqual( - _LONG_TIME, end_time - start_time, delta=_EPSILON) - - def testJoinTimeoutSlowTarget(self): - event = threading.Event() - - def target(): - event.wait(_LONG_TIME) - - cleanup_thread = _common.CleanupThread(behavior=cleanup, target=target) - cleanup_thread.start() - start_time = time.time() - cleanup_thread.join(_SHORT_TIME) - end_time = time.time() - self.assertAlmostEqual( - _SHORT_TIME, end_time - start_time, delta=_EPSILON) - event.set() - - def testJoinZeroTimeout(self): - cleanup_thread = _common.CleanupThread(behavior=cleanup) - cleanup_thread.start() - start_time = time.time() - cleanup_thread.join(0) - end_time = time.time() - self.assertAlmostEqual(0, end_time - start_time, delta=_EPSILON) - - -if __name__ == '__main__': - unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/unit/beta/_beta_features_test.py b/src/python/grpcio_tests/tests/unit/beta/_beta_features_test.py index 61c03f64ba..b43c647fc9 100644 --- a/src/python/grpcio_tests/tests/unit/beta/_beta_features_test.py +++ b/src/python/grpcio_tests/tests/unit/beta/_beta_features_test.py @@ -65,7 +65,7 @@ class _Servicer(object): self._serviced = True self._condition.notify_all() return - yield + yield # pylint: disable=unreachable def stream_unary(self, request_iterator, context): for request in request_iterator: diff --git a/src/python/grpcio_tests/tests/unit/beta/_face_interface_test.py b/src/python/grpcio_tests/tests/unit/beta/_face_interface_test.py deleted file mode 100644 index c99738e085..0000000000 --- a/src/python/grpcio_tests/tests/unit/beta/_face_interface_test.py +++ /dev/null @@ -1,132 +0,0 @@ -# Copyright 2015 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Tests Face interface compliance of the gRPC Python Beta API.""" - -import collections -import unittest - -import six - -from grpc.beta import implementations -from grpc.beta import interfaces -from tests.unit import resources -from tests.unit import test_common as grpc_test_common -from tests.unit.beta import test_utilities -from tests.unit.framework.common import test_constants -from tests.unit.framework.interfaces.face import test_cases -from tests.unit.framework.interfaces.face import test_interfaces - -_SERVER_HOST_OVERRIDE = 'foo.test.google.fr' - - -class _SerializationBehaviors( - collections.namedtuple('_SerializationBehaviors', ( - 'request_serializers', - 'request_deserializers', - 'response_serializers', - 'response_deserializers', - ))): - pass - - -def _serialization_behaviors_from_test_methods(test_methods): - request_serializers = {} - request_deserializers = {} - response_serializers = {} - response_deserializers = {} - for (group, method), test_method in six.iteritems(test_methods): - request_serializers[group, method] = test_method.serialize_request - request_deserializers[group, method] = test_method.deserialize_request - response_serializers[group, method] = test_method.serialize_response - response_deserializers[group, method] = test_method.deserialize_response - return _SerializationBehaviors(request_serializers, request_deserializers, - response_serializers, response_deserializers) - - -class _Implementation(test_interfaces.Implementation): - - def instantiate(self, methods, method_implementations, - multi_method_implementation): - serialization_behaviors = _serialization_behaviors_from_test_methods( - methods) - # TODO(nathaniel): Add a "groups" attribute to _digest.TestServiceDigest. - service = next(iter(methods))[0] - # TODO(nathaniel): Add a "cardinalities_by_group" attribute to - # _digest.TestServiceDigest. - cardinalities = { - method: method_object.cardinality() - for (group, method), method_object in six.iteritems(methods) - } - - server_options = implementations.server_options( - request_deserializers=serialization_behaviors.request_deserializers, - response_serializers=serialization_behaviors.response_serializers, - thread_pool_size=test_constants.POOL_SIZE) - server = implementations.server( - method_implementations, options=server_options) - server_credentials = implementations.ssl_server_credentials([ - ( - resources.private_key(), - resources.certificate_chain(), - ), - ]) - port = server.add_secure_port('[::]:0', server_credentials) - server.start() - channel_credentials = implementations.ssl_channel_credentials( - resources.test_root_certificates()) - channel = test_utilities.not_really_secure_channel( - 'localhost', port, channel_credentials, _SERVER_HOST_OVERRIDE) - stub_options = implementations.stub_options( - request_serializers=serialization_behaviors.request_serializers, - response_deserializers=serialization_behaviors. - response_deserializers, - thread_pool_size=test_constants.POOL_SIZE) - generic_stub = implementations.generic_stub( - channel, options=stub_options) - dynamic_stub = implementations.dynamic_stub( - channel, service, cardinalities, options=stub_options) - return generic_stub, {service: dynamic_stub}, server - - def destantiate(self, memo): - memo.stop(test_constants.SHORT_TIMEOUT).wait() - - def invocation_metadata(self): - return grpc_test_common.INVOCATION_INITIAL_METADATA - - def initial_metadata(self): - return grpc_test_common.SERVICE_INITIAL_METADATA - - def terminal_metadata(self): - return grpc_test_common.SERVICE_TERMINAL_METADATA - - def code(self): - return interfaces.StatusCode.OK - - def details(self): - return grpc_test_common.DETAILS - - def metadata_transmitted(self, original_metadata, transmitted_metadata): - return original_metadata is None or grpc_test_common.metadata_transmitted( - original_metadata, transmitted_metadata) - - -def load_tests(loader, tests, pattern): - return unittest.TestSuite( - tests=tuple( - loader.loadTestsFromTestCase(test_case_class) - for test_case_class in test_cases.test_cases(_Implementation()))) - - -if __name__ == '__main__': - unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/__init__.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/__init__.py deleted file mode 100644 index 5fb4f3c3cf..0000000000 --- a/src/python/grpcio_tests/tests/unit/framework/interfaces/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -# Copyright 2015 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py deleted file mode 100644 index 5d8679aa62..0000000000 --- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py +++ /dev/null @@ -1,287 +0,0 @@ -# Copyright 2015 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Test code for the Face layer of RPC Framework.""" - -from __future__ import division - -import abc -import itertools -import unittest -from concurrent import futures - -import six - -# test_interfaces is referenced from specification in this module. -from grpc.framework.foundation import logging_pool -from grpc.framework.interfaces.face import face -from tests.unit.framework.common import test_constants -from tests.unit.framework.common import test_control -from tests.unit.framework.common import test_coverage -from tests.unit.framework.interfaces.face import _3069_test_constant -from tests.unit.framework.interfaces.face import _digest -from tests.unit.framework.interfaces.face import _stock_service -from tests.unit.framework.interfaces.face import test_interfaces # pylint: disable=unused-import - - -class TestCase( - six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, - unittest.TestCase)): - """A test of the Face layer of RPC Framework. - - Concrete subclasses must have an "implementation" attribute of type - test_interfaces.Implementation and an "invoker_constructor" attribute of type - _invocation.InvokerConstructor. - """ - - NAME = 'BlockingInvocationInlineServiceTest' - - def setUp(self): - """See unittest.TestCase.setUp for full specification. - - Overriding implementations must call this implementation. - """ - self._control = test_control.PauseFailControl() - self._digest = _digest.digest(_stock_service.STOCK_TEST_SERVICE, - self._control, None) - - generic_stub, dynamic_stubs, self._memo = self.implementation.instantiate( - self._digest.methods, self._digest.inline_method_implementations, - None) - self._invoker = self.invoker_constructor.construct_invoker( - generic_stub, dynamic_stubs, self._digest.methods) - - def tearDown(self): - """See unittest.TestCase.tearDown for full specification. - - Overriding implementations must call this implementation. - """ - self._invoker = None - self.implementation.destantiate(self._memo) - - def testSuccessfulUnaryRequestUnaryResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - - response, call = self._invoker.blocking(group, method)( - request, test_constants.LONG_TIMEOUT, with_call=True) - - test_messages.verify(request, response, self) - - def testSuccessfulUnaryRequestStreamResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_stream_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - - response_iterator = self._invoker.blocking(group, method)( - request, test_constants.LONG_TIMEOUT) - responses = list(response_iterator) - - test_messages.verify(request, responses, self) - - def testSuccessfulStreamRequestUnaryResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.stream_unary_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - - response, call = self._invoker.blocking(group, method)( - iter(requests), test_constants.LONG_TIMEOUT, with_call=True) - - test_messages.verify(requests, response, self) - - def testSuccessfulStreamRequestStreamResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.stream_stream_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - - response_iterator = self._invoker.blocking(group, method)( - iter(requests), test_constants.LONG_TIMEOUT) - responses = list(response_iterator) - - test_messages.verify(requests, responses, self) - - def testSequentialInvocations(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - first_request = test_messages.request() - second_request = test_messages.request() - - first_response = self._invoker.blocking(group, method)( - first_request, test_constants.LONG_TIMEOUT) - - test_messages.verify(first_request, first_response, self) - - second_response = self._invoker.blocking(group, method)( - second_request, test_constants.LONG_TIMEOUT) - - test_messages.verify(second_request, second_response, self) - - def testParallelInvocations(self): - pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - requests = [] - response_futures = [] - for _ in range(test_constants.THREAD_CONCURRENCY): - request = test_messages.request() - response_future = pool.submit( - self._invoker.blocking(group, method), request, - test_constants.LONG_TIMEOUT) - requests.append(request) - response_futures.append(response_future) - - responses = [ - response_future.result() - for response_future in response_futures - ] - - for request, response in zip(requests, responses): - test_messages.verify(request, response, self) - pool.shutdown(wait=True) - - def testWaitingForSomeButNotAllParallelInvocations(self): - pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - requests = [] - response_futures_to_indices = {} - for index in range(test_constants.THREAD_CONCURRENCY): - request = test_messages.request() - response_future = pool.submit( - self._invoker.blocking(group, method), request, - test_constants.LONG_TIMEOUT) - requests.append(request) - response_futures_to_indices[response_future] = index - - some_completed_response_futures_iterator = itertools.islice( - futures.as_completed(response_futures_to_indices), - test_constants.THREAD_CONCURRENCY // 2) - for response_future in some_completed_response_futures_iterator: - index = response_futures_to_indices[response_future] - test_messages.verify(requests[index], - response_future.result(), self) - pool.shutdown(wait=True) - - @unittest.skip('Cancellation impossible with blocking control flow!') - def testCancelledUnaryRequestUnaryResponse(self): - raise NotImplementedError() - - @unittest.skip('Cancellation impossible with blocking control flow!') - def testCancelledUnaryRequestStreamResponse(self): - raise NotImplementedError() - - @unittest.skip('Cancellation impossible with blocking control flow!') - def testCancelledStreamRequestUnaryResponse(self): - raise NotImplementedError() - - @unittest.skip('Cancellation impossible with blocking control flow!') - def testCancelledStreamRequestStreamResponse(self): - raise NotImplementedError() - - def testExpiredUnaryRequestUnaryResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - - with self._control.pause(), self.assertRaises( - face.ExpirationError): - self._invoker.blocking(group, method)( - request, _3069_test_constant.REALLY_SHORT_TIMEOUT) - - def testExpiredUnaryRequestStreamResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_stream_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - - with self._control.pause(), self.assertRaises( - face.ExpirationError): - response_iterator = self._invoker.blocking(group, method)( - request, _3069_test_constant.REALLY_SHORT_TIMEOUT) - list(response_iterator) - - def testExpiredStreamRequestUnaryResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.stream_unary_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - - with self._control.pause(), self.assertRaises( - face.ExpirationError): - self._invoker.blocking( - group, method)(iter(requests), - _3069_test_constant.REALLY_SHORT_TIMEOUT) - - def testExpiredStreamRequestStreamResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.stream_stream_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - - with self._control.pause(), self.assertRaises( - face.ExpirationError): - response_iterator = self._invoker.blocking( - group, method)(iter(requests), - _3069_test_constant.REALLY_SHORT_TIMEOUT) - list(response_iterator) - - def testFailedUnaryRequestUnaryResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - - with self._control.fail(), self.assertRaises(face.RemoteError): - self._invoker.blocking(group, method)( - request, test_constants.LONG_TIMEOUT) - - def testFailedUnaryRequestStreamResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_stream_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - - with self._control.fail(), self.assertRaises(face.RemoteError): - response_iterator = self._invoker.blocking(group, method)( - request, test_constants.LONG_TIMEOUT) - list(response_iterator) - - def testFailedStreamRequestUnaryResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.stream_unary_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - - with self._control.fail(), self.assertRaises(face.RemoteError): - self._invoker.blocking(group, method)( - iter(requests), test_constants.LONG_TIMEOUT) - - def testFailedStreamRequestStreamResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.stream_stream_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - - with self._control.fail(), self.assertRaises(face.RemoteError): - response_iterator = self._invoker.blocking(group, method)( - iter(requests), test_constants.LONG_TIMEOUT) - list(response_iterator) diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_digest.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_digest.py deleted file mode 100644 index b1c33da43a..0000000000 --- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_digest.py +++ /dev/null @@ -1,432 +0,0 @@ -# Copyright 2015 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Code for making a service.TestService more amenable to use in tests.""" - -import collections -import threading - -import six - -# test_control, _service, and test_interfaces are referenced from specification -# in this module. -from grpc.framework.common import cardinality -from grpc.framework.common import style -from grpc.framework.foundation import stream -from grpc.framework.foundation import stream_util -from grpc.framework.interfaces.face import face -from tests.unit.framework.common import test_control # pylint: disable=unused-import -from tests.unit.framework.interfaces.face import _service # pylint: disable=unused-import -from tests.unit.framework.interfaces.face import test_interfaces # pylint: disable=unused-import - -_IDENTITY = lambda x: x - - -class TestServiceDigest( - collections.namedtuple('TestServiceDigest', ( - 'methods', - 'inline_method_implementations', - 'event_method_implementations', - 'multi_method_implementation', - 'unary_unary_messages_sequences', - 'unary_stream_messages_sequences', - 'stream_unary_messages_sequences', - 'stream_stream_messages_sequences', - ))): - """A transformation of a service.TestService. - - Attributes: - methods: A dict from method group-name pair to test_interfaces.Method object - describing the RPC methods that may be called during the test. - inline_method_implementations: A dict from method group-name pair to - face.MethodImplementation object to be used in tests of in-line calls to - behaviors under test. - event_method_implementations: A dict from method group-name pair to - face.MethodImplementation object to be used in tests of event-driven calls - to behaviors under test. - multi_method_implementation: A face.MultiMethodImplementation to be used in - tests of generic calls to behaviors under test. - unary_unary_messages_sequences: A dict from method group-name pair to - sequence of service.UnaryUnaryTestMessages objects to be used to test the - identified method. - unary_stream_messages_sequences: A dict from method group-name pair to - sequence of service.UnaryStreamTestMessages objects to be used to test the - identified method. - stream_unary_messages_sequences: A dict from method group-name pair to - sequence of service.StreamUnaryTestMessages objects to be used to test the - identified method. - stream_stream_messages_sequences: A dict from method group-name pair to - sequence of service.StreamStreamTestMessages objects to be used to test - the identified method. - """ - - -class _BufferingConsumer(stream.Consumer): - """A trivial Consumer that dumps what it consumes in a user-mutable buffer.""" - - def __init__(self): - self.consumed = [] - self.terminated = False - - def consume(self, value): - self.consumed.append(value) - - def terminate(self): - self.terminated = True - - def consume_and_terminate(self, value): - self.consumed.append(value) - self.terminated = True - - -class _InlineUnaryUnaryMethod(face.MethodImplementation): - - def __init__(self, unary_unary_test_method, control): - self._test_method = unary_unary_test_method - self._control = control - - self.cardinality = cardinality.Cardinality.UNARY_UNARY - self.style = style.Service.INLINE - - def unary_unary_inline(self, request, context): - response_list = [] - self._test_method.service(request, response_list.append, context, - self._control) - return response_list.pop(0) - - -class _EventUnaryUnaryMethod(face.MethodImplementation): - - def __init__(self, unary_unary_test_method, control, pool): - self._test_method = unary_unary_test_method - self._control = control - self._pool = pool - - self.cardinality = cardinality.Cardinality.UNARY_UNARY - self.style = style.Service.EVENT - - def unary_unary_event(self, request, response_callback, context): - if self._pool is None: - self._test_method.service(request, response_callback, context, - self._control) - else: - self._pool.submit(self._test_method.service, request, - response_callback, context, self._control) - - -class _InlineUnaryStreamMethod(face.MethodImplementation): - - def __init__(self, unary_stream_test_method, control): - self._test_method = unary_stream_test_method - self._control = control - - self.cardinality = cardinality.Cardinality.UNARY_STREAM - self.style = style.Service.INLINE - - def unary_stream_inline(self, request, context): - response_consumer = _BufferingConsumer() - self._test_method.service(request, response_consumer, context, - self._control) - for response in response_consumer.consumed: - yield response - - -class _EventUnaryStreamMethod(face.MethodImplementation): - - def __init__(self, unary_stream_test_method, control, pool): - self._test_method = unary_stream_test_method - self._control = control - self._pool = pool - - self.cardinality = cardinality.Cardinality.UNARY_STREAM - self.style = style.Service.EVENT - - def unary_stream_event(self, request, response_consumer, context): - if self._pool is None: - self._test_method.service(request, response_consumer, context, - self._control) - else: - self._pool.submit(self._test_method.service, request, - response_consumer, context, self._control) - - -class _InlineStreamUnaryMethod(face.MethodImplementation): - - def __init__(self, stream_unary_test_method, control): - self._test_method = stream_unary_test_method - self._control = control - - self.cardinality = cardinality.Cardinality.STREAM_UNARY - self.style = style.Service.INLINE - - def stream_unary_inline(self, request_iterator, context): - response_list = [] - request_consumer = self._test_method.service(response_list.append, - context, self._control) - for request in request_iterator: - request_consumer.consume(request) - request_consumer.terminate() - return response_list.pop(0) - - -class _EventStreamUnaryMethod(face.MethodImplementation): - - def __init__(self, stream_unary_test_method, control, pool): - self._test_method = stream_unary_test_method - self._control = control - self._pool = pool - - self.cardinality = cardinality.Cardinality.STREAM_UNARY - self.style = style.Service.EVENT - - def stream_unary_event(self, response_callback, context): - request_consumer = self._test_method.service(response_callback, context, - self._control) - if self._pool is None: - return request_consumer - else: - return stream_util.ThreadSwitchingConsumer(request_consumer, - self._pool) - - -class _InlineStreamStreamMethod(face.MethodImplementation): - - def __init__(self, stream_stream_test_method, control): - self._test_method = stream_stream_test_method - self._control = control - - self.cardinality = cardinality.Cardinality.STREAM_STREAM - self.style = style.Service.INLINE - - def stream_stream_inline(self, request_iterator, context): - response_consumer = _BufferingConsumer() - request_consumer = self._test_method.service(response_consumer, context, - self._control) - - for request in request_iterator: - request_consumer.consume(request) - while response_consumer.consumed: - yield response_consumer.consumed.pop(0) - response_consumer.terminate() - - -class _EventStreamStreamMethod(face.MethodImplementation): - - def __init__(self, stream_stream_test_method, control, pool): - self._test_method = stream_stream_test_method - self._control = control - self._pool = pool - - self.cardinality = cardinality.Cardinality.STREAM_STREAM - self.style = style.Service.EVENT - - def stream_stream_event(self, response_consumer, context): - request_consumer = self._test_method.service(response_consumer, context, - self._control) - if self._pool is None: - return request_consumer - else: - return stream_util.ThreadSwitchingConsumer(request_consumer, - self._pool) - - -class _UnaryConsumer(stream.Consumer): - """A Consumer that only allows consumption of exactly one value.""" - - def __init__(self, action): - self._lock = threading.Lock() - self._action = action - self._consumed = False - self._terminated = False - - def consume(self, value): - with self._lock: - if self._consumed: - raise ValueError('Unary consumer already consumed!') - elif self._terminated: - raise ValueError('Unary consumer already terminated!') - else: - self._consumed = True - - self._action(value) - - def terminate(self): - with self._lock: - if not self._consumed: - raise ValueError('Unary consumer hasn\'t yet consumed!') - elif self._terminated: - raise ValueError('Unary consumer already terminated!') - else: - self._terminated = True - - def consume_and_terminate(self, value): - with self._lock: - if self._consumed: - raise ValueError('Unary consumer already consumed!') - elif self._terminated: - raise ValueError('Unary consumer already terminated!') - else: - self._consumed = True - self._terminated = True - - self._action(value) - - -class _UnaryUnaryAdaptation(object): - - def __init__(self, unary_unary_test_method): - self._method = unary_unary_test_method - - def service(self, response_consumer, context, control): - - def action(request): - self._method.service(request, - response_consumer.consume_and_terminate, - context, control) - - return _UnaryConsumer(action) - - -class _UnaryStreamAdaptation(object): - - def __init__(self, unary_stream_test_method): - self._method = unary_stream_test_method - - def service(self, response_consumer, context, control): - - def action(request): - self._method.service(request, response_consumer, context, control) - - return _UnaryConsumer(action) - - -class _StreamUnaryAdaptation(object): - - def __init__(self, stream_unary_test_method): - self._method = stream_unary_test_method - - def service(self, response_consumer, context, control): - return self._method.service(response_consumer.consume_and_terminate, - context, control) - - -class _MultiMethodImplementation(face.MultiMethodImplementation): - - def __init__(self, methods, control, pool): - self._methods = methods - self._control = control - self._pool = pool - - def service(self, group, name, response_consumer, context): - method = self._methods.get(group, name, None) - if method is None: - raise face.NoSuchMethodError(group, name) - elif self._pool is None: - return method(response_consumer, context, self._control) - else: - request_consumer = method(response_consumer, context, self._control) - return stream_util.ThreadSwitchingConsumer(request_consumer, - self._pool) - - -class _Assembly( - collections.namedtuple( - '_Assembly', - ['methods', 'inlines', 'events', 'adaptations', 'messages'])): - """An intermediate structure created when creating a TestServiceDigest.""" - - -def _assemble(scenarios, identifiers, inline_method_constructor, - event_method_constructor, adapter, control, pool): - """Creates an _Assembly from the given scenarios.""" - methods = {} - inlines = {} - events = {} - adaptations = {} - messages = {} - for identifier, scenario in six.iteritems(scenarios): - if identifier in identifiers: - raise ValueError('Repeated identifier "(%s, %s)"!' % identifier) - - test_method = scenario[0] - inline_method = inline_method_constructor(test_method, control) - event_method = event_method_constructor(test_method, control, pool) - adaptation = adapter(test_method) - - methods[identifier] = test_method - inlines[identifier] = inline_method - events[identifier] = event_method - adaptations[identifier] = adaptation - messages[identifier] = scenario[1] - - return _Assembly(methods, inlines, events, adaptations, messages) - - -def digest(service, control, pool): - """Creates a TestServiceDigest from a TestService. - - Args: - service: A _service.TestService. - control: A test_control.Control. - pool: If RPC methods should be serviced in a separate thread, a thread pool. - None if RPC methods should be serviced in the thread belonging to the - run-time that calls for their service. - - Returns: - A TestServiceDigest synthesized from the given service.TestService. - """ - identifiers = set() - - unary_unary = _assemble(service.unary_unary_scenarios(), identifiers, - _InlineUnaryUnaryMethod, _EventUnaryUnaryMethod, - _UnaryUnaryAdaptation, control, pool) - identifiers.update(unary_unary.inlines) - - unary_stream = _assemble(service.unary_stream_scenarios(), identifiers, - _InlineUnaryStreamMethod, _EventUnaryStreamMethod, - _UnaryStreamAdaptation, control, pool) - identifiers.update(unary_stream.inlines) - - stream_unary = _assemble(service.stream_unary_scenarios(), identifiers, - _InlineStreamUnaryMethod, _EventStreamUnaryMethod, - _StreamUnaryAdaptation, control, pool) - identifiers.update(stream_unary.inlines) - - stream_stream = _assemble(service.stream_stream_scenarios(), identifiers, - _InlineStreamStreamMethod, - _EventStreamStreamMethod, _IDENTITY, control, - pool) - identifiers.update(stream_stream.inlines) - - methods = dict(unary_unary.methods) - methods.update(unary_stream.methods) - methods.update(stream_unary.methods) - methods.update(stream_stream.methods) - adaptations = dict(unary_unary.adaptations) - adaptations.update(unary_stream.adaptations) - adaptations.update(stream_unary.adaptations) - adaptations.update(stream_stream.adaptations) - inlines = dict(unary_unary.inlines) - inlines.update(unary_stream.inlines) - inlines.update(stream_unary.inlines) - inlines.update(stream_stream.inlines) - events = dict(unary_unary.events) - events.update(unary_stream.events) - events.update(stream_unary.events) - events.update(stream_stream.events) - - return TestServiceDigest(methods, inlines, events, - _MultiMethodImplementation(adaptations, control, - pool), - unary_unary.messages, unary_stream.messages, - stream_unary.messages, stream_stream.messages) diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py deleted file mode 100644 index 3d9b2816aa..0000000000 --- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py +++ /dev/null @@ -1,508 +0,0 @@ -# Copyright 2015 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Test code for the Face layer of RPC Framework.""" - -from __future__ import division - -import abc -import contextlib -import itertools -import threading -import unittest -from concurrent import futures - -import six - -# test_interfaces is referenced from specification in this module. -from grpc.framework.foundation import future -from grpc.framework.foundation import logging_pool -from grpc.framework.interfaces.face import face -from tests.unit.framework.common import test_constants -from tests.unit.framework.common import test_control -from tests.unit.framework.common import test_coverage -from tests.unit.framework.interfaces.face import _3069_test_constant -from tests.unit.framework.interfaces.face import _digest -from tests.unit.framework.interfaces.face import _stock_service -from tests.unit.framework.interfaces.face import test_interfaces # pylint: disable=unused-import - - -class _PauseableIterator(object): - - def __init__(self, upstream): - self._upstream = upstream - self._condition = threading.Condition() - self._paused = False - - @contextlib.contextmanager - def pause(self): - with self._condition: - self._paused = True - yield - with self._condition: - self._paused = False - self._condition.notify_all() - - def __iter__(self): - return self - - def __next__(self): - return self.next() - - def next(self): - with self._condition: - while self._paused: - self._condition.wait() - return next(self._upstream) - - -class _Callback(object): - - def __init__(self): - self._condition = threading.Condition() - self._called = False - self._passed_future = None - self._passed_other_stuff = None - - def __call__(self, *args, **kwargs): - with self._condition: - self._called = True - if args: - self._passed_future = args[0] - if 1 < len(args) or kwargs: - self._passed_other_stuff = tuple(args[1:]), dict(kwargs) - self._condition.notify_all() - - def future(self): - with self._condition: - while True: - if self._passed_other_stuff is not None: - raise ValueError( - 'Test callback passed unexpected values: %s', - self._passed_other_stuff) - elif self._called: - return self._passed_future - else: - self._condition.wait() - - -class TestCase( - six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, - unittest.TestCase)): - """A test of the Face layer of RPC Framework. - - Concrete subclasses must have an "implementation" attribute of type - test_interfaces.Implementation and an "invoker_constructor" attribute of type - _invocation.InvokerConstructor. - """ - - NAME = 'FutureInvocationAsynchronousEventServiceTest' - - def setUp(self): - """See unittest.TestCase.setUp for full specification. - - Overriding implementations must call this implementation. - """ - self._control = test_control.PauseFailControl() - self._digest_pool = logging_pool.pool(test_constants.POOL_SIZE) - self._digest = _digest.digest(_stock_service.STOCK_TEST_SERVICE, - self._control, self._digest_pool) - - generic_stub, dynamic_stubs, self._memo = self.implementation.instantiate( - self._digest.methods, self._digest.event_method_implementations, - None) - self._invoker = self.invoker_constructor.construct_invoker( - generic_stub, dynamic_stubs, self._digest.methods) - - def tearDown(self): - """See unittest.TestCase.tearDown for full specification. - - Overriding implementations must call this implementation. - """ - self._invoker = None - self.implementation.destantiate(self._memo) - self._digest_pool.shutdown(wait=True) - - def testSuccessfulUnaryRequestUnaryResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - callback = _Callback() - - response_future = self._invoker.future(group, method)( - request, test_constants.LONG_TIMEOUT) - response_future.add_done_callback(callback) - response = response_future.result() - - test_messages.verify(request, response, self) - self.assertIs(callback.future(), response_future) - self.assertIsNone(response_future.exception()) - self.assertIsNone(response_future.traceback()) - - def testSuccessfulUnaryRequestStreamResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_stream_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - - response_iterator = self._invoker.future(group, method)( - request, test_constants.LONG_TIMEOUT) - responses = list(response_iterator) - - test_messages.verify(request, responses, self) - - def testSuccessfulStreamRequestUnaryResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.stream_unary_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - request_iterator = _PauseableIterator(iter(requests)) - callback = _Callback() - - # Use of a paused iterator of requests allows us to test that control is - # returned to calling code before the iterator yields any requests. - with request_iterator.pause(): - response_future = self._invoker.future(group, method)( - request_iterator, test_constants.LONG_TIMEOUT) - response_future.add_done_callback(callback) - future_passed_to_callback = callback.future() - response = future_passed_to_callback.result() - - test_messages.verify(requests, response, self) - self.assertIs(future_passed_to_callback, response_future) - self.assertIsNone(response_future.exception()) - self.assertIsNone(response_future.traceback()) - - def testSuccessfulStreamRequestStreamResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.stream_stream_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - request_iterator = _PauseableIterator(iter(requests)) - - # Use of a paused iterator of requests allows us to test that control is - # returned to calling code before the iterator yields any requests. - with request_iterator.pause(): - response_iterator = self._invoker.future(group, method)( - request_iterator, test_constants.LONG_TIMEOUT) - responses = list(response_iterator) - - test_messages.verify(requests, responses, self) - - def testSequentialInvocations(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - first_request = test_messages.request() - second_request = test_messages.request() - - first_response_future = self._invoker.future(group, method)( - first_request, test_constants.LONG_TIMEOUT) - first_response = first_response_future.result() - - test_messages.verify(first_request, first_response, self) - - second_response_future = self._invoker.future(group, method)( - second_request, test_constants.LONG_TIMEOUT) - second_response = second_response_future.result() - - test_messages.verify(second_request, second_response, self) - - def testParallelInvocations(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - first_request = test_messages.request() - second_request = test_messages.request() - - first_response_future = self._invoker.future(group, method)( - first_request, test_constants.LONG_TIMEOUT) - second_response_future = self._invoker.future(group, method)( - second_request, test_constants.LONG_TIMEOUT) - first_response = first_response_future.result() - second_response = second_response_future.result() - - test_messages.verify(first_request, first_response, self) - test_messages.verify(second_request, second_response, self) - - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - requests = [] - response_futures = [] - for _ in range(test_constants.THREAD_CONCURRENCY): - request = test_messages.request() - response_future = self._invoker.future(group, method)( - request, test_constants.LONG_TIMEOUT) - requests.append(request) - response_futures.append(response_future) - - responses = [ - response_future.result() - for response_future in response_futures - ] - - for request, response in zip(requests, responses): - test_messages.verify(request, response, self) - - def testWaitingForSomeButNotAllParallelInvocations(self): - pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - requests = [] - response_futures_to_indices = {} - for index in range(test_constants.THREAD_CONCURRENCY): - request = test_messages.request() - inner_response_future = self._invoker.future(group, method)( - request, test_constants.LONG_TIMEOUT) - outer_response_future = pool.submit( - inner_response_future.result) - requests.append(request) - response_futures_to_indices[outer_response_future] = index - - some_completed_response_futures_iterator = itertools.islice( - futures.as_completed(response_futures_to_indices), - test_constants.THREAD_CONCURRENCY // 2) - for response_future in some_completed_response_futures_iterator: - index = response_futures_to_indices[response_future] - test_messages.verify(requests[index], - response_future.result(), self) - pool.shutdown(wait=True) - - def testCancelledUnaryRequestUnaryResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - callback = _Callback() - - with self._control.pause(): - response_future = self._invoker.future(group, method)( - request, test_constants.LONG_TIMEOUT) - response_future.add_done_callback(callback) - cancel_method_return_value = response_future.cancel() - - self.assertIs(callback.future(), response_future) - self.assertFalse(cancel_method_return_value) - self.assertTrue(response_future.cancelled()) - with self.assertRaises(future.CancelledError): - response_future.result() - with self.assertRaises(future.CancelledError): - response_future.exception() - with self.assertRaises(future.CancelledError): - response_future.traceback() - - def testCancelledUnaryRequestStreamResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_stream_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - - with self._control.pause(): - response_iterator = self._invoker.future(group, method)( - request, test_constants.LONG_TIMEOUT) - response_iterator.cancel() - - with self.assertRaises(face.CancellationError): - next(response_iterator) - - def testCancelledStreamRequestUnaryResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.stream_unary_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - callback = _Callback() - - with self._control.pause(): - response_future = self._invoker.future(group, method)( - iter(requests), test_constants.LONG_TIMEOUT) - response_future.add_done_callback(callback) - cancel_method_return_value = response_future.cancel() - - self.assertIs(callback.future(), response_future) - self.assertFalse(cancel_method_return_value) - self.assertTrue(response_future.cancelled()) - with self.assertRaises(future.CancelledError): - response_future.result() - with self.assertRaises(future.CancelledError): - response_future.exception() - with self.assertRaises(future.CancelledError): - response_future.traceback() - - def testCancelledStreamRequestStreamResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.stream_stream_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - - with self._control.pause(): - response_iterator = self._invoker.future(group, method)( - iter(requests), test_constants.LONG_TIMEOUT) - response_iterator.cancel() - - with self.assertRaises(face.CancellationError): - next(response_iterator) - - def testExpiredUnaryRequestUnaryResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - callback = _Callback() - - with self._control.pause(): - response_future = self._invoker.future(group, method)( - request, _3069_test_constant.REALLY_SHORT_TIMEOUT) - response_future.add_done_callback(callback) - self.assertIs(callback.future(), response_future) - self.assertIsInstance(response_future.exception(), - face.ExpirationError) - with self.assertRaises(face.ExpirationError): - response_future.result() - self.assertIsInstance(response_future.exception(), - face.AbortionError) - self.assertIsNotNone(response_future.traceback()) - - def testExpiredUnaryRequestStreamResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_stream_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - - with self._control.pause(): - response_iterator = self._invoker.future(group, method)( - request, _3069_test_constant.REALLY_SHORT_TIMEOUT) - with self.assertRaises(face.ExpirationError): - list(response_iterator) - - def testExpiredStreamRequestUnaryResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.stream_unary_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - callback = _Callback() - - with self._control.pause(): - response_future = self._invoker.future( - group, method)(iter(requests), - _3069_test_constant.REALLY_SHORT_TIMEOUT) - response_future.add_done_callback(callback) - self.assertIs(callback.future(), response_future) - self.assertIsInstance(response_future.exception(), - face.ExpirationError) - with self.assertRaises(face.ExpirationError): - response_future.result() - self.assertIsInstance(response_future.exception(), - face.AbortionError) - self.assertIsNotNone(response_future.traceback()) - - def testExpiredStreamRequestStreamResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.stream_stream_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - - with self._control.pause(): - response_iterator = self._invoker.future( - group, method)(iter(requests), - _3069_test_constant.REALLY_SHORT_TIMEOUT) - with self.assertRaises(face.ExpirationError): - list(response_iterator) - - def testFailedUnaryRequestUnaryResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - callback = _Callback() - abortion_callback = _Callback() - - with self._control.fail(): - response_future = self._invoker.future(group, method)( - request, _3069_test_constant.REALLY_SHORT_TIMEOUT) - response_future.add_done_callback(callback) - response_future.add_abortion_callback(abortion_callback) - - self.assertIs(callback.future(), response_future) - # Because the servicer fails outside of the thread from which the - # servicer-side runtime called into it its failure is - # indistinguishable from simply not having called its - # response_callback before the expiration of the RPC. - self.assertIsInstance(response_future.exception(), - face.ExpirationError) - with self.assertRaises(face.ExpirationError): - response_future.result() - self.assertIsNotNone(response_future.traceback()) - self.assertIsNotNone(abortion_callback.future()) - - def testFailedUnaryRequestStreamResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_stream_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - - # Because the servicer fails outside of the thread from which the - # servicer-side runtime called into it its failure is indistinguishable - # from simply not having called its response_consumer before the - # expiration of the RPC. - with self._control.fail(), self.assertRaises( - face.ExpirationError): - response_iterator = self._invoker.future(group, method)( - request, _3069_test_constant.REALLY_SHORT_TIMEOUT) - list(response_iterator) - - def testFailedStreamRequestUnaryResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.stream_unary_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - callback = _Callback() - abortion_callback = _Callback() - - with self._control.fail(): - response_future = self._invoker.future( - group, method)(iter(requests), - _3069_test_constant.REALLY_SHORT_TIMEOUT) - response_future.add_done_callback(callback) - response_future.add_abortion_callback(abortion_callback) - - self.assertIs(callback.future(), response_future) - # Because the servicer fails outside of the thread from which the - # servicer-side runtime called into it its failure is - # indistinguishable from simply not having called its - # response_callback before the expiration of the RPC. - self.assertIsInstance(response_future.exception(), - face.ExpirationError) - with self.assertRaises(face.ExpirationError): - response_future.result() - self.assertIsNotNone(response_future.traceback()) - self.assertIsNotNone(abortion_callback.future()) - - def testFailedStreamRequestStreamResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.stream_stream_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - - # Because the servicer fails outside of the thread from which the - # servicer-side runtime called into it its failure is indistinguishable - # from simply not having called its response_consumer before the - # expiration of the RPC. - with self._control.fail(), self.assertRaises( - face.ExpirationError): - response_iterator = self._invoker.future( - group, method)(iter(requests), - _3069_test_constant.REALLY_SHORT_TIMEOUT) - list(response_iterator) diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_invocation.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_invocation.py deleted file mode 100644 index efc93d56b0..0000000000 --- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_invocation.py +++ /dev/null @@ -1,198 +0,0 @@ -# Copyright 2015 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Coverage across the Face layer's generic-to-dynamic range for invocation.""" - -import abc - -import six - -from grpc.framework.common import cardinality - -_CARDINALITY_TO_GENERIC_BLOCKING_BEHAVIOR = { - cardinality.Cardinality.UNARY_UNARY: 'blocking_unary_unary', - cardinality.Cardinality.UNARY_STREAM: 'inline_unary_stream', - cardinality.Cardinality.STREAM_UNARY: 'blocking_stream_unary', - cardinality.Cardinality.STREAM_STREAM: 'inline_stream_stream', -} - -_CARDINALITY_TO_GENERIC_FUTURE_BEHAVIOR = { - cardinality.Cardinality.UNARY_UNARY: 'future_unary_unary', - cardinality.Cardinality.UNARY_STREAM: 'inline_unary_stream', - cardinality.Cardinality.STREAM_UNARY: 'future_stream_unary', - cardinality.Cardinality.STREAM_STREAM: 'inline_stream_stream', -} - -_CARDINALITY_TO_GENERIC_EVENT_BEHAVIOR = { - cardinality.Cardinality.UNARY_UNARY: 'event_unary_unary', - cardinality.Cardinality.UNARY_STREAM: 'event_unary_stream', - cardinality.Cardinality.STREAM_UNARY: 'event_stream_unary', - cardinality.Cardinality.STREAM_STREAM: 'event_stream_stream', -} - -_CARDINALITY_TO_MULTI_CALLABLE_ATTRIBUTE = { - cardinality.Cardinality.UNARY_UNARY: 'unary_unary', - cardinality.Cardinality.UNARY_STREAM: 'unary_stream', - cardinality.Cardinality.STREAM_UNARY: 'stream_unary', - cardinality.Cardinality.STREAM_STREAM: 'stream_stream', -} - - -class Invoker(six.with_metaclass(abc.ABCMeta)): - """A type used to invoke test RPCs.""" - - @abc.abstractmethod - def blocking(self, group, name): - """Invokes an RPC with blocking control flow.""" - raise NotImplementedError() - - @abc.abstractmethod - def future(self, group, name): - """Invokes an RPC with future control flow.""" - raise NotImplementedError() - - @abc.abstractmethod - def event(self, group, name): - """Invokes an RPC with event control flow.""" - raise NotImplementedError() - - -class InvokerConstructor(six.with_metaclass(abc.ABCMeta)): - """A type used to create Invokers.""" - - @abc.abstractmethod - def name(self): - """Specifies the name of the Invoker constructed by this object.""" - raise NotImplementedError() - - @abc.abstractmethod - def construct_invoker(self, generic_stub, dynamic_stubs, methods): - """Constructs an Invoker for the given stubs and methods.""" - raise NotImplementedError() - - -class _GenericInvoker(Invoker): - - def __init__(self, generic_stub, methods): - self._stub = generic_stub - self._methods = methods - - def _behavior(self, group, name, cardinality_to_generic_method): - method_cardinality = self._methods[group, name].cardinality() - behavior = getattr(self._stub, - cardinality_to_generic_method[method_cardinality]) - return lambda *args, **kwargs: behavior(group, name, *args, **kwargs) - - def blocking(self, group, name): - return self._behavior(group, name, - _CARDINALITY_TO_GENERIC_BLOCKING_BEHAVIOR) - - def future(self, group, name): - return self._behavior(group, name, - _CARDINALITY_TO_GENERIC_FUTURE_BEHAVIOR) - - def event(self, group, name): - return self._behavior(group, name, - _CARDINALITY_TO_GENERIC_EVENT_BEHAVIOR) - - -class _GenericInvokerConstructor(InvokerConstructor): - - def name(self): - return 'GenericInvoker' - - def construct_invoker(self, generic_stub, dynamic_stub, methods): - return _GenericInvoker(generic_stub, methods) - - -class _MultiCallableInvoker(Invoker): - - def __init__(self, generic_stub, methods): - self._stub = generic_stub - self._methods = methods - - def _multi_callable(self, group, name): - method_cardinality = self._methods[group, name].cardinality() - behavior = getattr( - self._stub, - _CARDINALITY_TO_MULTI_CALLABLE_ATTRIBUTE[method_cardinality]) - return behavior(group, name) - - def blocking(self, group, name): - return self._multi_callable(group, name) - - def future(self, group, name): - method_cardinality = self._methods[group, name].cardinality() - behavior = getattr( - self._stub, - _CARDINALITY_TO_MULTI_CALLABLE_ATTRIBUTE[method_cardinality]) - if method_cardinality in (cardinality.Cardinality.UNARY_UNARY, - cardinality.Cardinality.STREAM_UNARY): - return behavior(group, name).future - else: - return behavior(group, name) - - def event(self, group, name): - return self._multi_callable(group, name).event - - -class _MultiCallableInvokerConstructor(InvokerConstructor): - - def name(self): - return 'MultiCallableInvoker' - - def construct_invoker(self, generic_stub, dynamic_stub, methods): - return _MultiCallableInvoker(generic_stub, methods) - - -class _DynamicInvoker(Invoker): - - def __init__(self, dynamic_stubs, methods): - self._stubs = dynamic_stubs - self._methods = methods - - def blocking(self, group, name): - return getattr(self._stubs[group], name) - - def future(self, group, name): - if self._methods[group, name].cardinality() in ( - cardinality.Cardinality.UNARY_UNARY, - cardinality.Cardinality.STREAM_UNARY): - return getattr(self._stubs[group], name).future - else: - return getattr(self._stubs[group], name) - - def event(self, group, name): - return getattr(self._stubs[group], name).event - - -class _DynamicInvokerConstructor(InvokerConstructor): - - def name(self): - return 'DynamicInvoker' - - def construct_invoker(self, generic_stub, dynamic_stubs, methods): - return _DynamicInvoker(dynamic_stubs, methods) - - -def invoker_constructors(): - """Creates a sequence of InvokerConstructors to use in tests of RPCs. - - Returns: - A sequence of InvokerConstructors. - """ - return ( - _GenericInvokerConstructor(), - _MultiCallableInvokerConstructor(), - _DynamicInvokerConstructor(), - ) diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_service.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_service.py deleted file mode 100644 index f1c96b6dc5..0000000000 --- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_service.py +++ /dev/null @@ -1,304 +0,0 @@ -# Copyright 2015 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Private interfaces implemented by data sets used in Face-layer tests.""" - -import abc - -import six - -# face is referenced from specification in this module. -from grpc.framework.interfaces.face import face # pylint: disable=unused-import -from tests.unit.framework.interfaces.face import test_interfaces - - -class UnaryUnaryTestMethodImplementation( - six.with_metaclass(abc.ABCMeta, test_interfaces.Method)): - """A controllable implementation of a unary-unary method.""" - - @abc.abstractmethod - def service(self, request, response_callback, context, control): - """Services an RPC that accepts one message and produces one message. - - Args: - request: The single request message for the RPC. - response_callback: A callback to be called to accept the response message - of the RPC. - context: An face.ServicerContext object. - control: A test_control.Control to control execution of this method. - - Raises: - abandonment.Abandoned: May or may not be raised when the RPC has been - aborted. - """ - raise NotImplementedError() - - -class UnaryUnaryTestMessages(six.with_metaclass(abc.ABCMeta)): - """A type for unary-request-unary-response message pairings.""" - - @abc.abstractmethod - def request(self): - """Affords a request message. - - Implementations of this method should return a different message with each - call so that multiple test executions of the test method may be made with - different inputs. - - Returns: - A request message. - """ - raise NotImplementedError() - - @abc.abstractmethod - def verify(self, request, response, test_case): - """Verifies that the computed response matches the given request. - - Args: - request: A request message. - response: A response message. - test_case: A unittest.TestCase object affording useful assertion methods. - - Raises: - AssertionError: If the request and response do not match, indicating that - there was some problem executing the RPC under test. - """ - raise NotImplementedError() - - -class UnaryStreamTestMethodImplementation( - six.with_metaclass(abc.ABCMeta, test_interfaces.Method)): - """A controllable implementation of a unary-stream method.""" - - @abc.abstractmethod - def service(self, request, response_consumer, context, control): - """Services an RPC that takes one message and produces a stream of messages. - - Args: - request: The single request message for the RPC. - response_consumer: A stream.Consumer to be called to accept the response - messages of the RPC. - context: A face.ServicerContext object. - control: A test_control.Control to control execution of this method. - - Raises: - abandonment.Abandoned: May or may not be raised when the RPC has been - aborted. - """ - raise NotImplementedError() - - -class UnaryStreamTestMessages(six.with_metaclass(abc.ABCMeta)): - """A type for unary-request-stream-response message pairings.""" - - @abc.abstractmethod - def request(self): - """Affords a request message. - - Implementations of this method should return a different message with each - call so that multiple test executions of the test method may be made with - different inputs. - - Returns: - A request message. - """ - raise NotImplementedError() - - @abc.abstractmethod - def verify(self, request, responses, test_case): - """Verifies that the computed responses match the given request. - - Args: - request: A request message. - responses: A sequence of response messages. - test_case: A unittest.TestCase object affording useful assertion methods. - - Raises: - AssertionError: If the request and responses do not match, indicating that - there was some problem executing the RPC under test. - """ - raise NotImplementedError() - - -class StreamUnaryTestMethodImplementation( - six.with_metaclass(abc.ABCMeta, test_interfaces.Method)): - """A controllable implementation of a stream-unary method.""" - - @abc.abstractmethod - def service(self, response_callback, context, control): - """Services an RPC that takes a stream of messages and produces one message. - - Args: - response_callback: A callback to be called to accept the response message - of the RPC. - context: A face.ServicerContext object. - control: A test_control.Control to control execution of this method. - - Returns: - A stream.Consumer with which to accept the request messages of the RPC. - The consumer returned from this method may or may not be invoked to - completion: in the case of RPC abortion, RPC Framework will simply stop - passing messages to this object. Implementations must not assume that - this object will be called to completion of the request stream or even - called at all. - - Raises: - abandonment.Abandoned: May or may not be raised when the RPC has been - aborted. - """ - raise NotImplementedError() - - -class StreamUnaryTestMessages(six.with_metaclass(abc.ABCMeta)): - """A type for stream-request-unary-response message pairings.""" - - @abc.abstractmethod - def requests(self): - """Affords a sequence of request messages. - - Implementations of this method should return a different sequences with each - call so that multiple test executions of the test method may be made with - different inputs. - - Returns: - A sequence of request messages. - """ - raise NotImplementedError() - - @abc.abstractmethod - def verify(self, requests, response, test_case): - """Verifies that the computed response matches the given requests. - - Args: - requests: A sequence of request messages. - response: A response message. - test_case: A unittest.TestCase object affording useful assertion methods. - - Raises: - AssertionError: If the requests and response do not match, indicating that - there was some problem executing the RPC under test. - """ - raise NotImplementedError() - - -class StreamStreamTestMethodImplementation( - six.with_metaclass(abc.ABCMeta, test_interfaces.Method)): - """A controllable implementation of a stream-stream method.""" - - @abc.abstractmethod - def service(self, response_consumer, context, control): - """Services an RPC that accepts and produces streams of messages. - - Args: - response_consumer: A stream.Consumer to be called to accept the response - messages of the RPC. - context: A face.ServicerContext object. - control: A test_control.Control to control execution of this method. - - Returns: - A stream.Consumer with which to accept the request messages of the RPC. - The consumer returned from this method may or may not be invoked to - completion: in the case of RPC abortion, RPC Framework will simply stop - passing messages to this object. Implementations must not assume that - this object will be called to completion of the request stream or even - called at all. - - Raises: - abandonment.Abandoned: May or may not be raised when the RPC has been - aborted. - """ - raise NotImplementedError() - - -class StreamStreamTestMessages(six.with_metaclass(abc.ABCMeta)): - """A type for stream-request-stream-response message pairings.""" - - @abc.abstractmethod - def requests(self): - """Affords a sequence of request messages. - - Implementations of this method should return a different sequences with each - call so that multiple test executions of the test method may be made with - different inputs. - - Returns: - A sequence of request messages. - """ - raise NotImplementedError() - - @abc.abstractmethod - def verify(self, requests, responses, test_case): - """Verifies that the computed response matches the given requests. - - Args: - requests: A sequence of request messages. - responses: A sequence of response messages. - test_case: A unittest.TestCase object affording useful assertion methods. - - Raises: - AssertionError: If the requests and responses do not match, indicating - that there was some problem executing the RPC under test. - """ - raise NotImplementedError() - - -class TestService(six.with_metaclass(abc.ABCMeta)): - """A specification of implemented methods to use in tests.""" - - @abc.abstractmethod - def unary_unary_scenarios(self): - """Affords unary-request-unary-response test methods and their messages. - - Returns: - A dict from method group-name pair to implementation/messages pair. The - first element of the pair is a UnaryUnaryTestMethodImplementation object - and the second element is a sequence of UnaryUnaryTestMethodMessages - objects. - """ - raise NotImplementedError() - - @abc.abstractmethod - def unary_stream_scenarios(self): - """Affords unary-request-stream-response test methods and their messages. - - Returns: - A dict from method group-name pair to implementation/messages pair. The - first element of the pair is a UnaryStreamTestMethodImplementation - object and the second element is a sequence of - UnaryStreamTestMethodMessages objects. - """ - raise NotImplementedError() - - @abc.abstractmethod - def stream_unary_scenarios(self): - """Affords stream-request-unary-response test methods and their messages. - - Returns: - A dict from method group-name pair to implementation/messages pair. The - first element of the pair is a StreamUnaryTestMethodImplementation - object and the second element is a sequence of - StreamUnaryTestMethodMessages objects. - """ - raise NotImplementedError() - - @abc.abstractmethod - def stream_stream_scenarios(self): - """Affords stream-request-stream-response test methods and their messages. - - Returns: - A dict from method group-name pair to implementation/messages pair. The - first element of the pair is a StreamStreamTestMethodImplementation - object and the second element is a sequence of - StreamStreamTestMethodMessages objects. - """ - raise NotImplementedError() diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_stock_service.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_stock_service.py deleted file mode 100644 index a84e02a79a..0000000000 --- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_stock_service.py +++ /dev/null @@ -1,390 +0,0 @@ -# Copyright 2015 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Examples of Python implementations of the stock.proto Stock service.""" - -from grpc.framework.common import cardinality -from grpc.framework.foundation import abandonment -from grpc.framework.foundation import stream -from tests.unit.framework.common import test_constants -from tests.unit.framework.interfaces.face import _service -from tests.unit._junkdrawer import stock_pb2 - -_STOCK_GROUP_NAME = 'Stock' -_SYMBOL_FORMAT = 'test symbol:%03d' - -# A test-appropriate security-pricing function. :-P -_price = lambda symbol_name: float(hash(symbol_name) % 4096) - - -def _get_last_trade_price(stock_request, stock_reply_callback, control, active): - """A unary-request, unary-response test method.""" - control.control() - if active(): - stock_reply_callback( - stock_pb2.StockReply( - symbol=stock_request.symbol, - price=_price(stock_request.symbol))) - else: - raise abandonment.Abandoned() - - -def _get_last_trade_price_multiple(stock_reply_consumer, control, active): - """A stream-request, stream-response test method.""" - - def stock_reply_for_stock_request(stock_request): - control.control() - if active(): - return stock_pb2.StockReply( - symbol=stock_request.symbol, price=_price(stock_request.symbol)) - else: - raise abandonment.Abandoned() - - class StockRequestConsumer(stream.Consumer): - - def consume(self, stock_request): - stock_reply_consumer.consume( - stock_reply_for_stock_request(stock_request)) - - def terminate(self): - control.control() - stock_reply_consumer.terminate() - - def consume_and_terminate(self, stock_request): - stock_reply_consumer.consume_and_terminate( - stock_reply_for_stock_request(stock_request)) - - return StockRequestConsumer() - - -def _watch_future_trades(stock_request, stock_reply_consumer, control, active): - """A unary-request, stream-response test method.""" - base_price = _price(stock_request.symbol) - for index in range(stock_request.num_trades_to_watch): - control.control() - if active(): - stock_reply_consumer.consume( - stock_pb2.StockReply( - symbol=stock_request.symbol, price=base_price + index)) - else: - raise abandonment.Abandoned() - stock_reply_consumer.terminate() - - -def _get_highest_trade_price(stock_reply_callback, control, active): - """A stream-request, unary-response test method.""" - - class StockRequestConsumer(stream.Consumer): - """Keeps an ongoing record of the most valuable symbol yet consumed.""" - - def __init__(self): - self._symbol = None - self._price = None - - def consume(self, stock_request): - control.control() - if active(): - if self._price is None: - self._symbol = stock_request.symbol - self._price = _price(stock_request.symbol) - else: - candidate_price = _price(stock_request.symbol) - if self._price < candidate_price: - self._symbol = stock_request.symbol - self._price = candidate_price - - def terminate(self): - control.control() - if active(): - if self._symbol is None: - raise ValueError() - else: - stock_reply_callback( - stock_pb2.StockReply( - symbol=self._symbol, price=self._price)) - self._symbol = None - self._price = None - - def consume_and_terminate(self, stock_request): - control.control() - if active(): - if self._price is None: - stock_reply_callback( - stock_pb2.StockReply( - symbol=stock_request.symbol, - price=_price(stock_request.symbol))) - else: - candidate_price = _price(stock_request.symbol) - if self._price < candidate_price: - stock_reply_callback( - stock_pb2.StockReply( - symbol=stock_request.symbol, - price=candidate_price)) - else: - stock_reply_callback( - stock_pb2.StockReply( - symbol=self._symbol, price=self._price)) - - self._symbol = None - self._price = None - - return StockRequestConsumer() - - -class GetLastTradePrice(_service.UnaryUnaryTestMethodImplementation): - """GetLastTradePrice for use in tests.""" - - def group(self): - return _STOCK_GROUP_NAME - - def name(self): - return 'GetLastTradePrice' - - def cardinality(self): - return cardinality.Cardinality.UNARY_UNARY - - def request_class(self): - return stock_pb2.StockRequest - - def response_class(self): - return stock_pb2.StockReply - - def serialize_request(self, request): - return request.SerializeToString() - - def deserialize_request(self, serialized_request): - return stock_pb2.StockRequest.FromString(serialized_request) - - def serialize_response(self, response): - return response.SerializeToString() - - def deserialize_response(self, serialized_response): - return stock_pb2.StockReply.FromString(serialized_response) - - def service(self, request, response_callback, context, control): - _get_last_trade_price(request, response_callback, control, - context.is_active) - - -class GetLastTradePriceMessages(_service.UnaryUnaryTestMessages): - - def __init__(self): - self._index = 0 - - def request(self): - symbol = _SYMBOL_FORMAT % self._index - self._index += 1 - return stock_pb2.StockRequest(symbol=symbol) - - def verify(self, request, response, test_case): - test_case.assertEqual(request.symbol, response.symbol) - test_case.assertEqual(_price(request.symbol), response.price) - - -class GetLastTradePriceMultiple(_service.StreamStreamTestMethodImplementation): - """GetLastTradePriceMultiple for use in tests.""" - - def group(self): - return _STOCK_GROUP_NAME - - def name(self): - return 'GetLastTradePriceMultiple' - - def cardinality(self): - return cardinality.Cardinality.STREAM_STREAM - - def request_class(self): - return stock_pb2.StockRequest - - def response_class(self): - return stock_pb2.StockReply - - def serialize_request(self, request): - return request.SerializeToString() - - def deserialize_request(self, serialized_request): - return stock_pb2.StockRequest.FromString(serialized_request) - - def serialize_response(self, response): - return response.SerializeToString() - - def deserialize_response(self, serialized_response): - return stock_pb2.StockReply.FromString(serialized_response) - - def service(self, response_consumer, context, control): - return _get_last_trade_price_multiple(response_consumer, control, - context.is_active) - - -class GetLastTradePriceMultipleMessages(_service.StreamStreamTestMessages): - """Pairs of message streams for use with GetLastTradePriceMultiple.""" - - def __init__(self): - self._index = 0 - - def requests(self): - base_index = self._index - self._index += 1 - return [ - stock_pb2.StockRequest(symbol=_SYMBOL_FORMAT % (base_index + index)) - for index in range(test_constants.STREAM_LENGTH) - ] - - def verify(self, requests, responses, test_case): - test_case.assertEqual(len(requests), len(responses)) - for stock_request, stock_reply in zip(requests, responses): - test_case.assertEqual(stock_request.symbol, stock_reply.symbol) - test_case.assertEqual( - _price(stock_request.symbol), stock_reply.price) - - -class WatchFutureTrades(_service.UnaryStreamTestMethodImplementation): - """WatchFutureTrades for use in tests.""" - - def group(self): - return _STOCK_GROUP_NAME - - def name(self): - return 'WatchFutureTrades' - - def cardinality(self): - return cardinality.Cardinality.UNARY_STREAM - - def request_class(self): - return stock_pb2.StockRequest - - def response_class(self): - return stock_pb2.StockReply - - def serialize_request(self, request): - return request.SerializeToString() - - def deserialize_request(self, serialized_request): - return stock_pb2.StockRequest.FromString(serialized_request) - - def serialize_response(self, response): - return response.SerializeToString() - - def deserialize_response(self, serialized_response): - return stock_pb2.StockReply.FromString(serialized_response) - - def service(self, request, response_consumer, context, control): - _watch_future_trades(request, response_consumer, control, - context.is_active) - - -class WatchFutureTradesMessages(_service.UnaryStreamTestMessages): - """Pairs of a single request message and a sequence of response messages.""" - - def __init__(self): - self._index = 0 - - def request(self): - symbol = _SYMBOL_FORMAT % self._index - self._index += 1 - return stock_pb2.StockRequest( - symbol=symbol, num_trades_to_watch=test_constants.STREAM_LENGTH) - - def verify(self, request, responses, test_case): - test_case.assertEqual(test_constants.STREAM_LENGTH, len(responses)) - base_price = _price(request.symbol) - for index, response in enumerate(responses): - test_case.assertEqual(base_price + index, response.price) - - -class GetHighestTradePrice(_service.StreamUnaryTestMethodImplementation): - """GetHighestTradePrice for use in tests.""" - - def group(self): - return _STOCK_GROUP_NAME - - def name(self): - return 'GetHighestTradePrice' - - def cardinality(self): - return cardinality.Cardinality.STREAM_UNARY - - def request_class(self): - return stock_pb2.StockRequest - - def response_class(self): - return stock_pb2.StockReply - - def serialize_request(self, request): - return request.SerializeToString() - - def deserialize_request(self, serialized_request): - return stock_pb2.StockRequest.FromString(serialized_request) - - def serialize_response(self, response): - return response.SerializeToString() - - def deserialize_response(self, serialized_response): - return stock_pb2.StockReply.FromString(serialized_response) - - def service(self, response_callback, context, control): - return _get_highest_trade_price(response_callback, control, - context.is_active) - - -class GetHighestTradePriceMessages(_service.StreamUnaryTestMessages): - - def requests(self): - return [ - stock_pb2.StockRequest(symbol=_SYMBOL_FORMAT % index) - for index in range(test_constants.STREAM_LENGTH) - ] - - def verify(self, requests, response, test_case): - price = None - symbol = None - for stock_request in requests: - current_symbol = stock_request.symbol - current_price = _price(current_symbol) - if price is None or price < current_price: - price = current_price - symbol = current_symbol - test_case.assertEqual(price, response.price) - test_case.assertEqual(symbol, response.symbol) - - -class StockTestService(_service.TestService): - """A corpus of test data with one method of each RPC cardinality.""" - - def unary_unary_scenarios(self): - return { - (_STOCK_GROUP_NAME, 'GetLastTradePrice'): - (GetLastTradePrice(), [GetLastTradePriceMessages()]), - } - - def unary_stream_scenarios(self): - return { - (_STOCK_GROUP_NAME, 'WatchFutureTrades'): - (WatchFutureTrades(), [WatchFutureTradesMessages()]), - } - - def stream_unary_scenarios(self): - return { - (_STOCK_GROUP_NAME, 'GetHighestTradePrice'): - (GetHighestTradePrice(), [GetHighestTradePriceMessages()]) - } - - def stream_stream_scenarios(self): - return { - (_STOCK_GROUP_NAME, 'GetLastTradePriceMultiple'): - (GetLastTradePriceMultiple(), - [GetLastTradePriceMultipleMessages()]), - } - - -STOCK_TEST_SERVICE = StockTestService() diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_cases.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_cases.py deleted file mode 100644 index cff4b7cdea..0000000000 --- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_cases.py +++ /dev/null @@ -1,53 +0,0 @@ -# Copyright 2015 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Tools for creating tests of implementations of the Face layer.""" - -# unittest is referenced from specification in this module. -import unittest # pylint: disable=unused-import - -# test_interfaces is referenced from specification in this module. -from tests.unit.framework.interfaces.face import _blocking_invocation_inline_service -from tests.unit.framework.interfaces.face import _future_invocation_asynchronous_event_service -from tests.unit.framework.interfaces.face import _invocation -from tests.unit.framework.interfaces.face import test_interfaces # pylint: disable=unused-import - -_TEST_CASE_SUPERCLASSES = ( - _blocking_invocation_inline_service.TestCase, - _future_invocation_asynchronous_event_service.TestCase, -) - - -def test_cases(implementation): - """Creates unittest.TestCase classes for a given Face layer implementation. - - Args: - implementation: A test_interfaces.Implementation specifying creation and - destruction of a given Face layer implementation. - - Returns: - A sequence of subclasses of unittest.TestCase defining tests of the - specified Face layer implementation. - """ - test_case_classes = [] - for invoker_constructor in _invocation.invoker_constructors(): - for super_class in _TEST_CASE_SUPERCLASSES: - test_case_classes.append( - type( - invoker_constructor.name() + super_class.NAME, - (super_class,), { - 'implementation': implementation, - 'invoker_constructor': invoker_constructor, - '__module__': implementation.__module__, - })) - return test_case_classes diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_interfaces.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_interfaces.py deleted file mode 100644 index d0de8e1c54..0000000000 --- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_interfaces.py +++ /dev/null @@ -1,212 +0,0 @@ -# Copyright 2015 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Interfaces used in tests of implementations of the Face layer.""" - -import abc - -import six - -from grpc.framework.common import cardinality # pylint: disable=unused-import -from grpc.framework.interfaces.face import face # pylint: disable=unused-import - - -class Method(six.with_metaclass(abc.ABCMeta)): - """Specifies a method to be used in tests.""" - - @abc.abstractmethod - def group(self): - """Identify the group of the method. - - Returns: - The group of the method. - """ - raise NotImplementedError() - - @abc.abstractmethod - def name(self): - """Identify the name of the method. - - Returns: - The name of the method. - """ - raise NotImplementedError() - - @abc.abstractmethod - def cardinality(self): - """Identify the cardinality of the method. - - Returns: - A cardinality.Cardinality value describing the streaming semantics of the - method. - """ - raise NotImplementedError() - - @abc.abstractmethod - def request_class(self): - """Identify the class used for the method's request objects. - - Returns: - The class object of the class to which the method's request objects - belong. - """ - raise NotImplementedError() - - @abc.abstractmethod - def response_class(self): - """Identify the class used for the method's response objects. - - Returns: - The class object of the class to which the method's response objects - belong. - """ - raise NotImplementedError() - - @abc.abstractmethod - def serialize_request(self, request): - """Serialize the given request object. - - Args: - request: A request object appropriate for this method. - """ - raise NotImplementedError() - - @abc.abstractmethod - def deserialize_request(self, serialized_request): - """Synthesize a request object from a given bytestring. - - Args: - serialized_request: A bytestring deserializable into a request object - appropriate for this method. - """ - raise NotImplementedError() - - @abc.abstractmethod - def serialize_response(self, response): - """Serialize the given response object. - - Args: - response: A response object appropriate for this method. - """ - raise NotImplementedError() - - @abc.abstractmethod - def deserialize_response(self, serialized_response): - """Synthesize a response object from a given bytestring. - - Args: - serialized_response: A bytestring deserializable into a response object - appropriate for this method. - """ - raise NotImplementedError() - - -class Implementation(six.with_metaclass(abc.ABCMeta)): - """Specifies an implementation of the Face layer.""" - - @abc.abstractmethod - def instantiate(self, methods, method_implementations, - multi_method_implementation): - """Instantiates the Face layer implementation to be used in a test. - - Args: - methods: A sequence of Method objects describing the methods available to - be called during the test. - method_implementations: A dictionary from group-name pair to - face.MethodImplementation object specifying implementation of a method. - multi_method_implementation: A face.MultiMethodImplementation or None. - - Returns: - A sequence of length three the first element of which is a - face.GenericStub, the second element of which is dictionary from groups - to face.DynamicStubs affording invocation of the group's methods, and - the third element of which is an arbitrary memo object to be kept and - passed to destantiate at the conclusion of the test. The returned stubs - must be backed by the provided implementations. - """ - raise NotImplementedError() - - @abc.abstractmethod - def destantiate(self, memo): - """Destroys the Face layer implementation under test. - - Args: - memo: The object from the third position of the return value of a call to - instantiate. - """ - raise NotImplementedError() - - @abc.abstractmethod - def invocation_metadata(self): - """Provides the metadata to be used when invoking a test RPC. - - Returns: - An object to use as the supplied-at-invocation-time metadata in a test - RPC. - """ - raise NotImplementedError() - - @abc.abstractmethod - def initial_metadata(self): - """Provides the metadata for use as a test RPC's first servicer metadata. - - Returns: - An object to use as the from-the-servicer-before-responses metadata in a - test RPC. - """ - raise NotImplementedError() - - @abc.abstractmethod - def terminal_metadata(self): - """Provides the metadata for use as a test RPC's second servicer metadata. - - Returns: - An object to use as the from-the-servicer-after-all-responses metadata in - a test RPC. - """ - raise NotImplementedError() - - @abc.abstractmethod - def code(self): - """Provides the value for use as a test RPC's code. - - Returns: - An object to use as the from-the-servicer code in a test RPC. - """ - raise NotImplementedError() - - @abc.abstractmethod - def details(self): - """Provides the value for use as a test RPC's details. - - Returns: - An object to use as the from-the-servicer details in a test RPC. - """ - raise NotImplementedError() - - @abc.abstractmethod - def metadata_transmitted(self, original_metadata, transmitted_metadata): - """Identifies whether or not metadata was properly transmitted. - - Args: - original_metadata: A metadata value passed to the Face interface - implementation under test. - transmitted_metadata: The same metadata value after having been - transmitted via an RPC performed by the Face interface implementation - under test. - - Returns: - Whether or not the metadata was properly transmitted by the Face interface - implementation under test. - """ - raise NotImplementedError() |