diff options
author | 2018-05-29 10:45:32 -0700 | |
---|---|---|
committer | 2018-05-29 10:45:32 -0700 | |
commit | e501a3d3fdd63a4e8c184ff2bcb1e643dbfe4401 (patch) | |
tree | f83b87e440fe49217353e7daff913e7e6629e3e5 /src/python | |
parent | e1d7deeb5396691ae8ea515b0db6e778a2c0a59d (diff) | |
parent | 1bd0debdc7f57f2d51716789660b895a6c229350 (diff) |
Merge branch 'master' of https://github.com/Vizerai/grpc into filter_port
Diffstat (limited to 'src/python')
55 files changed, 1260 insertions, 785 deletions
diff --git a/src/python/grpcio/grpc/__init__.py b/src/python/grpcio/grpc/__init__.py index 7fa7303691..b7ed0c8563 100644 --- a/src/python/grpcio/grpc/__init__.py +++ b/src/python/grpcio/grpc/__init__.py @@ -813,7 +813,11 @@ class StreamStreamMultiCallable(six.with_metaclass(abc.ABCMeta)): class Channel(six.with_metaclass(abc.ABCMeta)): - """Affords RPC invocation via generic methods on client-side.""" + """Affords RPC invocation via generic methods on client-side. + + Channel objects implement the Context Manager type, although they need not + support being entered and exited multiple times. + """ @abc.abstractmethod def subscribe(self, callback, try_to_connect=False): @@ -926,6 +930,17 @@ class Channel(six.with_metaclass(abc.ABCMeta)): """ raise NotImplementedError() + @abc.abstractmethod + def close(self): + """Closes this Channel and releases all resources held by it. + + Closing the Channel will immediately terminate all RPCs active with the + Channel and it is not valid to invoke new RPCs with the Channel. + + This method is idempotent. + """ + raise NotImplementedError() + ########################## Service-Side Context ############################## diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index 2eff08aa57..2017d47130 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -58,6 +58,17 @@ _STREAM_STREAM_INITIAL_DUE = ( _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = ( 'Exception calling channel subscription callback!') +_OK_RENDEZVOUS_REPR_FORMAT = ('<_Rendezvous of RPC that terminated with:\n' + '\tstatus = {}\n' + '\tdetails = "{}"\n' + '>') + +_NON_OK_RENDEZVOUS_REPR_FORMAT = ('<_Rendezvous of RPC that terminated with:\n' + '\tstatus = {}\n' + '\tdetails = "{}"\n' + '\tdebug_error_string = "{}"\n' + '>') + def _deadline(timeout): return None if timeout is None else time.time() + timeout @@ -79,27 +90,6 @@ def _wait_once_until(condition, until): condition.wait(timeout=remaining) -_INTERNAL_CALL_ERROR_MESSAGE_FORMAT = ( - 'Internal gRPC call error %d. ' + - 'Please report to https://github.com/grpc/grpc/issues') - - -def _check_call_error(call_error, metadata): - if call_error == cygrpc.CallError.invalid_metadata: - raise ValueError('metadata was invalid: %s' % metadata) - elif call_error != cygrpc.CallError.ok: - raise ValueError(_INTERNAL_CALL_ERROR_MESSAGE_FORMAT % call_error) - - -def _call_error_set_RPCstate(state, call_error, metadata): - if call_error == cygrpc.CallError.invalid_metadata: - _abort(state, grpc.StatusCode.INTERNAL, - 'metadata was invalid: %s' % metadata) - else: - _abort(state, grpc.StatusCode.INTERNAL, - _INTERNAL_CALL_ERROR_MESSAGE_FORMAT % call_error) - - class _RPCState(object): def __init__(self, due, initial_metadata, trailing_metadata, code, details): @@ -112,6 +102,7 @@ class _RPCState(object): self.trailing_metadata = trailing_metadata self.code = code self.details = details + self.debug_error_string = None # The semantics of grpc.Future.cancel and grpc.Future.cancelled are # slightly wonky, so they have to be tracked separately from the rest of the # result of the RPC. This field tracks whether cancellation was requested @@ -158,12 +149,13 @@ def _handle_event(event, state, response_deserializer): else: state.code = code state.details = batch_operation.details() + state.debug_error_string = batch_operation.error_string() callbacks.extend(state.callbacks) state.callbacks = None return callbacks -def _event_handler(state, call, response_deserializer): +def _event_handler(state, response_deserializer): def handle_event(event): with state.condition: @@ -172,40 +164,47 @@ def _event_handler(state, call, response_deserializer): done = not state.due for callback in callbacks: callback() - return call if done else None + return done return handle_event -def _consume_request_iterator(request_iterator, state, call, - request_serializer): - event_handler = _event_handler(state, call, None) +def _consume_request_iterator(request_iterator, state, call, request_serializer, + event_handler): - def consume_request_iterator(): + def consume_request_iterator(): # pylint: disable=too-many-branches while True: try: request = next(request_iterator) except StopIteration: break except Exception: # pylint: disable=broad-except - logging.exception("Exception iterating requests!") - call.cancel() - _abort(state, grpc.StatusCode.UNKNOWN, - "Exception iterating requests!") + code = grpc.StatusCode.UNKNOWN + details = 'Exception iterating requests!' + logging.exception(details) + call.cancel(_common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], + details) + _abort(state, code, details) return serialized_request = _common.serialize(request, request_serializer) with state.condition: if state.code is None and not state.cancelled: if serialized_request is None: - call.cancel() + code = grpc.StatusCode.INTERNAL # pylint: disable=redefined-variable-type details = 'Exception serializing request!' - _abort(state, grpc.StatusCode.INTERNAL, details) + call.cancel( + _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], + details) + _abort(state, code, details) return else: operations = (cygrpc.SendMessageOperation( serialized_request, _EMPTY_FLAGS),) - call.start_client_batch(operations, event_handler) - state.due.add(cygrpc.OperationType.send_message) + operating = call.operate(operations, event_handler) + if operating: + state.due.add(cygrpc.OperationType.send_message) + else: + return while True: state.condition.wait() if state.code is None: @@ -219,19 +218,12 @@ def _consume_request_iterator(request_iterator, state, call, if state.code is None: operations = ( cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),) - call.start_client_batch(operations, event_handler) - state.due.add(cygrpc.OperationType.send_close_from_client) - - def stop_consumption_thread(timeout): # pylint: disable=unused-argument - with state.condition: - if state.code is None: - call.cancel() - state.cancelled = True - _abort(state, grpc.StatusCode.CANCELLED, 'Cancelled!') - state.condition.notify_all() + operating = call.operate(operations, event_handler) + if operating: + state.due.add(cygrpc.OperationType.send_close_from_client) - consumption_thread = _common.CleanupThread( - stop_consumption_thread, target=consume_request_iterator) + consumption_thread = threading.Thread(target=consume_request_iterator) + consumption_thread.daemon = True consumption_thread.start() @@ -247,9 +239,12 @@ class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call): def cancel(self): with self._state.condition: if self._state.code is None: - self._call.cancel() + code = grpc.StatusCode.CANCELLED + details = 'Locally cancelled by application!' + self._call.cancel( + _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details) self._state.cancelled = True - _abort(self._state, grpc.StatusCode.CANCELLED, 'Cancelled!') + _abort(self._state, code, details) self._state.condition.notify_all() return False @@ -318,12 +313,13 @@ class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call): def _next(self): with self._state.condition: if self._state.code is None: - event_handler = _event_handler(self._state, self._call, + event_handler = _event_handler(self._state, self._response_deserializer) - self._call.start_client_batch( + operating = self._call.operate( (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), event_handler) - self._state.due.add(cygrpc.OperationType.receive_message) + if operating: + self._state.due.add(cygrpc.OperationType.receive_message) elif self._state.code is grpc.StatusCode.OK: raise StopIteration() else: @@ -391,13 +387,23 @@ class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call): self._state.condition.wait() return _common.decode(self._state.details) + def debug_error_string(self): + with self._state.condition: + while self._state.debug_error_string is None: + self._state.condition.wait() + return _common.decode(self._state.debug_error_string) + def _repr(self): with self._state.condition: if self._state.code is None: return '<_Rendezvous object of in-flight RPC>' + elif self._state.code is grpc.StatusCode.OK: + return _OK_RENDEZVOUS_REPR_FORMAT.format( + self._state.code, self._state.details) else: - return '<_Rendezvous of RPC that terminated with ({}, {})>'.format( - self._state.code, _common.decode(self._state.details)) + return _NON_OK_RENDEZVOUS_REPR_FORMAT.format( + self._state.code, self._state.details, + self._state.debug_error_string) def __repr__(self): return self._repr() @@ -408,9 +414,12 @@ class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call): def __del__(self): with self._state.condition: if self._state.code is None: - self._call.cancel() - self._state.cancelled = True self._state.code = grpc.StatusCode.CANCELLED + self._state.details = 'Cancelled upon garbage collection!' + self._state.cancelled = True + self._call.cancel( + _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[self._state.code], + self._state.details) self._state.condition.notify_all() @@ -437,6 +446,24 @@ def _end_unary_response_blocking(state, call, with_call, deadline): raise _Rendezvous(state, None, None, deadline) +def _stream_unary_invocation_operationses(metadata): + return ( + ( + cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS), + cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), + ), + (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), + ) + + +def _stream_unary_invocation_operationses_and_tags(metadata): + return tuple(( + operations, + None, + ) for operations in _stream_unary_invocation_operationses(metadata)) + + class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): def __init__(self, channel, managed_call, method, request_serializer, @@ -448,8 +475,8 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): self._response_deserializer = response_deserializer def _prepare(self, request, timeout, metadata): - deadline, serialized_request, rendezvous = (_start_unary_request( - request, timeout, self._request_serializer)) + deadline, serialized_request, rendezvous = _start_unary_request( + request, timeout, self._request_serializer) if serialized_request is None: return None, None, None, rendezvous else: @@ -467,48 +494,38 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): def _blocking(self, request, timeout, metadata, credentials): state, operations, deadline, rendezvous = self._prepare( request, timeout, metadata) - if rendezvous: + if state is None: raise rendezvous else: - completion_queue = cygrpc.CompletionQueue() - call = self._channel.create_call(None, 0, completion_queue, - self._method, None, deadline) - if credentials is not None: - call.set_credentials(credentials._credentials) - call_error = call.start_client_batch(operations, None) - _check_call_error(call_error, metadata) - _handle_event(completion_queue.poll(), state, - self._response_deserializer) - return state, call, deadline + call = self._channel.segregated_call( + 0, self._method, None, deadline, metadata, None + if credentials is None else credentials._credentials, (( + operations, + None, + ),)) + event = call.next_event() + _handle_event(event, state, self._response_deserializer) + return state, call, def __call__(self, request, timeout=None, metadata=None, credentials=None): - state, call, deadline = self._blocking(request, timeout, metadata, - credentials) - return _end_unary_response_blocking(state, call, False, deadline) + state, call, = self._blocking(request, timeout, metadata, credentials) + return _end_unary_response_blocking(state, call, False, None) def with_call(self, request, timeout=None, metadata=None, credentials=None): - state, call, deadline = self._blocking(request, timeout, metadata, - credentials) - return _end_unary_response_blocking(state, call, True, deadline) + state, call, = self._blocking(request, timeout, metadata, credentials) + return _end_unary_response_blocking(state, call, True, None) def future(self, request, timeout=None, metadata=None, credentials=None): state, operations, deadline, rendezvous = self._prepare( request, timeout, metadata) - if rendezvous: - return rendezvous + if state is None: + raise rendezvous else: - call, drive_call = self._managed_call(None, 0, self._method, None, - deadline) - if credentials is not None: - call.set_credentials(credentials._credentials) - event_handler = _event_handler(state, call, - self._response_deserializer) - with state.condition: - call_error = call.start_client_batch(operations, event_handler) - if call_error != cygrpc.CallError.ok: - _call_error_set_RPCstate(state, call_error, metadata) - return _Rendezvous(state, None, None, deadline) - drive_call() + event_handler = _event_handler(state, self._response_deserializer) + call = self._managed_call( + 0, self._method, None, deadline, metadata, None + if credentials is None else credentials._credentials, + (operations,), event_handler) return _Rendezvous(state, call, self._response_deserializer, deadline) @@ -524,34 +541,27 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): self._response_deserializer = response_deserializer def __call__(self, request, timeout=None, metadata=None, credentials=None): - deadline, serialized_request, rendezvous = (_start_unary_request( - request, timeout, self._request_serializer)) + deadline, serialized_request, rendezvous = _start_unary_request( + request, timeout, self._request_serializer) if serialized_request is None: raise rendezvous else: state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None) - call, drive_call = self._managed_call(None, 0, self._method, None, - deadline) - if credentials is not None: - call.set_credentials(credentials._credentials) - event_handler = _event_handler(state, call, - self._response_deserializer) - with state.condition: - call.start_client_batch( - (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), - event_handler) - operations = ( + operationses = ( + ( cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS), cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS), cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), - ) - call_error = call.start_client_batch(operations, event_handler) - if call_error != cygrpc.CallError.ok: - _call_error_set_RPCstate(state, call_error, metadata) - return _Rendezvous(state, None, None, deadline) - drive_call() + ), + (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), + ) + event_handler = _event_handler(state, self._response_deserializer) + call = self._managed_call( + 0, self._method, None, deadline, metadata, None + if credentials is None else credentials._credentials, + operationses, event_handler) return _Rendezvous(state, call, self._response_deserializer, deadline) @@ -569,49 +579,38 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): def _blocking(self, request_iterator, timeout, metadata, credentials): deadline = _deadline(timeout) state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None) - completion_queue = cygrpc.CompletionQueue() - call = self._channel.create_call(None, 0, completion_queue, - self._method, None, deadline) - if credentials is not None: - call.set_credentials(credentials._credentials) - with state.condition: - call.start_client_batch( - (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), None) - operations = ( - cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS), - cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), - cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), - ) - call_error = call.start_client_batch(operations, None) - _check_call_error(call_error, metadata) - _consume_request_iterator(request_iterator, state, call, - self._request_serializer) + call = self._channel.segregated_call( + 0, self._method, None, deadline, metadata, None + if credentials is None else credentials._credentials, + _stream_unary_invocation_operationses_and_tags(metadata)) + _consume_request_iterator(request_iterator, state, call, + self._request_serializer, None) while True: - event = completion_queue.poll() + event = call.next_event() with state.condition: _handle_event(event, state, self._response_deserializer) state.condition.notify_all() if not state.due: break - return state, call, deadline + return state, call, def __call__(self, request_iterator, timeout=None, metadata=None, credentials=None): - state, call, deadline = self._blocking(request_iterator, timeout, - metadata, credentials) - return _end_unary_response_blocking(state, call, False, deadline) + state, call, = self._blocking(request_iterator, timeout, metadata, + credentials) + return _end_unary_response_blocking(state, call, False, None) def with_call(self, request_iterator, timeout=None, metadata=None, credentials=None): - state, call, deadline = self._blocking(request_iterator, timeout, - metadata, credentials) - return _end_unary_response_blocking(state, call, True, deadline) + state, call, = self._blocking(request_iterator, timeout, metadata, + credentials) + return _end_unary_response_blocking(state, call, True, None) def future(self, request_iterator, @@ -620,27 +619,13 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): credentials=None): deadline = _deadline(timeout) state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None) - call, drive_call = self._managed_call(None, 0, self._method, None, - deadline) - if credentials is not None: - call.set_credentials(credentials._credentials) - event_handler = _event_handler(state, call, self._response_deserializer) - with state.condition: - call.start_client_batch( - (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), - event_handler) - operations = ( - cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS), - cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), - cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), - ) - call_error = call.start_client_batch(operations, event_handler) - if call_error != cygrpc.CallError.ok: - _call_error_set_RPCstate(state, call_error, metadata) - return _Rendezvous(state, None, None, deadline) - drive_call() - _consume_request_iterator(request_iterator, state, call, - self._request_serializer) + event_handler = _event_handler(state, self._response_deserializer) + call = self._managed_call( + 0, self._method, None, deadline, metadata, None + if credentials is None else credentials._credentials, + _stream_unary_invocation_operationses(metadata), event_handler) + _consume_request_iterator(request_iterator, state, call, + self._request_serializer, event_handler) return _Rendezvous(state, call, self._response_deserializer, deadline) @@ -661,26 +646,20 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): credentials=None): deadline = _deadline(timeout) state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None) - call, drive_call = self._managed_call(None, 0, self._method, None, - deadline) - if credentials is not None: - call.set_credentials(credentials._credentials) - event_handler = _event_handler(state, call, self._response_deserializer) - with state.condition: - call.start_client_batch( - (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), - event_handler) - operations = ( + operationses = ( + ( cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS), cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), - ) - call_error = call.start_client_batch(operations, event_handler) - if call_error != cygrpc.CallError.ok: - _call_error_set_RPCstate(state, call_error, metadata) - return _Rendezvous(state, None, None, deadline) - drive_call() - _consume_request_iterator(request_iterator, state, call, - self._request_serializer) + ), + (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), + ) + event_handler = _event_handler(state, self._response_deserializer) + call = self._managed_call( + 0, self._method, None, deadline, metadata, None + if credentials is None else credentials._credentials, operationses, + event_handler) + _consume_request_iterator(request_iterator, state, call, + self._request_serializer, event_handler) return _Rendezvous(state, call, self._response_deserializer, deadline) @@ -689,67 +668,63 @@ class _ChannelCallState(object): def __init__(self, channel): self.lock = threading.Lock() self.channel = channel - self.completion_queue = cygrpc.CompletionQueue() - self.managed_calls = None + self.managed_calls = 0 def _run_channel_spin_thread(state): def channel_spin(): while True: - event = state.completion_queue.poll() - completed_call = event.tag(event) - if completed_call is not None: + event = state.channel.next_call_event() + call_completed = event.tag(event) + if call_completed: with state.lock: - state.managed_calls.remove(completed_call) - if not state.managed_calls: - state.managed_calls = None + state.managed_calls -= 1 + if state.managed_calls == 0: return - def stop_channel_spin(timeout): # pylint: disable=unused-argument - with state.lock: - if state.managed_calls is not None: - for call in state.managed_calls: - call.cancel() - - channel_spin_thread = _common.CleanupThread( - stop_channel_spin, target=channel_spin) + channel_spin_thread = threading.Thread(target=channel_spin) + channel_spin_thread.daemon = True channel_spin_thread.start() def _channel_managed_call_management(state): - def create(parent, flags, method, host, deadline): - """Creates a managed cygrpc.Call and a function to call to drive it. - - If operations are successfully added to the returned cygrpc.Call, the - returned function must be called. If operations are not successfully added - to the returned cygrpc.Call, the returned function must not be called. - - Args: - parent: A cygrpc.Call to be used as the parent of the created call. - flags: An integer bitfield of call flags. - method: The RPC method. - host: A host string for the created call. - deadline: A float to be the deadline of the created call or None if the - call is to have an infinite deadline. - - Returns: - A cygrpc.Call with which to conduct an RPC and a function to call if - operations are successfully started on the call. - """ - call = state.channel.create_call(parent, flags, state.completion_queue, - method, host, deadline) - - def drive(): - with state.lock: - if state.managed_calls is None: - state.managed_calls = set((call,)) - _run_channel_spin_thread(state) - else: - state.managed_calls.add(call) + # pylint: disable=too-many-arguments + def create(flags, method, host, deadline, metadata, credentials, + operationses, event_handler): + """Creates a cygrpc.IntegratedCall. - return call, drive + Args: + flags: An integer bitfield of call flags. + method: The RPC method. + host: A host string for the created call. + deadline: A float to be the deadline of the created call or None if + the call is to have an infinite deadline. + metadata: The metadata for the call or None. + credentials: A cygrpc.CallCredentials or None. + operationses: An iterable of iterables of cygrpc.Operations to be + started on the call. + event_handler: A behavior to call to handle the events resultant from + the operations on the call. + + Returns: + A cygrpc.IntegratedCall with which to conduct an RPC. + """ + operationses_and_tags = tuple(( + operations, + event_handler, + ) for operations in operationses) + with state.lock: + call = state.channel.integrated_call(flags, method, host, deadline, + metadata, credentials, + operationses_and_tags) + if state.managed_calls == 0: + state.managed_calls = 1 + _run_channel_spin_thread(state) + else: + state.managed_calls += 1 + return call return create @@ -819,12 +794,9 @@ def _poll_connectivity(state, channel, initial_try_to_connect): callback_and_connectivity[1] = state.connectivity if callbacks: _spawn_delivery(state, callbacks) - completion_queue = cygrpc.CompletionQueue() while True: - channel.watch_connectivity_state(connectivity, - time.time() + 0.2, completion_queue, - None) - event = completion_queue.poll() + event = channel.watch_connectivity_state(connectivity, + time.time() + 0.2) with state.lock: if not state.callbacks_and_connectivities and not state.try_to_connect: state.polling = False @@ -855,10 +827,10 @@ def _moot(state): def _subscribe(state, callback, try_to_connect): with state.lock: if not state.callbacks_and_connectivities and not state.polling: - polling_thread = _common.CleanupThread( - lambda timeout: _moot(state), + polling_thread = threading.Thread( target=_poll_connectivity, args=(state, state.channel, bool(try_to_connect))) + polling_thread.daemon = True polling_thread.start() state.polling = True state.callbacks_and_connectivities.append([callback, None]) @@ -944,5 +916,28 @@ class Channel(grpc.Channel): self._channel, _channel_managed_call_management(self._call_state), _common.encode(method), request_serializer, response_deserializer) + def _close(self): + self._channel.close(cygrpc.StatusCode.cancelled, 'Channel closed!') + _moot(self._connectivity_state) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self._close() + return False + + def close(self): + self._close() + def __del__(self): + # TODO(https://github.com/grpc/grpc/issues/12531): Several releases + # after 1.12 (1.16 or thereabouts?) add a "self._channel.close" call + # here (or more likely, call self._close() here). We don't do this today + # because many valid use cases today allow the channel to be deleted + # immediately after stubs are created. After a sufficient period of time + # has passed for all users to be trusted to hang out to their channels + # for as long as they are in use and to close them after using them, + # then deletion of this grpc._channel.Channel instance can be made to + # effect closure of the underlying cygrpc.Channel instance. _moot(self._connectivity_state) diff --git a/src/python/grpcio/grpc/_common.py b/src/python/grpcio/grpc/_common.py index bbb69ad489..862987a0cd 100644 --- a/src/python/grpcio/grpc/_common.py +++ b/src/python/grpcio/grpc/_common.py @@ -14,8 +14,6 @@ """Shared implementation.""" import logging -import threading -import time import six @@ -101,35 +99,3 @@ def deserialize(serialized_message, deserializer): def fully_qualified_method(group, method): return '/{}/{}'.format(group, method) - - -class CleanupThread(threading.Thread): - """A threading.Thread subclass supporting custom behavior on join(). - - On Python Interpreter exit, Python will attempt to join outstanding threads - prior to garbage collection. We may need to do additional cleanup, and - we accomplish this by overriding the join() method. - """ - - def __init__(self, behavior, *args, **kwargs): - """Constructor. - - Args: - behavior (function): Function called on join() with a single - argument, timeout, indicating the maximum duration of - `behavior`, or None indicating `behavior` has no deadline. - `behavior` must be idempotent. - args: Positional arguments passed to threading.Thread constructor. - kwargs: Keyword arguments passed to threading.Thread constructor. - """ - super(CleanupThread, self).__init__(*args, **kwargs) - self._behavior = behavior - - def join(self, timeout=None): - start_time = time.time() - self._behavior(timeout) - end_time = time.time() - if timeout is not None: - timeout -= end_time - start_time - timeout = max(timeout, 0) - super(CleanupThread, self).join(timeout) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pxd.pxi index 1ba76b7f83..eefc685c0b 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pxd.pxi @@ -13,9 +13,59 @@ # limitations under the License. +cdef _check_call_error_no_metadata(c_call_error) + + +cdef _check_and_raise_call_error_no_metadata(c_call_error) + + +cdef _check_call_error(c_call_error, metadata) + + +cdef class _CallState: + + cdef grpc_call *c_call + cdef set due + + +cdef class _ChannelState: + + cdef object condition + cdef grpc_channel *c_channel + # A boolean field indicating that the channel is open (if True) or is being + # closed (i.e. a call to close is currently executing) or is closed (if + # False). + # TODO(https://github.com/grpc/grpc/issues/3064): Eliminate "is being closed" + # a state in which condition may be acquired by any thread, eliminate this + # field and just use the NULLness of c_channel as an indication that the + # channel is closed. + cdef object open + + # A dict from _BatchOperationTag to _CallState + cdef dict integrated_call_states + cdef grpc_completion_queue *c_call_completion_queue + + # A set of _CallState + cdef set segregated_call_states + + cdef set connectivity_due + cdef grpc_completion_queue *c_connectivity_completion_queue + + +cdef class IntegratedCall: + + cdef _ChannelState _channel_state + cdef _CallState _call_state + + +cdef class SegregatedCall: + + cdef _ChannelState _channel_state + cdef _CallState _call_state + cdef grpc_completion_queue *_c_completion_queue + + cdef class Channel: cdef grpc_arg_pointer_vtable _vtable - cdef grpc_channel *c_channel - cdef list references - cdef readonly _ArgumentsProcessor _arguments_processor + cdef _ChannelState _state diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi index a3966497bc..72e74e84ae 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi @@ -14,82 +14,439 @@ cimport cpython +import threading + +_INTERNAL_CALL_ERROR_MESSAGE_FORMAT = ( + 'Internal gRPC call error %d. ' + + 'Please report to https://github.com/grpc/grpc/issues') + + +cdef str _call_error_metadata(metadata): + return 'metadata was invalid: %s' % metadata + + +cdef str _call_error_no_metadata(c_call_error): + return _INTERNAL_CALL_ERROR_MESSAGE_FORMAT % c_call_error + + +cdef str _call_error(c_call_error, metadata): + if c_call_error == GRPC_CALL_ERROR_INVALID_METADATA: + return _call_error_metadata(metadata) + else: + return _call_error_no_metadata(c_call_error) + + +cdef _check_call_error_no_metadata(c_call_error): + if c_call_error != GRPC_CALL_OK: + return _INTERNAL_CALL_ERROR_MESSAGE_FORMAT % c_call_error + else: + return None + + +cdef _check_and_raise_call_error_no_metadata(c_call_error): + error = _check_call_error_no_metadata(c_call_error) + if error is not None: + raise ValueError(error) + + +cdef _check_call_error(c_call_error, metadata): + if c_call_error == GRPC_CALL_ERROR_INVALID_METADATA: + return _call_error_metadata(metadata) + else: + return _check_call_error_no_metadata(c_call_error) + + +cdef void _raise_call_error_no_metadata(c_call_error) except *: + raise ValueError(_call_error_no_metadata(c_call_error)) + + +cdef void _raise_call_error(c_call_error, metadata) except *: + raise ValueError(_call_error(c_call_error, metadata)) + + +cdef _destroy_c_completion_queue(grpc_completion_queue *c_completion_queue): + grpc_completion_queue_shutdown(c_completion_queue) + grpc_completion_queue_destroy(c_completion_queue) + + +cdef class _CallState: + + def __cinit__(self): + self.due = set() + + +cdef class _ChannelState: + + def __cinit__(self): + self.condition = threading.Condition() + self.open = True + self.integrated_call_states = {} + self.segregated_call_states = set() + self.connectivity_due = set() + + +cdef tuple _operate(grpc_call *c_call, object operations, object user_tag): + cdef grpc_call_error c_call_error + cdef _BatchOperationTag tag = _BatchOperationTag(user_tag, operations, None) + tag.prepare() + cpython.Py_INCREF(tag) + with nogil: + c_call_error = grpc_call_start_batch( + c_call, tag.c_ops, tag.c_nops, <cpython.PyObject *>tag, NULL) + return c_call_error, tag + + +cdef object _operate_from_integrated_call( + _ChannelState channel_state, _CallState call_state, object operations, + object user_tag): + cdef grpc_call_error c_call_error + cdef _BatchOperationTag tag + with channel_state.condition: + if call_state.due: + c_call_error, tag = _operate(call_state.c_call, operations, user_tag) + if c_call_error == GRPC_CALL_OK: + call_state.due.add(tag) + channel_state.integrated_call_states[tag] = call_state + return True + else: + _raise_call_error_no_metadata(c_call_error) + else: + return False + + +cdef object _operate_from_segregated_call( + _ChannelState channel_state, _CallState call_state, object operations, + object user_tag): + cdef grpc_call_error c_call_error + cdef _BatchOperationTag tag + with channel_state.condition: + if call_state.due: + c_call_error, tag = _operate(call_state.c_call, operations, user_tag) + if c_call_error == GRPC_CALL_OK: + call_state.due.add(tag) + return True + else: + _raise_call_error_no_metadata(c_call_error) + else: + return False + + +cdef _cancel( + _ChannelState channel_state, _CallState call_state, grpc_status_code code, + str details): + cdef grpc_call_error c_call_error + with channel_state.condition: + if call_state.due: + c_call_error = grpc_call_cancel_with_status( + call_state.c_call, code, _encode(details), NULL) + _check_and_raise_call_error_no_metadata(c_call_error) + + +cdef BatchOperationEvent _next_call_event( + _ChannelState channel_state, grpc_completion_queue *c_completion_queue, + on_success): + tag, event = _latent_event(c_completion_queue, None) + with channel_state.condition: + on_success(tag) + channel_state.condition.notify_all() + return event + + +# TODO(https://github.com/grpc/grpc/issues/14569): This could be a lot simpler. +cdef void _call( + _ChannelState channel_state, _CallState call_state, + grpc_completion_queue *c_completion_queue, on_success, int flags, method, + host, object deadline, CallCredentials credentials, + object operationses_and_user_tags, object metadata) except *: + """Invokes an RPC. + + Args: + channel_state: A _ChannelState with its "open" attribute set to True. RPCs + may not be invoked on a closed channel. + call_state: An empty _CallState to be altered (specifically assigned a + c_call and having its due set populated) if the RPC invocation is + successful. + c_completion_queue: A grpc_completion_queue to be used for the call's + operations. + on_success: A behavior to be called if attempting to start operations for + the call succeeds. If called the behavior will be called while holding the + channel_state condition and passed the tags associated with operations + that were successfully started for the call. + flags: Flags to be passed to gRPC Core as part of call creation. + method: The fully-qualified name of the RPC method being invoked. + host: A "host" string to be passed to gRPC Core as part of call creation. + deadline: A float for the deadline of the RPC, or None if the RPC is to have + no deadline. + credentials: A _CallCredentials for the RPC or None. + operationses_and_user_tags: A sequence of length-two sequences the first + element of which is a sequence of Operations and the second element of + which is an object to be used as a tag. A SendInitialMetadataOperation + must be present in the first element of this value. + metadata: The metadata for this call. + """ + cdef grpc_slice method_slice + cdef grpc_slice host_slice + cdef grpc_slice *host_slice_ptr + cdef grpc_call_credentials *c_call_credentials + cdef grpc_call_error c_call_error + cdef tuple error_and_wrapper_tag + cdef _BatchOperationTag wrapper_tag + with channel_state.condition: + if channel_state.open: + method_slice = _slice_from_bytes(method) + if host is None: + host_slice_ptr = NULL + else: + host_slice = _slice_from_bytes(host) + host_slice_ptr = &host_slice + call_state.c_call = grpc_channel_create_call( + channel_state.c_channel, NULL, flags, + c_completion_queue, method_slice, host_slice_ptr, + _timespec_from_time(deadline), NULL) + grpc_slice_unref(method_slice) + if host_slice_ptr: + grpc_slice_unref(host_slice) + if credentials is not None: + c_call_credentials = credentials.c() + c_call_error = grpc_call_set_credentials( + call_state.c_call, c_call_credentials) + grpc_call_credentials_release(c_call_credentials) + if c_call_error != GRPC_CALL_OK: + grpc_call_unref(call_state.c_call) + call_state.c_call = NULL + _raise_call_error_no_metadata(c_call_error) + started_tags = set() + for operations, user_tag in operationses_and_user_tags: + c_call_error, tag = _operate(call_state.c_call, operations, user_tag) + if c_call_error == GRPC_CALL_OK: + started_tags.add(tag) + else: + grpc_call_cancel(call_state.c_call, NULL) + grpc_call_unref(call_state.c_call) + call_state.c_call = NULL + _raise_call_error(c_call_error, metadata) + else: + call_state.due.update(started_tags) + on_success(started_tags) + else: + raise ValueError('Cannot invoke RPC on closed channel!') + +cdef void _process_integrated_call_tag( + _ChannelState state, _BatchOperationTag tag) except *: + cdef _CallState call_state = state.integrated_call_states.pop(tag) + call_state.due.remove(tag) + if not call_state.due: + grpc_call_unref(call_state.c_call) + call_state.c_call = NULL + + +cdef class IntegratedCall: + + def __cinit__(self, _ChannelState channel_state, _CallState call_state): + self._channel_state = channel_state + self._call_state = call_state + + def operate(self, operations, tag): + return _operate_from_integrated_call( + self._channel_state, self._call_state, operations, tag) + + def cancel(self, code, details): + _cancel(self._channel_state, self._call_state, code, details) + + +cdef IntegratedCall _integrated_call( + _ChannelState state, int flags, method, host, object deadline, + object metadata, CallCredentials credentials, operationses_and_user_tags): + call_state = _CallState() + + def on_success(started_tags): + for started_tag in started_tags: + state.integrated_call_states[started_tag] = call_state + + _call( + state, call_state, state.c_call_completion_queue, on_success, flags, + method, host, deadline, credentials, operationses_and_user_tags, metadata) + + return IntegratedCall(state, call_state) + + +cdef object _process_segregated_call_tag( + _ChannelState state, _CallState call_state, + grpc_completion_queue *c_completion_queue, _BatchOperationTag tag): + call_state.due.remove(tag) + if not call_state.due: + grpc_call_unref(call_state.c_call) + call_state.c_call = NULL + state.segregated_call_states.remove(call_state) + _destroy_c_completion_queue(c_completion_queue) + return True + else: + return False + + +cdef class SegregatedCall: + + def __cinit__(self, _ChannelState channel_state, _CallState call_state): + self._channel_state = channel_state + self._call_state = call_state + + def operate(self, operations, tag): + return _operate_from_segregated_call( + self._channel_state, self._call_state, operations, tag) + + def cancel(self, code, details): + _cancel(self._channel_state, self._call_state, code, details) + + def next_event(self): + def on_success(tag): + _process_segregated_call_tag( + self._channel_state, self._call_state, self._c_completion_queue, tag) + return _next_call_event( + self._channel_state, self._c_completion_queue, on_success) + + +cdef SegregatedCall _segregated_call( + _ChannelState state, int flags, method, host, object deadline, + object metadata, CallCredentials credentials, operationses_and_user_tags): + cdef _CallState call_state = _CallState() + cdef grpc_completion_queue *c_completion_queue = ( + grpc_completion_queue_create_for_next(NULL)) + cdef SegregatedCall segregated_call + + def on_success(started_tags): + state.segregated_call_states.add(call_state) + + try: + _call( + state, call_state, c_completion_queue, on_success, flags, method, host, + deadline, credentials, operationses_and_user_tags, metadata) + except: + _destroy_c_completion_queue(c_completion_queue) + raise + + segregated_call = SegregatedCall(state, call_state) + segregated_call._c_completion_queue = c_completion_queue + return segregated_call + + +cdef object _watch_connectivity_state( + _ChannelState state, grpc_connectivity_state last_observed_state, + object deadline): + cdef _ConnectivityTag tag = _ConnectivityTag(object()) + with state.condition: + if state.open: + cpython.Py_INCREF(tag) + grpc_channel_watch_connectivity_state( + state.c_channel, last_observed_state, _timespec_from_time(deadline), + state.c_connectivity_completion_queue, <cpython.PyObject *>tag) + state.connectivity_due.add(tag) + else: + raise ValueError('Cannot invoke RPC on closed channel!') + completed_tag, event = _latent_event( + state.c_connectivity_completion_queue, None) + with state.condition: + state.connectivity_due.remove(completed_tag) + state.condition.notify_all() + return event + + +cdef _close(_ChannelState state, grpc_status_code code, object details): + cdef _CallState call_state + encoded_details = _encode(details) + with state.condition: + if state.open: + state.open = False + for call_state in set(state.integrated_call_states.values()): + grpc_call_cancel_with_status( + call_state.c_call, code, encoded_details, NULL) + for call_state in state.segregated_call_states: + grpc_call_cancel_with_status( + call_state.c_call, code, encoded_details, NULL) + # TODO(https://github.com/grpc/grpc/issues/3064): Cancel connectivity + # watching. + + while state.integrated_call_states: + state.condition.wait() + while state.segregated_call_states: + state.condition.wait() + while state.connectivity_due: + state.condition.wait() + + _destroy_c_completion_queue(state.c_call_completion_queue) + _destroy_c_completion_queue(state.c_connectivity_completion_queue) + grpc_channel_destroy(state.c_channel) + state.c_channel = NULL + grpc_shutdown() + state.condition.notify_all() + else: + # Another call to close already completed in the past or is currently + # being executed in another thread. + while state.c_channel != NULL: + state.condition.wait() + cdef class Channel: - def __cinit__(self, bytes target, object arguments, - ChannelCredentials channel_credentials=None): + def __cinit__( + self, bytes target, object arguments, + ChannelCredentials channel_credentials): grpc_init() + self._state = _ChannelState() self._vtable.copy = &_copy_pointer self._vtable.destroy = &_destroy_pointer self._vtable.cmp = &_compare_pointer cdef _ArgumentsProcessor arguments_processor = _ArgumentsProcessor( arguments) cdef grpc_channel_args *c_arguments = arguments_processor.c(&self._vtable) - self.references = [] - c_target = target if channel_credentials is None: - self.c_channel = grpc_insecure_channel_create(c_target, c_arguments, NULL) + self._state.c_channel = grpc_insecure_channel_create( + <char *>target, c_arguments, NULL) else: c_channel_credentials = channel_credentials.c() - self.c_channel = grpc_secure_channel_create( - c_channel_credentials, c_target, c_arguments, NULL) + self._state.c_channel = grpc_secure_channel_create( + c_channel_credentials, <char *>target, c_arguments, NULL) grpc_channel_credentials_release(c_channel_credentials) - arguments_processor.un_c() - self.references.append(target) - self.references.append(arguments) - - def create_call(self, Call parent, int flags, - CompletionQueue queue not None, - method, host, object deadline): - if queue.is_shutting_down: - raise ValueError("queue must not be shutting down or shutdown") - cdef grpc_slice method_slice = _slice_from_bytes(method) - cdef grpc_slice host_slice - cdef grpc_slice *host_slice_ptr = NULL - if host is not None: - host_slice = _slice_from_bytes(host) - host_slice_ptr = &host_slice - cdef Call operation_call = Call() - operation_call.references = [self, queue] - cdef grpc_call *parent_call = NULL - if parent is not None: - parent_call = parent.c_call - operation_call.c_call = grpc_channel_create_call( - self.c_channel, parent_call, flags, - queue.c_completion_queue, method_slice, host_slice_ptr, - _timespec_from_time(deadline), NULL) - grpc_slice_unref(method_slice) - if host_slice_ptr: - grpc_slice_unref(host_slice) - return operation_call + self._state.c_call_completion_queue = ( + grpc_completion_queue_create_for_next(NULL)) + self._state.c_connectivity_completion_queue = ( + grpc_completion_queue_create_for_next(NULL)) + + def target(self): + cdef char *c_target + with self._state.condition: + c_target = grpc_channel_get_target(self._state.c_channel) + target = <bytes>c_target + gpr_free(c_target) + return target + + def integrated_call( + self, int flags, method, host, object deadline, object metadata, + CallCredentials credentials, operationses_and_tags): + return _integrated_call( + self._state, flags, method, host, deadline, metadata, credentials, + operationses_and_tags) + + def next_call_event(self): + def on_success(tag): + _process_integrated_call_tag(self._state, tag) + return _next_call_event( + self._state, self._state.c_call_completion_queue, on_success) + + def segregated_call( + self, int flags, method, host, object deadline, object metadata, + CallCredentials credentials, operationses_and_tags): + return _segregated_call( + self._state, flags, method, host, deadline, metadata, credentials, + operationses_and_tags) def check_connectivity_state(self, bint try_to_connect): - cdef grpc_connectivity_state result - with nogil: - result = grpc_channel_check_connectivity_state(self.c_channel, - try_to_connect) - return result + with self._state.condition: + return grpc_channel_check_connectivity_state( + self._state.c_channel, try_to_connect) def watch_connectivity_state( - self, grpc_connectivity_state last_observed_state, - object deadline, CompletionQueue queue not None, tag): - cdef _ConnectivityTag connectivity_tag = _ConnectivityTag(tag) - cpython.Py_INCREF(connectivity_tag) - grpc_channel_watch_connectivity_state( - self.c_channel, last_observed_state, _timespec_from_time(deadline), - queue.c_completion_queue, <cpython.PyObject *>connectivity_tag) + self, grpc_connectivity_state last_observed_state, object deadline): + return _watch_connectivity_state(self._state, last_observed_state, deadline) - def target(self): - cdef char *target = NULL - with nogil: - target = grpc_channel_get_target(self.c_channel) - result = <bytes>target - with nogil: - gpr_free(target) - return result - - def __dealloc__(self): - if self.c_channel != NULL: - grpc_channel_destroy(self.c_channel) - grpc_shutdown() + def close(self, code, details): + _close(self._state, code, details) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pxd.pxi index 5ea0287b81..9f06ce086e 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pxd.pxi @@ -13,10 +13,16 @@ # limitations under the License. +cdef grpc_event _next(grpc_completion_queue *c_completion_queue, deadline) + + +cdef _interpret_event(grpc_event c_event) + + cdef class CompletionQueue: cdef grpc_completion_queue *c_completion_queue cdef bint is_shutting_down cdef bint is_shutdown - cdef _interpret_event(self, grpc_event event) + cdef _interpret_event(self, grpc_event c_event) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi index 40496d1124..a2d765546a 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi @@ -20,6 +20,53 @@ import time cdef int _INTERRUPT_CHECK_PERIOD_MS = 200 +cdef grpc_event _next(grpc_completion_queue *c_completion_queue, deadline): + cdef gpr_timespec c_increment + cdef gpr_timespec c_timeout + cdef gpr_timespec c_deadline + c_increment = gpr_time_from_millis(_INTERRUPT_CHECK_PERIOD_MS, GPR_TIMESPAN) + if deadline is None: + c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME) + else: + c_deadline = _timespec_from_time(deadline) + + with nogil: + while True: + c_timeout = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), c_increment) + if gpr_time_cmp(c_timeout, c_deadline) > 0: + c_timeout = c_deadline + c_event = grpc_completion_queue_next(c_completion_queue, c_timeout, NULL) + if (c_event.type != GRPC_QUEUE_TIMEOUT or + gpr_time_cmp(c_timeout, c_deadline) == 0): + break + + # Handle any signals + with gil: + cpython.PyErr_CheckSignals() + return c_event + + +cdef _interpret_event(grpc_event c_event): + cdef _Tag tag + if c_event.type == GRPC_QUEUE_TIMEOUT: + # NOTE(nathaniel): For now we coopt ConnectivityEvent here. + return None, ConnectivityEvent(GRPC_QUEUE_TIMEOUT, False, None) + elif c_event.type == GRPC_QUEUE_SHUTDOWN: + # NOTE(nathaniel): For now we coopt ConnectivityEvent here. + return None, ConnectivityEvent(GRPC_QUEUE_SHUTDOWN, False, None) + else: + tag = <_Tag>c_event.tag + # We receive event tags only after they've been inc-ref'd elsewhere in + # the code. + cpython.Py_DECREF(tag) + return tag, tag.event(c_event) + + +cdef _latent_event(grpc_completion_queue *c_completion_queue, object deadline): + cdef grpc_event c_event = _next(c_completion_queue, deadline) + return _interpret_event(c_event) + + cdef class CompletionQueue: def __cinit__(self, shutdown_cq=False): @@ -36,48 +83,16 @@ cdef class CompletionQueue: self.is_shutting_down = False self.is_shutdown = False - cdef _interpret_event(self, grpc_event event): - cdef _Tag tag = None - if event.type == GRPC_QUEUE_TIMEOUT: - # NOTE(nathaniel): For now we coopt ConnectivityEvent here. - return ConnectivityEvent(GRPC_QUEUE_TIMEOUT, False, None) - elif event.type == GRPC_QUEUE_SHUTDOWN: + cdef _interpret_event(self, grpc_event c_event): + unused_tag, event = _interpret_event(c_event) + if event.completion_type == GRPC_QUEUE_SHUTDOWN: self.is_shutdown = True - # NOTE(nathaniel): For now we coopt ConnectivityEvent here. - return ConnectivityEvent(GRPC_QUEUE_TIMEOUT, True, None) - else: - tag = <_Tag>event.tag - # We receive event tags only after they've been inc-ref'd elsewhere in - # the code. - cpython.Py_DECREF(tag) - return tag.event(event) + return event + # We name this 'poll' to avoid problems with CPython's expectations for + # 'special' methods (like next and __next__). def poll(self, deadline=None): - # We name this 'poll' to avoid problems with CPython's expectations for - # 'special' methods (like next and __next__). - cdef gpr_timespec c_increment - cdef gpr_timespec c_timeout - cdef gpr_timespec c_deadline - if deadline is None: - c_deadline = gpr_inf_future(GPR_CLOCK_REALTIME) - else: - c_deadline = _timespec_from_time(deadline) - with nogil: - c_increment = gpr_time_from_millis(_INTERRUPT_CHECK_PERIOD_MS, GPR_TIMESPAN) - - while True: - c_timeout = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), c_increment) - if gpr_time_cmp(c_timeout, c_deadline) > 0: - c_timeout = c_deadline - event = grpc_completion_queue_next( - self.c_completion_queue, c_timeout, NULL) - if event.type != GRPC_QUEUE_TIMEOUT or gpr_time_cmp(c_timeout, c_deadline) == 0: - break; - - # Handle any signals - with gil: - cpython.PyErr_CheckSignals() - return self._interpret_event(event) + return self._interpret_event(_next(self.c_completion_queue, deadline)) def shutdown(self): with nogil: diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi index a4c0319553..2d6c900c54 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi @@ -291,6 +291,7 @@ cdef extern from "grpc/grpc.h": grpc_metadata_array *trailing_metadata grpc_status_code *status grpc_slice *status_details + char** error_string ctypedef struct grpc_op_data_recv_close_on_server: int *cancelled diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/operation.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/operation.pxd.pxi index bfbe27785b..69a2a4989e 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/operation.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/operation.pxd.pxi @@ -91,9 +91,11 @@ cdef class ReceiveStatusOnClientOperation(Operation): cdef grpc_metadata_array _c_trailing_metadata cdef grpc_status_code _c_code cdef grpc_slice _c_details + cdef const char* _c_error_string cdef tuple _trailing_metadata cdef object _code cdef str _details + cdef str _error_string cdef void c(self) cdef void un_c(self) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/operation.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/operation.pyx.pxi index 239d0f3f95..454627f570 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/operation.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/operation.pyx.pxi @@ -199,6 +199,8 @@ cdef class ReceiveStatusOnClientOperation(Operation): &self._c_code) self.c_op.data.receive_status_on_client.status_details = ( &self._c_details) + self.c_op.data.receive_status_on_client.error_string = ( + &self._c_error_string) cdef void un_c(self): self._trailing_metadata = _metadata(&self._c_trailing_metadata) @@ -206,6 +208,11 @@ cdef class ReceiveStatusOnClientOperation(Operation): self._code = self._c_code self._details = _decode(_slice_bytes(self._c_details)) grpc_slice_unref(self._c_details) + if self._c_error_string != NULL: + self._error_string = _decode(self._c_error_string) + gpr_free(<void*>self._c_error_string) + else: + self._error_string = "" def trailing_metadata(self): return self._trailing_metadata @@ -216,6 +223,9 @@ cdef class ReceiveStatusOnClientOperation(Operation): def details(self): return self._details + def error_string(self): + return self._error_string + cdef class ReceiveCloseOnServerOperation(Operation): diff --git a/src/python/grpcio/grpc/_grpcio_metadata.py b/src/python/grpcio/grpc/_grpcio_metadata.py index cb5da72f1f..ad53f60ad3 100644 --- a/src/python/grpcio/grpc/_grpcio_metadata.py +++ b/src/python/grpcio/grpc/_grpcio_metadata.py @@ -14,4 +14,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio/grpc/_grpcio_metadata.py.template`!!! -__version__ = """1.12.0.dev0""" +__version__ = """1.13.0.dev0""" diff --git a/src/python/grpcio/grpc/_interceptor.py b/src/python/grpcio/grpc/_interceptor.py index d029472c68..f465e35a9c 100644 --- a/src/python/grpcio/grpc/_interceptor.py +++ b/src/python/grpcio/grpc/_interceptor.py @@ -334,6 +334,19 @@ class _Channel(grpc.Channel): else: return thunk(method) + def _close(self): + self._channel.close() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self._close() + return False + + def close(self): + self._channel.close() + def intercept_channel(channel, *interceptors): for interceptor in reversed(list(interceptors)): diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py index c988e0c87c..d849cadbee 100644 --- a/src/python/grpcio/grpc/_server.py +++ b/src/python/grpcio/grpc/_server.py @@ -780,14 +780,8 @@ def _start(state): state.stage = _ServerStage.STARTED _request_call(state) - def cleanup_server(timeout): - if timeout is None: - _stop(state, _UNEXPECTED_EXIT_SERVER_GRACE).wait() - else: - _stop(state, timeout).wait() - - thread = _common.CleanupThread( - cleanup_server, target=_serve, args=(state,)) + thread = threading.Thread(target=_serve, args=(state,)) + thread.daemon = True thread.start() diff --git a/src/python/grpcio/grpc/beta/_server_adaptations.py b/src/python/grpcio/grpc/beta/_server_adaptations.py index 3c04fd7639..ccafec8951 100644 --- a/src/python/grpcio/grpc/beta/_server_adaptations.py +++ b/src/python/grpcio/grpc/beta/_server_adaptations.py @@ -168,11 +168,8 @@ def _run_request_pipe_thread(request_iterator, request_consumer, return request_consumer.terminate() - def stop_request_pipe(timeout): # pylint: disable=unused-argument - thread_joined.set() - - request_pipe_thread = _common.CleanupThread( - stop_request_pipe, target=pipe_requests) + request_pipe_thread = threading.Thread(target=pipe_requests) + request_pipe_thread.daemon = True request_pipe_thread.start() diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 81e6cb11e8..de5d2ba33f 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -28,7 +28,6 @@ CORE_SOURCE_FILES = [ 'src/core/lib/gpr/env_linux.cc', 'src/core/lib/gpr/env_posix.cc', 'src/core/lib/gpr/env_windows.cc', - 'src/core/lib/gpr/fork.cc', 'src/core/lib/gpr/host_port.cc', 'src/core/lib/gpr/log.cc', 'src/core/lib/gpr/log_android.cc', @@ -53,6 +52,7 @@ CORE_SOURCE_FILES = [ 'src/core/lib/gpr/tmpfile_posix.cc', 'src/core/lib/gpr/tmpfile_windows.cc', 'src/core/lib/gpr/wrap_memcpy.cc', + 'src/core/lib/gprpp/fork.cc', 'src/core/lib/gprpp/thd_posix.cc', 'src/core/lib/gprpp/thd_windows.cc', 'src/core/lib/profiling/basic_timers.cc', @@ -64,7 +64,7 @@ CORE_SOURCE_FILES = [ 'src/core/lib/channel/channel_stack.cc', 'src/core/lib/channel/channel_stack_builder.cc', 'src/core/lib/channel/channel_trace.cc', - 'src/core/lib/channel/channel_trace_registry.cc', + 'src/core/lib/channel/channelz_registry.cc', 'src/core/lib/channel/connected_channel.cc', 'src/core/lib/channel/handshaker.cc', 'src/core/lib/channel/handshaker_factory.cc', @@ -296,7 +296,6 @@ CORE_SOURCE_FILES = [ 'third_party/nanopb/pb_decode.c', 'third_party/nanopb/pb_encode.c', 'src/core/tsi/transport_security.cc', - 'src/core/tsi/transport_security_adapter.cc', 'src/core/ext/transport/chttp2/client/insecure/channel_create.cc', 'src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc', 'src/core/ext/transport/chttp2/client/authority.cc', @@ -344,7 +343,6 @@ CORE_SOURCE_FILES = [ 'src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c', 'src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc', 'src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc', - 'src/core/ext/filters/client_channel/lb_policy/subchannel_list.cc', 'src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc', 'src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc', diff --git a/src/python/grpcio/grpc_version.py b/src/python/grpcio/grpc_version.py index de5a780abd..57dc26dbeb 100644 --- a/src/python/grpcio/grpc_version.py +++ b/src/python/grpcio/grpc_version.py @@ -14,4 +14,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio/grpc_version.py.template`!!! -VERSION = '1.12.0.dev0' +VERSION = '1.13.0.dev0' diff --git a/src/python/grpcio_health_checking/grpc_version.py b/src/python/grpcio_health_checking/grpc_version.py index afcd316e5c..ba0d4a3b6d 100644 --- a/src/python/grpcio_health_checking/grpc_version.py +++ b/src/python/grpcio_health_checking/grpc_version.py @@ -14,4 +14,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_health_checking/grpc_version.py.template`!!! -VERSION = '1.12.0.dev0' +VERSION = '1.13.0.dev0' diff --git a/src/python/grpcio_health_checking/setup.py b/src/python/grpcio_health_checking/setup.py index 60d309ec65..35c09827ba 100644 --- a/src/python/grpcio_health_checking/setup.py +++ b/src/python/grpcio_health_checking/setup.py @@ -57,7 +57,7 @@ PACKAGE_DIRECTORIES = { } INSTALL_REQUIRES = ( - 'protobuf>=3.5.0.post1', + 'protobuf>=3.5.2.post1', 'grpcio>={version}'.format(version=grpc_version.VERSION), ) diff --git a/src/python/grpcio_reflection/grpc_version.py b/src/python/grpcio_reflection/grpc_version.py index 824b73201d..ea2878d9ee 100644 --- a/src/python/grpcio_reflection/grpc_version.py +++ b/src/python/grpcio_reflection/grpc_version.py @@ -14,4 +14,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_reflection/grpc_version.py.template`!!! -VERSION = '1.12.0.dev0' +VERSION = '1.13.0.dev0' diff --git a/src/python/grpcio_reflection/setup.py b/src/python/grpcio_reflection/setup.py index 10c4c38f19..589d0ff556 100644 --- a/src/python/grpcio_reflection/setup.py +++ b/src/python/grpcio_reflection/setup.py @@ -58,7 +58,7 @@ PACKAGE_DIRECTORIES = { } INSTALL_REQUIRES = ( - 'protobuf>=3.5.0.post1', + 'protobuf>=3.5.2.post1', 'grpcio>={version}'.format(version=grpc_version.VERSION), ) diff --git a/src/python/grpcio_testing/grpc_testing/_channel/_channel.py b/src/python/grpcio_testing/grpc_testing/_channel/_channel.py index b015b8d738..0c1941e6be 100644 --- a/src/python/grpcio_testing/grpc_testing/_channel/_channel.py +++ b/src/python/grpcio_testing/grpc_testing/_channel/_channel.py @@ -56,6 +56,21 @@ class TestingChannel(grpc_testing.Channel): response_deserializer=None): return _multi_callable.StreamStream(method, self._state) + def _close(self): + # TODO(https://github.com/grpc/grpc/issues/12531): Decide what + # action to take here, if any? + pass + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self._close() + return False + + def close(self): + self._close() + def take_unary_unary(self, method_descriptor): return _channel_rpc.unary_unary(self._state, method_descriptor) diff --git a/src/python/grpcio_testing/grpc_version.py b/src/python/grpcio_testing/grpc_version.py index 5b1f4c4cc0..02f19f2283 100644 --- a/src/python/grpcio_testing/grpc_version.py +++ b/src/python/grpcio_testing/grpc_version.py @@ -14,4 +14,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_testing/grpc_version.py.template`!!! -VERSION = '1.12.0.dev0' +VERSION = '1.13.0.dev0' diff --git a/src/python/grpcio_testing/setup.py b/src/python/grpcio_testing/setup.py index 5a9d593ec1..eb480a5464 100644 --- a/src/python/grpcio_testing/setup.py +++ b/src/python/grpcio_testing/setup.py @@ -29,7 +29,7 @@ PACKAGE_DIRECTORIES = { } INSTALL_REQUIRES = ( - 'protobuf>=3.5.0.post1', + 'protobuf>=3.5.2.post1', 'grpcio>={version}'.format(version=grpc_version.VERSION), ) diff --git a/src/python/grpcio_tests/grpc_version.py b/src/python/grpcio_tests/grpc_version.py index 382f95018e..9d2e41644e 100644 --- a/src/python/grpcio_tests/grpc_version.py +++ b/src/python/grpcio_tests/grpc_version.py @@ -14,4 +14,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_tests/grpc_version.py.template`!!! -VERSION = '1.12.0.dev0' +VERSION = '1.13.0.dev0' diff --git a/src/python/grpcio_tests/setup.py b/src/python/grpcio_tests/setup.py index 4e0f6726fa..1262e48571 100644 --- a/src/python/grpcio_tests/setup.py +++ b/src/python/grpcio_tests/setup.py @@ -37,13 +37,16 @@ PACKAGE_DIRECTORIES = { } INSTALL_REQUIRES = ( - 'coverage>=4.0', 'enum34>=1.0.4', 'futures>=2.2.0', + 'coverage>=4.0', 'enum34>=1.0.4', 'grpcio>={version}'.format(version=grpc_version.VERSION), 'grpcio-tools>={version}'.format(version=grpc_version.VERSION), 'grpcio-health-checking>={version}'.format(version=grpc_version.VERSION), - 'oauth2client>=1.4.7', 'protobuf>=3.5.0.post1', 'six>=1.10', + 'oauth2client>=1.4.7', 'protobuf>=3.5.2.post1', 'six>=1.10', 'google-auth>=1.0.0', 'requests>=2.14.2') +if not PY3: + INSTALL_REQUIRES += ('futures>=2.2.0',) + COMMAND_CLASS = { # Run `preprocess` *before* doing any packaging! 'preprocess': commands.GatherProto, diff --git a/src/python/grpcio_tests/tests/_loader.py b/src/python/grpcio_tests/tests/_loader.py index 31680916b4..be0af64646 100644 --- a/src/python/grpcio_tests/tests/_loader.py +++ b/src/python/grpcio_tests/tests/_loader.py @@ -54,7 +54,7 @@ class Loader(object): for module in modules: try: package_paths = module.__path__ - except: + except AttributeError: continue self.walk_packages(package_paths) coverage_context.stop() diff --git a/src/python/grpcio_tests/tests/_result.py b/src/python/grpcio_tests/tests/_result.py index 9907c4e1f9..b105f18e78 100644 --- a/src/python/grpcio_tests/tests/_result.py +++ b/src/python/grpcio_tests/tests/_result.py @@ -46,7 +46,7 @@ class CaseResult( None. """ - class Kind: + class Kind(object): UNTESTED = 'untested' RUNNING = 'running' ERROR = 'error' @@ -257,7 +257,7 @@ class CoverageResult(AugmentedResult): #coverage.Coverage().combine() -class _Colors: +class _Colors(object): """Namespaced constants for terminal color magic numbers.""" HEADER = '\033[95m' INFO = '\033[94m' diff --git a/src/python/grpcio_tests/tests/interop/client.py b/src/python/grpcio_tests/tests/interop/client.py index 3780ed9020..698c37017f 100644 --- a/src/python/grpcio_tests/tests/interop/client.py +++ b/src/python/grpcio_tests/tests/interop/client.py @@ -66,10 +66,6 @@ def _args(): return parser.parse_args() -def _application_default_credentials(): - return oauth2client_client.GoogleCredentials.get_application_default() - - def _stub(args): target = '{}:{}'.format(args.server_host, args.server_port) if args.test_case == 'oauth2_auth_token': diff --git a/src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py b/src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py index ad0ecf0079..b46e53315e 100644 --- a/src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py +++ b/src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py @@ -329,9 +329,7 @@ class PythonPluginTest(unittest.TestCase): _packagify(self._python_out) - with _system_path([ - self._python_out, - ]): + with _system_path([self._python_out]): self._payload_pb2 = importlib.import_module(_PAYLOAD_PB2) self._requests_pb2 = importlib.import_module(_REQUESTS_PB2) self._responses_pb2 = importlib.import_module(_RESPONSES_PB2) diff --git a/src/python/grpcio_tests/tests/qps/benchmark_client.py b/src/python/grpcio_tests/tests/qps/benchmark_client.py index e6392a8b8c..0488450740 100644 --- a/src/python/grpcio_tests/tests/qps/benchmark_client.py +++ b/src/python/grpcio_tests/tests/qps/benchmark_client.py @@ -22,7 +22,7 @@ from six.moves import queue import grpc from src.proto.grpc.testing import messages_pb2 -from src.proto.grpc.testing import services_pb2_grpc +from src.proto.grpc.testing import benchmark_service_pb2_grpc from tests.unit import resources from tests.unit import test_common @@ -58,7 +58,8 @@ class BenchmarkClient: if config.payload_config.WhichOneof('payload') == 'simple_params': self._generic = False - self._stub = services_pb2_grpc.BenchmarkServiceStub(channel) + self._stub = benchmark_service_pb2_grpc.BenchmarkServiceStub( + channel) payload = messages_pb2.Payload( body='\0' * config.payload_config.simple_params.req_size) self._request = messages_pb2.SimpleRequest( diff --git a/src/python/grpcio_tests/tests/qps/benchmark_server.py b/src/python/grpcio_tests/tests/qps/benchmark_server.py index bb07844491..2bd89cbbdf 100644 --- a/src/python/grpcio_tests/tests/qps/benchmark_server.py +++ b/src/python/grpcio_tests/tests/qps/benchmark_server.py @@ -13,10 +13,10 @@ # limitations under the License. from src.proto.grpc.testing import messages_pb2 -from src.proto.grpc.testing import services_pb2_grpc +from src.proto.grpc.testing import benchmark_service_pb2_grpc -class BenchmarkServer(services_pb2_grpc.BenchmarkServiceServicer): +class BenchmarkServer(benchmark_service_pb2_grpc.BenchmarkServiceServicer): """Synchronous Server implementation for the Benchmark service.""" def UnaryCall(self, request, context): @@ -29,7 +29,8 @@ class BenchmarkServer(services_pb2_grpc.BenchmarkServiceServicer): yield messages_pb2.SimpleResponse(payload=payload) -class GenericBenchmarkServer(services_pb2_grpc.BenchmarkServiceServicer): +class GenericBenchmarkServer( + benchmark_service_pb2_grpc.BenchmarkServiceServicer): """Generic Server implementation for the Benchmark service.""" def __init__(self, resp_size): diff --git a/src/python/grpcio_tests/tests/qps/qps_worker.py b/src/python/grpcio_tests/tests/qps/qps_worker.py index 54f69db109..c33d013882 100644 --- a/src/python/grpcio_tests/tests/qps/qps_worker.py +++ b/src/python/grpcio_tests/tests/qps/qps_worker.py @@ -17,7 +17,7 @@ import argparse import time import grpc -from src.proto.grpc.testing import services_pb2_grpc +from src.proto.grpc.testing import worker_service_pb2_grpc from tests.qps import worker_server from tests.unit import test_common @@ -26,7 +26,8 @@ from tests.unit import test_common def run_worker_server(port): server = test_common.test_server() servicer = worker_server.WorkerServer() - services_pb2_grpc.add_WorkerServiceServicer_to_server(servicer, server) + worker_service_pb2_grpc.add_WorkerServiceServicer_to_server( + servicer, server) server.add_insecure_port('[::]:{}'.format(port)) server.start() servicer.wait_for_quit() diff --git a/src/python/grpcio_tests/tests/qps/worker_server.py b/src/python/grpcio_tests/tests/qps/worker_server.py index 41e2403c8f..db145fbf64 100644 --- a/src/python/grpcio_tests/tests/qps/worker_server.py +++ b/src/python/grpcio_tests/tests/qps/worker_server.py @@ -20,7 +20,7 @@ import time from concurrent import futures import grpc from src.proto.grpc.testing import control_pb2 -from src.proto.grpc.testing import services_pb2_grpc +from src.proto.grpc.testing import worker_service_pb2_grpc from src.proto.grpc.testing import stats_pb2 from tests.qps import benchmark_client @@ -31,7 +31,7 @@ from tests.unit import resources from tests.unit import test_common -class WorkerServer(services_pb2_grpc.WorkerServiceServicer): +class WorkerServer(worker_service_pb2_grpc.WorkerServiceServicer): """Python Worker Server implementation.""" def __init__(self): @@ -72,7 +72,7 @@ class WorkerServer(services_pb2_grpc.WorkerServiceServicer): server = test_common.test_server(max_workers=server_threads) if config.server_type == control_pb2.ASYNC_SERVER: servicer = benchmark_server.BenchmarkServer() - services_pb2_grpc.add_BenchmarkServiceServicer_to_server( + worker_service_pb2_grpc.add_BenchmarkServiceServicer_to_server( servicer, server) elif config.server_type == control_pb2.ASYNC_GENERIC_SERVER: resp_size = config.payload_config.bytebuf_params.resp_size diff --git a/src/python/grpcio_tests/tests/stress/test_runner.py b/src/python/grpcio_tests/tests/stress/test_runner.py index d5038e3ba2..764cda17fb 100644 --- a/src/python/grpcio_tests/tests/stress/test_runner.py +++ b/src/python/grpcio_tests/tests/stress/test_runner.py @@ -50,7 +50,7 @@ class TestRunner(threading.Thread): test_case.test_interoperability(self._stub, None) end_time = time.time() self._histogram.add((end_time - start_time) * 1e9) - except Exception as e: + except Exception as e: # pylint: disable=broad-except traceback.print_exc() self._exception_queue.put( Exception("An exception occured during test {}" diff --git a/src/python/grpcio_tests/tests/testing/_client_application.py b/src/python/grpcio_tests/tests/testing/_client_application.py index 7d0d74c8c4..3ddeba2373 100644 --- a/src/python/grpcio_tests/tests/testing/_client_application.py +++ b/src/python/grpcio_tests/tests/testing/_client_application.py @@ -215,30 +215,6 @@ def _run_infinite_request_stream(stub): return _UNSATISFACTORY_OUTCOME -def run(scenario, channel): - stub = services_pb2_grpc.FirstServiceStub(channel) - try: - if scenario is Scenario.UNARY_UNARY: - return _run_unary_unary(stub) - elif scenario is Scenario.UNARY_STREAM: - return _run_unary_stream(stub) - elif scenario is Scenario.STREAM_UNARY: - return _run_stream_unary(stub) - elif scenario is Scenario.STREAM_STREAM: - return _run_stream_stream(stub) - elif scenario is Scenario.CONCURRENT_STREAM_UNARY: - return _run_concurrent_stream_unary(stub) - elif scenario is Scenario.CONCURRENT_STREAM_STREAM: - return _run_concurrent_stream_stream(stub) - elif scenario is Scenario.CANCEL_UNARY_UNARY: - return _run_cancel_unary_unary(stub) - elif scenario is Scenario.INFINITE_REQUEST_STREAM: - return _run_infinite_request_stream(stub) - except grpc.RpcError as rpc_error: - return Outcome(Outcome.Kind.RPC_ERROR, rpc_error.code(), - rpc_error.details()) - - _IMPLEMENTATIONS = { Scenario.UNARY_UNARY: _run_unary_unary, Scenario.UNARY_STREAM: _run_unary_stream, diff --git a/src/python/grpcio_tests/tests/testing/_server_application.py b/src/python/grpcio_tests/tests/testing/_server_application.py index 02769ca68d..243c385daf 100644 --- a/src/python/grpcio_tests/tests/testing/_server_application.py +++ b/src/python/grpcio_tests/tests/testing/_server_application.py @@ -38,7 +38,7 @@ class FirstServiceServicer(services_pb2_grpc.FirstServiceServicer): context.set_code(grpc.StatusCode.INVALID_ARGUMENT) context.set_details('Something is wrong with your request!') return - yield services_pb2.Strange() + yield services_pb2.Strange() # pylint: disable=unreachable def StreUn(self, request_iterator, context): context.send_initial_metadata((( diff --git a/src/python/grpcio_tests/tests/testing/_server_test.py b/src/python/grpcio_tests/tests/testing/_server_test.py index 4f4abd7708..88e3a79ae5 100644 --- a/src/python/grpcio_tests/tests/testing/_server_test.py +++ b/src/python/grpcio_tests/tests/testing/_server_test.py @@ -21,13 +21,8 @@ import grpc_testing from tests.testing import _application_common from tests.testing import _application_testing_common from tests.testing import _server_application -from tests.testing.proto import services_pb2 -# TODO(https://github.com/google/protobuf/issues/3452): Drop this skip. -@unittest.skipIf( - services_pb2.DESCRIPTOR.services_by_name.get('FirstService') is None, - 'Fix protobuf issue 3452!') class FirstServiceServicerTest(unittest.TestCase): def setUp(self): diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json index d38ee517f0..0d94426413 100644 --- a/src/python/grpcio_tests/tests/tests.json +++ b/src/python/grpcio_tests/tests/tests.json @@ -25,6 +25,7 @@ "unit._auth_test.AccessTokenAuthMetadataPluginTest", "unit._auth_test.GoogleCallCredentialsTest", "unit._channel_args_test.ChannelArgsTest", + "unit._channel_close_test.ChannelCloseTest", "unit._channel_connectivity_test.ChannelConnectivityTest", "unit._channel_ready_future_test.ChannelReadyFutureTest", "unit._compression_test.CompressionTest", @@ -52,7 +53,6 @@ "unit._server_ssl_cert_config_test.ServerSSLCertReloadTestCertConfigReuse", "unit._server_ssl_cert_config_test.ServerSSLCertReloadTestWithClientAuth", "unit._server_ssl_cert_config_test.ServerSSLCertReloadTestWithoutClientAuth", - "unit._thread_cleanup_test.CleanupThreadTest", "unit.beta._beta_features_test.BetaFeaturesTest", "unit.beta._beta_features_test.ContextManagementAndLifecycleTest", "unit.beta._connectivity_channel_test.ConnectivityStatesTest", diff --git a/src/python/grpcio_tests/tests/unit/_channel_close_test.py b/src/python/grpcio_tests/tests/unit/_channel_close_test.py new file mode 100644 index 0000000000..af3a9ee1ee --- /dev/null +++ b/src/python/grpcio_tests/tests/unit/_channel_close_test.py @@ -0,0 +1,185 @@ +# Copyright 2018 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests server and client side compression.""" + +import threading +import time +import unittest + +import grpc + +from tests.unit import test_common +from tests.unit.framework.common import test_constants + +_BEAT = 0.5 +_SOME_TIME = 5 +_MORE_TIME = 10 + + +class _MethodHandler(grpc.RpcMethodHandler): + + request_streaming = True + response_streaming = True + request_deserializer = None + response_serializer = None + + def stream_stream(self, request_iterator, servicer_context): + for request in request_iterator: + yield request * 2 + + +_METHOD_HANDLER = _MethodHandler() + + +class _GenericHandler(grpc.GenericRpcHandler): + + def service(self, handler_call_details): + return _METHOD_HANDLER + + +_GENERIC_HANDLER = _GenericHandler() + + +class _Pipe(object): + + def __init__(self, values): + self._condition = threading.Condition() + self._values = list(values) + self._open = True + + def __iter__(self): + return self + + def _next(self): + with self._condition: + while not self._values and self._open: + self._condition.wait() + if self._values: + return self._values.pop(0) + else: + raise StopIteration() + + def next(self): + return self._next() + + def __next__(self): + return self._next() + + def add(self, value): + with self._condition: + self._values.append(value) + self._condition.notify() + + def close(self): + with self._condition: + self._open = False + self._condition.notify() + + def __enter__(self): + return self + + def __exit__(self, type, value, traceback): + self.close() + + +class ChannelCloseTest(unittest.TestCase): + + def setUp(self): + self._server = test_common.test_server( + max_workers=test_constants.THREAD_CONCURRENCY) + self._server.add_generic_rpc_handlers((_GENERIC_HANDLER,)) + self._port = self._server.add_insecure_port('[::]:0') + self._server.start() + + def tearDown(self): + self._server.stop(None) + + def test_close_immediately_after_call_invocation(self): + channel = grpc.insecure_channel('localhost:{}'.format(self._port)) + multi_callable = channel.stream_stream('Meffod') + request_iterator = _Pipe(()) + response_iterator = multi_callable(request_iterator) + channel.close() + request_iterator.close() + + self.assertIs(response_iterator.code(), grpc.StatusCode.CANCELLED) + + def test_close_while_call_active(self): + channel = grpc.insecure_channel('localhost:{}'.format(self._port)) + multi_callable = channel.stream_stream('Meffod') + request_iterator = _Pipe((b'abc',)) + response_iterator = multi_callable(request_iterator) + next(response_iterator) + channel.close() + request_iterator.close() + + self.assertIs(response_iterator.code(), grpc.StatusCode.CANCELLED) + + def test_context_manager_close_while_call_active(self): + with grpc.insecure_channel('localhost:{}'.format( + self._port)) as channel: # pylint: disable=bad-continuation + multi_callable = channel.stream_stream('Meffod') + request_iterator = _Pipe((b'abc',)) + response_iterator = multi_callable(request_iterator) + next(response_iterator) + request_iterator.close() + + self.assertIs(response_iterator.code(), grpc.StatusCode.CANCELLED) + + def test_context_manager_close_while_many_calls_active(self): + with grpc.insecure_channel('localhost:{}'.format( + self._port)) as channel: # pylint: disable=bad-continuation + multi_callable = channel.stream_stream('Meffod') + request_iterators = tuple( + _Pipe((b'abc',)) + for _ in range(test_constants.THREAD_CONCURRENCY)) + response_iterators = [] + for request_iterator in request_iterators: + response_iterator = multi_callable(request_iterator) + next(response_iterator) + response_iterators.append(response_iterator) + for request_iterator in request_iterators: + request_iterator.close() + + for response_iterator in response_iterators: + self.assertIs(response_iterator.code(), grpc.StatusCode.CANCELLED) + + def test_many_concurrent_closes(self): + channel = grpc.insecure_channel('localhost:{}'.format(self._port)) + multi_callable = channel.stream_stream('Meffod') + request_iterator = _Pipe((b'abc',)) + response_iterator = multi_callable(request_iterator) + next(response_iterator) + start = time.time() + end = start + _MORE_TIME + + def sleep_some_time_then_close(): + time.sleep(_SOME_TIME) + channel.close() + + for _ in range(test_constants.THREAD_CONCURRENCY): + close_thread = threading.Thread(target=sleep_some_time_then_close) + close_thread.start() + while True: + request_iterator.add(b'def') + time.sleep(_BEAT) + if end < time.time(): + break + request_iterator.close() + + self.assertIs(response_iterator.code(), grpc.StatusCode.CANCELLED) + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/unit/_compression_test.py b/src/python/grpcio_tests/tests/unit/_compression_test.py index 7550cd39ba..0b11f03adf 100644 --- a/src/python/grpcio_tests/tests/unit/_compression_test.py +++ b/src/python/grpcio_tests/tests/unit/_compression_test.py @@ -52,9 +52,9 @@ class _MethodHandler(grpc.RpcMethodHandler): self.stream_unary = None self.stream_stream = None if self.request_streaming and self.response_streaming: - self.stream_stream = lambda x, y: handle_stream(x, y) + self.stream_stream = handle_stream elif not self.request_streaming and not self.response_streaming: - self.unary_unary = lambda x, y: handle_unary(x, y) + self.unary_unary = handle_unary class _GenericHandler(grpc.GenericRpcHandler): diff --git a/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py b/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py index 3765ce4fb0..578a3d79ad 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py @@ -19,6 +19,7 @@ import unittest from grpc._cython import cygrpc from grpc.framework.foundation import logging_pool from tests.unit.framework.common import test_constants +from tests.unit._cython import test_utilities _EMPTY_FLAGS = 0 _EMPTY_METADATA = () @@ -30,6 +31,8 @@ _RECEIVE_MESSAGE_TAG = 'receive_message' _SERVER_COMPLETE_CALL_TAG = 'server_complete_call' _SUCCESS_CALL_FRACTION = 1.0 / 8.0 +_SUCCESSFUL_CALLS = int(test_constants.RPC_CONCURRENCY * _SUCCESS_CALL_FRACTION) +_UNSUCCESSFUL_CALLS = test_constants.RPC_CONCURRENCY - _SUCCESSFUL_CALLS class _State(object): @@ -43,7 +46,7 @@ class _State(object): def _is_cancellation_event(event): return (event.tag is _RECEIVE_CLOSE_ON_SERVER_TAG and - event.batch_operations[0].received_cancelled) + event.batch_operations[0].cancelled()) class _Handler(object): @@ -150,7 +153,8 @@ class CancelManyCallsTest(unittest.TestCase): server.register_completion_queue(server_completion_queue) port = server.add_http2_port(b'[::]:0') server.start() - channel = cygrpc.Channel('localhost:{}'.format(port).encode(), None) + channel = cygrpc.Channel('localhost:{}'.format(port).encode(), None, + None) state = _State() @@ -165,31 +169,33 @@ class CancelManyCallsTest(unittest.TestCase): client_condition = threading.Condition() client_due = set() - client_completion_queue = cygrpc.CompletionQueue() - client_driver = _QueueDriver(client_condition, client_completion_queue, - client_due) - client_driver.start() with client_condition: client_calls = [] for index in range(test_constants.RPC_CONCURRENCY): - client_call = channel.create_call(None, _EMPTY_FLAGS, - client_completion_queue, - b'/twinkies', None, None) - operations = ( - cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA, - _EMPTY_FLAGS), - cygrpc.SendMessageOperation(b'\x45\x56', _EMPTY_FLAGS), - cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), - cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), - cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), - cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), - ) tag = 'client_complete_call_{0:04d}_tag'.format(index) - client_call.start_client_batch(operations, tag) + client_call = channel.integrated_call( + _EMPTY_FLAGS, b'/twinkies', None, None, _EMPTY_METADATA, + None, (( + ( + cygrpc.SendInitialMetadataOperation( + _EMPTY_METADATA, _EMPTY_FLAGS), + cygrpc.SendMessageOperation(b'\x45\x56', + _EMPTY_FLAGS), + cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), + cygrpc.ReceiveInitialMetadataOperation( + _EMPTY_FLAGS), + cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), + ), + tag, + ),)) client_due.add(tag) client_calls.append(client_call) + client_events_future = test_utilities.SimpleFuture( + lambda: tuple(channel.next_call_event() for _ in range(_SUCCESSFUL_CALLS))) + with state.condition: while True: if state.parked_handlers < test_constants.THREAD_CONCURRENCY: @@ -201,12 +207,14 @@ class CancelManyCallsTest(unittest.TestCase): state.condition.notify_all() break - client_driver.events( - test_constants.RPC_CONCURRENCY * _SUCCESS_CALL_FRACTION) + client_events_future.result() with client_condition: for client_call in client_calls: - client_call.cancel() + client_call.cancel(cygrpc.StatusCode.cancelled, 'Cancelled!') + for _ in range(_UNSUCCESSFUL_CALLS): + channel.next_call_event() + channel.close(cygrpc.StatusCode.unknown, 'Cancelled on channel close!') with state.condition: server.shutdown(server_completion_queue, _SERVER_SHUTDOWN_TAG) diff --git a/src/python/grpcio_tests/tests/unit/_cython/_channel_test.py b/src/python/grpcio_tests/tests/unit/_cython/_channel_test.py index 7305d0fa3f..d95286071d 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_channel_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_channel_test.py @@ -21,25 +21,20 @@ from grpc._cython import cygrpc from tests.unit.framework.common import test_constants -def _channel_and_completion_queue(): - channel = cygrpc.Channel(b'localhost:54321', ()) - completion_queue = cygrpc.CompletionQueue() - return channel, completion_queue +def _channel(): + return cygrpc.Channel(b'localhost:54321', (), None) -def _connectivity_loop(channel, completion_queue): +def _connectivity_loop(channel): for _ in range(100): connectivity = channel.check_connectivity_state(True) - channel.watch_connectivity_state(connectivity, - time.time() + 0.2, completion_queue, - None) - completion_queue.poll() + channel.watch_connectivity_state(connectivity, time.time() + 0.2) def _create_loop_destroy(): - channel, completion_queue = _channel_and_completion_queue() - _connectivity_loop(channel, completion_queue) - completion_queue.shutdown() + channel = _channel() + _connectivity_loop(channel) + channel.close(cygrpc.StatusCode.ok, 'Channel close!') def _in_parallel(behavior, arguments): @@ -55,12 +50,9 @@ def _in_parallel(behavior, arguments): class ChannelTest(unittest.TestCase): def test_single_channel_lonely_connectivity(self): - channel, completion_queue = _channel_and_completion_queue() - _in_parallel(_connectivity_loop, ( - channel, - completion_queue, - )) - completion_queue.shutdown() + channel = _channel() + _connectivity_loop(channel) + channel.close(cygrpc.StatusCode.ok, 'Channel close!') def test_multiple_channels_lonely_connectivity(self): _in_parallel(_create_loop_destroy, ()) diff --git a/src/python/grpcio_tests/tests/unit/_cython/_common.py b/src/python/grpcio_tests/tests/unit/_cython/_common.py index 7fd3d19b4e..d8210f36f8 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_common.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_common.py @@ -100,7 +100,8 @@ class RpcTest(object): self.server.register_completion_queue(self.server_completion_queue) port = self.server.add_http2_port(b'[::]:0') self.server.start() - self.channel = cygrpc.Channel('localhost:{}'.format(port).encode(), []) + self.channel = cygrpc.Channel('localhost:{}'.format(port).encode(), [], + None) self._server_shutdown_tag = 'server_shutdown_tag' self.server_condition = threading.Condition() diff --git a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py index 7caa98f72d..8a721788f4 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py @@ -19,6 +19,7 @@ import unittest from grpc._cython import cygrpc from tests.unit._cython import _common +from tests.unit._cython import test_utilities class Test(_common.RpcTest, unittest.TestCase): @@ -41,31 +42,27 @@ class Test(_common.RpcTest, unittest.TestCase): server_request_call_tag, }) - client_call = self.channel.create_call(None, _common.EMPTY_FLAGS, - self.client_completion_queue, - b'/twinkies', None, None) client_receive_initial_metadata_tag = 'client_receive_initial_metadata_tag' client_complete_rpc_tag = 'client_complete_rpc_tag' - with self.client_condition: - client_receive_initial_metadata_start_batch_result = ( - client_call.start_client_batch([ - cygrpc.ReceiveInitialMetadataOperation(_common.EMPTY_FLAGS), - ], client_receive_initial_metadata_tag)) - self.assertEqual(cygrpc.CallError.ok, - client_receive_initial_metadata_start_batch_result) - client_complete_rpc_start_batch_result = client_call.start_client_batch( + client_call = self.channel.integrated_call( + _common.EMPTY_FLAGS, b'/twinkies', None, None, + _common.INVOCATION_METADATA, None, [( [ - cygrpc.SendInitialMetadataOperation( - _common.INVOCATION_METADATA, _common.EMPTY_FLAGS), - cygrpc.SendCloseFromClientOperation(_common.EMPTY_FLAGS), - cygrpc.ReceiveStatusOnClientOperation(_common.EMPTY_FLAGS), - ], client_complete_rpc_tag) - self.assertEqual(cygrpc.CallError.ok, - client_complete_rpc_start_batch_result) - self.client_driver.add_due({ + cygrpc.ReceiveInitialMetadataOperation(_common.EMPTY_FLAGS), + ], client_receive_initial_metadata_tag, - client_complete_rpc_tag, - }) + )]) + client_call.operate([ + cygrpc.SendInitialMetadataOperation(_common.INVOCATION_METADATA, + _common.EMPTY_FLAGS), + cygrpc.SendCloseFromClientOperation(_common.EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_common.EMPTY_FLAGS), + ], client_complete_rpc_tag) + + client_events_future = test_utilities.SimpleFuture( + lambda: [ + self.channel.next_call_event(), + self.channel.next_call_event(),]) server_request_call_event = self.server_driver.event_with_tag( server_request_call_tag) @@ -96,20 +93,23 @@ class Test(_common.RpcTest, unittest.TestCase): server_complete_rpc_event = server_call_driver.event_with_tag( server_complete_rpc_tag) - client_receive_initial_metadata_event = self.client_driver.event_with_tag( - client_receive_initial_metadata_tag) - client_complete_rpc_event = self.client_driver.event_with_tag( - client_complete_rpc_tag) + client_events = client_events_future.result() + if client_events[0].tag is client_receive_initial_metadata_tag: + client_receive_initial_metadata_event = client_events[0] + client_complete_rpc_event = client_events[1] + else: + client_complete_rpc_event = client_events[0] + client_receive_initial_metadata_event = client_events[1] return ( _common.OperationResult(server_request_call_start_batch_result, server_request_call_event.completion_type, server_request_call_event.success), _common.OperationResult( - client_receive_initial_metadata_start_batch_result, + cygrpc.CallError.ok, client_receive_initial_metadata_event.completion_type, client_receive_initial_metadata_event.success), - _common.OperationResult(client_complete_rpc_start_batch_result, + _common.OperationResult(cygrpc.CallError.ok, client_complete_rpc_event.completion_type, client_complete_rpc_event.success), _common.OperationResult( diff --git a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py index 8582a39c01..47f39ebce2 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py @@ -19,6 +19,7 @@ import unittest from grpc._cython import cygrpc from tests.unit._cython import _common +from tests.unit._cython import test_utilities class Test(_common.RpcTest, unittest.TestCase): @@ -36,28 +37,31 @@ class Test(_common.RpcTest, unittest.TestCase): server_request_call_tag, }) - client_call = self.channel.create_call(None, _common.EMPTY_FLAGS, - self.client_completion_queue, - b'/twinkies', None, None) client_receive_initial_metadata_tag = 'client_receive_initial_metadata_tag' client_complete_rpc_tag = 'client_complete_rpc_tag' - with self.client_condition: - client_receive_initial_metadata_start_batch_result = ( - client_call.start_client_batch([ - cygrpc.ReceiveInitialMetadataOperation(_common.EMPTY_FLAGS), - ], client_receive_initial_metadata_tag)) - client_complete_rpc_start_batch_result = client_call.start_client_batch( - [ - cygrpc.SendInitialMetadataOperation( - _common.INVOCATION_METADATA, _common.EMPTY_FLAGS), - cygrpc.SendCloseFromClientOperation(_common.EMPTY_FLAGS), - cygrpc.ReceiveStatusOnClientOperation(_common.EMPTY_FLAGS), - ], client_complete_rpc_tag) - self.client_driver.add_due({ - client_receive_initial_metadata_tag, - client_complete_rpc_tag, - }) - + client_call = self.channel.integrated_call( + _common.EMPTY_FLAGS, b'/twinkies', None, None, + _common.INVOCATION_METADATA, None, [ + ( + [ + cygrpc.SendInitialMetadataOperation( + _common.INVOCATION_METADATA, _common.EMPTY_FLAGS), + cygrpc.SendCloseFromClientOperation( + _common.EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation( + _common.EMPTY_FLAGS), + ], + client_complete_rpc_tag, + ), + ]) + client_call.operate([ + cygrpc.ReceiveInitialMetadataOperation(_common.EMPTY_FLAGS), + ], client_receive_initial_metadata_tag) + + client_events_future = test_utilities.SimpleFuture( + lambda: [ + self.channel.next_call_event(), + self.channel.next_call_event(),]) server_request_call_event = self.server_driver.event_with_tag( server_request_call_tag) @@ -87,20 +91,19 @@ class Test(_common.RpcTest, unittest.TestCase): server_complete_rpc_event = self.server_driver.event_with_tag( server_complete_rpc_tag) - client_receive_initial_metadata_event = self.client_driver.event_with_tag( - client_receive_initial_metadata_tag) - client_complete_rpc_event = self.client_driver.event_with_tag( - client_complete_rpc_tag) + client_events = client_events_future.result() + client_receive_initial_metadata_event = client_events[0] + client_complete_rpc_event = client_events[1] return ( _common.OperationResult(server_request_call_start_batch_result, server_request_call_event.completion_type, server_request_call_event.success), _common.OperationResult( - client_receive_initial_metadata_start_batch_result, + cygrpc.CallError.ok, client_receive_initial_metadata_event.completion_type, client_receive_initial_metadata_event.success), - _common.OperationResult(client_complete_rpc_start_batch_result, + _common.OperationResult(cygrpc.CallError.ok, client_complete_rpc_event.completion_type, client_complete_rpc_event.success), _common.OperationResult( diff --git a/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py b/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py index bc63b54879..8a903bfaf9 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py @@ -17,6 +17,7 @@ import threading import unittest from grpc._cython import cygrpc +from tests.unit._cython import test_utilities _EMPTY_FLAGS = 0 _EMPTY_METADATA = () @@ -118,7 +119,8 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase): server.register_completion_queue(server_completion_queue) port = server.add_http2_port(b'[::]:0') server.start() - channel = cygrpc.Channel('localhost:{}'.format(port).encode(), set()) + channel = cygrpc.Channel('localhost:{}'.format(port).encode(), set(), + None) server_shutdown_tag = 'server_shutdown_tag' server_driver = _ServerDriver(server_completion_queue, @@ -127,10 +129,6 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase): client_condition = threading.Condition() client_due = set() - client_completion_queue = cygrpc.CompletionQueue() - client_driver = _QueueDriver(client_condition, client_completion_queue, - client_due) - client_driver.start() server_call_condition = threading.Condition() server_send_initial_metadata_tag = 'server_send_initial_metadata_tag' @@ -154,25 +152,28 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase): server_completion_queue, server_rpc_tag) - client_call = channel.create_call(None, _EMPTY_FLAGS, - client_completion_queue, b'/twinkies', - None, None) client_receive_initial_metadata_tag = 'client_receive_initial_metadata_tag' client_complete_rpc_tag = 'client_complete_rpc_tag' - with client_condition: - client_receive_initial_metadata_start_batch_result = ( - client_call.start_client_batch([ - cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), - ], client_receive_initial_metadata_tag)) - client_due.add(client_receive_initial_metadata_tag) - client_complete_rpc_start_batch_result = ( - client_call.start_client_batch([ - cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA, - _EMPTY_FLAGS), - cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), - cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), - ], client_complete_rpc_tag)) - client_due.add(client_complete_rpc_tag) + client_call = channel.segregated_call( + _EMPTY_FLAGS, b'/twinkies', None, None, _EMPTY_METADATA, None, ( + ( + [ + cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), + ], + client_receive_initial_metadata_tag, + ), + ( + [ + cygrpc.SendInitialMetadataOperation( + _EMPTY_METADATA, _EMPTY_FLAGS), + cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), + ], + client_complete_rpc_tag, + ), + )) + client_receive_initial_metadata_event_future = test_utilities.SimpleFuture( + client_call.next_event) server_rpc_event = server_driver.first_event() @@ -208,19 +209,20 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase): server_complete_rpc_tag) server_call_driver.events() - with client_condition: - client_receive_first_message_tag = 'client_receive_first_message_tag' - client_receive_first_message_start_batch_result = ( - client_call.start_client_batch([ - cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), - ], client_receive_first_message_tag)) - client_due.add(client_receive_first_message_tag) - client_receive_first_message_event = client_driver.event_with_tag( - client_receive_first_message_tag) + client_recieve_initial_metadata_event = client_receive_initial_metadata_event_future.result( + ) + + client_receive_first_message_tag = 'client_receive_first_message_tag' + client_call.operate([ + cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), + ], client_receive_first_message_tag) + client_receive_first_message_event = client_call.next_event() - client_call_cancel_result = client_call.cancel() - client_driver.events() + client_call_cancel_result = client_call.cancel( + cygrpc.StatusCode.cancelled, 'Cancelled during test!') + client_complete_rpc_event = client_call.next_event() + channel.close(cygrpc.StatusCode.unknown, 'Channel closed!') server.shutdown(server_completion_queue, server_shutdown_tag) server.cancel_all_calls() server_driver.events() @@ -228,11 +230,6 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase): self.assertEqual(cygrpc.CallError.ok, request_call_result) self.assertEqual(cygrpc.CallError.ok, server_send_initial_metadata_start_batch_result) - self.assertEqual(cygrpc.CallError.ok, - client_receive_initial_metadata_start_batch_result) - self.assertEqual(cygrpc.CallError.ok, - client_complete_rpc_start_batch_result) - self.assertEqual(cygrpc.CallError.ok, client_call_cancel_result) self.assertIs(server_rpc_tag, server_rpc_event.tag) self.assertEqual(cygrpc.CompletionType.operation_complete, server_rpc_event.completion_type) diff --git a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py index 9045ff58a0..724a690746 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py @@ -51,8 +51,8 @@ class TypeSmokeTest(unittest.TestCase): del server def testChannelUpDown(self): - channel = cygrpc.Channel(b'[::]:0', None) - del channel + channel = cygrpc.Channel(b'[::]:0', None, None) + channel.close(cygrpc.StatusCode.cancelled, 'Test method anyway!') def test_metadata_plugin_call_credentials_up_down(self): cygrpc.MetadataPluginCallCredentials(_metadata_plugin, @@ -121,7 +121,7 @@ class ServerClientMixin(object): client_credentials) else: self.client_channel = cygrpc.Channel('localhost:{}'.format( - self.port).encode(), set()) + self.port).encode(), set(), None) if host_override: self.host_argument = None # default host self.expected_host = host_override @@ -131,17 +131,20 @@ class ServerClientMixin(object): self.expected_host = self.host_argument def tearDownMixin(self): + self.client_channel.close(cygrpc.StatusCode.ok, 'test being torn down!') + del self.client_channel del self.server del self.client_completion_queue del self.server_completion_queue - def _perform_operations(self, operations, call, queue, deadline, - description): - """Perform the list of operations with given call, queue, and deadline. + def _perform_queue_operations(self, operations, call, queue, deadline, + description): + """Perform the operations with given call, queue, and deadline. - Invocation errors are reported with as an exception with `description` in - the message. Performs the operations asynchronously, returning a future. - """ + Invocation errors are reported with as an exception with `description` + in the message. Performs the operations asynchronously, returning a + future. + """ def performer(): tag = object() @@ -185,9 +188,6 @@ class ServerClientMixin(object): self.assertEqual(cygrpc.CallError.ok, request_call_result) client_call_tag = object() - client_call = self.client_channel.create_call( - None, 0, self.client_completion_queue, METHOD, self.host_argument, - DEADLINE) client_initial_metadata = ( ( CLIENT_METADATA_ASCII_KEY, @@ -198,18 +198,24 @@ class ServerClientMixin(object): CLIENT_METADATA_BIN_VALUE, ), ) - client_start_batch_result = client_call.start_client_batch([ - cygrpc.SendInitialMetadataOperation(client_initial_metadata, - _EMPTY_FLAGS), - cygrpc.SendMessageOperation(REQUEST, _EMPTY_FLAGS), - cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), - cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), - cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), - cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), - ], client_call_tag) - self.assertEqual(cygrpc.CallError.ok, client_start_batch_result) - client_event_future = test_utilities.CompletionQueuePollFuture( - self.client_completion_queue, DEADLINE) + client_call = self.client_channel.integrated_call( + 0, METHOD, self.host_argument, DEADLINE, client_initial_metadata, + None, [ + ( + [ + cygrpc.SendInitialMetadataOperation( + client_initial_metadata, _EMPTY_FLAGS), + cygrpc.SendMessageOperation(REQUEST, _EMPTY_FLAGS), + cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), + cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), + cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), + ], + client_call_tag, + ), + ]) + client_event_future = test_utilities.SimpleFuture( + self.client_channel.next_call_event) request_event = self.server_completion_queue.poll(deadline=DEADLINE) self.assertEqual(cygrpc.CompletionType.operation_complete, @@ -285,7 +291,7 @@ class ServerClientMixin(object): self.assertEqual(5, len(server_event.batch_operations)) found_server_op_types = set() for server_result in server_event.batch_operations: - self.assertNotIn(client_result.type(), found_server_op_types) + self.assertNotIn(server_result.type(), found_server_op_types) found_server_op_types.add(server_result.type()) if server_result.type() == cygrpc.OperationType.receive_message: self.assertEqual(REQUEST, server_result.message()) @@ -304,66 +310,76 @@ class ServerClientMixin(object): del client_call del server_call - def test6522(self): + def test_6522(self): DEADLINE = time.time() + 5 DEADLINE_TOLERANCE = 0.25 METHOD = b'twinkies' empty_metadata = () + # Prologue server_request_tag = object() self.server.request_call(self.server_completion_queue, self.server_completion_queue, server_request_tag) - client_call = self.client_channel.create_call( - None, 0, self.client_completion_queue, METHOD, self.host_argument, - DEADLINE) - - # Prologue - def perform_client_operations(operations, description): - return self._perform_operations(operations, client_call, - self.client_completion_queue, - DEADLINE, description) - - client_event_future = perform_client_operations([ - cygrpc.SendInitialMetadataOperation(empty_metadata, _EMPTY_FLAGS), - cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), - ], "Client prologue") + client_call = self.client_channel.segregated_call( + 0, METHOD, self.host_argument, DEADLINE, None, None, ([( + [ + cygrpc.SendInitialMetadataOperation(empty_metadata, + _EMPTY_FLAGS), + cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), + ], + object(), + ), ( + [ + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), + ], + object(), + )])) + + client_initial_metadata_event_future = test_utilities.SimpleFuture( + client_call.next_event) request_event = self.server_completion_queue.poll(deadline=DEADLINE) server_call = request_event.call def perform_server_operations(operations, description): - return self._perform_operations(operations, server_call, - self.server_completion_queue, - DEADLINE, description) + return self._perform_queue_operations(operations, server_call, + self.server_completion_queue, + DEADLINE, description) server_event_future = perform_server_operations([ cygrpc.SendInitialMetadataOperation(empty_metadata, _EMPTY_FLAGS), ], "Server prologue") - client_event_future.result() # force completion + client_initial_metadata_event_future.result() # force completion server_event_future.result() # Messaging for _ in range(10): - client_event_future = perform_client_operations([ + client_call.operate([ cygrpc.SendMessageOperation(b'', _EMPTY_FLAGS), cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), ], "Client message") + client_message_event_future = test_utilities.SimpleFuture( + client_call.next_event) server_event_future = perform_server_operations([ cygrpc.SendMessageOperation(b'', _EMPTY_FLAGS), cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), ], "Server receive") - client_event_future.result() # force completion + client_message_event_future.result() # force completion server_event_future.result() # Epilogue - client_event_future = perform_client_operations([ + client_call.operate([ cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), - cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS) ], "Client epilogue") + # One for ReceiveStatusOnClient, one for SendCloseFromClient. + client_events_future = test_utilities.SimpleFuture( + lambda: { + client_call.next_event(), + client_call.next_event(),}) server_event_future = perform_server_operations([ cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS), @@ -371,7 +387,7 @@ class ServerClientMixin(object): empty_metadata, cygrpc.StatusCode.ok, b'', _EMPTY_FLAGS) ], "Server epilogue") - client_event_future.result() # force completion + client_events_future.result() # force completion server_event_future.result() diff --git a/src/python/grpcio_tests/tests/unit/_cython/test_utilities.py b/src/python/grpcio_tests/tests/unit/_cython/test_utilities.py index 4a00b9ef2f..7d5eaaaa84 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/test_utilities.py +++ b/src/python/grpcio_tests/tests/unit/_cython/test_utilities.py @@ -25,7 +25,7 @@ class SimpleFuture(object): def wrapped_function(): try: self._result = function(*args, **kwargs) - except Exception as error: + except Exception as error: # pylint: disable=broad-except self._error = error self._result = None @@ -41,7 +41,7 @@ class SimpleFuture(object): self._thread.join() if self._error: # TODO(atash): re-raise exceptions in a way that preserves tracebacks - raise self._error + raise self._error # pylint: disable=raising-bad-type return self._result diff --git a/src/python/grpcio_tests/tests/unit/_exit_test.py b/src/python/grpcio_tests/tests/unit/_exit_test.py index 6e6d9de0fb..f40f3ae07c 100644 --- a/src/python/grpcio_tests/tests/unit/_exit_test.py +++ b/src/python/grpcio_tests/tests/unit/_exit_test.py @@ -49,7 +49,7 @@ def cleanup_processes(): for process in processes: try: process.kill() - except Exception: + except Exception: # pylint: disable=broad-except pass diff --git a/src/python/grpcio_tests/tests/unit/_from_grpc_import_star.py b/src/python/grpcio_tests/tests/unit/_from_grpc_import_star.py index e683131722..ad847ae03e 100644 --- a/src/python/grpcio_tests/tests/unit/_from_grpc_import_star.py +++ b/src/python/grpcio_tests/tests/unit/_from_grpc_import_star.py @@ -14,7 +14,7 @@ _BEFORE_IMPORT = tuple(globals()) -from grpc import * +from grpc import * # pylint: disable=wildcard-import _AFTER_IMPORT = tuple(globals()) diff --git a/src/python/grpcio_tests/tests/unit/_invalid_metadata_test.py b/src/python/grpcio_tests/tests/unit/_invalid_metadata_test.py index 4edf0fc4ad..f153089a24 100644 --- a/src/python/grpcio_tests/tests/unit/_invalid_metadata_test.py +++ b/src/python/grpcio_tests/tests/unit/_invalid_metadata_test.py @@ -81,29 +81,16 @@ class InvalidMetadataTest(unittest.TestCase): request = b'\x07\x08' metadata = (('InVaLiD', 'UnaryRequestFutureUnaryResponse'),) expected_error_details = "metadata was invalid: %s" % metadata - response_future = self._unary_unary.future(request, metadata=metadata) - with self.assertRaises(grpc.RpcError) as exception_context: - response_future.result() - self.assertEqual(exception_context.exception.details(), - expected_error_details) - self.assertEqual(exception_context.exception.code(), - grpc.StatusCode.INTERNAL) - self.assertEqual(response_future.details(), expected_error_details) - self.assertEqual(response_future.code(), grpc.StatusCode.INTERNAL) + with self.assertRaises(ValueError) as exception_context: + self._unary_unary.future(request, metadata=metadata) def testUnaryRequestStreamResponse(self): request = b'\x37\x58' metadata = (('InVaLiD', 'UnaryRequestStreamResponse'),) expected_error_details = "metadata was invalid: %s" % metadata - response_iterator = self._unary_stream(request, metadata=metadata) - with self.assertRaises(grpc.RpcError) as exception_context: - next(response_iterator) - self.assertEqual(exception_context.exception.details(), - expected_error_details) - self.assertEqual(exception_context.exception.code(), - grpc.StatusCode.INTERNAL) - self.assertEqual(response_iterator.details(), expected_error_details) - self.assertEqual(response_iterator.code(), grpc.StatusCode.INTERNAL) + with self.assertRaises(ValueError) as exception_context: + self._unary_stream(request, metadata=metadata) + self.assertIn(expected_error_details, str(exception_context.exception)) def testStreamRequestBlockingUnaryResponse(self): request_iterator = ( @@ -129,32 +116,18 @@ class InvalidMetadataTest(unittest.TestCase): b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH)) metadata = (('InVaLiD', 'StreamRequestFutureUnaryResponse'),) expected_error_details = "metadata was invalid: %s" % metadata - response_future = self._stream_unary.future( - request_iterator, metadata=metadata) - with self.assertRaises(grpc.RpcError) as exception_context: - response_future.result() - self.assertEqual(exception_context.exception.details(), - expected_error_details) - self.assertEqual(exception_context.exception.code(), - grpc.StatusCode.INTERNAL) - self.assertEqual(response_future.details(), expected_error_details) - self.assertEqual(response_future.code(), grpc.StatusCode.INTERNAL) + with self.assertRaises(ValueError) as exception_context: + self._stream_unary.future(request_iterator, metadata=metadata) + self.assertIn(expected_error_details, str(exception_context.exception)) def testStreamRequestStreamResponse(self): request_iterator = ( b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH)) metadata = (('InVaLiD', 'StreamRequestStreamResponse'),) expected_error_details = "metadata was invalid: %s" % metadata - response_iterator = self._stream_stream( - request_iterator, metadata=metadata) - with self.assertRaises(grpc.RpcError) as exception_context: - next(response_iterator) - self.assertEqual(exception_context.exception.details(), - expected_error_details) - self.assertEqual(exception_context.exception.code(), - grpc.StatusCode.INTERNAL) - self.assertEqual(response_iterator.details(), expected_error_details) - self.assertEqual(response_iterator.code(), grpc.StatusCode.INTERNAL) + with self.assertRaises(ValueError) as exception_context: + self._stream_stream(request_iterator, metadata=metadata) + self.assertIn(expected_error_details, str(exception_context.exception)) if __name__ == '__main__': diff --git a/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py b/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py index e40cca8b24..93a5fdf9ff 100644 --- a/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py +++ b/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py @@ -165,11 +165,13 @@ class FailAfterFewIterationsCounter(object): def __next__(self): if self._current >= self._high: - raise Exception("This is a deliberate failure in a unit test.") + raise test_control.Defect() else: self._current += 1 return self._bytestring + next = __next__ + def _unary_unary_multi_callable(channel): return channel.unary_unary(_UNARY_UNARY) diff --git a/src/python/grpcio_tests/tests/unit/_rpc_test.py b/src/python/grpcio_tests/tests/unit/_rpc_test.py index 54f01d9f8d..34e7831a98 100644 --- a/src/python/grpcio_tests/tests/unit/_rpc_test.py +++ b/src/python/grpcio_tests/tests/unit/_rpc_test.py @@ -225,6 +225,7 @@ class RPCTest(unittest.TestCase): self.assertEqual(expected_response, response) self.assertIs(grpc.StatusCode.OK, call.code()) + self.assertEqual("", call.debug_error_string()) def testSuccessfulUnaryRequestFutureUnaryResponse(self): request = b'\x07\x08' @@ -706,6 +707,13 @@ class RPCTest(unittest.TestCase): self.assertIs(grpc.StatusCode.UNKNOWN, exception_context.exception.code()) + # sanity checks on to make sure returned string contains default members + # of the error + debug_error_string = exception_context.exception.debug_error_string() + self.assertIn("created", debug_error_string) + self.assertIn("description", debug_error_string) + self.assertIn("file", debug_error_string) + self.assertIn("file_line", debug_error_string) def testFailedUnaryRequestFutureUnaryResponse(self): request = b'\x37\x17' diff --git a/src/python/grpcio_tests/tests/unit/_thread_cleanup_test.py b/src/python/grpcio_tests/tests/unit/_thread_cleanup_test.py deleted file mode 100644 index 18f5af058a..0000000000 --- a/src/python/grpcio_tests/tests/unit/_thread_cleanup_test.py +++ /dev/null @@ -1,115 +0,0 @@ -# Copyright 2016 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Tests for CleanupThread.""" - -import threading -import time -import unittest - -from grpc import _common - -_SHORT_TIME = 0.5 -_LONG_TIME = 5.0 -_EPSILON = 0.5 - - -def cleanup(timeout): - if timeout is not None: - time.sleep(timeout) - else: - time.sleep(_LONG_TIME) - - -def slow_cleanup(timeout): - # Don't respect timeout - time.sleep(_LONG_TIME) - - -class CleanupThreadTest(unittest.TestCase): - - def testTargetInvocation(self): - event = threading.Event() - - def target(arg1, arg2, arg3=None): - self.assertEqual('arg1', arg1) - self.assertEqual('arg2', arg2) - self.assertEqual('arg3', arg3) - event.set() - - cleanup_thread = _common.CleanupThread( - behavior=lambda x: None, - target=target, - name='test-name', - args=('arg1', 'arg2'), - kwargs={ - 'arg3': 'arg3' - }) - cleanup_thread.start() - cleanup_thread.join() - self.assertEqual(cleanup_thread.name, 'test-name') - self.assertTrue(event.is_set()) - - def testJoinNoTimeout(self): - cleanup_thread = _common.CleanupThread(behavior=cleanup) - cleanup_thread.start() - start_time = time.time() - cleanup_thread.join() - end_time = time.time() - self.assertAlmostEqual( - _LONG_TIME, end_time - start_time, delta=_EPSILON) - - def testJoinTimeout(self): - cleanup_thread = _common.CleanupThread(behavior=cleanup) - cleanup_thread.start() - start_time = time.time() - cleanup_thread.join(_SHORT_TIME) - end_time = time.time() - self.assertAlmostEqual( - _SHORT_TIME, end_time - start_time, delta=_EPSILON) - - def testJoinTimeoutSlowBehavior(self): - cleanup_thread = _common.CleanupThread(behavior=slow_cleanup) - cleanup_thread.start() - start_time = time.time() - cleanup_thread.join(_SHORT_TIME) - end_time = time.time() - self.assertAlmostEqual( - _LONG_TIME, end_time - start_time, delta=_EPSILON) - - def testJoinTimeoutSlowTarget(self): - event = threading.Event() - - def target(): - event.wait(_LONG_TIME) - - cleanup_thread = _common.CleanupThread(behavior=cleanup, target=target) - cleanup_thread.start() - start_time = time.time() - cleanup_thread.join(_SHORT_TIME) - end_time = time.time() - self.assertAlmostEqual( - _SHORT_TIME, end_time - start_time, delta=_EPSILON) - event.set() - - def testJoinZeroTimeout(self): - cleanup_thread = _common.CleanupThread(behavior=cleanup) - cleanup_thread.start() - start_time = time.time() - cleanup_thread.join(0) - end_time = time.time() - self.assertAlmostEqual(0, end_time - start_time, delta=_EPSILON) - - -if __name__ == '__main__': - unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/unit/beta/_beta_features_test.py b/src/python/grpcio_tests/tests/unit/beta/_beta_features_test.py index 61c03f64ba..b43c647fc9 100644 --- a/src/python/grpcio_tests/tests/unit/beta/_beta_features_test.py +++ b/src/python/grpcio_tests/tests/unit/beta/_beta_features_test.py @@ -65,7 +65,7 @@ class _Servicer(object): self._serviced = True self._condition.notify_all() return - yield + yield # pylint: disable=unreachable def stream_unary(self, request_iterator, context): for request in request_iterator: |